Skip to content

Commit

Permalink
add a requirement for non-idempotent indexes-from-indexes that they u…
Browse files Browse the repository at this point in the history
…se a new store format version to prevent data corruption
  • Loading branch information
alecgrieser committed Nov 11, 2022
1 parent 5b54f9a commit df488e7
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,13 @@ public class FDBRecordStore extends FDBStoreBase implements FDBRecordStoreBase<M
public static final int CACHEABLE_STATE_FORMAT_VERSION = 7;
// 8 - add custom fields to store header
public static final int HEADER_USER_FIELDS_FORMAT_VERSION = 8;

// 9 - add READABLE_UNIQUE_PENDING index state
public static final int READABLE_UNIQUE_PENDING_FORMAT_VERSION = 9;
// 10 - check index build type during update
public static final int CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION = 10;

// The current code can read and write up to the format version below
public static final int MAX_SUPPORTED_FORMAT_VERSION = READABLE_UNIQUE_PENDING_FORMAT_VERSION;
public static final int MAX_SUPPORTED_FORMAT_VERSION = CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION;

// By default, record stores attempt to upgrade to this version
// NOTE: Updating this can break certain users during upgrades.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ List<Object> indexingLogMessageKeyValues() {
);
}


@Nonnull
private Index getSourceIndex(RecordMetaData metaData) {
if (policy.getSourceIndexSubspaceKey() != null) {
Expand Down Expand Up @@ -149,9 +148,8 @@ private CompletableFuture<Boolean> buildRangeOnly(@Nonnull FDBRecordStore store,
validateSameMetadataOrThrow(store);
final Index index = common.getIndex();
final IndexMaintainer maintainer = store.getIndexMaintainer(index);
validateIdempotenceIfNecessary(store, maintainer);

// idempotence - We could have verified it at the first iteration only, but the repeating checks seem harmless
// validateOrThrowEx(maintainer.isIdempotent(), "target index is not idempotent");
// readability - This method shouldn't block if one has already opened the record store (as we did)
Index srcIndex = getSourceIndex(store.getRecordMetaData());
validateOrThrowEx(store.isIndexScannable(srcIndex), "source index is not scannable");
Expand Down Expand Up @@ -195,7 +193,7 @@ private CompletableFuture<Boolean> buildRangeOnly(@Nonnull FDBRecordStore store,
@SuppressWarnings("unused")
private CompletableFuture<FDBStoredRecord<Message>> getRecordIfTypeMatch(FDBRecordStore store, @Nonnull RecordCursorResult<FDBIndexedRecord<Message>> cursorResult) {
FDBIndexedRecord<Message> indexResult = cursorResult.get();
FDBStoredRecord<Message> rec = indexResult == null ? null : indexResult.getStoredRecord();
FDBStoredRecord<Message> rec = indexResult == null ? null : indexResult.getStoredRecord();
return recordIfInIndexedTypes(rec);
}

Expand All @@ -218,20 +216,17 @@ CompletableFuture<Void> rebuildIndexInternalAsync(FDBRecordStore store) {
@Nonnull
@SuppressWarnings("PMD.CloseResource")
private CompletableFuture<Tuple> rebuildRangeOnly(@Nonnull FDBRecordStore store, Tuple cont, @Nonnull AtomicLong recordsScanned) {

validateSameMetadataOrThrow(store);
final Index index = common.getIndex();
final IndexMaintainer maintainer = store.getIndexMaintainer(index);

// idempotence - We could have verified it at the first iteration only, but the repeating checks seem harmless
validateOrThrowEx(maintainer.isIdempotent(), "target index is not idempotent");
validateIdempotenceIfNecessary(store, maintainer);

// readability - This method shouldn't block if one has already opened the record store (as we did)
final Index srcIndex = getSourceIndex(store.getRecordMetaData());
validateOrThrowEx(store.isIndexScannable(srcIndex), "source index is not scannable");

final ExecuteProperties.Builder executeProperties = ExecuteProperties.newBuilder()
.setIsolationLevel(IsolationLevel.SNAPSHOT);
.setIsolationLevel(maintainer.isIdempotent() ? IsolationLevel.SNAPSHOT : IsolationLevel.SERIALIZABLE);

final ScanProperties scanProperties = new ScanProperties(executeProperties.build());
final TupleRange tupleRange = TupleRange.between(cont, null);
Expand All @@ -242,12 +237,22 @@ private CompletableFuture<Tuple> rebuildRangeOnly(@Nonnull FDBRecordStore store,
final AtomicReference<RecordCursorResult<FDBIndexedRecord<Message>>> lastResult = new AtomicReference<>(RecordCursorResult.exhausted());
final AtomicBoolean hasMore = new AtomicBoolean(true);

final boolean isIdempotent = true ; // Note that currently indexing by index is online implemented for idempotent indexes
return iterateRangeOnly(store, cursor,
this::getRecordIfTypeMatch,
lastResult, hasMore, recordsScanned, isIdempotent
lastResult, hasMore, recordsScanned, maintainer.isIdempotent()
).thenApply(vignore -> hasMore.get() ?
lastResult.get().get().getIndexEntry().getKey() :
null );
}

private void validateIdempotenceIfNecessary(@Nonnull FDBRecordStore store, @Nonnull IndexMaintainer maintainer) {
// idempotence - Non-idempotent indexes can only be built from a source if the store format version supports it
// Prior to this format version, record updates to non-idempotent indexes would check to see if the record
// had been built already by looking for its primary key in the range set, which is incorrect for most source
// indexes. After this format version, we are assured that all updates will check the index type first and
// respond appropriately
if (store.getFormatVersion() < FDBRecordStore.CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION) {
validateOrThrowEx(maintainer.isIdempotent(), "target index is not idempotent");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2168,8 +2168,13 @@ protected Builder() {

/**
* Use this source index to scan records for indexing.
* Some sanity checks will be performed, but it is the caller's responsibility to verify that this source-index
* covers <em>all</em> the relevant records for the target-index.
* Some sanity checks will be performed, but it is the caller's responsibility to verify that this
* source-index covers <em>all</em> the relevant records for the target-index. Also, note that
* if the {@linkplain OnlineIndexer.Builder#setIndex(Index) target index} is not idempotent,
* the index build will not be executed using the given source index unless the store's
* format version is at least {@link FDBRecordStore#CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION},
* as concurrent updates to the index during such a build on older format versions can
* result in corrupting the index.
*
* @param sourceIndex an existing, readable, index.
* @return this builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ public void testIndexFromIndexSimple() {

@Test
public void testIndexFromIndexContinuation() {

final FDBStoreTimer timer = new FDBStoreTimer();
final int numRecords = 107;
final int chunkSize = 17;
Expand Down Expand Up @@ -170,29 +169,64 @@ public void testIndexFromIndexContinuation() {
}
assertEquals(numRecords, timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RECORDS_SCANNED));
assertEquals(numRecords, timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RECORDS_INDEXED));
assertEquals(numChunks , timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RANGES_BY_COUNT));
assertEquals(numChunks, timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RANGES_BY_COUNT));
}

@Test
public void testIndexFromIndexFallback() {
// Let target index be a non-idempotent index
public void testNonIdempotentIndexFromIndex() {
this.formatVersion = Math.min(FDBRecordStore.CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION, this.formatVersion);
final FDBStoreTimer timer = new FDBStoreTimer();
final long numRecords = 8;
final long otherRecords = 4;

Index srcIndex = new Index("src_index", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS);
Index tgtIndex = new Index("tgt_index", field("num_value_3_indexed").ungrouped(), IndexTypes.SUM);
FDBRecordStoreTestBase.RecordMetaDataHook hook = myHook(srcIndex, tgtIndex);

openSimpleMetaData();
populateData(numRecords, otherRecords);

openSimpleMetaData(hook);
buildSrcIndex(srcIndex);

openSimpleMetaData(hook);
try (OnlineIndexer indexBuilder = newIndexerBuilder()
.addTargetIndex(tgtIndex)
.setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder()
.setSourceIndex("src_index")
.build())
.setTimer(timer)
.build()) {

indexBuilder.buildIndex(true);
}
assertEquals(numRecords, timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RECORDS_SCANNED));
assertEquals(numRecords, timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RECORDS_INDEXED));
}

@Test
public void testCanBuildNonIdempotentIndexFromIndexOnNewStoreWithOldFormatVersionInIndexer() {
this.formatVersion = Math.min(FDBRecordStore.CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION, this.formatVersion);
final FDBStoreTimer timer = new FDBStoreTimer();
final long numRecords = 6;
final long numRecords = 8;
final long otherRecords = 4;

Index srcIndex = new Index("src_index", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS);
Index tgtIndex = new Index("tgt_index", field("num_value_3_indexed").ungrouped(), IndexTypes.SUM);
FDBRecordStoreTestBase.RecordMetaDataHook hook = myHook(srcIndex, tgtIndex);

openSimpleMetaData();
populateData(numRecords);
populateData(numRecords, otherRecords);

openSimpleMetaData(hook);
buildSrcIndex(srcIndex);

openSimpleMetaData(hook);
try (OnlineIndexer indexBuilder = newIndexerBuilder()
.addTargetIndex(tgtIndex)
// Set a format version on the store that does not allow index-from-index builds
// Because the store already has this format version, though it should be allowed
.setFormatVersion(FDBRecordStore.CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION - 1)
.setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder()
.setSourceIndex("src_index")
.build())
Expand All @@ -206,18 +240,53 @@ public void testIndexFromIndexFallback() {
}

@Test
public void testIndexFromIndexNoFallback() {
// Let target index be a non-idempotent index
public void testNonIdempotentIndexFromIndexOldFormatFallback() {
// Attempt to build a non-idempotent index at an older format version. This should fall back to a full record scan
this.formatVersion = FDBRecordStore.CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION - 1;
final FDBStoreTimer timer = new FDBStoreTimer();
final long numRecords = 6;
final long otherRecords = 5;

Index srcIndex = new Index("src_index", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS);
Index tgtIndex = new Index("tgt_index", field("num_value_3_indexed").ungrouped(), IndexTypes.SUM);
FDBRecordStoreTestBase.RecordMetaDataHook hook = myHook(srcIndex, tgtIndex);

openSimpleMetaData();
populateData(numRecords, otherRecords);

openSimpleMetaData(hook);
buildSrcIndex(srcIndex);

openSimpleMetaData(hook);
try (OnlineIndexer indexBuilder = newIndexerBuilder()
.addTargetIndex(tgtIndex)
.setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder()
.setSourceIndex("src_index")
.build())
.setTimer(timer)
.build()) {

indexBuilder.buildIndex(true);
}
assertEquals(numRecords + otherRecords, timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RECORDS_SCANNED));
assertEquals(numRecords, timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RECORDS_INDEXED));
}

@Test
public void testNonIdempotentIndexFromIndexOldFormatNoFallback() {
// Attempt to build a non-idempotent index at old format version where this is not supported. This should
// error as falling back to a record scan is not enabled
this.formatVersion = FDBRecordStore.CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION - 1;
final FDBStoreTimer timer = new FDBStoreTimer();
final long numRecords = 7;
final long otherRecords = 8;

Index srcIndex = new Index("src_index", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS);
Index tgtIndex = new Index("tgt_index", field("num_value_3_indexed").ungrouped(), IndexTypes.SUM);
FDBRecordStoreTestBase.RecordMetaDataHook hook = myHook(srcIndex, tgtIndex);

openSimpleMetaData();
populateData(numRecords);
populateData(numRecords, otherRecords);

openSimpleMetaData(hook);
buildSrcIndex(srcIndex);
Expand All @@ -232,10 +301,8 @@ public void testIndexFromIndexNoFallback() {
.setTimer(timer)
.build()) {

indexBuilder.buildIndex(true);
assertThrows(IndexingByIndex.ValidationException.class, indexBuilder::buildIndex);
}
assertEquals(numRecords, timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RECORDS_SCANNED));
assertEquals(numRecords, timer.getCount(FDBStoreTimer.Counts.ONLINE_INDEX_BUILDER_RECORDS_INDEXED));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public abstract class OnlineIndexerTest extends FDBTestBase {
FDBDatabase fdb;
Subspace subspace;
private IndexMaintenanceFilter indexMaintenanceFilter;
int formatVersion = FDBRecordStore.MAX_SUPPORTED_FORMAT_VERSION;

private static long oldMaxDelayMillis;
private static long oldInitialDelayMillis;
Expand Down Expand Up @@ -141,7 +142,7 @@ OnlineIndexer.Builder newIndexerBuilder() {
.setMetaData(metaData)
.setSubspace(subspace)
.setIndexMaintenanceFilter(getIndexMaintenanceFilter())
.setFormatVersion(FDBRecordStore.MAX_SUPPORTED_FORMAT_VERSION);
.setFormatVersion(formatVersion);
}

OnlineIndexScrubber.Builder newScrubberBuilder() {
Expand All @@ -150,15 +151,15 @@ OnlineIndexScrubber.Builder newScrubberBuilder() {
.setMetaData(metaData)
.setSubspace(subspace)
.setIndexMaintenanceFilter(getIndexMaintenanceFilter())
.setFormatVersion(FDBRecordStore.MAX_SUPPORTED_FORMAT_VERSION);
.setFormatVersion(formatVersion);
}

FDBRecordContext openContext(boolean checked) {
FDBRecordContext context = fdb.openContext();
FDBRecordStore.Builder builder = FDBRecordStore.newBuilder()
.setMetaDataProvider(metaData)
.setContext(context)
.setFormatVersion(FDBRecordStore.MAX_SUPPORTED_FORMAT_VERSION)
.setFormatVersion(formatVersion)
.setSubspace(subspace)
.setIndexMaintenanceFilter(getIndexMaintenanceFilter());
if (checked) {
Expand Down

0 comments on commit df488e7

Please sign in to comment.