Skip to content

Commit

Permalink
Add extensions to connectivity service
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed May 10, 2022
1 parent 12bfbe0 commit caf8fc9
Show file tree
Hide file tree
Showing 36 changed files with 1,347 additions and 595 deletions.
Expand Up @@ -15,17 +15,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

import javax.annotation.Nullable;
import javax.jms.JMSRuntimeException;
import javax.naming.NamingException;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.actors.DittoRootActor;
import org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommandInterceptor;
import org.eclipse.ditto.connectivity.service.config.ConnectionIdsRetrievalConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsFactory;
import org.eclipse.ditto.connectivity.service.messaging.ConnectionIdsRetrievalActor;
import org.eclipse.ditto.connectivity.service.messaging.ConnectivityProxyActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceOperationsActor;
Expand Down Expand Up @@ -79,9 +76,7 @@ public final class ConnectivityRootActor extends DittoRootActor {
private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
final ActorRef pubSubMediator,
final UnaryOperator<Signal<?>> commandForwarderSignalTransformer,
@Nullable final ConnectivityCommandInterceptor commandValidator,
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory,
final ClientActorPropsFactory clientActorPropsFactory) {
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory) {

final ClusterConfig clusterConfig = connectivityConfig.getClusterConfig();
final ActorSystem actorSystem = getContext().system();
Expand All @@ -94,12 +89,9 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,

final var connectionSupervisorProps = getConnectivitySupervisorActorProps(
pubSubMediator,
commandValidator,
connectionPriorityProviderFactory,
clientActorPropsFactory,
proxyActor
);

// Create persistence streaming actor (with no cache) and make it known to pubSubMediator.
final ActorRef persistenceStreamingActor =
startChildActor(ConnectionPersistenceStreamingActorCreator.ACTOR_NAME,
Expand Down Expand Up @@ -128,6 +120,8 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
ConnectionPersistenceOperationsActor.props(pubSubMediator, connectivityConfig.getMongoDbConfig(),
actorSystem.settings().config(), connectivityConfig.getPersistenceOperationsConfig()));

CustomConnectivityRootExecutor.get(actorSystem).execute(getContext());

final var cleanupConfig = connectivityConfig.getConnectionConfig().getCleanupConfig();
final var cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE);
startChildActor(PersistenceCleanupActor.NAME, cleanupActorProps);
Expand All @@ -137,11 +131,9 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
}

private static Props getConnectivitySupervisorActorProps(final ActorRef pubSubMediator,
@Nullable final ConnectivityCommandInterceptor commandValidator,
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory,
final ClientActorPropsFactory clientActorPropsFactory,
final ActorRef proxyActor) {
return ConnectionSupervisorActor.props(proxyActor, clientActorPropsFactory, commandValidator,
return ConnectionSupervisorActor.props(proxyActor,
connectionPriorityProviderFactory, pubSubMediator, providePreEnforcer());
}

Expand All @@ -156,21 +148,17 @@ private static PreEnforcer providePreEnforcer() {
* @param connectivityConfig the configuration of the Connectivity service.
* @param pubSubMediator the PubSub mediator Actor.
* @param commandForwarderSignalTransformer a function which transforms signals before forwarding them.
* @param commandValidator custom command validator for connectivity commands
* @param connectionPriorityProviderFactory used to determine the reconnect priority of a connection.
* @param clientActorPropsFactory props factory of the client actors
* @return the Akka configuration Props object.
*/
public static Props props(final ConnectivityConfig connectivityConfig,
final ActorRef pubSubMediator,
final UnaryOperator<Signal<?>> commandForwarderSignalTransformer,
final ConnectivityCommandInterceptor commandValidator,
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory,
final ClientActorPropsFactory clientActorPropsFactory) {
final ConnectionPriorityProviderFactory connectionPriorityProviderFactory) {
//todo dgs: which ones of these props can be moved to extension?

return Props.create(ConnectivityRootActor.class, connectivityConfig, pubSubMediator,
commandForwarderSignalTransformer, commandValidator, connectionPriorityProviderFactory,
clientActorPropsFactory);
commandForwarderSignalTransformer, connectionPriorityProviderFactory);
}

/**
Expand All @@ -179,16 +167,14 @@ public static Props props(final ConnectivityConfig connectivityConfig,
* @param connectivityConfig the configuration of the Connectivity service.
* @param pubSubMediator the PubSub mediator Actor.
* @param commandForwarderSignalTransformer a function which transforms signals before forwarding them.
* @param clientActorPropsFactory props factory of the client actors.
* @return the Akka configuration Props object.
*/
public static Props props(final ConnectivityConfig connectivityConfig, final ActorRef pubSubMediator,
final UnaryOperator<Signal<?>> commandForwarderSignalTransformer,
final ClientActorPropsFactory clientActorPropsFactory) {
final UnaryOperator<Signal<?>> commandForwarderSignalTransformer) {

return Props.create(ConnectivityRootActor.class, connectivityConfig, pubSubMediator,
commandForwarderSignalTransformer, null,
(ConnectionPriorityProviderFactory) UsageBasedPriorityProvider::getInstance, clientActorPropsFactory);
commandForwarderSignalTransformer,
(ConnectionPriorityProviderFactory) UsageBasedPriorityProvider::getInstance);
}

