Skip to content

Commit

Permalink
remove not supported mongo features
Browse files Browse the repository at this point in the history
 - removed cursor.collation()\
 - removed $$CURRENT and $setDifference

Signed-off-by: Kalin Kostashki <kalin.kostashki@bosch.io>
  • Loading branch information
Kalin Kostashki committed Jan 18, 2023
1 parent 21f1b0d commit f6dd31f
Showing 1 changed file with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -644,8 +644,8 @@ private static Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(
MongoReadJournal.MAX_BACK_OFF_DURATION, randomFactor)
.withMaxRestarts(maxRestarts, minBackOff);
return RestartSource.onFailuresWithBackoff(restartSettings, () ->
Source.fromPublisher(journal.aggregate(pipeline)
.collation(Collation.builder().locale("en_US").numericOrdering(true).build()))
Source.fromPublisher(journal.aggregate(pipeline))
// .collation(Collation.builder().locale("en_US").numericOrdering(true).build()))
.flatMapConcat(document -> {
final Object pid = document.get(J_ID);
if (pid instanceof CharSequence) {
Expand Down Expand Up @@ -752,23 +752,24 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
Accumulators.max(maxPid, "$" + S_ID),
Accumulators.push(
items,
new Document().append("$cond", new Document()
.append("if",
new Document().append("$ne", Arrays.asList("$" + LIFECYCLE, "DELETED")))
.append("then", "$$CURRENT")
.append("else", includeDeleted ? "$$CURRENT" : null)
))
));
new Document())));
// .append("$cond", new Document()
// .append("if",
// new Document().append("$ne", Arrays.asList("$" + LIFECYCLE, "DELETED")))
// .append("then", "$$CURRENT")
// .append("else", includeDeleted ? "$$CURRENT" : null)
// ))

// remove null entries by projection
if (!includeDeleted) {
pipeline.add(Aggregates.project(new Document()
.append(maxPid, 1)
.append(items, new Document()
.append("$setDifference", Arrays.asList("$" + items, Collections.singletonList(null)))
)
));
}
// if (!includeDeleted) {
// pipeline.add(Aggregates.project(new Document()
// .append(maxPid, 1)
// .append(items, new Document()
// .append("$setDifference", Arrays.asList("$" + items, Collections.singletonList(null)))
// )
// ));
// pipeline.add(Aggregates.match(Filters.ne("type", )))
// }

return Source.fromPublisher(snapshotStore.aggregate(pipeline)
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
Expand Down

0 comments on commit f6dd31f

Please sign in to comment.