diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlow.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlow.java index ba78ffd71d..3aac2d4e6e 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlow.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlow.java @@ -168,19 +168,26 @@ public SubSource, T> create( final Source, T> source, final int parallelismPerBulkShard, final int bulkShardCount, + final int maxBulkSize, final ActorSystem system) { - return source.flatMapConcat(changes -> Source.fromIterator(changes::iterator)) - .groupBy(bulkShardCount, metadata -> Math.floorMod(metadata.getThingId().hashCode(), bulkShardCount)) - .mapAsync(parallelismPerBulkShard, changedMetadata -> - retrieveThingFromCachingFacade(changedMetadata.getThingId(), changedMetadata) - .flatMapConcat(pair -> { - final JsonObject thing = pair.second(); - searchUpdateObserver.process(changedMetadata, thing); - return computeWriteModel(changedMetadata, thing); - }) - .runWith(Sink.seq(), system) - ); + return source.flatMapConcat(changes -> Source.fromIterator(changes::iterator) + .groupBy(bulkShardCount, metadata -> Math.floorMod(metadata.getThingId().hashCode(), bulkShardCount)) + .mapAsync(parallelismPerBulkShard, changedMetadata -> + retrieveThingFromCachingFacade(changedMetadata.getThingId(), changedMetadata) + .flatMapConcat(pair -> { + final JsonObject thing = pair.second(); + searchUpdateObserver.process(changedMetadata, thing); + return computeWriteModel(changedMetadata, thing); + }) + .runWith(Sink.seq(), system) + ) + .mapConcat(models -> models) + .grouped(maxBulkSize) + .mergeSubstreams()) + .filterNot(List::isEmpty) + .groupBy(bulkShardCount, models -> + Math.floorMod(models.get(0).getMetadata().getThingId().hashCode(), bulkShardCount)); } private Source, NotUsed> retrieveThingFromCachingFacade(final ThingId thingId, diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/MongoSearchUpdaterFlow.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/MongoSearchUpdaterFlow.java index 36acd73758..557b5034c9 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/MongoSearchUpdaterFlow.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/MongoSearchUpdaterFlow.java @@ -96,16 +96,13 @@ public static MongoSearchUpdaterFlow of(final MongoDatabase database, * @param subSource sub-source of write models. * @param shouldAcknowledge whether to use a write concern to guarantee the consistency of acknowledgements. * {@link org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel#SEARCH_PERSISTED} was required or not. - * @param maxBulkSize How many writes to perform in one bulk. * @return sub-source of write results. */ public SubSource start( final SubSource, NotUsed> subSource, - final boolean shouldAcknowledge, - final int maxBulkSize) { + final boolean shouldAcknowledge) { - return subSource.flatMapConcat(models -> Source.from(models).grouped(maxBulkSize)) - .map(MongoSearchUpdaterFlow::sortBySeqNr) + return subSource.map(MongoSearchUpdaterFlow::sortBySeqNr) .flatMapConcat(searchUpdateMapper::processWriteModels) .flatMapConcat(writeModels -> executeBulkWrite(shouldAcknowledge, writeModels)); } diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/SearchUpdaterStream.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/SearchUpdaterStream.java index 073a96b6ed..2698e30880 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/SearchUpdaterStream.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/SearchUpdaterStream.java @@ -140,16 +140,16 @@ private Source, NotUsed> createRestartResultSource() acknowledgedSource.mergePrioritized(unacknowledgedSource, 1023, 1, true); final SubSource, NotUsed> enforcementSource = enforcementFlow.create( - mergedSource.via(filterMapKeysByBlockedNamespaces()), - retrievalConfig.getParallelism(), - persistenceConfig.getParallelism(), - actorSystem); + mergedSource.via(filterMapKeysByBlockedNamespaces()), + retrievalConfig.getParallelism(), + persistenceConfig.getParallelism(), + persistenceConfig.getMaxBulkSize(), + actorSystem); final String logName = "SearchUpdaterStream/BulkWriteResult"; final SubSource persistenceSource = mongoSearchUpdaterFlow.start( enforcementSource, - true, - persistenceConfig.getMaxBulkSize() + true ); final SubSource loggingSource = persistenceSource.via(bulkWriteResultAckFlow.start(persistenceConfig.getAckDelay())) diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/TestSearchUpdaterStream.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/TestSearchUpdaterStream.java index a74242381c..32143ab660 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/TestSearchUpdaterStream.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/TestSearchUpdaterStream.java @@ -78,7 +78,7 @@ public Source write(final Thing thing, final var source = Source.single(writeModel) .groupBy(1, foo -> 0) .grouped(1); - return mongoSearchUpdaterFlow.start(source, false, 1).mergeSubstreams(); + return mongoSearchUpdaterFlow.start(source, false).mergeSubstreams(); } /** @@ -106,7 +106,7 @@ private Source delete(final Metadata metadata) { final var source = Source.single(writeModel) .groupBy(1, foo -> 0) .grouped(1); - return mongoSearchUpdaterFlow.start(source, false, 1).mergeSubstreams(); + return mongoSearchUpdaterFlow.start(source, false).mergeSubstreams(); } } diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java index d786c63578..81ed78d898 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java @@ -674,7 +674,7 @@ public TestActor.AutoPilot run(final ActorRef sender, final Object msg) { private void materializeTestProbes(final EnforcementFlow enforcementFlow) { final var source = TestSource.>probe(system); final var sink = TestSink.>probe(system); - final var runnableGraph = enforcementFlow.create(source, 16, 1, system) + final var runnableGraph = enforcementFlow.create(source, 16, 1, 1, system) .mapConcat(x -> x) .map(List::of) .mergeSubstreams() diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/MongoSearchUpdaterFlowTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/MongoSearchUpdaterFlowTest.java index 47a28c0aae..98be7e4b91 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/MongoSearchUpdaterFlowTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/MongoSearchUpdaterFlowTest.java @@ -110,7 +110,7 @@ public void testThroughput() throws Throwable { .groupBy(parallelism, w -> Math.floorMod(w.getMetadata().getThingId().hashCode(), parallelism)) .map(List::of); final var runnableGraph = - flow.start(writeModelSource, false, maxBulkSize) + flow.start(writeModelSource, false) .map(writeResultAndErrors -> { if (writeResultAndErrors.getBulkWriteErrors().isEmpty()) { writeResultAndErrors.getWriteModels().forEach(writeModel -> {