Skip to content

Commit

Permalink
Add more extensions to connectivity service
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed May 11, 2022
1 parent caf8fc9 commit 07cf4f4
Show file tree
Hide file tree
Showing 17 changed files with 264 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,18 @@ public final class ConnectivityRootActor extends DittoRootActor {

@SuppressWarnings("unused")
private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
final ActorRef pubSubMediator,
final UnaryOperator<Signal<?>> commandForwarderSignalTransformer,
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory) {
final ActorRef pubSubMediator) {

final ClusterConfig clusterConfig = connectivityConfig.getClusterConfig();
final ActorSystem actorSystem = getContext().system();

final ActorRef commandForwarder =
getCommandForwarder(clusterConfig, pubSubMediator, commandForwarderSignalTransformer);
final ActorRef commandForwarder = getCommandForwarder(clusterConfig, pubSubMediator);

final ActorRef proxyActor =
startChildActor(ConnectivityProxyActor.ACTOR_NAME, ConnectivityProxyActor.props(commandForwarder));

final var connectionSupervisorProps = getConnectivitySupervisorActorProps(
pubSubMediator,
connectionPriorityProviderFactory,
proxyActor
pubSubMediator, proxyActor
);
// Create persistence streaming actor (with no cache) and make it known to pubSubMediator.
final ActorRef persistenceStreamingActor =
Expand Down Expand Up @@ -131,10 +126,8 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
}

private static Props getConnectivitySupervisorActorProps(final ActorRef pubSubMediator,
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory,
final ActorRef proxyActor) {
return ConnectionSupervisorActor.props(proxyActor,
connectionPriorityProviderFactory, pubSubMediator, providePreEnforcer());
return ConnectionSupervisorActor.props(proxyActor, pubSubMediator, providePreEnforcer());
}