@Override
Expand Down
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.ditto.base.service.DittoService;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsFactory;
import org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.utils.jsr305.annotations.AllParametersAndReturnValuesAreNonnullByDefault;
Expand Down Expand Up @@ -65,8 +66,7 @@ protected ConnectivityConfig getServiceSpecificConfig(final ScopedConfig dittoCo
@Override
protected Props getMainRootActorProps(final ConnectivityConfig connectivityConfig, final ActorRef pubSubMediator) {

return ConnectivityRootActor.props(connectivityConfig, pubSubMediator, UnaryOperator.identity(),
DefaultClientActorPropsFactory.getInstance());
return ConnectivityRootActor.props(connectivityConfig, pubSubMediator, UnaryOperator.identity());
}

}
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service;

import java.util.List;

import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;

import akka.actor.AbstractExtensionId;
import akka.actor.ActorContext;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;

/**
* Executor for custom code in connectivity root. Can be used i.e. to start custom actors.
*
* @since 3.0.0
*/
public abstract class CustomConnectivityRootExecutor implements DittoExtensionPoint {

private static final ExtensionId EXTENSION_ID = new ExtensionId();

protected final ActorSystem actorSystem;

/**
* @param actorSystem the actor system in which to load the extension.
*/
protected CustomConnectivityRootExecutor(final ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}

/**
* Execute custom custom code.
* @param actorContext the context of the {@code ConnectivityRootActor}.
*/
public abstract void execute(ActorContext actorContext);

/**
* Loads the implementation of {@code CustomConnectivityRootExecutor} which is configured for the
* {@code ActorSystem}.
*
* @param actorSystem the actorSystem in which the {@code CustomConnectivityRootExecutor} should be loaded.
* @return the {@code CustomConnectivityRootExecutor} implementation.
* @throws NullPointerException if {@code actorSystem} is {@code null}.
*/
public static CustomConnectivityRootExecutor get(final ActorSystem actorSystem) {
return EXTENSION_ID.get(actorSystem);
}

private static final class ExtensionId extends AbstractExtensionId<CustomConnectivityRootExecutor> {

@Override
public CustomConnectivityRootExecutor createExtension(final ExtendedActorSystem system) {
final var implementation = DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(
system.settings().config())).getCustomRootExecutor();

return AkkaClassLoader.instantiate(system, CustomConnectivityRootExecutor.class,
implementation,
List.of(ActorSystem.class),
List.of(system));
}
}
}
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service;

import akka.actor.ActorContext;
import akka.actor.ActorSystem;

