Skip to content

Commit

Permalink
Use separate dispatcher for access on enforcers
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Jul 8, 2022
1 parent 33d6e2b commit e795e70
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 213 deletions.
Expand Up @@ -22,6 +22,9 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;

import akka.actor.ActorSystem;
import akka.dispatch.MessageDispatcher;

/**
* Abstract implementation of {@link EnforcementReloaded} providing common functionality of all entity specific
* enforcement implementations.
Expand All @@ -35,6 +38,14 @@ public abstract class AbstractEnforcementReloaded<S extends Signal<?>, R extends
protected static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(AbstractEnforcementReloaded.class);

private static final String ENFORCEMENT_DISPATCHER = "enforcement-dispatcher";

protected final MessageDispatcher enforcementDispatcher;

protected AbstractEnforcementReloaded(final ActorSystem actorSystem) {
enforcementDispatcher = actorSystem.dispatchers().lookup(ENFORCEMENT_DISPATCHER);
}

/**
* Reports an error differently based on type of the error. If the error is of type
* {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException}, it is returned as is
Expand Down
5 changes: 5 additions & 0 deletions policies/enforcement/src/main/resources/reference.conf
Expand Up @@ -6,6 +6,11 @@ enforcement-cache-dispatcher {
executor = "thread-pool-executor"
}

enforcement-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
}

ditto.policies-enforcer-cache {
enabled = false
enabled = ${?POLICIES_ENFORCER_CACHING_ENABLED}
Expand Down
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.policies.service.enforcement;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -48,6 +49,8 @@
import org.eclipse.ditto.policies.model.signals.commands.modify.PolicyModifyCommand;
import org.eclipse.ditto.policies.model.signals.commands.query.PolicyQueryCommandResponse;

import akka.actor.ActorSystem;

/**
* Authorizes {@link PolicyCommand}s and filters {@link PolicyCommandResponse}s.
*/
Expand All @@ -60,6 +63,10 @@ public final class PolicyCommandEnforcement
private static final JsonFieldSelector POLICY_QUERY_COMMAND_RESPONSE_ALLOWLIST =
JsonFactory.newFieldSelector(Policy.JsonFields.ID);

public PolicyCommandEnforcement(final ActorSystem actorSystem) {
super(actorSystem);
}

