Skip to content

Commit

Permalink
Configure snapshot adapter on ditto.extensions level
Browse files Browse the repository at this point in the history
* Remove unrequired actorsystem parameter
* Remove unrequired initializations of SnapshotAdapter

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 20, 2022
1 parent 65e4e90 commit 528899c
Show file tree
Hide file tree
Showing 24 changed files with 114 additions and 178 deletions.
Expand Up @@ -394,8 +394,7 @@ private void startServiceRootActors(final ActorSystem actorSystem, final C servi

injectSystemPropertiesLimits(serviceSpecificConfig);

startMainRootActor(actorSystem,
getMainRootActorProps(serviceSpecificConfig, rawConfig, pubSubMediator, actorSystem));
startMainRootActor(actorSystem, getMainRootActorProps(serviceSpecificConfig, rawConfig, pubSubMediator));
RootActorStarter.get(actorSystem, ScopedConfig.dittoExtension(actorSystem.settings().config())).execute();
});
}
Expand Down Expand Up @@ -430,13 +429,11 @@ private static ActorRef getDistributedPubSubMediatorActor(final ActorSystem acto
*
* @param serviceSpecificConfig the configuration of this service.
* @param pubSubMediator ActorRef of the distributed pub-sub-mediator.
* @param actorSystem the actorSystem of the service.
* @return the Props.
*/
protected abstract Props getMainRootActorProps(C serviceSpecificConfig,
final Config rawConfig,
final ActorRef pubSubMediator,
final ActorSystem actorSystem);
final ActorRef pubSubMediator);

