Skip to content

Commit

Permalink
Delete the created policy if create thing enforcement fails
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed Jun 28, 2022
1 parent bcc1079 commit f96518e
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 38 deletions.
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
Expand Down Expand Up @@ -178,42 +179,46 @@ private void doEnforceSignal(final S signal, final ActorRef sender) {
.thenCompose(optionalPolicyEnforcer -> optionalPolicyEnforcer
.map(policyEnforcer -> enforcement.authorizeSignal(signal, policyEnforcer))
.orElseGet(() -> enforcement.authorizeSignalWithMissingEnforcer(signal))
).handle((authorizedSignal, throwable) -> {
)
.whenComplete((authorizedSignal, throwable) -> {
if (null != authorizedSignal) {
log.withCorrelationId(authorizedSignal)
.info("Completed enforcement of message type <{}> with outcome 'success'",
authorizedSignal.getType());
sender.tell(authorizedSignal, self);
return authorizedSignal;
} else if (null != throwable) {
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
DittoInternalErrorException.newBuilder()
.cause(t)
.dittoHeaders(signal.getDittoHeaders())
.build()
);
log.withCorrelationId(dittoRuntimeException)
.info("Completed enforcement of message type <{}> with outcome 'failed' and headers: <{}>",
signal.getType(), signal.getDittoHeaders());
sender.tell(dittoRuntimeException, self);
return null;
handleAuthorizationFailure(signal, throwable, sender);
} else {
log.withCorrelationId(signal)
.warning(
"Neither authorizedSignal nor throwable were present during enforcement of signal: " +
"<{}>", signal);
return null;
}
});
} catch (final DittoRuntimeException dittoRuntimeException) {
log.withCorrelationId(dittoRuntimeException)
.info("Completed enforcement of message type <{}> with outcome 'failed' and headers: <{}>",
signal.getType(), signal.getDittoHeaders());
sender.tell(dittoRuntimeException, self);
handleAuthorizationFailure(signal, dittoRuntimeException, sender);
}
}

private void handleAuthorizationFailure(
final Signal<?> signal,
final Throwable throwable,
final ActorRef sender
) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
DittoInternalErrorException.newBuilder()
.cause(t)
.dittoHeaders(dittoHeaders)
.build()
);
log.withCorrelationId(dittoRuntimeException)
.info("Completed enforcement of message type <{}> with outcome 'failed' and headers: <{}>",
signal.getType(), dittoHeaders);
sender.tell(dittoRuntimeException, getSelf());
}

/**
* Filters the response payload of the passed {@code commandResponse} using the {@code enforcement} of this actor.
* Filtered command responses are sent back to the {@code getSender()} - which is our dear parent, the Supervisor.
Expand Down
Expand Up @@ -23,6 +23,7 @@

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.contenttype.ContentType;
import org.eclipse.ditto.base.model.signals.FeatureToggle;
Expand All @@ -32,6 +33,8 @@
import org.eclipse.ditto.base.model.signals.commands.CommandToExceptionRegistry;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
Expand All @@ -45,8 +48,13 @@
import org.eclipse.ditto.policies.enforcement.config.EnforcementConfig;
import org.eclipse.ditto.policies.model.Permissions;
import org.eclipse.ditto.policies.model.PoliciesResourceType;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.ResourceKey;
import org.eclipse.ditto.policies.model.enforcers.Enforcer;
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicyResponse;
import org.eclipse.ditto.rql.model.ParserException;
import org.eclipse.ditto.rql.model.predicates.ast.RootNode;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
Expand All @@ -71,6 +79,10 @@
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
* Authorizes {@link ThingCommand}s and filters {@link ThingCommandResponse}s.
*/
Expand All @@ -85,14 +97,26 @@ final class ThingCommandEnforcement
*/
private static final JsonFieldSelector THING_QUERY_COMMAND_RESPONSE_ALLOWLIST =
JsonFactory.newFieldSelector(Thing.JsonFields.ID);
private final ActorSystem actorSystem;
private final ActorRef policiesShardRegion;
private final AskWithRetryConfig askWithRetryConfig;

