Skip to content

Commit

Permalink
Merge pull request #873 from bosch-io/bugfix/concierge-metrics-timeouts
Browse files Browse the repository at this point in the history
fixed exception handling in AbstractEnforcement
  • Loading branch information
Yannic92 committed Nov 6, 2020
2 parents 66c88e6 + 3e256a0 commit a7609ec
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -45,19 +44,19 @@
* check if the passed in {@code signal} is authorized and forward it accordingly or respond with an error to the passed
* in {@code sender}.
*/
public abstract class AbstractEnforcement<T extends Signal<?>> {
public abstract class AbstractEnforcement<C extends Signal<?>> {

/**
* Context of the enforcement step: sender, self, signal and so forth.
*/
protected final Contextual<T> context;
protected final Contextual<C> context;

/**
* Create an enforcement step from its context.
*
* @param context the context of the enforcement step.
*/
protected AbstractEnforcement(final Contextual<T> context) {
protected AbstractEnforcement(final Contextual<C> context) {
this.context = context;
}

Expand All @@ -83,70 +82,64 @@ public CompletionStage<Contextual<WithDittoHeaders>> enforceSafely() {
return enforce().handle(handleEnforcementCompletion());
}

/**
* Handle error for a future message such that failed futures become a completed future with DittoRuntimeException.
*
* @param throwable error of the future on failure.
* @return the DittoRuntimeException.
*/
private DittoRuntimeException convertError(@Nullable final Throwable throwable) {
final Throwable error = throwable instanceof CompletionException
? throwable.getCause()
: throwable != null
? throwable
: new NullPointerException("Result and error are both null");
return reportError("Error thrown during enforcement", error);
}

private BiFunction<Contextual<WithDittoHeaders>, Throwable, Contextual<WithDittoHeaders>> handleEnforcementCompletion() {
return (result, throwable) -> {
context.getStartedTimer()
.map(startedTimer -> startedTimer.tag("outcome", throwable != null ? "fail" : "success"))
.ifPresent(StartedTimer::stop);
return Objects.requireNonNullElseGet(result,
() -> withMessageToReceiver(convertError(throwable), sender()));
() -> withMessageToReceiver(reportError("Error thrown during enforcement", throwable), sender()));
};
}

/**
* Report unexpected error or unknown response.
*/
protected DittoRuntimeException reportUnexpectedErrorOrResponse(final String hint, final Object response,
protected DittoRuntimeException reportErrorOrResponse(final String hint, @Nullable final Object response,
@Nullable final Throwable error) {

if (error != null) {
return reportUnexpectedError(hint, error);
} else {
return reportError(hint, error);
} else if(response instanceof Throwable){
return reportError(hint, (Throwable) response);
} else if (response != null) {
return reportUnknownResponse(hint, response);
} else {
return reportError(hint, new NullPointerException("Response and error were null."));
}
}

/**
* Reports an error differently based on type of the error. If the error is of type
* {@link org.eclipse.ditto.model.base.exceptions.DittoRuntimeException}, it is send to the {@code sender}
* without modification, otherwise it is wrapped inside a {@link GatewayInternalErrorException}.
* {@link org.eclipse.ditto.model.base.exceptions.DittoRuntimeException}, it is returned as is
* (without modification), otherwise it is wrapped inside a {@link GatewayInternalErrorException}.
*
* @param hint hint about the nature of the error.
* @param error the error.
* @param throwable the error.
* @return DittoRuntimeException suitable for transmission of the error.
*/
protected DittoRuntimeException reportError(final String hint, final Throwable error) {
if (error instanceof DittoRuntimeException) {
log().info("{} - {}: {}", hint, error.getClass().getSimpleName(), error.getMessage());
return (DittoRuntimeException) error;
} else {
return reportUnexpectedError(hint, error);
}
protected DittoRuntimeException reportError(final String hint, @Nullable final Throwable throwable) {
final Throwable error = throwable == null
? new NullPointerException("Result and error are both null")
: throwable;
final DittoRuntimeException dre = DittoRuntimeException
.asDittoRuntimeException(error, cause -> reportUnexpectedError(hint, cause));
log().info("{} - {}: {}", hint, dre.getClass().getSimpleName(), dre.getMessage());
return dre;
}


/**
* Report unexpected error.
*/
protected DittoRuntimeException reportUnexpectedError(final String hint, final Throwable error) {
private DittoRuntimeException reportUnexpectedError(final String hint, final Throwable error) {
log().error(error, "Unexpected error {} - {}: {}", hint, error.getClass().getSimpleName(),
error.getMessage());

return mapToExternalException(error);
return GatewayInternalErrorException.newBuilder()
.cause(error)
.dittoHeaders(dittoHeaders())
.build();
}

/**
Expand All @@ -158,19 +151,6 @@ protected GatewayInternalErrorException reportUnknownResponse(final String hint,
return GatewayInternalErrorException.newBuilder().dittoHeaders(dittoHeaders()).build();
}

private DittoRuntimeException mapToExternalException(final Throwable error) {
if (error instanceof GatewayInternalErrorException) {
return (GatewayInternalErrorException) error;
} else {
log().error(error, "Unexpected non-DittoRuntimeException error - responding with " +
"GatewayInternalErrorException - {} :{}", error.getClass().getSimpleName(), error.getMessage());
return GatewayInternalErrorException.newBuilder()
.cause(error)
.dittoHeaders(dittoHeaders())
.build();
}
}

/**
* Extend a signal by subject headers given with granted and revoked READ access.
* The subjects are provided by the given enforcer for the resource type {@link ThingConstants#ENTITY_TYPE}.
Expand Down Expand Up @@ -263,7 +243,7 @@ protected ActorRef sender() {
/**
* @return the sent Signal of subtype {@code <T>}
*/
protected T signal() {
protected C signal() {
return context.getMessage();
}

Expand Down Expand Up @@ -300,7 +280,7 @@ protected <T> Contextual<WithDittoHeaders> withMessageToReceiverViaAskFuture(fin
return withMessageToReceiver(message, receiver)
.withAskFuture(() -> askFutureWithoutErrorHandling.get()
.<Object>thenApply(x -> x)
.exceptionally(this::convertError)
.exceptionally(error -> this.reportError("Error thrown during enforcement", error))
);
}

Expand All @@ -321,16 +301,6 @@ protected <S extends WithDittoHeaders> Contextual<S> withMessageToReceiver(final
return context.withMessage(message).withReceiver(receiver).withReceiverWrapperFunction(wrapperFunction);
}

/**
* Sets the {@code null} receiver to the {@link #context} meaning that no message at all is emitted. Therefore
* the {@code message} may also stay {@code null}.
*
* @return the adjusted context.
*/
protected <S extends WithDittoHeaders> Contextual<S> withoutReceiver(final S message) {
return context.withMessage(message).withReceiver(null);
}

/**
* @return the DittoHeaders of the sent {@link #signal()}
*/
Expand All @@ -345,26 +315,4 @@ protected ActorRef conciergeForwarder() {
return context.getConciergeForwarder();
}

/**
* Handle the passed {@code throwable} by sending it to the {@link #context}'s sender.
*
* @param throwable the occurred throwable (most likely a {@link DittoRuntimeException}) to send to the sender.
* @return the built contextual including the DittoRuntimeException.
*/
protected Contextual<WithDittoHeaders> handleExceptionally(final Throwable throwable) {
final Contextual<T> newContext = context.withReceiver(context.getSender());

final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable,
cause -> {
log().error(cause, "Unexpected non-DittoRuntimeException");
return GatewayInternalErrorException.newBuilder()
.cause(cause)
.dittoHeaders(context.getDittoHeaders())
.build();
});

return newContext.withMessage(dittoRuntimeException);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.concierge.enforcement;

import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.enforcers.Enforcer;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;

import akka.actor.ActorRef;
import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;

/**
* Adds a common handling to ask an actor for a response and automatically filter the responses JSON view by an
* enforcer.
*
* @param <C> The command type.
* @param <R> The response type.
*/
public abstract class AbstractEnforcementWithAsk<C extends Signal<?>, R extends CommandResponse>
extends AbstractEnforcement<C> {

private final Class<R> responseClass;

/**
* Create an enforcement step from its context.
*
* @param context the context of the enforcement step.
*/
protected AbstractEnforcementWithAsk(final Contextual<C> context, final Class<R> responseClass) {
super(context);
this.responseClass = responseClass;
}

/**
* Asks the given {@code actorToAsk} for a response by telling {@code commandWithReadSubjects}.
* The response is then be filtered by using the {@code enforcer}.
*
* @param actorToAsk the actor that should be asked.
* @param commandWithReadSubjects the command that is used to ask.
* @param enforcer the enforced used to filter the JSON view.
* @return A completion stage which either completes with a filtered response of type {@link R} or fails with a
* {@link DittoRuntimeException}.
*/
protected CompletionStage<R> askAndBuildJsonView(
final ActorRef actorToAsk,
final C commandWithReadSubjects,
final Enforcer enforcer) {

return ask(actorToAsk, commandWithReadSubjects, "before building JsonView")
.thenApply(response -> filterJsonView(response, enforcer));
}

/**
* Asks the given {@code actorToAsk} for a response by telling {@code commandWithReadSubjects}.
*
* @param actorToAsk the actor that should be asked.
* @param commandWithReadSubjects the command that is used to ask.
* @param hint used for logging purposes.
* @return A completion stage which either completes with a filtered response of type {@link R} or fails with a
* {@link DittoRuntimeException}.
*/
@SuppressWarnings("unchecked") // We can ignore this warning since it is tested that response class is assignable
protected CompletionStage<R> ask(
final ActorRef actorToAsk,
final C commandWithReadSubjects,
final String hint) {

return Patterns.ask(actorToAsk, wrapBeforeAsk(commandWithReadSubjects), getAskTimeout())
.handle((response, error) ->
{
if (response != null && responseClass.isAssignableFrom(response.getClass())) {
return (R) response;
} else if (response instanceof AskTimeoutException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, (AskTimeoutException) response);
} else if (error instanceof AskTimeoutException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, (AskTimeoutException) error);
} else if (response instanceof ErrorResponse) {
throw ((ErrorResponse<?>) response).getDittoRuntimeException();
} else {
throw reportErrorOrResponse(hint, response, error);
}
});
}

/**
* Allows to wrap an command into something different before {@link #ask(ActorRef, Signal, String) asking}.
* Useful if the {@link ActorRef actor} that should be asked is the pubsub mediator and the command therefore needs
* to be wrapped into {@link akka.cluster.pubsub.DistributedPubSubMediator.Send }.
*
* @param command command to wrap.
* @return the wrapped command.
*/
protected Object wrapBeforeAsk(final C command) {
return command;
}

/**
* Handles the {@link AskTimeoutException} when {@link #ask(ActorRef, Signal, String) asking} the given
* {@code command} by transforming it into a individual {@link DittoRuntimeException}.
*
* @param command The command that was used to ask.
* @param askTimeout the ask timeout exception.
* @return the ditto runtime exception.
*/
protected abstract DittoRuntimeException handleAskTimeoutForCommand(C command, AskTimeoutException askTimeout);

/**
* Filters the given {@code commandResponse} by using the given {@code enforcer}.
*
* @param commandResponse the command response that needs to be filtered.
* @param enforcer the enforcer that should be used for filtering.
* @return the filtered command response.
*/
protected abstract R filterJsonView(R commandResponse, Enforcer enforcer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,9 @@ public CompletionStage<Contextual<WithDittoHeaders>> enforce() {
LogUtil.enhanceLogWithCorrelationIdOrRandom(liveSignal);
return enforcerRetriever.retrieve(entityId(), (enforcerKeyEntry, enforcerEntry) -> {
try {
return doEnforce(liveSignal, enforcerEntry)
.exceptionally(this::handleExceptionally);
return doEnforce(liveSignal, enforcerEntry);
} catch (final RuntimeException e) {
return CompletableFuture.completedFuture(handleExceptionally(e));
return CompletableFuture.failedStage(e);
}
});
}
Expand Down
Loading

0 comments on commit a7609ec

Please sign in to comment.