From 3e3683ce8d41aea826010156491dbf58c13a6d83 Mon Sep 17 00:00:00 2001 From: Aleksandar Stanchev Date: Mon, 27 Nov 2023 15:58:08 +0200 Subject: [PATCH] preserve maxPid in listNewestActiveSnapshotsByBatch aggregation In the rare case where all or most of the elements in an aggregation are pruned for being deleted, it is not possible to preserve maxPid, which in turn makes it impossible to iterate the whole collection. Signed-off-by: Aleksandar Stanchev --- .../mongo/streaming/MongoReadJournal.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java index bf5cd38d8d..35aa8ee5f5 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java @@ -28,6 +28,7 @@ import org.bson.BsonArray; import org.bson.BsonDocument; import org.bson.BsonInt32; +import org.bson.BsonNull; import org.bson.BsonString; import org.bson.Document; import org.bson.conversions.Bson; @@ -896,8 +897,18 @@ private static Source listNewestActiveSnapshotsByBatch( // sort stage 2 -- order after group stage is not defined pipeline.add(Aggregates.sort(Sorts.ascending(S_ID))); + // Separate $group Stage to Calculate maxPid as this is not possible after filtering out DELETED snapshots + final String maxPid = "m"; + final String items = "i"; + pipeline.add(Aggregates.group( + new Document("_id", new BsonNull()), + Accumulators.max(maxPid, "$"+ S_ID), + Accumulators.push(items,"$$ROOT"))); + // redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false // if includeDeleted=true keeps them using 
"$$DESCEND" + // $redact operates recursively, so it evaluates all documents in the items array, which + // allows us to preserve maxPid even when all elements in the array are pruned pipeline.add(new Document().append("$redact", new Document() .append("$cond", new Document() .append("if", @@ -906,14 +917,6 @@ private static Source listNewestActiveSnapshotsByBatch( .append("else", includeDeleted ? "$$DESCEND" : "$$PRUNE") ))); - // group stage 2: group by max encountered pid, "push" all elements calculated in previous "redact" - final String maxPid = "m"; - final String items = "i"; - pipeline.add(Aggregates.group(null, - Accumulators.max(maxPid, "$" + S_ID), - Accumulators.push(items, "$$ROOT") - )); - return Source.fromPublisher(snapshotStore.aggregate(pipeline) .batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) )