Skip to content

Commit

Permalink
Adjust the command response enforcement exception handling
Browse files Browse the repository at this point in the history
Currently, failed policy enforcer loading leads to error logging and 500 exceptions. This case happens when the corresponding policy is deleted before command response enforcement. Thus adjusted the error to 404 and removed error logging.

Additionally, the command response exceptions always get wrapped in an internal exception. Adjusted that exceptions which are already DRE are thrown as is.

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Sep 13, 2022
1 parent 73b5cc8 commit 8fc48d6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
Expand Up @@ -603,7 +603,7 @@ private void enforceAndForwardToTargetActor(final Object message) {
.whenComplete((result, error) -> handleOptionalTransformationException(signal, error, sender))
.thenCompose(transformed -> enforceSignalAndForwardToTargetActor((S) transformed, sender)
.whenComplete((response, throwable) ->
handleSignalEnforcementResponse(response, throwable, transformed, sender)
handleSignalEnforcementResponse(response, throwable, transformed, sender)
))
.handle((response, throwable) -> Control.PROCESS_NEXT_TWIN_MESSAGE);
Patterns.pipe(syncCs, getContext().getDispatcher()).pipeTo(getSelf(), getSelf());
Expand Down Expand Up @@ -639,16 +639,7 @@ private void handleSignalEnforcementResponse(@Nullable final Object response,
final ActorRef sender) {

if (null != throwable) {
final DittoRuntimeException dre =
DittoRuntimeException.asDittoRuntimeException(throwable, t -> {
log.withCorrelationId(signal)
.warning("Encountered Throwable when interacting with enforcer " +
"or target actor, telling sender: {}", throwable);
return DittoInternalErrorException.newBuilder()
.dittoHeaders(signal.getDittoHeaders())
.cause(t)
.build();
});
final DittoRuntimeException dre = getEnforcementExceptionAsRuntimeException(throwable, signal);
log.withCorrelationId(dre)
.info("Received DittoRuntimeException during enforcement or " +
"forwarding to target actor, telling sender: {}", dre);
Expand All @@ -665,6 +656,26 @@ private void handleSignalEnforcementResponse(@Nullable final Object response,
}
}

private DittoRuntimeException getEnforcementExceptionAsRuntimeException(final Throwable throwable,
final WithDittoHeaders signal) {

final DittoRuntimeException dre;
if (throwable instanceof DittoRuntimeException dittoRuntimeException) {
dre = dittoRuntimeException;
} else {
dre = DittoRuntimeException.asDittoRuntimeException(throwable, t -> {
log.withCorrelationId(signal)
.warning("Encountered Throwable when interacting with enforcer " +
"or target actor, telling sender: {}", throwable);
return DittoInternalErrorException.newBuilder()
.dittoHeaders(signal.getDittoHeaders())
.cause(t)
.build();
});
}
return dre;
}

/**
* All signals are treated in the following way:
* <ul>
Expand Down
Expand Up @@ -35,9 +35,11 @@
import org.eclipse.ditto.internal.utils.tracing.instruments.trace.StartedTrace;
import org.eclipse.ditto.policies.enforcement.pre.PreEnforcerProvider;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;

import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;

Expand Down Expand Up @@ -217,14 +219,14 @@ private void replyWithFilteredCommandResponse(final R commandResponse) {
*/
private CompletionStage<R> filterResponse(final R commandResponse) {
if (enforcement.shouldFilterCommandResponse(commandResponse)) {
return loadPolicyEnforcer(commandResponse)
.thenApply(optionalPolicyEnforcer -> optionalPolicyEnforcer.orElseThrow(
return providePolicyIdForEnforcement(commandResponse)
.thenCompose(id -> providePolicyEnforcer(id).thenApply(enforcer -> Pair.apply(id, enforcer)))
.thenApply(pair -> pair.second().orElseThrow(
() -> {
log.withCorrelationId(commandResponse)
.error("Could not filter command response because policyEnforcer was missing");
throw DittoInternalErrorException.newBuilder()
.dittoHeaders(commandResponse.getDittoHeaders())
.build();
.debug("Could not filter command response because policyEnforcer was missing." +
" Likely the policy was deleted during command processing.");
throw PolicyNotAccessibleException.newBuilder(pair.first()).build();
}))
.thenCompose(policyEnforcer -> doFilterResponse(commandResponse, policyEnforcer));
} else {
Expand Down

0 comments on commit 8fc48d6

Please sign in to comment.