Restore effectiveness of bulk size configuration.
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
yufei-cai committed Mar 20, 2022
1 parent d7d44e1 commit b6f9cfc
Showing 6 changed files with 30 additions and 26 deletions.
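
In short, the commit moves the maxBulkSize limit out of MongoSearchUpdaterFlow.start and into EnforcementFlow.create, so that write models are already grouped into bulks of at most maxBulkSize by the time they reach the persistence flow. As a minimal illustration (a sketch, not Ditto code; the class name and the numbers are made up), Akka Streams' grouped operator caps the size of each emitted bulk like this:

// Sketch: grouped(maxBulkSize) emits lists of at most maxBulkSize elements.
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class BulkSizeSketch {
    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("sketch");
        final int maxBulkSize = 4;  // stands in for persistenceConfig.getMaxBulkSize()
        Source.range(1, 10)
                .grouped(maxBulkSize)  // emits [1, 2, 3, 4], [5, 6, 7, 8], [9, 10]
                .runWith(Sink.foreach(System.out::println), system)
                .thenRun(system::terminate);
    }
}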
@@ -168,19 +168,26 @@ public <T> SubSource<List<AbstractWriteModel>, T> create(
             final Source<Collection<Metadata>, T> source,
             final int parallelismPerBulkShard,
             final int bulkShardCount,
+            final int maxBulkSize,
             final ActorSystem system) {

-        return source.flatMapConcat(changes -> Source.fromIterator(changes::iterator))
-                .groupBy(bulkShardCount, metadata -> Math.floorMod(metadata.getThingId().hashCode(), bulkShardCount))
-                .mapAsync(parallelismPerBulkShard, changedMetadata ->
-                        retrieveThingFromCachingFacade(changedMetadata.getThingId(), changedMetadata)
-                                .flatMapConcat(pair -> {
-                                    final JsonObject thing = pair.second();
-                                    searchUpdateObserver.process(changedMetadata, thing);
-                                    return computeWriteModel(changedMetadata, thing);
-                                })
-                                .runWith(Sink.seq(), system)
-                );
+        return source.flatMapConcat(changes -> Source.fromIterator(changes::iterator)
+                .groupBy(bulkShardCount, metadata -> Math.floorMod(metadata.getThingId().hashCode(), bulkShardCount))
+                .mapAsync(parallelismPerBulkShard, changedMetadata ->
+                        retrieveThingFromCachingFacade(changedMetadata.getThingId(), changedMetadata)
+                                .flatMapConcat(pair -> {
+                                    final JsonObject thing = pair.second();
+                                    searchUpdateObserver.process(changedMetadata, thing);
+                                    return computeWriteModel(changedMetadata, thing);
+                                })
+                                .runWith(Sink.seq(), system)
+                )
+                .mapConcat(models -> models)
+                .grouped(maxBulkSize)
+                .mergeSubstreams())
+                .filterNot(List::isEmpty)
+                .groupBy(bulkShardCount, models ->
+                        Math.floorMod(models.get(0).getMetadata().getThingId().hashCode(), bulkShardCount));
     }

     private Source<Pair<ThingId, JsonObject>, NotUsed> retrieveThingFromCachingFacade(final ThingId thingId,
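The reshaped create pipeline above flattens each per-thing list of write models, re-chunks the flattened stream into bulks of at most maxBulkSize, merges the per-shard substreams, and re-shards the non-empty bulks by the thing id of their first element. A simplified, runnable sketch of that shaping, using plain strings in place of Ditto's AbstractWriteModel (the class name and the tiny shard/bulk sizes are illustrative):

// Sketch of the bulk shaping: shard by key, cap bulk size, merge, then re-shard the bulks.
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.List;

public class BulkShapingSketch {
    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("sketch");
        final int bulkShardCount = 2;  // stands in for persistenceConfig.getParallelism() in the real stream
        final int maxBulkSize = 3;     // stands in for persistenceConfig.getMaxBulkSize()
        Source.from(List.of("a", "b", "c", "d", "e", "f", "g"))
                .groupBy(bulkShardCount, s -> Math.floorMod(s.hashCode(), bulkShardCount))
                .grouped(maxBulkSize)  // per shard: lists of at most maxBulkSize elements
                .mergeSubstreams()     // back to a single stream of bulks
                .filterNot(List::isEmpty)
                .groupBy(bulkShardCount, bulk -> Math.floorMod(bulk.get(0).hashCode(), bulkShardCount))
                .mergeSubstreams()     // merged again here only so the sketch can print the bulks
                .runWith(Sink.foreach(System.out::println), system)
                .thenRun(system::terminate);
    }
}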
@@ -96,16 +96,13 @@ public static MongoSearchUpdaterFlow of(final MongoDatabase database,
      * @param subSource sub-source of write models.
      * @param shouldAcknowledge whether to use a write concern to guarantee the consistency of acknowledgements.
      * {@link org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel#SEARCH_PERSISTED} was required or not.
-     * @param maxBulkSize How many writes to perform in one bulk.
      * @return sub-source of write results.
      */
     public SubSource<WriteResultAndErrors, NotUsed> start(
             final SubSource<List<AbstractWriteModel>, NotUsed> subSource,
-            final boolean shouldAcknowledge,
-            final int maxBulkSize) {
+            final boolean shouldAcknowledge) {

-        return subSource.flatMapConcat(models -> Source.from(models).grouped(maxBulkSize))
-                .map(MongoSearchUpdaterFlow::sortBySeqNr)
+        return subSource.map(MongoSearchUpdaterFlow::sortBySeqNr)
                 .flatMapConcat(searchUpdateMapper::processWriteModels)
                 .flatMapConcat(writeModels -> executeBulkWrite(shouldAcknowledge, writeModels));
     }
@@ -140,16 +140,16 @@ private Source<SubSource<String, NotUsed>, NotUsed> createRestartResultSource()
                 acknowledgedSource.mergePrioritized(unacknowledgedSource, 1023, 1, true);

         final SubSource<List<AbstractWriteModel>, NotUsed> enforcementSource = enforcementFlow.create(
-                mergedSource.via(filterMapKeysByBlockedNamespaces()),
-                retrievalConfig.getParallelism(),
-                persistenceConfig.getParallelism(),
-                actorSystem);
+                mergedSource.via(filterMapKeysByBlockedNamespaces()),
+                retrievalConfig.getParallelism(),
+                persistenceConfig.getParallelism(),
+                persistenceConfig.getMaxBulkSize(),
+                actorSystem);

         final String logName = "SearchUpdaterStream/BulkWriteResult";
         final SubSource<WriteResultAndErrors, NotUsed> persistenceSource = mongoSearchUpdaterFlow.start(
                 enforcementSource,
-                true,
-                persistenceConfig.getMaxBulkSize()
+                true
         );
         final SubSource<String, NotUsed> loggingSource =
                 persistenceSource.via(bulkWriteResultAckFlow.start(persistenceConfig.getAckDelay()))
@@ -78,7 +78,7 @@ public Source<WriteResultAndErrors, NotUsed> write(final Thing thing,
         final var source = Source.single(writeModel)
                 .groupBy(1, foo -> 0)
                 .grouped(1);
-        return mongoSearchUpdaterFlow.start(source, false, 1).mergeSubstreams();
+        return mongoSearchUpdaterFlow.start(source, false).mergeSubstreams();
     }

     /**
@@ -106,7 +106,7 @@ private Source<WriteResultAndErrors, NotUsed> delete(final Metadata metadata) {
         final var source = Source.single(writeModel)
                 .groupBy(1, foo -> 0)
                 .grouped(1);
-        return mongoSearchUpdaterFlow.start(source, false, 1).mergeSubstreams();
+        return mongoSearchUpdaterFlow.start(source, false).mergeSubstreams();
     }

 }
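
The two test adaptations above keep the previous behaviour of writing one model per bulk by pre-grouping before calling start, now that start no longer takes a bulk size. A runnable sketch of that idiom, with a plain string standing in for the write model:

// Sketch of the test idiom: route everything to one substream and emit one-element bulks.
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class SingleBulkSketch {
    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("sketch");
        Source.single("writeModel")
                .groupBy(1, foo -> 0)  // everything is routed to substream 0
                .grouped(1)            // each bulk contains exactly one element
                .mergeSubstreams()
                .runWith(Sink.foreach(System.out::println), system)  // prints [writeModel]
                .thenRun(system::terminate);
    }
}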
@@ -674,7 +674,7 @@ public TestActor.AutoPilot run(final ActorRef sender, final Object msg) {
     private void materializeTestProbes(final EnforcementFlow enforcementFlow) {
         final var source = TestSource.<Collection<Metadata>>probe(system);
         final var sink = TestSink.<List<AbstractWriteModel>>probe(system);
-        final var runnableGraph = enforcementFlow.create(source, 16, 1, system)
+        final var runnableGraph = enforcementFlow.create(source, 16, 1, 1, system)
                 .mapConcat(x -> x)
                 .map(List::of)
                 .mergeSubstreams()
@@ -110,7 +110,7 @@ public void testThroughput() throws Throwable {
                 .groupBy(parallelism, w -> Math.floorMod(w.getMetadata().getThingId().hashCode(), parallelism))
                 .map(List::of);
         final var runnableGraph =
-                flow.start(writeModelSource, false, maxBulkSize)
+                flow.start(writeModelSource, false)
                 .map(writeResultAndErrors -> {
                     if (writeResultAndErrors.getBulkWriteErrors().isEmpty()) {
                         writeResultAndErrors.getWriteModels().forEach(writeModel -> {