Skip to content

Commit

Permalink
Allow custom configuration for the ClientActorPropsFactory extension
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 11, 2022
1 parent 9e5bb7a commit 230de2f
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 26 deletions.
Expand Up @@ -15,6 +15,7 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.connectivity.model.Connection;

Expand Down Expand Up @@ -51,21 +52,30 @@ Props getActorPropsForType(Connection connection,
* {@code ActorSystem}.
*
* @param actorSystem the actorSystem in which the {@code ClientActorPropsFactory} should be loaded.
* @param config the configuration of this extension.
* @return the {@code ClientActorPropsFactory} implementation.
* @throws NullPointerException if {@code actorSystem} is {@code null}.
*/
static ClientActorPropsFactory get(final ActorSystem actorSystem) {
static ClientActorPropsFactory get(final ActorSystem actorSystem, final Config config) {
checkNotNull(actorSystem, "actorSystem");
return ExtensionId.INSTANCE.get(actorSystem);
checkNotNull(config, "config");
final var extensionIdConfig = ExtensionId.computeConfig(config);
return DittoExtensionIds.get(actorSystem)
.computeIfAbsent(extensionIdConfig, ExtensionId::new)
.get(actorSystem);
}

final class ExtensionId extends DittoExtensionPoint.ExtensionId<ClientActorPropsFactory> {

private static final String CONFIG_PATH = "ditto.connectivity.connection.client-actor-props-factory";
private static final ExtensionId INSTANCE = new ExtensionId(ClientActorPropsFactory.class);
private static final String CONFIG_KEY = "client-actor-props-factory";
private static final String CONFIG_PATH = "ditto.extensions." + CONFIG_KEY;

private ExtensionId(final Class<ClientActorPropsFactory> parentClass) {
super(parentClass);
private ExtensionId(final ExtensionIdConfig<ClientActorPropsFactory> extensionIdConfig) {
super(extensionIdConfig);
}

static ExtensionIdConfig<ClientActorPropsFactory> computeConfig(final Config config) {
return ExtensionIdConfig.of(ClientActorPropsFactory.class, config, CONFIG_KEY);
}

@Override
Expand Down
Expand Up @@ -205,7 +205,8 @@ public final class ConnectionPersistenceActor
final var actorSystem = getContext().getSystem();
cluster = Cluster.get(actorSystem);
this.commandForwarderActor = commandForwarderActor;
propsFactory = ClientActorPropsFactory.get(actorSystem);
propsFactory =
ClientActorPropsFactory.get(actorSystem, ScopedConfig.dittoExtension(actorSystem.settings().config()));
this.pubSubMediator = pubSubMediator;
this.connectivityConfigOverwrites = connectivityConfigOverwrites;
connectivityConfig = getConnectivityConfigWithOverwrites(connectivityConfigOverwrites);
Expand Down
7 changes: 2 additions & 5 deletions connectivity/service/src/main/resources/connectivity.conf
Expand Up @@ -14,6 +14,8 @@ ditto {
extensions {
# Factory for custom connection priority provider.
connection-priority-provider-factory = "org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProviderFactory"
# Factory for custom client actor props.
client-actor-props-factory = "org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory"
}

persistence.operations.delay-after-persistence-actor-shutdown = 5s
Expand Down Expand Up @@ -161,11 +163,6 @@ ditto {
custom-command-interceptor-provider = "org.eclipse.ditto.connectivity.service.messaging.validation.NoOpConnectivityCommandInterceptorProvider"
custom-command-interceptor-provider = ${?CONNECTIVITY_CUSTOM_COMMAND_INTERCEPTOR_PROVIDER}

# Factory for custom client actor props.
client-actor-props-factory = "org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory"
client-actor-props-factory = ${?CONNECTIVITY_CLIENT_ACTOR_PROPS_FACTORY}


acknowledgement {
# lifetime of ack forwarder. Must be bigger than the largest possible command timeout (60s)
forwarder-fallback-timeout = 65s
Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionStatus;
import org.eclipse.ditto.connectivity.service.messaging.internal.ssl.SSLContextCreator;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.junit.After;
import org.junit.Test;

Expand Down Expand Up @@ -88,9 +89,9 @@ public void closeServerBinding() {

protected static ConnectionBuilder getHttpConnectionBuilderToLocalBinding(final boolean isSecure, final int port) {
return ConnectivityModelFactory.newConnectionBuilder(TestConstants.createRandomConnectionId(),
ConnectionType.HTTP_PUSH,
ConnectivityStatus.CLOSED,
(isSecure ? "https" : "http") + "://127.0.0.1:" + port)
ConnectionType.HTTP_PUSH,
ConnectivityStatus.CLOSED,
(isSecure ? "https" : "http") + "://127.0.0.1:" + port)
.targets(singletonList(HTTP_TARGET))
.validateCertificate(isSecure);
}
Expand Down Expand Up @@ -163,7 +164,8 @@ public void testTLSConnectionWithoutCertificateCheck() {
.failoverEnabled(false)
.build();
final ActorRef underTest = watch(actorSystem.actorOf(
ClientActorPropsFactory.get(actorSystem)
ClientActorPropsFactory.get(actorSystem,
ScopedConfig.dittoExtension(actorSystem.settings().config()))
.getActorPropsForType(insecureConnection, getRef(), getRef(), actorSystem,
DittoHeaders.empty(), ConfigFactory.empty())
));
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -51,7 +52,8 @@ public final class DefaultClientActorPropsFactoryTest extends WithMockServers {
public void setUp() {
actorSystem = ActorSystem.create("AkkaTestSystem", TestConstants.CONFIG);
serialization = SerializationExtension.get(actorSystem);
underTest = ClientActorPropsFactory.get(actorSystem);
underTest =
ClientActorPropsFactory.get(actorSystem, ScopedConfig.dittoExtension(actorSystem.settings().config()));
}

@After
Expand Down
Expand Up @@ -51,7 +51,7 @@ public class ErrorHandlingActorTest extends WithMockServers {

public void setUp(final boolean allowFirstCreateCommand, final boolean allowCloseCommands) {
actorSystem = ActorSystem.create("AkkaTestSystem", ConfigFactory.parseMap(
Map.of("ditto.connectivity.connection.client-actor-props-factory",
Map.of("ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.FaultyClientActorPropsFactory",
"allowFirstCreateCommand", allowFirstCreateCommand, "allowCloseCommands",
allowCloseCommands))
Expand Down
Expand Up @@ -110,21 +110,21 @@ public final class ConnectionPersistenceActorTest extends WithMockServers {
@Rule
public final ActorSystemResource actorSystemResource1 = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.connectivity.connection.client-actor-props-factory",
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource actorSystemResource2 = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.connectivity.connection.client-actor-props-factory",
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource actorSystemResourceWithBlocklist = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.connectivity.connection.client-actor-props-factory",
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory",
"ditto.connectivity.connection.blocked-hostnames",
ConfigValueFactory.fromAnyRef("127.0.0.1")
Expand All @@ -135,14 +135,14 @@ public final class ConnectionPersistenceActorTest extends WithMockServers {
@Rule
public final ActorSystemResource exceptionalClientProviderSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.connectivity.connection.client-actor-props-factory",
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.ExceptionClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource exceptionalCommandValidatorSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.connectivity.connection.client-actor-props-factory",
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory",
"ditto.connectivity.connection.custom-command-interceptor-provider",
"org.eclipse.ditto.connectivity.service.messaging.ExceptionalCommandValidator"
Expand All @@ -151,7 +151,7 @@ public final class ConnectionPersistenceActorTest extends WithMockServers {
@Rule
public final ActorSystemResource failingClientProviderSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.connectivity.connection.client-actor-props-factory",
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.FailingActorProvider",
"failingRetries",
TestConstants.CONNECTION_CONFIG.getClientActorRestartsBeforeEscalation()
Expand All @@ -160,15 +160,15 @@ public final class ConnectionPersistenceActorTest extends WithMockServers {
@Rule
public final ActorSystemResource tooManyFailingClientProviderSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.connectivity.connection.client-actor-props-factory",
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.FailingActorProvider", "failingRetries",
1 + TestConstants.CONNECTION_CONFIG.getClientActorRestartsBeforeEscalation()
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource searchForwardingSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.connectivity.connection.client-actor-props-factory",
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.SearchForwardingClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));

Expand Down

0 comments on commit 230de2f

Please sign in to comment.