Skip to content

Commit

Permalink
simplified persistence supervisor enforcement logic
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed May 10, 2022
1 parent 64eba92 commit d1fcbd6
Showing 1 changed file with 71 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.cluster.sharding.ShardRegion;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;

Expand Down Expand Up @@ -391,8 +392,33 @@ private void enforceAndForwardToPersistenceActor(final Object message) {
entityId, message);
unhandled(message);
} else {
// all commands must be enforced by the enforcer child, so ask all commands to it:
enforceCommandAndForwardToPersistenceActorIfAuthorized(sender, command);
enforceCommandAndForwardToPersistenceActor(command)
.whenComplete((response, throwable) -> {
if (null != throwable) {
if (!(throwable instanceof DittoRuntimeException)) {
log.withCorrelationId(command)
.warning("Encountered Throwable when interacting with enforcer or " +
"persistence child, telling sender: {}", throwable);
}

final DittoRuntimeException dre =
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
DittoInternalErrorException.newBuilder()
.dittoHeaders(command.getDittoHeaders())
.cause(t)
.build());
log.withCorrelationId(dre)
.info("Received DittoRuntimeException during enforcement or persistence, " +
"telling sender: {}", dre);
sender.tell(dre, getSelf());
} else if (null != response) {
sender.tell(response, getSelf());
} else {
log.withCorrelationId(command)
.error("Received nothing when enforcing command and forwarding to " +
"persistence actor - this should not happen.");
}
});
}
} else if (null != persistenceActorChild) {
if (persistenceActorChild.equals(sender)) {
Expand All @@ -407,97 +433,85 @@ private void enforceAndForwardToPersistenceActor(final Object message) {
}
}

private CompletionStage<Object> enforceCommandAndForwardToPersistenceActorIfAuthorized(final ActorRef sender,
final Command<?> command) {
/**
* All commands must are treated in the following way:
* <ul>
* <li>they are sent to the enforcer child which enforces/applies authorization of the command</li>
* <li>after successful enforcement, they are optionally modified in {@link #modifyEnforcerActorEnforcedCommandResponse(Object)}</li>
* <li>afterwards, the enforced command is sent to the persistence actor child in {@link #enforcedCommandToPersistenceActor(org.eclipse.ditto.base.model.headers.DittoHeaders, Object)}</li>
* <li>the persistence actor's response is handled in {@link #filterPersistenceActorResponseViaEnforcer(org.eclipse.ditto.base.model.signals.commands.Command, Object)}
* where the enforcer applies optionally filtering of the response</li>
* <li>the result is returned in the CompletionStage</li>
* </ul>
*
* @param command the command to enforce and forward to the persistence actor
* @return a successful CompletionStage with the command response or a failed stage with a DittoRuntimeException as
* cause
*/
private CompletionStage<Object> enforceCommandAndForwardToPersistenceActor(final Command<?> command) {

if (null != enforcerChild) {
return preEnforcer.apply(command).thenCompose(preEnforcedCommand ->
Patterns.ask(enforcerChild, preEnforcedCommand, DEFAULT_LOCAL_ASK_TIMEOUT)
.thenCompose(this::modifyEnforcerActorEnforcedCommandResponse)
.handle((enforcerResponse, enforcerThrowable) ->
handleEnforcerResponse(sender, enforcerResponse, enforcerThrowable,
preEnforcedCommand.getDittoHeaders())
)
);

.thenCompose(enforcedCommand -> enforcedCommandToPersistenceActor(
preEnforcedCommand.getDittoHeaders(),
enforcedCommand
))
.thenCompose(persistenceActorResponse -> filterPersistenceActorResponseViaEnforcer(
null != persistenceActorResponse ? persistenceActorResponse.first() : null,
null != persistenceActorResponse ? persistenceActorResponse.second() : null
))
);
} else {
log.withCorrelationId(command)
.error("Could not enforce command because enforcerChild was not present");
return CompletableFuture.completedStage(null);
}
}

private CompletionStage<Object> handleEnforcerResponse(final ActorRef sender,
@Nullable final Object enforcerResponse,
@Nullable final Throwable enforcerThrowable,
final DittoHeaders dittoHeaders) {

if (null != enforcerThrowable) {
log.withCorrelationId(dittoHeaders)
.info("Encountered Throwable when interacting with enforcerChild, telling sender: {}",
enforcerThrowable);
final DittoRuntimeException dre =
DittoRuntimeException.asDittoRuntimeException(enforcerThrowable, throwable ->
DittoInternalErrorException.newBuilder()
.dittoHeaders(dittoHeaders)
.cause(throwable)
.build());
log.withCorrelationId(dre)
.debug("Received DittoRuntimeException from enforcerChild, telling sender: {}", dre);
sender.tell(dre, null != persistenceActorChild ? persistenceActorChild : getSelf());
} else if (null == persistenceActorChild) {
final DittoRuntimeException unavailableException = getUnavailableExceptionBuilder(entityId)
private CompletionStage<Pair<Command<?>, Object>> enforcedCommandToPersistenceActor(
final DittoHeaders dittoHeaders,
@Nullable final Object enforcerResponse
) {
if (null == persistenceActorChild) {
throw getUnavailableExceptionBuilder(entityId)
.dittoHeaders(dittoHeaders)
.build();
sender.tell(unavailableException, getSelf());
} else if (enforcerResponse instanceof Command<?> enforcedCommand) {
log.withCorrelationId(enforcedCommand)
.debug("Received enforcedCommand from enforcerChild, forwarding to persistenceActorChild: {}",
enforcedCommand);
return Patterns.ask(persistenceActorChild, enforcedCommand, DEFAULT_LOCAL_ASK_TIMEOUT)
.thenCompose(response -> modifyPersistenceActorCommandResponse(enforcedCommand, response))
.whenComplete((persistenceActorResponse, paThrowable) ->
handlePersistenceActorResponse(sender,
enforcedCommand,
persistenceActorResponse,
paThrowable
)
);
.thenApply(response -> Pair.create(enforcedCommand, response));
} else if (enforcerResponse instanceof DittoRuntimeException dre) {
log.withCorrelationId(dre)
.debug("Received DittoRuntimeException from enforcerChild, telling sender: {}", dre);
sender.tell(dre, persistenceActorChild);
log.withCorrelationId(dittoHeaders)
.debug("Received DittoRuntimeException as response from enforcerChild: {}", dre);
throw dre;
} else {
log.withCorrelationId(enforcerResponse instanceof WithDittoHeaders wdh ? wdh : null)
.warning("Unexpected response from enforcerChild: {}", enforcerResponse);
return CompletableFuture.completedStage(null);
}
return CompletableFuture.completedStage(null);
}

private void handlePersistenceActorResponse(final ActorRef sender,
private CompletionStage<Object> filterPersistenceActorResponseViaEnforcer(
@Nullable final Command<?> enforcedCommand,
@Nullable final Object persistenceActorResponse,
@Nullable final Throwable persistenceActorThrowable) {

@Nullable final Object persistenceActorResponse
) {
assert enforcerChild != null;
if (persistenceActorResponse instanceof CommandResponse<?> commandResponse) {
log.withCorrelationId(commandResponse)
.debug("Received CommandResponse from persistenceActorChild, " +
"telling enforcerChild to apply response filtering: {}", commandResponse);
enforcerChild.tell(commandResponse, sender);
return Patterns.ask(enforcerChild, commandResponse, DEFAULT_LOCAL_ASK_TIMEOUT);
} else if (persistenceActorResponse instanceof DittoRuntimeException dre) {
log.withCorrelationId(enforcedCommand)
.debug("Received DittoRuntimeException as response from persistenceActorChild, " +
"telling sender: {}", dre);
sender.tell(dre, persistenceActorChild);
} else if (null != persistenceActorThrowable) {
log.withCorrelationId(enforcedCommand)
.info("Encountered Throwable when interacting with persistenceActorChild, " +
"telling sender: {}", persistenceActorThrowable);
sender.tell(persistenceActorThrowable, persistenceActorChild);
.debug("Received DittoRuntimeException as response from persistenceActorChild: {}", dre);
throw dre;
} else {
log.withCorrelationId(enforcedCommand)
.warning("Unexpected response from persistenceActorChild: {}", persistenceActorResponse);
return CompletableFuture.completedStage(null);
}
}

Expand Down

0 comments on commit d1fcbd6

Please sign in to comment.