Skip to content

Commit

Permalink
fixed acknowledgement sending for live commands
Browse files Browse the repository at this point in the history
* use fallback "ditto-ackgregator-address" from orignating signal if ACK/response did not contain it by themself

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jun 17, 2022
1 parent b7407b7 commit 546fd07
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Optional;
import java.util.function.Predicate;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
Expand Down Expand Up @@ -57,6 +59,7 @@ public final class AcknowledgementForwarderActor extends AbstractActor {
static final String ACTOR_NAME_PREFIX = "ackForwarder-";

private final String correlationId;
@Nullable private final String ackgregatorAddressFallback;
private final DittoDiagnosticLoggingAdapter log;

@SuppressWarnings("unused")
Expand All @@ -67,6 +70,7 @@ private AcknowledgementForwarderActor(final DittoHeaders dittoHeaders, final Dur
// fall back using the actor name which also contains the correlation-id
getSelf().path().name().replace(ACTOR_NAME_PREFIX, "")
);
ackgregatorAddressFallback = dittoHeaders.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

getContext().setReceiveTimeout(dittoHeaders.getTimeout().orElse(defaultTimeout));
Expand Down Expand Up @@ -94,7 +98,8 @@ public Receive createReceive() {

private void forwardCommandResponse(final WithDittoHeaders acknowledgementOrResponse) {
final DittoHeaders dittoHeaders = acknowledgementOrResponse.getDittoHeaders();
final String ackregatorAddress = dittoHeaders.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
final String ackregatorAddress = dittoHeaders.getOrDefault(
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), ackgregatorAddressFallback);
if (null != ackregatorAddress) {
final ActorSelection acknowledgementRequester = getContext().actorSelection(ackregatorAddress);
log.withCorrelationId(acknowledgementOrResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
Expand All @@ -30,15 +31,14 @@

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.japi.pf.ReceiveBuilder;

/**
* An actor to deal with a live thing query command expecting a response and requesting custom acknowledgements.
* The sender of the live command is an actor in Concierge in order to apply policy enforcement on the response.
* As a result, custom acknowledgements are also sent to Concierge, which must forward them to the acknowledgement
* aggregator actor.
* The sender of the live command is an actor in order to apply policy enforcement on the response.
*/
final class LiveResponseAndAcknowledgementForwarder extends AbstractActor {

Expand All @@ -47,7 +47,6 @@ final class LiveResponseAndAcknowledgementForwarder extends AbstractActor {
private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

private final ActorRef messageReceiver;
private final ActorRef acknowledgementReceiver;
private final Set<AcknowledgementLabel> pendingAcknowledgements;
private final ThingQueryCommand<?> thingQueryCommand;
private final CommandAndCommandResponseMatchingValidator responseValidator;
Expand All @@ -58,12 +57,10 @@ final class LiveResponseAndAcknowledgementForwarder extends AbstractActor {

@SuppressWarnings("unused")
private LiveResponseAndAcknowledgementForwarder(final ThingQueryCommand<?> thingQueryCommand,
final ActorRef messageReceiver,
final ActorRef acknowledgementReceiver) {
final ActorRef messageReceiver) {

pendingAcknowledgements = new HashSet<>();
this.messageReceiver = messageReceiver;
this.acknowledgementReceiver = acknowledgementReceiver;
this.thingQueryCommand = thingQueryCommand;
responseValidator = CommandAndCommandResponseMatchingValidator.getInstance();
getContext().setReceiveTimeout(this.thingQueryCommand.getDittoHeaders().getTimeout().orElse(DEFAULT_TIMEOUT));
Expand All @@ -77,14 +74,11 @@ private LiveResponseAndAcknowledgementForwarder(final ThingQueryCommand<?> thing
*
* @param thingQueryCommand The live command whose acknowledgements and responses this actor listens for.
* @param messageReceiver Receiver of the message to publish.
* @param acknowledgementReceiver Receiver of acknowledgements.
* @return The Props object.
*/
public static Props props(final ThingQueryCommand<?> thingQueryCommand,
final ActorRef messageReceiver,
final ActorRef acknowledgementReceiver) {
return Props.create(LiveResponseAndAcknowledgementForwarder.class, thingQueryCommand, messageReceiver,
acknowledgementReceiver);
final ActorRef messageReceiver) {
return Props.create(LiveResponseAndAcknowledgementForwarder.class, thingQueryCommand, messageReceiver);
}

@Override
Expand All @@ -107,15 +101,15 @@ private void sendMessage(final Object message) {
private void onAcknowledgement(final Acknowledgement ack) {
log.debug("Got <{}>", ack);
pendingAcknowledgements.remove(ack.getLabel());
acknowledgementReceiver.forward(ack, getContext());
forwardToAcknowledgementRequester(ack);
}

private void onAcknowledgements(final Acknowledgements acks) {
log.debug("Got <{}>", acks);
for (final var ack : acks) {
pendingAcknowledgements.remove(ack.getLabel());
}
acknowledgementReceiver.forward(acks, getContext());
forwardToAcknowledgementRequester(acks);
}

private void onCommandResponse(final CommandResponse<?> incomingResponse) {
Expand All @@ -132,7 +126,26 @@ private void onCommandResponse(final CommandResponse<?> incomingResponse) {
stopSelf("Message sender not found");
}
} else {
acknowledgementReceiver.forward(response, getContext());
forwardToAcknowledgementRequester(response);
}
}

private void forwardToAcknowledgementRequester(final CommandResponse<?> ackResponse) {
final String ackregatorAddress = ackResponse.getDittoHeaders()
.getOrDefault(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
thingQueryCommand.getDittoHeaders()
.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey()));
if (null != ackregatorAddress) {
final ActorSelection acknowledgementRequester = getContext().actorSelection(ackregatorAddress);
log.withCorrelationId(ackResponse)
.debug("Received Acknowledgement / live CommandResponse, forwarding to original requester <{}>: " +
"<{}>", acknowledgementRequester, ackResponse);
acknowledgementRequester.forward(ackResponse, getContext());
} else {
log.withCorrelationId(ackResponse)
.error("Received Acknowledgement / live CommandResponse <{}> did not contain header of " +
"Ackgregator address: {}", ackResponse.getClass().getSimpleName(),
ackResponse.getDittoHeaders());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,21 @@ final class SupervisorLiveChannelDispatching {
* acknowledgements.
*
* @param thingQueryCommand the command to handle as "live" query command
* @param sender the original sender of the command required for responding the live response to
* @param responseHandler a response handler function creating the instance of {@link TargetActorWithMessage} the
* returned CompletionStage will be completed with
* @return a CompletionStage which will be completed with a target actor and a message to send to this target actor
*/
CompletionStage<TargetActorWithMessage> dispatchLiveChannelThingQueryCommand(
final ThingQueryCommand<?> thingQueryCommand,
final ActorRef sender,
final BiFunction<ThingQueryCommand<?>, ActorRef, TargetActorWithMessage> responseHandler) {

if (enforcementConfig.shouldDispatchGlobally(thingQueryCommand)) {
return responseReceiverCache.insertResponseReceiverConflictFree(thingQueryCommand,
conflictFreeCommand -> createLiveResponseReceiverActor(conflictFreeCommand, sender),
this::createLiveResponseReceiverActor,
responseHandler
);
} else {
final var receiver = createLiveResponseReceiverActor(thingQueryCommand, sender);
final var receiver = createLiveResponseReceiverActor(thingQueryCommand);
return CompletableFuture.completedStage(responseHandler.apply(thingQueryCommand, receiver));
}
}
Expand Down Expand Up @@ -278,12 +276,10 @@ private Object handleEncounteredAskTimeoutsAsCommandTimeoutException(final Signa
}
}

private ActorRef createLiveResponseReceiverActor(final ThingQueryCommand<?> thingQueryCommand,
final ActorRef sender) {
private ActorRef createLiveResponseReceiverActor(final ThingQueryCommand<?> thingQueryCommand) {

final var pub = liveSignalPub.command();
final var props = LiveResponseAndAcknowledgementForwarder.props(thingQueryCommand, pub.getPublisher(),
sender);
final var props = LiveResponseAndAcknowledgementForwarder.props(thingQueryCommand, pub.getPublisher());
// and start the actor using the provided actorRefFactory
return actorRefFactory.actorOf(props);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ CompletionStage<TargetActorWithMessage> dispatchSmartChannelThingQueryCommand(
if (shouldAttemptLiveChannel(thingQueryCommand, twinQueryCommandResponse)) {
// perform conversion + publishing of live command
final ThingQueryCommand<?> liveCommand = toLiveCommand(thingQueryCommand);
return liveChannelDispatching.dispatchLiveChannelThingQueryCommand(liveCommand, sender,
return liveChannelDispatching.dispatchLiveChannelThingQueryCommand(liveCommand,
(command, receiver) ->
liveChannelDispatching.prepareForPubSubPublishing(command, receiver,
response ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ protected CompletionStage<TargetActorWithMessage> getTargetActorForSendingEnforc
} else if (message instanceof ThingQueryCommand<?> thingQueryCommand &&
Command.isLiveCommand(thingQueryCommand)) {

return liveChannelDispatching.dispatchLiveChannelThingQueryCommand(thingQueryCommand, sender,
return liveChannelDispatching.dispatchLiveChannelThingQueryCommand(thingQueryCommand,
liveChannelDispatching::prepareForPubSubPublishing);
} else if (message instanceof Signal<?> signal &&
(Command.isLiveCommand(signal) || Event.isLiveEvent(signal))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ public void liveChannelErrorWithTwinFallback() {

expectLiveQueryCommandOnPubSub(retrieveThing);
pubSubMediatorProbe.reply(ThingErrorResponse.of(ThingIdInvalidException.newBuilder(retrieveThing.getEntityId())
.dittoHeaders(DittoHeaders.newBuilder().channel("live").build())
.dittoHeaders(DittoHeaders.newBuilder().channel("live")
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.build())
.build()));

final var receivedErrorResponse = expectMsgClass(ThingErrorResponse.class);
Expand Down Expand Up @@ -245,7 +247,9 @@ public void liveCommandErrorWithTwinFallback() {

expectLiveQueryCommandOnPubSub(retrieveThing);
pubSubMediatorProbe.reply(ThingErrorResponse.of(ThingIdInvalidException.newBuilder(retrieveThing.getEntityId())
.dittoHeaders(DittoHeaders.newBuilder().channel("live").build())
.dittoHeaders(DittoHeaders.newBuilder().channel("live")
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.build())
.build()));
final var receivedErrorResponse = expectMsgClass(ThingErrorResponse.class);
assertLiveChannel(receivedErrorResponse);
Expand Down

0 comments on commit 546fd07

Please sign in to comment.