Skip to content

Commit

Permalink
preserve maxPid in listNewestActiveSnapshotsByBatch aggregation
Browse files Browse the repository at this point in the history
In the rare case where all or most of the elements in an aggregation are pruned for being deleted, it is not possible to preserve maxPid, which in turn makes it impossible to iterate over the whole collection.

Signed-off-by: Aleksandar Stanchev <aleksandar.stanchev@bosch.com>
  • Loading branch information
alstanchev authored and thjaeckle committed Nov 29, 2023
1 parent ec9df8b commit 3e3683c
Showing 1 changed file with 11 additions and 8 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
Expand Down Expand Up @@ -896,8 +897,18 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
// sort stage 2 -- order after group stage is not defined
pipeline.add(Aggregates.sort(Sorts.ascending(S_ID)));

// Separate $group Stage to Calculate maxPid as this is not possible after filtering out DELETED snapshots
final String maxPid = "m";
final String items = "i";
pipeline.add(Aggregates.group(
new Document("_id", new BsonNull()),
Accumulators.max(maxPid, "$"+ S_ID),
Accumulators.push(items,"$$ROOT")));

// redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false
// if includeDeleted=true keeps them using "$$DESCEND"
// $redact operates recursively, so it evaluates all documents in the items array, which
// allows us to preserve maxPid even when all elements in the array are pruned
pipeline.add(new Document().append("$redact", new Document()
.append("$cond", new Document()
.append("if",
Expand All @@ -906,14 +917,6 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
.append("else", includeDeleted ? "$$DESCEND" : "$$PRUNE")
)));

// group stage 2: group by max encountered pid, "push" all elements calculated in previous "redact"
final String maxPid = "m";
final String items = "i";
pipeline.add(Aggregates.group(null,
Accumulators.max(maxPid, "$" + S_ID),
Accumulators.push(items, "$$ROOT")
));

return Source.fromPublisher(snapshotStore.aggregate(pipeline)
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
)
Expand Down

0 comments on commit 3e3683c

Please sign in to comment.