diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java index bf5cd38d8d..35aa8ee5f5 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java @@ -28,6 +28,7 @@ import org.bson.BsonArray; import org.bson.BsonDocument; import org.bson.BsonInt32; +import org.bson.BsonNull; import org.bson.BsonString; import org.bson.Document; import org.bson.conversions.Bson; @@ -896,8 +897,18 @@ private static Source listNewestActiveSnapshotsByBatch( // sort stage 2 -- order after group stage is not defined pipeline.add(Aggregates.sort(Sorts.ascending(S_ID))); + // Separate $group Stage to Calculate maxPid as this is not possible after filtering out DELETED snapshots + final String maxPid = "m"; + final String items = "i"; + pipeline.add(Aggregates.group( + new Document("_id", new BsonNull()), + Accumulators.max(maxPid, "$"+ S_ID), + Accumulators.push(items,"$$ROOT"))); + // redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false // if includeDeleted=true keeps them using "$$DESCEND" + // redacts operates recursively, so it evaluates all documents in items array which + // allows us to preserve maxPid even when all elements in the array are PRUNE-ed pipeline.add(new Document().append("$redact", new Document() .append("$cond", new Document() .append("if", @@ -906,14 +917,6 @@ private static Source listNewestActiveSnapshotsByBatch( .append("else", includeDeleted ? "$$DESCEND" : "$$PRUNE") ))); - // group stage 2: group by max encountered pid, "push" all elements calculated in previous "redact" - final String maxPid = "m"; - final String items = "i"; - pipeline.add(Aggregates.group(null, - Accumulators.max(maxPid, "$" + S_ID), - Accumulators.push(items, "$$ROOT") - )); - return Source.fromPublisher(snapshotStore.aggregate(pipeline) .batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) )