Skip to content

Commit

Permalink
Add config to control read concern and read preference of search pers…
Browse files Browse the repository at this point in the history
…istence.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 29, 2022
1 parent e412f32 commit 783e248
Show file tree
Hide file tree
Showing 17 changed files with 136 additions and 48 deletions.
Expand Up @@ -26,18 +26,18 @@
import com.typesafe.config.Config;

/**
* This class is the default implementation of {@link UpdaterPersistenceConfig}.
* This class is the default implementation of {@link SearchPersistenceConfig}.
*/
@Immutable
public final class DefaultUpdaterPersistenceConfig implements UpdaterPersistenceConfig {
public final class DefaultSearchPersistenceConfig implements SearchPersistenceConfig {

private static final String CONFIG_PATH = "persistence";

private final ReadPreference readPreference;
private final ReadConcern readConcern;


private DefaultUpdaterPersistenceConfig(final ConfigWithFallback config) {
private DefaultSearchPersistenceConfig(final ConfigWithFallback config) {
final var readPreferenceString =
config.getString(MongoDbConfig.OptionsConfig.OptionsConfigValue.READ_PREFERENCE.getConfigPath());
readPreference = ReadPreference.ofReadPreference(readPreferenceString)
Expand Down Expand Up @@ -65,8 +65,8 @@ private DefaultUpdaterPersistenceConfig(final ConfigWithFallback config) {
* @return the instance.
* @throws DittoConfigError if {@code config} is invalid.
*/
public static DefaultUpdaterPersistenceConfig of(final Config config) {
return new DefaultUpdaterPersistenceConfig(ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()));
public static DefaultSearchPersistenceConfig of(final Config config) {
return new DefaultSearchPersistenceConfig(ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()));
}


Expand All @@ -88,7 +88,7 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final DefaultUpdaterPersistenceConfig that = (DefaultUpdaterPersistenceConfig) o;
final DefaultSearchPersistenceConfig that = (DefaultSearchPersistenceConfig) o;
return readPreference == that.readPreference && readConcern == that.readConcern;
}

Expand Down
Expand Up @@ -41,7 +41,7 @@ public final class DefaultUpdaterConfig implements UpdaterConfig {
private final double forceUpdateAfterStartRandomFactor;
private final BackgroundSyncConfig backgroundSyncConfig;
private final StreamConfig streamConfig;
private final UpdaterPersistenceConfig updaterPersistenceConfig;
private final SearchPersistenceConfig updaterPersistenceConfig;

private DefaultUpdaterConfig(final ConfigWithFallback updaterScopedConfig) {
maxIdleTime = updaterScopedConfig.getNonNegativeDurationOrThrow(UpdaterConfigValue.MAX_IDLE_TIME);
Expand All @@ -59,7 +59,7 @@ private DefaultUpdaterConfig(final ConfigWithFallback updaterScopedConfig) {
UpdaterConfigValue.FORCE_UPDATE_AFTER_START_RANDOM_FACTOR.getConfigPath());
backgroundSyncConfig = DefaultBackgroundSyncConfig.fromUpdaterConfig(updaterScopedConfig);
streamConfig = DefaultStreamConfig.of(updaterScopedConfig);
updaterPersistenceConfig = DefaultUpdaterPersistenceConfig.of(updaterScopedConfig);
updaterPersistenceConfig = DefaultSearchPersistenceConfig.of(updaterScopedConfig);
}

/**
Expand Down Expand Up @@ -120,7 +120,7 @@ public StreamConfig getStreamConfig() {
}

@Override
public UpdaterPersistenceConfig getUpdaterPersistenceConfig() {
public SearchPersistenceConfig getUpdaterPersistenceConfig() {
return updaterPersistenceConfig;
}

Expand Down
Expand Up @@ -36,6 +36,8 @@
import org.eclipse.ditto.internal.utils.persistence.operations.PersistenceOperationsConfig;
import org.eclipse.ditto.internal.utils.tracing.config.TracingConfig;

import com.typesafe.config.ConfigFactory;

/**
* This class is the default implementation of {@link SearchConfig}.
*/
Expand All @@ -44,6 +46,8 @@ public final class DittoSearchConfig implements SearchConfig, WithConfigPath {

private static final String CONFIG_PATH = "search";

private static final String QUERY_PATH = "query";

private final DittoServiceConfig dittoServiceConfig;
@Nullable private final String mongoHintsByNamespace;
@Nullable private final String queryCriteriaValidator;
Expand All @@ -54,6 +58,7 @@ public final class DittoSearchConfig implements SearchConfig, WithConfigPath {
private final IndexInitializationConfig indexInitializationConfig;
private final PersistenceOperationsConfig persistenceOperationsConfig;
private final MongoDbConfig mongoDbConfig;
private final SearchPersistenceConfig queryPersistenceConfig;

private DittoSearchConfig(final ScopedConfig dittoScopedConfig) {
dittoServiceConfig = DittoServiceConfig.of(dittoScopedConfig, CONFIG_PATH);
Expand All @@ -69,6 +74,11 @@ private DittoSearchConfig(final ScopedConfig dittoScopedConfig) {
searchUpdateObserver = configWithFallback.getStringOrNull(SearchConfigValue.SEARCH_UPDATE_OBSERVER);
updaterConfig = DefaultUpdaterConfig.of(configWithFallback);
indexInitializationConfig = DefaultIndexInitializationConfig.of(configWithFallback);

final var queryConfig = configWithFallback.hasPath(QUERY_PATH)
? configWithFallback.getConfig(QUERY_PATH)
: ConfigFactory.empty();
queryPersistenceConfig = DefaultSearchPersistenceConfig.of(queryConfig);
}

/**
Expand Down Expand Up @@ -111,6 +121,11 @@ public UpdaterConfig getUpdaterConfig() {
return updaterConfig;
}

@Override
public SearchPersistenceConfig getQueryPersistenceConfig() {
return queryPersistenceConfig;
}

@Override
public ClusterConfig getClusterConfig() {
return dittoServiceConfig.getClusterConfig();
Expand Down Expand Up @@ -175,15 +190,15 @@ public boolean equals(final Object o) {
Objects.equals(healthCheckConfig, that.healthCheckConfig) &&
Objects.equals(indexInitializationConfig, that.indexInitializationConfig) &&
Objects.equals(persistenceOperationsConfig, that.persistenceOperationsConfig) &&
Objects.equals(mongoDbConfig, that.mongoDbConfig);
Objects.equals(mongoDbConfig, that.mongoDbConfig) &&
Objects.equals(queryPersistenceConfig, that.queryPersistenceConfig);
}

@Override
public int hashCode() {
return Objects.hash(mongoHintsByNamespace, queryCriteriaValidator, searchUpdateMapper, searchUpdateObserver,
updaterConfig, dittoServiceConfig, healthCheckConfig, indexInitializationConfig,
persistenceOperationsConfig,
mongoDbConfig);
persistenceOperationsConfig, mongoDbConfig, queryPersistenceConfig);
}

@Override
Expand All @@ -199,6 +214,7 @@ public String toString() {
", indexInitializationConfig=" + indexInitializationConfig +
", persistenceOperationsConfig=" + persistenceOperationsConfig +
", mongoDbConfig=" + mongoDbConfig +
", queryPersistenceConfig=" + queryPersistenceConfig +
"]";
}

Expand Down
Expand Up @@ -66,6 +66,13 @@ public interface SearchConfig extends ServiceSpecificConfig, WithHealthCheckConf
*/
UpdaterConfig getUpdaterConfig();

/**
* Returns the query persistence config.
*
* @return the config.
*/
SearchPersistenceConfig getQueryPersistenceConfig();

/**
* An enumeration of the known config path expressions and their associated default values for SearchConfig.
*/
Expand Down
Expand Up @@ -19,27 +19,27 @@
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ReadPreference;

/**
* Provides configuration settings of the Search updater persistence.
* Provides configuration settings of the Search query and updater persistence.
*/
@Immutable
public interface UpdaterPersistenceConfig {
public interface SearchPersistenceConfig {

/**
* Gets the desired read preference that should be used for queries done in the updater persistence.
* Gets the desired read preference that should be used for queries done in the persistence.
*
* @return the desired read preference.
*/
ReadPreference readPreference();

/**
* Gets the desired read concern that should be used for queries done in the updater persistence.
* Gets the desired read concern that should be used for queries done in the persistence.
*
* @return the desired read concern.
*/
ReadConcern readConcern();

/**
* An enumeration of known config path expressions and their associated default values for {@code UpdaterPersistenceConfig}.
* An enumeration of known config path expressions and their associated default values for {@code SearchPersistenceConfig}.
*/
enum ConfigValue implements KnownConfigValue {

Expand Down
Expand Up @@ -100,7 +100,7 @@ public interface UpdaterConfig {
*
* @return the config.
*/
UpdaterPersistenceConfig getUpdaterPersistenceConfig();
SearchPersistenceConfig getUpdaterPersistenceConfig();

/**
* An enumeration of the known config path expressions and their associated default values for
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.thingsearch.api.SearchNamespaceReportResult;
import org.eclipse.ditto.thingsearch.api.SearchNamespaceResultEntry;
import org.eclipse.ditto.thingsearch.service.common.config.SearchPersistenceConfig;
import org.eclipse.ditto.thingsearch.service.common.model.ResultList;
import org.eclipse.ditto.thingsearch.service.common.model.ResultListImpl;
import org.eclipse.ditto.thingsearch.service.common.model.TimestampedThingId;
Expand Down Expand Up @@ -95,9 +96,12 @@ public final class MongoThingsSearchPersistence implements ThingsSearchPersisten
* @param mongoClient the mongoDB persistence wrapper.
* @param actorSystem the Akka ActorSystem.
*/
public MongoThingsSearchPersistence(final DittoMongoClient mongoClient, final ActorSystem actorSystem) {
public MongoThingsSearchPersistence(final DittoMongoClient mongoClient, final ActorSystem actorSystem,
final SearchPersistenceConfig persistenceConfig) {
final MongoDatabase database = mongoClient.getDefaultDatabase();
collection = database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME);
collection = database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME)
.withReadConcern(persistenceConfig.readConcern().getMongoReadConcern())
.withReadPreference(persistenceConfig.readPreference().getMongoReadPreference());
log = Logging.getLogger(actorSystem, getClass());
indexInitializer = IndexInitializer.of(database, SystemMaterializer.get(actorSystem).materializer());
maxQueryTime = mongoClient.getDittoSettings().getMaxQueryTime();
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.thingsearch.service.common.config.UpdaterPersistenceConfig;
import org.eclipse.ditto.thingsearch.service.common.config.SearchPersistenceConfig;
import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
import org.eclipse.ditto.thingsearch.service.persistence.write.ThingsSearchUpdaterPersistence;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
Expand All @@ -56,7 +56,7 @@ public final class MongoThingsSearchUpdaterPersistence implements ThingsSearchUp
private final MongoCollection<Document> collection;

private MongoThingsSearchUpdaterPersistence(final MongoDatabase database,
final UpdaterPersistenceConfig updaterPersistenceConfig) {
final SearchPersistenceConfig updaterPersistenceConfig) {

collection = database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME)
.withReadConcern(updaterPersistenceConfig.readConcern().getMongoReadConcern())
Expand All @@ -70,7 +70,7 @@ private MongoThingsSearchUpdaterPersistence(final MongoDatabase database,
* @param updaterPersistenceConfig the updater persistence config to use.
*/
public static ThingsSearchUpdaterPersistence of(final MongoDatabase database,
final UpdaterPersistenceConfig updaterPersistenceConfig) {
final SearchPersistenceConfig updaterPersistenceConfig) {
return new MongoThingsSearchUpdaterPersistence(database, updaterPersistenceConfig);
}

Expand Down
Expand Up @@ -102,8 +102,8 @@ private MongoThingsSearchPersistence getThingsSearchPersistence(final SearchConf
final DittoMongoClient mongoDbClient) {

final ActorContext context = getContext();
final var persistence =
new MongoThingsSearchPersistence(mongoDbClient, context.getSystem());
final var persistenceConfig = searchConfig.getQueryPersistenceConfig();
final var persistence = new MongoThingsSearchPersistence(mongoDbClient, context.getSystem(), persistenceConfig);

final var indexInitializationConfig = searchConfig.getIndexInitializationConfig();
if (indexInitializationConfig.isIndexInitializationConfigEnabled()) {
Expand Down Expand Up @@ -145,8 +145,11 @@ private static ThingsFieldExpressionFactory getThingsFieldExpressionFactory() {
final Map<String, String> mappings = new HashMap<>(6);
mappings.put(FieldExpressionUtil.FIELD_NAME_THING_ID, FieldExpressionUtil.FIELD_ID);
mappings.put(FieldExpressionUtil.FIELD_NAME_NAMESPACE, FieldExpressionUtil.FIELD_NAMESPACE);
addMapping(mappings, Thing.JsonFields.POLICY_ID); // also present as top-level field in search collection, however not indexed
addMapping(mappings, Thing.JsonFields.REVISION); // also present as top-level field in search collection, however not indexed

// also present as top-level field in search collection, whether indexed or not
addMapping(mappings, Thing.JsonFields.POLICY_ID);
addMapping(mappings, Thing.JsonFields.REVISION);

addMapping(mappings, Thing.JsonFields.MODIFIED);
addMapping(mappings, Thing.JsonFields.CREATED);
addMapping(mappings, Thing.JsonFields.DEFINITION);
Expand Down
12 changes: 12 additions & 0 deletions thingsearch/service/src/main/resources/search.conf
Expand Up @@ -61,6 +61,18 @@ ditto {
enabled = ${?INDEX_INITIALIZATION_ENABLED}
}

query {
persistence {
# read preference is one of: primary, primaryPreferred, secondary, secondaryPreferred, nearest
readPreference = ${ditto.mongodb.options.readPreference}
readPreference = ${?QUERY_PERSISTENCE_MONGO_DB_READ_PREFERENCE}

# read concern is one of: default, local, majority, linearizable, snapshot, available
readConcern = ${ditto.mongodb.options.readConcern}
readConcern = ${?QUERY_PERSISTENCE_MONGO_DB_READ_CONCERN}
}
}

updater {
max-idle-time = 25h
max-idle-time = ${?ACTIVITY_CHECK_INTERVAL}
Expand Down
Expand Up @@ -20,7 +20,6 @@
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ReadConcern;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ReadPreference;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -30,9 +29,9 @@
import nl.jqno.equalsverifier.EqualsVerifier;

/**
* Unit tests for {@link DefaultUpdaterPersistenceConfig}.
* Unit tests for {@link DefaultSearchPersistenceConfig}.
*/
public final class DefaultUpdaterPersistenceConfigTest {
public final class DefaultSearchPersistenceConfigTest {


private static Config config;
Expand All @@ -47,45 +46,45 @@ public static void initTestFixture() {

@Test
public void assertImmutability() {
assertInstancesOf(DefaultUpdaterPersistenceConfig.class,
assertInstancesOf(DefaultSearchPersistenceConfig.class,
areImmutable(),
provided(ReadPreference.class).isAlsoImmutable());
}

@Test
public void testHashCodeAndEquals() {
EqualsVerifier.forClass(DefaultUpdaterPersistenceConfig.class)
EqualsVerifier.forClass(DefaultSearchPersistenceConfig.class)
.usingGetClass()
.verify();
}

@Test
public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
final UpdaterPersistenceConfig underTest = DefaultUpdaterPersistenceConfig.of(ConfigFactory.empty());
final SearchPersistenceConfig underTest = DefaultSearchPersistenceConfig.of(ConfigFactory.empty());

softly.assertThat(underTest.readConcern())
.as(UpdaterPersistenceConfig.ConfigValue.READ_CONCERN.getConfigPath())
.as(SearchPersistenceConfig.ConfigValue.READ_CONCERN.getConfigPath())
.isEqualTo(ReadConcern.ofReadConcern(
(String) UpdaterPersistenceConfig.ConfigValue.READ_CONCERN.getDefaultValue())
(String) SearchPersistenceConfig.ConfigValue.READ_CONCERN.getDefaultValue())
.orElseThrow());

softly.assertThat(underTest.readPreference())
.as(UpdaterPersistenceConfig.ConfigValue.READ_PREFERENCE.getConfigPath())
.as(SearchPersistenceConfig.ConfigValue.READ_PREFERENCE.getConfigPath())
.isEqualTo(ReadPreference.ofReadPreference(
(String) UpdaterPersistenceConfig.ConfigValue.READ_PREFERENCE.getDefaultValue())
(String) SearchPersistenceConfig.ConfigValue.READ_PREFERENCE.getDefaultValue())
.orElseThrow());
}

@Test
public void underTestReturnsValuesOfConfigFile() {
final UpdaterPersistenceConfig underTest = DefaultUpdaterPersistenceConfig.of(config);
final SearchPersistenceConfig underTest = DefaultSearchPersistenceConfig.of(config);

softly.assertThat(underTest.readConcern())
.as(UpdaterPersistenceConfig.ConfigValue.READ_CONCERN.getConfigPath())
.as(SearchPersistenceConfig.ConfigValue.READ_CONCERN.getConfigPath())
.isEqualTo(ReadConcern.AVAILABLE);

softly.assertThat(underTest.readPreference())
.as(UpdaterPersistenceConfig.ConfigValue.READ_PREFERENCE.getConfigPath())
.as(SearchPersistenceConfig.ConfigValue.READ_PREFERENCE.getConfigPath())
.isEqualTo(ReadPreference.SECONDARY_PREFERRED);
}

Expand Down
Expand Up @@ -19,7 +19,6 @@
import org.assertj.core.api.JUnitSoftAssertions;
import org.eclipse.ditto.thingsearch.service.common.config.UpdaterConfig.UpdaterConfigValue;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -46,7 +45,7 @@ public static void initTestFixture() {
@Test
public void assertImmutability() {
assertInstancesOf(DefaultUpdaterConfig.class, areImmutable(),
provided(BackgroundSyncConfig.class, DefaultStreamConfig.class, DefaultUpdaterPersistenceConfig.class)
provided(BackgroundSyncConfig.class, DefaultStreamConfig.class, DefaultSearchPersistenceConfig.class)
.isAlsoImmutable());
}

Expand Down

0 comments on commit 783e248

Please sign in to comment.