Skip to content

Commit

Permalink
rewrite MongoReadJournal aggregation for gathering "newest" active snapshots
Browse files Browse the repository at this point in the history

* conditionally removing "DELETED" snapshots using "$redact" instead of using "$$CURRENT" and "$setDifference"

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jan 24, 2023
1 parent 04173fb commit 5864c06
Showing 1 changed file with 17 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -44,7 +43,6 @@
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.BsonField;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
Expand Down Expand Up @@ -744,32 +742,27 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
// sort stage 2 -- order after group stage is not defined
pipeline.add(Aggregates.sort(Sorts.ascending(S_ID)));

// group stage 2: filter out pids whose latest snapshot is a deleted snapshot, but retain max encountered pid
// ---- or ---- : include latest deleted snapshots
// redact stage: when includeDeleted=false, drop documents whose "__lifecycle" is DELETED via "$$PRUNE";
// when includeDeleted=true, keep them via "$$DESCEND" (non-deleted documents are always kept)
pipeline.add(new Document().append("$redact", new Document()
.append("$cond", new Document()
.append("if",
new Document().append("$ne", Arrays.asList("$" + LIFECYCLE, "DELETED")))
.append("then", "$$DESCEND")
.append("else", includeDeleted ? "$$DESCEND" : "$$PRUNE")
)));

// group stage 2: group by max encountered pid, "push" all elements calculated in previous "redact"
final String maxPid = "m";
final String items = "i";
pipeline.add(Aggregates.group(null,
Accumulators.max(maxPid, "$" + S_ID),
Accumulators.push(
items,
new Document())));
// .append("$cond", new Document()
// .append("if",
// new Document().append("$ne", Arrays.asList("$" + LIFECYCLE, "DELETED")))
// .append("then", "$$CURRENT")
// .append("else", includeDeleted ? "$$CURRENT" : null)
// ))

// remove null entries by projection
// if (!includeDeleted) {
// pipeline.add(Aggregates.project(new Document()
// .append(maxPid, 1)
// .append(items, new Document()
// .append("$setDifference", Arrays.asList("$" + items, Collections.singletonList(null)))
// )
// ));
// pipeline.add(Aggregates.match(Filters.ne("type", )))
// }
Accumulators.push(items, new Document()
.append("_id", "$_id")
.append(S_SN, "$sn")
.append(LIFECYCLE, "$__lifecycle")
)
));

return Source.fromPublisher(snapshotStore.aggregate(pipeline)
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
Expand Down

0 comments on commit 5864c06

Please sign in to comment.