Skip to content

Commit

Permalink
fixed another ack issue in gateway
Browse files Browse the repository at this point in the history
* fix when to ask/tell responses with ack-requests and "response-required" combinations

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jun 23, 2022
1 parent 3212964 commit 90b8539
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ private Receive createIncomingSignalBehavior() {

final Receive signalBehavior = ReceiveBuilder.create()
.match(Acknowledgement.class, this::hasUndeclaredAckLabel, this::ackLabelNotDeclared)
.match(Acknowledgement.class, this::forwardAcknowledgementOrLiveCommandResponse)
.match(CommandResponse.class, CommandResponse::isLiveCommandResponse, liveCommandResponse ->
commandRouter.forward(liveCommandResponse, getContext()))
.match(CommandResponse.class, this::forwardAcknowledgementOrLiveCommandResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,14 @@ protected CompletionStage<Object> askEnforcerChild(final Signal<?> signal) {
* {@link #getTargetActorForSendingEnforcedMessageTo(Object, boolean, akka.actor.ActorRef)} - the passed {@code message}.
*
* @param message the message to ask the target actor.
* @param responseRequired whether the message requires a response or not.
* @param shouldSendResponse whether the message should send a response or not.
* @param sender the sender which originally sent the message.
* @param <T> the type of the message.
* @return the completion stage with the response for the message or a failed stage.
*/
protected <T> CompletionStage<Object> askTargetActor(final T message, final boolean responseRequired,
protected <T> CompletionStage<Object> askTargetActor(final T message, final boolean shouldSendResponse,
final ActorRef sender) {
return getTargetActorForSendingEnforcedMessageTo(message, responseRequired, sender)
return getTargetActorForSendingEnforcedMessageTo(message, shouldSendResponse, sender)
.thenCompose(this::askOrForwardToTargetActor)
.thenApply(response -> {
if (null == response) {
Expand Down Expand Up @@ -319,21 +319,21 @@ private CompletionStage<Object> askOrForwardToTargetActor(
* May be overwritten by implementations to determine the target actor in a different way.
*
* @param message the message to determine the target actor for.
* @param responseRequired whether the message requires a response or not.
* @param shouldSendResponse whether the message should send a response or not.
* @param sender the sender which originally sent the message.
* @param <T> the type of the message.
* @return the completion stage with the determined {@link TargetActorWithMessage} which includes the target actor
* and the message to send it to
*/
protected <T> CompletionStage<TargetActorWithMessage> getTargetActorForSendingEnforcedMessageTo(final T message,
final boolean responseRequired,
final boolean shouldSendResponse,
final ActorRef sender) {
if (null != persistenceActorChild) {
return CompletableFuture.completedStage(
new TargetActorWithMessage(
persistenceActorChild,
message,
responseRequired ? DEFAULT_LOCAL_ASK_TIMEOUT : Duration.ZERO,
shouldSendResponse ? DEFAULT_LOCAL_ASK_TIMEOUT : Duration.ZERO,
Function.identity()
));
} else {
Expand Down Expand Up @@ -631,7 +631,7 @@ private CompletionStage<EnforcedSignalAndTargetActorResponse> enforcerResponseTo
log.withCorrelationId(enforcedSignal)
.debug("Received enforcedSignal from enforcerChild, forwarding to target actor: {}",
enforcedSignal);
return askTargetActor(enforcedSignal, enforcedSignal.getDittoHeaders().isResponseRequired(), sender)
return askTargetActor(enforcedSignal, shouldSendResponse(enforcedSignal), sender)
.thenCompose(response ->
modifyTargetActorCommandResponse(enforcedSignal, response))
.thenApply(response ->
Expand All @@ -655,6 +655,16 @@ private CompletionStage<EnforcedSignalAndTargetActorResponse> enforcerResponseTo
}
}

/**
* Determines whether the passed {@code WithDittoHeaders} resolves to that a response should be sent or not.
*
* @param withDittoHeaders where to extract the DittoHeaders from.
* @return whether a response should be sent or not.
*/
protected boolean shouldSendResponse(final WithDittoHeaders withDittoHeaders) {
return withDittoHeaders.getDittoHeaders().isResponseRequired();
}

protected CompletionStage<Object> filterTargetActorResponseViaEnforcer(
final EnforcedSignalAndTargetActorResponse targetActorResponse) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public static PoliciesMappingStrategies getInstance() {
private static MappingStrategies getPoliciesMappingStrategies() {
return MappingStrategiesBuilder.newInstance()
.add(Policy.class, jsonObject -> PoliciesModelFactory.newPolicy(jsonObject))
.add(PolicyTag.class, jsonObject -> PolicyTag.fromJson(jsonObject)) // do not replace with lambda!
.add(PolicyTag.class, PolicyTag::fromJson)
.add(BatchedEntityIdWithRevisions.typeOf(PolicyTag.class),
BatchedEntityIdWithRevisions.deserializer(jsonObject -> PolicyTag.fromJson(jsonObject)))
BatchedEntityIdWithRevisions.deserializer(PolicyTag::fromJson))
.putAll(GlobalMappingStrategies.getInstance())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ CompletionStage<TargetActorWithMessage> dispatchLiveSignal(final Signal<?> signa
.thenApply(distributedPub -> new TargetActorWithMessage(
distributedPub.pub().getPublisher(),
distributedPub.wrappedSignalForPublication(),
distributedPub.signal().getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT),
calculateLiveChannelTimeout(distributedPub.signal()),
response -> handleEncounteredAskTimeoutsAsCommandTimeoutException(
signal, distributedPub, response)
));
Expand All @@ -186,12 +186,20 @@ CompletionStage<TargetActorWithMessage> dispatchLiveSignal(final Signal<?> signa
return CompletableFuture.completedStage(new TargetActorWithMessage(
distributedPubWithMessage.pub().getPublisher(),
distributedPubWithMessage.wrappedSignalForPublication(),
signal.getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT),
calculateLiveChannelTimeout(signal),
Function.identity()
));
}
}

private static Duration calculateLiveChannelTimeout(final WithDittoHeaders withDittoHeaders) {
if (withDittoHeaders.getDittoHeaders().isResponseRequired()) {
return withDittoHeaders.getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT);
} else {
return Duration.ZERO;
}
}

/**
* Dispatches the passed in {@code commandResponse} which was received as "live" response globally without knowing
* the sender of the command by using the {@code responseReceiverCache} to find out the originating sender and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
Expand Down Expand Up @@ -240,7 +242,7 @@ static <T extends ThingCommandResponse<?>> T replaceAuthContext(final T response

@Override
protected CompletionStage<TargetActorWithMessage> getTargetActorForSendingEnforcedMessageTo(final Object message,
final boolean responseRequired,
final boolean shouldSendResponse,
final ActorRef sender) {

if (message instanceof CommandResponse<?> commandResponse &&
Expand All @@ -262,7 +264,7 @@ protected CompletionStage<TargetActorWithMessage> getTargetActorForSendingEnforc
return liveChannelDispatching.dispatchLiveSignal(signal, sender);
} else {

return super.getTargetActorForSendingEnforcedMessageTo(message, responseRequired, sender);
return super.getTargetActorForSendingEnforcedMessageTo(message, shouldSendResponse, sender);
}
}

Expand Down Expand Up @@ -312,6 +314,14 @@ protected Props getPersistenceEnforcerProps(final ThingId entityId) {
enforcementConfig.getAskWithRetryConfig(), policiesShardRegion);
}

@Override
protected boolean shouldSendResponse(final WithDittoHeaders withDittoHeaders) {
return withDittoHeaders.getDittoHeaders().isResponseRequired() ||
withDittoHeaders.getDittoHeaders().getAcknowledgementRequests()
.stream()
.anyMatch(ar -> DittoAcknowledgementLabel.TWIN_PERSISTED.equals(ar.getLabel()));
}

@Override
protected ShutdownBehaviour getShutdownBehaviour(final ThingId entityId) {
return ShutdownBehaviour.fromId(entityId, pubSubMediator, getSelf());
Expand Down

0 comments on commit 90b8539

Please sign in to comment.