Skip to content

Commit

Permalink
update names and documentation, and execute range checks in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
alecgrieser committed Nov 14, 2022
1 parent df488e7 commit 0cd43c9
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3492,9 +3492,17 @@ public CompletableFuture<IndexBuildState> getIndexBuildStateAsync(Index index) {
return IndexBuildState.loadIndexBuildStateAsync(this, index);
}

/**
* Load the indexing type stamp for an index. This stamp contains information about the kind of
* index build being used to construct a new index. This method is {@link API.Status#INTERNAL}.
*
* @param index the index being built
* @return the indexing type stamp for the index's current build
* @see #saveIndexingTypeStamp(Index, IndexBuildProto.IndexBuildIndexingStamp)
*/
@API(API.Status.INTERNAL)
@Nonnull
public CompletableFuture<IndexBuildProto.IndexBuildIndexingStamp> loadIndexBuildStampAsync(Index index) {
public CompletableFuture<IndexBuildProto.IndexBuildIndexingStamp> loadIndexingTypeStampAsync(Index index) {
byte[] stampKey = IndexingBase.indexBuildTypeSubspace(this, index).pack();
return ensureContextActive().get(stampKey).thenApply(serializedStamp -> {
if (serializedStamp == null) {
Expand All @@ -3512,8 +3520,17 @@ public CompletableFuture<IndexBuildProto.IndexBuildIndexingStamp> loadIndexBuild
});
}

/**
* Update the indexing type stamp for the given index. This is used by the {@link OnlineIndexer}
* to document what kind of indexing procedure is being used to build the given index. This method
* is {@link API.Status#INTERNAL}.
*
* @param index the index being built
* @param stamp the new value of the index's indexing type stamp
* @see #loadIndexingTypeStampAsync(Index)
*/
@API(API.Status.INTERNAL)
public void saveIndexBuildStamp(Index index, IndexBuildProto.IndexBuildIndexingStamp stamp) {
public void saveIndexingTypeStamp(Index index, IndexBuildProto.IndexBuildIndexingStamp stamp) {
byte[] stampKey = IndexingBase.indexBuildTypeSubspace(this, index).pack();
ensureContextActive().set(stampKey, stamp.toByteArray());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,10 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild, Index index, IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp) {
if (forceStampOverwrite && !continuedBuild) {
// Fresh session + overwrite = no questions asked
store.saveIndexBuildStamp(index, indexingTypeStamp);
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
return store.loadIndexBuildStampAsync(index)
return store.loadIndexingTypeStampAsync(index)
.thenCompose(savedStamp -> {
if (savedStamp == null) {
if (continuedBuild && indexingTypeStamp.getMethod() !=
Expand All @@ -422,7 +422,7 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
.thenCompose(noRecordScanned -> throwAsByRecordsUnlessNoRecordWasScanned(noRecordScanned, store, index, indexingTypeStamp));
}
// Here: either not a continuedBuild (new session), or a BY_RECORD session (allowed to overwrite the null stamp)
store.saveIndexBuildStamp(index, indexingTypeStamp);
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
// Here: has non-null type stamp
Expand All @@ -434,7 +434,7 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
indexingTypeStamp.getMethod() == IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS &&
savedStamp.getMethod() == IndexBuildProto.IndexBuildIndexingStamp.Method.MULTI_TARGET_BY_RECORDS) {
// Special case: partly built with multi target, but may be continued indexing on its own
store.saveIndexBuildStamp(index, indexingTypeStamp);
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
if (forceStampOverwrite) { // and a continued Build
Expand Down Expand Up @@ -462,7 +462,7 @@ private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean
.addKeysAndValues(common.indexLogMessageKeyValues())
.toString());
}
store.saveIndexBuildStamp(index, indexingTypeStamp);
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
// Here: there is no type stamp, but indexing is ongoing. For backward compatibility reasons, we'll consider it a BY_RECORDS stamp
Expand All @@ -485,7 +485,7 @@ private CompletableFuture<Void> throwUnlessNoRecordWasScanned(boolean noRecordSc
// Ditto (a complicated way to reduce complexity)
if (noRecordScanned) {
// we can safely overwrite the previous type stamp
store.saveIndexBuildStamp(index, indexingTypeStamp);
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
// A force overwrite cannot be allowed when partly built
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2174,7 +2174,9 @@ protected Builder() {
* the index build will not be executed using the given source index unless the store's
* format version is at least {@link FDBRecordStore#CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION},
* as concurrent updates to the index during such a build on older format versions can
* result in corrupting the index.
* result in corrupting the index. On older format versions, the indexer will throw an
* exception and the build may fall back to building the index by a records scan depending
* on the value given to {@link #setForbidRecordScan(boolean)}.
*
* @param sourceIndex an existing, readable, index.
* @return this builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public <M extends Message> CompletableFuture<Void> updateWhileWriteOnly(@Nullabl
// Idempotent indexes can just update the index data structures directly
return update(oldRecord, newRecord);
}
return state.store.loadIndexBuildStampAsync(state.index).thenCompose(stamp -> {
return state.store.loadIndexingTypeStampAsync(state.index).thenCompose(stamp -> {
if (stamp == null) {
// Either the index build has not started (in which case the record should not be
// indexed) or this is a by-records build that did not write the stamp, which can
Expand All @@ -282,7 +282,7 @@ public <M extends Message> CompletableFuture<Void> updateWhileWriteOnly(@Nullabl
}

private <M extends Message> CompletableFuture<Void> updateWriteOnlyByRecords(@Nullable final FDBIndexableRecord<M> oldRecord, @Nullable final FDBIndexableRecord<M> newRecord) {
// Check if the record has been built be checking its primary key in the range set. Update the index
// Check if the record has been built by checking its primary key in the range set. Update the index
// if it is a built range
Tuple primaryKey = oldRecord == null ? Verify.verifyNotNull(newRecord).getPrimaryKey() : oldRecord.getPrimaryKey();
return addedRangeWithKey(primaryKey).thenCompose(inRange ->
Expand All @@ -302,9 +302,13 @@ private <M extends Message> CompletableFuture<Void> updateWriteOnlyByIndex(@Nonn
} else {
// The old and new record use different keys in the source index. Check each one individually,
// and then simulate deleting the old record (if needed) and adding the new record (if needed)
return addedRangeWithKey(oldEntryKey)
// Note: index maintainers on the same index are not thread safe, so the index updates need to
// be serialized here, but the range set checks can be executed concurrently
CompletableFuture<Boolean> oldInRangeFuture = addedRangeWithKey(oldEntryKey);
CompletableFuture<Boolean> newInRangeFuture = addedRangeWithKey(newEntryKey);
return oldInRangeFuture
.thenCompose(oldInRange -> oldInRange ? update(oldRecord, null) : AsyncUtil.DONE)
.thenCompose(ignore -> addedRangeWithKey(newEntryKey))
.thenCompose(ignore -> newInRangeFuture)
.thenCompose(newInRange -> newInRange ? update(null, newRecord) : AsyncUtil.DONE);
}
} else {
Expand Down

0 comments on commit 0cd43c9

Please sign in to comment.