Skip to content

Commit

Permalink
add tests for deletes and updates during sum index builds
Browse files Browse the repository at this point in the history
  • Loading branch information
alecgrieser committed Nov 10, 2022
1 parent 758e1ab commit de7b817
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ abstract class OnlineIndexerBuildIndexTest extends OnlineIndexerTest {
}

@SuppressWarnings("deprecation")
void singleRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records, @Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding,
void singleRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records,
@Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding,
@Nullable List<Tuple> deleteWhileBuilding,
int agents, boolean overlap, boolean splitLongRecords,
@Nonnull Index index, @Nullable Index sourceIndex, @Nonnull Runnable beforeBuild, @Nonnull Runnable afterBuild, @Nonnull Runnable afterReadable) {
LOGGER.info(KeyValueLogMessage.of("beginning rebuild test",
Expand Down Expand Up @@ -259,7 +261,7 @@ void singleRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records, @Nul
fdb::mapAsyncToSyncException);
}

if (recordsWhileBuilding != null && recordsWhileBuilding.size() > 0) {
if (recordsWhileBuilding != null && !recordsWhileBuilding.isEmpty()) {
int i = 0;
while (i < recordsWhileBuilding.size()) {
List<TestRecords1Proto.MySimpleRecord> thisBatch = recordsWhileBuilding.subList(i, Math.min(i + 30, recordsWhileBuilding.size()));
Expand All @@ -272,6 +274,19 @@ void singleRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records, @Nul
}
}

if (deleteWhileBuilding != null && !deleteWhileBuilding.isEmpty()) {
int i = 0;
while (i < deleteWhileBuilding.size()) {
List<Tuple> thisBatch = deleteWhileBuilding.subList(i, Math.min(i + 10, deleteWhileBuilding.size()));
fdb.run(context -> {
FDBRecordStore store = recordStore.asBuilder().setContext(context).build();
thisBatch.forEach(store::deleteRecord);
return null;
});
i += 10;
}
}

buildFuture.join();

// if a record is added to a range that has already been built, it will not be counted, otherwise,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void run() {
}
};

singleRebuild(records, recordsWhileBuilding, agents, overlap, false, index, null, beforeBuild, afterBuild, afterReadable);
singleRebuild(records, recordsWhileBuilding, null, agents, overlap, false, index, null, beforeBuild, afterBuild, afterReadable);
}

private void rankRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records, @Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.apple.foundationdb.record.metadata.IndexTypes;
import com.apple.foundationdb.record.metadata.MetaDataException;
import com.apple.foundationdb.record.provider.foundationdb.query.FDBRestrictedIndexQueryTest;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.test.Tags;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -61,36 +62,33 @@ private OnlineIndexerBuildSumIndexTest(boolean safeBuild) {
super(safeBuild);
}

