Skip to content

Commit

Permalink
fixed issues + tests after ensuring the order of signals in AbstractP…
Browse files Browse the repository at this point in the history
…ersistenceSupervisor

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jun 23, 2022
1 parent 47ca32b commit 084b5d0
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 31 deletions.
Expand Up @@ -23,11 +23,13 @@
import javax.jms.JMSRuntimeException;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionUnavailableException;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.LoggingExpired;
import org.eclipse.ditto.connectivity.service.config.ConnectionConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfigModifiedBehavior;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
Expand All @@ -44,6 +46,7 @@
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;

Expand Down Expand Up @@ -121,7 +124,8 @@ public Receive createReceive() {
}

@Override
protected Receive activeBehaviour() {
protected Receive activeBehaviour(final FI.UnitApply<AbstractPersistenceSupervisor.Control> matchProcessNextTwinMessageBehavior,
final FI.UnitApply<Object> matchAnyBehavior) {
return ReceiveBuilder.create()
.match(Config.class, this::onConnectivityConfigModified)
.matchEquals(Control.CHECK_FOR_OVERWRITES_CONFIG,
Expand All @@ -132,7 +136,14 @@ protected Receive activeBehaviour() {
})
.build()
.orElse(connectivityConfigModifiedBehavior())
.orElse(super.activeBehaviour());
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}

@Override
protected boolean shouldBecomeTwinSignalProcessingAwaiting(final Signal<?> signal) {
return super.shouldBecomeTwinSignalProcessingAwaiting(signal) &&
!(signal instanceof LoggingExpired) // twin signal without a response
;
}

@Override
Expand Down
Expand Up @@ -47,6 +47,7 @@
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.cluster.sharding.ShardRegion;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;

Expand Down Expand Up @@ -151,15 +152,17 @@ protected boolean isStartChildImmediately() {
return true;
}

protected Receive activeBehaviour() {
protected Receive activeBehaviour(final FI.UnitApply<Control> matchProcessNextTwinMessageBehavior,
final FI.UnitApply<Object> matchAnyBehavior) {
return ReceiveBuilder.create()
.match(Terminated.class, this::childTerminated)
.matchEquals(Control.START_CHILDREN, this::startChildren)
.matchEquals(Control.PASSIVATE, this::passivate)
.matchEquals(Control.PROCESS_NEXT_TWIN_MESSAGE, matchProcessNextTwinMessageBehavior)
.match(SudoCommand.class, this::forwardSudoCommandToChildIfAvailable)
.match(WithDittoHeaders.class, w -> w.getDittoHeaders().isSudo(),
this::forwardDittoSudoToChildIfAvailable)
.matchAny(this::enforceAndForwardToTargetActor)
.matchAny(matchAnyBehavior)
.build();
}

Expand Down Expand Up @@ -266,15 +269,17 @@ protected CompletionStage<Object> askEnforcerChild(final Signal<?> signal) {
/**
* Asks the "target actor" (being either the {@link AbstractPersistenceActor} for "twin" commands or e.g. a
* pub/sub actor reference for "live" commands/messages) - which is determined by
* {@link #getTargetActorForSendingEnforcedMessageTo(Object, akka.actor.ActorRef)} - the passed {@code message}.
* {@link #getTargetActorForSendingEnforcedMessageTo(Object, boolean, akka.actor.ActorRef)} - the passed {@code message}.
*
* @param message the message to ask the target actor.
* @param responseRequired whether the message requires a response or not.
* @param sender the sender which originally sent the message.
* @param <T> the type of the message.
* @return the completion stage with the response for the message or a failed stage.
*/
protected <T> CompletionStage<Object> askTargetActor(final T message, final ActorRef sender) {
return getTargetActorForSendingEnforcedMessageTo(message, sender)
protected <T> CompletionStage<Object> askTargetActor(final T message, final boolean responseRequired,
final ActorRef sender) {
return getTargetActorForSendingEnforcedMessageTo(message, responseRequired, sender)
.thenCompose(this::askOrForwardToTargetActor)
.thenApply(response -> {
if (null == response) {
Expand Down Expand Up @@ -314,19 +319,21 @@ private CompletionStage<Object> askOrForwardToTargetActor(
* May be overwritten by implementations to determine the target actor in a different way.
*
* @param message the message to determine the target actor for.
* @param responseRequired whether the message requires a response or not.
* @param sender the sender which originally sent the message.
* @param <T> the type of the message.
* @return the completion stage with the determined {@link TargetActorWithMessage} which includes the target actor
* and the message to send it to
*/
protected <T> CompletionStage<TargetActorWithMessage> getTargetActorForSendingEnforcedMessageTo(final T message,
final boolean responseRequired,
final ActorRef sender) {
if (null != persistenceActorChild) {
return CompletableFuture.completedStage(
new TargetActorWithMessage(
persistenceActorChild,
message,
DEFAULT_LOCAL_ASK_TIMEOUT,
responseRequired ? DEFAULT_LOCAL_ASK_TIMEOUT : Duration.ZERO,
Function.identity()
));
} else {
Expand All @@ -339,8 +346,32 @@ private static DittoHeaders getDittoHeaders(final Object message) {
}

private void becomeActive(final ShutdownBehaviour shutdownBehaviour) {
getContext().become(shutdownBehaviour.createReceive().build()
.orElse(activeBehaviour()));
getContext().become(
shutdownBehaviour.createReceive().build().orElse(
activeBehaviour(
processNextTwinMessage -> {
// ingore
},
this::enforceAndForwardToTargetActor
)
)
);
}

protected void becomeTwinSignalProcessingAwaiting() {
getContext().become(
activeBehaviour(
processNextTwinMsg -> {
unstashAll();
becomeActive(getShutdownBehaviour(entityId));
},
m -> {
log.withCorrelationId(m instanceof WithDittoHeaders w ? w : null)
.debug("stashing during 'becomeTwinSignalProcessingAwaiting': <{}>", m.getClass().getSimpleName());
stash();
}
)
);
}

private void passivate(final Control passivationTrigger) {
Expand Down Expand Up @@ -478,26 +509,15 @@ private void enforceAndForwardToTargetActor(final Object message) {
entityId, message);
unhandled(message);
} else {
getContext().become(ReceiveBuilder.create()
.match(Terminated.class, this::childTerminated)
.matchEquals(Control.START_CHILDREN, this::startChildren)
.matchEquals(Control.PASSIVATE, this::passivate)
.matchEquals(Control.PROCESS_NEXT_MESSAGE, processNextMsg -> {
unstashAll();
becomeActive(getShutdownBehaviour(entityId));
})
.match(SudoCommand.class, this::forwardSudoCommandToChildIfAvailable)
.match(WithDittoHeaders.class, w -> w.getDittoHeaders().isSudo(),
this::forwardDittoSudoToChildIfAvailable)
.matchAny(m -> stash()) // stash all Signals
.build()
);
if (shouldBecomeTwinSignalProcessingAwaiting(signal)) {
becomeTwinSignalProcessingAwaiting();
}

Patterns.pipe(
enforceSignalAndForwardToTargetActor((S) signal, sender)
.handle((response, throwable) -> {
handleSignalEnforcementResponse(response, throwable, signal, sender);
return Control.PROCESS_NEXT_MESSAGE;
return Control.PROCESS_NEXT_TWIN_MESSAGE;
}),
getContext().getDispatcher()
).pipeTo(getSelf(), getSelf());
Expand All @@ -515,6 +535,19 @@ private void enforceAndForwardToTargetActor(final Object message) {
}
}

/**
* Determines whether the passed {@code signal} should stash further Signals sent to the supervisor until it was
* processed.
* That is required in order to guarantee in-order processing of twin commands.
*
* @param signal the signal to determine it for.
* @return whether the supervisor should become signal processing awaiting for the passed signal.
*/
protected boolean shouldBecomeTwinSignalProcessingAwaiting(final Signal<?> signal) {
return !Signal.isChannelLive(signal) && !Signal.isChannelSmart(signal) &&
signal.getDittoHeaders().isResponseRequired();
}

private void handleSignalEnforcementResponse(@Nullable final Object response,
@Nullable final Throwable throwable,
final WithDittoHeaders signal,
Expand Down Expand Up @@ -598,14 +631,16 @@ private CompletionStage<EnforcedSignalAndTargetActorResponse> enforcerResponseTo
log.withCorrelationId(enforcedSignal)
.debug("Received enforcedSignal from enforcerChild, forwarding to target actor: {}",
enforcedSignal);
return askTargetActor(enforcedSignal, sender)
return askTargetActor(enforcedSignal, enforcedSignal.getDittoHeaders().isResponseRequired(), sender)
.thenCompose(response ->
modifyTargetActorCommandResponse(enforcedSignal, response))
.thenApply(response ->
new EnforcedSignalAndTargetActorResponse(enforcedSignal, response)
);
} else if (enforcerResponse instanceof DistributedPubWithMessage distributedPubWithMessage) {
return askTargetActor(distributedPubWithMessage, sender)
return askTargetActor(distributedPubWithMessage,
distributedPubWithMessage.signal().getDittoHeaders().isResponseRequired(), sender
)
.thenCompose(response ->
modifyTargetActorCommandResponse(distributedPubWithMessage.signal(), response))
.thenApply(response ->
Expand Down Expand Up @@ -683,7 +718,7 @@ public enum Control {
/**
* Signals the actor to process the next message.
*/
PROCESS_NEXT_MESSAGE
PROCESS_NEXT_TWIN_MESSAGE
}

private record EnforcedSignalAndTargetActorResponse(@Nullable Signal<?> enforcedSignal,
Expand Down
Expand Up @@ -240,6 +240,7 @@ static <T extends ThingCommandResponse<?>> T replaceAuthContext(final T response

@Override
protected CompletionStage<TargetActorWithMessage> getTargetActorForSendingEnforcedMessageTo(final Object message,
final boolean responseRequired,
final ActorRef sender) {

if (message instanceof CommandResponse<?> commandResponse &&
Expand All @@ -261,7 +262,7 @@ protected CompletionStage<TargetActorWithMessage> getTargetActorForSendingEnforc
return liveChannelDispatching.dispatchLiveSignal(signal, sender);
} else {

return super.getTargetActorForSendingEnforcedMessageTo(message, sender);
return super.getTargetActorForSendingEnforcedMessageTo(message, responseRequired, sender);
}
}

Expand Down
Expand Up @@ -115,7 +115,9 @@ public <S extends ThingEvent<?>> Object wrapForPublicationWithAcks(final S messa
new TestSetup.DummyLiveSignalPub(pubSubMediatorProbe.ref()),
thingPersistenceActorProbe.ref(),
null
), system.guardian(), URLEncoder.encode(THING_ID.toString(), Charset.defaultCharset()));
).withDispatcher("akka.actor.default-dispatcher"), system.guardian(), URLEncoder.encode(THING_ID.toString(), Charset.defaultCharset()));
// Actors using "stash()" require the above dispatcher to be configured, otherwise stash() and unstashAll() won't
// work like in the "normal" actor!
}

protected void expectAndAnswerSudoRetrieveThing(final Object sudoRetrieveThingResponse) {
Expand Down
Expand Up @@ -76,9 +76,9 @@
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyAttribute;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyAttributeResponse;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyFeature;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyFeatureResponse;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyPolicyId;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyPolicyIdResponse;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveAttribute;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveAttributeResponse;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
Expand Down Expand Up @@ -232,6 +232,10 @@ public void acceptByPolicy() {
final ThingCommand<?> expectedWriteCommand = addReadSubjectHeader(write,
SubjectId.newInstance(GOOGLE, TestSetup.SUBJECT_ID));
thingPersistenceActorProbe.expectMsg(expectedWriteCommand);
final ModifyFeatureResponse modifyFeatureResponse =
ModifyFeatureResponse.modified(THING_ID, "x", headers());
thingPersistenceActorProbe.reply(modifyFeatureResponse);
expectMsg(modifyFeatureResponse);

final ThingCommand<?> read = getReadCommand();
final RetrieveThingResponse retrieveThingResponse =
Expand Down

0 comments on commit 084b5d0

Please sign in to comment.