Skip to content

Commit

Permalink
Allow custom configuration for the ConnectionPriorityProviderFactory …
Browse files Browse the repository at this point in the history
…extension

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 11, 2022
1 parent a5cdf86 commit 9e5bb7a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
Expand Down Expand Up @@ -211,7 +212,9 @@ public final class ConnectionPersistenceActor
commandValidator = getCommandValidator();
final ConnectionConfig connectionConfig = connectivityConfig.getConnectionConfig();
this.allClientActorsOnOneNode = allClientActorsOnOneNode.orElse(connectionConfig.areAllClientActorsOnOneNode());
connectionPriorityProvider = ConnectionPriorityProviderFactory.get(actorSystem).newProvider(self(), log);
final Config dittoExtensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
connectionPriorityProvider = ConnectionPriorityProviderFactory.get(actorSystem, dittoExtensionsConfig)
.newProvider(self(), log);
clientActorAskTimeout = connectionConfig.getClientActorAskTimeout();
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
connectionLoggerRegistry = ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger());
Expand Down Expand Up @@ -1188,7 +1191,8 @@ private ConnectivityCommandInterceptor getCommandValidator() {
HttpPushValidator.newInstance(connectivityConfig.getConnectionConfig().getHttpPushConfig()));

final DittoConnectivityCommandValidator dittoCommandValidator =
new DittoConnectivityCommandValidator(propsFactory, commandForwarderActor, getSelf(), connectionValidator,
new DittoConnectivityCommandValidator(propsFactory, commandForwarderActor, getSelf(),
connectionValidator,
actorSystem);

final var customCommandValidator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;

import com.typesafe.config.Config;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

Expand All @@ -40,21 +43,30 @@ public interface ConnectionPriorityProviderFactory extends DittoExtensionPoint {
* {@code ActorSystem}.
*
* @param actorSystem the actorSystem in which the {@code ConnectionPriorityProviderFactory} should be loaded.
* @param config the configuration of this extension.
* @return the {@code ConnectionPriorityProviderFactory} implementation.
* @throws NullPointerException if {@code actorSystem} is {@code null}.
*/
static ConnectionPriorityProviderFactory get(final ActorSystem actorSystem) {
static ConnectionPriorityProviderFactory get(final ActorSystem actorSystem, final Config config) {
checkNotNull(actorSystem, "actorSystem");
return ExtensionId.INSTANCE.get(actorSystem);
checkNotNull(config, "config");
final var extensionIdConfig = ExtensionId.computeConfig(config);
return DittoExtensionIds.get(actorSystem)
.computeIfAbsent(extensionIdConfig, ExtensionId::new)
.get(actorSystem);
}

final class ExtensionId extends DittoExtensionPoint.ExtensionId<ConnectionPriorityProviderFactory> {

private static final String CONFIG_PATH = "ditto.connectivity.connection.connection-priority-provider-factory";
private static final ExtensionId INSTANCE = new ExtensionId(ConnectionPriorityProviderFactory.class);
private static final String CONFIG_KEY = "connection-priority-provider-factory";
private static final String CONFIG_PATH = "ditto.extensions." + CONFIG_KEY;

private ExtensionId(final ExtensionIdConfig<ConnectionPriorityProviderFactory> extensionIdConfig) {
super(extensionIdConfig);
}

private ExtensionId(final Class<ConnectionPriorityProviderFactory> parentClass) {
super(parentClass);
static ExtensionIdConfig<ConnectionPriorityProviderFactory> computeConfig(final Config config) {
return ExtensionIdConfig.of(ConnectionPriorityProviderFactory.class, config, CONFIG_KEY);
}

@Override
Expand Down
9 changes: 5 additions & 4 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ ditto {
database = ${?MONGO_DB_DATABASE}
}

extensions {
# Factory for custom connection priority provider.
connection-priority-provider-factory = "org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProviderFactory"
}

persistence.operations.delay-after-persistence-actor-shutdown = 5s
persistence.operations.delay-after-persistence-actor-shutdown = ${?DELAY_AFTER_PERSISTENCE_ACTOR_SHUTDOWN}

Expand Down Expand Up @@ -160,10 +165,6 @@ ditto {
client-actor-props-factory = "org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory"
client-actor-props-factory = ${?CONNECTIVITY_CLIENT_ACTOR_PROPS_FACTORY}

# Factory for custom connection priority provider.
connection-priority-provider-factory = "org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProviderFactory"
connection-priority-provider-factory = ${?CONNECTIVITY_CONNECTION_PRIORITY_PROVIDER_FACTORY}


acknowledgement {
# lifetime of ack forwarder. Must be bigger than the largest possible command timeout (60s)
Expand Down
4 changes: 3 additions & 1 deletion connectivity/service/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.cluster.roles = ["thing-event-aware", "live-signal-aware", "acks-aware", "policy-announcement-aware"]

ditto {
extensions {
connection-priority-provider-factory = org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProviderFactory
}
mapping-strategy.implementation = "org.eclipse.ditto.connectivity.api.ConnectivityMappingStrategies"
pre-enforcers = [
"org.eclipse.ditto.policies.enforcement.pre.CommandWithOptionalEntityPreEnforcer",
Expand Down Expand Up @@ -52,7 +55,6 @@ ditto {
connection {
enforcer-actor-props-factory = org.eclipse.ditto.connectivity.service.enforcement.NoOpEnforcerActorPropsFactory
custom-command-interceptor-provider = org.eclipse.ditto.connectivity.service.messaging.validation.NoOpConnectivityCommandInterceptorProvider
connection-priority-provider-factory = org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProviderFactory
client-actor-props-factory = org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory
// allow localhost in unit tests
blocked-hostnames = ""
Expand Down

0 comments on commit 9e5bb7a

Please sign in to comment.