Skip to content

Commit

Permalink
[#1081] Replace ConnectivityConfigProvider by ConnectionContextProvider.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 21, 2021
1 parent b131186 commit cb0de86
Show file tree
Hide file tree
Showing 29 changed files with 268 additions and 216 deletions.
Expand Up @@ -17,34 +17,36 @@

import org.atteo.classindex.IndexSubclasses;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext;

import akka.actor.ActorRef;

/**
* Provides methods to load {@link ConnectivityConfig} and register for changes to {@link ConnectivityConfig}.
*/
@IndexSubclasses
public interface ConnectivityConfigProvider {
public interface ConnectionContextProvider {

/**
* Loads a {@link ConnectivityConfig} by a connection ID.
* Loads a {@link org.eclipse.ditto.connectivity.service.mapping.ConnectionContext} by a connection.
*
* @param connectionId the connection id for which to load the {@link ConnectivityConfig}
* @param dittoHeaders the ditto headers for which to load the {@link ConnectivityConfig}
* @return the connectivity config
* @param connection the connection for which to load the connection context.
* @param dittoHeaders the ditto headers for which to load the connection context.
* @return the connectivity context
*/
ConnectivityConfig getConnectivityConfig(ConnectionId connectionId, DittoHeaders dittoHeaders);
ConnectionContext getConnectionContext(Connection connection, DittoHeaders dittoHeaders);

/**
* Loads a {@link ConnectivityConfig} by a connection ID.
* Loads a connection context.
*
* @param connectionId the connection id for which to load the {@link ConnectivityConfig}
* @return the connectivity config
* @param connection the connection for which to load the connection context.
* @return the connection context.
*/
default ConnectivityConfig getConnectivityConfig(ConnectionId connectionId) {
return getConnectivityConfig(connectionId, DittoHeaders.empty());
default ConnectionContext getConnectionContext(Connection connection) {
return getConnectionContext(connection, DittoHeaders.empty());
}

/**
Expand Down
Expand Up @@ -18,12 +18,15 @@
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.health.config.WithHealthCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.operations.WithPersistenceOperationsConfig;
import org.eclipse.ditto.internal.utils.persistentactors.config.PingConfig;
import org.eclipse.ditto.internal.utils.protocol.config.WithProtocolConfig;

import akka.actor.ActorSystem;

/**
* Provides the configuration settings of the Connectivity service.
*/
Expand Down Expand Up @@ -93,4 +96,14 @@ public interface ConnectivityConfig extends ServiceSpecificConfig, WithHealthChe
* @return the config.
*/
TunnelConfig getTunnelConfig();

/**
* Read the static connectivity config from an actor system.
*
* @param actorSystem the actor system.
* @return the connectivity config.
*/
static ConnectivityConfig forActorSystem(final ActorSystem actorSystem) {
return DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config()));
}
}
Expand Up @@ -55,9 +55,9 @@ default void registerForConfigChanges(ConnectionId connectionId) {
}

/**
* @return a {@link ConnectivityConfigProvider} required to register this actor for config changes
* @return a {@link ConnectionContextProvider} required to register this actor for config changes
*/
default ConnectivityConfigProvider getConnectivityConfigProvider() {
default ConnectionContextProvider getConnectivityConfigProvider() {
return ConnectivityConfigProviderFactory.getInstance(context().system());
}

Expand Down
Expand Up @@ -35,7 +35,7 @@
import scala.util.Try;

/**
* Factory to instantiate new {@link ConnectivityConfigProvider}s.
* Factory to instantiate new {@link ConnectionContextProvider}s.
*/
public final class ConnectivityConfigProviderFactory implements Extension {

Expand All @@ -50,60 +50,60 @@ public final class ConnectivityConfigProviderFactory implements Extension {
* implementation is used as a fallback or an exception is thrown, depending on the config value of
* {@value #DEFAULT_CONFIG_PROVIDER_CONFIG}.
*/
private static final Class<DittoConnectivityConfigProvider>
DEFAULT_CONNECTIVITY_CONFIG_PROVIDER_CLASS = DittoConnectivityConfigProvider.class;
private static final Class<DittoConnectionContextProvider>
DEFAULT_CONNECTIVITY_CONFIG_PROVIDER_CLASS = DittoConnectionContextProvider.class;

/**
* Holds the instance of the {@link ConnectivityConfigProvider}.
* Holds the instance of the {@link ConnectionContextProvider}.
*/
private final ConnectivityConfigProvider connectivityConfigProvider;
private final ConnectionContextProvider connectionContextProvider;

/**
* Returns the {@link ConnectivityConfigProvider} instance.
* Returns the {@link ConnectionContextProvider} instance.
*
* @return the instance of the {@link ConnectivityConfigProvider}
* @return the instance of the {@link ConnectionContextProvider}
*/
public ConnectivityConfigProvider getInstance() {
return connectivityConfigProvider;
public ConnectionContextProvider getInstance() {
return connectionContextProvider;
}

/**
* Returns the {@link ConnectivityConfigProvider} instance.
* Returns the {@link ConnectionContextProvider} instance.
*
* @param actorSystem the actor system
* @return the instance of the {@link ConnectivityConfigProvider}
* @return the instance of the {@link ConnectionContextProvider}
*/
public static ConnectivityConfigProvider getInstance(final ActorSystem actorSystem) {
public static ConnectionContextProvider getInstance(final ActorSystem actorSystem) {
return ConnectivityConfigProviderFactory.get(actorSystem).getInstance();
}

private ConnectivityConfigProviderFactory(final ActorSystem actorSystem) {
final Config config = actorSystem.settings().config();
final boolean loadDefaultProvider = config.getBoolean(DEFAULT_CONFIG_PROVIDER_CONFIG);

final Class<? extends ConnectivityConfigProvider> providerClass =
final Class<? extends ConnectionContextProvider> providerClass =
findProviderClass(c -> filterDefaultProvider(c, loadDefaultProvider));

try {
final ClassTag<ConnectivityConfigProvider> tag =
ClassTag$.MODULE$.apply(ConnectivityConfigProvider.class);
final ClassTag<ConnectionContextProvider> tag =
ClassTag$.MODULE$.apply(ConnectionContextProvider.class);
final Tuple2<Class<?>, Object> args = new Tuple2<>(ActorSystem.class, actorSystem);
final DynamicAccess dynamicAccess = ((ExtendedActorSystem) actorSystem).dynamicAccess();
final Try<ConnectivityConfigProvider> providerBox = dynamicAccess.createInstanceFor(providerClass,
final Try<ConnectionContextProvider> providerBox = dynamicAccess.createInstanceFor(providerClass,
CollectionConverters.asScala(Collections.singleton(args)).toList(), tag);
connectivityConfigProvider = providerBox.get();
connectionContextProvider = providerBox.get();
} catch (final Exception e) {
throw configProviderInstantiationFailed(providerClass, e);
}
}

private static Class<? extends ConnectivityConfigProvider> findProviderClass(
final Predicate<Class<? extends ConnectivityConfigProvider>> classPredicate) {
private static Class<? extends ConnectionContextProvider> findProviderClass(
final Predicate<Class<? extends ConnectionContextProvider>> classPredicate) {

final Iterable<Class<? extends ConnectivityConfigProvider>> subclasses =
ClassIndex.getSubclasses(ConnectivityConfigProvider.class);
final Iterable<Class<? extends ConnectionContextProvider>> subclasses =
ClassIndex.getSubclasses(ConnectionContextProvider.class);

final List<Class<? extends ConnectivityConfigProvider>> candidates =
final List<Class<? extends ConnectionContextProvider>> candidates =
StreamSupport.stream(subclasses.spliterator(), false)
.filter(classPredicate)
.collect(Collectors.toList());
Expand All @@ -115,7 +115,7 @@ private static Class<? extends ConnectivityConfigProvider> findProviderClass(
}
}

private static boolean filterDefaultProvider(final Class<? extends ConnectivityConfigProvider> c,
private static boolean filterDefaultProvider(final Class<? extends ConnectionContextProvider> c,
final boolean loadDefaultProvider) {

if (loadDefaultProvider) {
Expand All @@ -126,12 +126,12 @@ private static boolean filterDefaultProvider(final Class<? extends ConnectivityC
}

private static DittoRuntimeException configProviderNotFound(
final List<Class<? extends ConnectivityConfigProvider>> candidates) {
final List<Class<? extends ConnectionContextProvider>> candidates) {
return ConnectivityConfigProviderMissingException.newBuilder(candidates).build();
}

private static DittoRuntimeException configProviderInstantiationFailed(final Class<?
extends ConnectivityConfigProvider> c, final Exception cause) {
extends ConnectionContextProvider> c, final Exception cause) {
return ConnectivityConfigProviderFailedException.newBuilder(c)
.cause(cause)
.build();
Expand Down
Expand Up @@ -38,7 +38,7 @@ private ConnectivityConfigProviderFailedException(
super(ERROR_CODE, HttpStatus.INTERNAL_SERVER_ERROR, dittoHeaders, message, description, cause, href);
}

public static Builder newBuilder(final Class<? extends ConnectivityConfigProvider> failedConfigProvider) {
public static Builder newBuilder(final Class<? extends ConnectionContextProvider> failedConfigProvider) {
return new Builder(MessageFormat.format(MESSAGE_TEMPLATE, failedConfigProvider.getName()));
}

Expand Down
Expand Up @@ -40,7 +40,7 @@ private ConnectivityConfigProviderMissingException(
super(ERROR_CODE, HttpStatus.INTERNAL_SERVER_ERROR, dittoHeaders, message, description, cause, href);
}

public static Builder newBuilder(final List<Class<? extends ConnectivityConfigProvider>> candidates) {
public static Builder newBuilder(final List<Class<? extends ConnectionContextProvider>> candidates) {
return new Builder(MessageFormat.format(MESSAGE_TEMPLATE, candidates));
}

Expand Down
Expand Up @@ -15,29 +15,32 @@
import java.util.Optional;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext;
import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.base.model.signals.events.Event;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
* Default implementation of {@link ConnectivityConfigProvider} which simply builds and returns a
* {@link DittoConnectivityConfig}.
* Default implementation of {@link ConnectionContextProvider} which simply builds and returns a
* {@link org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext}.
*/
public class DittoConnectivityConfigProvider implements ConnectivityConfigProvider {
public class DittoConnectionContextProvider implements ConnectionContextProvider {

private final DittoConnectivityConfig connectivityConfig;

public DittoConnectivityConfigProvider(final ActorSystem actorSystem) {
public DittoConnectionContextProvider(final ActorSystem actorSystem) {
this.connectivityConfig =
DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config()));
}

@Override
public ConnectivityConfig getConnectivityConfig(final ConnectionId connectionId, final DittoHeaders dittoHeaders) {
return connectivityConfig;
public ConnectionContext getConnectionContext(final Connection connection, final DittoHeaders dittoHeaders) {
return DittoConnectionContext.of(connection, connectivityConfig);
}

@Override
Expand Down
Expand Up @@ -90,15 +90,14 @@
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionMetricsResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionStatus;
import org.eclipse.ditto.connectivity.service.config.ClientConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectionContextProvider;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfigModifiedBehavior;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfigProvider;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfigProviderFactory;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig;
import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext;
import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
Expand Down Expand Up @@ -197,8 +196,8 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private final String actorUUID;

// TODO: make async
private final ConnectivityConfigProvider connectivityConfigProvider;
private ConnectionContext connectionContext;
private final ConnectionContextProvider connectionContextProvider;
protected ConnectionContext connectionContext;

private final ActorRef inboundMappingProcessorActor;
private final ActorRef outboundDispatchingActor;
Expand Down Expand Up @@ -230,9 +229,8 @@ protected BaseClientActor(final Connection connection, @Nullable final ActorRef
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));
clientConfig = connectivityConfig.getClientConfig();
proxyActorSelection = getLocalActorOfSamePath(proxyActor);
connectivityConfigProvider = ConnectivityConfigProviderFactory.getInstance(getContext().getSystem());
connectionContext =
DittoConnectionContext.of(connection, connectivityConfigProvider.getConnectivityConfig(connectionId));
connectionContextProvider = ConnectivityConfigProviderFactory.getInstance(getContext().getSystem());
connectionContext = connectionContextProvider.getConnectionContext(connection);

final ProtocolAdapterProvider protocolAdapterProvider =
ProtocolAdapterProvider.load(connectivityConfig.getProtocolConfig(), getContext().getSystem());
Expand Down Expand Up @@ -316,7 +314,7 @@ public void preStart() throws Exception {
whenUnhandled(inAnyState()
.anyEvent(this::onUnknownEvent));

connectivityConfigProvider.registerForConnectivityConfigChanges(connectionId(), getSelf());
connectionContextProvider.registerForConnectivityConfigChanges(connectionId(), getSelf());

initialize();

Expand Down Expand Up @@ -528,7 +526,7 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inAnyState()
.event(PublishMappedMessage.class, this::publishMappedMessage)
.event(ConnectivityCommand.class, this::onUnknownEvent) // relevant connectivity commands were handled
.event(org.eclipse.ditto.base.model.signals.events.Event.class,
(event, data) -> connectivityConfigProvider.canHandle(event),
(event, data) -> connectionContextProvider.canHandle(event),
(ccb, data) -> {
handleEvent(ccb);
return stay();
Expand Down Expand Up @@ -1610,15 +1608,9 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final Connection connection
final OutboundMappingSettings settings;
final OutboundMappingProcessor outboundMappingProcessor;
try {
ConnectivityConfig retrievedConnectivityConfig =
connectivityConfigProvider.getConnectivityConfig(connection.getId());
final var newConnectionContext = connectionContext.withConnectivityConfig(retrievedConnectivityConfig);
// this one throws DittoRuntimeExceptions when the mapper could not be configured
settings = OutboundMappingSettings.of(connectionContext,
getContext().getSystem(),
proxyActorSelection,
protocolAdapter,
logger);
settings = OutboundMappingSettings.of(connectionContext, getContext().getSystem(), proxyActorSelection,
protocolAdapter, logger);
outboundMappingProcessor = OutboundMappingProcessor.of(settings);
} catch (final DittoRuntimeException dre) {
connectionLogger.failure("Failed to start message mapping processor due to: {0}.", dre.getMessage());
Expand Down Expand Up @@ -1675,15 +1667,9 @@ private ActorRef startInboundMappingProcessorActor(final Connection connection,

final InboundMappingProcessor inboundMappingProcessor;
try {
ConnectivityConfig retrievedConnectivityConfig =
connectivityConfigProvider.getConnectivityConfig(connection.getId());

// this one throws DittoRuntimeExceptions when the mapper could not be configured
inboundMappingProcessor = InboundMappingProcessor.of(
connectionContext.withConnectivityConfig(retrievedConnectivityConfig),
getContext().getSystem(),
protocolAdapter,
logger);
inboundMappingProcessor =
InboundMappingProcessor.of(connectionContext, getContext().getSystem(), protocolAdapter, logger);
} catch (final DittoRuntimeException dre) {
connectionLogger.failure("Failed to start message mapping processor due to: {0}.", dre.getMessage());
logger.info("Got DittoRuntimeException during initialization of MessageMappingProcessor: {} {} - desc: {}",
Expand Down

0 comments on commit cb0de86

Please sign in to comment.