Skip to content

Commit

Permalink
re-enabled formerly disabled tests
Browse files Browse the repository at this point in the history
* got rid of "collation" -> "numericOrdering(true)" using another step in the aggregation
* append "$$ROOT" in snapshot listing to include all selected fields
* adjusted some "batchSizes" in unit tests

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jan 24, 2023
1 parent 5864c06 commit b9f96ac
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ private static Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(
pipeline.add(Aggregates.group("$" + J_PROCESSOR_ID, Accumulators.last(J_TAGS, "$" + J_TAGS)));

// Filter irrelevant tags for priority ordering.
final BsonDocument arrayFilter = BsonDocument.parse(
pipeline.add(Aggregates.project(Projections.computed(J_TAGS, BsonDocument.parse(
"{\n" +
" $filter: {\n" +
" input: \"$" + J_TAGS + "\",\n" +
Expand All @@ -629,8 +629,25 @@ private static Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(
" }\n" +
" }\n" +
"}"
);
pipeline.add(Aggregates.project(Projections.computed(J_TAGS, arrayFilter)));
))));

// extract priority as "int" from relevant tags so that they can be compared numerically:
pipeline.add(Aggregates.project(Projections.computed(J_TAGS, BsonDocument.parse(
"{\n" +
" $map: {\n" +
" input: \"$" + J_TAGS + "\",\n" +
" as: \"tag\",\n" +
" in: {\n" +
" $convert: {\n" +
" input: {\n" +
" $substrCP: [\"$$tag\", " + PRIORITY_TAG_PREFIX.length() + ", { $strLenCP: \"$$tag\"}]\n" +
" },\n" +
" to: \"int\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n"
))));

// sort stage 2 -- order after group stage is not defined
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.descending(J_TAGS))));
Expand All @@ -643,7 +660,6 @@ private static Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(
.withMaxRestarts(maxRestarts, minBackOff);
return RestartSource.onFailuresWithBackoff(restartSettings, () ->
Source.fromPublisher(journal.aggregate(pipeline))
// .collation(Collation.builder().locale("en_US").numericOrdering(true).build()))
.flatMapConcat(document -> {
final Object pid = document.get(J_ID);
if (pid instanceof CharSequence) {
Expand Down Expand Up @@ -733,7 +749,7 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
// sort stage
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(S_PROCESSOR_ID), Sorts.descending(S_SN))));

// limit stage. It should come before group stage or MongoDB would scan the entire journal collection.
// limit stage. It should come before group stage or MongoDB would scan the entire snapshot collection.
pipeline.add(Aggregates.limit(batchSize));

// group stage 1: by PID. PID is from now on in field _id (S_ID)
Expand All @@ -757,11 +773,7 @@ private static Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
final String items = "i";
pipeline.add(Aggregates.group(null,
Accumulators.max(maxPid, "$" + S_ID),
Accumulators.push(items, new Document()
.append("_id", "$_id")
.append(S_SN, "$sn")
.append(LIFECYCLE, "$__lifecycle")
)
Accumulators.push(items, "$$ROOT")
));

return Source.fromPublisher(snapshotStore.aggregate(pipeline)
Expand Down

0 comments on commit b9f96ac

Please sign in to comment.