Skip to content

Commit

Permalink
Merge pull request #1022 from bosch-io/feature/improve-connection-pri…
Browse files Browse the repository at this point in the history
…orization

Improves the priority ordering in MongoReadJournal
  • Loading branch information
thjaeckle committed Apr 8, 2021
2 parents 2be7257 + 9128779 commit 5ad09e9
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 26 deletions.
Expand Up @@ -77,6 +77,7 @@
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.services.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.services.utils.persistentactors.AbstractShardedPersistenceActor;
import org.eclipse.ditto.services.utils.persistentactors.EmptyEvent;
import org.eclipse.ditto.services.utils.persistentactors.commands.CommandStrategy;
Expand Down Expand Up @@ -405,7 +406,7 @@ protected ConnectivityEvent<?> modifyEventBeforePersist(final ConnectivityEvent<

private Set<String> journalTags() {
return Set.of(JOURNAL_TAG_ALWAYS_ALIVE,
JOURNAL_TAG_ALWAYS_ALIVE + "-" + Optional.ofNullable(priority).orElse(0));
MongoReadJournal.PRIORITY_TAG_PREFIX + Optional.ofNullable(priority).orElse(0));
}

@Override
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
Expand All @@ -40,6 +41,7 @@
import com.mongodb.client.model.BsonField;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.typesafe.config.Config;
Expand Down Expand Up @@ -81,6 +83,13 @@ public class MongoReadJournal {
*/
public static final String J_ID = JournallingFieldNames$.MODULE$.ID();

/**
* Prefix of the priority tag which is used in
* {@link #getJournalPidsWithTagOrderedByPriorityTag(String, java.time.Duration)}
* for sorting/ordering by.
*/
public static final String PRIORITY_TAG_PREFIX = "priority-";

private static final String AKKA_PERSISTENCE_JOURNAL_AUTO_START =
"akka.persistence.journal.auto-start-journals";
private static final String AKKA_PERSISTENCE_SNAPS_AUTO_START =
Expand All @@ -91,7 +100,6 @@ public class MongoReadJournal {

private static final String J_PROCESSOR_ID = JournallingFieldNames$.MODULE$.PROCESSOR_ID();
private static final String J_TAGS = JournallingFieldNames$.MODULE$.TAGS();
private static final String J_TO = JournallingFieldNames$.MODULE$.TO();
private static final String S_SN = SnapshottingFieldNames$.MODULE$.SEQUENCE_NUMBER();

// Not working: SnapshottingFieldNames.V2$.MODULE$.SERIALIZED()
Expand Down Expand Up @@ -206,21 +214,23 @@ public Source<String, NotUsed> getJournalPidsWithTag(final String tag, final int

/**
* Retrieve all unique PIDs in journals selected by a provided {@code tag}.
* The PIDs are ordered based on the tags of the events.
* The PIDs are ordered based on the {@link #PRIORITY_TAG_PREFIX} tags of the events: Descending by the appended
* priority (an integer value).
*
* @param tag the Tag name the journal entries have to contain in order to be selected, or an empty string to select
* all journal entries.
* @param maxIdleTime how long the stream is allowed to idle without sending any element. Bounds the number of
* retries with exponential back-off.
* @return Source of all persistence IDs such that each element contains the persistence IDs in events that do not
* occur in prior buckets.
* @return Source of all persistence IDs tagged with the provided {@code tag}, sorted ascending by the value of an
* additional {@link #PRIORITY_TAG_PREFIX} tag.
*/
public Source<String, NotUsed> getJournalPidsWithTagOrderedByTags(final String tag, final Duration maxIdleTime) {
public Source<String, NotUsed> getJournalPidsWithTagOrderedByPriorityTag(final String tag,
final Duration maxIdleTime) {

final int maxRestarts = computeMaxRestarts(maxIdleTime);
return getJournal().withAttributes(Attributes.inputBuffer(1, 1))
.flatMapConcat(journal ->
listPidsInJournalOrderedByTags(journal, tag, MAX_BACK_OFF_DURATION, maxRestarts)
listPidsInJournalOrderedByPriorityTag(journal, tag, MAX_BACK_OFF_DURATION, maxRestarts)
);
}

Expand Down Expand Up @@ -334,7 +344,7 @@ private <T> Source<List<T>, NotUsed> unfoldBatchedSource(
.withAttributes(Attributes.inputBuffer(1, 1));
}

private Source<String, NotUsed> listPidsInJournalOrderedByTags(final MongoCollection<Document> journal,
private Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(final MongoCollection<Document> journal,
final String tag, final Duration maxBackOff, final int maxRestarts) {

final List<Bson> pipeline = new ArrayList<>(4);
Expand All @@ -343,14 +353,28 @@ private Source<String, NotUsed> listPidsInJournalOrderedByTags(final MongoCollec
pipeline.add(Aggregates.match(Filters.eq(J_TAGS, tag)));
}

// sort stage
// note that there is no index on this combination "pid" -> 1, "to" -> 1
// so if this query ever gets too slow, this could be a potential optimization
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(J_PROCESSOR_ID), Sorts.ascending(J_TO))));

// group stage
// group stage. We can assume that the $last element ist also new latest event because of the insert order.
pipeline.add(Aggregates.group("$" + J_PROCESSOR_ID, Accumulators.last(J_TAGS, "$" + J_TAGS)));

// Filter irrelevant tags for priority ordering.
final BsonDocument arrayFilter = BsonDocument.parse(
"{\n" +
" $filter: {\n" +
" input: \"$" + J_TAGS + "\",\n" +
" as: \"tags\",\n" +
" cond: {\n" +
" $eq: [\n" +
" {\n" +
" $substrCP: [\"$$tags\", 0, " + PRIORITY_TAG_PREFIX.length() + "]\n" +
" }\n," +
" \"" + PRIORITY_TAG_PREFIX + "\"\n" +
" ]\n" +
" }\n" +
" }\n" +
"}"
);
pipeline.add(Aggregates.project(Projections.computed(J_TAGS, arrayFilter)));

// sort stage 2 -- order after group stage is not defined
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.descending(J_TAGS))));