/**
* Connectivity root executor that does purposefully nothing.
*/
public class NoOpConnectivityRootExecutor extends CustomConnectivityRootExecutor{

/**
* @param actorSystem the actor system in which to load the extension.
*/
protected NoOpConnectivityRootExecutor(final ActorSystem actorSystem) {
super(actorSystem);
}

@Override
public void execute(final ActorContext actorContext) {
// Do nothing.
}
}
Expand Up @@ -160,6 +160,25 @@ public interface ConnectionConfig extends WithSupervisorConfig, WithActivityChec
*/
boolean doubleDecodingEnabled();

/**
* Returns the full qualified classname of the {@code org.eclipse.ditto.connectivity.service.messaging.validation.CustomConnectivityCommandInterceptorProvider}
* implementation to use for custom executions in {@code CustomConnectivityCommandInterceptorProvider}.
*
* @return the full qualified classname of the {@code CustomConnectivityCommandInterceptorProvider} implementation to use.
* @since 3.0.0
*/
String getCustomCommandInterceptorProvider();

/**
* Returns the full qualified classname of the {@code org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsFactory}
* implementation to use for custom executions in {@code ClientActorPropsFactory}.
*
* @return the full qualified classname of the {@code ClientActorPropsFactory} implementation to use.
* @since 3.0.0
*/
String getClientActorPropsFactory();


/**
* An enumeration of the known config path expressions and their associated default values for
* {@code ConnectionConfig}.
Expand Down Expand Up @@ -224,7 +243,21 @@ enum ConnectionConfigValue implements KnownConfigValue {
/**
* Whether double decoding of usernames and passwords in connection URIs is enabled.
*/
DOUBLE_DECODING_ENABLED("double-decoding-enabled", true);
DOUBLE_DECODING_ENABLED("double-decoding-enabled", true),

/**
* The full qualified classname of the {@code CustomConnectivityCommandInterceptorProvider} to instantiate.
* @since 3.0.0
*/
CUSTOM_COMMAND_INTERCEPTOR_PROVIDER("custom-command-interceptor-provider",
"org.eclipse.ditto.connectivity.service.messaging.validation.NoOpConnectivityCommandInterceptorProvider"),

/**
* The full qualified classname of the {@code ClientActorPropsFactory} to instantiate.
* @since 3.0.0
*/
CLIENT_ACTOR_PROPS_FACTORY("client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory");

private final String path;
private final Object defaultValue;
Expand Down
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;
import org.eclipse.ditto.internal.utils.health.config.WithHealthCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.operations.WithPersistenceOperationsConfig;
Expand Down Expand Up @@ -89,6 +90,15 @@ public interface ConnectivityConfig extends ServiceSpecificConfig, WithHealthChe
*/
TunnelConfig getTunnelConfig();

/**
* Returns the full qualified classname of the {@code org.eclipse.ditto.connectivity.service.CustomConnectivityRootExecutor}
* implementation to use for custom executions in {@code ConnectivityRootActor}.
*
* @return the full qualified classname of the {@code CustomConnectivityRootExecutor} implementation to use.
* @since 3.0.0
*/
String getCustomRootExecutor();

/**
* Read the static connectivity config from an actor system.
*
Expand All @@ -99,4 +109,37 @@ static ConnectivityConfig of(final Config config) {
return DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(config));
}

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code ConnectivityConfig}.
*/
enum ConnectivityConfigValue implements KnownConfigValue {

/**
* The full qualified classname of the {@code CustomConnectivityRootExecutor} to instantiate.
* @since 3.0.0
*/
CUSTOM_ROOT_EXECUTOR("connectivity.custom-root-executor",
"org.eclipse.ditto.gateway.service.starter.NoOpGatewayRootExecutor");

private final String path;
private final Object defaultValue;

ConnectivityConfigValue(final String thePath, final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return path;
}

}

}

0 comments on commit caf8fc9

Please sign in to comment.