Skip to content

Commit

Permalink
[#1228] Delay LiveResponseAndAcknowledgementForwarder termination unt…
Browse files Browse the repository at this point in the history
…il valid response is received.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 1, 2021
1 parent eb459de commit d28ad36
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import java.util.Set;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.model.ConnectionIdInvalidException;
import org.eclipse.ditto.internal.models.signal.correlation.CommandAndCommandResponseMatchingValidator;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;
Expand All @@ -44,36 +47,40 @@ public final class LiveResponseAndAcknowledgementForwarder extends AbstractActor
private final ActorRef messageReceiver;
private final ActorRef acknowledgementReceiver;
private final Set<AcknowledgementLabel> pendingAcknowledgements;
private final Command<?> command;
private final CommandAndCommandResponseMatchingValidator responseValidator;
private boolean responseReceived = false;
private ActorRef messageSender;

@SuppressWarnings("unused")
private LiveResponseAndAcknowledgementForwarder(final Signal<?> liveSignal,
private LiveResponseAndAcknowledgementForwarder(final Command<?> command,
final ActorRef messageReceiver,
final ActorRef acknowledgementReceiver) {

logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
pendingAcknowledgements = new HashSet<>();
this.messageReceiver = messageReceiver;
this.acknowledgementReceiver = acknowledgementReceiver;
getContext().setReceiveTimeout(liveSignal.getDittoHeaders().getTimeout().orElse(DEFAULT_TIMEOUT));
for (final var ackRequest : liveSignal.getDittoHeaders().getAcknowledgementRequests()) {
this.command = command;
responseValidator = CommandAndCommandResponseMatchingValidator.getInstance();
getContext().setReceiveTimeout(command.getDittoHeaders().getTimeout().orElse(DEFAULT_TIMEOUT));
for (final var ackRequest : command.getDittoHeaders().getAcknowledgementRequests()) {
pendingAcknowledgements.add(ackRequest.getLabel());
}
}

/**
* Create Props object for this actor.
*
* @param liveSignal The live signal whose acknowledgements and responses this actor listens for.
* @param command The live command whose acknowledgements and responses this actor listens for.
* @param messageReceiver Receiver of the message to publish.
* @param acknowledgementReceiver Receiver of acknowledgements.
* @return The Props object.
*/
public static Props props(final Signal<?> liveSignal,
public static Props props(final Command<?> command,
final ActorRef messageReceiver,
final ActorRef acknowledgementReceiver) {
return Props.create(LiveResponseAndAcknowledgementForwarder.class, liveSignal, messageReceiver,
return Props.create(LiveResponseAndAcknowledgementForwarder.class, command, messageReceiver,
acknowledgementReceiver);
}

Expand Down Expand Up @@ -109,8 +116,11 @@ private void onAcknowledgements(final Acknowledgements acks) {
}

private void onQueryCommandResponse(final ThingQueryCommandResponse<?> response) {
logger.debug("Got <{}>", response);
responseReceived = true;
final boolean validResponse = isValidResponse(response);
logger.debug("Got <{}>, valid=<{}>", response, validResponse);
if (validResponse) {
responseReceived = true;
}
if (messageSender != null) {
messageSender.forward(response, getContext());
checkCompletion();
Expand All @@ -130,4 +140,12 @@ private void stopSelf(final Object trigger) {
logger.debug("Stopping due to <{}>", trigger);
getContext().stop(getSelf());
}

private boolean isValidResponse(final CommandResponse<?> response) {
try {
return responseValidator.apply(command, response).isSuccess();
} catch (final ConnectionIdInvalidException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.japi.Pair;

/**
Expand Down Expand Up @@ -438,10 +437,10 @@ private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoH
obj -> pub.wrapForPublicationWithAcks((S) obj, ackExtractor)));
}

private <T extends Signal<?>> Contextual<WithDittoHeaders> askAndBuildJsonViewWithAckForwarding(
final T signal,
final AckExtractor<T> ackExtractor,
final DistributedPub<T> pub,
private Contextual<WithDittoHeaders> askAndBuildJsonViewWithAckForwarding(
final ThingCommand<?> signal,
final AckExtractor<ThingCommand<?>> ackExtractor,
final DistributedPub<ThingCommand<?>> pub,
final Enforcer enforcer) {

final var publish = pub.wrapForPublicationWithAcks(signal, ackExtractor);
Expand Down

0 comments on commit d28ad36

Please sign in to comment.