Skip to content

Commit

Permalink
[eclipse-ditto#926] improve type safety of AbstractCommandStrategies.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jan 18, 2021
1 parent 1f02f79 commit cfa9145
Show file tree
Hide file tree
Showing 18 changed files with 105 additions and 81 deletions.
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionState;
import org.eclipse.ditto.services.utils.persistentactors.commands.AbstractCommandStrategy;
import org.eclipse.ditto.services.utils.persistentactors.results.Result;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionNotAccessibleException;
Expand All @@ -36,9 +35,9 @@
* @param <C> the type of the handled command
*/
abstract class AbstractConnectivityCommandStrategy<C extends ConnectivityCommand<?>>
extends AbstractCommandStrategy<C, Connection, ConnectionState, Result<ConnectivityEvent<?>>> {
extends AbstractCommandStrategy<C, Connection, ConnectionState, ConnectivityEvent<?>> {

AbstractConnectivityCommandStrategy(final Class<C> theMatchingClass) {
AbstractConnectivityCommandStrategy(final Class<?> theMatchingClass) {
super(theMatchingClass);
}

Expand Down
Expand Up @@ -28,7 +28,7 @@
*/
public class ConnectionCreatedStrategies
extends
AbstractCommandStrategies<ConnectivityCommand<?>, Connection, ConnectionState, Result<ConnectivityEvent<?>>> {
AbstractCommandStrategies<ConnectivityCommand<?>, Connection, ConnectionState, ConnectivityEvent<?>> {

private static final ConnectionCreatedStrategies CREATED_STRATEGIES = newCreatedStrategies();

Expand Down
Expand Up @@ -28,7 +28,7 @@
*/
public class ConnectionDeletedStrategies
extends
AbstractCommandStrategies<ConnectivityCommand<?>, Connection, ConnectionState, Result<ConnectivityEvent<?>>> {
AbstractCommandStrategies<ConnectivityCommand<?>, Connection, ConnectionState, ConnectivityEvent<?>> {

private static final ConnectionDeletedStrategies DELETED_STRATEGIES = newDeletedStrategies();

Expand Down
Expand Up @@ -48,7 +48,6 @@
import org.eclipse.ditto.services.utils.persistentactors.commands.CommandStrategy;
import org.eclipse.ditto.services.utils.persistentactors.commands.DefaultContext;
import org.eclipse.ditto.services.utils.persistentactors.events.EventStrategy;
import org.eclipse.ditto.services.utils.persistentactors.results.Result;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.policies.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.signals.events.policies.PolicyEvent;
Expand Down Expand Up @@ -143,7 +142,7 @@ protected PolicyCommandStrategies getCreatedStrategy() {
}

@Override
protected CommandStrategy<? extends Command<?>, Policy, PolicyId, Result<PolicyEvent<?>>> getDeletedStrategy() {
protected CommandStrategy<? extends Command<?>, Policy, PolicyId, PolicyEvent<?>> getDeletedStrategy() {
return PolicyCommandStrategies.getCreatePolicyStrategy(policyConfig);
}

Expand Down Expand Up @@ -227,7 +226,7 @@ private void scheduleNextSubjectExpiryCheck() {
final Duration scheduleTimeout = durationBetweenNowAndEarliestExpiry.compareTo(oneDay) < 0 ?
durationBetweenNowAndEarliestExpiry : oneDay;
log.info("Scheduling message for deleting next expired subject in: <{}> - " +
"earliest expiry is at: <{}>", scheduleTimeout, earliestExpiry);
"earliest expiry is at: <{}>", scheduleTimeout, earliestExpiry);
timers().startSingleTimer(NEXT_SUBJECT_EXPIRY_TIMER, DeleteOldestExpiredSubject.INSTANCE,
scheduleTimeout);
}
Expand All @@ -238,11 +237,11 @@ private void handleDeleteExpiredSubjects() {
log.debug("Calculating whether subjects did expire and need to be deleted..");
calculateSubjectDeletedEventOfOldestExpiredSubject(entityId, entity)
.ifPresentOrElse(subjectDeleted ->
persistAndApplyEvent(subjectDeleted, (persistedEvent, resultingEntity) ->
log.withCorrelationId(persistedEvent)
.info("Deleted expired subject <{}> of label <{}>",
subjectDeleted.getSubjectId(), subjectDeleted.getLabel())
),
persistAndApplyEvent(subjectDeleted, (persistedEvent, resultingEntity) ->
log.withCorrelationId(persistedEvent)
.info("Deleted expired subject <{}> of label <{}>",
subjectDeleted.getSubjectId(), subjectDeleted.getLabel())
),
this::scheduleNextSubjectExpiryCheck
);
}
Expand Down Expand Up @@ -306,6 +305,7 @@ private static Optional<SubjectExpiry> findEarliestSubjectExpiryTimestamp(
}

private static final class DeleteOldestExpiredSubject {

private static final DeleteOldestExpiredSubject INSTANCE = new DeleteOldestExpiredSubject();
}
}
Expand Up @@ -31,7 +31,7 @@
* Command strategies of {@code PolicyPersistenceActor}.
*/
public final class PolicyCommandStrategies
extends AbstractCommandStrategies<Command<?>, Policy, PolicyId, Result<PolicyEvent<?>>> {
extends AbstractCommandStrategies<Command<?>, Policy, PolicyId, PolicyEvent<?>> {

@Nullable private static volatile PolicyCommandStrategies instance;
@Nullable private static volatile CreatePolicyStrategy createPolicyStrategy;
Expand All @@ -54,8 +54,8 @@ private PolicyCommandStrategies(final PolicyConfig policyConfig, final ActorSyst
addStrategy(new ModifyPolicyEntryStrategy(policyConfig));
addStrategy(new RetrievePolicyEntryStrategy(policyConfig));
addStrategy(new DeletePolicyEntryStrategy(policyConfig));
addStrategy(this, new ActivateTokenIntegrationStrategy(policyConfig, system));
addStrategy(this, new DeactivateTokenIntegrationStrategy(policyConfig, system));
addStrategy(new ActivateTokenIntegrationStrategy(policyConfig, system));
addStrategy(new DeactivateTokenIntegrationStrategy(policyConfig, system));

// Subjects
addStrategy(new ModifySubjectsStrategy(policyConfig));
Expand Down Expand Up @@ -99,7 +99,7 @@ public static PolicyCommandStrategies getInstance(final PolicyConfig policyConfi
* @param policyConfig the PolicyConfig of the Policy service to apply.
* @return command strategy to create a policy.
*/
public static CommandStrategy<CreatePolicy, Policy, PolicyId, Result<PolicyEvent<?>>> getCreatePolicyStrategy(
public static CommandStrategy<CreatePolicy, Policy, PolicyId, PolicyEvent<?>> getCreatePolicyStrategy(
final PolicyConfig policyConfig) {
CreatePolicyStrategy localCreatePolicyStrategy = createPolicyStrategy;
if (null == localCreatePolicyStrategy) {
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.eclipse.ditto.services.utils.persistentactors.commands.CommandStrategy;
import org.eclipse.ditto.services.utils.persistentactors.commands.DefaultContext;
import org.eclipse.ditto.services.utils.persistentactors.events.EventStrategy;
import org.eclipse.ditto.services.utils.persistentactors.results.Result;
import org.eclipse.ditto.services.utils.pubsub.DistributedPub;
import org.eclipse.ditto.services.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.signals.commands.base.Command;
Expand Down Expand Up @@ -138,7 +137,7 @@ protected ThingCommandStrategies getCreatedStrategy() {
}

@Override
protected CommandStrategy<CreateThing, Thing, ThingId, Result<ThingEvent<?>>> getDeletedStrategy() {
protected CommandStrategy<CreateThing, Thing, ThingId, ThingEvent<?>> getDeletedStrategy() {
return ThingCommandStrategies.getCreateThingStrategy();
}

Expand Down
Expand Up @@ -26,7 +26,7 @@
* The collection of the command strategies of {@code ThingPersistenceActor}.
*/
public final class ThingCommandStrategies
extends AbstractCommandStrategies<Command<?>, Thing, ThingId, Result<ThingEvent<?>>> {
extends AbstractCommandStrategies<Command<?>, Thing, ThingId, ThingEvent<?>> {

private static final ThingCommandStrategies INSTANCE = new ThingCommandStrategies();

Expand Down Expand Up @@ -61,7 +61,7 @@ public static ThingCommandStrategies getInstance() {
*
* @return the instance.
*/
public static CommandStrategy<CreateThing, Thing, ThingId, Result<ThingEvent<?>>> getCreateThingStrategy() {
public static CommandStrategy<CreateThing, Thing, ThingId, ThingEvent<?>> getCreateThingStrategy() {
return CreateThingStrategy.getInstance();
}

Expand Down
Expand Up @@ -67,7 +67,7 @@ protected static CommandStrategy.Context<ThingId> getDefaultContext() {
}

protected static <C extends Command<?>, T extends ThingModifiedEvent<?>> T assertModificationResult(
final CommandStrategy<C, Thing, ThingId, Result<ThingEvent<?>>> underTest,
final CommandStrategy<C, Thing, ThingId, ThingEvent<?>> underTest,
@Nullable final Thing thing,
final C command,
final Class<T> expectedEventClass,
Expand All @@ -77,7 +77,7 @@ protected static <C extends Command<?>, T extends ThingModifiedEvent<?>> T asser
}

protected static <C extends Command<?>, T extends ThingModifiedEvent<?>> T assertModificationResult(
final CommandStrategy<C, Thing, ThingId, Result<ThingEvent<?>>> underTest,
final CommandStrategy<C, Thing, ThingId, ThingEvent<?>> underTest,
@Nullable final Thing thing,
final C command,
final Class<T> expectedEventClass,
Expand All @@ -91,7 +91,7 @@ protected static <C extends Command<?>, T extends ThingModifiedEvent<?>> T asser
}

protected static <C extends Command<?>> void assertErrorResult(
final CommandStrategy<C, Thing, ThingId, Result<ThingEvent<?>>> underTest,
final CommandStrategy<C, Thing, ThingId, ThingEvent<?>> underTest,
@Nullable final Thing thing,
final C command,
final DittoRuntimeException expectedException) {
Expand All @@ -102,7 +102,7 @@ protected static <C extends Command<?>> void assertErrorResult(
}

protected static <C extends Command<?>> void assertQueryResult(
final CommandStrategy<C, Thing, ThingId, Result<ThingEvent<?>>> underTest,
final CommandStrategy<C, Thing, ThingId, ThingEvent<?>> underTest,
@Nullable final Thing thing,
final C command,
final CommandResponse<?> expectedCommandResponse) {
Expand All @@ -112,7 +112,7 @@ protected static <C extends Command<?>> void assertQueryResult(


protected static <C extends Command<?>> void assertUnhandledResult(
final AbstractCommandStrategy<C, Thing, ThingId, Result<ThingEvent<?>>> underTest,
final AbstractCommandStrategy<C, Thing, ThingId, ThingEvent<?>> underTest,
@Nullable final Thing thing,
final C command,
final DittoRuntimeException expectedResponse) {
Expand Down Expand Up @@ -145,7 +145,7 @@ private static void assertInfoResult(final Result<ThingEvent<?>> result, final W
}

private static <C extends Command<?>> Result<ThingEvent<?>> applyStrategy(
final CommandStrategy<C, Thing, ThingId, Result<ThingEvent<?>>> underTest,
final CommandStrategy<C, Thing, ThingId, ThingEvent<?>> underTest,
final CommandStrategy.Context<ThingId> context,
final @Nullable Thing thing,
final C command) {
Expand Down
Expand Up @@ -136,12 +136,12 @@ protected void onEntityModified() {
/**
* @return strategies to handle commands when the entity exists.
*/
protected abstract CommandStrategy<C, S, K, Result<E>> getCreatedStrategy();
protected abstract CommandStrategy<C, S, K, E> getCreatedStrategy();

/**
* @return strategies to handle commands when the entity does not exist.
*/
protected abstract CommandStrategy<? extends C, S, K, Result<E>> getDeletedStrategy();
protected abstract CommandStrategy<? extends C, S, K, E> getDeletedStrategy();

/**
* @return strategies to modify the entity by events.
Expand Down Expand Up @@ -253,7 +253,7 @@ public Receive createReceiveRecover() {
* Start handling messages for an existing entity and schedule maintenance messages to self.
*/
protected void becomeCreatedHandler() {
final CommandStrategy<C, S, K, Result<E>> commandStrategy = getCreatedStrategy();
final CommandStrategy<C, S, K, E> commandStrategy = getCreatedStrategy();

final Receive receive = handleCleanups.orElse(ReceiveBuilder.create()
.match(commandStrategy.getMatchingClass(), commandStrategy::isDefined, this::handleByCommandStrategy)
Expand Down Expand Up @@ -346,7 +346,7 @@ protected void passivate() {
}

private Receive createDeletedBehavior() {
final CommandStrategy<? extends C, S, K, Result<E>> deleteStrategy = getDeletedStrategy();
final CommandStrategy<? extends C, S, K, E> deleteStrategy = getDeletedStrategy();
return handleCleanups.orElse(handleByStrategyReceiveBuilder(deleteStrategy)
.match(CheckForActivity.class, this::checkForActivity)
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
Expand Down Expand Up @@ -384,13 +384,13 @@ private void handleByCommandStrategy(final C command) {
}

private <T extends Command<?>> ReceiveBuilder handleByStrategyReceiveBuilder(
final CommandStrategy<T, S, K, Result<E>> strategy) {
final CommandStrategy<T, S, K, E> strategy) {
return ReceiveBuilder.create()
.match(strategy.getMatchingClass(), command -> handleByStrategy(command, strategy));
}

private <T extends Command<?>> void handleByStrategy(final T command,
final CommandStrategy<T, S, K, Result<E>> strategy) {
final CommandStrategy<T, S, K, E> strategy) {
log.debug("Handling by strategy: <{}>", command);
accessCounter++;
Result<E> result;
Expand Down
Expand Up @@ -30,43 +30,43 @@
* @param <C> the type of the handled command
* @param <S> the type of the managed entity
* @param <K> the type of the context
* @param <R> the type of the results
* @param <E> the type of the result's event
*/
@Immutable
public abstract class AbstractCommandStrategies<C extends Command<?>, S, K, R extends Result<?>>
extends AbstractCommandStrategy<C, S, K, R> {
public abstract class AbstractCommandStrategies<C extends Command<?>, S, K, E extends Event<?>>
extends AbstractCommandStrategy<C, S, K, E> {

protected final Map<Class<? extends C>, CommandStrategy<? extends C, S, K, ? extends R>> strategies;
protected final Map<Class<? extends C>, CommandStrategy<? extends C, S, K, ? extends E>> strategies;

/**
* Constructs a new {@code AbstractCommandStrategy} object.
*
* @param theMatchingClass the class
* @throws NullPointerException if {@code theMatchingClass} is {@code null}.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
protected AbstractCommandStrategies(final Class theMatchingClass) {
super((Class<C>) theMatchingClass);
protected AbstractCommandStrategies(final Class<?> theMatchingClass) {
super(theMatchingClass);
strategies = new HashMap<>();
}

/**
* @return the empty result.
*/
protected abstract R getEmptyResult();
protected abstract Result<E> getEmptyResult();

/**
* Add a command strategy. Call in constructor only.
*
* @param strategy the strategy.
*/
protected void addStrategy(final CommandStrategy<? extends C, S, K, ? extends R> strategy) {
protected void addStrategy(final CommandStrategy<? extends C, S, K, ? extends E> strategy) {
final Class<? extends C> matchingClass = strategy.getMatchingClass();
strategies.put(matchingClass, strategy);
}

@Override
public R unhandled(final Context<K> context, @Nullable final S entity, final long nextRevision, final C command) {
public Result<E> unhandled(final Context<K> context, @Nullable final S entity, final long nextRevision,
final C command) {
context.getLog().withCorrelationId(command)
.info("Command <{}> cannot be handled by this strategy.", command);
return getEmptyResult();
Expand All @@ -88,15 +88,17 @@ protected Optional<Metadata> calculateRelativeMetadata(@Nullable final S entity,
}

@Override
protected R doApply(final Context<K> context, @Nullable final S entity, final long nextRevision, final C command,
protected Result<E> doApply(final Context<K> context, @Nullable final S entity, final long nextRevision,
final C command,
@Nullable final Metadata metadata) {

final CommandStrategy<C, S, K, R> commandStrategy = getAppropriateStrategy(command.getClass());
final CommandStrategy<C, S, K, ? extends E> commandStrategy =
getAppropriateStrategy(command.getClass());

if (commandStrategy != null) {
context.getLog().withCorrelationId(command)
.debug("Applying command <{}>", command);
return commandStrategy.apply(context, entity, nextRevision, command);
return commandStrategy.apply(context, entity, nextRevision, command).map(x -> x);
} else {
// this may happen when subclasses override the "isDefined" condition.
return unhandled(context, entity, nextRevision, command);
Expand All @@ -105,27 +107,8 @@ protected R doApply(final Context<K> context, @Nullable final S entity, final lo

@Nullable
@SuppressWarnings("unchecked")
private CommandStrategy<C, S, K, R> getAppropriateStrategy(final Class<?> commandClass) {
return (CommandStrategy<C, S, K, R>) strategies.get(commandClass);
}

/**
* Add command strategy in a covariant way.
* TODO: Replace Result by event type; hard code the result type, then delete this method.
*
* @param strategies the command strategies.
* @param strategy the strategy to add.
* @param <E> type of events.
* @param <C> type of commands.
* @param <S> type of entities.
* @param <K> type of contexts.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
protected static <E extends Event<?>, F extends E, C extends Command<?>, S, K> void addStrategy(
final AbstractCommandStrategies<C, S, K, Result<E>> strategies,
CommandStrategy<? extends C, S, K, Result<F>> strategy) {

strategies.strategies.put(strategy.getMatchingClass(), (CommandStrategy) strategy);
private CommandStrategy<C, S, K, ? extends E> getAppropriateStrategy(final Class<?> commandClass) {
return (CommandStrategy<C, S, K, ? extends E>) strategies.get(commandClass);
}

}

0 comments on commit cfa9145

Please sign in to comment.