Skip to content

Commit

Permalink
Make SearchUpdateMapper a configurable DittoExtensionPoint
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 13, 2022
1 parent 4ee6520 commit ad6d022
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 34 deletions.
Expand Up @@ -17,6 +17,8 @@
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;

import com.typesafe.config.Config;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
Expand All @@ -32,8 +34,8 @@ public final class DefaultSearchUpdateMapper extends SearchUpdateMapper {
* Instantiate this provider. Called by reflection.
*/
@SuppressWarnings("unused")
private DefaultSearchUpdateMapper(final ActorSystem actorSystem) {
super(actorSystem);
private DefaultSearchUpdateMapper(final ActorSystem actorSystem, final Config config) {
super(actorSystem, config);
// Nothing to initialize.
}

Expand Down
Expand Up @@ -12,19 +12,19 @@
*/
package org.eclipse.ditto.thingsearch.service.persistence.write.streaming;

import java.util.List;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;
import org.slf4j.Logger;

import com.typesafe.config.Config;

import akka.NotUsed;
import akka.actor.AbstractExtensionId;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.stream.javadsl.Source;

/**
Expand All @@ -36,11 +36,8 @@
*/
public abstract class SearchUpdateMapper implements DittoExtensionPoint {

private static final ExtensionId EXTENSION_ID = new ExtensionId();
protected final ActorSystem actorSystem;

protected SearchUpdateMapper(final ActorSystem actorSystem) {
this.actorSystem = actorSystem;
protected SearchUpdateMapper(final ActorSystem actorSystem, final Config config) {
//No-Op
}

/**
Expand All @@ -59,8 +56,13 @@ public abstract Source<MongoWriteModel, NotUsed> processWriteModel(AbstractWrite
* @param actorSystem The actor system in which to load the listener.
* @return The listener.
*/
public static SearchUpdateMapper get(final ActorSystem actorSystem) {
return EXTENSION_ID.get(actorSystem);
public static SearchUpdateMapper get(final ActorSystem actorSystem, final Config config) {
checkNotNull(actorSystem, "actorSystem");
checkNotNull(config, "config");
final var extensionIdConfig = ExtensionId.computeConfig(config);
return DittoExtensionIds.get(actorSystem)
.computeIfAbsent(extensionIdConfig, ExtensionId::new)
.get(actorSystem);
}

/**
Expand All @@ -85,8 +87,7 @@ public static SearchUpdateMapper get(final ActorSystem actorSystem) {
logger.debug("MongoWriteModel={}", result);
return Source.single(result);
}
}
catch (final Exception error) {
} catch (final Exception error) {
logger.error("Failed to compute write model " + model, error);
try {
model.getMetadata().getTimers().forEach(StartedTimer::stop);
Expand All @@ -100,16 +101,24 @@ public static SearchUpdateMapper get(final ActorSystem actorSystem) {
/**
* ID of the actor system extension to validate the {@code SearchUpdateListener}.
*/
private static final class ExtensionId extends AbstractExtensionId<SearchUpdateMapper> {
private static final String CONFIG_PATH = "ditto.search.search-update-mapper.implementation";
private static final class ExtensionId extends DittoExtensionPoint.ExtensionId<SearchUpdateMapper> {

private static final String CONFIG_KEY = "search-update-mapper";
private static final String CONFIG_PATH = "ditto.extensions." + CONFIG_KEY;

private ExtensionId(final ExtensionIdConfig<SearchUpdateMapper> extensionIdConfig) {
super(extensionIdConfig);
}

static ExtensionIdConfig<SearchUpdateMapper> computeConfig(final Config config) {
return ExtensionIdConfig.of(SearchUpdateMapper.class, config, CONFIG_KEY);
}

@Override
public SearchUpdateMapper createExtension(final ExtendedActorSystem system) {
final String implementation = system.settings().config().getString(CONFIG_PATH);
return AkkaClassLoader.instantiate(system, SearchUpdateMapper.class,
implementation,
List.of(ActorSystem.class),
List.of(system));
protected String getConfigPath() {
return CONFIG_PATH;
}

}

}
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.ditto.internal.utils.akka.streaming.TimestampPersistence;
import org.eclipse.ditto.internal.utils.cluster.ClusterUtil;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.health.RetrieveHealth;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
Expand Down Expand Up @@ -90,7 +91,8 @@ private SearchUpdaterRootActor(final SearchConfig searchConfig,

final ActorRef thingsShard = shardRegionFactory.getThingsShardRegion(numberOfShards);
final ActorRef policiesShard = shardRegionFactory.getPoliciesShardRegion(numberOfShards);
final var searchUpdateMapper = SearchUpdateMapper.get(actorSystem);
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
final var searchUpdateMapper = SearchUpdateMapper.get(actorSystem, dittoExtensionsConfig);
final SearchUpdaterStream searchUpdaterStream =
SearchUpdaterStream.of(updaterConfig, actorSystem, thingsShard, policiesShard,
dittoMongoClient.getDefaultDatabase(), blockedNamespaces,
Expand Down
3 changes: 1 addition & 2 deletions thingsearch/service/src/main/resources/search.conf
Expand Up @@ -15,6 +15,7 @@ ditto {
pre-enforcers = []
}
}
search-update-mapper = org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper
}

mongodb {
Expand Down Expand Up @@ -57,8 +58,6 @@ ditto {
}

search {
search-update-mapper.implementation = org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper
search-update-mapper.implementation = ${?SEARCH_UPDATE_MAPPER_IMPLEMENTATION}

mongo-hints-by-namespace = ${?MONGO_HINTS_BY_NAMESPACE}

Expand Down
Expand Up @@ -23,21 +23,22 @@
import javax.annotation.Nullable;

import org.bson.Document;
import org.eclipse.ditto.base.service.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.test.mongo.MongoDbResource;
import org.eclipse.ditto.rql.query.Query;
import org.eclipse.ditto.rql.query.QueryBuilderFactory;
import org.eclipse.ditto.rql.query.criteria.CriteriaFactory;
import org.eclipse.ditto.rql.query.expression.FieldExpressionUtil;
import org.eclipse.ditto.rql.query.expression.ThingsFieldExpressionFactory;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.base.service.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.thingsearch.service.common.model.ResultList;
import org.eclipse.ditto.thingsearch.service.persistence.read.MongoThingsSearchPersistence;
import org.eclipse.ditto.thingsearch.service.persistence.read.query.MongoQueryBuilderFactory;
import org.eclipse.ditto.thingsearch.service.persistence.write.streaming.SearchUpdateMapper;
import org.eclipse.ditto.thingsearch.service.persistence.write.streaming.TestSearchUpdaterStream;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.test.mongo.MongoDbResource;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down Expand Up @@ -115,7 +116,9 @@ private MongoThingsSearchPersistence provideReadPersistence() {
}

private TestSearchUpdaterStream provideWritePersistence() {
return TestSearchUpdaterStream.of(mongoClient.getDefaultDatabase(), SearchUpdateMapper.get(actorSystem));
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
return TestSearchUpdaterStream.of(mongoClient.getDefaultDatabase(),
SearchUpdateMapper.get(actorSystem, dittoExtensionsConfig));
}

private static DittoMongoClient provideClientWrapper() {
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.service.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.test.mongo.MongoDbResource;
Expand Down Expand Up @@ -130,7 +131,9 @@ private MongoThingsSearchPersistence provideReadPersistence() {
}

private static TestSearchUpdaterStream provideWritePersistence(final ActorSystem system) {
return TestSearchUpdaterStream.of(mongoClient.getDefaultDatabase(), SearchUpdateMapper.get(system));
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(system.settings().config());
return TestSearchUpdaterStream.of(mongoClient.getDefaultDatabase(),
SearchUpdateMapper.get(system, dittoExtensionsConfig));
}

private static DittoMongoClient provideClientWrapper() {
Expand Down
2 changes: 1 addition & 1 deletion thingsearch/service/src/test/resources/actors-test.conf
Expand Up @@ -11,6 +11,7 @@ ditto {
]
}
}
search-update-mapper = "org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper"
}
mapping-strategy.implementation = "org.eclipse.ditto.thingsearch.api.ThingSearchMappingStrategies"

Expand Down Expand Up @@ -53,7 +54,6 @@ ditto {
}

search {
search-update-mapper.implementation = "org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper"
mongo-hints-by-namespace = ${?MONGO_HINTS_BY_NAMESPACE}

index-initialization {
Expand Down
2 changes: 1 addition & 1 deletion thingsearch/service/src/test/resources/test.conf
Expand Up @@ -4,6 +4,7 @@ ditto {
caching-signal-enrichment-facade-provider = org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DittoCachingSignalEnrichmentFacadeProvider
search-update-observer = org.eclipse.ditto.thingsearch.service.updater.actors.DefaultSearchUpdateObserver
query-criteria-validator = org.eclipse.ditto.thingsearch.service.persistence.query.validation.DefaultQueryCriteriaValidator
search-update-mapper = "org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper"
}
mongodb {
uri = "mongodb://localhost:27017/test"
Expand All @@ -26,7 +27,6 @@ ditto {
query {
mongodb.timeout = 5s
}
search-update-mapper.implementation = "org.eclipse.ditto.thingsearch.service.persistence.write.streaming.DefaultSearchUpdateMapper"
}
}

Expand Down

0 comments on commit ad6d022

Please sign in to comment.