Skip to content

Commit

Permalink
[#1081] update connection config in connection persistence actor.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 27, 2021
1 parent 8a5f674 commit bd36c10
Show file tree
Hide file tree
Showing 12 changed files with 294 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext;
import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext;

import akka.actor.ActorRef;

Expand All @@ -31,14 +32,26 @@
@IndexSubclasses
public interface ConnectionContextProvider {

/**
* Loads a {@link ConnectivityConfig} by a connection ID.
*
* @param connectionId the connection id for which to load the {@link ConnectivityConfig}
* @param dittoHeaders the ditto headers for which to load the {@link ConnectivityConfig}
* @return the future connectivity config
*/
CompletionStage<ConnectivityConfig> getConnectivityConfig(ConnectionId connectionId, DittoHeaders dittoHeaders);

/**
* Loads a {@link org.eclipse.ditto.connectivity.service.mapping.ConnectionContext} by a connection.
*
* @param connection the connection for which to load the connection context.
* @param dittoHeaders the ditto headers for which to load the connection context.
* @return the connectivity context
* @return the future connectivity context
*/
CompletionStage<ConnectionContext> getConnectionContext(Connection connection, DittoHeaders dittoHeaders);
default CompletionStage<ConnectionContext> getConnectionContext(Connection connection, DittoHeaders dittoHeaders) {
return getConnectivityConfig(connection.getId(), dittoHeaders)
.thenApply(config -> DittoConnectionContext.of(connection, config));
}

/**
* Loads a connection context.
Expand All @@ -56,9 +69,11 @@ default CompletionStage<ConnectionContext> getConnectionContext(Connection conne
* ConnectivityConfig}.
*
* @param connectionId the connection id
* @param dittoHeaders the ditto headers
* @param subscriber the subscriber that will receive {@link Event}s
*/
void registerForConnectivityConfigChanges(ConnectionId connectionId, ActorRef subscriber);
void registerForConnectivityConfigChanges(ConnectionId connectionId, final DittoHeaders dittoHeaders,
ActorRef subscriber);

/**
* Returns {@code true} if the implementation can handle the given {@code event} to generate a modified {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.connectivity.service.config;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.base.model.signals.events.Event;

Expand Down Expand Up @@ -51,7 +52,8 @@ default void handleEvent(final Event<?> event) {
* @param connectionId the connection id
*/
default void registerForConfigChanges(ConnectionId connectionId) {
getConnectivityConfigProvider().registerForConnectivityConfigChanges(connectionId, self());
getConnectivityConfigProvider()
.registerForConnectivityConfigChanges(connectionId, DittoHeaders.empty(), self());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext;
import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;

import akka.actor.ActorRef;
Expand All @@ -41,12 +38,14 @@ public DittoConnectionContextProvider(final ActorSystem actorSystem) {
}

@Override
public CompletionStage<ConnectionContext> getConnectionContext(final Connection connection, final DittoHeaders dittoHeaders) {
return CompletableFuture.completedStage(DittoConnectionContext.of(connection, connectivityConfig));
public CompletionStage<ConnectivityConfig> getConnectivityConfig(final ConnectionId connectionId,
final DittoHeaders dittoHeaders) {
return CompletableFuture.completedStage(connectivityConfig);
}

@Override
public void registerForConnectivityConfigChanges(final ConnectionId connectionId, final ActorRef subscriber) {
public void registerForConnectivityConfigChanges(final ConnectionId connectionId,
final DittoHeaders dittoHeaders, final ActorRef subscriber) {
// nothing to do, config changes are not supported by the default implementation
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public void preStart() throws Exception {
whenUnhandled(inAnyState()
.anyEvent(this::onUnknownEvent));

connectionContextProvider.registerForConnectivityConfigChanges(connectionId(), getSelf());
connectionContextProvider.registerForConnectivityConfigChanges(connectionId(), DittoHeaders.empty(), getSelf());

initialize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
Expand Down Expand Up @@ -69,6 +70,8 @@
import org.eclipse.ditto.connectivity.model.signals.events.ConnectionOpened;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;
import org.eclipse.ditto.connectivity.service.config.ConnectionConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectionContextProvider;
import org.eclipse.ditto.connectivity.service.config.ConnectionContextProviderFactory;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.config.MqttConfig;
Expand All @@ -89,6 +92,8 @@
import org.eclipse.ditto.connectivity.service.messaging.persistence.stages.StagedCommand;
import org.eclipse.ditto.connectivity.service.messaging.persistence.strategies.commands.ConnectionCreatedStrategies;
import org.eclipse.ditto.connectivity.service.messaging.persistence.strategies.commands.ConnectionDeletedStrategies;
import org.eclipse.ditto.connectivity.service.messaging.persistence.strategies.commands.ConnectionUninitializedStrategies;
import org.eclipse.ditto.connectivity.service.messaging.persistence.strategies.commands.ConnectivityCommandStrategies;
import org.eclipse.ditto.connectivity.service.messaging.persistence.strategies.events.ConnectionEventStrategies;
import org.eclipse.ditto.connectivity.service.messaging.rabbitmq.RabbitMQValidator;
import org.eclipse.ditto.connectivity.service.messaging.validation.CompoundConnectivityCommandInterceptor;
Expand Down Expand Up @@ -157,7 +162,6 @@ public final class ConnectionPersistenceActor
private final ActorRef proxyActor;
private final ClientActorPropsFactory propsFactory;
private final boolean allClientActorsOnOneNode;
private final ConnectivityCommandInterceptor commandValidator;
private final ConnectionLogger connectionLogger;
private final Duration clientActorAskTimeout;
private final Duration checkLoggingActiveInterval;
Expand All @@ -166,7 +170,13 @@ public final class ConnectionPersistenceActor
private final MonitoringConfig monitoringConfig;
private final ClientActorRefs clientActorRefs = ClientActorRefs.empty();
private final ConnectionPriorityProvider connectionPriorityProvider;
private final ConnectionContextProvider connectionContextProvider;

@Nullable private final ConnectivityCommandInterceptor customCommandValidator;
private ConnectivityCommandInterceptor commandValidator;

private ConnectivityCommandStrategies createdStrategies;
private ConnectivityCommandStrategies deletedStrategies;
private int subscriptionCounter = 0;
private Instant connectionClosedAt = Instant.now();
@Nullable private Instant loggingEnabledUntil;
Expand All @@ -184,49 +194,30 @@ public final class ConnectionPersistenceActor

this.proxyActor = proxyActor;
this.propsFactory = propsFactory;
this.customCommandValidator = customCommandValidator;

final ActorSystem actorSystem = getContext().getSystem();

final ConnectivityConfig connectivityConfig = ConnectivityConfig.forActorSystem(actorSystem);
config = connectivityConfig.getConnectionConfig();
this.allClientActorsOnOneNode = allClientActorsOnOneNode.orElse(config.areAllClientActorsOnOneNode());
final MqttConfig mqttConfig = connectivityConfig.getConnectionConfig().getMqttConfig();
final ConnectionValidator connectionValidator =
ConnectionValidator.of(actorSystem.log(),
RabbitMQValidator.newInstance(),
AmqpValidator.newInstance(),
Mqtt3Validator.newInstance(mqttConfig),
Mqtt5Validator.newInstance(mqttConfig),
KafkaValidator.getInstance(),
HttpPushValidator.newInstance());

final DittoConnectivityCommandValidator dittoCommandValidator =
new DittoConnectivityCommandValidator(propsFactory, proxyActor, getSelf(), connectionValidator,
actorSystem);

if (customCommandValidator != null) {
commandValidator =
new CompoundConnectivityCommandInterceptor(dittoCommandValidator, customCommandValidator);
} else {
commandValidator = dittoCommandValidator;
}

this.connectionPriorityProvider = connectionPriorityProviderFactory.newProvider(self(), log);

connectionContextProvider = ConnectionContextProviderFactory.get(actorSystem).getInstance();
clientActorAskTimeout = config.getClientActorAskTimeout();

monitoringConfig = connectivityConfig.getMonitoringConfig();
final ConnectionLoggerRegistry loggerRegistry =
ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger());
connectionLogger = loggerRegistry.forConnection(connectionId);


this.loggingEnabledDuration = monitoringConfig.logger().logDuration();
this.checkLoggingActiveInterval = monitoringConfig.logger().loggingActiveCheckInterval();
// Make duration fuzzy to avoid all connections getting updated at once.
final Duration fuzzyPriorityUpdateInterval =
makeFuzzy(connectivityConfig.getConnectionConfig().getPriorityUpdateInterval());
startUpdatePriorityPeriodically(fuzzyPriorityUpdateInterval);

createdStrategies = ConnectionUninitializedStrategies.of(command -> stash());
deletedStrategies = ConnectionUninitializedStrategies.of(this::stashAndInitialize);
}

/**
Expand Down Expand Up @@ -304,13 +295,13 @@ protected CommandStrategy.Context<ConnectionState> getStrategyContext() {
}

@Override
protected ConnectionCreatedStrategies getCreatedStrategy() {
return ConnectionCreatedStrategies.getInstance();
protected ConnectivityCommandStrategies getCreatedStrategy() {
return createdStrategies;
}

@Override
protected ConnectionDeletedStrategies getDeletedStrategy() {
return ConnectionDeletedStrategies.getInstance();
protected ConnectivityCommandStrategies getDeletedStrategy() {
return deletedStrategies;
}

@Override
Expand Down Expand Up @@ -375,6 +366,12 @@ protected void recoveryCompleted(final RecoveryCompleted event) {
log.debug("Opening connection <{}> after recovery.", entityId);
restoreOpenConnection();
}
if (entity != null) {
// Initialize command validator by connectivity config.
// For deleted connections, only start initiation on first command because the connection ID is not attached
// to any solution.
startInitialization(connectionContextProvider, entityId, DittoHeaders.empty(), getSelf());
}
super.recoveryCompleted(event);
}

Expand Down Expand Up @@ -563,9 +560,18 @@ protected Receive matchAnyAfterInitialization() {
// maintain client actor refs
.match(ActorRef.class, this::addClientActor)
.match(Terminated.class, this::removeClientActor)
.build()
.orElse(createInitializationAndConfigUpdateBehavior())
.orElse(super.matchAnyAfterInitialization());
}

.matchAny(message -> log.warning("Unknown message: {}", message))
.build();
@Override
protected Receive matchAnyWhenDeleted() {
return createInitializationAndConfigUpdateBehavior()
.orElse(ReceiveBuilder.create()
.match(Control.class, msg -> log.debug("Ignoring control message when deleted: <{}>", msg))
.build())
.orElse(super.matchAnyWhenDeleted());
}

/**
Expand Down Expand Up @@ -1003,6 +1009,74 @@ private void restoreOpenConnection() {
openConnection(stagedCommand, false);
}

private ConnectivityCommandInterceptor updateValidator(final ConnectivityConfig connectivityConfig) {
final var actorSystem = getContext().getSystem();
final MqttConfig mqttConfig = connectivityConfig.getConnectionConfig().getMqttConfig();
final ConnectionValidator connectionValidator =
ConnectionValidator.of(actorSystem.log(),
connectivityConfig,
RabbitMQValidator.newInstance(),
AmqpValidator.newInstance(),
Mqtt3Validator.newInstance(mqttConfig),
Mqtt5Validator.newInstance(mqttConfig),
KafkaValidator.getInstance(),
HttpPushValidator.newInstance());

final DittoConnectivityCommandValidator dittoCommandValidator =
new DittoConnectivityCommandValidator(propsFactory, proxyActor, getSelf(), connectionValidator,
actorSystem);

if (customCommandValidator != null) {
return new CompoundConnectivityCommandInterceptor(dittoCommandValidator, customCommandValidator);
} else {
return dittoCommandValidator;
}
}

private void setConnectivityConfig(final ConnectivityConfig connectivityConfig) {
commandValidator = updateValidator(connectivityConfig);
}

private void updateConnectivityConfig(final Event<?> event) {
connectionContextProvider.handleEvent(event).ifPresent(this::setConnectivityConfig);
}

private void initializeByConnectivityConfig(final ConnectivityConfig connectivityConfig) {
setConnectivityConfig(connectivityConfig);
createdStrategies = ConnectionCreatedStrategies.getInstance();
deletedStrategies = ConnectionDeletedStrategies.getInstance();
unstashAll();
}

private void stashAndInitialize(final ConnectivityCommand<?> command) {
startInitialization(connectionContextProvider, entityId, command.getDittoHeaders(), getSelf());
stash();
}

private Receive createInitializationAndConfigUpdateBehavior() {
return ReceiveBuilder.create()
.match(Event.class, connectionContextProvider::canHandle, this::updateConnectivityConfig)
.match(ConnectivityConfig.class, this::initializeByConnectivityConfig)
.match(InitializationFailure.class, failure -> {
log.error(failure.cause, "Initialization failed");
// terminate self to restart after backoff
getContext().stop(getSelf());
})
.build();
}

private static void startInitialization(final ConnectionContextProvider provider,
final ConnectionId connectionId, final DittoHeaders dittoHeaders, final ActorRef self) {

provider.registerForConnectivityConfigChanges(connectionId, dittoHeaders, self);
provider.getConnectivityConfig(connectionId, dittoHeaders)
.thenAccept(config -> self.tell(config, ActorRef.noSender()))
.exceptionally(error -> {
self.tell(new InitializationFailure(error), ActorRef.noSender());
return null;
});
}

private static DittoRuntimeException toDittoRuntimeException(final Throwable error, final ConnectionId id,
final DittoHeaders headers) {

Expand Down Expand Up @@ -1042,6 +1116,15 @@ enum Control {
TRIGGER_UPDATE_PRIORITY;
}

private static final class InitializationFailure {

private final Throwable cause;

private InitializationFailure(final Throwable cause) {
this.cause = cause;
}
}

/**
* Local message this actor may sent to itself in order to update the priority of the connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
/**
* Strategies to handle signals as an existing connection.
*/
public class ConnectionCreatedStrategies
extends
AbstractCommandStrategies<ConnectivityCommand<?>, Connection, ConnectionState, ConnectivityEvent<?>> {
public final class ConnectionCreatedStrategies
extends AbstractCommandStrategies<ConnectivityCommand<?>, Connection, ConnectionState, ConnectivityEvent<?>>
implements ConnectivityCommandStrategies {

private static final ConnectionCreatedStrategies CREATED_STRATEGIES = newCreatedStrategies();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionNotAccessibleException;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;
import org.eclipse.ditto.connectivity.service.messaging.persistence.stages.ConnectionState;
import org.eclipse.ditto.internal.utils.persistentactors.commands.AbstractCommandStrategies;
import org.eclipse.ditto.internal.utils.persistentactors.results.Result;
import org.eclipse.ditto.internal.utils.persistentactors.results.ResultFactory;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionNotAccessibleException;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;

/**
* Strategies to handle signals as a nonexistent connection.
*/
public class ConnectionDeletedStrategies
extends
AbstractCommandStrategies<ConnectivityCommand<?>, Connection, ConnectionState, ConnectivityEvent<?>> {
public final class ConnectionDeletedStrategies
extends AbstractCommandStrategies<ConnectivityCommand<?>, Connection, ConnectionState, ConnectivityEvent<?>>
implements ConnectivityCommandStrategies {

private static final ConnectionDeletedStrategies DELETED_STRATEGIES = newDeletedStrategies();

Expand Down
Loading

0 comments on commit bd36c10

Please sign in to comment.