Skip to content

Commit

Permalink
Add caching and unittests für PolicyEnforcerProvider
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed Jun 29, 2022
1 parent c1ecdc6 commit 4eccbcb
Show file tree
Hide file tree
Showing 5 changed files with 647 additions and 12 deletions.
@@ -0,0 +1,184 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.policies.model.PolicyId;
import org.slf4j.Logger;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.Replicator;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;

/**
* Transparent caching layer for {@link org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider}
*/
final class CachingPolicyEnforcerProvider implements PolicyEnforcerProvider {

private static final Logger LOGGER = DittoLoggerFactory.getThreadSafeLogger(CachingPolicyEnforcerProvider.class);
private final ActorRef cachingPolicyEnforcerProviderActor;

CachingPolicyEnforcerProvider(final ActorRefFactory actorRefFactory,
final Cache<PolicyId, Entry<PolicyEnforcer>> policyEnforcerCache,
final PolicyEnforcerProvider delegate,
@Nullable final BlockedNamespaces blockedNamespaces,
final ActorRef pubSubMediator) {
this.cachingPolicyEnforcerProviderActor = actorRefFactory.actorOf(
CachingPolicyEnforcerProviderActor.props(policyEnforcerCache, delegate, blockedNamespaces,
pubSubMediator));
}

@Override
public CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable final PolicyId policyId) {
if (policyId == null) {
return CompletableFuture.completedStage(Optional.empty());
}
return Patterns.ask(cachingPolicyEnforcerProviderActor, policyId, Duration.ofSeconds(60))
.thenApply(response -> {
final Optional<PolicyEnforcer> result;
if (response instanceof Optional<?> optional) {
result = optional.map(value -> {
if (value instanceof PolicyEnforcer policyEnforcer) {
return policyEnforcer;
} else {
LOGGER.warn("Did receive Optional holding an unexpected type. " +
"Did expect a PolicyEnforcer but got <{}>.",
value.getClass());
return null;
}
});
} else {
result = Optional.empty();
}
return result;
});
}


