Skip to content

Commit

Permalink
change cluster role, root actor path and mongodb collection name of t…
Browse files Browse the repository at this point in the history
…hing search to avoid conflict with existing search, make actor path of search actor configurable in concierge service

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Apr 9, 2022
1 parent 4fc79a7 commit a736eca
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 31 deletions.
Expand Up @@ -15,6 +15,7 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.ServiceSpecificConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
import org.eclipse.ditto.internal.utils.health.config.WithHealthCheckConfig;

/**
Expand Down Expand Up @@ -44,4 +45,38 @@ public interface ConciergeConfig extends ServiceSpecificConfig, WithHealthCheckC
*/
ThingsAggregatorConfig getThingsAggregatorConfig();

/**
* @return the path where to dispatch search requests
*/
String getSearchActorPath();

/**
* An enumeration of the known config path expressions and their associated default values for {@code ConciergeConfig}.
*/
enum ConciergeConfigValue implements KnownConfigValue {

/**
* The path of the search actor where to dispatch search requests.
*/
SEARCH_ACTOR_PATH("search-actor-path", "/user/thingsWildcardSearchRoot/thingsSearch");

private final String path;
private final Object defaultValue;

ConciergeConfigValue(final String thePath, final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}

@Override
public String getConfigPath() {
return path;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

}
}
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.ditto.base.service.config.http.HttpConfig;
import org.eclipse.ditto.base.service.config.limits.LimitsConfig;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.config.WithConfigPath;
import org.eclipse.ditto.internal.utils.health.config.DefaultHealthCheckConfig;
Expand All @@ -41,13 +42,17 @@ public final class DittoConciergeConfig implements ConciergeConfig, WithConfigPa
private final DefaultEnforcementConfig enforcementConfig;
private final DefaultCachesConfig cachesConfig;
private final DefaultThingsAggregatorConfig thingsAggregatorConfig;
private final String searchActorPath;

private DittoConciergeConfig(final ScopedConfig dittoScopedConfig) {
serviceSpecificConfig = DittoServiceConfig.of(dittoScopedConfig, CONFIG_PATH);
healthCheckConfig = DefaultHealthCheckConfig.of(dittoScopedConfig);
enforcementConfig = DefaultEnforcementConfig.of(serviceSpecificConfig);
cachesConfig = DefaultCachesConfig.of(serviceSpecificConfig);
thingsAggregatorConfig = DefaultThingsAggregatorConfig.of(serviceSpecificConfig);
final ConfigWithFallback conciergeConfig =
ConfigWithFallback.newInstance(dittoScopedConfig, CONFIG_PATH, ConciergeConfigValue.values());
searchActorPath = conciergeConfig.getString(ConciergeConfigValue.SEARCH_ACTOR_PATH.getConfigPath());
}

