diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java index 5891af54d8..0b06eaffd5 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java @@ -673,7 +673,9 @@ private static Source listLatestJournalEntries(final MongoCol MongoReadJournal.MAX_BACK_OFF_DURATION, randomFactor) .withMaxRestarts(maxRestarts, minBackOff); return RestartSource.onFailuresWithBackoff(restartSettings, () -> - Source.fromPublisher(journal.aggregate(pipeline)) + Source.fromPublisher(journal.aggregate(pipeline) + .batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) + ) ); } @@ -750,7 +752,9 @@ private static Source listNewestActiveSnapshotsByBatch( )); } - return Source.fromPublisher(snapshotStore.aggregate(pipeline)) + return Source.fromPublisher(snapshotStore.aggregate(pipeline) + .batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) + ) .flatMapConcat(document -> { final String theMaxPid = document.getString(maxPid); if (theMaxPid == null) {