/**
* Actor which handles the actual cache lookup and invalidation.
*/
private static final class CachingPolicyEnforcerProviderActor extends AbstractActor {

private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
private final Cache<PolicyId, Entry<PolicyEnforcer>> policyEnforcerCache;
private final PolicyEnforcerProvider delegate;

CachingPolicyEnforcerProviderActor(final Cache<PolicyId, Entry<PolicyEnforcer>> policyEnforcerCache,
final PolicyEnforcerProvider delegate,
@Nullable final BlockedNamespaces blockedNamespaces,
final ActorRef pubSubMediator) {

this.policyEnforcerCache = policyEnforcerCache;
this.delegate = delegate;

if (blockedNamespaces != null) {
blockedNamespaces.subscribeForChanges(getSelf());
}

// subscribe for PolicyTags in order to reload policyEnforcer when "backing policy" was modified
pubSubMediator.tell(DistPubSubAccess.subscribe(PolicyTag.PUB_SUB_TOPIC_INVALIDATE_ENFORCERS, getSelf()),
getSelf());
}

private static Props props(final Cache<PolicyId, Entry<PolicyEnforcer>> policyEnforcerCache,
final PolicyEnforcerProvider delegate,
@Nullable final BlockedNamespaces blockedNamespaces,
final ActorRef pubSubMediator) {
return Props.create(CachingPolicyEnforcerProviderActor.class,
() -> new CachingPolicyEnforcerProviderActor(policyEnforcerCache, delegate, blockedNamespaces,
pubSubMediator));
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(PolicyId.class, this::doGetPolicyEnforcer)
.match(DistributedPubSubMediator.SubscribeAck.class, s -> log.debug("Got subscribeAck <{}>.", s))
.match(PolicyTag.class, policyTag -> policyEnforcerCache.invalidate(policyTag.getEntityId()))
.match(Replicator.Changed.class, this::handleChangedBlockedNamespaces)
.build();
}

private void doGetPolicyEnforcer(final PolicyId policyId) {
final ActorRef sender = sender();
final CompletableFuture<Optional<PolicyEnforcer>> policyEnforcerCS = policyEnforcerCache.get(policyId)
.thenCompose(optionalEntry -> {
if (optionalEntry.isPresent()) {
//Value is already cached. Return potentially unavailable value as optional.
final Entry<PolicyEnforcer> policyEnforcerEntry = optionalEntry.get();
return CompletableFuture.completedStage(policyEnforcerEntry.get());
} else {
// Value is not yet cached. Try to load it and put it into cache.
return delegate.getPolicyEnforcer(policyId)
.thenApply(optionalEnforcer -> {
final Entry<PolicyEnforcer> policyEnforcerEntry =
optionalEnforcer.map(enforcer -> Entry.of(0L, enforcer))
.orElseGet(Entry::nonexistent);
policyEnforcerCache.put(policyId, policyEnforcerEntry);
return optionalEnforcer;
});
}
});
Patterns.pipe(policyEnforcerCS, getContext().dispatcher()).to(sender);
}

@SuppressWarnings("unchecked")
private void handleChangedBlockedNamespaces(final Replicator.Changed<?> changed) {
if (changed.dataValue() instanceof ORSet<?> orSet) {
final ORSet<String> namespaces = (ORSet<String>) orSet;
logNamespaces("Received", namespaces);
policyEnforcerCache.asMap().keySet().stream()
.filter(policyId -> {
final String cachedNamespace = policyId.getNamespace();
return namespaces.contains(cachedNamespace);
})
.forEach(policyId -> {
policyEnforcerCache.invalidate(policyId);
});
} else {
log.warning("Unhandled: <{}>", changed);
}
}

private void logNamespaces(final String verb, final ORSet<String> namespaces) {
if (namespaces.size() > 25) {
log.info("{} <{}> namespaces", verb, namespaces.size());
} else {
log.info("{} namespaces: <{}>", verb, namespaces);
}
}

}

}
Expand Up @@ -18,46 +18,48 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
import org.eclipse.ditto.internal.utils.cache.config.DefaultCacheConfig;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.config.ClusterConfig;
import org.eclipse.ditto.internal.utils.cluster.config.DefaultClusterConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.policies.model.PolicyId;
import org.slf4j.Logger;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.pubsub.DistributedPubSub;
import akka.dispatch.MessageDispatcher;