/**
Expand Down Expand Up @@ -107,6 +112,11 @@ public TracingConfig getTracingConfig() {
return serviceSpecificConfig.getTracingConfig();
}

@Override
public String getSearchActorPath() {
return searchActorPath;
}

@Override
public String getConfigPath() {
return CONFIG_PATH;
Expand All @@ -125,13 +135,14 @@ public boolean equals(@Nullable final Object o) {
healthCheckConfig.equals(that.healthCheckConfig) &&
enforcementConfig.equals(that.enforcementConfig) &&
cachesConfig.equals(that.cachesConfig) &&
thingsAggregatorConfig.equals(that.thingsAggregatorConfig);
thingsAggregatorConfig.equals(that.thingsAggregatorConfig) &&
searchActorPath.equals(that.searchActorPath);
}

@Override
public int hashCode() {
return Objects.hash(serviceSpecificConfig, healthCheckConfig, enforcementConfig, cachesConfig,
thingsAggregatorConfig);
thingsAggregatorConfig, searchActorPath);
}

@Override
Expand All @@ -142,6 +153,7 @@ public String toString() {
", enforcementConfig=" + enforcementConfig +
", cachesConfig=" + cachesConfig +
", thingsAggregatorConfig=" + thingsAggregatorConfig +
", searchActorPath=" + searchActorPath +
"]";
}
}
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.concierge.service.starter.actors;

import static org.eclipse.ditto.concierge.api.ConciergeMessagingConstants.DISPATCHER_ACTOR_PATH;
import static org.eclipse.ditto.thingsearch.api.ThingsSearchConstants.SEARCH_ACTOR_PATH;

import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -123,8 +122,9 @@ protected int getBufferSize() {
* @param enforcerActor address of the enforcer actor.
* @return the Props object.
*/
public static Props props(final ActorRef pubSubMediator, final ActorRef enforcerActor) {
return props(pubSubMediator, enforcerActor, CompletableFuture::completedFuture);
public static Props props(final String searchActorPath, final ActorRef pubSubMediator,
final ActorRef enforcerActor) {
return props(searchActorPath, pubSubMediator, enforcerActor, CompletableFuture::completedFuture);
}

/**
Expand All @@ -135,12 +135,11 @@ public static Props props(final ActorRef pubSubMediator, final ActorRef enforcer
* @param preEnforcer the pre-enforcer as graph.
* @return the Props object.
*/
public static Props props(final ActorRef pubSubMediator,
final ActorRef enforcerActor,
final PreEnforcer preEnforcer) {
public static Props props(final String searchActorPath, final ActorRef pubSubMediator,
final ActorRef enforcerActor, final PreEnforcer preEnforcer) {

final Flow<ImmutableDispatch, ImmutableDispatch, NotUsed> dispatchFlow =
Flow.fromGraph(createDispatchFlow(pubSubMediator, preEnforcer));
Flow.fromGraph(createDispatchFlow(searchActorPath, pubSubMediator, preEnforcer));

return Props.create(DispatcherActor.class, enforcerActor, pubSubMediator, dispatchFlow);
}
Expand All @@ -152,7 +151,7 @@ public static Props props(final ActorRef pubSubMediator,
* @return stream to dispatch search and thing commands.
*/
private static Graph<FlowShape<ImmutableDispatch, ImmutableDispatch>, NotUsed> createDispatchFlow(
final ActorRef pubSubMediator,
final String searchActorPath, final ActorRef pubSubMediator,
final PreEnforcer preEnforcer) {

return GraphDSL.create(builder -> {
Expand All @@ -163,7 +162,7 @@ private static Graph<FlowShape<ImmutableDispatch, ImmutableDispatch>, NotUsed> c
builder.add(multiplexBy(RetrieveThings.class, SudoRetrieveThings.class));

final SinkShape<ImmutableDispatch> forwardToSearchActor =
builder.add(searchActorSink(pubSubMediator, preEnforcer));
builder.add(searchActorSink(searchActorPath, pubSubMediator, preEnforcer));

final SinkShape<ImmutableDispatch> forwardToThingsAggregator =
builder.add(thingsAggregatorSink(preEnforcer));
Expand All @@ -185,8 +184,8 @@ private static Graph<FanOutShape2<ImmutableDispatch, ImmutableDispatch, Immutabl
: Optional.empty());
}

private static Sink<ImmutableDispatch, ?> searchActorSink(final ActorRef pubSubMediator,
final PreEnforcer preEnforcer) {
private static Sink<ImmutableDispatch, ?> searchActorSink(final String searchActorPath,
final ActorRef pubSubMediator, final PreEnforcer preEnforcer) {
return Sink.foreach(dispatchToPreEnforce ->
preEnforce(dispatchToPreEnforce, preEnforcer, dispatch -> {
final DittoHeadersSettable<?> command = dispatch.message;
Expand All @@ -201,17 +200,14 @@ private static Graph<FanOutShape2<ImmutableDispatch, ImmutableDispatch, Immutabl
if (searchCommand instanceof ThingSearchQueryCommand) {
final String filter = ((ThingSearchQueryCommand<?>) searchCommand)
.getFilter().orElse(null);
l.withCorrelationId(command).info(
"Forwarding search query command type <{}> with filter <{}> and " +
"fields <{}>",
searchCommand.getType(),
filter,
searchCommand.getSelectedFields().orElse(null));
l.withCorrelationId(command)
.info("Forwarding search query command type <{}> with filter <{}> and fields <{}>",
searchCommand.getType(), filter,
searchCommand.getSelectedFields().orElse(null));
}
});
}
pubSubMediator.tell(
DistPubSubAccess.send(SEARCH_ACTOR_PATH, dispatch.getMessage()),
pubSubMediator.tell(DistPubSubAccess.send(searchActorPath, dispatch.getMessage()),
dispatch.getSender());
})
);
Expand Down
Expand Up @@ -140,8 +140,8 @@ public ActorRef startEnforcerActor(final ActorContext context,
ConciergeEnforcerClusterRouterFactory.createConciergeEnforcerClusterRouter(context,
conciergeConfig.getClusterConfig().getNumberOfShards());

context.actorOf(DispatcherActor.props(pubSubMediator, conciergeEnforcerRouter),
DispatcherActor.ACTOR_NAME);
context.actorOf(DispatcherActor.props(conciergeConfig.getSearchActorPath(), pubSubMediator,
conciergeEnforcerRouter), DispatcherActor.ACTOR_NAME);

final ActorRef conciergeForwarder =
context.actorOf(ConciergeForwarderActor.props(pubSubMediator, conciergeEnforcerRouter),
Expand Down
3 changes: 3 additions & 0 deletions concierge/service/src/main/resources/concierge.conf
Expand Up @@ -113,6 +113,9 @@ ditto {
max-parallelism = 20
max-parallelism = ${?THINGS_AGGREGATOR_MAX_PARALLELISM}
}

search-actor-path = "/user/thingsWildcardSearchRoot/thingsSearch"
search-actor-path = ${?CONCIERGE_SEARCH_ACTOR_PATH}
}
}

Expand Down
Expand Up @@ -23,20 +23,20 @@ public final class ThingsSearchConstants {
/**
* Name of the shard region for Things-Search updater.
*/
public static final String SHARD_REGION = "search-updater";
public static final String SHARD_REGION = "search-wildcard-updater";

/**
* Name of the akka cluster role.
*/
public static final String CLUSTER_ROLE = "things-search";
public static final String CLUSTER_ROLE = "things-wildcard-search";

@SuppressWarnings("squid:S1075")
private static final String USER_PATH = "/user";

/**
* Path of the root actor.
*/
public static final String ROOT_ACTOR_PATH = USER_PATH + "/thingsSearchRoot";
public static final String ROOT_ACTOR_PATH = USER_PATH + "/thingsWildcardSearchRoot";

/**
* Path of the updater root actor.
Expand Down
Expand Up @@ -53,7 +53,7 @@ public final class SearchRootActor extends DittoRootActor {
/**
* The name of this Actor in the ActorSystem.
*/
public static final String ACTOR_NAME = "thingsSearchRoot";
public static final String ACTOR_NAME = "thingsWildcardSearchRoot";

private final LoggingAdapter log;

Expand Down
Expand Up @@ -56,9 +56,9 @@ public final class SearchUpdaterRootActor extends AbstractActor {
/**
* The main cluster role of the cluster member where this actor and its children start.
*/
public static final String CLUSTER_ROLE = "things-search";
public static final String CLUSTER_ROLE = "things-wildcard-search";

private static final String SEARCH_ROLE = "things-search";
private static final String SEARCH_ROLE = "things-wildcard-search";

private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

Expand Down
4 changes: 2 additions & 2 deletions thingsearch/service/src/main/resources/things-search.conf
Expand Up @@ -7,7 +7,7 @@ ditto {

mongodb {

database = "searchDB"
database = "search"
database = ${?MONGO_DB_DATABASE}

pool {
Expand Down Expand Up @@ -259,7 +259,7 @@ akka {
}

roles = [
"things-search",
"things-wildcard-search",
"blocked-namespaces-aware",
"thing-event-aware"
]
Expand Down

0 comments on commit a736eca

Please sign in to comment.