Skip to content

Commit

Permalink
preserve maxPid in listNewestActiveSnapshotsByBatch aggregation
Browse files Browse the repository at this point in the history
in the rare case where all or most of the elements in an aggregation are pruned for being deleted it is not possible to preserve maxPid which in turn maces it impossible to iterate the whole collection.

Signed-off-by: Aleksandar Stanchev <aleksandar.stanchev@bosch.com>
  • Loading branch information
alstanchev committed Nov 27, 2023
1 parent 2d49083 commit 8b1bfdc
Showing 1 changed file with 11 additions and 8 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
Expand Down Expand Up @@ -896,8 +897,18 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
// sort stage 2 -- order after group stage is not defined
pipeline.add(Aggregates.sort(Sorts.ascending(S_ID)));

// Separate $group Stage to Calculate maxPid as this is not possible after filtering out DELETED snapshots
final String maxPid = "m";
final String items = "i";
pipeline.add(Aggregates.group(
new Document("_id", new BsonNull()),
Accumulators.max(maxPid, "$"+ S_ID),
Accumulators.push(items,"$$ROOT")));

// redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false
// if includeDeleted=true keeps them using "$$DESCEND"
// redacts operates recursively, so it evaluates all documents in items array which
// allows us to preserve maxPid even when all elements in the array are PRUNE-ed
pipeline.add(new Document().append("$redact", new Document()
.append("$cond", new Document()
.append("if",
Expand All @@ -906,14 +917,6 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
.append("else", includeDeleted ? "$$DESCEND" : "$$PRUNE")
)));

// group stage 2: group by max encountered pid, "push" all elements calculated in previous "redact"
final String maxPid = "m";
final String items = "i";
pipeline.add(Aggregates.group(null,
Accumulators.max(maxPid, "$" + S_ID),
Accumulators.push(items, "$$ROOT")
));

return Source.fromPublisher(snapshotStore.aggregate(pipeline)
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
)
Expand Down

0 comments on commit 8b1bfdc

Please sign in to comment.