Skip to content

Commit

Permalink
made DefaultPolicyEnforcerProvider no longer aware of sister "cache b…
Browse files Browse the repository at this point in the history
…ased" implementation

* let the PolicyEnforcerProvider interface decide based on config which implementation to instantiate
* use the existing PolicyEnforcerCacheLoader in CachingPolicyEnforcerProvider cache to async load entries
* adjusted config keys and defaults of caching to old values

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jul 20, 2022
1 parent 7732389 commit 820bc34
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 176 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;

import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.internal.utils.cluster.config.DefaultClusterConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.MessageDispatcher;

/**
* Abstract base of {@link PolicyEnforcer} implementations.
*/
abstract class AbstractPolicyEnforcerProvider implements PolicyEnforcerProvider {

protected AbstractPolicyEnforcerProvider() {
// no-op
}

protected static AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader(
final ActorSystem actorSystem) {

final DefaultScopedConfig dittoScoped = DefaultScopedConfig.dittoScoped(actorSystem.settings().config());
final AskWithRetryConfig askWithRetryConfig = DefaultEnforcementConfig.of(dittoScoped)
.getAskWithRetryConfig();

final ClusterConfig clusterConfig = DefaultClusterConfig.of(dittoScoped);
final ShardRegionProxyActorFactory shardRegionProxyActorFactory =
ShardRegionProxyActorFactory.newInstance(actorSystem, clusterConfig);

final ActorRef policiesShardRegion = shardRegionProxyActorFactory.getShardRegionProxyActor(
PoliciesMessagingConstants.CLUSTER_ROLE,
PoliciesMessagingConstants.SHARD_REGION
);
return new PolicyEnforcerCacheLoader(askWithRetryConfig, actorSystem.getScheduler(), policiesShardRegion);
}

protected static MessageDispatcher enforcementCacheDispatcher(final ActorSystem actorSystem) {
return actorSystem.dispatchers().lookup(PolicyEnforcerCacheLoader.ENFORCEMENT_CACHE_DISPATCHER);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,72 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
import org.eclipse.ditto.internal.utils.cache.config.DefaultCacheConfig;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.policies.model.PolicyId;
import org.slf4j.Logger;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.Replicator;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.dispatch.MessageDispatcher;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;

/**
* Transparent caching layer for {@link org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider}
*/
final class CachingPolicyEnforcerProvider implements PolicyEnforcerProvider {
final class CachingPolicyEnforcerProvider extends AbstractPolicyEnforcerProvider {

private static final Logger LOGGER = DittoLoggerFactory.getThreadSafeLogger(CachingPolicyEnforcerProvider.class);
private static final Duration LOCAL_POLICY_RETRIEVAL_TIMEOUT = Duration.ofSeconds(60);

private final ActorRef cachingPolicyEnforcerProviderActor;

CachingPolicyEnforcerProvider(final ActorRefFactory actorRefFactory,
final Cache<PolicyId, Entry<PolicyEnforcer>> policyEnforcerCache,
final PolicyEnforcerProvider delegate,
@Nullable final BlockedNamespaces blockedNamespaces,
CachingPolicyEnforcerProvider(final ActorSystem actorSystem) {
this(actorSystem, policyEnforcerCacheLoader(actorSystem), enforcementCacheDispatcher(actorSystem));
}

private CachingPolicyEnforcerProvider(final ActorSystem actorSystem,
final AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader,
final MessageDispatcher cacheDispatcher) {

this(actorSystem,
CacheFactory.createCache(
policyEnforcerCacheLoader,
DefaultCacheConfig.of(actorSystem.settings().config(),
PolicyEnforcerProvider.ENFORCER_CACHE_CONFIG_KEY),
"policy_enforcer_cache",
cacheDispatcher
),
BlockedNamespaces.of(actorSystem),
DistributedPubSub.get(actorSystem).mediator()
);
}

CachingPolicyEnforcerProvider(final ActorSystem actorSystem,
final Cache<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCache,
final BlockedNamespaces blockedNamespaces,
final ActorRef pubSubMediator) {

this.cachingPolicyEnforcerProviderActor = actorRefFactory.actorOf(
CachingPolicyEnforcerProviderActor.props(policyEnforcerCache, delegate, blockedNamespaces,
this.cachingPolicyEnforcerProviderActor = actorSystem.actorOf(
CachingPolicyEnforcerProviderActor.props(policyEnforcerCache, blockedNamespaces,
pubSubMediator));
}

Expand All @@ -63,7 +93,7 @@ public CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable fin
if (policyId == null) {
return CompletableFuture.completedStage(Optional.empty());
}
return Patterns.ask(cachingPolicyEnforcerProviderActor, policyId, Duration.ofSeconds(60))
return Patterns.ask(cachingPolicyEnforcerProviderActor, policyId, LOCAL_POLICY_RETRIEVAL_TIMEOUT)
.thenApply(response -> {
final Optional<PolicyEnforcer> result;
if (response instanceof Optional<?> optional) {
Expand Down Expand Up @@ -91,16 +121,13 @@ public CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable fin
private static final class CachingPolicyEnforcerProviderActor extends AbstractActor {

private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
private final Cache<PolicyId, Entry<PolicyEnforcer>> policyEnforcerCache;
private final PolicyEnforcerProvider delegate;
private final Cache<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCache;

CachingPolicyEnforcerProviderActor(final Cache<PolicyId, Entry<PolicyEnforcer>> policyEnforcerCache,
final PolicyEnforcerProvider delegate,
CachingPolicyEnforcerProviderActor(final Cache<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCache,
@Nullable final BlockedNamespaces blockedNamespaces,
final ActorRef pubSubMediator) {

this.policyEnforcerCache = policyEnforcerCache;
this.delegate = delegate;

if (blockedNamespaces != null) {
blockedNamespaces.subscribeForChanges(getSelf());
Expand All @@ -111,46 +138,31 @@ private static final class CachingPolicyEnforcerProviderActor extends AbstractAc
getSelf());
}

private static Props props(final Cache<PolicyId, Entry<PolicyEnforcer>> policyEnforcerCache,
final PolicyEnforcerProvider delegate,
private static Props props(final Cache<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCache,
@Nullable final BlockedNamespaces blockedNamespaces,
final ActorRef pubSubMediator) {

return Props.create(CachingPolicyEnforcerProviderActor.class,
() -> new CachingPolicyEnforcerProviderActor(policyEnforcerCache, delegate, blockedNamespaces,
pubSubMediator));
return Props.create(CachingPolicyEnforcerProviderActor.class, policyEnforcerCache, blockedNamespaces,
pubSubMediator);
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(PolicyId.class, this::doGetPolicyEnforcer)
.match(DistributedPubSubMediator.SubscribeAck.class, s -> log.debug("Got subscribeAck <{}>.", s))
.match(PolicyTag.class, policyTag -> policyEnforcerCache.invalidate(policyTag.getEntityId()))
.match(PolicyTag.class, policyTag ->
policyEnforcerCache.invalidate(EnforcementCacheKey.of(policyTag.getEntityId()))
)
.match(Replicator.Changed.class, this::handleChangedBlockedNamespaces)
.build();
}

private void doGetPolicyEnforcer(final PolicyId policyId) {
final ActorRef sender = sender();
final CompletableFuture<Optional<PolicyEnforcer>> policyEnforcerCS = policyEnforcerCache.get(policyId)
.thenCompose(optionalEntry -> {
if (optionalEntry.isPresent()) {
//Value is already cached. Return potentially unavailable value as optional.
final Entry<PolicyEnforcer> policyEnforcerEntry = optionalEntry.get();
return CompletableFuture.completedStage(policyEnforcerEntry.get());
} else {
// Value is not yet cached. Try to load it and put it into cache.
return delegate.getPolicyEnforcer(policyId)
.thenApply(optionalEnforcer -> {
final Entry<PolicyEnforcer> policyEnforcerEntry =
optionalEnforcer.map(enforcer -> Entry.of(0L, enforcer))
.orElseGet(Entry::nonexistent);
policyEnforcerCache.put(policyId, policyEnforcerEntry);
return optionalEnforcer;
});
}
});
final ActorRef sender = getSender();
final CompletableFuture<Optional<PolicyEnforcer>> policyEnforcerCS =
policyEnforcerCache.get(EnforcementCacheKey.of(policyId))
.thenApply(optionalEntry -> optionalEntry.flatMap(Entry::get));
Patterns.pipe(policyEnforcerCS, getContext().dispatcher()).to(sender);
}

Expand All @@ -160,8 +172,8 @@ private void handleChangedBlockedNamespaces(final Replicator.Changed<?> changed)
final ORSet<String> namespaces = (ORSet<String>) orSet;
logNamespaces("Received", namespaces);
policyEnforcerCache.asMap().keySet().stream()
.filter(policyId -> {
final String cachedNamespace = policyId.getNamespace();
.filter(cacheKey -> {
final String cachedNamespace = ((NamespacedEntityId) cacheKey.getId()).getNamespace();
return namespaces.contains(cachedNamespace);
})
.forEach(policyEnforcerCache::invalidate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,109 +20,46 @@

import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
import org.eclipse.ditto.internal.utils.cache.config.DefaultCacheConfig;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.internal.utils.cluster.config.DefaultClusterConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.policies.model.PolicyId;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.pubsub.DistributedPubSub;
import akka.dispatch.MessageDispatcher;

/**
* Loads the {@link org.eclipse.ditto.policies.model.Policy} from the policies shard region and wraps it into a {@link PolicyEnforcer}.
* Loads the {@link org.eclipse.ditto.policies.model.Policy} from the policies shard region and wraps it into a
* {@link PolicyEnforcer}.
*/
public final class DefaultPolicyEnforcerProvider implements PolicyEnforcerProvider {
final class DefaultPolicyEnforcerProvider extends AbstractPolicyEnforcerProvider {

private static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(DefaultPolicyEnforcerProvider.class);

private final AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader;
private final MessageDispatcher enforcementCacheDispatcher;
private final MessageDispatcher cacheDispatcher;

DefaultPolicyEnforcerProvider(final ActorSystem actorSystem) {
this(policyEnforcerCacheLoader(actorSystem), enforcementCacheDispatcher(actorSystem));
}

DefaultPolicyEnforcerProvider(
final AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader,
final MessageDispatcher enforcementCacheDispatcher) {
final MessageDispatcher cacheDispatcher) {

this.policyEnforcerCacheLoader = policyEnforcerCacheLoader;
this.enforcementCacheDispatcher = enforcementCacheDispatcher;
this.cacheDispatcher = cacheDispatcher;
}

/**
* Creates a new instance of this policy enforcer provider based on the configuration in the actor system
*
* @param actorSystem used to initialize all dependencies of the policy enforcer provider
* @return the new instance.
*/
public static PolicyEnforcerProvider getInstance(final ActorSystem actorSystem) {
final DefaultPolicyEnforcerProvider shardRegionPolicyEnforcerProvider =
new DefaultPolicyEnforcerProvider(actorSystem);
final boolean withCaching = actorSystem.settings().config().getBoolean("ditto.policies-enforcer-cache.enabled");
if (withCaching) {
return shardRegionPolicyEnforcerProvider.withCaching(actorSystem);
} else {
return shardRegionPolicyEnforcerProvider;
}
}

private static AskWithRetryConfig askWithRetryConfig(final ActorSystem actorSystem) {
final DefaultScopedConfig dittoScoped = DefaultScopedConfig.dittoScoped(actorSystem.settings().config());
return DefaultEnforcementConfig.of(dittoScoped).getAskWithRetryConfig();
}

private static ActorRef policiesShardRegion(final ActorSystem actorSystem) {
final var dittoScopedConfig = DefaultScopedConfig.dittoScoped(actorSystem.settings().config());
final ClusterConfig clusterConfig = DefaultClusterConfig.of(dittoScopedConfig);
final ShardRegionProxyActorFactory shardRegionProxyActorFactory =
ShardRegionProxyActorFactory.newInstance(actorSystem, clusterConfig);

return shardRegionProxyActorFactory.getShardRegionProxyActor(
PoliciesMessagingConstants.CLUSTER_ROLE,
PoliciesMessagingConstants.SHARD_REGION
);
}

private static AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader(
final ActorSystem actorSystem) {
final AskWithRetryConfig askWithRetryConfig = askWithRetryConfig(actorSystem);
final ActorRef policiesShardRegion = policiesShardRegion(actorSystem);
return new PolicyEnforcerCacheLoader(askWithRetryConfig, actorSystem.getScheduler(), policiesShardRegion);
}

private static MessageDispatcher enforcementCacheDispatcher(final ActorSystem actorSystem) {
return actorSystem.dispatchers().lookup(PolicyEnforcerCacheLoader.ENFORCEMENT_CACHE_DISPATCHER);
}

/**
* Loads the {@link org.eclipse.ditto.policies.model.Policy} from the policies shard region and wraps it into a {@link PolicyEnforcer}.
*
* @param policyId the ID of the policy that should be loaded.
* @return A completion stage completing with an Optional holding the PolicyEnforcer in case it could be retrieved or an empty optional if not.
*/
@Override
public CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable final PolicyId policyId) {
if (null == policyId) {
return CompletableFuture.completedStage(Optional.empty());
} else {
try {
return policyEnforcerCacheLoader.asyncLoad(EnforcementCacheKey.of(policyId),
enforcementCacheDispatcher)
return policyEnforcerCacheLoader.asyncLoad(EnforcementCacheKey.of(policyId), cacheDispatcher)
.thenApply(Entry::get)
.exceptionally(error -> Optional.empty());
} catch (final Exception e) {
Expand All @@ -134,18 +71,4 @@ public CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable fin
}
}
}

private PolicyEnforcerProvider withCaching(final ActorSystem actorSystem) {
final var dispatchers = actorSystem.dispatchers();
final var cacheConfig = DefaultCacheConfig.of(actorSystem.settings().config(), "ditto.policies-enforcer-cache");
final var cacheDispatcher = dispatchers.lookup(PolicyEnforcerCacheLoader.ENFORCEMENT_CACHE_DISPATCHER);
final var policyEnforcerCache =
CacheFactory.<PolicyId, Entry<PolicyEnforcer>>createCache(cacheConfig, "policy_enforcer_cache",
cacheDispatcher);
final var pubSubMediator = DistributedPubSub.get(actorSystem).mediator();
final var blockedNamespaces = BlockedNamespaces.of(actorSystem);
return new CachingPolicyEnforcerProvider(actorSystem, policyEnforcerCache, this, blockedNamespaces,
pubSubMediator);
}

}
Loading

0 comments on commit 820bc34

Please sign in to comment.