private void sumRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records, @Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding,
@Nullable Index sourceIndex, int agents, boolean overlap) {
@SuppressWarnings("try")
private void sumRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records,
@Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding,
@Nullable List<Long> deletedIds,
@Nullable Index sourceIndex,
int agents,
boolean overlap) {
Index index = new Index("newSumIndex", field("num_value_2").ungrouped(), IndexTypes.SUM);
IndexAggregateFunction aggregateFunction = new IndexAggregateFunction(FunctionNames.SUM, index.getRootExpression(), index.getName());

Runnable beforeBuild = new Runnable() {
@SuppressWarnings("try")
@Override
public void run() {
try (FDBRecordContext context = openContext()) {
metaData.getIndex(index.getName());
} catch (MetaDataException e) {
assertEquals("Index newSumIndex not defined", e.getMessage());
}
Runnable beforeBuild = () -> {
try (FDBRecordContext context = openContext()) {
metaData.getIndex(index.getName());
} catch (MetaDataException e) {
assertEquals("Index newSumIndex not defined", e.getMessage());
}
};

Runnable afterBuild = new Runnable() {
@SuppressWarnings("try")
@Override
public void run() {
metaData.getIndex(index.getName());
try (FDBRecordContext context = openContext()) {
FDBRestrictedIndexQueryTest.assertThrowsAggregateFunctionNotSupported(() ->
recordStore.evaluateAggregateFunction(Collections.singletonList("MySimpleRecord"),
aggregateFunction, TupleRange.ALL, IsolationLevel.SNAPSHOT),
"newSumIndex.sum(Field { 'num_value_2' None} group 1)");
} catch (Exception e) {
fail();
}
Runnable afterBuild = () -> {
metaData.getIndex(index.getName());
try (FDBRecordContext context = openContext()) {
FDBRestrictedIndexQueryTest.assertThrowsAggregateFunctionNotSupported(() ->
recordStore.evaluateAggregateFunction(Collections.singletonList("MySimpleRecord"),
aggregateFunction, TupleRange.ALL, IsolationLevel.SNAPSHOT),
"newSumIndex.sum(Field { 'num_value_2' None} group 1)");
} catch (Exception e) {
fail();
}
};

Expand All @@ -101,24 +99,33 @@ public void run() {
updatedRecords = updated(records, recordsWhileBuilding);
}

Runnable afterReadable = new Runnable() {
@SuppressWarnings("try")
@Override
public void run() {
try (FDBRecordContext context = openContext()) {
long sum = recordStore.evaluateAggregateFunction(Collections.singletonList("MySimpleRecord"), aggregateFunction, TupleRange.ALL, IsolationLevel.SNAPSHOT).join().getLong(0);
long expected = updatedRecords.stream().mapToInt(msg -> msg.hasNumValue2() ? msg.getNumValue2() : 0).sum();
assertEquals(expected, sum);
}
final List<Tuple> deletePrimaryKeys;
if (deletedIds != null && !deletedIds.isEmpty()) {
updatedRecords = updatedRecords.stream()
.filter(rec -> !deletedIds.contains(rec.getRecNo()))
.collect(Collectors.toList());
deletePrimaryKeys = deletedIds.stream()
.map(Tuple::from)
.collect(Collectors.toList());
} else {
deletePrimaryKeys = null;
}
long expected = updatedRecords.stream().mapToLong(TestRecords1Proto.MySimpleRecord::getNumValue2).sum();

Runnable afterReadable = () -> {
try (FDBRecordContext context = openContext()) {
long sum = recordStore.evaluateAggregateFunction(Collections.singletonList("MySimpleRecord"), aggregateFunction, TupleRange.ALL, IsolationLevel.SNAPSHOT).join().getLong(0);
assertEquals(expected, sum);
}
};

singleRebuild(records, recordsWhileBuilding, agents, overlap, false, index, sourceIndex, beforeBuild, afterBuild, afterReadable);
singleRebuild(records, recordsWhileBuilding, deletePrimaryKeys, agents, overlap, false, index, sourceIndex, beforeBuild, afterBuild, afterReadable);
}

private void sumRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records, @Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding,
private void sumRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records,
@Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding,
@Nullable Index sourceIndex) {
sumRebuild(records, recordsWhileBuilding, sourceIndex, 1, false);
sumRebuild(records, recordsWhileBuilding, null, sourceIndex, 1, false);
}

private void sumRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records, @Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding) {
Expand Down Expand Up @@ -166,7 +173,7 @@ public void oneHundredElementsParallelSum() {
List<TestRecords1Proto.MySimpleRecord> records = Stream.generate(() ->
TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(r.nextLong() / 2).setNumValue2(r.nextInt(10)).build()
).limit(100).sorted(Comparator.comparingLong(TestRecords1Proto.MySimpleRecord::getRecNo)).collect(Collectors.toList());
sumRebuild(records, null, null, 5, false);
sumRebuild(records, null, null, null, 5, false);
}

@Test
Expand All @@ -176,7 +183,7 @@ public void oneHundredElementsParallelOverlapSum() {
List<TestRecords1Proto.MySimpleRecord> records = Stream.generate(() ->
TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(r.nextLong() / 2).setNumValue2(r.nextInt(10)).build()
).limit(100).sorted(Comparator.comparingLong(TestRecords1Proto.MySimpleRecord::getRecNo)).collect(Collectors.toList());
sumRebuild(records, null, null, 5, true);
sumRebuild(records, null, null, null, 5, true);
}

@ParameterizedTest(name = "addWhileBuildingSum[sourceIndex={0}]")
Expand All @@ -203,7 +210,7 @@ public void addWhileBuildingParallelSum() {
List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding = Stream.generate(() ->
TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(r.nextLong() / 2).setNumValue2(r.nextInt(10)).build()
).limit(200).sorted(Comparator.comparingLong(TestRecords1Proto.MySimpleRecord::getRecNo)).collect(Collectors.toList());
sumRebuild(records, recordsWhileBuilding, null, 5, false);
sumRebuild(records, recordsWhileBuilding, null, null, 5, false);
}

@ParameterizedTest(name = "somePreloadedSum[sourceIndex={0}]")
Expand Down Expand Up @@ -257,9 +264,59 @@ public void addSequentialWhileBuildingParallelSum() {
List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding = Stream.generate(() ->
TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(r.nextInt(100)).setNumValue2(r.nextInt(20) + 20).build()
).limit(100).sorted(Comparator.comparingLong(TestRecords1Proto.MySimpleRecord::getRecNo)).collect(Collectors.toList());
sumRebuild(records, recordsWhileBuilding, null, 5, false);
sumRebuild(records, recordsWhileBuilding, null, null, 5, false);
}

@ParameterizedTest(name = "updateRecordsWhileBuildingSum[sourceIndex={0}]")
@MethodSource("sourceIndexes")
@Tag(Tags.Slow)
public void updateRecordsWhileBuildingSum(@Nullable Index sourceIndex) {
Random r = new Random();
List<TestRecords1Proto.MySimpleRecord> records = LongStream.range(0, 300).mapToObj(val ->
TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(r.nextLong()).setNumValue2(r.nextInt(20)).build()
).collect(Collectors.toList());
List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding = records.stream()
.filter(rec -> r.nextBoolean())
.map(rec -> rec.toBuilder().setNumValue2(r.nextInt(20)).build())
.collect(Collectors.toList());
sumRebuild(records, recordsWhileBuilding, sourceIndex);
}

@ParameterizedTest(name = "deleteRecordsWhileBuildingSum[sourceIndex={0}]")
@MethodSource("sourceIndexes")
@Tag(Tags.Slow)
public void deleteRecordsWhileBuildingSum(@Nullable Index sourceIndex) {
Random r = new Random();
List<TestRecords1Proto.MySimpleRecord> records = LongStream.range(0, 300).mapToObj(val ->
TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(r.nextLong()).setNumValue2(r.nextInt(50)).build()
).collect(Collectors.toList());
List<Long> toDelete = records.stream()
.filter(rec -> r.nextBoolean())
.map(TestRecords1Proto.MySimpleRecord::getRecNo)
.collect(Collectors.toList());
sumRebuild(records, null, toDelete, sourceIndex, 1, false);
}

@ParameterizedTest(name = "updateAndDeleteRecordsWhileBuildingSum[sourceIndex={0}]")
@MethodSource("sourceIndexes")
@Tag(Tags.Slow)
public void updateAndDeleteRecordsWhileBuildingSum(@Nullable Index sourceIndex) {
Random r = new Random();
List<TestRecords1Proto.MySimpleRecord> records = LongStream.range(0, 300).mapToObj(val ->
TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(r.nextLong()).setNumValue2(r.nextInt(50)).build()
).collect(Collectors.toList());
List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding = records.stream()
.filter(rec -> r.nextBoolean())
.map(rec -> rec.toBuilder().setNumValue2(r.nextInt(20)).build())
.collect(Collectors.toList());
List<Long> toDelete = records.stream()
.filter(rec -> r.nextBoolean())
.map(TestRecords1Proto.MySimpleRecord::getRecNo)
.collect(Collectors.toList());
sumRebuild(records, recordsWhileBuilding, toDelete, sourceIndex, 1, false);
}


/**
* Build indexes with the unchecked index build interfaces.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void run() {
}
};

singleRebuild(records, recordsWhileBuilding, agents, overlap, splitLongRecords, index, null, beforeBuild, afterBuild, afterReadable);
singleRebuild(records, recordsWhileBuilding, null, agents, overlap, splitLongRecords, index, null, beforeBuild, afterBuild, afterReadable);
}

private void valueRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records, @Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void run() {
}
};

singleRebuild(records, recordsWhileBuilding, agents, overlap, false, index, null, beforeBuild, afterBuild, afterReadable);
singleRebuild(records, recordsWhileBuilding, null, agents, overlap, false, index, null, beforeBuild, afterBuild, afterReadable);
}

private void versionRebuild(@Nonnull List<TestRecords1Proto.MySimpleRecord> records, @Nullable List<TestRecords1Proto.MySimpleRecord> recordsWhileBuilding) {
Expand Down

0 comments on commit de7b817

Please sign in to comment.