From 078c4372fdd414745fb9d488e80c98c93df034a8 Mon Sep 17 00:00:00 2001 From: "Cai Yufei (INST/ECS1)" Date: Fri, 13 Sep 2019 13:56:32 +0200 Subject: [PATCH] Make strategy context dynamic. Signed-off-by: Cai Yufei (INST/ECS1) --- .../messaging/ConnectionActor.java | 9 ++- .../ConnectionCreatedStrategies.java | 66 ++++++++++--------- .../ConnectionDeletedStrategies.java | 16 ++--- .../persistence/stages/ConnectionState.java | 66 +++++++++++++++++++ .../actors/policy/PolicyPersistenceActor.java | 8 ++- .../actors/ThingPersistenceActor.java | 9 ++- .../AbstractShardedPersistenceActor.java | 29 ++++---- 7 files changed, 142 insertions(+), 61 deletions(-) create mode 100644 services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/persistence/stages/ConnectionState.java diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActor.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActor.java index 8cedbfb21d..c68a8004dd 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActor.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActor.java @@ -65,6 +65,7 @@ import org.eclipse.ditto.services.connectivity.messaging.monitoring.metrics.RetrieveConnectionStatusAggregatorActor; import org.eclipse.ditto.services.connectivity.messaging.mqtt.MqttValidator; import org.eclipse.ditto.services.connectivity.messaging.persistence.ConnectionMongoSnapshotAdapter; +import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionState; import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.StagedCommand; import org.eclipse.ditto.services.connectivity.messaging.rabbitmq.RabbitMQValidator; import org.eclipse.ditto.services.connectivity.messaging.validation.CompoundConnectivityCommandInterceptor; @@ -79,6 +80,7 @@ import org.eclipse.ditto.services.utils.persistence.mongo.config.ActivityCheckConfig; import org.eclipse.ditto.services.utils.persistentactors.AbstractShardedPersistenceActor; import org.eclipse.ditto.services.utils.persistentactors.commands.CommandStrategy; +import org.eclipse.ditto.services.utils.persistentactors.commands.DefaultContext; import org.eclipse.ditto.services.utils.persistentactors.events.EventStrategy; import org.eclipse.ditto.signals.base.Signal; import org.eclipse.ditto.signals.commands.base.Command; @@ -117,7 +119,7 @@ * remote server is delegated to a child actor that uses a specific client (AMQP 1.0 or 0.9.1). */ public final class ConnectionActor - extends AbstractShardedPersistenceActor { + extends AbstractShardedPersistenceActor { /** * Prefix to prepend to the connection ID to construct the persistence ID. @@ -271,6 +273,11 @@ protected Class getEventClass() { return ConnectivityEvent.class; } + @Override + protected CommandStrategy.Context getStrategyContext() { + return DefaultContext.getInstance(ConnectionState.of(entityId, connectionLogger, commandValidator), log); + } + @Override protected ConnectionCreatedStrategies getCreatedStrategy() { return ConnectionCreatedStrategies.getInstance(); diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionCreatedStrategies.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionCreatedStrategies.java index 00e2fe7c5c..f3a801d9bf 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionCreatedStrategies.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionCreatedStrategies.java @@ -39,9 +39,9 @@ import org.eclipse.ditto.model.base.headers.DittoHeaders; import org.eclipse.ditto.model.base.headers.WithDittoHeaders; import org.eclipse.ditto.model.connectivity.Connection; -import org.eclipse.ditto.model.connectivity.ConnectionId; import org.eclipse.ditto.model.connectivity.ConnectivityStatus; import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction; +import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionState; import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.StagedCommand; import org.eclipse.ditto.services.utils.persistentactors.commands.AbstractCommandStrategy; import org.eclipse.ditto.services.utils.persistentactors.commands.AbstractReceiveStrategy; @@ -82,7 +82,7 @@ // TODO public class ConnectionCreatedStrategies - extends AbstractReceiveStrategy> { + extends AbstractReceiveStrategy> { private static final ConnectionCreatedStrategies CREATED_STRATEGIES = newCreatedStrategies(); @@ -115,7 +115,7 @@ private static ConnectionCreatedStrategies newCreatedStrategies() { } @Override - public Result unhandled(final Context context, + public Result unhandled(final Context context, @Nullable final Connection entity, final long nextRevision, final Signal signal) { @@ -131,7 +131,7 @@ protected Result getEmptyResult() { } private abstract static class AbstractStrategy - extends AbstractCommandStrategy> { + extends AbstractCommandStrategy> { AbstractStrategy(final Class theMatchingClass) { super(theMatchingClass); @@ -142,9 +142,9 @@ public boolean isDefined(final C command) { return true; } - ConnectionNotAccessibleException notAccessible(final Context context, + ConnectionNotAccessibleException notAccessible(final Context context, final WithDittoHeaders command) { - return ConnectionNotAccessibleException.newBuilder(context.getState()) + return ConnectionNotAccessibleException.newBuilder(context.getState().id()) .dittoHeaders(command.getDittoHeaders()) .build(); } @@ -160,7 +160,7 @@ private abstract static class AbstractSingleActionStrategy doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection connection, final long nextRevision, final C command) { final ConnectivityEvent event = StagedCommand.dummyEvent(); final Collection actions = Collections.singletonList(getAction()); @@ -176,12 +176,12 @@ private abstract static class AbstractEphemeralStrategy getActions(); @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection connection, final long nextRevision, final C command) { final ConnectivityEvent event = StagedCommand.dummyEvent(); final WithDittoHeaders response = getResponse(context.getState(), command.getDittoHeaders()); @@ -198,10 +198,10 @@ private TestConnectionConflictStrategy() { } @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection entity, final long nextRevision, final TestConnection command) { return newQueryResult(command, - TestConnectionResponse.alreadyCreated(context.getState(), command.getDittoHeaders())); + TestConnectionResponse.alreadyCreated(context.getState().id(), command.getDittoHeaders())); } } @@ -212,11 +212,11 @@ private ConnectionConflictStrategy() { } @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection entity, final long nextRevision, final CreateConnection command) { - context.getLog().info("Connection <{}> already exists! Responding with conflict.", context.getState()); + context.getLog().info("Connection <{}> already exists! Responding with conflict.", context.getState().id()); final ConnectionConflictException conflictException = - ConnectionConflictException.newBuilder(context.getState()) + ConnectionConflictException.newBuilder(context.getState().id()) .dittoHeaders(command.getDittoHeaders()) .build(); return newErrorResult(conflictException); @@ -230,13 +230,13 @@ private ModifyConnectionStrategy() { } @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection entity, final long nextRevision, final ModifyConnection command) { final Connection connection = command.getConnection().toBuilder().lifecycle(ACTIVE).build(); final ConnectivityEvent event = ConnectionModified.of(connection, getEventTimestamp(), command.getDittoHeaders()); final WithDittoHeaders response = - ModifyConnectionResponse.of(context.getState(), command.getDittoHeaders()); + ModifyConnectionResponse.of(context.getState().id(), command.getDittoHeaders()); final boolean isCurrentConnectionOpen = Optional.ofNullable(entity) .map(c -> c.getConnectionStatus() == ConnectivityStatus.OPEN) .orElse(false); @@ -264,10 +264,11 @@ private OpenConnectionStrategy() { } @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection connection, final long nextRevision, final OpenConnection command) { - final ConnectivityEvent event = ConnectionOpened.of(context.getState(), command.getDittoHeaders()); - final WithDittoHeaders response = OpenConnectionResponse.of(context.getState(), command.getDittoHeaders()); + final ConnectivityEvent event = ConnectionOpened.of(context.getState().id(), command.getDittoHeaders()); + final WithDittoHeaders response = + OpenConnectionResponse.of(context.getState().id(), command.getDittoHeaders()); final Collection actions = Arrays.asList(PERSIST_AND_APPLY_EVENT, OPEN_CONNECTION, UPDATE_SUBSCRIPTIONS, SEND_RESPONSE); return newMutationResult(StagedCommand.of(command, event, response, actions), event, response); @@ -281,10 +282,11 @@ private CloseConnectionStrategy() { } @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection connection, final long nextRevision, final CloseConnection command) { - final ConnectivityEvent event = ConnectionClosed.of(context.getState(), command.getDittoHeaders()); - final WithDittoHeaders response = CloseConnectionResponse.of(context.getState(), command.getDittoHeaders()); + final ConnectivityEvent event = ConnectionClosed.of(context.getState().id(), command.getDittoHeaders()); + final WithDittoHeaders response = + CloseConnectionResponse.of(context.getState().id(), command.getDittoHeaders()); final Collection actions = Arrays.asList(PERSIST_AND_APPLY_EVENT, UPDATE_SUBSCRIPTIONS, CLOSE_CONNECTION, SEND_RESPONSE); return newMutationResult(StagedCommand.of(command, event, response, actions), event, response); @@ -298,11 +300,11 @@ private DeleteConnectionStrategy() { } @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection connection, final long nextRevision, final DeleteConnection command) { - final ConnectivityEvent event = ConnectionDeleted.of(context.getState(), command.getDittoHeaders()); + final ConnectivityEvent event = ConnectionDeleted.of(context.getState().id(), command.getDittoHeaders()); final WithDittoHeaders response = - DeleteConnectionResponse.of(context.getState(), command.getDittoHeaders()); + DeleteConnectionResponse.of(context.getState().id(), command.getDittoHeaders()); final Collection actions = Arrays.asList(PERSIST_AND_APPLY_EVENT, CLOSE_CONNECTION, SEND_RESPONSE, BECOME_DELETED); return newMutationResult(StagedCommand.of(command, event, response, actions), event, response); @@ -317,8 +319,8 @@ private ResetConnectionMetricsStrategy() { } @Override - WithDittoHeaders getResponse(final ConnectionId connectionId, final DittoHeaders headers) { - return ResetConnectionMetricsResponse.of(connectionId, headers); + WithDittoHeaders getResponse(final ConnectionState state, final DittoHeaders headers) { + return ResetConnectionMetricsResponse.of(state.id(), headers); } @Override @@ -334,8 +336,8 @@ private EnableConnectionLogsStrategy() { } @Override - WithDittoHeaders getResponse(final ConnectionId connectionId, final DittoHeaders headers) { - return EnableConnectionLogsResponse.of(connectionId, headers); + WithDittoHeaders getResponse(final ConnectionState state, final DittoHeaders headers) { + return EnableConnectionLogsResponse.of(state.id(), headers); } @Override @@ -364,8 +366,8 @@ private ResetConnectionLogsStrategy() { } @Override - WithDittoHeaders getResponse(final ConnectionId connectionId, final DittoHeaders headers) { - return ResetConnectionLogsResponse.of(connectionId, headers); + WithDittoHeaders getResponse(final ConnectionState state, final DittoHeaders headers) { + return ResetConnectionLogsResponse.of(state.id(), headers); } @Override @@ -381,7 +383,7 @@ private RetrieveConnectionStrategy() { } @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection entity, final long nextRevision, final RetrieveConnection command) { if (entity != null) { return ResultFactory.newQueryResult(command, diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionDeletedStrategies.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionDeletedStrategies.java index f24629c1f6..fa07c14d2e 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionDeletedStrategies.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionDeletedStrategies.java @@ -31,9 +31,9 @@ import org.eclipse.ditto.model.base.headers.WithDittoHeaders; import org.eclipse.ditto.model.connectivity.Connection; -import org.eclipse.ditto.model.connectivity.ConnectionId; import org.eclipse.ditto.model.connectivity.ConnectivityStatus; import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction; +import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionState; import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.StagedCommand; import org.eclipse.ditto.services.utils.persistentactors.commands.AbstractCommandStrategy; import org.eclipse.ditto.services.utils.persistentactors.commands.AbstractReceiveStrategy; @@ -51,7 +51,7 @@ // TODO public class ConnectionDeletedStrategies - extends AbstractReceiveStrategy> { + extends AbstractReceiveStrategy> { private static final ConnectionDeletedStrategies DELETED_STRATEGIES = newDeletedStrategies(); @@ -72,7 +72,7 @@ private static ConnectionDeletedStrategies newDeletedStrategies() { } @Override - public Result unhandled(final Context context, + public Result unhandled(final Context context, @Nullable final Connection entity, final long nextRevision, final Signal signal) { @@ -80,7 +80,7 @@ public Result unhandled(final Context context, if (signal instanceof ConnectivityCommand) { final ConnectivityCommand command = (ConnectivityCommand) signal; context.getLog().warning("Received command for deleted connection, rejecting: <{}>", command); - return ResultFactory.newErrorResult(ConnectionNotAccessibleException.newBuilder(context.getState()) + return ResultFactory.newErrorResult(ConnectionNotAccessibleException.newBuilder(context.getState().id()) .dittoHeaders(signal.getDittoHeaders()) .build()); } else { @@ -95,7 +95,7 @@ protected Result getEmptyResult() { } private abstract static class AbstractStrategy - extends AbstractCommandStrategy> { + extends AbstractCommandStrategy> { AbstractStrategy(final Class theMatchingClass) { super(theMatchingClass); @@ -114,7 +114,7 @@ private TestConnectionStrategy() { } @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection entity, final long nextRevision, final TestConnection command) { if (entity != null) { final Connection connection = command.getConnection(); @@ -125,7 +125,7 @@ protected Result doApply(final Context context, return newMutationResult(stagedCommand, event, command); } else { return newQueryResult(command, - TestConnectionResponse.alreadyCreated(context.getState(), command.getDittoHeaders())); + TestConnectionResponse.alreadyCreated(context.getState().id(), command.getDittoHeaders())); } } } @@ -137,7 +137,7 @@ private CreateConnectionStrategy() { } @Override - protected Result doApply(final Context context, + protected Result doApply(final Context context, @Nullable final Connection entity, final long nextRevision, final CreateConnection command) { final Connection connection = command.getConnection().toBuilder().lifecycle(ACTIVE).build(); final ConnectivityEvent event = diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/persistence/stages/ConnectionState.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/persistence/stages/ConnectionState.java new file mode 100644 index 0000000000..6ed82bfe91 --- /dev/null +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/persistence/stages/ConnectionState.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2019 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.services.connectivity.messaging.persistence.stages; + +import java.util.function.Consumer; + +import org.eclipse.ditto.model.connectivity.ConnectionId; +import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.ConnectionLogger; +import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand; + +/** + * Everything needed by strategies. + */ +public final class ConnectionState { + + private final ConnectionId connectionId; + private final ConnectionLogger connectionLogger; + private final Consumer> validator; + + private ConnectionState(final ConnectionId connectionId, + final ConnectionLogger connectionLogger, final Consumer> validator) { + this.connectionId = connectionId; + this.connectionLogger = connectionLogger; + this.validator = validator; + } + + // TODO + public static ConnectionState of( + final ConnectionId connectionId, + final ConnectionLogger connectionLogger, + final Consumer> validator) { + + return new ConnectionState(connectionId, connectionLogger, validator); + } + + /** + * @return the connection ID. + */ + public ConnectionId id() { + return connectionId; + } + + /** + * @return the public logger. + */ + public ConnectionLogger getConnectionLogger() { + return connectionLogger; + } + + /** + * @return the command validator. + */ + public Consumer> getValidator() { + return validator; + } +} diff --git a/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policy/PolicyPersistenceActor.java b/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policy/PolicyPersistenceActor.java index fe39f0cb8c..7f0585c221 100755 --- a/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policy/PolicyPersistenceActor.java +++ b/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policy/PolicyPersistenceActor.java @@ -27,6 +27,7 @@ import org.eclipse.ditto.services.utils.persistence.mongo.config.SnapshotConfig; import org.eclipse.ditto.services.utils.persistentactors.AbstractShardedPersistenceActor; import org.eclipse.ditto.services.utils.persistentactors.commands.CommandStrategy; +import org.eclipse.ditto.services.utils.persistentactors.commands.DefaultContext; import org.eclipse.ditto.services.utils.persistentactors.events.EventStrategy; import org.eclipse.ditto.services.utils.persistentactors.results.Result; import org.eclipse.ditto.signals.commands.base.Command; @@ -42,7 +43,7 @@ * PersistentActor which "knows" the state of a single {@link Policy}. */ public final class PolicyPersistenceActor - extends AbstractShardedPersistenceActor { + extends AbstractShardedPersistenceActor { /** * The prefix of the persistenceId for Policies. @@ -121,6 +122,11 @@ protected Class getEventClass() { return PolicyEvent.class; } + @Override + protected CommandStrategy.Context getStrategyContext() { + return DefaultContext.getInstance(entityId, log); + } + @Override protected PolicyCommandStrategies getCreatedStrategy() { return PolicyCommandStrategies.getInstance(); diff --git a/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceActor.java b/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceActor.java index 2adc219bd1..93d1701c47 100755 --- a/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceActor.java +++ b/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceActor.java @@ -30,6 +30,8 @@ import org.eclipse.ditto.services.utils.persistence.mongo.config.ActivityCheckConfig; import org.eclipse.ditto.services.utils.persistence.mongo.config.SnapshotConfig; import org.eclipse.ditto.services.utils.persistentactors.AbstractShardedPersistenceActor; +import org.eclipse.ditto.services.utils.persistentactors.commands.CommandStrategy; +import org.eclipse.ditto.services.utils.persistentactors.commands.DefaultContext; import org.eclipse.ditto.services.utils.persistentactors.events.EventStrategy; import org.eclipse.ditto.services.utils.pubsub.DistributedPub; import org.eclipse.ditto.signals.commands.base.Command; @@ -44,7 +46,7 @@ * PersistentActor which "knows" the state of a single {@link Thing}. */ public final class ThingPersistenceActor - extends AbstractShardedPersistenceActor { + extends AbstractShardedPersistenceActor { /** * The prefix of the persistenceId for Things. @@ -121,6 +123,11 @@ protected Class getEventClass() { return ThingEvent.class; } + @Override + protected CommandStrategy.Context getStrategyContext() { + return DefaultContext.getInstance(entityId, log); + } + @Override protected ThingReceiveStrategy getCreatedStrategy() { return ThingReceiveStrategy.getInstance(); diff --git a/services/utils/persistent-actors/src/main/java/org/eclipse/ditto/services/utils/persistentactors/AbstractShardedPersistenceActor.java b/services/utils/persistent-actors/src/main/java/org/eclipse/ditto/services/utils/persistentactors/AbstractShardedPersistenceActor.java index 6d1241d4df..5d204cc60f 100755 --- a/services/utils/persistent-actors/src/main/java/org/eclipse/ditto/services/utils/persistentactors/AbstractShardedPersistenceActor.java +++ b/services/utils/persistent-actors/src/main/java/org/eclipse/ditto/services/utils/persistentactors/AbstractShardedPersistenceActor.java @@ -28,7 +28,6 @@ import org.eclipse.ditto.services.utils.persistence.mongo.config.ActivityCheckConfig; import org.eclipse.ditto.services.utils.persistence.mongo.config.SnapshotConfig; import org.eclipse.ditto.services.utils.persistentactors.commands.CommandStrategy; -import org.eclipse.ditto.services.utils.persistentactors.commands.DefaultContext; import org.eclipse.ditto.services.utils.persistentactors.events.EventStrategy; import org.eclipse.ditto.services.utils.persistentactors.results.Result; import org.eclipse.ditto.services.utils.persistentactors.results.ResultVisitor; @@ -49,7 +48,7 @@ /** * PersistentActor which "knows" the state of a single entity. */ -public abstract class AbstractShardedPersistenceActor +public abstract class AbstractShardedPersistenceActor extends AbstractPersistentActorWithTimersAndCleanup implements ResultVisitor { @@ -61,12 +60,6 @@ public abstract class AbstractShardedPersistenceActor defaultContext; - @Nullable protected S entity; protected final I entityId; @@ -83,8 +76,6 @@ protected AbstractShardedPersistenceActor(final I entityId, final SnapshotAdapte lastSnapshotRevision = 0L; confirmedSnapshotRevision = 0L; - defaultContext = DefaultContext.getInstance(entityId, log); - handleEvents = ReceiveBuilder.create() .match(getEventClass(), event -> entity = getEventStrategy().handle(event, entity, getRevisionNumber())) .build(); @@ -103,9 +94,11 @@ protected AbstractShardedPersistenceActor(final I entityId, final SnapshotAdapte protected abstract Class getEventClass(); - protected abstract CommandStrategy> getCreatedStrategy(); + protected abstract CommandStrategy.Context getStrategyContext(); + + protected abstract CommandStrategy> getCreatedStrategy(); - protected abstract CommandStrategy> getDeletedStrategy(); + protected abstract CommandStrategy> getDeletedStrategy(); protected abstract EventStrategy getEventStrategy(); @@ -179,7 +172,7 @@ public Receive createReceiveRecover() { * be activated. In return the strategy for the CreateThing command is not needed anymore. */ protected void becomeCreatedHandler() { - final CommandStrategy> commandStrategy = getCreatedStrategy(); + final CommandStrategy> commandStrategy = getCreatedStrategy(); final Receive receive = handleCleanups.orElse(ReceiveBuilder.create() .match(commandStrategy.getMatchingClass(), commandStrategy::isDefined, this::handleByCommandStrategy) @@ -215,8 +208,8 @@ protected void passivate() { } private Receive createDeletedBehavior() { - final CommandStrategy> createStrategy = getDeletedStrategy(); - return handleCleanups.orElse(handleByStrategyReceiveBuilder(createStrategy) + final CommandStrategy> deleteStrategy = getDeletedStrategy(); + return handleCleanups.orElse(handleByStrategyReceiveBuilder(deleteStrategy) .match(CheckForActivity.class, this::checkForActivity) .matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval) .match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess) @@ -247,17 +240,17 @@ private void handleByCommandStrategy(final C command) { handleByStrategy(command, getCreatedStrategy()); } - private ReceiveBuilder handleByStrategyReceiveBuilder(final CommandStrategy> strategy) { + private ReceiveBuilder handleByStrategyReceiveBuilder(final CommandStrategy> strategy) { return ReceiveBuilder.create() .match(strategy.getMatchingClass(), command -> handleByStrategy(command, strategy)); } - private void handleByStrategy(final T command, final CommandStrategy> strategy) { + private void handleByStrategy(final T command, final CommandStrategy> strategy) { log.debug("Handling by strategy: <{}>", command); accessCounter++; final Result result; try { - result = strategy.apply(defaultContext, entity, getNextRevisionNumber(), command); + result = strategy.apply(getStrategyContext(), entity, getNextRevisionNumber(), command); } catch (final DittoRuntimeException e) { getSender().tell(e, getSelf()); return;