@Override
public CompletionStage<PolicyCommand<?>> authorizeSignal(final PolicyCommand<?> command,
final PolicyEnforcer policyEnforcer) {
Expand All @@ -72,43 +79,52 @@ public CompletionStage<PolicyCommand<?>> authorizeSignal(final PolicyCommand<?>
final Enforcer enforcer = policyEnforcer.getEnforcer();
final var policyResourceKey = PoliciesResourceType.policyResource(command.getResourcePath());
final var authorizationContext = command.getDittoHeaders().getAuthorizationContext();
final PolicyCommand<?> authorizedCommand;
final CompletionStage<PolicyCommand<?>> authorizedCommand;
if (command instanceof CreatePolicy createPolicy) {
authorizedCommand = authorizeCreatePolicy(enforcer, createPolicy, policyResourceKey, authorizationContext);
} else if (command instanceof PolicyActionCommand) {
authorizedCommand = authorizeActionCommand(policyEnforcer, command, policyResourceKey, authorizationContext)
.orElseThrow(() -> errorForPolicyCommand(command));
authorizedCommand = authorizeActionCommand(policyEnforcer, command, policyResourceKey,
authorizationContext).thenApply(
optional -> optional.orElseThrow(() -> errorForPolicyCommand(command)));
} else if (command instanceof PolicyModifyCommand) {
if (hasUnrestrictedWritePermission(enforcer, policyResourceKey, authorizationContext)) {
authorizedCommand = command;
} else {
throw errorForPolicyCommand(command);
}
authorizedCommand =
hasUnrestrictedWritePermission(enforcer, policyResourceKey, authorizationContext).thenApply(
hasPermission -> {
if (Boolean.TRUE.equals(hasPermission)) {
return command;
} else {
throw errorForPolicyCommand(command);
}
});
} else {
final String permission = Permission.READ;
if (enforcer.hasPartialPermissions(policyResourceKey, authorizationContext, permission)) {
authorizedCommand = command;
} else {
throw errorForPolicyCommand(command);
}
authorizedCommand = CompletableFuture.supplyAsync(() -> enforcer.hasPartialPermissions(policyResourceKey,
authorizationContext,
permission), enforcementDispatcher).thenApply(hasPermission -> {
if (Boolean.TRUE.equals(hasPermission)) {
return command;
} else {
throw errorForPolicyCommand(command);
}
});
}

return CompletableFuture.completedStage(authorizedCommand);
return authorizedCommand;
}

private PolicyCommand<?> authorizeCreatePolicy(final Enforcer enforcer,
private CompletionStage<PolicyCommand<?>> authorizeCreatePolicy(final Enforcer enforcer,
final CreatePolicy createPolicy,
final ResourceKey policyResourceKey,
final AuthorizationContext authorizationContext) {

final PolicyCommand<?> authorizedCommand;
if (createPolicy.getDittoHeaders().isAllowPolicyLockout()
|| hasUnrestrictedWritePermission(enforcer, policyResourceKey, authorizationContext)) {
authorizedCommand = createPolicy;
} else {
throw errorForPolicyCommand(createPolicy);
}
return authorizedCommand;
return hasUnrestrictedWritePermission(enforcer, policyResourceKey, authorizationContext).thenApply(hasPermission -> {
if (Boolean.TRUE.equals(hasPermission) || createPolicy.getDittoHeaders().isAllowPolicyLockout()) {
return createPolicy;
} else {
throw errorForPolicyCommand(createPolicy);
}
});

}

@Override
Expand All @@ -127,93 +143,147 @@ public boolean shouldFilterCommandResponse(final PolicyCommandResponse<?> comman
public CompletionStage<PolicyCommandResponse<?>> filterResponse(final PolicyCommandResponse<?> commandResponse,
final PolicyEnforcer policyEnforcer) {

final CompletionStage<PolicyCommandResponse<?>> result;
if (commandResponse instanceof PolicyQueryCommandResponse<?> policyQueryCommandResponse) {
try {
return CompletableFuture.completedStage(
buildJsonViewForPolicyQueryCommandResponse(policyQueryCommandResponse,
policyEnforcer.getEnforcer())
);
result = buildJsonViewForPolicyQueryCommandResponse(policyQueryCommandResponse,
policyEnforcer.getEnforcer()).thenApply(cr -> cr);
} catch (final RuntimeException e) {
throw reportError("Error after building JsonView", e, commandResponse.getDittoHeaders());
}
} else {
// no filtering required for non PolicyQueryCommandResponses:
return CompletableFuture.completedStage(commandResponse);
result = CompletableFuture.completedStage(commandResponse);
}
return result;
}

@SuppressWarnings("unchecked")
private static <T extends PolicyCommand<?>> Optional<T> authorizeActionCommand(final PolicyEnforcer enforcer,
private <T extends PolicyCommand<?>> CompletionStage<Optional<T>> authorizeActionCommand(
final PolicyEnforcer enforcer,
final T command, final ResourceKey resourceKey, final AuthorizationContext authorizationContext) {

if (command instanceof TopLevelPolicyActionCommand topLevelPolicyActionCommand) {
return (Optional<T>) authorizeTopLevelAction(enforcer, topLevelPolicyActionCommand, authorizationContext);
return authorizeTopLevelAction(enforcer, topLevelPolicyActionCommand,
authorizationContext);
} else {
return authorizeEntryLevelAction(enforcer.getEnforcer(), command, resourceKey, authorizationContext);
}
}

private static <T extends PolicyCommand<?>> Optional<T> authorizeEntryLevelAction(final Enforcer enforcer,
private <T extends PolicyCommand<?>> CompletionStage<Optional<T>> authorizeEntryLevelAction(final Enforcer enforcer,
final T command, final ResourceKey resourceKey, final AuthorizationContext authorizationContext) {
return enforcer.hasUnrestrictedPermissions(resourceKey, authorizationContext, Permission.EXECUTE)
return CompletableFuture.supplyAsync(() -> enforcer.hasUnrestrictedPermissions(resourceKey,
authorizationContext,
Permission.EXECUTE)
? Optional.of(command)
: Optional.empty();
: Optional.empty(), enforcementDispatcher);
}

private static Optional<TopLevelPolicyActionCommand> authorizeTopLevelAction(final PolicyEnforcer policyEnforcer,
private <T extends PolicyCommand<?>> CompletionStage<Optional<T>> authorizeTopLevelAction(
final PolicyEnforcer policyEnforcer,
final TopLevelPolicyActionCommand command, final AuthorizationContext authorizationContext) {

final var enforcer = policyEnforcer.getEnforcer();
final List<Label> authorizedLabels = policyEnforcer.getPolicy()

final List<CompletionStage<Label>> labels = getLabelsFromPolicyEnforcer(policyEnforcer);
final var enforcedLabels = enforcePolicyLabels(labels, enforcer, command, authorizationContext);
final var authorizedLabels = filterAuthorizedLabels(enforcedLabels);

return authorizedLabels.thenApply(labelList -> {
if (labelList.isEmpty()) {
return Optional.empty();
} else {
final var adjustedCommand =
TopLevelPolicyActionCommand.of(command.getPolicyActionCommand(), labelList);
return (Optional<T>) Optional.of(adjustedCommand);
}
});
}

private static List<CompletionStage<Label>> getLabelsFromPolicyEnforcer(final PolicyEnforcer policyEnforcer) {
return policyEnforcer.getPolicy()
.map(policy -> policy.getEntriesSet().stream()
.map(PolicyEntry::getLabel)
.filter(label -> enforcer.hasUnrestrictedPermissions(asResourceKey(label, command),
authorizationContext, Permission.EXECUTE))
.toList())
.orElse(List.of());
if (authorizedLabels.isEmpty()) {
return Optional.empty();
} else {
final var adjustedCommand =
TopLevelPolicyActionCommand.of(command.getPolicyActionCommand(), authorizedLabels);
return Optional.of(adjustedCommand);
}
.map(CompletableFuture::completedStage)
.toList()).orElse(List.of());
}

private static boolean hasUnrestrictedWritePermission(final Enforcer enforcer, final ResourceKey policyResourceKey,
private List<CompletionStage<Label>> enforcePolicyLabels(final List<CompletionStage<Label>> labels,
final Enforcer enforcer,
final TopLevelPolicyActionCommand command,
final AuthorizationContext authorizationContext) {

return labels.stream().map(labelStage ->
labelStage.thenCompose(label -> CompletableFuture.supplyAsync(
() -> enforcer.hasUnrestrictedPermissions(asResourceKey(label, command), authorizationContext,
Permission.EXECUTE), enforcementDispatcher).thenApply(result -> {
if (Boolean.TRUE.equals(result)) {
return label;
} else {
return null;
}
}))).toList();
}

private CompletableFuture<List<Label>> filterAuthorizedLabels(final List<CompletionStage<Label>> enforcedLabels) {
// Wait for all labels to finish enforced.
return CompletableFuture.allOf(enforcedLabels.toArray(new CompletableFuture[0]))
.thenCompose(voidValue -> {
final CompletionStage<List<Label>> labelList = CompletableFuture.completedStage(new ArrayList<>());
for (final CompletionStage<Label> l : enforcedLabels) {
l.thenCompose(label -> labelList.thenApply(list -> {
if (null != label) {
list.add(label);
}
return list;
}));
}
return labelList;
});
}

private CompletionStage<Boolean> hasUnrestrictedWritePermission(final Enforcer enforcer,
final ResourceKey policyResourceKey,
final AuthorizationContext authorizationContext) {
return enforcer.hasUnrestrictedPermissions(policyResourceKey, authorizationContext, Permission.WRITE);
return CompletableFuture.supplyAsync(() -> enforcer.hasUnrestrictedPermissions(policyResourceKey,
authorizationContext,
Permission.WRITE), enforcementDispatcher);
}

/**
* Limit view on entity of {@code PolicyQueryCommandResponse} by enforcer.
*
* @param response the response.
* @param enforcer the enforcer.
* @return response with view on entity restricted by enforcer.
* @return a {@code CompletionStage} containing the response with view on entity restricted by enforcer.
*/
public static <T extends PolicyQueryCommandResponse<T>> T buildJsonViewForPolicyQueryCommandResponse(
public <T extends PolicyQueryCommandResponse<T>> CompletionStage<T> buildJsonViewForPolicyQueryCommandResponse(
final PolicyQueryCommandResponse<T> response,
final Enforcer enforcer) {

final JsonValue entity = response.getEntity();
final CompletionStage<T> result;
if (entity.isObject()) {
final JsonObject filteredView =
final CompletionStage<JsonObject> filteredView =
getJsonViewForPolicyQueryCommandResponse(entity.asObject(), response, enforcer);
return response.setEntity(filteredView);
result = filteredView.thenApply(response::setEntity);
} else {
return response.setEntity(entity);
result = CompletableFuture.completedStage(response.setEntity(entity));
}
return result;
}

private static JsonObject getJsonViewForPolicyQueryCommandResponse(final JsonObject responseEntity,
private CompletableFuture<JsonObject> getJsonViewForPolicyQueryCommandResponse(final JsonObject responseEntity,
final PolicyQueryCommandResponse<?> response,
final Enforcer enforcer) {

final var resourceKey = ResourceKey.newInstance(PolicyCommand.RESOURCE_TYPE, response.getResourcePath());
final var authorizationContext = response.getDittoHeaders().getAuthorizationContext();

return enforcer.buildJsonView(resourceKey, responseEntity, authorizationContext,
POLICY_QUERY_COMMAND_RESPONSE_ALLOWLIST, Permissions.newInstance(Permission.READ));
return CompletableFuture.supplyAsync(() -> enforcer.buildJsonView(resourceKey, responseEntity,
authorizationContext,
POLICY_QUERY_COMMAND_RESPONSE_ALLOWLIST, Permissions.newInstance(Permission.READ)),
enforcementDispatcher);
}

/**
Expand Down
Expand Up @@ -112,7 +112,7 @@ protected Props getPersistenceActorProps(final PolicyId entityId) {

@Override
protected Props getPersistenceEnforcerProps(final PolicyId entityId) {
return PolicyEnforcerActor.props(entityId, new PolicyCommandEnforcement());
return PolicyEnforcerActor.props(entityId, new PolicyCommandEnforcement(getContext().system()));
}

@Override
Expand Down
Expand Up @@ -142,18 +142,14 @@ public final class PolicyCommandEnforcementTest {

private TestProbe policyPersistenceActorProbe;
private ActorRef supervisor;
private MockPolicyPersistenceSupervisor mockPolicyPersistenceSupervisor;

@Before
public void init() {
system = ActorSystem.create("test", ConfigFactory.load("test"));

pubSubMediatorProbe = createPubSubMediatorProbe();
policyPersistenceActorProbe = createPolicyPersistenceActorProbe();
final TestActorRef<MockPolicyPersistenceSupervisor> policyPersistenceSupervisorTestActorRef =
createPolicyPersistenceSupervisor();
supervisor = policyPersistenceSupervisorTestActorRef;
mockPolicyPersistenceSupervisor = policyPersistenceSupervisorTestActorRef.underlyingActor();
supervisor = createPolicyPersistenceSupervisor();
}

@After
Expand Down Expand Up @@ -790,17 +786,15 @@ private TestProbe createPolicyPersistenceActorProbe() {

private TestActorRef<MockPolicyPersistenceSupervisor> createPolicyPersistenceSupervisor() {
return new TestActorRef<>(system, Props.create(
MockPolicyPersistenceSupervisor.class,
pubSubMediatorProbe.ref(),
policyPersistenceActorProbe.ref()
), system.guardian(), MockPolicyPersistenceSupervisor.ACTOR_NAME);
MockPolicyPersistenceSupervisor.class, () -> new MockPolicyPersistenceSupervisor(pubSubMediatorProbe.ref(),
policyPersistenceActorProbe.ref())), system.guardian(), MockPolicyPersistenceSupervisor.ACTOR_NAME);
}

private static String createUniqueName(final String prefix) {
return prefix + UUID.randomUUID();
}

private static class MockPolicyPersistenceSupervisor
private class MockPolicyPersistenceSupervisor
extends AbstractPersistenceSupervisor<PolicyId, PolicyCommand<?>> {

static final String ACTOR_NAME = "mockPolicyPersistenceSupervisor";
Expand All @@ -812,10 +806,6 @@ private MockPolicyPersistenceSupervisor(final ActorRef pubSubMediator, final Act
this.pubSubMediator = pubSubMediator;
}

ActorRef getEnforcerChild() {
return enforcerChild;
}

@Override
protected PolicyId getEntityId() throws Exception {
return POLICY_ID;
Expand All @@ -828,7 +818,7 @@ protected Props getPersistenceActorProps(final PolicyId entityId) {

@Override
protected Props getPersistenceEnforcerProps(final PolicyId entityId) {
return PolicyEnforcerActor.props(entityId, new PolicyCommandEnforcement());
return PolicyEnforcerActor.props(entityId, new PolicyCommandEnforcement(system));
}

@Override
Expand Down

0 comments on commit e795e70

Please sign in to comment.