Skip to content

Commit

Permalink
Use enforcement actor dispatcher as actor dispatcher for enforcement …
Browse files Browse the repository at this point in the history
…actors, instead of in every call to the enforcer

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Jul 11, 2022
1 parent a0a343a commit f34d4f3
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 90 deletions.
Expand Up @@ -22,9 +22,6 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;

import akka.actor.ActorSystem;
import akka.dispatch.MessageDispatcher;

/**
* Abstract implementation of {@link EnforcementReloaded} providing common functionality of all entity specific
* enforcement implementations.
Expand All @@ -38,14 +35,6 @@ public abstract class AbstractEnforcementReloaded<S extends Signal<?>, R extends
protected static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(AbstractEnforcementReloaded.class);

private static final String ENFORCEMENT_DISPATCHER = "enforcement-dispatcher";

protected final MessageDispatcher enforcementDispatcher;

protected AbstractEnforcementReloaded(final ActorSystem actorSystem) {
enforcementDispatcher = actorSystem.dispatchers().lookup(ENFORCEMENT_DISPATCHER);
}

/**
* Reports an error differently based on type of the error. If the error is of type
* {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException}, it is returned as is
Expand Down
Expand Up @@ -49,8 +49,6 @@
import org.eclipse.ditto.policies.model.signals.commands.modify.PolicyModifyCommand;
import org.eclipse.ditto.policies.model.signals.commands.query.PolicyQueryCommandResponse;

import akka.actor.ActorSystem;

/**
* Authorizes {@link PolicyCommand}s and filters {@link PolicyCommandResponse}s.
*/
Expand All @@ -63,10 +61,6 @@ public final class PolicyCommandEnforcement
private static final JsonFieldSelector POLICY_QUERY_COMMAND_RESPONSE_ALLOWLIST =
JsonFactory.newFieldSelector(Policy.JsonFields.ID);

public PolicyCommandEnforcement(final ActorSystem actorSystem) {
super(actorSystem);
}