Expand Down
Expand Up @@ -259,23 +259,23 @@ public void extractJournalPidsWithSpecificTag() {
public void extractJournalPidsInOrderOfTags() {
insert("test_journal", new Document()
.append("pid", "pid1")
.append("_tg", Set.of("always-alive", "always-alive-10"))
.append("_tg", Set.of("always-alive", "priority-10"))
.append("to", 1L));
insert("test_journal", new Document()
.append("pid", "pid2")
.append("_tg", Set.of("always-alive", "always-alive-2"))
.append("_tg", Set.of("always-alive", "priority-2"))
.append("to", 1L));
insert("test_journal", new Document()
.append("pid", "pid3")
.append("_tg", Set.of("always-alive", "always-alive-3"))
.append("_tg", Set.of("always-alive", "priority-3"))
.append("to", 2L));
insert("test_journal", new Document()
.append("pid", "pid4")
.append("_tg", Set.of("always-alive", "always-alive-4"))
.append("_tg", Set.of("always-alive", "priority-4"))
.append("to", 2L));

final List<String> pids =
readJournal.getJournalPidsWithTagOrderedByTags("always-alive", Duration.ZERO)
readJournal.getJournalPidsWithTagOrderedByPriorityTag("always-alive", Duration.ZERO)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().join();

Expand All @@ -286,33 +286,87 @@ public void extractJournalPidsInOrderOfTags() {
public void extractJournalPidsInOrderOfTagsOfNewestEvent() {
insert("test_journal", new Document()
.append("pid", "pid1")
.append("_tg", Set.of("always-alive", "always-alive-99"))
.append("_tg", Set.of("always-alive", "priority-99"))
.append("to", 1L));
insert("test_journal", new Document()
.append("pid", "pid1")
.append("_tg", Set.of("always-alive", "always-alive-1"))
.append("_tg", Set.of("always-alive", "priority-1"))
.append("to", 2L));
insert("test_journal", new Document()
.append("pid", "pid2")
.append("_tg", Set.of("always-alive", "always-alive-2"))
.append("_tg", Set.of("always-alive", "priority-2"))
.append("to", 1L));
insert("test_journal", new Document()
.append("pid", "pid3")
.append("_tg", Set.of("always-alive", "always-alive-3"))
.append("_tg", Set.of("always-alive", "priority-3"))
.append("to", 2L));
insert("test_journal", new Document()
.append("pid", "pid4")
.append("_tg", Set.of("always-alive", "always-alive-4"))
.append("_tg", Set.of("always-alive", "priority-4"))
.append("to", 2L));

final List<String> pids =
readJournal.getJournalPidsWithTagOrderedByTags("always-alive", Duration.ZERO)
readJournal.getJournalPidsWithTagOrderedByPriorityTag("always-alive", Duration.ZERO)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().join();

assertThat(pids).containsExactly("pid4", "pid3", "pid2", "pid1");
}

@Test
public void extractJournalPidsInOrderOfTagsIgnoresOtherTags() {
insert("test_journal", new Document()
.append("pid", "pid1")
.append("_tg", Set.of("always-alive", "priority-99"))
.append("to", 1L));
insert("test_journal", new Document()
.append("pid", "pid2")
.append("_tg", Set.of("always-alive", "priority-2"))
.append("to", 1L));
insert("test_journal", new Document()
.append("pid", "pid3")
.append("_tg", Set.of("always-alive", "z-tag", "priority-3"))
.append("to", 2L));
insert("test_journal", new Document()
.append("pid", "pid4")
.append("_tg", Set.of("always-alive", "priority-4"))
.append("to", 2L));

final List<String> pids =
readJournal.getJournalPidsWithTagOrderedByPriorityTag("always-alive", Duration.ZERO)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().join();

assertThat(pids).containsExactly("pid1", "pid4", "pid3", "pid2");
}

@Test
public void extractJournalPidsWithTagOrderedByPriorityTagWhenPriorityTagMissing() {
insert("test_journal", new Document()
.append("pid", "pid1")
.append("_tg", Set.of("always-alive"))
.append("to", 1L));
insert("test_journal", new Document()
.append("pid", "pid2")
.append("_tg", Set.of("always-alive"))
.append("to", 1L));
insert("test_journal", new Document()
.append("pid", "pid3")
.append("_tg", Set.of("always-alive"))
.append("to", 2L));
insert("test_journal", new Document()
.append("pid", "pid4")
.append("_tg", Set.of("always-alive"))
.append("to", 2L));

final List<String> pids =
readJournal.getJournalPidsWithTagOrderedByPriorityTag("always-alive", Duration.ZERO)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().join();

assertThat(pids).containsExactlyInAnyOrder("pid1", "pid2", "pid3", "pid4");
}

@Test
public void extractJournalPidsAboveALowerBound() {
insert("test_journal", new Document().append("pid", "pid1").append("to", 1L));
Expand Down
Expand Up @@ -85,7 +85,7 @@ private PersistencePingActor(final ActorRef persistenceActorShardRegion, final P
switch (streamingOrder) {
case TAGS:
persistenceIdsSourceSupplier = () ->
readJournal.getJournalPidsWithTagOrderedByTags(pingConfig.getJournalTag(),
readJournal.getJournalPidsWithTagOrderedByPriorityTag(pingConfig.getJournalTag(),
pingConfig.getInterval());
break;
case ID:
Expand Down

0 comments on commit 5ad09e9

Please sign in to comment.