Skip to content

Commit

Permalink
Fix cache invalidation
Browse files Browse the repository at this point in the history
* Sometimes the cache was not invalidate because the policyIdForEnforcement
  was null while policyEnforcer wasn't
* I removed this "duplication" of the policy ID since it can be extracted
  from the policyEnforcer anyway. This way it can't happen that the two
  variables are out of sync

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed May 31, 2022
1 parent 0111ebb commit e0fe64a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.policies.enforcement;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
Expand All @@ -24,7 +25,6 @@
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.namespaces.NamespaceReader;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.internal.utils.akka.actors.AbstractActorWithStashWithTimers;
Expand Down Expand Up @@ -68,7 +68,6 @@ public abstract class AbstractEnforcerActor<I extends EntityId, S extends Signal
protected final I entityId;
protected final E enforcement;

@Nullable protected PolicyId policyIdForEnforcement;
@Nullable protected PolicyEnforcer policyEnforcer;

protected AbstractEnforcerActor(final I entityId, final E enforcement, final ActorRef pubSubMediator,
Expand Down Expand Up @@ -142,7 +141,7 @@ public void preStart() throws Exception {
protected Receive activeBehaviour() {
return ReceiveBuilder.create()
.match(DistributedPubSubMediator.SubscribeAck.class, s -> log.debug("Got subscribeAck <{}>.", s))
.match(PolicyTag.class, pt -> pt.getEntityId().equals(policyIdForEnforcement),
.match(PolicyTag.class, this::matchesPolicy,
pt -> performPolicyEnforcerReload()
)
.match(PolicyTag.class, pt -> {
Expand Down Expand Up @@ -177,11 +176,7 @@ public Receive createReceive() {

private void reloadPolicyEnforcer(final Consumer<PolicyEnforcer> successConsumer) {
providePolicyIdForEnforcement()
.thenCompose(policyId -> {
this.policyIdForEnforcement =
policyId; // policyId could be null, e.g. if entity is not yet existing
return providePolicyEnforcer(policyId);
})
.thenCompose(this::providePolicyEnforcer)
.whenComplete((pEnf, throwable) -> {
if (null != throwable) {
policyEnforcer = null;
Expand Down Expand Up @@ -248,20 +243,31 @@ private void invalidateNamespacesAfterDelay(final ORSet<String> namespaces) {
private void invalidateCachedNamespaces(final InvalidateCachedNamespaces invalidate) {
final ORSet<String> namespaces = invalidate.namespaces;
logNamespaces("Invalidating", namespaces);
if (!namespaces.isEmpty() && null != policyIdForEnforcement &&
containsNamespaceOfEntityId(namespaces, policyIdForEnforcement)) {
if (containsRelevantNamespace(namespaces)) {
log.info("Reloading policy enforcer because namespace was added to blocked namespaces.");
performPolicyEnforcerReload();
}
}

private static boolean containsNamespaceOfEntityId(final ORSet<String> namespaces,
final EntityId entityId) {
return NamespaceReader.fromEntityId(entityId)
private boolean containsRelevantNamespace(final ORSet<String> namespaces) {
return getPolicyIdOfCachedEnforcer()
.map(PolicyId::getNamespace)
.map(namespaces::contains)
.orElse(false);
}

private boolean matchesPolicy(final PolicyTag policyTag) {
return getPolicyIdOfCachedEnforcer()
.map(policyTag.getEntityId()::equals)
.orElse(false);
}

protected Optional<PolicyId> getPolicyIdOfCachedEnforcer() {
return Optional.ofNullable(policyEnforcer)
.flatMap(PolicyEnforcer::getPolicy)
.flatMap(Policy::getEntityId);
}

/**
* Enforces the passed {@code signal} using the {@code enforcement} of this actor.
* Successfully enforced signals are sent back to the {@code getSender()} - which is our dear parent, the Supervisor.
Expand Down Expand Up @@ -295,10 +301,10 @@ private CompletionStage<Signal<?>> doEnforceSignal(final S signal, final ActorRe
final ActorRef self = getSelf();
try {
final CompletionStage<S> authorizedSignalStage;
if (null != policyEnforcer) {
authorizedSignalStage = enforcement.authorizeSignal(signal, policyEnforcer);
} else {
if (null == policyEnforcer) {
authorizedSignalStage = enforcement.authorizeSignalWithMissingEnforcer(signal);
} else {
authorizedSignalStage = enforcement.authorizeSignal(signal, policyEnforcer);
}

return authorizedSignalStage.handle((authorizedSignal, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.enforcement.AbstractEnforcerActor;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.policies.enforcement.AbstractEnforcerActor;
import org.eclipse.ditto.policies.enforcement.PolicyEnforcer;
import org.eclipse.ditto.policies.enforcement.PolicyEnforcerCacheLoader;
import org.eclipse.ditto.policies.model.PolicyId;
Expand Down Expand Up @@ -86,17 +86,19 @@ public static Props props(final ThingId thingId,

@Override
protected CompletionStage<PolicyId> providePolicyIdForEnforcement() {
if (null != policyIdForEnforcement) {
return CompletableFuture.completedStage(policyIdForEnforcement);
} else {
return Patterns.ask(getContext().getParent(), SudoRetrieveThing.of(entityId,
JsonFieldSelector.newInstance("policyId"),
DittoHeaders.newBuilder()
.correlationId("sudoRetrieveThingFromThingEnforcerActor-" + UUID.randomUUID())
.build()
), DEFAULT_LOCAL_ASK_TIMEOUT
).thenApply(response -> extractPolicyIdFromSudoRetrieveThingResponse(response).orElse(null));
}
return getPolicyIdOfCachedEnforcer()
.map(CompletableFuture::completedStage)
.orElseGet(this::loadPolicyIdFromPersistenceActor);
}

private CompletionStage<PolicyId> loadPolicyIdFromPersistenceActor() {
return Patterns.ask(getContext().getParent(), SudoRetrieveThing.of(entityId,
JsonFieldSelector.newInstance("policyId"),
DittoHeaders.newBuilder()
.correlationId("sudoRetrieveThingFromThingEnforcerActor-" + UUID.randomUUID())
.build()
), DEFAULT_LOCAL_ASK_TIMEOUT
).thenApply(response -> extractPolicyIdFromSudoRetrieveThingResponse(response).orElse(null));
}

/**
Expand Down Expand Up @@ -124,7 +126,8 @@ protected CompletionStage<PolicyEnforcer> providePolicyEnforcer(@Nullable final
} else {
final ActorSystem actorSystem = getContext().getSystem();
if (null == policyEnforcerCacheLoader) {
final AskWithRetryConfig askWithRetryConfig = enforcement.getEnforcementConfig().getAskWithRetryConfig();
final AskWithRetryConfig askWithRetryConfig =
enforcement.getEnforcementConfig().getAskWithRetryConfig();
policyEnforcerCacheLoader = new PolicyEnforcerCacheLoader(askWithRetryConfig,
actorSystem.getScheduler(),
enforcement.getPoliciesShardRegion()
Expand Down

0 comments on commit e0fe64a

Please sign in to comment.