Skip to content

Commit

Permalink
added config parameter for "documentDB-compatibility-mode"
Browse files Browse the repository at this point in the history
* based on that changed how the priority sorting is done
* apply a workaround to get priority sorting also for documentDB mode
* use "collation" and "numericOrdering=true" (as it was done before) for non-DocumentDB compatibility mode
* run MongoReadJournalIT parameterized with and without documentDB compatibility mode

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jan 24, 2023
1 parent b9f96ac commit bccbe14
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 29 deletions.
4 changes: 4 additions & 0 deletions internal/utils/config/src/main/resources/ditto-mongo.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ ditto.mongodb {
maxQueryTime = 60s
maxQueryTime = ${?MONGO_DB_QUERY_TIMEOUT}

# whether or not to configure compatibility mode for "DocumentDB" (as replacement for MongoDB)
documentDB-compatibility-mode = false
documentDB-compatibility-mode = ${?MONGO_DB_DOCUMENTDB_COMPATIBILITY_MODE}

options {
ssl = false
ssl = ${?MONGO_DB_SSL_ENABLED}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ interface GeneralPropertiesStep {
*/
GeneralPropertiesStep maxQueryTime(@Nullable Duration maxQueryTime);

/**
* Configures whether to run in "DocumentDB" compatibility mode or not.
*
* @param documentDbCompatibilityMode whether to run in "DocumentDB" compatibility mode or not.
* @return this builder instance to allow method chaining.
*/
GeneralPropertiesStep documentDbCompatibilityMode(boolean documentDbCompatibilityMode);

/**
* Sets the minimum number of connections to the database always kept alive in the pool.
* Default is {@code 0} connections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Objects;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
Expand All @@ -29,9 +30,11 @@
public final class DittoMongoClientSettings {

private final Duration maxQueryTime;
private final boolean documentDBCompatibilityMode;

private DittoMongoClientSettings(final Builder builder) {
maxQueryTime = builder.maxQueryTime;
documentDBCompatibilityMode = builder.documentDbCompatibilityMode;
}

/**
Expand All @@ -52,6 +55,15 @@ public Duration getMaxQueryTime() {
return maxQueryTime;
}

/**
* Returns whether to initialize the DittoMongoClient with the "DocumentDB" compatibility mode enabled or not.
*
* @return the DocumentDB compatibility mode.
*/
public boolean isDocumentDBCompatibilityMode() {
return documentDBCompatibilityMode;
}

/**
* A mutable builder with a fluent API for a {@code DittoMongoClientSettings}.
*/
Expand All @@ -61,10 +73,15 @@ public static final class Builder {
private static final Duration DEFAULT_MAX_QUERY_TIME =
(Duration) MongoDbConfig.MongoDbConfigValue.MAX_QUERY_TIME.getDefaultValue();

private static final boolean DEFAULT_DOCUMENTDB_COMPATIBILITY_MODE =
(boolean) MongoDbConfig.MongoDbConfigValue.DOCUMENT_DB_COMPATIBILITY_MODE.getDefaultValue();

private Duration maxQueryTime;
private boolean documentDbCompatibilityMode;

private Builder() {
maxQueryTime = DEFAULT_MAX_QUERY_TIME;
documentDbCompatibilityMode = DEFAULT_DOCUMENTDB_COMPATIBILITY_MODE;
}

/**
Expand All @@ -88,11 +105,20 @@ public Builder maxQueryTime(final long amount, final TemporalUnit temporalUnit)
* @return this builder instance to allow method chaining.
*/
public Builder maxQueryTime(@Nullable final Duration maxQueryTime) {
if (null != maxQueryTime) {
this.maxQueryTime = maxQueryTime;
} else {
this.maxQueryTime = DEFAULT_MAX_QUERY_TIME;
}
this.maxQueryTime = Objects.requireNonNullElse(maxQueryTime, DEFAULT_MAX_QUERY_TIME);
return this;
}

/**
* Sets the DocumentDB compatibility mode.
*
* @param documentDbCompatibilityMode whether to initialize the DittoMongoClient with the "DocumentDB"
* compatibility mode enabled or not.
*
* @return this builder instance to allow method chaining.
*/
public Builder documentDbCompatibilityMode(final boolean documentDbCompatibilityMode) {
this.documentDbCompatibilityMode = documentDbCompatibilityMode;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ static GeneralPropertiesStep newInstance(final MongoDbConfig mongoDbConfig) {

final MongoClientWrapperBuilder builder = new MongoClientWrapperBuilder();
builder.maxQueryTime(mongoDbConfig.getMaxQueryTime());
builder.documentDbCompatibilityMode(mongoDbConfig.isDocumentDBCompatibilityMode());
builder.connectionString(mongoDbConfig.getMongoDbUri());

final MongoDbConfig.ConnectionPoolConfig connectionPoolConfig = mongoDbConfig.getConnectionPoolConfig();
Expand Down Expand Up @@ -336,6 +337,12 @@ public GeneralPropertiesStep maxQueryTime(@Nullable final Duration maxQueryTime)
return this;
}

@Override
public GeneralPropertiesStep documentDbCompatibilityMode(final boolean documentDbCompatibilityMode) {
dittoMongoClientSettingsBuilder.documentDbCompatibilityMode(documentDbCompatibilityMode);
return this;
}

@Override
public DatabaseNameStep hostnameAndPort(final CharSequence hostname, final int portNumber) {
checkNotNull(hostname, "hostname");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ public final class DefaultMongoDbConfig implements MongoDbConfig {

private final String mongoDbUri;
private final Duration maxQueryTime;
private final boolean documentDbCompatibilityMode;
private final DefaultOptionsConfig optionsConfig;
private final DefaultConnectionPoolConfig connectionPoolConfig;
private final DefaultCircuitBreakerConfig circuitBreakerConfig;
private final DefaultMonitoringConfig monitoringConfig;

private DefaultMongoDbConfig(final ConfigWithFallback config) {
maxQueryTime = config.getNonNegativeAndNonZeroDurationOrThrow(MongoDbConfigValue.MAX_QUERY_TIME);
documentDbCompatibilityMode = config.getBoolean(MongoDbConfigValue.DOCUMENT_DB_COMPATIBILITY_MODE.getConfigPath());
optionsConfig = DefaultOptionsConfig.of(config);
final var configuredUri = config.getString(MongoDbConfigValue.URI.getConfigPath());
final Map<String, Object> configuredExtraUriOptions = optionsConfig.extraUriOptions();
Expand Down Expand Up @@ -95,6 +97,11 @@ public Duration getMaxQueryTime() {
return maxQueryTime;
}

@Override
public boolean isDocumentDBCompatibilityMode() {
return documentDbCompatibilityMode;
}

@Override
public OptionsConfig getOptionsConfig() {
return optionsConfig;
Expand Down Expand Up @@ -126,6 +133,7 @@ public boolean equals(final Object o) {
final DefaultMongoDbConfig that = (DefaultMongoDbConfig) o;
return Objects.equals(mongoDbUri, that.mongoDbUri) &&
Objects.equals(maxQueryTime, that.maxQueryTime) &&
documentDbCompatibilityMode == that.documentDbCompatibilityMode &&
Objects.equals(optionsConfig, that.optionsConfig) &&
Objects.equals(connectionPoolConfig, that.connectionPoolConfig) &&
Objects.equals(circuitBreakerConfig, that.circuitBreakerConfig) &&
Expand All @@ -134,15 +142,16 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(mongoDbUri, maxQueryTime, optionsConfig, connectionPoolConfig, circuitBreakerConfig,
monitoringConfig);
return Objects.hash(mongoDbUri, maxQueryTime, documentDbCompatibilityMode, optionsConfig, connectionPoolConfig,
circuitBreakerConfig, monitoringConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"mongoDbUri=" + mongoDbUri +
", maxQueryTime=" + maxQueryTime +
", documentDbCompatibilityMode=" + documentDbCompatibilityMode +
", optionsConfig=" + optionsConfig +
", connectionPoolConfig=" + connectionPoolConfig +
", circuitBreakerConfig=" + circuitBreakerConfig +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public interface MongoDbConfig {
*/
Duration getMaxQueryTime();

/**
* Returns whether to initialize the DittoMongoClient with the "DocumentDB" compatibility mode enabled or not.
*
* @return whether to run in "DocumentDB" compatibility mode or not.
*/
boolean isDocumentDBCompatibilityMode();

/**
* Returns the configuration settings of the MongoDB options.
*
Expand Down Expand Up @@ -83,7 +90,12 @@ enum MongoDbConfigValue implements KnownConfigValue {
/**
* The maximum query duration.
*/
MAX_QUERY_TIME("maxQueryTime", Duration.ofMinutes(1L));
MAX_QUERY_TIME("maxQueryTime", Duration.ofMinutes(1L)),

/**
* Whether to run in "DocumentDB" compatibility mode or not.
*/
DOCUMENT_DB_COMPATIBILITY_MODE("documentDB-compatibility-mode", false);

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.BsonField;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.reactivestreams.client.AggregatePublisher;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.typesafe.config.Config;

Expand Down Expand Up @@ -599,7 +601,7 @@ private <T> Source<List<T>, NotUsed> unfoldBatchedSource(
.withAttributes(Attributes.inputBuffer(1, 1));
}

private static Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(
private Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(
final MongoCollection<Document> journal,
final String tag,
final int maxRestarts) {
Expand All @@ -623,31 +625,53 @@ private static Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(
" $eq: [\n" +
" {\n" +
" $substrCP: [\"$$tags\", 0, " + PRIORITY_TAG_PREFIX.length() + "]\n" +
" }\n," +
" },\n" +
" \"" + PRIORITY_TAG_PREFIX + "\"\n" +
" ]\n" +
" }\n" +
" }\n" +
"}"
))));

// extract priority as "int" from relevant tags so that they can be compared numerically:
pipeline.add(Aggregates.project(Projections.computed(J_TAGS, BsonDocument.parse(
"{\n" +
" $map: {\n" +
" input: \"$" + J_TAGS + "\",\n" +
" as: \"tag\",\n" +
" in: {\n" +
" $convert: {\n" +
" input: {\n" +
" $substrCP: [\"$$tag\", " + PRIORITY_TAG_PREFIX.length() + ", { $strLenCP: \"$$tag\"}]\n" +
" },\n" +
" to: \"int\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n"
))));
if (mongoClient.getDittoSettings().isDocumentDBCompatibilityMode()) {
// extract priority as "int" from relevant tags so that they can be compared numerically:
pipeline.add(Aggregates.project(Projections.computed(J_TAGS, BsonDocument.parse(
"{\n" +
" $map: {\n" +
" input: \"$" + J_TAGS + "\",\n" +
" as: \"tag\",\n" +
" in: {\n" +
" $reduce: {\n" +
" input: {\n" +
" $range: [\n" +
" 0,\n" +
" {\n" +
" $subtract: [\n" +
" 1000,\n" + // assumption: max prio is 1000 - all higher prios are not correctly ordered
" {\n" +
" $strLenCP: {\n" +
" $substrCP: [\n" +
" \"$$tag\", " + PRIORITY_TAG_PREFIX.length() + ", { $strLenCP: \"$$tag\" }\n" +
" ]\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
" ],\n" +
" },\n" +
" initialValue: \"$$tag\",\n" +
" in: {\n" +
" $concat: [\n" +
" \" \",\n" +
" \"$$value\"\n" +
" ]\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n"
))));
}

// sort stage 2 -- order after group stage is not defined
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.descending(J_TAGS))));
Expand All @@ -658,8 +682,17 @@ private static Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(
final RestartSettings restartSettings = RestartSettings.create(minBackOff,
MongoReadJournal.MAX_BACK_OFF_DURATION, randomFactor)
.withMaxRestarts(maxRestarts, minBackOff);

final AggregatePublisher<Document> aggregatePublisher;
if (mongoClient.getDittoSettings().isDocumentDBCompatibilityMode()) {
aggregatePublisher = journal.aggregate(pipeline);
} else {
aggregatePublisher = journal.aggregate(pipeline)
.collation(Collation.builder().locale("en_US").numericOrdering(true).build());
}

return RestartSource.onFailuresWithBackoff(restartSettings, () ->
Source.fromPublisher(journal.aggregate(pipeline))
Source.fromPublisher(aggregatePublisher)
.flatMapConcat(document -> {
final Object pid = document.get(J_ID);
if (pid instanceof CharSequence) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public void defaultMongodbConfigContainsExactlyValuesOfResourceConfigFile() {
final DefaultMongoDbConfig underTest = DefaultMongoDbConfig.of(rawMongoDbConfig);

softly.assertThat(underTest.getMaxQueryTime()).isEqualTo(Duration.ofSeconds(10));
softly.assertThat(underTest.isDocumentDBCompatibilityMode()).isEqualTo(true);
// query options from the configured Mongo URI in "mongodb_test.conf" must be preserved
// exception is the "ssl" option where the configured value in the config has priority
softly.assertThat(underTest.getMongoDbUri())
Expand Down Expand Up @@ -126,7 +127,10 @@ public void defaultMongodbConfigContainsExactlyFallBackValuesIfEmptyResourceConf
ConfigFactory.parseMap(Collections.singletonMap(absoluteMongoDbUriPath, sourceMongoDbUri));
final DefaultMongoDbConfig underTest = DefaultMongoDbConfig.of(originalMongoDbConfig);

softly.assertThat(underTest.getMaxQueryTime()).as("maxQueryTime").isEqualTo(Duration.ofMinutes(1));
softly.assertThat(underTest.getMaxQueryTime()).as("maxQueryTime")
.isEqualTo(MongoDbConfig.MongoDbConfigValue.MAX_QUERY_TIME.getDefaultValue());
softly.assertThat(underTest.isDocumentDBCompatibilityMode()).as("documentDBCompatibilityMode")
.isEqualTo(MongoDbConfig.MongoDbConfigValue.DOCUMENT_DB_COMPATIBILITY_MODE.getDefaultValue());
softly.assertThat(underTest.getMongoDbUri()).as("mongoDbUri").isEqualTo("mongodb://foo:bar@mongodb:27017/test?ssl=false");
softly.assertThat(underTest.getOptionsConfig()).satisfies(optionsConfig -> {
softly.assertThat(optionsConfig.isSslEnabled()).as("ssl").isFalse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import com.mongodb.client.result.DeleteResult;
import com.typesafe.config.Config;
Expand All @@ -54,10 +56,19 @@
* Tests {@link MongoReadJournal}.
* CAUTION: Do not use Akka streams testkit; it does not work for Source.fromPublisher against reactive-streams client.
*/
@RunWith(Parameterized.class)
public final class MongoReadJournalIT {

private static final String MONGO_DB = "mongoReadJournalIT";

@Parameterized.Parameters(name = "documentDbCompatibilityMode={0}")
public static Collection<Boolean> documentDbCompatibilityMode() {
return Arrays.asList(false, true);
}

@Parameterized.Parameter
public static boolean documentDbCompatibilityMode;

@ClassRule
public static final MongoDbResource MONGO_RESOURCE = new MongoDbResource();
private static DittoMongoClient mongoClient;
Expand All @@ -71,6 +82,7 @@ public static void startMongoResource() {
mongoClient = MongoClientWrapper.getBuilder()
.hostnameAndPort(MONGO_RESOURCE.getBindIp(), MONGO_RESOURCE.getPort())
.defaultDatabaseName(MONGO_DB)
.documentDbCompatibilityMode(documentDbCompatibilityMode)
.connectionPoolMaxSize(100)
.connectionPoolMaxWaitTime(Duration.ofSeconds(30))
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mongodb {
maxQueryTime = 10s
documentDB-compatibility-mode = true

hostname = "mongodb"
port = 27017
Expand Down

0 comments on commit bccbe14

Please sign in to comment.