/**
* Creates a new instance of the thing command enforcer.
*
* @param actorSystem the actor system to load config, dispatchers from.
* @param policiesShardRegion the policies shard region to load policies from and to use in order to create new
* (inline) policies when creating new things.
* @param enforcementConfig the configuration to apply for this command enforcement implementation.
*/
public ThingCommandEnforcement(final EnforcementConfig enforcementConfig) {

public ThingCommandEnforcement(
final ActorSystem actorSystem,
final ActorRef policiesShardRegion,
final EnforcementConfig enforcementConfig) {

this.actorSystem = actorSystem;
this.policiesShardRegion = policiesShardRegion;
this.askWithRetryConfig = enforcementConfig.getAskWithRetryConfig();
enforcementConfig.getSpecialLoggingInspectedNamespaces()
.forEach(loggedNamespace -> NAMESPACE_INSPECTION_LOGGERS.put(
loggedNamespace,
Expand Down Expand Up @@ -134,18 +158,26 @@ public CompletionStage<ThingCommand<?>> authorizeSignal(final ThingCommand<?> co
// for retrieving the WoT TD, assume that full TD gets returned unfiltered:
authorizedCommand = prepareThingCommandBeforeSendingToPersistence(command);
} else {
final ThingCommand<?> commandWithReadSubjects = authorizeByPolicyOrThrow(policyEnforcer.getEnforcer(),
command);
if (commandWithReadSubjects instanceof ThingQueryCommand<?> thingQueryCommand) {
authorizedCommand = prepareThingCommandBeforeSendingToPersistence(
ensureTwinChannel(thingQueryCommand)
);
} else if (commandWithReadSubjects.getDittoHeaders().getLiveChannelCondition().isPresent()) {
throw LiveChannelConditionNotAllowedException.newBuilder()
.dittoHeaders(commandWithReadSubjects.getDittoHeaders())
.build();
} else {
authorizedCommand = prepareThingCommandBeforeSendingToPersistence(commandWithReadSubjects);
try {
final ThingCommand<?> commandWithReadSubjects = authorizeByPolicyOrThrow(policyEnforcer.getEnforcer(),
command);
if (commandWithReadSubjects instanceof ThingQueryCommand<?> thingQueryCommand) {
authorizedCommand = prepareThingCommandBeforeSendingToPersistence(
ensureTwinChannel(thingQueryCommand)
);
} else if (commandWithReadSubjects.getDittoHeaders().getLiveChannelCondition().isPresent()) {
throw LiveChannelConditionNotAllowedException.newBuilder()
.dittoHeaders(commandWithReadSubjects.getDittoHeaders())
.build();
} else {
authorizedCommand = prepareThingCommandBeforeSendingToPersistence(commandWithReadSubjects);
}
} catch (final Throwable error) {
if (command instanceof CreateThing createThing && !Signal.isChannelLive(createThing)) {
return handleFailedCreateThing(createThing, policyEnforcer)
.thenCompose(done -> CompletableFuture.failedStage(error));
}
return CompletableFuture.failedStage(error);
}
}
return CompletableFuture.completedStage(authorizedCommand);
Expand Down Expand Up @@ -366,6 +398,60 @@ static <T extends ThingCommand<T>> T authorizeByPolicyOrThrow(final Enforcer enf
}
}

private CompletionStage<Done> handleFailedCreateThing(
final CreateThing createThing,
final PolicyEnforcer policyEnforcer) {
if (shouldDeletePolicy(createThing)) {
return deletePolicy(policyEnforcer.getPolicy().flatMap(Policy::getEntityId).orElseThrow(), createThing);
}
return CompletableFuture.completedStage(Done.getInstance());
}

private static boolean shouldDeletePolicy(final CreateThing createThing) {
return wasPolicyCopied(createThing)
|| wasInlinePolicyCreated(createThing)
|| wasDefaultPolicyCreated(createThing);
}

private static boolean wasPolicyCopied(final CreateThing createThing) {
return createThing.getPolicyIdOrPlaceholder().isPresent();
}

private static boolean wasInlinePolicyCreated(final CreateThing createThing) {
return createThing.getInitialPolicy().isPresent();
}

private static boolean wasDefaultPolicyCreated(final CreateThing createThing) {
return createThing.getThing().getPolicyId().isEmpty() && createThing.getPolicyIdOrPlaceholder().isEmpty();
}

private CompletionStage<Done> deletePolicy(final PolicyId policyId, final CreateThing createThing) {
final DittoHeaders dittoHeaders = createThing.getDittoHeaders();
final var dittoHeadersForCreatePolicy = DittoHeaders.newBuilder(dittoHeaders)
.removePreconditionHeaders()
.responseRequired(true)
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true")
.build();
return doDeletePolicy(DeletePolicy.of(policyId, dittoHeadersForCreatePolicy));
}

private CompletionStage<Done> doDeletePolicy(final DeletePolicy deletePolicy) {
return AskWithRetry.askWithRetry(policiesShardRegion, deletePolicy, askWithRetryConfig, actorSystem,
this::handleDeletePolicyResponse)
.thenCompose(success -> {
if (!success) {
return doDeletePolicy(deletePolicy);
}
return CompletableFuture.completedStage(Done.getInstance());
});
}

private boolean handleDeletePolicyResponse(final Object policyResponse) {
//not accessible means already deleted and can be considered as success
return policyResponse instanceof DeletePolicyResponse || policyResponse instanceof PolicyNotAccessibleException;
}


/**
* Extend a signal by subject headers given with granted and revoked READ access.
* The subjects are provided by the given enforcer for the resource type {@link ThingConstants#ENTITY_TYPE}.
Expand Down
Expand Up @@ -24,6 +24,9 @@
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
* Authorizes {@link Signal}s and filters {@link CommandResponse}s related to things by applying different included
* {@link ThingEnforcementStrategy}s.
Expand All @@ -32,11 +35,12 @@ public final class ThingEnforcement extends AbstractEnforcementReloaded<Signal<?

private final List<ThingEnforcementStrategy> enforcementStrategies;

public ThingEnforcement(final EnforcementConfig enforcementConfig) {
public ThingEnforcement(final ActorRef policiesShardRegion, final ActorSystem actorSystem,
final EnforcementConfig enforcementConfig) {

enforcementStrategies = List.of(
new LiveSignalEnforcement(),
new ThingCommandEnforcement(enforcementConfig)
new ThingCommandEnforcement(actorSystem, policiesShardRegion, enforcementConfig)
);
}

Expand Down
Expand Up @@ -300,9 +300,8 @@ protected Props getPersistenceActorProps(final ThingId entityId) {

@Override
protected Props getPersistenceEnforcerProps(final ThingId entityId) {
final ActorContext actorContext = getContext();

final ThingEnforcement thingEnforcement = new ThingEnforcement(enforcementConfig);
final ThingEnforcement thingEnforcement =
new ThingEnforcement(policiesShardRegion, getContext().getSystem(), enforcementConfig);

return ThingEnforcerActor.props(entityId, thingEnforcement, pubSubMediator, blockedNamespaces,
enforcementConfig.getAskWithRetryConfig(), policiesShardRegion, thingsShardRegion);
Expand Down

0 comments on commit f96518e

Please sign in to comment.