private static PreEnforcer providePreEnforcer() {
Expand All @@ -147,34 +140,11 @@ private static PreEnforcer providePreEnforcer() {
*
* @param connectivityConfig the configuration of the Connectivity service.
* @param pubSubMediator the PubSub mediator Actor.
* @param commandForwarderSignalTransformer a function which transforms signals before forwarding them.
* @param connectionPriorityProviderFactory used to determine the reconnect priority of a connection.
* @return the Akka configuration Props object.
*/
public static Props props(final ConnectivityConfig connectivityConfig,
final ActorRef pubSubMediator,
final UnaryOperator<Signal<?>> commandForwarderSignalTransformer,
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory) {
//todo dgs: which ones of these props can be moved to extension?

return Props.create(ConnectivityRootActor.class, connectivityConfig, pubSubMediator,
commandForwarderSignalTransformer, connectionPriorityProviderFactory);
}

/**
* Creates Akka configuration object Props for this ConnectivityRootActor.
*
* @param connectivityConfig the configuration of the Connectivity service.
* @param pubSubMediator the PubSub mediator Actor.
* @param commandForwarderSignalTransformer a function which transforms signals before forwarding them.
* @return the Akka configuration Props object.
*/
public static Props props(final ConnectivityConfig connectivityConfig, final ActorRef pubSubMediator,
final UnaryOperator<Signal<?>> commandForwarderSignalTransformer) {
public static Props props(final ConnectivityConfig connectivityConfig, final ActorRef pubSubMediator) {

return Props.create(ConnectivityRootActor.class, connectivityConfig, pubSubMediator,
commandForwarderSignalTransformer,
(ConnectionPriorityProviderFactory) UsageBasedPriorityProvider::getInstance);
return Props.create(ConnectivityRootActor.class, connectivityConfig, pubSubMediator);
}

@Override
Expand Down Expand Up @@ -208,12 +178,11 @@ private ActorRef getHealthCheckingActor(final ConnectivityConfig connectivityCon
));
}

private ActorRef getCommandForwarder(final ClusterConfig clusterConfig, final ActorRef pubSubMediator,
final UnaryOperator<Signal<?>> conciergeForwarderSignalTransformer) {
private ActorRef getCommandForwarder(final ClusterConfig clusterConfig, final ActorRef pubSubMediator) {

return startChildActor(ConciergeForwarderActor.ACTOR_NAME,
ConciergeForwarderActor.props(pubSubMediator, ShardRegions.of(getContext().getSystem(), clusterConfig),
conciergeForwarderSignalTransformer));
ConciergeForwarderActor.props(pubSubMediator,
ShardRegions.of(getContext().getSystem(), clusterConfig)));
}

private static ActorRef startConnectionShardRegion(final ActorSystem actorSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,9 @@
*/
package org.eclipse.ditto.connectivity.service;

import java.util.function.UnaryOperator;

import org.eclipse.ditto.base.service.DittoService;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsFactory;
import org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.utils.jsr305.annotations.AllParametersAndReturnValuesAreNonnullByDefault;
import org.slf4j.Logger;
Expand Down Expand Up @@ -66,7 +62,7 @@ protected ConnectivityConfig getServiceSpecificConfig(final ScopedConfig dittoCo
@Override
protected Props getMainRootActorProps(final ConnectivityConfig connectivityConfig, final ActorRef pubSubMediator) {

return ConnectivityRootActor.props(connectivityConfig, pubSubMediator, UnaryOperator.identity());
return ConnectivityRootActor.props(connectivityConfig, pubSubMediator);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public interface ConnectionConfig extends WithSupervisorConfig, WithActivityChec

/**
* Whether usernames and passwords in connection URIs should be double decoded, when creating the connection.
*
* @return whether double decoding in enabled.
*/
boolean doubleDecodingEnabled();
Expand All @@ -171,14 +172,25 @@ public interface ConnectionConfig extends WithSupervisorConfig, WithActivityChec

/**
* Returns the full qualified classname of the {@code org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsFactory}
* implementation to use for custom executions in {@code ClientActorPropsFactory}.
* implementation to use for custom client actor props.
*
* @return the full qualified classname of the {@code ClientActorPropsFactory} implementation to use.
* @since 3.0.0
*/
String getClientActorPropsFactory();


/**
* Returns the full qualified classname of the
* {@code org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPriorityProviderFactory}
* implementation to use for custom priority providers.
*
* @return the full qualified classname of the {@code ConnectionPriorityProviderFactory} implementation to use.
* @since 3.0.0
*/
String getConnectionPriorityProviderFactory();


/**
* An enumeration of the known config path expressions and their associated default values for
* {@code ConnectionConfig}.
Expand Down Expand Up @@ -247,17 +259,27 @@ enum ConnectionConfigValue implements KnownConfigValue {

/**
* The full qualified classname of the {@code CustomConnectivityCommandInterceptorProvider} to instantiate.
*
* @since 3.0.0
*/
CUSTOM_COMMAND_INTERCEPTOR_PROVIDER("custom-command-interceptor-provider",
"org.eclipse.ditto.connectivity.service.messaging.validation.NoOpConnectivityCommandInterceptorProvider"),

/**
* The full qualified classname of the {@code ClientActorPropsFactory} to instantiate.
*
* @since 3.0.0
*/
CLIENT_ACTOR_PROPS_FACTORY("client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory");
"org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory"),

/**
* The full qualified classname of the {@code ConnectionPriorityProviderFactory} to instantiate.
*
* @since 3.0.0
*/
CONNECTION_PRIORITY_PROVIDER_FACTORY("connection-priority-provider-factory",
"org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProviderFactory");

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public final class DefaultConnectionConfig implements ConnectionConfig {
private final boolean doubleDecodingEnabled;
private final String customCommandInterceptorProvider;
private final String clientActorPropsFactory;
private final String connectionPriorityProviderFactory;

private DefaultConnectionConfig(final ConfigWithFallback config) {
clientActorAskTimeout =
Expand Down Expand Up @@ -95,6 +96,8 @@ private DefaultConnectionConfig(final ConfigWithFallback config) {
customCommandInterceptorProvider =
config.getString(ConnectionConfigValue.CUSTOM_COMMAND_INTERCEPTOR_PROVIDER.getConfigPath());
clientActorPropsFactory = config.getString(ConnectionConfigValue.CLIENT_ACTOR_PROPS_FACTORY.getConfigPath());
connectionPriorityProviderFactory =
config.getString(ConnectionConfigValue.CONNECTION_PRIORITY_PROVIDER_FACTORY.getConfigPath());
}

/**
Expand Down Expand Up @@ -236,6 +239,11 @@ public String getClientActorPropsFactory() {
return clientActorPropsFactory;
}

@Override
public String getConnectionPriorityProviderFactory() {
return connectionPriorityProviderFactory;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down Expand Up @@ -268,7 +276,8 @@ public boolean equals(final Object o) {
allClientActorsOnOneNode == that.allClientActorsOnOneNode &&
doubleDecodingEnabled == that.doubleDecodingEnabled &&
Objects.equals(customCommandInterceptorProvider, that.customCommandInterceptorProvider) &&
Objects.equals(clientActorPropsFactory, that.clientActorPropsFactory);
Objects.equals(clientActorPropsFactory, that.clientActorPropsFactory) &&
Objects.equals(connectionPriorityProviderFactory, that.connectionPriorityProviderFactory);
}

@Override
Expand All @@ -278,7 +287,7 @@ public int hashCode() {
acknowledgementConfig, cleanupConfig, maxNumberOfTargets, maxNumberOfSources, activityCheckConfig,
amqp10Config, amqp091Config, mqttConfig, kafkaConfig, httpPushConfig, ackLabelDeclareInterval,
priorityUpdateInterval, allClientActorsOnOneNode, doubleDecodingEnabled,
customCommandInterceptorProvider, clientActorPropsFactory);
customCommandInterceptorProvider, clientActorPropsFactory, connectionPriorityProviderFactory);
}

@Override
Expand Down Expand Up @@ -308,6 +317,7 @@ public String toString() {
", doubleDecodingEnabled=" + doubleDecodingEnabled +
", customCommandInterceptorProvider=" + customCommandInterceptorProvider +
", clientActorPropsFactory=" + clientActorPropsFactory +
", connectionPriorityProviderFactory=" + connectionPriorityProviderFactory +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ protected ClientActorPropsFactory(final ActorSystem actorSystem) {
* @param dittoHeaders Ditto headers of the command that caused the client actors to be created.
* @return the actor props
*/
public abstract Props getActorPropsForType(Connection connection, ActorRef proxyActor, ActorRef connectionActor,
ActorSystem actorSystem, DittoHeaders dittoHeaders, Config connectivityConfigOverwrites);
public abstract Props getActorPropsForType(Connection connection,
ActorRef proxyActor,
ActorRef connectionActor,
ActorSystem actorSystem,
DittoHeaders dittoHeaders,
Config connectivityConfigOverwrites);

/**
* Loads the implementation of {@code ClientActorPropsFactory} which is configured for the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,22 @@ public final class ConnectionPersistenceActor
ConnectionPersistenceActor(final ConnectionId connectionId,
final ActorRef proxyActor,
final ActorRef pubSubMediator,
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory,
final Trilean allClientActorsOnOneNode,
final Config connectivityConfigOverwrites) {

super(connectionId, new ConnectionMongoSnapshotAdapter());

cluster = Cluster.get(getContext().getSystem());
final var actorSystem = getContext().getSystem();
cluster = Cluster.get(actorSystem);
this.proxyActor = proxyActor;
this.propsFactory = ClientActorPropsFactory.get(getContext().getSystem());
propsFactory = ClientActorPropsFactory.get(actorSystem);
this.pubSubMediator = pubSubMediator;
this.connectivityConfigOverwrites = connectivityConfigOverwrites;
connectivityConfig = getConnectivityConfigWithOverwrites(connectivityConfigOverwrites);
commandValidator = getCommandValidator();
final ConnectionConfig connectionConfig = this.connectivityConfig.getConnectionConfig();
final ConnectionConfig connectionConfig = connectivityConfig.getConnectionConfig();
this.allClientActorsOnOneNode = allClientActorsOnOneNode.orElse(connectionConfig.areAllClientActorsOnOneNode());
connectionPriorityProvider = connectionPriorityProviderFactory.newProvider(self(), log);
connectionPriorityProvider = ConnectionPriorityProviderFactory.get(actorSystem).newProvider(self(), log);
clientActorAskTimeout = connectionConfig.getClientActorAskTimeout();
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
connectionLoggerRegistry = ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger());
Expand Down Expand Up @@ -254,18 +254,16 @@ protected DittoDiagnosticLoggingAdapter createLogger() {
*
* @param connectionId the connection ID.
* @param proxyActor the actor used to send signals into the ditto cluster..
* @param connectionPriorityProviderFactory Creates a new connection priority provider.
* @param connectivityConfigOverwrites the overwrites for the connectivity config for the given connection.
* @return the Akka configuration Props object.
*/
public static Props props(final ConnectionId connectionId,
final ActorRef proxyActor,
final ActorRef pubSubMediator,
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory,
final Config connectivityConfigOverwrites
) {
return Props.create(ConnectionPersistenceActor.class, connectionId, proxyActor, pubSubMediator,
connectionPriorityProviderFactory, Trilean.UNKNOWN, connectivityConfigOverwrites);
Trilean.UNKNOWN, connectivityConfigOverwrites);
}

/**
Expand Down Expand Up @@ -640,7 +638,7 @@ private ConnectionLogger getAppropriateLogger(final ConnectionId connectionId, f

@Override
protected void becomeDeletedHandler() {
this.cancelPeriodicPriorityUpdate();
cancelPeriodicPriorityUpdate();
super.becomeDeletedHandler();
}

Expand Down Expand Up @@ -729,7 +727,7 @@ private void updatePriority(final UpdatePriority updatePriority) {
log.withCorrelationId(updatePriority)
.info("Updating priority of connection <{}> from <{}> to <{}>", entityId, priority,
desiredPriority);
this.priority = desiredPriority;
priority = desiredPriority;
final DittoHeaders headersWithJournalTags =
updatePriority.getDittoHeaders().toBuilder().journalTags(journalTags()).build();
final EmptyEvent emptyEvent = new EmptyEvent(EmptyEvent.EFFECT_PRIORITY_UPDATE, getRevisionNumber() + 1,
Expand Down Expand Up @@ -819,7 +817,7 @@ private void logDroppedSignal(final WithDittoHeaders withDittoHeaders, final Str
}

private void retrieveConnectionLogs(final RetrieveConnectionLogs command, final ActorRef sender) {
this.updateLoggingIfEnabled();
updateLoggingIfEnabled();
broadcastCommandWithDifferentSender(command,
(existingConnection, timeout) -> RetrieveConnectionLogsAggregatorActor.props(
existingConnection, sender, command.getDittoHeaders(), timeout,
Expand All @@ -834,7 +832,7 @@ private boolean isLoggingEnabled() {
private void loggingEnabled() {
// start check logging scheduler
startEnabledLoggingChecker();
loggingEnabledUntil = Instant.now().plus(this.loggingEnabledDuration);
loggingEnabledUntil = Instant.now().plus(loggingEnabledDuration);
}

private void updateLoggingIfEnabled() {
Expand Down
Loading

0 comments on commit 07cf4f4

Please sign in to comment.