Skip to content

Commit

Permalink
Move logic for retrieval of policy enforcer into separate class which
Browse files Browse the repository at this point in the history
allows to mock the logic for tests in a single place

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed Jun 29, 2022
1 parent 22baef7 commit c1ecdc6
Show file tree
Hide file tree
Showing 19 changed files with 379 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.internal.utils.cache.entry;

import java.util.Optional;

/**
* Cache entry for authorization.
*
Expand Down Expand Up @@ -50,4 +52,12 @@ static <T> Entry<T> nonexistent() {
*/
T getValueOrThrow();

default Optional<T> get() {
if (exists()) {
return Optional.of(getValueOrThrow());
} else {
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,9 @@
import org.eclipse.ditto.internal.utils.akka.actors.AbstractActorWithStashWithTimers;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.policies.model.PolicyId;

import akka.actor.ActorRef;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.Replicator;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;

Expand All @@ -63,19 +58,9 @@ public abstract class AbstractEnforcerActor<I extends EntityId, S extends Signal
protected final I entityId;
protected final E enforcement;

protected AbstractEnforcerActor(final I entityId, final E enforcement, final ActorRef pubSubMediator,
@Nullable final BlockedNamespaces blockedNamespaces) {

protected AbstractEnforcerActor(final I entityId, final E enforcement) {
this.entityId = entityId;
this.enforcement = enforcement;

// subscribe for PolicyTags in order to reload policyEnforcer when "backing policy" was modified
pubSubMediator.tell(DistPubSubAccess.subscribe(PolicyTag.PUB_SUB_TOPIC_INVALIDATE_ENFORCERS, getSelf()),
getSelf());

if (null != blockedNamespaces) {
blockedNamespaces.subscribeForChanges(getSelf());
}
}

/**
Expand All @@ -93,29 +78,20 @@ protected AbstractEnforcerActor(final I entityId, final E enforcement, final Act
* The implementation chooses the most efficient strategy to retrieve it.
*
* @param policyId the {@link PolicyId} to retrieve the PolicyEnforcer for.
* @return a successful CompletionStage of either the loaded {@link PolicyEnforcer} or a failed CompletionStage with
* the cause for the failure.
* @return a successful CompletionStage of either an optional holding the loaded {@link PolicyEnforcer} or an empty optional if the enforcer could not be loaded.
*/
protected abstract CompletionStage<PolicyEnforcer> providePolicyEnforcer(@Nullable PolicyId policyId);
protected abstract CompletionStage<Optional<PolicyEnforcer>> providePolicyEnforcer(@Nullable PolicyId policyId);

@SuppressWarnings("unchecked")
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(DistributedPubSubMediator.SubscribeAck.class, s -> log.debug("Got subscribeAck <{}>.", s))
.match(PolicyTag.class, this::matchesPolicy, pt -> {
//TODO: yannic invalidate policy cache later
})
.match(PolicyTag.class, pt -> {
//TODO: yannic we should not even retrieve those, as this could lead to a lot of traffic
// ignore policy tags not intended for this actor - not necessary to log on debug!
})
.match(SudoCommand.class, sudoCommand -> log.withCorrelationId(sudoCommand)
.error("Received SudoCommand in enforcer which should never happen: <{}>", sudoCommand)
)
.match(CommandResponse.class, r -> filterResponse((R) r))
.match(Signal.class, s -> enforceSignal((S) s))
.match(Replicator.Changed.class, this::handleChanged)
.matchAny(message ->
log.withCorrelationId(
message instanceof WithDittoHeaders withDittoHeaders ? withDittoHeaders : null)
Expand All @@ -125,40 +101,7 @@ public Receive createReceive() {

protected CompletionStage<Optional<PolicyEnforcer>> loadPolicyEnforcer(Signal<?> signal) {
return providePolicyIdForEnforcement(signal)
.thenCompose(this::providePolicyEnforcer)
.handle((pEnf, throwable) -> {
if (null != throwable) {
log.error(throwable, "Failed to load policy enforcer; stopping myself..");
getContext().stop(getSelf());
return Optional.empty();
} else {
return Optional.ofNullable(pEnf);
}
});
}

@SuppressWarnings("unchecked")
private void handleChanged(final Replicator.Changed<?> changed) {
if (changed.dataValue() instanceof ORSet) {
final ORSet<String> namespaces = (ORSet<String>) changed.dataValue();
logNamespaces("Received", namespaces);
//TODO: Yannic invalidate policy cache after caching is reintroduced
} else {
log.warning("Unhandled: <{}>", changed);
}
}

private void logNamespaces(final String verb, final ORSet<String> namespaces) {
if (namespaces.size() > 25) {
log.info("{} <{}> namespaces", verb, namespaces.size());
} else {
log.info("{} namespaces: <{}>", verb, namespaces);
}
}

private boolean matchesPolicy(final PolicyTag policyTag) {
//TODO: Yannic this method should not be necessary
return false;
.thenCompose(this::providePolicyEnforcer);
}

/**
Expand Down Expand Up @@ -204,7 +147,7 @@ private void handleAuthorizationFailure(
final Signal<?> signal,
final Throwable throwable,
final ActorRef sender
) {
) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable, t ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,16 @@
*/
package org.eclipse.ditto.policies.enforcement;

import java.util.concurrent.CompletableFuture;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.model.PolicyId;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

import akka.actor.ActorRef;
import akka.dispatch.MessageDispatcher;

/**
* Abstract enforcer of commands performing authorization / enforcement of incoming signals based on policy
* loaded via the policies shard region.
Expand All @@ -43,41 +34,18 @@
public abstract class AbstractPolicyLoadingEnforcerActor<I extends EntityId, S extends Signal<?>, R extends CommandResponse<?>,
E extends EnforcementReloaded<S, R>> extends AbstractEnforcerActor<I, S, R, E> {

private final AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader;
private final MessageDispatcher enforcementCacheDispatcher;
private final PolicyEnforcerProvider policyEnforcerProvider;

protected AbstractPolicyLoadingEnforcerActor(final I entityId,
final E enforcement,
final ActorRef pubSubMediator,
@Nullable final BlockedNamespaces blockedNamespaces,
final AskWithRetryConfig askWithRetryConfig,
final ActorRef policiesShardRegion) {
super(entityId, enforcement, pubSubMediator, blockedNamespaces);
this.policyEnforcerCacheLoader = new PolicyEnforcerCacheLoader(askWithRetryConfig,
context().system().getScheduler(),
policiesShardRegion
);
enforcementCacheDispatcher =
context().system().dispatchers().lookup(PolicyEnforcerCacheLoader.ENFORCEMENT_CACHE_DISPATCHER);
final PolicyEnforcerProvider policyEnforcerProvider) {
super(entityId, enforcement);
this.policyEnforcerProvider = policyEnforcerProvider;
}

@Override
protected CompletionStage<PolicyEnforcer> providePolicyEnforcer(@Nullable final PolicyId policyId) {
if (null == policyId) {
return CompletableFuture.completedStage(null);
} else {
try {
return policyEnforcerCacheLoader.asyncLoad(EnforcementCacheKey.of(policyId), enforcementCacheDispatcher)
.thenApply(entry -> {
if (entry.exists()) {
return entry.getValueOrThrow();
} else {
return null;
}
});
} catch (final Exception e) {
throw new IllegalStateException("Could not load policyEnforcer via policyEnforcerCacheLoader", e);
}
}
protected CompletionStage<Optional<PolicyEnforcer>> providePolicyEnforcer(@Nullable final PolicyId policyId) {
return policyEnforcerProvider.getPolicyEnforcer(policyId)
.exceptionally(error -> Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.internal.utils.cluster.config.DefaultClusterConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.policies.model.PolicyId;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.MessageDispatcher;

/**
* Loads the {@link org.eclipse.ditto.policies.model.Policy} from the policies shard region and wraps it into a {@link PolicyEnforcer}.
*/
public final class DefaultPolicyEnforcerProvider implements PolicyEnforcerProvider {

private final AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader;
private final MessageDispatcher enforcementCacheDispatcher;

DefaultPolicyEnforcerProvider(final ActorSystem actorSystem) {
this(askWithRetryConfig(actorSystem), actorSystem, policiesShardRegion(actorSystem));
}

DefaultPolicyEnforcerProvider(
final AskWithRetryConfig askWithRetryConfig,
final ActorSystem actorSystem,
final ActorRef policiesShardRegion) {

this.policyEnforcerCacheLoader = new PolicyEnforcerCacheLoader(askWithRetryConfig,
actorSystem.getScheduler(),
policiesShardRegion
);
enforcementCacheDispatcher =
actorSystem.dispatchers().lookup(PolicyEnforcerCacheLoader.ENFORCEMENT_CACHE_DISPATCHER);
}

/**
* Creates a new instance of this policy enforcer provider based on the configuration in the actor system
*
* @param actorSystem used to initialize all dependencies of the policy enforcer provider
* @return the new instance.
*/
public static PolicyEnforcerProvider getInstance(final ActorSystem actorSystem) {
return new DefaultPolicyEnforcerProvider(actorSystem);
}

private static AskWithRetryConfig askWithRetryConfig(final ActorSystem actorSystem) {
final DefaultScopedConfig dittoScoped = DefaultScopedConfig.dittoScoped(actorSystem.settings().config());
return DefaultEnforcementConfig.of(dittoScoped).getAskWithRetryConfig();
}

private static ActorRef policiesShardRegion(final ActorSystem actorSystem) {
final var dittoScopedConfig = DefaultScopedConfig.dittoScoped(actorSystem.settings().config());
final ClusterConfig clusterConfig = DefaultClusterConfig.of(dittoScopedConfig);
final ShardRegionProxyActorFactory shardRegionProxyActorFactory =
ShardRegionProxyActorFactory.newInstance(actorSystem, clusterConfig);

return shardRegionProxyActorFactory.getShardRegionProxyActor(
PoliciesMessagingConstants.CLUSTER_ROLE,
PoliciesMessagingConstants.SHARD_REGION
);
}

/**
* Loads the {@link org.eclipse.ditto.policies.model.Policy} from the policies shard region and wraps it into a {@link PolicyEnforcer}.
*
* @param policyId the ID of the policy that should be loaded.
* @return A completion stage completing with an Optional holding the PolicyEnforcer in case it could be retrieved or an empty optional if not.
*/
@Override
public CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable final PolicyId policyId) {
if (null == policyId) {
return CompletableFuture.completedStage(Optional.empty());
} else {
try {
return policyEnforcerCacheLoader.asyncLoad(EnforcementCacheKey.of(policyId),
enforcementCacheDispatcher)
.thenApply(Entry::get)
.exceptionally(error -> Optional.empty());
} catch (final Exception e) {
throw new IllegalStateException("Could not load policyEnforcer via policyEnforcerCacheLoader", e);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.policies.model.PolicyId;

@FunctionalInterface
public interface PolicyEnforcerProvider {

CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable PolicyId policyId);

}
Loading

0 comments on commit c1ecdc6

Please sign in to comment.