Skip to content

Commit

Permalink
#1869 use cache in order to load imported policies when policies are …
Browse files Browse the repository at this point in the history
…loaded after invalidation in search

* cache is used in ResolvedPolicyCacheLoader
* it is now added that an invalidation caused by a policy contains the "causingPolicyTag" - which is then also invalidated
* however, the "causingPolicyTag" is only invalidated once (per search cluster node) to not overwhelm the cluster again due to too many invalidations

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jan 23, 2024
1 parent 2a35c6f commit 0182f9b
Show file tree
Hide file tree
Showing 21 changed files with 299 additions and 209 deletions.
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* A general purpose cache for items which are associated with a key.
Expand Down Expand Up @@ -75,6 +76,15 @@ public interface Cache<K, V> {
*/
boolean invalidate(K key);

/**
* Invalidates the passed key from the cache if present. TODO TJ adjust docs
*
* @param key the key to invalidate.
* @param valueCondition
* @return {@code true} if the entry was cached and is now invalidated, {@code false} otherwise.
*/
boolean invalidateConditionally(K key, Predicate<V> valueCondition);

/**
* Associates the {@code value} with the {@code key} in this cache.
* <p>
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -230,6 +231,25 @@ public boolean invalidate(final K key) {
return currentlyExisting;
}

@Override
public boolean invalidateConditionally(final K key, final Predicate<V> valueCondition) {
requireNonNull(key);

V value = synchronousCacheView.getIfPresent(key);
if (value != null) {
synchronized (value) {
value = synchronousCacheView.getIfPresent(key);
if (value != null && valueCondition.test(value)) {
return invalidate(key);
} else {
return false;
}
}
} else {
return false;
}
}

// optimized batch invalidation method for caffeine
@Override
public void invalidateAll(final Collection<K> keys) {
Expand Down
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* A cache working on an embedded passed {@code cache} with values of type {@code <V>}, representing a projected cache
Expand Down Expand Up @@ -68,6 +69,11 @@ public boolean invalidate(final K key) {
return cache.invalidate(key);
}

@Override
public boolean invalidateConditionally(final K key, final Predicate<U> valueCondition) {
return cache.invalidateConditionally(key, value -> valueCondition.test(project.apply(value)));
}

@Override
public void put(final K key, final U value) {
cache.put(key, embed.apply(value));
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Predicate;

import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
Expand Down Expand Up @@ -102,6 +103,23 @@ public boolean invalidate(final PolicyId policyId) {
return directlyCached || indirectlyCachedViaImport;
}

@Override
public boolean invalidateConditionally(final PolicyId policyId,
final Predicate<Entry<PolicyEnforcer>> valueCondition) {
// Invalidate the changed policy
final boolean directlyCached = delegate.invalidateConditionally(policyId, valueCondition);

// Invalidate all policies that import the changed policy
final boolean indirectlyCachedViaImport = Optional.ofNullable(policyIdToImportingMap.remove(policyId))
.stream()
.flatMap(Collection::stream)
.map(p -> delegate.invalidateConditionally(p, valueCondition))
.reduce((previous, next) -> previous || next)
.orElse(false);

return directlyCached || indirectlyCachedViaImport;
}

@Override
public void put(final PolicyId key, final Entry<PolicyEnforcer> value) {
delegate.put(key, value);
Expand Down
Expand Up @@ -27,6 +27,13 @@

import javax.annotation.Nullable;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.SystemMaterializer;
import org.apache.pekko.stream.javadsl.Source;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
Expand Down Expand Up @@ -71,13 +78,6 @@
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.SystemMaterializer;
import org.apache.pekko.stream.javadsl.Source;
import scala.PartialFunction;

/**
Expand Down Expand Up @@ -402,7 +402,7 @@ private static Metadata readAsMetadata(final Document document) {
.map(dittoBsonJson::serialize)
.map(PolicyTag::fromJson)
.collect(Collectors.toSet());
return Metadata.of(thingId, thingRevision, thingPolicyTag, referencedPolicies, modified, null);
return Metadata.of(thingId, thingRevision, thingPolicyTag, null, referencedPolicies, modified, null);
}

private static AbstractWriteModel documentToWriteModel(final Document document) {
Expand Down
Expand Up @@ -24,6 +24,8 @@

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand All @@ -38,9 +40,6 @@
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.api.UpdateReason;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;

/**
* Data class holding information about a "thingEntities" database record.
*/
Expand All @@ -49,6 +48,7 @@ public final class Metadata {
private final ThingId thingId;
private final long thingRevision;
@Nullable private final PolicyTag thingPolicy;
@Nullable private final PolicyTag causingPolicyTag;
private final Set<PolicyTag> allReferencedPolicies;
@Nullable private final Instant modified;
private final List<ThingEvent<?>> events;
Expand All @@ -61,6 +61,7 @@ public final class Metadata {
private Metadata(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyTag thingPolicy,
@Nullable final PolicyTag causingPolicyTag,
final Collection<PolicyTag> allReferencedPolicies,
@Nullable final Instant modified,
final List<ThingEvent<?>> events,
Expand All @@ -73,6 +74,7 @@ private Metadata(final ThingId thingId,
this.thingId = thingId;
this.thingRevision = thingRevision;
this.thingPolicy = thingPolicy;
this.causingPolicyTag = causingPolicyTag;
final HashSet<PolicyTag> policyTags = new HashSet<>(allReferencedPolicies);
if (thingPolicy != null) {
policyTags.add(thingPolicy);
Expand All @@ -93,17 +95,19 @@ private Metadata(final ThingId thingId,
* @param thingId the Thing ID.
* @param thingRevision the Thing revision.
* @param thingPolicy the policy directly referenced by the thing.
* @param causingPolicyTag TODO TJ doc
* @param allReferencedPolicies the policy directly and indirectly (via policy import) referenced by the thing.
* @param timer an optional timer measuring the search updater's consistency lag.
* @return the new Metadata object.
*/
public static Metadata of(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyTag thingPolicy,
@Nullable final PolicyTag causingPolicyTag,
final Collection<PolicyTag> allReferencedPolicies,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, null,
return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, null,
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false,
List.of(UpdateReason.UNKNOWN));
}
Expand All @@ -114,6 +118,7 @@ public static Metadata of(final ThingId thingId,
* @param thingId the Thing ID.
* @param thingRevision the Thing revision.
* @param thingPolicy the policy directly referenced by the thing.
* @param causingPolicyTag TODO TJ doc
* @param allReferencedPolicies the policy directly and indirectly (via policy import) referenced by the thing.
* @param timer an optional timer measuring the search updater's consistency lag.
* @param ackRecipient the ackRecipient.
Expand All @@ -122,12 +127,13 @@ public static Metadata of(final ThingId thingId,
public static Metadata of(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyTag thingPolicy,
@Nullable final PolicyTag causingPolicyTag,
final Collection<PolicyTag> allReferencedPolicies,
final List<ThingEvent<?>> events,
@Nullable final StartedTimer timer,
@Nullable final ActorSelection ackRecipient) {

return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, null, events,
return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, null, events,
null != timer ? List.of(timer) : List.of(),
null != ackRecipient ? List.of(ackRecipient) : List.of(), false, false, List.of(UpdateReason.UNKNOWN));
}
Expand Down Expand Up @@ -156,7 +162,7 @@ public static Metadata of(final ThingId thingId,
final Collection<ActorSelection> ackRecipient,
final Collection<UpdateReason> updateReasons) {

return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers,
return new Metadata(thingId, thingRevision, thingPolicy, null/*TODO TJ */, allReferencedPolicies, modified, events, timers,
ackRecipient, false, false, updateReasons);
}

Expand All @@ -166,6 +172,7 @@ public static Metadata of(final ThingId thingId,
* @param thingId the Thing ID.
* @param thingRevision the Thing revision.
* @param thingPolicy the policy directly referenced by the thing.
* @param causingPolicyTag TODO TJ
* @param allReferencedPolicies the policy directly and indirectly (via policy import) referenced by the thing.
* @param modified the timestamp of the last change incorporated into the search index, or null if not known.
* @param timer an optional timer measuring the search updater's consistency lag.
Expand All @@ -174,11 +181,12 @@ public static Metadata of(final ThingId thingId,
public static Metadata of(final ThingId thingId,
final long thingRevision,
@Nullable final PolicyTag thingPolicy,
@Nullable final PolicyTag causingPolicyTag,
final Collection<PolicyTag> allReferencedPolicies,
@Nullable final Instant modified,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified,
return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified,
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false,
List.of(UpdateReason.UNKNOWN));
}
Expand All @@ -190,7 +198,7 @@ public static Metadata of(final ThingId thingId,
* @return the Metadata object.
*/
public static Metadata ofDeleted(final ThingId thingId) {
return Metadata.of(thingId, -1, null, Set.of(), null);
return Metadata.of(thingId, -1, null, null, Set.of(), null);
}

/**
Expand All @@ -199,7 +207,7 @@ public static Metadata ofDeleted(final ThingId thingId) {
* @return the exported metadata.
*/
public Metadata export() {
return Metadata.of(thingId, thingRevision, thingPolicy, allReferencedPolicies, null);
return Metadata.of(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, null);
}

/**
Expand All @@ -210,7 +218,7 @@ public Metadata export() {
* @return the copy.
*/
public Metadata invalidateCaches(final boolean invalidateThing, final boolean invalidatePolicy) {
return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers,
return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified, events, timers,
ackRecipients, invalidateThing, invalidatePolicy, updateReasons);
}

Expand All @@ -220,9 +228,8 @@ public Metadata invalidateCaches(final boolean invalidateThing, final boolean in
* @return the copy.
*/
public Metadata withAckRecipient(final ActorSelection ackRecipient) {
return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers,
List.of(ackRecipient),
invalidateThing, invalidatePolicy, updateReasons);
return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified,
events, timers, List.of(ackRecipient), invalidateThing, invalidatePolicy, updateReasons);
}

/**
Expand All @@ -231,8 +238,8 @@ public Metadata withAckRecipient(final ActorSelection ackRecipient) {
* @return the copy.
*/
public Metadata withUpdateReason(final UpdateReason reason) {
return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers,
ackRecipients, invalidateThing, invalidatePolicy, List.of(reason));
return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified,
events, timers, ackRecipients, invalidateThing, invalidatePolicy, List.of(reason));
}

/**
Expand Down Expand Up @@ -277,6 +284,14 @@ public Optional<PolicyTag> getThingPolicyTag() {
return Optional.ofNullable(thingPolicy);
}

/**
* TODO TJ doc
* @return
*/
public Optional<PolicyTag> getCausingPolicyTag() {
return Optional.ofNullable(causingPolicyTag);
}

/**
* @return all referenced policy tags. This includes the thing policy as well as all policies imported by the
* thing policy.
Expand Down Expand Up @@ -384,7 +399,8 @@ public Metadata append(final Metadata newMetadata) {
final List<UpdateReason> newReasons = Stream.concat(updateReasons.stream(), newMetadata.updateReasons.stream())
.toList();
return new Metadata(newMetadata.thingId, newMetadata.thingRevision, newMetadata.thingPolicy,
newMetadata.allReferencedPolicies, newMetadata.modified, newEvents, newTimers, newAckRecipients,
newMetadata.causingPolicyTag, newMetadata.allReferencedPolicies, newMetadata.modified, newEvents,
newTimers, newAckRecipients,
invalidateThing || newMetadata.invalidateThing,
invalidatePolicy || newMetadata.invalidatePolicy,
newReasons);
Expand Down Expand Up @@ -435,6 +451,7 @@ public boolean equals(final Object o) {
final Metadata that = (Metadata) o;
return thingRevision == that.thingRevision &&
Objects.equals(thingPolicy, that.thingPolicy) &&
Objects.equals(causingPolicyTag, that.causingPolicyTag) &&
Objects.equals(thingId, that.thingId) &&
Objects.equals(allReferencedPolicies, that.allReferencedPolicies) &&
Objects.equals(modified, that.modified) &&
Expand All @@ -448,8 +465,8 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers,
ackRecipients, invalidateThing, invalidatePolicy, updateReasons);
return Objects.hash(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified,
events, timers, ackRecipients, invalidateThing, invalidatePolicy, updateReasons);
}

@Override
Expand All @@ -458,6 +475,7 @@ public String toString() {
"thingId=" + thingId +
", thingRevision=" + thingRevision +
", thingPolicy=" + thingPolicy +
", causingPolicyTag=" + causingPolicyTag +
", allReferencedPolicies=" + allReferencedPolicies +
", modified=" + modified +
", events=" + events +
Expand Down
Expand Up @@ -121,7 +121,7 @@ private static boolean isInsideToleranceWindow(final Metadata metadata, final In
}

private static Metadata emptyMetadata() {
return Metadata.of(EMPTY_THING_ID, 0L, PolicyTag.of(EMPTY_POLICY_ID, 0L), Set.of(), null);
return Metadata.of(EMPTY_THING_ID, 0L, PolicyTag.of(EMPTY_POLICY_ID, 0L), null, Set.of(), null);
}

private Source<Metadata, NotUsed> filterForInconsistency(final Pair<Metadata, Metadata> pair) {
Expand Down

0 comments on commit 0182f9b

Please sign in to comment.