Skip to content

Commit

Permalink
fix performance regression issue when running against MongoDB 5, prov…
Browse files Browse the repository at this point in the history
…iding the Mongo query planner an index hint for aggregation queries

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jun 10, 2024
1 parent 28bf170 commit a291e45
Show file tree
Hide file tree
Showing 14 changed files with 350 additions and 73 deletions.
11 changes: 11 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ ditto {
mongodb {
database = "connectivity"
database = ${?MONGO_DB_DATABASE}

read-journal {
hint-name-filterPidsThatDoesntContainTagInNewestEntry = null
hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY}

hint-name-listLatestJournalEntries = null
hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES}

hint-name-listNewestActiveSnapshotsByBatch = "_id_"
hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH}
}
}

extensions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@
import java.util.List;
import java.util.function.Function;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.internal.models.streaming.BatchedEntityIdWithRevisions;
import org.eclipse.ditto.internal.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.internal.models.streaming.SudoStreamPids;
import org.eclipse.ditto.internal.utils.pekko.streaming.AbstractStreamingActor;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.pekko.streaming.AbstractStreamingActor;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;

/**
* Abstract implementation of an Actor that streams information about persisted entities modified in a time window in
* the past.
Expand Down Expand Up @@ -64,7 +63,8 @@ protected AbstractPersistenceStreamingActor(final Function<PidWithSeqNr, T> enti
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
readJournal = MongoReadJournal.newInstance(config, mongoClient, getContext().getSystem());
readJournal = MongoReadJournal.newInstance(config, mongoClient, mongoDbConfig.getReadJournalConfig(),
getContext().getSystem());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,32 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.CoordinatedShutdown;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.KillSwitches;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SharedKillSwitch;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.StreamRefs;
import org.bson.Document;
import org.eclipse.ditto.base.model.entity.id.AbstractNamespacedEntityId;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.internal.models.streaming.StreamedSnapshot;
import org.eclipse.ditto.internal.models.streaming.SudoStreamSnapshots;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithShutdownBehavior;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
Expand All @@ -38,22 +53,6 @@
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.CoordinatedShutdown;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.KillSwitches;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SharedKillSwitch;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.StreamRefs;


/**
* An actor that streams from the snapshot store of a service with Mongo persistence plugin on request.
Expand Down Expand Up @@ -103,7 +102,8 @@ private SnapshotStreamingActor(final Function<String, EntityId> pid2EntityId,
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
readJournal = MongoReadJournal.newInstance(config, mongoClient, getContext().getSystem());
readJournal = MongoReadJournal.newInstance(config, mongoClient, mongoDbConfig.getReadJournalConfig(),
getContext().getSystem());
pubSubMediator = DistributedPubSub.get(getContext().getSystem()).mediator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class DefaultMongoDbConfig implements MongoDbConfig {
private final DefaultConnectionPoolConfig connectionPoolConfig;
private final DefaultCircuitBreakerConfig circuitBreakerConfig;
private final DefaultMonitoringConfig monitoringConfig;
private final MongoReadJournalConfig readJournalConfig;

private DefaultMongoDbConfig(final ConfigWithFallback config) {
maxQueryTime = config.getNonNegativeAndNonZeroDurationOrThrow(MongoDbConfigValue.MAX_QUERY_TIME);
Expand All @@ -61,6 +62,7 @@ private DefaultMongoDbConfig(final ConfigWithFallback config) {
connectionPoolConfig = DefaultConnectionPoolConfig.of(config);
circuitBreakerConfig = DefaultCircuitBreakerConfig.of(config);
monitoringConfig = DefaultMonitoringConfig.of(config);
readJournalConfig = DefaultMongoReadJournalConfig.of(config);
}

/**
Expand Down Expand Up @@ -122,6 +124,11 @@ public MonitoringConfig getMonitoringConfig() {
return monitoringConfig;
}

@Override
public MongoReadJournalConfig getReadJournalConfig() {
return readJournalConfig;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -137,13 +144,14 @@ public boolean equals(final Object o) {
Objects.equals(optionsConfig, that.optionsConfig) &&
Objects.equals(connectionPoolConfig, that.connectionPoolConfig) &&
Objects.equals(circuitBreakerConfig, that.circuitBreakerConfig) &&
Objects.equals(readJournalConfig, that.readJournalConfig) &&
Objects.equals(monitoringConfig, that.monitoringConfig);
}

@Override
public int hashCode() {
return Objects.hash(mongoDbUri, maxQueryTime, documentDbCompatibilityMode, optionsConfig, connectionPoolConfig,
circuitBreakerConfig, monitoringConfig);
circuitBreakerConfig, monitoringConfig, readJournalConfig);
}

@Override
Expand All @@ -156,6 +164,7 @@ public String toString() {
", connectionPoolConfig=" + connectionPoolConfig +
", circuitBreakerConfig=" + circuitBreakerConfig +
", monitoringConfig=" + monitoringConfig +
", readJournalConfig=" + readJournalConfig +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.persistence.mongo.config;

import java.util.Objects;
import java.util.Optional;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;

import com.typesafe.config.Config;

/**
* This class implements the config for the handling of event journal entries.
*/
@Immutable
public final class DefaultMongoReadJournalConfig implements MongoReadJournalConfig {

private static final String CONFIG_PATH = "read-journal";

@Nullable private final String hintNameFilterPidsThatDoesntContainTagInNewestEntry;
@Nullable private final String hintNameListLatestJournalEntries;
@Nullable private final String listNewestActiveSnapshotsByBatch;

private DefaultMongoReadJournalConfig(final ScopedConfig config) {
hintNameFilterPidsThatDoesntContainTagInNewestEntry = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY);
hintNameListLatestJournalEntries = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES);
listNewestActiveSnapshotsByBatch = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH);
}

