Skip to content

Commit

Permalink
#1882 fixed ResolvedPolicyCacheLoader loading policy imports transiti…
Browse files Browse the repository at this point in the history
…vely

* cause was that by recursively using the cache, also policy imports were resolved transitively
* that could e.g. lead to a "PolicyLabelInvalidException" for scenarios with very nested policy import paths
* in either case, it was breaking the not available transitiveness of policy imports
  • Loading branch information
thjaeckle committed Jan 29, 2024
1 parent bd0f613 commit 7483dcc
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Predicate;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -81,14 +80,14 @@ final class EnforcementFlow {
Source.single(Entry.nonexistent());
private final Logger log = LoggerFactory.getLogger(getClass());
private final CachingSignalEnrichmentFacade thingsFacade;
private final Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache;
private final Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache;
private final Duration cacheRetryDelay;
private final SearchUpdateObserver searchUpdateObserver;
private final int maxArraySize;

private EnforcementFlow(final ActorSystem actorSystem,
final ActorRef thingsShardRegion,
final Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache,
final Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache,
final AskWithRetryConfig askWithRetryConfig,
final StreamCacheConfig thingCacheConfig,
final Executor thingCacheDispatcher) {
Expand Down Expand Up @@ -127,11 +126,11 @@ public static EnforcementFlow of(final ActorSystem actorSystem,

final PolicyCacheLoader policyCacheLoader =
PolicyCacheLoader.getNewInstance(askWithRetryConfig, scheduler, policiesShardRegion);
final CompletableFuture<Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture =
final CompletableFuture<Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture =
new CompletableFuture<>();
final ResolvedPolicyCacheLoader resolvedPolicyCacheLoader =
new ResolvedPolicyCacheLoader(policyCacheLoader, cacheFuture);
final Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache =
final Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache =
CacheFactory.createCache(resolvedPolicyCacheLoader, policyCacheConfig,
"things-search_enforcementflow_enforcer_cache_policy", policyCacheDispatcher);
cacheFuture.complete(policyEnforcerCache);
Expand Down Expand Up @@ -332,18 +331,17 @@ private Source<Entry<Pair<Policy, Set<PolicyTag>>>, NotUsed> readCachedEnforcer(

final Source<Entry<Pair<Policy, Set<PolicyTag>>>, ?> lazySource = Source.lazySource(() -> {
final CompletionStage<Source<Entry<Pair<Policy, Set<PolicyTag>>>, NotUsed>> enforcerFuture =
policyEnforcerCache.get(policyId)
policyEnforcerCache.get(new PolicyIdResolvingImports(policyId, true))
.thenApply(optionalEnforcerEntry -> {
if (shouldReloadCache(optionalEnforcerEntry.orElse(null), metadata, iteration)) {
// invalid entry; invalidate and retry after delay
policyEnforcerCache.invalidate(policyId);
policyEnforcerCache.invalidate(new PolicyIdResolvingImports(policyId, true));

// only invalidate causing policy tag once, e.g. when a massively imported policy is changed:
metadata.getCausingPolicyTag()
.filter(Predicate.not(tag -> policyId.equals(tag.getEntityId())))
.ifPresent(causingPolicyTag -> {
final boolean invalidated = policyEnforcerCache.invalidateConditionally(
causingPolicyTag.getEntityId(),
new PolicyIdResolvingImports(causingPolicyTag.getEntityId(), false),
entry -> entry.exists() &&
entry.getRevision() < causingPolicyTag.getRevision()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.persistence.write.streaming;

import org.eclipse.ditto.policies.model.PolicyId;

/**
* Package private cache key for loading policies via {@link ResolvedPolicyCacheLoader} into a policy cache.
*/
record PolicyIdResolvingImports(PolicyId policyId, boolean resolveImports) {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

import org.apache.pekko.japi.Pair;
Expand All @@ -29,59 +30,70 @@

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

final class ResolvedPolicyCacheLoader implements AsyncCacheLoader<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>> {
final class ResolvedPolicyCacheLoader
implements AsyncCacheLoader<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> {

private final PolicyCacheLoader policyCacheLoader;
private final CompletableFuture<Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture;
private final CompletableFuture<Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture;

ResolvedPolicyCacheLoader(final PolicyCacheLoader policyCacheLoader,
final CompletableFuture<Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture) {
final CompletableFuture<Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture) {
this.policyCacheLoader = policyCacheLoader;
this.cacheFuture = cacheFuture;
}

@Override
public CompletableFuture<? extends Entry<Pair<Policy, Set<PolicyTag>>>> asyncLoad(final PolicyId policyId,
public CompletableFuture<? extends Entry<Pair<Policy, Set<PolicyTag>>>> asyncLoad(
final PolicyIdResolvingImports policyIdResolvingImports,
final Executor executor) {

return policyCacheLoader.asyncLoad(policyId, executor)
return policyCacheLoader.asyncLoad(policyIdResolvingImports.policyId(), executor)
.thenCompose(policyEntry -> {
if (policyEntry.exists()) {
final Policy policy = policyEntry.getValueOrThrow();
final long revision = policy.getRevision().map(PolicyRevision::toLong)
.orElseThrow(
() -> new IllegalStateException("Bad SudoRetrievePolicyResponse: no revision"));
final Set<PolicyTag> referencedPolicies = new HashSet<>();
return policy.withResolvedImports(
importedPolicyId -> cacheFuture
.thenCompose(cache -> cache.get(importedPolicyId))
.thenApply(entry -> entry.flatMap(Entry::get))
.thenApply(optionalReferencedPolicy -> {
if (optionalReferencedPolicy.isPresent()) {
final Policy referencedPolicy =
optionalReferencedPolicy.get().first();
final Optional<PolicyRevision> revision =
referencedPolicy.getRevision();
final Optional<PolicyId> entityId =
referencedPolicy.getEntityId();
if (revision.isPresent() && entityId.isPresent()) {
referencedPolicies.add(
PolicyTag.of(entityId.get(),
revision.get().toLong())
);
}
}
return optionalReferencedPolicy.map(Pair::first);
}))
.thenApply(resolvedPolicy -> {
final long revision = policy.getRevision().map(PolicyRevision::toLong)
.orElseThrow(
() -> new IllegalStateException(
"Bad SudoRetrievePolicyResponse: no revision"));
return Entry.of(revision, new Pair<>(resolvedPolicy, referencedPolicies));
});

if (policyIdResolvingImports.resolveImports()) {
return cacheFuture.thenCompose(cache ->
resolvePolicyImports(cache, policy, referencedPolicies)
)
.thenApply(resolvedPolicy ->
Entry.of(revision, new Pair<>(resolvedPolicy, referencedPolicies))
);
} else {
return CompletableFuture.completedFuture(
Entry.of(revision, new Pair<>(policy, referencedPolicies))
);
}
} else {
return CompletableFuture.completedFuture(Entry.nonexistent());
}
});
}

private static CompletionStage<Policy> resolvePolicyImports(
final Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> cache,
final Policy policy,
final Set<PolicyTag> referencedPolicies) {

return policy.withResolvedImports(importedPolicyId ->
cache.get(new PolicyIdResolvingImports(importedPolicyId, false)) // don't transitively resolve imports, only 1 "level"
.thenApply(entry -> entry.flatMap(Entry::get))
.thenApply(optionalReferencedPolicy -> {
if (optionalReferencedPolicy.isPresent()) {
final Policy referencedPolicy = optionalReferencedPolicy.get().first();
final Optional<PolicyRevision> refRevision = referencedPolicy.getRevision();
final Optional<PolicyId> entityId = referencedPolicy.getEntityId();
if (refRevision.isPresent() && entityId.isPresent()) {
referencedPolicies.add(PolicyTag.of(entityId.get(), refRevision.get().toLong()));
}
}
return optionalReferencedPolicy.map(Pair::first);
})
);
}

}

0 comments on commit 7483dcc

Please sign in to comment.