Skip to content

Commit

Permalink
Merge pull request #1194 from bosch-io/feature/configure-mongo-read-s…
Browse files Browse the repository at this point in the history
…ettings-for-updater-persistence

make readConcern and readPreference to use for MongoThingsSearchUpdaterPersistence configurable
  • Loading branch information
yufei-cai authored Sep 27, 2021
2 parents 1af8746 + 2fb7951 commit eb3c00a
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public com.mongodb.ReadConcern getMongoReadConcern() {
* @return An optional of the ReadConcern matching to the given read preference string. Empty if no matching
* ReadConcern exists.
*/
static Optional<ReadConcern> ofReadConcern(final String readConcern) {
public static Optional<ReadConcern> ofReadConcern(final String readConcern) {
checkNotNull(readConcern, "readConcern");
return Arrays.stream(values())
.filter(c -> c.name.contentEquals(readConcern))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public com.mongodb.ReadPreference getMongoReadPreference() {
* @return An optional of the ReadPreference matching to the given read preference string. Empty if no matching
* ReadPreference exists.
*/
static Optional<ReadPreference> ofReadPreference(final String readPreference) {
public static Optional<ReadPreference> ofReadPreference(final String readPreference) {
checkNotNull(readPreference, "readPreference");
return Arrays.stream(values())
.filter(c -> c.name.contentEquals(readPreference))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public final class DefaultUpdaterConfig implements UpdaterConfig {
private final double forceUpdateProbability;
private final BackgroundSyncConfig backgroundSyncConfig;
private final StreamConfig streamConfig;
private final UpdaterPersistenceConfig updaterPersistenceConfig;

private DefaultUpdaterConfig(final ConfigWithFallback updaterScopedConfig) {
maxIdleTime = updaterScopedConfig.getNonNegativeDurationOrThrow(UpdaterConfigValue.MAX_IDLE_TIME);
Expand All @@ -49,6 +50,7 @@ private DefaultUpdaterConfig(final ConfigWithFallback updaterScopedConfig) {
updaterScopedConfig.getDouble(UpdaterConfigValue.FORCE_UPDATE_PROBABILITY.getConfigPath());
backgroundSyncConfig = DefaultBackgroundSyncConfig.fromUpdaterConfig(updaterScopedConfig);
streamConfig = DefaultStreamConfig.of(updaterScopedConfig);
updaterPersistenceConfig = DefaultUpdaterPersistenceConfig.of(updaterScopedConfig);
}

/**
Expand Down Expand Up @@ -93,6 +95,11 @@ public StreamConfig getStreamConfig() {
return streamConfig;
}

@Override
public UpdaterPersistenceConfig getUpdaterPersistenceConfig() {
return updaterPersistenceConfig;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -107,13 +114,14 @@ public boolean equals(final Object o) {
Objects.equals(shardingStatePollInterval, that.shardingStatePollInterval) &&
Double.compare(forceUpdateProbability, that.forceUpdateProbability) == 0 &&
Objects.equals(backgroundSyncConfig, that.backgroundSyncConfig) &&
Objects.equals(streamConfig, that.streamConfig);
Objects.equals(streamConfig, that.streamConfig) &&
Objects.equals(updaterPersistenceConfig, that.updaterPersistenceConfig);
}

@Override
public int hashCode() {
return Objects.hash(maxIdleTime, shardingStatePollInterval, eventProcessingActive, forceUpdateProbability,
backgroundSyncConfig, streamConfig);
backgroundSyncConfig, streamConfig, updaterPersistenceConfig);
}

@Override
Expand All @@ -125,6 +133,7 @@ public String toString() {
", forceUpdateProbability=" + forceUpdateProbability +
", backgroundSyncConfig=" + backgroundSyncConfig +
", streamConfig=" + streamConfig +
", updaterPersistenceConfig=" + updaterPersistenceConfig +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.common.config;

import java.text.MessageFormat;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.config.DittoConfigError;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ReadConcern;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ReadPreference;

import com.typesafe.config.Config;

/**
* This class is the default implementation of {@link UpdaterPersistenceConfig}.
*/
@Immutable
public final class DefaultUpdaterPersistenceConfig implements UpdaterPersistenceConfig {

private static final String CONFIG_PATH = "persistence";

private final ReadPreference readPreference;
private final ReadConcern readConcern;


private DefaultUpdaterPersistenceConfig(final ConfigWithFallback config) {
final var readPreferenceString =
config.getString(MongoDbConfig.OptionsConfig.OptionsConfigValue.READ_PREFERENCE.getConfigPath());
readPreference = ReadPreference.ofReadPreference(readPreferenceString)
.orElseThrow(() -> {
final String msg =
MessageFormat.format("Could not parse a ReadPreference from configured string <{0}>",
readPreferenceString);
return new DittoConfigError(msg);
});
final var readConcernString =
config.getString(MongoDbConfig.OptionsConfig.OptionsConfigValue.READ_CONCERN.getConfigPath());
readConcern = ReadConcern.ofReadConcern(readConcernString)
.orElseThrow(() -> {
final String msg =
MessageFormat.format("Could not parse a ReadConcern from configured string <{0}>",
readConcernString);
return new DittoConfigError(msg);
});
}

/**
* Returns an instance of DefaultUpdaterPersistenceConfig based on the settings of the specified Config.
*
* @param config is supposed to provide the settings of the stream config at {@value CONFIG_PATH}.
* @return the instance.
* @throws DittoConfigError if {@code config} is invalid.
*/
public static DefaultUpdaterPersistenceConfig of(final Config config) {
return new DefaultUpdaterPersistenceConfig(ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()));
}


@Override
public ReadPreference readPreference() {
return readPreference;
}

@Override
public ReadConcern readConcern() {
return readConcern;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DefaultUpdaterPersistenceConfig that = (DefaultUpdaterPersistenceConfig) o;
return readPreference == that.readPreference && readConcern == that.readConcern;
}

@Override
public int hashCode() {
return Objects.hash(readPreference, readConcern);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"readPreference=" + readPreference +
", readConcern=" + readConcern +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public interface UpdaterConfig {
*/
StreamConfig getStreamConfig();

/**
* Returns the updater persistence config.
*
* @return the config.
*/
UpdaterPersistenceConfig getUpdaterPersistenceConfig();

/**
* An enumeration of the known config path expressions and their associated default values for
* UpdaterConfig.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.common.config;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ReadConcern;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ReadPreference;

/**
* Provides configuration settings of the Search updater persistence.
*/
@Immutable
public interface UpdaterPersistenceConfig {

/**
* Gets the desired read preference that should be used for queries done in the updater persistence.
*
* @return the desired read preference.
*/
ReadPreference readPreference();

/**
* Gets the desired read concern that should be used for queries done in the updater persistence.
*
* @return the desired read concern.
*/
ReadConcern readConcern();

/**
* An enumeration of known config path expressions and their associated default values for {@code UpdaterPersistenceConfig}.
*/
enum ConfigValue implements KnownConfigValue {

/**
* Determines the read preference used for MongoDB connections. See {@link ReadPreference} for available options.
*/
READ_PREFERENCE("readPreference", "primaryPreferred"),

/**
* Determines the read concern used for MongoDB connections. See {@link ReadConcern} for available options.
*/
READ_CONCERN("readConcern", "default");

private final String configPath;
private final Object defaultValue;

ConfigValue(final String configPath, final Object defaultValue) {
this.configPath = configPath;
this.defaultValue = defaultValue;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return configPath;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.policies.api.PolicyReferenceTag;
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.thingsearch.service.common.config.UpdaterPersistenceConfig;
import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
import org.eclipse.ditto.thingsearch.service.persistence.write.ThingsSearchUpdaterPersistence;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
Expand All @@ -54,17 +55,23 @@ public final class MongoThingsSearchUpdaterPersistence implements ThingsSearchUp

private final MongoCollection<Document> collection;

private MongoThingsSearchUpdaterPersistence(final MongoDatabase database) {
collection = database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME);
private MongoThingsSearchUpdaterPersistence(final MongoDatabase database,
final UpdaterPersistenceConfig updaterPersistenceConfig) {

collection = database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME)
.withReadConcern(updaterPersistenceConfig.readConcern().getMongoReadConcern())
.withReadPreference(updaterPersistenceConfig.readPreference().getMongoReadPreference());
}

/**
* Constructor.
*
* @param database the database.
* @param updaterPersistenceConfig the updater persistence config to use.
*/
public static ThingsSearchUpdaterPersistence of(final MongoDatabase database) {
return new MongoThingsSearchUpdaterPersistence(database);
public static ThingsSearchUpdaterPersistence of(final MongoDatabase database,
final UpdaterPersistenceConfig updaterPersistenceConfig) {
return new MongoThingsSearchUpdaterPersistence(database, updaterPersistenceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ private SearchUpdaterRootActor(final SearchConfig searchConfig,
updaterStreamWithAcknowledgementsKillSwitch = searchUpdaterStream.start(getContext(), true);

final var searchUpdaterPersistence =
MongoThingsSearchUpdaterPersistence.of(dittoMongoClient.getDefaultDatabase());
MongoThingsSearchUpdaterPersistence.of(dittoMongoClient.getDefaultDatabase(),
updaterConfig.getUpdaterPersistenceConfig());

pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());

Expand Down
10 changes: 10 additions & 0 deletions thingsearch/service/src/main/resources/things-search.conf
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@ ditto {
}

}

persistence {
# read preference is one of: primary, primaryPreferred, secondary, secondaryPreferred, nearest
readPreference = ${ditto.mongodb.options.readPreference}
readPreference = ${?UPDATER_PERSISTENCE_MONGO_DB_READ_PREFERENCE}

# read concern is one of: default, local, majority, linearizable, snapshot, available
readConcern = ${ditto.mongodb.options.readConcern}
readConcern = ${?UPDATER_PERSISTENCE_MONGO_DB_READ_CONCERN}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public static void initTestFixture() {
@Test
public void assertImmutability() {
assertInstancesOf(DefaultUpdaterConfig.class, areImmutable(),
provided(BackgroundSyncConfig.class, DefaultStreamConfig.class).isAlsoImmutable());
provided(BackgroundSyncConfig.class, DefaultStreamConfig.class, DefaultUpdaterPersistenceConfig.class)
.isAlsoImmutable());
}

@Test
Expand Down
Loading

0 comments on commit eb3c00a

Please sign in to comment.