/**
* Returns an instance of the default mongo read journal config based on the settings of the specified Config.
*
* @param config is supposed to provide the settings of the mongo read journal config at {@value #CONFIG_PATH}.
* @return instance
* @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultMongoReadJournalConfig of(final Config config) {
return new DefaultMongoReadJournalConfig(
ConfigWithFallback.newInstance(config, CONFIG_PATH, MongoReadJournalConfigValue.values()));
}

@Nullable
private static String getNullableString(final Config config, final KnownConfigValue configValue) {
return config.getIsNull(configValue.getConfigPath()) ? null : config.getString(configValue.getConfigPath());
}

@Override
public Optional<String> getIndexNameHintForFilterPidsThatDoesntContainTagInNewestEntry() {
return Optional.ofNullable(hintNameFilterPidsThatDoesntContainTagInNewestEntry);
}

@Override
public Optional<String> getIndexNameHintForListLatestJournalEntries() {
return Optional.ofNullable(hintNameListLatestJournalEntries);
}

@Override
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch() {
return Optional.ofNullable(listNewestActiveSnapshotsByBatch);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DefaultMongoReadJournalConfig that = (DefaultMongoReadJournalConfig) o;
return Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry,
that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) &&
Objects.equals(hintNameListLatestJournalEntries, that.hintNameListLatestJournalEntries) &&
Objects.equals(listNewestActiveSnapshotsByBatch, that.listNewestActiveSnapshotsByBatch);
}

@Override
public int hashCode() {
return Objects.hash(hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries,
listNewestActiveSnapshotsByBatch);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"hintNameFilterPidsThatDoesntContainTagInNewestEntry=" +
hintNameFilterPidsThatDoesntContainTagInNewestEntry +
", hintNameListLatestJournalEntries=" + hintNameListLatestJournalEntries +
", listNewestActiveSnapshotsByBatch=" + listNewestActiveSnapshotsByBatch +
"]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ public interface MongoDbConfig {
*/
MonitoringConfig getMonitoringConfig();

/**
* Returns the configuration settings of the MongoReadJournal.
*
* @return the MongoReadJournal config.
*/
MongoReadJournalConfig getReadJournalConfig();

/**
* An enumeration of known value paths and associated default values of the MongoDbConfig.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.persistence.mongo.config;

import java.util.Optional;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

/**
* Provides configuration settings for the {@code MongoReadJournal}.
*/
@Immutable
public interface MongoReadJournalConfig {

/**
* @return the optional hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
*/
Optional<String> getIndexNameHintForFilterPidsThatDoesntContainTagInNewestEntry();

/**
* @return the optional hint name for aggregation done in {@code listLatestJournalEntries}.
*/
Optional<String> getIndexNameHintForListLatestJournalEntries();

/**
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
*/
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch();


/**
* An enumeration of the known config path expressions and their associated default values for
* {@code MongoReadJournalConfig}.
*/
enum MongoReadJournalConfigValue implements KnownConfigValue {

/**
* Hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
*/
HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY("hint-name-filterPidsThatDoesntContainTagInNewestEntry", null),

/**
* Hint name for aggregation done in {@code listLatestJournalEntries}.
*/
HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES("hint-name-listLatestJournalEntries", null),

/**
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
*/
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH("hint-name-listNewestActiveSnapshotsByBatch", null);

private final String path;
private final Object defaultValue;

MongoReadJournalConfigValue(final String thePath, @Nullable final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return path;
}

}

}
Loading

0 comments on commit a291e45

Please sign in to comment.