Skip to content

Commit

Permalink
Moved DittoExtensionPoint to own package in order to clean up depende…
Browse files Browse the repository at this point in the history
…ncies

Made SnapshotAdapter a DittoExtensionPoint and initialized it in PersistenceActor, instead of Service
Removed dependency on ditto-internals-persistence from ditto-base-service

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Jul 18, 2022
1 parent 1297520 commit 8c85dfc
Show file tree
Hide file tree
Showing 67 changed files with 442 additions and 198 deletions.
14 changes: 13 additions & 1 deletion base/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,19 @@
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-persistence</artifactId>
<artifactId>ditto-internal-utils-tracing</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-health</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-cluster</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-extension</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.eclipse.ditto.internal.utils.config.raw.RawConfigSupplier;
import org.eclipse.ditto.internal.utils.health.status.StatusSupplierActor;
import org.eclipse.ditto.internal.utils.metrics.prometheus.PrometheusReporterRoute;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithMongoDbConfig;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -75,7 +74,8 @@
* <li>{@link #startStatusSupplierActor(akka.actor.ActorSystem)},</li>
* <li>{@link #startServiceRootActors(akka.actor.ActorSystem, org.eclipse.ditto.base.service.config.ServiceSpecificConfig)}.
* <ol>
* <li>{@link #getMainRootActorProps(org.eclipse.ditto.base.service.config.ServiceSpecificConfig, akka.actor.ActorRef)},</li>
* <li>{@link #getMainRootActorProps(org.eclipse.ditto.base.service.config.ServiceSpecificConfig,
* com.typesafe.config.Config, akka.actor.ActorRef, akka.actor.ActorSystem)},</li>
* <li>{@link #startMainRootActor(akka.actor.ActorSystem, akka.actor.Props)},</li>
* </ol>
* </li>
Expand All @@ -96,6 +96,8 @@ public abstract class DittoService<C extends ServiceSpecificConfig> {
*/
public static final String DITTO_CONFIG_PATH = ScopedConfig.DITTO_SCOPE;

protected static final String MONGO_URI_CONFIG_PATH = "akka.contrib.persistence.mongodb.mongo.mongouri";

private final Logger logger;
private final String serviceName;
private final String rootActorName;
Expand Down Expand Up @@ -206,26 +208,18 @@ public ActorSystem start() {
*/
protected ActorSystem doStart() {
logRuntimeParameters();
final var actorSystemConfig = appendDittoInfo(appendAkkaPersistenceMongoUriToRawConfig());
final var actorSystemConfig =
appendDittoInfo(appendAkkaPersistenceMongoUriToRawConfig(rawConfig, serviceSpecificConfig));
startKamon();
final var actorSystem = createActorSystem(actorSystemConfig);
initializeActorSystem(actorSystem);
startKamonPrometheusHttpEndpoint(actorSystem);
return actorSystem;
}

private Config appendAkkaPersistenceMongoUriToRawConfig() {
if (!isServiceWithMongoDbConfig()) {
return rawConfig;
}
final var configPath = "akka.contrib.persistence.mongodb.mongo.mongouri";
final var mongoDbConfig = ((WithMongoDbConfig) serviceSpecificConfig).getMongoDbConfig();
final String mongoDbUri = mongoDbConfig.getMongoDbUri();
return rawConfig.withValue(configPath, ConfigValueFactory.fromAnyRef(mongoDbUri));
}

private boolean isServiceWithMongoDbConfig() {
return serviceSpecificConfig instanceof WithMongoDbConfig;
@SuppressWarnings("unused")
protected Config appendAkkaPersistenceMongoUriToRawConfig(final Config rawConfig, final C serviceSpecificConfig) {
return rawConfig;
}

private void logRuntimeParameters() {
Expand Down Expand Up @@ -384,7 +378,8 @@ protected void startDevOpsCommandsActor(final ActorSystem actorSystem) {
* is overridden, the following methods will not be called automatically:</em>
* </p>
* <ul>
* <li>{@link #getMainRootActorProps(org.eclipse.ditto.base.service.config.ServiceSpecificConfig, akka.actor.ActorRef)},</li>
* <li>{@link #getMainRootActorProps(org.eclipse.ditto.base.service.config.ServiceSpecificConfig,
* com.typesafe.config.Config, akka.actor.ActorRef, akka.actor.ActorSystem)},</li>
* <li>{@link #startMainRootActor(akka.actor.ActorSystem, akka.actor.Props)},</li>
* </ul>
*
Expand All @@ -401,7 +396,8 @@ protected void startServiceRootActors(final ActorSystem actorSystem, final C ser

injectSystemPropertiesLimits(serviceSpecificConfig);

startMainRootActor(actorSystem, getMainRootActorProps(serviceSpecificConfig, pubSubMediator));
startMainRootActor(actorSystem,
getMainRootActorProps(serviceSpecificConfig, rawConfig, pubSubMediator, actorSystem));
RootActorStarter.get(actorSystem, ScopedConfig.dittoExtension(actorSystem.settings().config())).execute();
});
}
Expand Down Expand Up @@ -436,9 +432,13 @@ private static ActorRef getDistributedPubSubMediatorActor(final ActorSystem acto
*
* @param serviceSpecificConfig the configuration of this service.
* @param pubSubMediator ActorRef of the distributed pub-sub-mediator.
* @param actorSystem the actorSystem of the service.
* @return the Props.
*/
protected abstract Props getMainRootActorProps(C serviceSpecificConfig, ActorRef pubSubMediator);
protected abstract Props getMainRootActorProps(C serviceSpecificConfig,
final Config rawConfig,
final ActorRef pubSubMediator,
final ActorSystem actorSystem);

/**
* Starts the main root actor of this service. May be overridden to change the way of starting this service's root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
public final class NoOpRootChildActorStarter implements RootChildActorStarter {

/**
* @param actorSystem the actor system in which to load the extension.
* @param actorSystem the actor system in which to load the extension
* @param config the config of the extension.
*/
@SuppressWarnings("unused")
public NoOpRootChildActorStarter(final ActorSystem actorSystem, final Config config) {
//No-Op because extensions need a constructor accepting an actorSystem
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;

import com.typesafe.config.Config;

import akka.actor.ActorSystem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;

import com.typesafe.config.Config;

import akka.actor.ActorContext;
Expand Down
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,11 @@
<artifactId>ditto-internal-utils-metrics</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-extension</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-conditional-headers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

/**
Expand All @@ -38,7 +42,7 @@ public final class ConnectivityService extends DittoService<ConnectivityConfig>
/**
* Name for the Akka Actor System of the Connectivity service.
*/
private static final String SERVICE_NAME = "connectivity";
public static final String SERVICE_NAME = "connectivity";

private ConnectivityService() {
super(LOGGER, SERVICE_NAME, ConnectivityRootActor.ACTOR_NAME);
Expand All @@ -60,9 +64,21 @@ protected ConnectivityConfig getServiceSpecificConfig(final ScopedConfig dittoCo
}

@Override
protected Props getMainRootActorProps(final ConnectivityConfig connectivityConfig, final ActorRef pubSubMediator) {
protected Props getMainRootActorProps(final ConnectivityConfig connectivityConfig,
final Config rawConfig,
final ActorRef pubSubMediator,
final ActorSystem actorSystem) {

return ConnectivityRootActor.props(connectivityConfig, pubSubMediator);
}

@Override
protected Config appendAkkaPersistenceMongoUriToRawConfig(final Config rawConfig,
final ConnectivityConfig serviceSpecificConfig) {

final var mongoDbConfig = serviceSpecificConfig.getMongoDbConfig();
final String mongoDbUri = mongoDbConfig.getMongoDbUri();
return rawConfig.withValue(MONGO_URI_CONFIG_PATH, ConfigValueFactory.fromAnyRef(mongoDbUri));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;

import com.typesafe.config.Config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;

import com.typesafe.config.Config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;

import com.typesafe.config.Config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,23 @@
import org.eclipse.ditto.json.JsonValue;
import org.slf4j.LoggerFactory;

import com.typesafe.config.Config;

import akka.actor.ActorSystem;

/**
* SnapshotAdapter for {@link String}s persisted to/from MongoDB.
*/
public final class ConnectionMongoSnapshotAdapter extends AbstractMongoSnapshotAdapter<Connection> {

/**
* @param actorSystem the actor system in which to load the extension
* @param config the config of the extension.
*/
@SuppressWarnings("unused")
public ConnectionMongoSnapshotAdapter(final ActorSystem actorSystem, final Config config) {
this();
}

public ConnectionMongoSnapshotAdapter() {
super(LoggerFactory.getLogger(ConnectionMongoSnapshotAdapter.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
Expand All @@ -127,6 +128,7 @@
import com.typesafe.config.Config;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
Expand Down Expand Up @@ -201,10 +203,11 @@ public final class ConnectionPersistenceActor
final ActorRef commandForwarderActor,
final ActorRef pubSubMediator,
final Trilean allClientActorsOnOneNode,
final Config connectivityConfigOverwrites) {
final Config connectivityConfigOverwrites,
final ActorSystem actorSystem) {

super(connectionId, SnapshotAdapter.get(actorSystem, getConnectivityRawConfig(actorSystem)));

super(connectionId, new ConnectionMongoSnapshotAdapter());
final var actorSystem = getContext().getSystem();
cluster = Cluster.get(actorSystem);
final Config dittoExtensionConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
this.updatedConnectionTester = UpdatedConnectionTester.get(actorSystem, dittoExtensionConfig);
Expand All @@ -231,6 +234,10 @@ public final class ConnectionPersistenceActor
startUpdatePriorityPeriodically(fuzzyPriorityUpdateInterval);
}

private static Config getConnectivityRawConfig(final ActorSystem actorSystem) {
return ScopedConfig.getOrEmpty(actorSystem.settings().config(), "ditto.connectivity");
}

private ConnectivityConfig getConnectivityConfigWithOverwrites(final Config connectivityConfigOverwrites) {
final Config defaultConfig = getContext().getSystem().settings().config();
final Config withOverwrites = connectivityConfigOverwrites.withFallback(defaultConfig);
Expand Down Expand Up @@ -269,10 +276,11 @@ protected DittoDiagnosticLoggingAdapter createLogger() {
public static Props props(final ConnectionId connectionId,
final ActorRef commandForwarderActor,
final ActorRef pubSubMediator,
final Config connectivityConfigOverwrites
final Config connectivityConfigOverwrites,
final ActorSystem actorSystem
) {
return Props.create(ConnectionPersistenceActor.class, connectionId, commandForwarderActor, pubSubMediator,
Trilean.UNKNOWN, connectivityConfigOverwrites);
return Props.create(ConnectionPersistenceActor.class, () -> new ConnectionPersistenceActor(connectionId,
commandForwarderActor, pubSubMediator, Trilean.UNKNOWN, connectivityConfigOverwrites, actorSystem));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;


import com.typesafe.config.Config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ protected void handleMessagesDuringStartup(final Object message) {
@Override
protected Props getPersistenceActorProps(final ConnectionId entityId) {
return ConnectionPersistenceActor.props(entityId, commandForwarderActor, pubSubMediator,
connectivityConfigOverwrites);
connectivityConfigOverwrites, getContext().getSystem());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommandInterceptor;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;


import com.typesafe.config.Config;

Expand Down
5 changes: 5 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ ditto {
persistence.operations.delay-after-persistence-actor-shutdown = ${?DELAY_AFTER_PERSISTENCE_ACTOR_SHUTDOWN}

connectivity {

snapshot-adapter = {
extension-class = "org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionMongoSnapshotAdapter"
}

user-indicated-errors-base = [
# Kafka
{exceptionName: "org.apache.kafka.common.errors.SaslAuthenticationException", messagePattern: ".*"}
Expand Down
Loading

0 comments on commit 8c85dfc

Please sign in to comment.