Skip to content

Commit

Permalink
bugfix: fixed that LiveSignalEnforcement.handleAskTimeoutForCommand()…
Browse files Browse the repository at this point in the history
… threw a 503 (ThingNotAvailable) exception and caused a race condition against a "graceful timeout"

* made the handleAskTimeoutForCommand() return an Optional and proceed with a null element in the CompletionStage if the optional was empty

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Feb 2, 2022
1 parent 2e1f6aa commit 7026ad4
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.concierge.service.enforcement;

import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
Expand All @@ -22,6 +23,7 @@
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.policies.model.enforcers.Enforcer;

Expand Down Expand Up @@ -72,7 +74,17 @@ protected CompletionStage<R> askAndBuildJsonView(
final Executor executor) {

return ask(actorToAsk, commandWithReadSubjects, "before building JsonView", scheduler, executor)
.thenApply(response -> filterJsonView(response, enforcer));
.thenApply(response -> {
if (null != response) {
return filterJsonView(response, enforcer);
} else {
log(commandWithReadSubjects).error("Response before building JsonView was null at a place " +
"where it must never be null");
throw GatewayInternalErrorException.newBuilder()
.dittoHeaders(commandWithReadSubjects.getDittoHeaders())
.build();
}
});
}

/**
Expand Down Expand Up @@ -124,7 +136,13 @@ protected CompletionStage<R> ask(
.cause(cause)
.build());
if (dre instanceof AskException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, throwable);
final Optional<DittoRuntimeException> dittoRuntimeException =
handleAskTimeoutForCommand(commandWithReadSubjects, throwable);
if (dittoRuntimeException.isPresent()) {
throw dittoRuntimeException.get();
} else {
return null;
}
} else {
throw dre;
}
Expand All @@ -146,10 +164,14 @@ protected Function<Object, R> getResponseCaster(final C commandWithReadSubjects,
return (R) response;
} else if (response instanceof ErrorResponse) {
throw ((ErrorResponse<?>) response).getDittoRuntimeException();
} else if (response instanceof AskException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, (Throwable) response);
} else if (response instanceof AskTimeoutException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, (Throwable) response);
} else if (response instanceof AskException || response instanceof AskTimeoutException) {
final Optional<DittoRuntimeException> dittoRuntimeException =
handleAskTimeoutForCommand(commandWithReadSubjects, (Throwable) response);
if (dittoRuntimeException.isPresent()) {
throw dittoRuntimeException.get();
} else {
return null;
}
} else {
throw reportErrorOrResponse(hint, response, null);
}
Expand All @@ -172,13 +194,14 @@ protected Object wrapBeforeAsk(final C command) {
/**
* Handles the {@link AskTimeoutException} when
* {@link #ask(ActorRef,Signal, String, Scheduler, Executor) asking}
* the given {@code command} by transforming it into a individual {@link DittoRuntimeException}.
* the given {@code command} by transforming it into an individual {@link DittoRuntimeException}.
* May also respond with an empty Optional if no {@link DittoRuntimeException} should be thrown at all.
*
* @param command The command that was used to ask.
* @param askTimeout the ask timeout exception.
* @return the ditto runtime exception.
* @return the DittoRuntimeException or an empty Optional if no DittoRuntimeException should be thrown.
*/
protected abstract DittoRuntimeException handleAskTimeoutForCommand(C command, Throwable askTimeout);
protected abstract Optional<DittoRuntimeException> handleAskTimeoutForCommand(C command, Throwable askTimeout);

/**
* Filters the given {@code commandResponse} by using the given {@code enforcer}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
Expand Down Expand Up @@ -54,7 +55,6 @@
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.exceptions.EventSendNotAllowedException;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingUnavailableException;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
Expand Down Expand Up @@ -103,12 +103,10 @@ private LiveSignalEnforcement(final Contextual<SignalWithEntityId<?>> context,
}

@Override
protected DittoRuntimeException handleAskTimeoutForCommand(final SignalWithEntityId<?> signal,
protected Optional<DittoRuntimeException> handleAskTimeoutForCommand(final SignalWithEntityId<?> signal,
final Throwable askTimeout) {
log().info("Live command timed out. Response may be sent by another channel: <{}>", signal);
return ThingUnavailableException.newBuilder(ThingId.of(signal.getEntityId()))
.dittoHeaders(signal.getDittoHeaders())
.build();
return Optional.empty();
}

@Override
Expand Down Expand Up @@ -452,7 +450,13 @@ static CompletionStage<ThingQueryCommandResponse<?>> adjustTimeoutAndFilterLiveQ
return Patterns.ask(liveResponseForwarder, publish, timeout)
.exceptionally(e -> e)
.thenCompose(responseCaster)
.thenApply(response -> enforcement.filterJsonView(replaceAuthContext(response, command), enforcer));
.thenApply(response -> {
if (null != response) {
return enforcement.filterJsonView(replaceAuthContext(response, command), enforcer);
} else {
return null;
}
});
}

private static boolean isAuthorized(final MessageCommand<?, ?> command, final Enforcer enforcer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,12 @@ private void invalidateCaches(final PolicyId policyId) {
}

@Override
protected DittoRuntimeException handleAskTimeoutForCommand(final PolicyCommand<?> command,
protected Optional<DittoRuntimeException> handleAskTimeoutForCommand(final PolicyCommand<?> command,
final Throwable askTimeout) {
log(command).error(askTimeout, "Timeout before building JsonView");
return PolicyUnavailableException.newBuilder(command.getEntityId())
return Optional.of(PolicyUnavailableException.newBuilder(command.getEntityId())
.dittoHeaders(command.getDittoHeaders())
.build();
.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,17 @@ private Contextual<WithDittoHeaders> enforceThingCommandByPolicyEnforcer(
} else if (thingQueryCommand instanceof RetrieveThing && shouldRetrievePolicyWithThing(thingQueryCommand)) {
final var retrieveThing = (RetrieveThing) ensureTwinChannel(thingQueryCommand);
result = withMessageToReceiverViaAskFuture(retrieveThing, sender(), () ->
retrieveThingAndPolicy(retrieveThing, policyId, enforcer).thenCompose(response ->
doSmartChannelSelection(thingQueryCommand, response, startTime, enforcer))
retrieveThingAndPolicy(retrieveThing, policyId, enforcer).thenCompose(response -> {
if (null != response) {
return doSmartChannelSelection(thingQueryCommand, response, startTime, enforcer);
} else {
log(retrieveThing).error("Response was null at a place where it must never " +
"be null");
throw GatewayInternalErrorException.newBuilder()
.dittoHeaders(retrieveThing.getDittoHeaders())
.build();
}
})
);
} else {
final var twinCommand = ensureTwinChannel(thingQueryCommand);
Expand Down Expand Up @@ -627,12 +636,12 @@ private static RetrieveThingResponse reportAggregatedThingAndPolicy(final Retrie
* @param askTimeout the timeout exception.
*/
@Override
protected DittoRuntimeException handleAskTimeoutForCommand(final ThingCommand<?> command,
protected Optional<DittoRuntimeException> handleAskTimeoutForCommand(final ThingCommand<?> command,
final Throwable askTimeout) {
LOGGER.withCorrelationId(dittoHeaders()).error("Timeout before building JsonView", askTimeout);
return ThingUnavailableException.newBuilder(command.getEntityId())
return Optional.of(ThingUnavailableException.newBuilder(command.getEntityId())
.dittoHeaders(command.getDittoHeaders())
.build();
.build());
}

/**
Expand Down

0 comments on commit 7026ad4

Please sign in to comment.