Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1031,22 +1031,27 @@ private static Stream<Arguments> concurrentMixParameters() {
void concurrentMix(final boolean isSynthetic) throws IOException {
// We never touch the same record twice.
AtomicInteger step = new AtomicInteger(0);
AtomicInteger updates = new AtomicInteger(0);
AtomicInteger deletes = new AtomicInteger(0);
AtomicInteger saves = new AtomicInteger(0);
concurrentTestWithinTransaction(isSynthetic, (dataModel, recordStore) ->
RecordCursor.fromList(dataModel.recordsUnderTest())
.mapPipelined(record -> {
switch (step.incrementAndGet() % 3) {
case 0:
updates.incrementAndGet();
return record.updateOtherValue(recordStore);
case 1:
deletes.incrementAndGet();
return record.deleteRecord(recordStore);
default:
saves.incrementAndGet();
return dataModel.saveRecordAsync(true, recordStore, 1)
.thenAccept(vignore -> { });
}
}, 10)
.asList().join(),
// Note: this assertion only works because we are inserting an even multiple of 3 to begin with
Assertions::assertEquals);
(inserted, actual) -> assertEquals(inserted + saves.get() - deletes.get(), actual));
}

private void concurrentTestWithinTransaction(boolean isSynthetic,
Expand Down Expand Up @@ -1080,7 +1085,8 @@ private void concurrentTestWithinTransaction(boolean isSynthetic,
.build();

final int repartitionCount = 10;
final int loopCount = 50;
final int recordsPerIteration = 10;
final int loopCount = 40;

final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
.addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, repartitionCount)
Expand All @@ -1096,33 +1102,25 @@ private void concurrentTestWithinTransaction(boolean isSynthetic,
"docMaxPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).max()));

try (FDBRecordContext context = openContext(contextProps)) {
dataModel.saveRecords(10, context, 1);
dataModel.saveRecords(recordsPerIteration, context, 1);
commit(context);
}
explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
}

final Map<Tuple, Map<Tuple, Tuple>> initial = dataModel.groupingKeyToPrimaryKeyToPartitionKey.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> Map.copyOf(entry.getValue())
));

dataModel.validate(() -> openContext(contextProps));

try (FDBRecordContext context = openContext(contextProps)) {
FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context));
recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(false);
assertThat(dataModel.recordsUnderTest(), Matchers.hasSize(Matchers.greaterThan(30)));
LOGGER.info("concurrentUpdate: Starting updates");
LOGGER.info("concurrentTestWithinTransaction: Starting applyChanges");
applyChangeConcurrently.accept(dataModel, recordStore);
LOGGER.info("concurrentTestWithinTransaction: Done applyChanges");
commit(context);
}

System.out.println("=== initial ===");
System.out.println(initial);
System.out.println("=== updated ===");
System.out.println(dataModel.groupingKeyToPrimaryKeyToPartitionKey);
assertDataModelCount.accept(500,
assertDataModelCount.accept(recordsPerIteration * loopCount,
dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).sum());

dataModel.validate(() -> openContext(contextProps));
Expand All @@ -1140,7 +1138,7 @@ static Stream<Arguments> concurrentStoreTest() {
Arguments.of(random.nextBoolean(),
random.nextBoolean(),
random.nextBoolean(),
random.nextInt(30) + 3,
random.nextInt(20) + 3,
random.nextLong())));
}

Expand Down
Loading