From 3d4293d0e90012e0e4ddcc9e8ed1d279b0954c48 Mon Sep 17 00:00:00 2001 From: Thomas Jaeckle Date: Mon, 29 Aug 2022 18:32:13 +0200 Subject: [PATCH] use batchSize param in MongoReadJournal also for aggregation pipeline batchSize in order to reduce unneeded DB roundtrips Signed-off-by: Thomas Jaeckle --- .../persistence/mongo/streaming/MongoReadJournal.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java index 5891af54d8..0b06eaffd5 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java @@ -673,7 +673,9 @@ private static Source listLatestJournalEntries(final MongoCol MongoReadJournal.MAX_BACK_OFF_DURATION, randomFactor) .withMaxRestarts(maxRestarts, minBackOff); return RestartSource.onFailuresWithBackoff(restartSettings, () -> - Source.fromPublisher(journal.aggregate(pipeline)) + Source.fromPublisher(journal.aggregate(pipeline) + .batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) + ) ); } @@ -750,7 +752,9 @@ private static Source listNewestActiveSnapshotsByBatch( )); } - return Source.fromPublisher(snapshotStore.aggregate(pipeline)) + return Source.fromPublisher(snapshotStore.aggregate(pipeline) + .batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) + ) .flatMapConcat(document -> { final String theMaxPid = document.getString(maxPid); if (theMaxPid == null) {