@Override
public CompletionStage<PolicyCommand<?>> authorizeSignal(final PolicyCommand<?> command,
final PolicyEnforcer policyEnforcer) {
Expand Down Expand Up @@ -100,7 +94,7 @@ public CompletionStage<PolicyCommand<?>> authorizeSignal(final PolicyCommand<?>
final String permission = Permission.READ;
authorizedCommand = CompletableFuture.supplyAsync(() -> enforcer.hasPartialPermissions(policyResourceKey,
authorizationContext,
permission), enforcementDispatcher).thenApply(hasPermission -> {
permission)).thenApply(hasPermission -> {
if (Boolean.TRUE.equals(hasPermission)) {
return command;
} else {
Expand Down Expand Up @@ -176,7 +170,7 @@ private <T extends PolicyCommand<?>> CompletionStage<Optional<T>> authorizeEntry
authorizationContext,
Permission.EXECUTE)
? Optional.of(command)
: Optional.empty(), enforcementDispatcher);
: Optional.empty());
}

private <T extends PolicyCommand<?>> CompletionStage<Optional<T>> authorizeTopLevelAction(
Expand Down Expand Up @@ -216,7 +210,7 @@ private List<CompletionStage<Label>> enforcePolicyLabels(final List<CompletionSt
return labels.stream().map(labelStage ->
labelStage.thenCompose(label -> CompletableFuture.supplyAsync(
() -> enforcer.hasUnrestrictedPermissions(asResourceKey(label, command), authorizationContext,
Permission.EXECUTE), enforcementDispatcher).thenApply(result -> {
Permission.EXECUTE)).thenApply(result -> {
if (Boolean.TRUE.equals(result)) {
return label;
} else {
Expand Down Expand Up @@ -247,7 +241,7 @@ private CompletionStage<Boolean> hasUnrestrictedWritePermission(final Enforcer e
final AuthorizationContext authorizationContext) {
return CompletableFuture.supplyAsync(() -> enforcer.hasUnrestrictedPermissions(policyResourceKey,
authorizationContext,
Permission.WRITE), enforcementDispatcher);
Permission.WRITE));
}

/**
Expand Down Expand Up @@ -282,8 +276,7 @@ private CompletableFuture<JsonObject> getJsonViewForPolicyQueryCommandResponse(f

return CompletableFuture.supplyAsync(() -> enforcer.buildJsonView(resourceKey, responseEntity,
authorizationContext,
POLICY_QUERY_COMMAND_RESPONSE_ALLOWLIST, Permissions.newInstance(Permission.READ)),
enforcementDispatcher);
POLICY_QUERY_COMMAND_RESPONSE_ALLOWLIST, Permissions.newInstance(Permission.READ)));
}

/**
Expand Down
Expand Up @@ -43,6 +43,8 @@
public final class PolicyEnforcerActor
extends AbstractEnforcerActor<PolicyId, PolicyCommand<?>, PolicyCommandResponse<?>, PolicyCommandEnforcement> {

private static final String ENFORCEMENT_DISPATCHER = "enforcement-dispatcher";

private final PolicyEnforcerProvider policyEnforcerProvider = policyId -> {
if (null == policyId) {
return CompletableFuture.completedStage(Optional.empty());
Expand All @@ -69,7 +71,8 @@ private PolicyEnforcerActor(final PolicyId policyId, final PolicyCommandEnforcem
* @return the {@link Props} to create this actor.
*/
public static Props props(final PolicyId policyId, final PolicyCommandEnforcement policyCommandEnforcement) {
return Props.create(PolicyEnforcerActor.class, policyId, policyCommandEnforcement);
return Props.create(PolicyEnforcerActor.class, policyId, policyCommandEnforcement)
.withDispatcher(ENFORCEMENT_DISPATCHER);
}

@Override
Expand Down
Expand Up @@ -112,7 +112,7 @@ protected Props getPersistenceActorProps(final PolicyId entityId) {

@Override
protected Props getPersistenceEnforcerProps(final PolicyId entityId) {
return PolicyEnforcerActor.props(entityId, new PolicyCommandEnforcement(getContext().system()));
return PolicyEnforcerActor.props(entityId, new PolicyCommandEnforcement());
}

@Override
Expand Down
Expand Up @@ -818,7 +818,7 @@ protected Props getPersistenceActorProps(final PolicyId entityId) {

@Override
protected Props getPersistenceEnforcerProps(final PolicyId entityId) {
return PolicyEnforcerActor.props(entityId, new PolicyCommandEnforcement(system));
return PolicyEnforcerActor.props(entityId, new PolicyCommandEnforcement());
}

@Override
Expand Down
Expand Up @@ -47,19 +47,13 @@
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;

import akka.actor.ActorSystem;

/**
* Enforces live commands (including message commands) and live events.
*/
final class LiveSignalEnforcement
extends AbstractEnforcementReloaded<Signal<?>, CommandResponse<?>>
implements ThingEnforcementStrategy {

LiveSignalEnforcement(final ActorSystem actorSystem) {
super(actorSystem);
}

@Override
public boolean isApplicable(final Signal<?> signal) {
return Command.isLiveCommand(signal) || Event.isLiveEvent(signal) ||
Expand Down Expand Up @@ -144,7 +138,7 @@ public CompletionStage<CommandResponse<?>> filterResponse(final CommandResponse<
try {
if (withEntity.getEntity().isObject()) {
result = ThingCommandEnforcement.getJsonViewForCommandResponse(withEntity.getEntity().asObject(),
commandResponse, policyEnforcer.getEnforcer(), enforcementDispatcher)
commandResponse, policyEnforcer.getEnforcer())
.thenApply(jsonViewForCommandResponse -> {
final WithEntity<?> commandResponseWithEntity =
withEntity.setEntity(jsonViewForCommandResponse);
Expand Down Expand Up @@ -172,8 +166,7 @@ private CompletionStage<Signal<?>> enforceLiveSignal(final StreamingType streami
return enforceLiveEvent(liveSignal, enforcer.getEnforcer());
case LIVE_COMMANDS:
return ThingCommandEnforcement.authorizeByPolicyOrThrow(enforcer.getEnforcer(),
(ThingCommand<?>) liveSignal,
enforcementDispatcher).thenCompose(s ->
(ThingCommand<?>) liveSignal).thenCompose(s ->
addEffectedReadSubjectsToThingLiveSignal((ThingCommand<?>) liveSignal,
enforcer.getEnforcer()).thenApply(withReadSubjects -> {
LOGGER.withCorrelationId(withReadSubjects)
Expand Down Expand Up @@ -202,31 +195,31 @@ <T extends Signal<T>> CompletionStage<T> addEffectedReadSubjectsToThingLiveSigna
final Enforcer enforcer) {

final var resourceKey = ResourceKey.newInstance(ThingConstants.ENTITY_TYPE, signal.getResourcePath());
return CompletableFuture.supplyAsync(() -> enforcer.getSubjectsWithPermission(resourceKey, Permission.READ),
enforcementDispatcher).thenApply(subjects -> {
return CompletableFuture.supplyAsync(() -> enforcer.getSubjectsWithPermission(resourceKey, Permission.READ))
.thenApply(subjects -> {

final var newHeaders = signal.getDittoHeaders()
.toBuilder()
.readGrantedSubjects(subjects.getGranted())
.readRevokedSubjects(subjects.getRevoked())
.build();
final var newHeaders = signal.getDittoHeaders()
.toBuilder()
.readGrantedSubjects(subjects.getGranted())
.readRevokedSubjects(subjects.getRevoked())
.build();

return signal.setDittoHeaders(newHeaders);
});
return signal.setDittoHeaders(newHeaders);
});
}

private CompletionStage<Signal<?>> enforceLiveEvent(final Signal<?> liveSignal,
final Enforcer enforcer) {

return CompletableFuture.supplyAsync(() -> enforcer.hasUnrestrictedPermissions(
PoliciesResourceType.thingResource(liveSignal.getResourcePath()),
liveSignal.getDittoHeaders().getAuthorizationContext(), WRITE), enforcementDispatcher)
liveSignal.getDittoHeaders().getAuthorizationContext(), WRITE))
.thenCompose(authorized -> {
if (Boolean.TRUE.equals(authorized)) {
LOGGER.withCorrelationId(liveSignal)
.info("Live Event was authorized: <{}>", liveSignal);
return ThingCommandEnforcement.addEffectedReadSubjectsToThingSignal((ThingEvent<?>) liveSignal,
enforcer, enforcementDispatcher).thenApply(s -> s);
enforcer).thenApply(s -> s);
} else {
LOGGER.withCorrelationId(liveSignal)
.info("Live Event was NOT authorized: <{}>", liveSignal);
Expand Down Expand Up @@ -254,15 +247,15 @@ private CompletionStage<Signal<?>> publishMessageCommand(final MessageCommand<?,

final ResourceKey resourceKey =
ResourceKey.newInstance(MessageCommand.RESOURCE_TYPE, command.getResourcePath());
return CompletableFuture.supplyAsync(() -> enforcer.getSubjectsWithPermission(resourceKey, Permission.READ),
enforcementDispatcher).thenApply(effectedSubjects -> {
final var headersWithReadSubjects = command.getDittoHeaders()
.toBuilder()
.readGrantedSubjects(effectedSubjects.getGranted())
.readRevokedSubjects(effectedSubjects.getRevoked())
.build();
return command.setDittoHeaders(headersWithReadSubjects);
});
return CompletableFuture.supplyAsync(() -> enforcer.getSubjectsWithPermission(resourceKey, Permission.READ))
.thenApply(effectedSubjects -> {
final var headersWithReadSubjects = command.getDittoHeaders()
.toBuilder()
.readGrantedSubjects(effectedSubjects.getGranted())
.readRevokedSubjects(effectedSubjects.getRevoked())
.build();
return command.setDittoHeaders(headersWithReadSubjects);
});
}

private MessageSendNotAllowedException rejectMessageCommand(final MessageCommand<?, ?> command) {
Expand All @@ -283,7 +276,7 @@ private MessageSendNotAllowedException rejectMessageCommand(final MessageCommand
private CompletionStage<Boolean> isAuthorized(final MessageCommand<?, ?> command, final Enforcer enforcer) {
return CompletableFuture.supplyAsync(
() -> enforcer.hasUnrestrictedPermissions(extractMessageResourceKey(command),
command.getDittoHeaders().getAuthorizationContext(), WRITE), enforcementDispatcher);
command.getDittoHeaders().getAuthorizationContext(), WRITE));
}

private static ResourceKey extractMessageResourceKey(final MessageCommand<?, ?> command) {
Expand Down

0 comments on commit f34d4f3

Please sign in to comment.