/**
* Starts the main root actor of this service. May be overridden to change the way of starting this service's root
Expand Down
Expand Up @@ -24,7 +24,6 @@
import com.typesafe.config.ConfigValueFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

/**
Expand Down Expand Up @@ -64,11 +63,8 @@ protected ConnectivityConfig getServiceSpecificConfig(final ScopedConfig dittoCo
}

@Override
protected Props getMainRootActorProps(final ConnectivityConfig connectivityConfig,
final Config rawConfig,
final ActorRef pubSubMediator,
final ActorSystem actorSystem) {

protected Props getMainRootActorProps(final ConnectivityConfig connectivityConfig, final Config rawConfig,
final ActorRef pubSubMediator) {
return ConnectivityRootActor.props(connectivityConfig, pubSubMediator);
}

Expand Down
Expand Up @@ -113,7 +113,6 @@
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
Expand Down Expand Up @@ -203,14 +202,13 @@ public final class ConnectionPersistenceActor
final ActorRef commandForwarderActor,
final ActorRef pubSubMediator,
final Trilean allClientActorsOnOneNode,
final Config connectivityConfigOverwrites,
final ActorSystem actorSystem) {

super(connectionId, SnapshotAdapter.get(actorSystem, getConnectivityRawConfig(actorSystem)));
final Config connectivityConfigOverwrites) {

super(connectionId);
final ActorSystem actorSystem = context().system();
cluster = Cluster.get(actorSystem);
final Config dittoExtensionConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
this.updatedConnectionTester = UpdatedConnectionTester.get(actorSystem, dittoExtensionConfig);
this.updatedConnectionTester = UpdatedConnectionTester.get(actorSystem, dittoExtensionConfig);
this.commandForwarderActor = commandForwarderActor;
propsFactory = ClientActorPropsFactory.get(actorSystem, dittoExtensionConfig);
this.pubSubMediator = pubSubMediator;
Expand All @@ -234,10 +232,6 @@ public final class ConnectionPersistenceActor
startUpdatePriorityPeriodically(fuzzyPriorityUpdateInterval);
}

private static Config getConnectivityRawConfig(final ActorSystem actorSystem) {
return ScopedConfig.getOrEmpty(actorSystem.settings().config(), "ditto.connectivity");
}

private ConnectivityConfig getConnectivityConfigWithOverwrites(final Config connectivityConfigOverwrites) {
final Config defaultConfig = getContext().getSystem().settings().config();
final Config withOverwrites = connectivityConfigOverwrites.withFallback(defaultConfig);
Expand Down Expand Up @@ -276,11 +270,10 @@ protected DittoDiagnosticLoggingAdapter createLogger() {
public static Props props(final ConnectionId connectionId,
final ActorRef commandForwarderActor,
final ActorRef pubSubMediator,
final Config connectivityConfigOverwrites,
final ActorSystem actorSystem
final Config connectivityConfigOverwrites
) {
return Props.create(ConnectionPersistenceActor.class, () -> new ConnectionPersistenceActor(connectionId,
commandForwarderActor, pubSubMediator, Trilean.UNKNOWN, connectivityConfigOverwrites, actorSystem));
commandForwarderActor, pubSubMediator, Trilean.UNKNOWN, connectivityConfigOverwrites));
}

/**
Expand Down
Expand Up @@ -113,7 +113,8 @@ public static Props props(final ActorRef commandForwarder,
final ActorRef pubSubMediator,
final ConnectionEnforcerActorPropsFactory enforcerActorPropsFactory) {

return Props.create(ConnectionSupervisorActor.class, commandForwarder, pubSubMediator, enforcerActorPropsFactory);
return Props.create(ConnectionSupervisorActor.class, commandForwarder, pubSubMediator,
enforcerActorPropsFactory);
}

@Override
Expand Down Expand Up @@ -169,7 +170,7 @@ protected void handleMessagesDuringStartup(final Object message) {
@Override
protected Props getPersistenceActorProps(final ConnectionId entityId) {
return ConnectionPersistenceActor.props(entityId, commandForwarderActor, pubSubMediator,
connectivityConfigOverwrites, getContext().getSystem());
connectivityConfigOverwrites);
}

@Override
Expand Down Expand Up @@ -271,6 +272,7 @@ private enum Control {
}

private static class CheckForOverwritesConfig {

@Nullable private final DittoHeaders dittoHeaders;

private CheckForOverwritesConfig(@Nullable final DittoHeaders dittoHeaders) {
Expand Down
5 changes: 1 addition & 4 deletions connectivity/service/src/main/resources/connectivity.conf
Expand Up @@ -45,17 +45,14 @@ ditto {
pre-enforcer-provider.extension-config.pre-enforcers = [
"org.eclipse.ditto.policies.enforcement.pre.CommandWithOptionalEntityPreEnforcer"
]
snapshot-adapter = "org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionMongoSnapshotAdapter"
}

persistence.operations.delay-after-persistence-actor-shutdown = 5s
persistence.operations.delay-after-persistence-actor-shutdown = ${?DELAY_AFTER_PERSISTENCE_ACTOR_SHUTDOWN}

connectivity {

snapshot-adapter = {
extension-class = "org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionMongoSnapshotAdapter"
}

user-indicated-errors-base = [
# Kafka
{exceptionName: "org.apache.kafka.common.errors.SaslAuthenticationException", messagePattern: ".*"}
Expand Down
Expand Up @@ -110,67 +110,67 @@ public final class ConnectionPersistenceActorTest extends WithMockServers {
@Rule
public final ActorSystemResource actorSystemResource1 = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource actorSystemResource2 = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource actorSystemResourceWithBlocklist = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory",
"ditto.connectivity.connection.blocked-hostnames",
ConfigValueFactory.fromAnyRef("127.0.0.1")
ConfigValueFactory.fromAnyRef("127.0.0.1")
))
.withFallback(TestConstants.CONFIG)
);

@Rule
public final ActorSystemResource exceptionalClientProviderSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.ExceptionClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.ExceptionClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource exceptionalCommandValidatorSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory",
"ditto.extensions.custom-connectivity-command-interceptor-provider",
"org.eclipse.ditto.connectivity.service.messaging.ExceptionalCommandValidator"
)).withFallback(TestConstants.CONFIG));
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory",
"ditto.extensions.custom-connectivity-command-interceptor-provider",
"org.eclipse.ditto.connectivity.service.messaging.ExceptionalCommandValidator"
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource failingClientProviderSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.FailingActorProvider",
"failingRetries",
TestConstants.CONNECTION_CONFIG.getClientActorRestartsBeforeEscalation()
)).withFallback(TestConstants.CONFIG));
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.FailingActorProvider",
"failingRetries",
TestConstants.CONNECTION_CONFIG.getClientActorRestartsBeforeEscalation()
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource tooManyFailingClientProviderSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.FailingActorProvider", "failingRetries",
1 + TestConstants.CONNECTION_CONFIG.getClientActorRestartsBeforeEscalation()
)).withFallback(TestConstants.CONFIG));
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.FailingActorProvider", "failingRetries",
1 + TestConstants.CONNECTION_CONFIG.getClientActorRestartsBeforeEscalation()
)).withFallback(TestConstants.CONFIG));

@Rule
public final ActorSystemResource searchForwardingSystemResource = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.SearchForwardingClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));
"ditto.extensions.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.SearchForwardingClientActorPropsFactory"
)).withFallback(TestConstants.CONFIG));

@Rule
public final TestNameCorrelationId testNameCorrelationId = TestNameCorrelationId.newInstance();
Expand Down Expand Up @@ -987,10 +987,9 @@ public void recoverDeletedConnection() {
@Test
public void exceptionDuringClientActorPropsCreation() {

final var connectionActorProps = ConnectionPersistenceActor.props(TestConstants.createRandomConnectionId(),
commandForwarderActor,
pubSubMediator,
ConfigFactory.empty(), exceptionalClientProviderSystemResource.getActorSystem());
final var connectionActorProps = ConnectionPersistenceActor.props(
TestConstants.createRandomConnectionId(), commandForwarderActor, pubSubMediator, ConfigFactory.empty()
);

// create another actor because this it is stopped and we want to test if the child is terminated
final var parent = exceptionalClientProviderSystemResource.newTestKit();
Expand All @@ -1012,7 +1011,7 @@ public void exceptionDueToCustomValidator() {
final var connectionActorProps = ConnectionPersistenceActor.props(TestConstants.createRandomConnectionId(),
commandForwarderActor,
pubSubMediator,
ConfigFactory.empty(), exceptionalCommandValidatorSystemResource.getActorSystem());
ConfigFactory.empty());

// create another actor because we want to test if the child is terminated
final var parent = exceptionalCommandValidatorSystemResource.newTestKit();
Expand Down Expand Up @@ -1268,7 +1267,7 @@ public void forwardSearchCommands() {
proxyActorProbe.ref(),
pubSubMediatorProbe.ref(),
Trilean.TRUE,
ConfigFactory.empty(), searchForwardingSystemResource.getActorSystem()));
ConfigFactory.empty()));

// GIVEN: connection persistence actor created with 2 client actors that are allowed to start on same node
final var underTest = searchForwardingSystemResource.newActor(connectionActorProps, myConnectionId.toString());
Expand Down Expand Up @@ -1327,7 +1326,7 @@ public void retriesStartingClientActor() {
commandForwarderActor,
pubSubMediator,
Trilean.FALSE,
ConfigFactory.empty(), failingClientProviderSystemResource.getActorSystem())
ConfigFactory.empty())
)
);
final var testProbe = failingClientProviderSystemResource.newTestProbe();
Expand All @@ -1349,7 +1348,7 @@ public void escalatesWhenClientActorFailsTooOften() {
commandForwarderActor,
pubSubMediator,
Trilean.FALSE,
ConfigFactory.empty(), tooManyFailingClientProviderSystemResource.getActorSystem())
ConfigFactory.empty())
)
);
final var testProbe = tooManyFailingClientProviderSystemResource.newTestProbe();
Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.typesafe.config.Config;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

/**
Expand Down Expand Up @@ -59,10 +58,8 @@ protected GatewayConfig getServiceSpecificConfig(final ScopedConfig dittoConfig)
}

@Override
protected Props getMainRootActorProps(final GatewayConfig gatewayConfig,
final Config rawConfig,
final ActorRef pubSubMediator,
final ActorSystem actorSystem) {
protected Props getMainRootActorProps(final GatewayConfig gatewayConfig, final Config rawConfig,
final ActorRef pubSubMediator) {

return GatewayRootActor.props(gatewayConfig, pubSubMediator);
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.ditto.internal.utils.akka.PingCommand;
import org.eclipse.ditto.internal.utils.akka.PingCommandResponse;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
Expand Down Expand Up @@ -98,12 +99,13 @@ public abstract class AbstractPersistenceActor<
* Instantiate the actor.
*
* @param entityId the entity ID.
* @param snapshotAdapter the entity's snapshot adapter.
*/
@SuppressWarnings("unchecked")
protected AbstractPersistenceActor(final I entityId, final SnapshotAdapter<S> snapshotAdapter) {
protected AbstractPersistenceActor(final I entityId) {
this.entityId = entityId;
this.snapshotAdapter = snapshotAdapter;
final var actorSystem = context().system();
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
this.snapshotAdapter = SnapshotAdapter.get(actorSystem, dittoExtensionsConfig);
entity = null;

lastSnapshotRevision = 0L;
Expand Down

0 comments on commit 528899c

Please sign in to comment.