/**
* Loads the {@link org.eclipse.ditto.policies.model.Policy} from the policies shard region and wraps it into a {@link PolicyEnforcer}.
*/
public final class DefaultPolicyEnforcerProvider implements PolicyEnforcerProvider {

private static Logger LOGGER = DittoLoggerFactory.getThreadSafeLogger(DefaultPolicyEnforcerProvider.class);
private final AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader;
private final MessageDispatcher enforcementCacheDispatcher;

DefaultPolicyEnforcerProvider(final ActorSystem actorSystem) {
this(askWithRetryConfig(actorSystem), actorSystem, policiesShardRegion(actorSystem));
this(policyEnforcerCacheLoader(actorSystem), enforcementCacheDispatcher(actorSystem));
}

DefaultPolicyEnforcerProvider(
final AskWithRetryConfig askWithRetryConfig,
final ActorSystem actorSystem,
final ActorRef policiesShardRegion) {
final AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader,
final MessageDispatcher enforcementCacheDispatcher) {

this.policyEnforcerCacheLoader = new PolicyEnforcerCacheLoader(askWithRetryConfig,
actorSystem.getScheduler(),
policiesShardRegion
);
enforcementCacheDispatcher =
actorSystem.dispatchers().lookup(PolicyEnforcerCacheLoader.ENFORCEMENT_CACHE_DISPATCHER);
this.policyEnforcerCacheLoader = policyEnforcerCacheLoader;
this.enforcementCacheDispatcher = enforcementCacheDispatcher;
}

/**
Expand All @@ -67,7 +69,14 @@ public final class DefaultPolicyEnforcerProvider implements PolicyEnforcerProvid
* @return the new instance.
*/
public static PolicyEnforcerProvider getInstance(final ActorSystem actorSystem) {
return new DefaultPolicyEnforcerProvider(actorSystem);
final DefaultPolicyEnforcerProvider shardRegionPolicyEnforcerProvider =
new DefaultPolicyEnforcerProvider(actorSystem);
final boolean withCaching = actorSystem.settings().config().getBoolean("ditto.policies-enforcer-cache.enabled");
if (withCaching) {
return shardRegionPolicyEnforcerProvider.withCaching(actorSystem);
} else {
return shardRegionPolicyEnforcerProvider;
}
}

private static AskWithRetryConfig askWithRetryConfig(final ActorSystem actorSystem) {
Expand All @@ -87,6 +96,17 @@ private static ActorRef policiesShardRegion(final ActorSystem actorSystem) {
);
}

private static AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> policyEnforcerCacheLoader(
final ActorSystem actorSystem) {
final AskWithRetryConfig askWithRetryConfig = askWithRetryConfig(actorSystem);
final ActorRef policiesShardRegion = policiesShardRegion(actorSystem);
return new PolicyEnforcerCacheLoader(askWithRetryConfig, actorSystem.getScheduler(), policiesShardRegion);
}

private static MessageDispatcher enforcementCacheDispatcher(final ActorSystem actorSystem) {
return actorSystem.dispatchers().lookup(PolicyEnforcerCacheLoader.ENFORCEMENT_CACHE_DISPATCHER);
}

/**
* Loads the {@link org.eclipse.ditto.policies.model.Policy} from the policies shard region and wraps it into a {@link PolicyEnforcer}.
*
Expand All @@ -104,9 +124,25 @@ public CompletionStage<Optional<PolicyEnforcer>> getPolicyEnforcer(@Nullable fin
.thenApply(Entry::get)
.exceptionally(error -> Optional.empty());
} catch (final Exception e) {
throw new IllegalStateException("Could not load policyEnforcer via policyEnforcerCacheLoader", e);
LOGGER.warn(
"Got exception when trying to load the policy enforcer via cache loader. This is unexpected"
);
return CompletableFuture.completedStage(Optional.empty());
}
}
}

private PolicyEnforcerProvider withCaching(final ActorSystem actorSystem) {
final var dispatchers = actorSystem.dispatchers();
final var cacheConfig = DefaultCacheConfig.of(actorSystem.settings().config(), "ditto.policies-enforcer-cache");
final var cacheDispatcher = dispatchers.lookup("enforcement-cache-dispatcher");
final var policyEnforcerCache =
CacheFactory.<PolicyId, Entry<PolicyEnforcer>>createCache(cacheConfig, "policy_enforcer_cache",
cacheDispatcher);
final var pubSubMediator = DistributedPubSub.get(actorSystem).mediator();
final var blockedNamespaces = BlockedNamespaces.of(actorSystem);
return new CachingPolicyEnforcerProvider(actorSystem, policyEnforcerCache, this, blockedNamespaces,
pubSubMediator);
}

}
11 changes: 11 additions & 0 deletions policies/enforcement/src/main/resources/reference.conf
Expand Up @@ -5,3 +5,14 @@ enforcement-cache-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
}

ditto.policies-enforcer-cache {
enabled = false
enabled = ${?POLICIES_ENFORCER_CACHING_ENABLED}
maximum-size = 50000
maximum-size = ${?POLICIES_ENFORCER_CACHING_MAX_SIZE}
expire-after-write = 15m
expire-after-write = ${?POLICIES_ENFORCER_CACHING_EXPIRE_AFTER_WRITE}
expire-after-access = 15m
expire-after-access = ${?POLICIES_ENFORCER_CACHING_EXPIRE_AFTER_ACCESS}
}

0 comments on commit 4eccbcb

Please sign in to comment.