Skip to content

Commit

Permalink
Treat CommandTimeoutException in AcknowledgementAggregator same way as a
Browse files Browse the repository at this point in the history
receive timeout

* Both indicate that the timeout is exceeded and the aggregation should be aborted

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 18, 2022
1 parent 3c8f28d commit 10a0fa6
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,35 @@

import org.assertj.core.api.AbstractMapAssert;
import org.assertj.core.api.Assertions;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.messages.model.AuthorizationSubjectBlockedException;
import org.eclipse.ditto.messages.model.MessageFormatInvalidException;
import org.eclipse.ditto.messages.model.MessageSendNotAllowedException;
import org.eclipse.ditto.messages.model.MessageTimeoutException;
import org.eclipse.ditto.messages.model.SubjectInvalidException;
import org.eclipse.ditto.messages.model.ThingIdInvalidException;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.SubjectId;
import org.eclipse.ditto.policies.api.PoliciesMappingStrategies;
import org.eclipse.ditto.things.api.ThingsMappingStrategies;
import org.eclipse.ditto.thingsearch.api.ThingSearchMappingStrategies;
import org.eclipse.ditto.policies.model.signals.announcements.SubjectDeletionAnnouncement;
import org.eclipse.ditto.base.model.signals.JsonParsable;
import org.eclipse.ditto.base.api.devops.signals.commands.ChangeLogLevel;
import org.eclipse.ditto.base.api.devops.signals.commands.ChangeLogLevelResponse;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveLoggerConfig;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveLoggerConfigResponse;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveStatistics;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveStatisticsDetails;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveStatisticsResponse;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.base.model.signals.JsonParsable;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.messages.model.AuthorizationSubjectBlockedException;
import org.eclipse.ditto.messages.model.MessageFormatInvalidException;
import org.eclipse.ditto.messages.model.MessageSendNotAllowedException;
import org.eclipse.ditto.messages.model.SubjectInvalidException;
import org.eclipse.ditto.messages.model.ThingIdInvalidException;
import org.eclipse.ditto.messages.model.signals.commands.SendClaimMessage;
import org.eclipse.ditto.messages.model.signals.commands.SendClaimMessageResponse;
import org.eclipse.ditto.messages.model.signals.commands.SendFeatureMessage;
import org.eclipse.ditto.messages.model.signals.commands.SendFeatureMessageResponse;
import org.eclipse.ditto.messages.model.signals.commands.SendMessageAcceptedResponse;
import org.eclipse.ditto.messages.model.signals.commands.SendThingMessage;
import org.eclipse.ditto.messages.model.signals.commands.SendThingMessageResponse;
import org.eclipse.ditto.policies.api.PoliciesMappingStrategies;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.SubjectId;
import org.eclipse.ditto.policies.model.signals.announcements.SubjectDeletionAnnouncement;
import org.eclipse.ditto.things.api.ThingsMappingStrategies;
import org.eclipse.ditto.thingsearch.api.ThingSearchMappingStrategies;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -93,7 +92,6 @@ public void messagesStrategiesAreKnown() {
.knows(AuthorizationSubjectBlockedException.ERROR_CODE)
.knows(MessageFormatInvalidException.ERROR_CODE)
.knows(MessageSendNotAllowedException.ERROR_CODE)
.knows(MessageTimeoutException.ERROR_CODE)
.knows(SubjectInvalidException.ERROR_CODE)
.knows(ThingIdInvalidException.ERROR_CODE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ public Receive createReceive() {
.match(Acknowledgement.class, this::handleAcknowledgement)
.match(Acknowledgements.class, this::handleAcknowledgements)
.match(CommandResponse.class, this::handleCommandResponse)
.match(CommandTimeoutException.class, this::handleCommandTimeoutException)
.match(DittoRuntimeException.class, this::handleDittoRuntimeException)
.matchEquals(Control.WAITING_FOR_ACKS_TIMED_OUT, this::handleReceiveTimeout)
.matchAny(m -> log.warning("Received unexpected message: <{}>", m))
Expand Down Expand Up @@ -332,6 +333,15 @@ private void handleAcknowledgements(final Acknowledgements acknowledgements) {
potentiallyCompleteAcknowledgements(null);
}

private void handleCommandTimeoutException(final CommandTimeoutException commandTimeoutException) {
/*
When a command timeout exception is received, this indicates that the acknowledgements should no longer be
awaited. This is a rare case and can happen for example when the timeout for a live message hits before the
receive timeout does.
*/
this.handleReceiveTimeout(Control.WAITING_FOR_ACKS_TIMED_OUT);
}

private void handleDittoRuntimeException(final DittoRuntimeException dittoRuntimeException) {
log.withCorrelationId(correlationId)
.info("Stopped waiting for acknowledgements because of ditto runtime exception <{}>.",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.eclipse.ditto.things.service.persistence.actors;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -138,12 +137,9 @@ TargetActorWithMessage prepareForPubSubPublishing(final ThingQueryCommand<?> thi
final ActorRef receiver,
final UnaryOperator<Object> responseOrErrorConverter) {

final var startTime = Instant.now();
final var timeout = getAdjustedTimeout(thingQueryCommand, startTime);
final var signalWithAdjustedTimeout = adjustTimeout(thingQueryCommand, timeout);
final var timeout = calculateLiveChannelTimeout(thingQueryCommand.getDittoHeaders());
final var pub = liveSignalPub.command();
final var publish =
pub.wrapForPublicationWithAcks(signalWithAdjustedTimeout, THING_COMMAND_ACK_EXTRACTOR);
final var publish = pub.wrapForPublicationWithAcks(thingQueryCommand, THING_COMMAND_ACK_EXTRACTOR);

return new TargetActorWithMessage(
receiver,
Expand Down Expand Up @@ -292,17 +288,6 @@ private ActorRef createLiveResponseReceiverActor(final ThingQueryCommand<?> thin
return actorRefFactory.actorOf(props);
}

private static Duration getAdjustedTimeout(final Signal<?> signal, final Instant startTime) {

final var baseTimeout = getLiveSignalTimeout(signal);
final var adjustedTimeout = baseTimeout.minus(Duration.between(startTime, Instant.now()));
return adjustedTimeout.minus(MIN_LIVE_TIMEOUT).isNegative() ? MIN_LIVE_TIMEOUT : adjustedTimeout;
}

static Duration getLiveSignalTimeout(final Signal<?> signal) {
return signal.getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT);
}

private static ThingCommand<?> adjustTimeout(final ThingCommand<?> signal, final Duration adjustedTimeout) {

return signal.setDittoHeaders(
Expand Down

0 comments on commit 10a0fa6

Please sign in to comment.