Skip to content

Commit

Permalink
Reduce additional queries to number of batches
Browse files Browse the repository at this point in the history
* By reducing the additional queries by the number of batches
 (match filters now via an "in" for each batch instead of eq for each element)
  we also can guarantee that the ping command is sent only to open
  connections

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 17, 2022
1 parent f347ac2 commit 3ec1e4f
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private ConnectionIdsRetrievalActor(final MongoReadJournal readJournal,

taggedPidSourceFunction =
tag -> readJournal.getJournalPidsWithTag(tag, connectionIdsRetrievalConfig.getReadJournalBatchSize(),
Duration.ofSeconds(1), materializer, true);
Duration.ofSeconds(1), materializer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,36 +278,29 @@ private Source<List<Document>, NotUsed> listLatestJournalEntries(final MongoColl
public Source<String, NotUsed> getJournalPidsWithTag(final String tag,
final int batchSize,
final Duration maxIdleTime,
final Materializer mat,
final boolean tagMustExistInLatestEntry) {
final Materializer mat) {

final int maxRestarts = computeMaxRestarts(maxIdleTime);
return getJournal().withAttributes(Attributes.inputBuffer(1, 1))
.flatMapConcat(journal ->
listPidsInJournal(journal, "", tag, batchSize, mat, maxRestarts)
.mapConcat(pids -> pids)
.flatMapConcat(pid -> {
if (tagMustExistInLatestEntry) {
return filterPidsThatDoesntContainTagInNewestEntry(journal, pid, tag);
} else {
return Source.single(pid);
}
}));
.flatMapConcat(journal -> listPidsInJournal(journal, "", tag, batchSize, mat, maxRestarts)
.mapConcat(pids -> pids)
.grouped(batchSize)
.flatMapConcat(pids -> filterPidsThatDoesntContainTagInNewestEntry(journal, pids, tag)));
}

private Source<String, NotUsed> filterPidsThatDoesntContainTagInNewestEntry(final MongoCollection<Document> journal,
final String pid, final String tag) {
final List<String> pids, final String tag) {
return Source.fromPublisher(journal.aggregate(List.of(
Aggregates.match(Filters.eq(J_EVENT_PID, pid)),
Aggregates.match(Filters.in(J_PROCESSOR_ID, pids)),
Aggregates.sort(Sorts.descending(J_TO)),
Aggregates.group(
"$pid",
toFirstJournalEntryFields(Set.of(J_EVENT_PID, J_TAGS))
"$" + J_PROCESSOR_ID,
toFirstJournalEntryFields(Set.of(J_PROCESSOR_ID, J_TAGS))
),
Aggregates.match(Filters.eq(J_TAGS, tag))
)))
.flatMapConcat(document -> {
final Object objectPid = document.get(J_EVENT_PID);
final Object objectPid = document.get(J_PROCESSOR_ID);
if (objectPid instanceof CharSequence) {
return Source.single(objectPid.toString());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void extractJournalPidsWithSpecificTag() {
insert("test_journal", new JournalEntry("pid4").withSn(2L).withTags(tagged).getDocument());

final List<String> pids =
readJournal.getJournalPidsWithTag("always-live", 2, Duration.ZERO, materializer, true)
readJournal.getJournalPidsWithTag("always-live", 2, Duration.ZERO, materializer)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().join();

Expand Down Expand Up @@ -347,7 +347,7 @@ public void extractJournalPidsInOrderOfTagsOfNewestEventWhenNewestDoesNotContain
new JournalEntry("pid4").withSn(2L).withTags(Set.of()).getDocument());

final List<String> pids =
readJournal.getJournalPidsWithTag("test", 5, Duration.ZERO, materializer, true)
readJournal.getJournalPidsWithTag("test", 5, Duration.ZERO, materializer)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().join();

Expand All @@ -368,7 +368,7 @@ public void extractJournalPidsInOrderOfTagsOfNewestEventWhenNewestDoesNotContain
new JournalEntry("pid4").withSn(2L).withTags(Set.of()).getDocument());

final List<String> pids =
readJournal.getJournalPidsWithTag("test", 2, Duration.ZERO, materializer, true)
readJournal.getJournalPidsWithTag("test", 2, Duration.ZERO, materializer)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().join();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ private PersistencePingActor(final ActorRef persistenceActorShardRegion, final P
persistenceIdsSourceSupplier = () -> readJournal.getJournalPidsWithTag(pingConfig.getJournalTag(),
pingConfig.getReadJournalBatchSize(),
pingConfig.getInterval(),
materializer,
false);
materializer);
}
readJournal.ensureTagPidIndex().exceptionally(e -> {
log.error(e, "Failed to create TagPidIndex");
Expand Down

0 comments on commit 3ec1e4f

Please sign in to comment.