Skip to content

Commit

Permalink
[wip] move command strategies to separate package
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Jul 5, 2018
1 parent 7bdc025 commit ca04889
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 86 deletions.
Expand Up @@ -18,6 +18,8 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.services.things.persistence.actors.strategies.ReceiveStrategy;

import akka.actor.AbstractActor;
import akka.japi.pf.PFBuilder;
import akka.japi.pf.ReceiveBuilder;
Expand Down
Expand Up @@ -57,14 +57,17 @@
import org.eclipse.ditto.services.models.things.ThingsMessagingConstants;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThing;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThingResponse;
import org.eclipse.ditto.services.things.persistence.actors.strategies.AbstractReceiveStrategy;
import org.eclipse.ditto.services.things.persistence.actors.strategies.AbstractThingCommandStrategy;
import org.eclipse.ditto.services.things.persistence.actors.strategies.ModifyAttributesStrategy;
import org.eclipse.ditto.services.things.persistence.actors.strategies.ReceiveStrategy;
import org.eclipse.ditto.services.things.persistence.snapshotting.DittoThingSnapshotter;
import org.eclipse.ditto.services.things.persistence.snapshotting.ThingSnapshotter;
import org.eclipse.ditto.services.things.starter.util.ConfigKeys;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.WithThingId;
import org.eclipse.ditto.signals.base.WithType;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import org.eclipse.ditto.signals.commands.things.exceptions.AclModificationInvalidException;
import org.eclipse.ditto.signals.commands.things.exceptions.AclNotAccessibleException;
import org.eclipse.ditto.signals.commands.things.exceptions.AttributeNotAccessibleException;
Expand Down Expand Up @@ -104,7 +107,6 @@
import org.eclipse.ditto.signals.commands.things.modify.ModifyAttribute;
import org.eclipse.ditto.signals.commands.things.modify.ModifyAttributeResponse;
import org.eclipse.ditto.signals.commands.things.modify.ModifyAttributes;
import org.eclipse.ditto.signals.commands.things.modify.ModifyAttributesResponse;
import org.eclipse.ditto.signals.commands.things.modify.ModifyFeature;
import org.eclipse.ditto.signals.commands.things.modify.ModifyFeatureDefinition;
import org.eclipse.ditto.signals.commands.things.modify.ModifyFeatureDefinitionResponse;
Expand Down Expand Up @@ -149,9 +151,7 @@
import org.eclipse.ditto.signals.events.things.AttributeCreated;
import org.eclipse.ditto.signals.events.things.AttributeDeleted;
import org.eclipse.ditto.signals.events.things.AttributeModified;
import org.eclipse.ditto.signals.events.things.AttributesCreated;
import org.eclipse.ditto.signals.events.things.AttributesDeleted;
import org.eclipse.ditto.signals.events.things.AttributesModified;
import org.eclipse.ditto.signals.events.things.FeatureCreated;
import org.eclipse.ditto.signals.events.things.FeatureDefinitionCreated;
import org.eclipse.ditto.signals.events.things.FeatureDefinitionDeleted;
Expand Down Expand Up @@ -737,8 +737,8 @@ public String toString() {

private final class LazyStrategyLoader {

final Map<Class, ReceiveStrategy> strategies = new HashMap<>();
final Map<Class, Supplier<ReceiveStrategy>> supplier = new HashMap<>();
final Map<Class, ReceiveStrategy> strategies = new HashMap<>();
final ReceiveStrategy unhandledStrategy = new MatchAnyAfterInitializeStrategy();

private LazyStrategyLoader() {
Expand Down Expand Up @@ -1532,40 +1532,6 @@ protected void doApply(final RetrieveAclEntry command) {

}

/**
* This strategy handles the {@link ModifyAttributes} command.
*/
@NotThreadSafe
private final class ModifyAttributesStrategy extends AbstractThingCommandStrategy<ModifyAttributes> {

/**
* Constructs a new {@code ModifyAttributesStrategy} object.
*/
public ModifyAttributesStrategy() {
super(ModifyAttributes.class, log);
}

@Override
protected void doApply(final ModifyAttributes command) {
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final ThingModifiedEvent eventToPersist;
final ThingModifyCommandResponse response;

if (thing().getAttributes().isPresent()) {
eventToPersist = AttributesModified.of(thingId, command.getAttributes(), nextRevision(),
eventTimestamp(), dittoHeaders);
response = ModifyAttributesResponse.modified(thingId, dittoHeaders);
} else {
eventToPersist = AttributesCreated.of(thingId, command.getAttributes(), nextRevision(),
eventTimestamp(), dittoHeaders);
response = ModifyAttributesResponse.created(thingId, command.getAttributes(), dittoHeaders);
}

persistAndApplyEvent(eventToPersist, event -> notifySender(response));
}

}

/**
* This strategy handles the {@link ModifyAttribute} command.
*/
Expand Down Expand Up @@ -2261,7 +2227,7 @@ public RetrieveFeaturePropertiesStrategy() {
}

@Override
protected void doApply(final RetrieveFeatureProperties command) {
protected Result doApply(final RetrieveFeatureProperties command) {
final Optional<Features> optionalFeatures = thing().getFeatures();

final String featureId = command.getFeatureId();
Expand Down Expand Up @@ -2448,41 +2414,6 @@ private void shutdown(final String shutdownLogTemplate, final String thingId) {

}

/**
* This extension of {@link AbstractReceiveStrategy} is for handling {@link ThingCommand}.
*
* @param <T> type of the class this strategy matches against.
*/
@NotThreadSafe
abstract class AbstractThingCommandStrategy<T extends Command> extends AbstractReceiveStrategy<T> {

/**
* Constructs a new {@code AbstractThingCommandStrategy} object.
*
* @param theMatchingClass the class of the message this strategy reacts to.
* @param theLogger the logger to use for logging.
* @throws NullPointerException if {@code theMatchingClass} is {@code null}.
*/
AbstractThingCommandStrategy(final Class<T> theMatchingClass, final DiagnosticLoggingAdapter theLogger) {
super(theMatchingClass, theLogger);
}

@Override
public FI.TypedPredicate<T> getPredicate() {
return command -> null != thing() && thing().getId()
.filter(command.getId()::equals)
.isPresent();
}

@Override
public FI.UnitApply<T> getUnhandledFunction() {
return command -> {
throw new IllegalArgumentException(MessageFormat.format(UNHANDLED_MESSAGE_TEMPLATE, command.getId()));
};
}

}

/**
* This consumer logs the correlation ID, the thing ID as well as the type of any incoming message.
*/
Expand Down
Expand Up @@ -24,6 +24,8 @@
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.things.persistence.actors.strategies.AbstractReceiveStrategy;
import org.eclipse.ditto.services.things.persistence.actors.strategies.ReceiveStrategy;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingUnavailableException;

Expand Down
Expand Up @@ -5,14 +5,16 @@
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*
*/
package org.eclipse.ditto.services.things.persistence.actors;
package org.eclipse.ditto.services.things.persistence.actors.strategies;

import static java.util.Objects.requireNonNull;

import java.util.function.BiFunction;

import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.services.utils.akka.LogUtil;
Expand Down Expand Up @@ -47,18 +49,18 @@ protected AbstractReceiveStrategy(final Class<T> theMatchingClass, final Diagnos
logger = requireNonNull(theLogger, "The logger must not be null!");
}

protected void preApply(final T message) {
protected Result preApply(final Context context, final T message) {
if (message instanceof Command) {
final Command command = (Command) message;
LogUtil.enhanceLogWithCorrelationId(logger, command.getDittoHeaders().getCorrelationId());
if (logger.isDebugEnabled()) {
logger.debug("Applying command '{}': {}", command.getType(), command.toJsonString());
}
}
doApply(message);
return doApply(context, message);
}

protected abstract void doApply(T message);
protected abstract Result doApply(final Context context, T message);

@Override
public Class<T> getMatchingClass() {
Expand All @@ -71,7 +73,7 @@ public FI.TypedPredicate<T> getPredicate() {
}

@Override
public FI.UnitApply<T> getApplyFunction() {
public BiFunction<Context, T, Result> getApplyFunction() {
return this::preApply;
}

Expand Down
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*
*/
package org.eclipse.ditto.services.things.persistence.actors.strategies;

import java.text.MessageFormat;
import java.time.Instant;

import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.services.things.persistence.actors.ThingPersistenceActor;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.things.ThingCommand;

import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.FI;

/**
* This extension of {@link AbstractReceiveStrategy} is for handling {@link ThingCommand}.
*
* @param <T> type of the class this strategy matches against.
*/
@NotThreadSafe
public abstract class AbstractThingCommandStrategy<T extends Command> extends AbstractReceiveStrategy<T> {

/**
* Constructs a new {@code AbstractThingCommandStrategy} object.
*
* @param theMatchingClass the class of the message this strategy reacts to.
* @param theLogger the logger to use for logging.
* @throws NullPointerException if {@code theMatchingClass} is {@code null}.
*/
AbstractThingCommandStrategy(final Class<T> theMatchingClass, final DiagnosticLoggingAdapter theLogger) {
super(theMatchingClass, theLogger);
}

@Override
public FI.TypedPredicate<T> getPredicate() {
return command -> null != thing() && thing().getId()
.filter(command.getId()::equals)
.isPresent();
}

@Override
public FI.UnitApply<T> getUnhandledFunction() {
return command -> {
throw new IllegalArgumentException(
MessageFormat.format(ThingPersistenceActor.UNHANDLED_MESSAGE_TEMPLATE, command.getId()));
};
}

protected static Instant eventTimestamp() {
return Instant.now();
}

}
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*
*/

package org.eclipse.ditto.services.things.persistence.actors.strategies;

import java.util.Optional;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.signals.commands.things.ThingCommandResponse;
import org.eclipse.ditto.signals.events.things.ThingEvent;

class ImmutableResult implements ReceiveStrategy.Result {

@Nullable private final ThingEvent eventToPersist;
@Nullable private final ThingCommandResponse response;
@Nullable private final DittoRuntimeException exception;

private ImmutableResult(@Nullable final ThingEvent eventToPersist,
@Nullable final ThingCommandResponse response,
@Nullable final DittoRuntimeException exception) {
this.eventToPersist = eventToPersist;
this.response = response;
this.exception = exception;
}

static ReceiveStrategy.Result of(final ThingEvent eventToPersist, final ThingCommandResponse response) {
return new ImmutableResult(eventToPersist, response, null);
}

@Override
public Optional<ThingEvent> getEventToPersist() {
return Optional.ofNullable(eventToPersist);
}

@Override
public Optional<ThingCommandResponse> getResponse() {
return Optional.ofNullable(response);
}

@Override
public Optional<DittoRuntimeException> getException() {
return Optional.ofNullable(exception);
}
}
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*
*/
package org.eclipse.ditto.services.things.persistence.actors.strategies;

import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.signals.commands.things.modify.ModifyAttributes;
import org.eclipse.ditto.signals.commands.things.modify.ModifyAttributesResponse;
import org.eclipse.ditto.signals.commands.things.modify.ThingModifyCommandResponse;
import org.eclipse.ditto.signals.events.things.AttributesCreated;
import org.eclipse.ditto.signals.events.things.AttributesModified;
import org.eclipse.ditto.signals.events.things.ThingModifiedEvent;

/**
* This strategy handles the {@link ModifyAttributes} command.
*/
@NotThreadSafe
public final class ModifyAttributesStrategy extends AbstractThingCommandStrategy<ModifyAttributes> {

/**
* Constructs a new {@code ModifyAttributesStrategy} object.
*/
public ModifyAttributesStrategy() {
super(ModifyAttributes.class, null);
}

@Override
protected Result doApply(final Context context, final ModifyAttributes command) {
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final ThingModifiedEvent eventToPersist;
final ThingModifyCommandResponse response;

final String thingId = context.getThingId();
final Thing thing = context.getThing();
final long nextRevision = context.nextRevision();

if (thing.getAttributes().isPresent()) {
eventToPersist = AttributesModified.of(thingId, command.getAttributes(), nextRevision,
eventTimestamp(), dittoHeaders);
response = ModifyAttributesResponse.modified(thingId, dittoHeaders);
} else {
eventToPersist = AttributesCreated.of(thingId, command.getAttributes(), nextRevision,
eventTimestamp(), dittoHeaders);
response = ModifyAttributesResponse.created(thingId, command.getAttributes(), dittoHeaders);
}

return ImmutableResult.of(eventToPersist, response);
}

}

0 comments on commit ca04889

Please sign in to comment.