Skip to content

Commit

Permalink
use batchSize param in MongoReadJournal also for aggregation pipeline…
Browse files Browse the repository at this point in the history
… batchSize in order to reduce unneeded DB roundtrips

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Aug 29, 2022
1 parent 754a7e3 commit 3d4293d
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,9 @@ private static Source<Document, NotUsed> listLatestJournalEntries(final MongoCol
MongoReadJournal.MAX_BACK_OFF_DURATION, randomFactor)
.withMaxRestarts(maxRestarts, minBackOff);
return RestartSource.onFailuresWithBackoff(restartSettings, () ->
Source.fromPublisher(journal.aggregate(pipeline))
Source.fromPublisher(journal.aggregate(pipeline)
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
)
);
}

Expand Down Expand Up @@ -750,7 +752,9 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
));
}

return Source.fromPublisher(snapshotStore.aggregate(pipeline))
return Source.fromPublisher(snapshotStore.aggregate(pipeline)
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
)
.flatMapConcat(document -> {
final String theMaxPid = document.getString(maxPid);
if (theMaxPid == null) {
Expand Down

0 comments on commit 3d4293d

Please sign in to comment.