Skip to content

Commit

Permalink
Make strategy context dynamic.
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Sep 13, 2019
1 parent 7b6bdf6 commit 078c437
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 61 deletions.
Expand Up @@ -65,6 +65,7 @@
import org.eclipse.ditto.services.connectivity.messaging.monitoring.metrics.RetrieveConnectionStatusAggregatorActor;
import org.eclipse.ditto.services.connectivity.messaging.mqtt.MqttValidator;
import org.eclipse.ditto.services.connectivity.messaging.persistence.ConnectionMongoSnapshotAdapter;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionState;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.StagedCommand;
import org.eclipse.ditto.services.connectivity.messaging.rabbitmq.RabbitMQValidator;
import org.eclipse.ditto.services.connectivity.messaging.validation.CompoundConnectivityCommandInterceptor;
Expand All @@ -79,6 +80,7 @@
import org.eclipse.ditto.services.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.services.utils.persistentactors.AbstractShardedPersistenceActor;
import org.eclipse.ditto.services.utils.persistentactors.commands.CommandStrategy;
import org.eclipse.ditto.services.utils.persistentactors.commands.DefaultContext;
import org.eclipse.ditto.services.utils.persistentactors.events.EventStrategy;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
Expand Down Expand Up @@ -117,7 +119,7 @@
* remote server is delegated to a child actor that uses a specific client (AMQP 1.0 or 0.9.1).
*/
public final class ConnectionActor
extends AbstractShardedPersistenceActor<Signal, Connection, ConnectionId, ConnectivityEvent> {
extends AbstractShardedPersistenceActor<Signal, Connection, ConnectionId, ConnectionState, ConnectivityEvent> {

/**
* Prefix to prepend to the connection ID to construct the persistence ID.
Expand Down Expand Up @@ -271,6 +273,11 @@ protected Class<ConnectivityEvent> getEventClass() {
return ConnectivityEvent.class;
}

@Override
protected CommandStrategy.Context<ConnectionState> getStrategyContext() {
return DefaultContext.getInstance(ConnectionState.of(entityId, connectionLogger, commandValidator), log);
}

@Override
protected ConnectionCreatedStrategies getCreatedStrategy() {
return ConnectionCreatedStrategies.getInstance();
Expand Down
Expand Up @@ -39,9 +39,9 @@
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.model.connectivity.ConnectivityStatus;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionState;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.StagedCommand;
import org.eclipse.ditto.services.utils.persistentactors.commands.AbstractCommandStrategy;
import org.eclipse.ditto.services.utils.persistentactors.commands.AbstractReceiveStrategy;
Expand Down Expand Up @@ -82,7 +82,7 @@

// TODO
public class ConnectionCreatedStrategies
extends AbstractReceiveStrategy<Signal, Connection, ConnectionId, Result<ConnectivityEvent>> {
extends AbstractReceiveStrategy<Signal, Connection, ConnectionState, Result<ConnectivityEvent>> {

private static final ConnectionCreatedStrategies CREATED_STRATEGIES = newCreatedStrategies();

Expand Down Expand Up @@ -115,7 +115,7 @@ private static ConnectionCreatedStrategies newCreatedStrategies() {
}

@Override
public Result<ConnectivityEvent> unhandled(final Context<ConnectionId> context,
public Result<ConnectivityEvent> unhandled(final Context<ConnectionState> context,
@Nullable final Connection entity,
final long nextRevision,
final Signal signal) {
Expand All @@ -131,7 +131,7 @@ protected Result<ConnectivityEvent> getEmptyResult() {
}

private abstract static class AbstractStrategy<C>
extends AbstractCommandStrategy<C, Connection, ConnectionId, Result<ConnectivityEvent>> {
extends AbstractCommandStrategy<C, Connection, ConnectionState, Result<ConnectivityEvent>> {

AbstractStrategy(final Class<C> theMatchingClass) {
super(theMatchingClass);
Expand All @@ -142,9 +142,9 @@ public boolean isDefined(final C command) {
return true;
}

ConnectionNotAccessibleException notAccessible(final Context<ConnectionId> context,
ConnectionNotAccessibleException notAccessible(final Context<ConnectionState> context,
final WithDittoHeaders command) {
return ConnectionNotAccessibleException.newBuilder(context.getState())
return ConnectionNotAccessibleException.newBuilder(context.getState().id())
.dittoHeaders(command.getDittoHeaders())
.build();
}
Expand All @@ -160,7 +160,7 @@ private abstract static class AbstractSingleActionStrategy<C extends Connectivit
abstract ConnectionAction getAction();

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection connection, final long nextRevision, final C command) {
final ConnectivityEvent event = StagedCommand.dummyEvent();
final Collection<ConnectionAction> actions = Collections.singletonList(getAction());
Expand All @@ -176,12 +176,12 @@ private abstract static class AbstractEphemeralStrategy<C extends ConnectivityCo
super(theMatchingClass);
}

abstract WithDittoHeaders getResponse(final ConnectionId connectionId, final DittoHeaders headers);
abstract WithDittoHeaders getResponse(final ConnectionState connectionId, final DittoHeaders headers);

abstract Collection<ConnectionAction> getActions();

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection connection, final long nextRevision, final C command) {
final ConnectivityEvent event = StagedCommand.dummyEvent();
final WithDittoHeaders response = getResponse(context.getState(), command.getDittoHeaders());
Expand All @@ -198,10 +198,10 @@ private TestConnectionConflictStrategy() {
}

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection entity, final long nextRevision, final TestConnection command) {
return newQueryResult(command,
TestConnectionResponse.alreadyCreated(context.getState(), command.getDittoHeaders()));
TestConnectionResponse.alreadyCreated(context.getState().id(), command.getDittoHeaders()));
}
}

Expand All @@ -212,11 +212,11 @@ private ConnectionConflictStrategy() {
}

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection entity, final long nextRevision, final CreateConnection command) {
context.getLog().info("Connection <{}> already exists! Responding with conflict.", context.getState());
context.getLog().info("Connection <{}> already exists! Responding with conflict.", context.getState().id());
final ConnectionConflictException conflictException =
ConnectionConflictException.newBuilder(context.getState())
ConnectionConflictException.newBuilder(context.getState().id())
.dittoHeaders(command.getDittoHeaders())
.build();
return newErrorResult(conflictException);
Expand All @@ -230,13 +230,13 @@ private ModifyConnectionStrategy() {
}

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection entity, final long nextRevision, final ModifyConnection command) {
final Connection connection = command.getConnection().toBuilder().lifecycle(ACTIVE).build();
final ConnectivityEvent event =
ConnectionModified.of(connection, getEventTimestamp(), command.getDittoHeaders());
final WithDittoHeaders response =
ModifyConnectionResponse.of(context.getState(), command.getDittoHeaders());
ModifyConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
final boolean isCurrentConnectionOpen = Optional.ofNullable(entity)
.map(c -> c.getConnectionStatus() == ConnectivityStatus.OPEN)
.orElse(false);
Expand Down Expand Up @@ -264,10 +264,11 @@ private OpenConnectionStrategy() {
}

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection connection, final long nextRevision, final OpenConnection command) {
final ConnectivityEvent event = ConnectionOpened.of(context.getState(), command.getDittoHeaders());
final WithDittoHeaders response = OpenConnectionResponse.of(context.getState(), command.getDittoHeaders());
final ConnectivityEvent event = ConnectionOpened.of(context.getState().id(), command.getDittoHeaders());
final WithDittoHeaders response =
OpenConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
final Collection<ConnectionAction> actions =
Arrays.asList(PERSIST_AND_APPLY_EVENT, OPEN_CONNECTION, UPDATE_SUBSCRIPTIONS, SEND_RESPONSE);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
Expand All @@ -281,10 +282,11 @@ private CloseConnectionStrategy() {
}

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection connection, final long nextRevision, final CloseConnection command) {
final ConnectivityEvent event = ConnectionClosed.of(context.getState(), command.getDittoHeaders());
final WithDittoHeaders response = CloseConnectionResponse.of(context.getState(), command.getDittoHeaders());
final ConnectivityEvent event = ConnectionClosed.of(context.getState().id(), command.getDittoHeaders());
final WithDittoHeaders response =
CloseConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
final Collection<ConnectionAction> actions =
Arrays.asList(PERSIST_AND_APPLY_EVENT, UPDATE_SUBSCRIPTIONS, CLOSE_CONNECTION, SEND_RESPONSE);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
Expand All @@ -298,11 +300,11 @@ private DeleteConnectionStrategy() {
}

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection connection, final long nextRevision, final DeleteConnection command) {
final ConnectivityEvent event = ConnectionDeleted.of(context.getState(), command.getDittoHeaders());
final ConnectivityEvent event = ConnectionDeleted.of(context.getState().id(), command.getDittoHeaders());
final WithDittoHeaders response =
DeleteConnectionResponse.of(context.getState(), command.getDittoHeaders());
DeleteConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
final Collection<ConnectionAction> actions =
Arrays.asList(PERSIST_AND_APPLY_EVENT, CLOSE_CONNECTION, SEND_RESPONSE, BECOME_DELETED);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
Expand All @@ -317,8 +319,8 @@ private ResetConnectionMetricsStrategy() {
}

@Override
WithDittoHeaders getResponse(final ConnectionId connectionId, final DittoHeaders headers) {
return ResetConnectionMetricsResponse.of(connectionId, headers);
WithDittoHeaders getResponse(final ConnectionState state, final DittoHeaders headers) {
return ResetConnectionMetricsResponse.of(state.id(), headers);
}

@Override
Expand All @@ -334,8 +336,8 @@ private EnableConnectionLogsStrategy() {
}

@Override
WithDittoHeaders getResponse(final ConnectionId connectionId, final DittoHeaders headers) {
return EnableConnectionLogsResponse.of(connectionId, headers);
WithDittoHeaders getResponse(final ConnectionState state, final DittoHeaders headers) {
return EnableConnectionLogsResponse.of(state.id(), headers);
}

@Override
Expand Down Expand Up @@ -364,8 +366,8 @@ private ResetConnectionLogsStrategy() {
}

@Override
WithDittoHeaders getResponse(final ConnectionId connectionId, final DittoHeaders headers) {
return ResetConnectionLogsResponse.of(connectionId, headers);
WithDittoHeaders getResponse(final ConnectionState state, final DittoHeaders headers) {
return ResetConnectionLogsResponse.of(state.id(), headers);
}

@Override
Expand All @@ -381,7 +383,7 @@ private RetrieveConnectionStrategy() {
}

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection entity, final long nextRevision, final RetrieveConnection command) {
if (entity != null) {
return ResultFactory.newQueryResult(command,
Expand Down
Expand Up @@ -31,9 +31,9 @@

import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.model.connectivity.ConnectivityStatus;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionState;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.StagedCommand;
import org.eclipse.ditto.services.utils.persistentactors.commands.AbstractCommandStrategy;
import org.eclipse.ditto.services.utils.persistentactors.commands.AbstractReceiveStrategy;
Expand All @@ -51,7 +51,7 @@

// TODO
public class ConnectionDeletedStrategies
extends AbstractReceiveStrategy<Signal, Connection, ConnectionId, Result<ConnectivityEvent>> {
extends AbstractReceiveStrategy<Signal, Connection, ConnectionState, Result<ConnectivityEvent>> {

private static final ConnectionDeletedStrategies DELETED_STRATEGIES = newDeletedStrategies();

Expand All @@ -72,15 +72,15 @@ private static ConnectionDeletedStrategies newDeletedStrategies() {
}

@Override
public Result<ConnectivityEvent> unhandled(final Context<ConnectionId> context,
public Result<ConnectivityEvent> unhandled(final Context<ConnectionState> context,
@Nullable final Connection entity,
final long nextRevision,
final Signal signal) {

if (signal instanceof ConnectivityCommand) {
final ConnectivityCommand command = (ConnectivityCommand) signal;
context.getLog().warning("Received command for deleted connection, rejecting: <{}>", command);
return ResultFactory.newErrorResult(ConnectionNotAccessibleException.newBuilder(context.getState())
return ResultFactory.newErrorResult(ConnectionNotAccessibleException.newBuilder(context.getState().id())
.dittoHeaders(signal.getDittoHeaders())
.build());
} else {
Expand All @@ -95,7 +95,7 @@ protected Result<ConnectivityEvent> getEmptyResult() {
}

private abstract static class AbstractStrategy<C>
extends AbstractCommandStrategy<C, Connection, ConnectionId, Result<ConnectivityEvent>> {
extends AbstractCommandStrategy<C, Connection, ConnectionState, Result<ConnectivityEvent>> {

AbstractStrategy(final Class<C> theMatchingClass) {
super(theMatchingClass);
Expand All @@ -114,7 +114,7 @@ private TestConnectionStrategy() {
}

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection entity, final long nextRevision, final TestConnection command) {
if (entity != null) {
final Connection connection = command.getConnection();
Expand All @@ -125,7 +125,7 @@ protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
return newMutationResult(stagedCommand, event, command);
} else {
return newQueryResult(command,
TestConnectionResponse.alreadyCreated(context.getState(), command.getDittoHeaders()));
TestConnectionResponse.alreadyCreated(context.getState().id(), command.getDittoHeaders()));
}
}
}
Expand All @@ -137,7 +137,7 @@ private CreateConnectionStrategy() {
}

@Override
protected Result<ConnectivityEvent> doApply(final Context<ConnectionId> context,
protected Result<ConnectivityEvent> doApply(final Context<ConnectionState> context,
@Nullable final Connection entity, final long nextRevision, final CreateConnection command) {
final Connection connection = command.getConnection().toBuilder().lifecycle(ACTIVE).build();
final ConnectivityEvent event =
Expand Down

0 comments on commit 078c437

Please sign in to comment.