Skip to content

Commit

Permalink
[eclipse-ditto#964] Add an index on journal collections for Persisten…
Browse files Browse the repository at this point in the history
…cePingActor.

Add the compound index on _tg, pid so that PersistencePingActor
need not sort all results.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Mar 3, 2021
1 parent e32e62d commit e0279b2
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -787,9 +786,8 @@ public void createSubjectWithExpiry() {

// THEN: the subject expiry should be rounded up to the configured "subject-expiry-granularity"
// (10s for this test)
expectMsgEquals(
modifySubjectResponse(policyId, POLICY_LABEL, expectedAdjustedSubjectToAdd,
headersMockWithOtherAuth, true));
expectMsgEquals(modifySubjectResponse(policyId, POLICY_LABEL, expectedAdjustedSubjectToAdd,
headersMockWithOtherAuth, true));

final RetrieveSubject retrieveSubject =
RetrieveSubject.of(policyId, POLICY_LABEL, subjectToAdd.getId(), headersMockWithOtherAuth);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected AbstractPersistenceStreamingActor(final Function<PidWithSeqNr, T> enti
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
readJournal = MongoReadJournal.newInstance(config, mongoClient);
readJournal = MongoReadJournal.newInstance(config, mongoClient, getContext().getSystem());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private SnapshotStreamingActor(final Function<String, EntityId> pid2EntityId,
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
readJournal = MongoReadJournal.newInstance(config, mongoClient);
readJournal = MongoReadJournal.newInstance(config, mongoClient, getContext().getSystem());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ public static Index newInstance(final String name, final List<String> fields, fi
return newInstanceWithCustomKeys(name, keys, unique);
}

/**
* Returns a new {@link Index} with all fields having default (i.e. ascending) index direction.
*
* @param name the name of the index.
* @param fields the fields which form the index.
* @param unique whether or not the index should be unique.
* @param sparse whether or not the index should be sparse.
* @return the index.
* @see #newInstanceWithCustomKeys(String, List, boolean)
*/
public static Index newInstance(final String name, final List<String> fields, final boolean unique,
final boolean sparse) {
final BsonDocument keys = createKeysDocument(createDefaultKeys(requireNonNull(fields)));
return Index.of(keys, name, unique, sparse, BACKGROUND_OPTION_DEFAULT);
}

/**
* Returns a new {@link Index} with custom keys, in contrast to method {@link #newInstance(String, List, boolean)}.
* When {@code unique} is true, the created index will also be {@code sparse}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,14 @@ public CompletionStage<Void> initialize(final String collectionName, final List<
});
}

private CompletionStage<Done> createNonExistingIndices(final String collectionName,
final List<Index> indices) {
/**
* Create defined indices whose names are not among the indexes of the collection.
*
* @param collectionName name of the collection.
* @param indices the defined indices.
* @return a future that completes after index creation or fails when index creation fails.
*/
public CompletionStage<Done> createNonExistingIndices(final String collectionName, final List<Index> indices) {
if (indices.isEmpty()) {
LOGGER.warn("No indices are defined, thus no indices are created.");
return CompletableFuture.completedFuture(Done.getInstance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -29,6 +30,9 @@
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.services.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.services.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.services.utils.persistence.mongo.indices.Index;
import org.eclipse.ditto.services.utils.persistence.mongo.indices.IndexFactory;
import org.eclipse.ditto.services.utils.persistence.mongo.indices.IndexInitializer;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

import com.mongodb.client.model.Accumulators;
Expand All @@ -39,6 +43,7 @@
import com.mongodb.reactivestreams.client.MongoCollection;
import com.typesafe.config.Config;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.contrib.persistence.mongodb.JournallingFieldNames$;
Expand All @@ -47,6 +52,7 @@
import akka.stream.Attributes;
import akka.stream.Materializer;
import akka.stream.RestartSettings;
import akka.stream.SystemMaterializer;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
Expand Down Expand Up @@ -92,15 +98,21 @@ public class MongoReadJournal {

private static final Duration MAX_BACK_OFF_DURATION = Duration.ofSeconds(128L);

private static final Index TAG_PID_INDEX =
IndexFactory.newInstance("ditto_tag_pid", List.of(J_TAGS, J_PROCESSOR_ID), false, true);

private final String journalCollection;
private final String snapsCollection;
private final DittoMongoClient mongoClient;
private final IndexInitializer indexInitializer;

private MongoReadJournal(final String journalCollection, final String snapsCollection,
final DittoMongoClient mongoClient) {
final DittoMongoClient mongoClient, final ActorSystem actorSystem) {
this.journalCollection = journalCollection;
this.snapsCollection = snapsCollection;
this.mongoClient = mongoClient;
final var materializer = SystemMaterializer.get(actorSystem).materializer();
indexInitializer = IndexInitializer.of(mongoClient.getDefaultDatabase(), materializer);
}

/**
Expand All @@ -113,21 +125,7 @@ public static MongoReadJournal newInstance(final ActorSystem system) {
final Config config = system.settings().config();
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig));
}

/**
* Instantiate a read journal from collection names and database client, primarily for tests.
*
* @param journalCollection the journal collection name.
* @param snapsCollection the snapshot collection name.
* @param dittoMongoClient the client.
* @return a read journal for the journal and snapshot collections.
*/
public static MongoReadJournal newInstance(final String journalCollection, final String snapsCollection,
final DittoMongoClient dittoMongoClient) {

return new MongoReadJournal(journalCollection, snapsCollection, dittoMongoClient);
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), system);
}

/**
Expand All @@ -137,14 +135,24 @@ public static MongoReadJournal newInstance(final String journalCollection, final
* @param mongoClient The Mongo client wrapper.
* @return A {@code MongoReadJournal} object.
*/
public static MongoReadJournal newInstance(final Config config, final DittoMongoClient mongoClient) {
public static MongoReadJournal newInstance(final Config config, final DittoMongoClient mongoClient,
final ActorSystem actorSystem) {
final String autoStartJournalKey = extractAutoStartConfigKey(config, AKKA_PERSISTENCE_JOURNAL_AUTO_START);
final String autoStartSnapsKey = extractAutoStartConfigKey(config, AKKA_PERSISTENCE_SNAPS_AUTO_START);
final String journalCollection =
getOverrideCollectionName(config.getConfig(autoStartJournalKey), JOURNAL_COLLECTION_NAME_KEY);
final String snapshotCollection =
getOverrideCollectionName(config.getConfig(autoStartSnapsKey), SNAPS_COLLECTION_NAME_KEY);
return new MongoReadJournal(journalCollection, snapshotCollection, mongoClient);
return new MongoReadJournal(journalCollection, snapshotCollection, mongoClient, actorSystem);
}

/**
* Ensure a compound index exists for journal PID streaming based on tags.
*
* @return a future that completes after index creation completes or fails when index creation fails.
*/
public CompletionStage<Done> ensureTagPidIndex() {
return indexInitializer.createNonExistingIndices(journalCollection, List.of(TAG_PID_INDEX));
}

/**
Expand Down Expand Up @@ -181,7 +189,8 @@ public Source<String, NotUsed> getJournalPids(final int batchSize, final Duratio
* @return Source of all persistence IDs such that each element contains the persistence IDs in {@code batchSize}
* events that do not occur in prior buckets.
*/
public Source<String, NotUsed> getJournalPidsWithTag(final String tag, final int batchSize, final Duration maxIdleTime,
public Source<String, NotUsed> getJournalPidsWithTag(final String tag, final int batchSize,
final Duration maxIdleTime,
final Materializer mat) {

final int maxRestarts = computeMaxRestarts(maxIdleTime);
Expand Down Expand Up @@ -274,7 +283,8 @@ private Source<List<Document>, NotUsed> listNewestSnapshots(final MongoCollectio
return this.unfoldBatchedSource(lowerBoundPid,
mat,
SnapshotBatch::getMaxPid,
actualStartPid -> listNewestActiveSnapshotsByBatch(snapshotStore, actualStartPid, batchSize, snapshotFields))
actualStartPid -> listNewestActiveSnapshotsByBatch(snapshotStore, actualStartPid, batchSize,
snapshotFields))
.mapConcat(x -> x)
.map(SnapshotBatch::getItems);
}
Expand Down Expand Up @@ -305,16 +315,13 @@ private Source<String, NotUsed> listJournalPidsAbove(final MongoCollection<Docum
final String tag, final int batchSize, final Duration maxBackOff, final int maxRestarts) {

final List<Bson> pipeline = new ArrayList<>(5);
// optional match stage
if (!startPid.isEmpty()) {
if (!tag.isEmpty()) {
pipeline.add(Aggregates.match(Filters.and(Filters.gt(J_PROCESSOR_ID, startPid), Filters.eq(J_TAGS, tag))));
} else {
pipeline.add(Aggregates.match(Filters.gt(J_PROCESSOR_ID, startPid)));
}
} else if (!tag.isEmpty()) {
// optional match stages: consecutive match stages are optimized together ($match + $match coalescence)
if (!tag.isEmpty()) {
pipeline.add(Aggregates.match(Filters.eq(J_TAGS, tag)));
}
if (!startPid.isEmpty()) {
pipeline.add(Aggregates.match(Filters.gt(J_PROCESSOR_ID, startPid)));
}

// sort stage
pipeline.add(Aggregates.sort(Sorts.ascending(J_PROCESSOR_ID)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void setUp() {
.withValue("akka.contrib.persistence.mongodb.mongo.mongouri", ConfigValueFactory.fromAnyRef(mongoUri));
actorSystem = ActorSystem.create("AkkaTestSystem", config);
materializer = SystemMaterializer.get(actorSystem).materializer();
readJournal = MongoReadJournal.newInstance(config, mongoClient);
readJournal = MongoReadJournal.newInstance(config, mongoClient, actorSystem);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.ditto.services.utils.akka.PingCommand;
import org.eclipse.ditto.services.utils.akka.PingCommandResponse;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.services.utils.persistentactors.config.PingConfig;
import org.eclipse.ditto.services.utils.persistentactors.config.RateConfig;
Expand All @@ -33,7 +34,6 @@
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
Expand All @@ -54,7 +54,7 @@ public final class PersistencePingActor extends AbstractActor {

private static final String CORRELATION_ID_PREFIX = "persistence-ping-actor-triggered:";

private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
private final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

private final ActorRef persistenceActorShardRegion;
private final Supplier<Source<String, NotUsed>> persistenceIdsSourceSupplier;
Expand Down Expand Up @@ -85,6 +85,10 @@ private PersistencePingActor(final ActorRef persistenceActorShardRegion, final P
pingConfig.getReadJournalBatchSize(),
pingConfig.getInterval(),
materializer);
readJournal.ensureTagPidIndex().exceptionally(e -> {
log.error(e, "Failed to create TagPidIndex");
return null;
});
}

/**
Expand Down

0 comments on commit e0279b2

Please sign in to comment.