From 0182f9b1e5c427b7412d4cdf5626c35734442d5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20J=C3=A4ckle?= Date: Tue, 23 Jan 2024 17:36:36 +0100 Subject: [PATCH] #1869 use cache in order to load imported policies when policies are loaded after invalidation in search MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cache is used in ResolvedPolicyCacheLoader * it is now added that an invalidation caused by a policy contains the "causingPolicyTag" - which is then also invalidated * however, the "causingPolicyTag" is only invalidated once (per search cluster node) to not overwhelm the cluster again due to too many invalidations Signed-off-by: Thomas Jäckle --- .../ditto/internal/utils/cache/Cache.java | 10 +++ .../internal/utils/cache/CaffeineCache.java | 20 ++++++ .../internal/utils/cache/ProjectedCache.java | 6 ++ .../enforcement/PolicyEnforcerCache.java | 18 ++++++ .../read/MongoThingsSearchPersistence.java | 16 ++--- .../persistence/write/model/Metadata.java | 54 ++++++++++------ .../write/streaming/BackgroundSyncStream.java | 2 +- .../write/streaming/EnforcementFlow.java | 52 ++++++++++----- .../streaming/ResolvedPolicyCacheLoader.java | 18 ++++-- .../service/updater/actors/ThingUpdater.java | 49 +++++++------- .../updater/actors/ThingsMetadataSource.java | 14 ++-- .../service/persistence/read/SudoIT.java | 7 +- .../write/mapping/BsonDiffVisitorIT.java | 21 +++--- .../persistence/write/model/MetadataTest.java | 12 ++-- .../write/model/ThingWriteModelIT.java | 7 +- .../streaming/BackgroundSyncStreamTest.java | 37 ++++++----- .../streaming/BulkWriteResultAckFlowTest.java | 15 ++--- .../write/streaming/EnforcementFlowTest.java | 42 ++++++------ .../streaming/TestSearchUpdaterStream.java | 7 +- .../actors/BackgroundSyncActorTest.java | 37 ++++++----- .../updater/actors/ThingUpdaterTest.java | 64 +++++++++---------- 21 files changed, 299 insertions(+), 209 deletions(-) diff --git a/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/Cache.java b/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/Cache.java index 4da7837983..e85cbdbd5c 100644 --- a/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/Cache.java +++ b/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/Cache.java @@ -17,6 +17,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; +import java.util.function.Predicate; /** * A general purpose cache for items which are associated with a key. @@ -75,6 +76,15 @@ public interface Cache { */ boolean invalidate(K key); + /** + * Invalidates the passed key from the cache if present. TODO TJ adjust docs + * + * @param key the key to invalidate. + * @param valueCondition + * @return {@code true} if the entry was cached and is now invalidated, {@code false} otherwise. + */ + boolean invalidateConditionally(K key, Predicate valueCondition); + /** * Associates the {@code value} with the {@code key} in this cache. *

diff --git a/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/CaffeineCache.java b/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/CaffeineCache.java index c48f638bfa..9b292f3625 100644 --- a/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/CaffeineCache.java +++ b/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/CaffeineCache.java @@ -22,6 +22,7 @@ import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Predicate; import javax.annotation.Nullable; @@ -230,6 +231,25 @@ public boolean invalidate(final K key) { return currentlyExisting; } + @Override + public boolean invalidateConditionally(final K key, final Predicate valueCondition) { + requireNonNull(key); + + V value = synchronousCacheView.getIfPresent(key); + if (value != null) { + synchronized (value) { + value = synchronousCacheView.getIfPresent(key); + if (value != null && valueCondition.test(value)) { + return invalidate(key); + } else { + return false; + } + } + } else { + return false; + } + } + // optimized batch invalidation method for caffeine @Override public void invalidateAll(final Collection keys) { diff --git a/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/ProjectedCache.java b/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/ProjectedCache.java index 84628459b5..c40f7ad587 100644 --- a/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/ProjectedCache.java +++ b/internal/utils/cache/src/main/java/org/eclipse/ditto/internal/utils/cache/ProjectedCache.java @@ -17,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; +import java.util.function.Predicate; /** * A cache working on an embedded passed {@code cache} with values of type {@code }, representing a projected cache @@ -68,6 +69,11 @@ public boolean invalidate(final K key) { return cache.invalidate(key); } + @Override + public boolean invalidateConditionally(final K key, final Predicate valueCondition) { + return cache.invalidateConditionally(key, value -> valueCondition.test(project.apply(value))); + } + @Override public void put(final K key, final U value) { cache.put(key, embed.apply(value)); diff --git a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/PolicyEnforcerCache.java b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/PolicyEnforcerCache.java index 15e7d262cc..ce4172e562 100644 --- a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/PolicyEnforcerCache.java +++ b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/PolicyEnforcerCache.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; +import java.util.function.Predicate; import org.eclipse.ditto.internal.utils.cache.Cache; import org.eclipse.ditto.internal.utils.cache.CacheFactory; @@ -102,6 +103,23 @@ public boolean invalidate(final PolicyId policyId) { return directlyCached || indirectlyCachedViaImport; } + @Override + public boolean invalidateConditionally(final PolicyId policyId, + final Predicate> valueCondition) { + // Invalidate the changed policy + final boolean directlyCached = delegate.invalidateConditionally(policyId, valueCondition); + + // Invalidate all policies that import the changed policy + final boolean indirectlyCachedViaImport = Optional.ofNullable(policyIdToImportingMap.remove(policyId)) + .stream() + .flatMap(Collection::stream) + .map(p -> delegate.invalidateConditionally(p, valueCondition)) + .reduce((previous, next) -> previous || next) + .orElse(false); + + return directlyCached || indirectlyCachedViaImport; + } + @Override public void put(final PolicyId key, final Entry value) { delegate.put(key, value); diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/MongoThingsSearchPersistence.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/MongoThingsSearchPersistence.java index cf9a18381b..ffc53f0eb7 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/MongoThingsSearchPersistence.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/MongoThingsSearchPersistence.java @@ -27,6 +27,13 @@ import javax.annotation.Nullable; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.event.Logging; +import org.apache.pekko.event.LoggingAdapter; +import org.apache.pekko.japi.pf.PFBuilder; +import org.apache.pekko.stream.SystemMaterializer; +import org.apache.pekko.stream.javadsl.Source; import org.bson.BsonDocument; import org.bson.Document; import org.bson.conversions.Bson; @@ -71,13 +78,6 @@ import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.event.Logging; -import org.apache.pekko.event.LoggingAdapter; -import org.apache.pekko.japi.pf.PFBuilder; -import org.apache.pekko.stream.SystemMaterializer; -import org.apache.pekko.stream.javadsl.Source; import scala.PartialFunction; /** @@ -402,7 +402,7 @@ private static Metadata readAsMetadata(final Document document) { .map(dittoBsonJson::serialize) .map(PolicyTag::fromJson) .collect(Collectors.toSet()); - return Metadata.of(thingId, thingRevision, thingPolicyTag, referencedPolicies, modified, null); + return Metadata.of(thingId, thingRevision, thingPolicyTag, null, referencedPolicies, modified, null); } private static AbstractWriteModel documentToWriteModel(final Document document) { diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/Metadata.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/Metadata.java index 6cd5261261..11c80abca4 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/Metadata.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/Metadata.java @@ -24,6 +24,8 @@ import javax.annotation.Nullable; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSelection; import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel; import org.eclipse.ditto.base.model.common.HttpStatus; import org.eclipse.ditto.base.model.headers.DittoHeaders; @@ -38,9 +40,6 @@ import org.eclipse.ditto.things.model.signals.events.ThingEvent; import org.eclipse.ditto.thingsearch.api.UpdateReason; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSelection; - /** * Data class holding information about a "thingEntities" database record. */ @@ -49,6 +48,7 @@ public final class Metadata { private final ThingId thingId; private final long thingRevision; @Nullable private final PolicyTag thingPolicy; + @Nullable private final PolicyTag causingPolicyTag; private final Set allReferencedPolicies; @Nullable private final Instant modified; private final List> events; @@ -61,6 +61,7 @@ public final class Metadata { private Metadata(final ThingId thingId, final long thingRevision, @Nullable final PolicyTag thingPolicy, + @Nullable final PolicyTag causingPolicyTag, final Collection allReferencedPolicies, @Nullable final Instant modified, final List> events, @@ -73,6 +74,7 @@ private Metadata(final ThingId thingId, this.thingId = thingId; this.thingRevision = thingRevision; this.thingPolicy = thingPolicy; + this.causingPolicyTag = causingPolicyTag; final HashSet policyTags = new HashSet<>(allReferencedPolicies); if (thingPolicy != null) { policyTags.add(thingPolicy); @@ -93,6 +95,7 @@ private Metadata(final ThingId thingId, * @param thingId the Thing ID. * @param thingRevision the Thing revision. * @param thingPolicy the policy directly referenced by the thing. + * @param causingPolicyTag TODO TJ doc * @param allReferencedPolicies the policy directly and indirectly (via policy import) referenced by the thing. * @param timer an optional timer measuring the search updater's consistency lag. * @return the new Metadata object. @@ -100,10 +103,11 @@ private Metadata(final ThingId thingId, public static Metadata of(final ThingId thingId, final long thingRevision, @Nullable final PolicyTag thingPolicy, + @Nullable final PolicyTag causingPolicyTag, final Collection allReferencedPolicies, @Nullable final StartedTimer timer) { - return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, null, + return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, null, List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false, List.of(UpdateReason.UNKNOWN)); } @@ -114,6 +118,7 @@ public static Metadata of(final ThingId thingId, * @param thingId the Thing ID. * @param thingRevision the Thing revision. * @param thingPolicy the policy directly referenced by the thing. + * @param causingPolicyTag TODO TJ doc * @param allReferencedPolicies the policy directly and indirectly (via policy import) referenced by the thing. * @param timer an optional timer measuring the search updater's consistency lag. * @param ackRecipient the ackRecipient. @@ -122,12 +127,13 @@ public static Metadata of(final ThingId thingId, public static Metadata of(final ThingId thingId, final long thingRevision, @Nullable final PolicyTag thingPolicy, + @Nullable final PolicyTag causingPolicyTag, final Collection allReferencedPolicies, final List> events, @Nullable final StartedTimer timer, @Nullable final ActorSelection ackRecipient) { - return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, null, events, + return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, null, events, null != timer ? List.of(timer) : List.of(), null != ackRecipient ? List.of(ackRecipient) : List.of(), false, false, List.of(UpdateReason.UNKNOWN)); } @@ -156,7 +162,7 @@ public static Metadata of(final ThingId thingId, final Collection ackRecipient, final Collection updateReasons) { - return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers, + return new Metadata(thingId, thingRevision, thingPolicy, null/*TODO TJ */, allReferencedPolicies, modified, events, timers, ackRecipient, false, false, updateReasons); } @@ -166,6 +172,7 @@ public static Metadata of(final ThingId thingId, * @param thingId the Thing ID. * @param thingRevision the Thing revision. * @param thingPolicy the policy directly referenced by the thing. + * @param causingPolicyTag TODO TJ * @param allReferencedPolicies the policy directly and indirectly (via policy import) referenced by the thing. * @param modified the timestamp of the last change incorporated into the search index, or null if not known. * @param timer an optional timer measuring the search updater's consistency lag. @@ -174,11 +181,12 @@ public static Metadata of(final ThingId thingId, public static Metadata of(final ThingId thingId, final long thingRevision, @Nullable final PolicyTag thingPolicy, + @Nullable final PolicyTag causingPolicyTag, final Collection allReferencedPolicies, @Nullable final Instant modified, @Nullable final StartedTimer timer) { - return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, + return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified, List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false, List.of(UpdateReason.UNKNOWN)); } @@ -190,7 +198,7 @@ public static Metadata of(final ThingId thingId, * @return the Metadata object. */ public static Metadata ofDeleted(final ThingId thingId) { - return Metadata.of(thingId, -1, null, Set.of(), null); + return Metadata.of(thingId, -1, null, null, Set.of(), null); } /** @@ -199,7 +207,7 @@ public static Metadata ofDeleted(final ThingId thingId) { * @return the exported metadata. */ public Metadata export() { - return Metadata.of(thingId, thingRevision, thingPolicy, allReferencedPolicies, null); + return Metadata.of(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, null); } /** @@ -210,7 +218,7 @@ public Metadata export() { * @return the copy. */ public Metadata invalidateCaches(final boolean invalidateThing, final boolean invalidatePolicy) { - return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers, + return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified, events, timers, ackRecipients, invalidateThing, invalidatePolicy, updateReasons); } @@ -220,9 +228,8 @@ public Metadata invalidateCaches(final boolean invalidateThing, final boolean in * @return the copy. */ public Metadata withAckRecipient(final ActorSelection ackRecipient) { - return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers, - List.of(ackRecipient), - invalidateThing, invalidatePolicy, updateReasons); + return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified, + events, timers, List.of(ackRecipient), invalidateThing, invalidatePolicy, updateReasons); } /** @@ -231,8 +238,8 @@ public Metadata withAckRecipient(final ActorSelection ackRecipient) { * @return the copy. */ public Metadata withUpdateReason(final UpdateReason reason) { - return new Metadata(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers, - ackRecipients, invalidateThing, invalidatePolicy, List.of(reason)); + return new Metadata(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified, + events, timers, ackRecipients, invalidateThing, invalidatePolicy, List.of(reason)); } /** @@ -277,6 +284,14 @@ public Optional getThingPolicyTag() { return Optional.ofNullable(thingPolicy); } + /** + * TODO TJ doc + * @return + */ + public Optional getCausingPolicyTag() { + return Optional.ofNullable(causingPolicyTag); + } + /** * @return all referenced policy tags. This includes the thing policy as well as all policies imported by the * thing policy. @@ -384,7 +399,8 @@ public Metadata append(final Metadata newMetadata) { final List newReasons = Stream.concat(updateReasons.stream(), newMetadata.updateReasons.stream()) .toList(); return new Metadata(newMetadata.thingId, newMetadata.thingRevision, newMetadata.thingPolicy, - newMetadata.allReferencedPolicies, newMetadata.modified, newEvents, newTimers, newAckRecipients, + newMetadata.causingPolicyTag, newMetadata.allReferencedPolicies, newMetadata.modified, newEvents, + newTimers, newAckRecipients, invalidateThing || newMetadata.invalidateThing, invalidatePolicy || newMetadata.invalidatePolicy, newReasons); @@ -435,6 +451,7 @@ public boolean equals(final Object o) { final Metadata that = (Metadata) o; return thingRevision == that.thingRevision && Objects.equals(thingPolicy, that.thingPolicy) && + Objects.equals(causingPolicyTag, that.causingPolicyTag) && Objects.equals(thingId, that.thingId) && Objects.equals(allReferencedPolicies, that.allReferencedPolicies) && Objects.equals(modified, that.modified) && @@ -448,8 +465,8 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(thingId, thingRevision, thingPolicy, allReferencedPolicies, modified, events, timers, - ackRecipients, invalidateThing, invalidatePolicy, updateReasons); + return Objects.hash(thingId, thingRevision, thingPolicy, causingPolicyTag, allReferencedPolicies, modified, + events, timers, ackRecipients, invalidateThing, invalidatePolicy, updateReasons); } @Override @@ -458,6 +475,7 @@ public String toString() { "thingId=" + thingId + ", thingRevision=" + thingRevision + ", thingPolicy=" + thingPolicy + + ", causingPolicyTag=" + causingPolicyTag + ", allReferencedPolicies=" + allReferencedPolicies + ", modified=" + modified + ", events=" + events + diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BackgroundSyncStream.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BackgroundSyncStream.java index e0ec1fbc90..3b8c681d2f 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BackgroundSyncStream.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BackgroundSyncStream.java @@ -121,7 +121,7 @@ private static boolean isInsideToleranceWindow(final Metadata metadata, final In } private static Metadata emptyMetadata() { - return Metadata.of(EMPTY_THING_ID, 0L, PolicyTag.of(EMPTY_POLICY_ID, 0L), Set.of(), null); + return Metadata.of(EMPTY_THING_ID, 0L, PolicyTag.of(EMPTY_POLICY_ID, 0L), null, Set.of(), null); } private Source filterForInconsistency(final Pair pair) { diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlow.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlow.java index 00e57b9847..5e0104de29 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlow.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlow.java @@ -18,12 +18,23 @@ import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import javax.annotation.Nullable; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Scheduler; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.japi.pf.PFBuilder; +import org.apache.pekko.pattern.AskTimeoutException; +import org.apache.pekko.stream.javadsl.Flow; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.Source; import org.eclipse.ditto.base.model.exceptions.AskException; import org.eclipse.ditto.internal.models.signalenrichment.CachingSignalEnrichmentFacade; import org.eclipse.ditto.internal.models.streaming.AbstractEntityIdWithRevision; @@ -60,17 +71,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Scheduler; -import org.apache.pekko.japi.Pair; -import org.apache.pekko.japi.pf.PFBuilder; -import org.apache.pekko.pattern.AskTimeoutException; -import org.apache.pekko.stream.javadsl.Flow; -import org.apache.pekko.stream.javadsl.Keep; -import org.apache.pekko.stream.javadsl.Source; - /** * Converts Thing changes into write models by retrieving data and applying enforcement via an enforcer cache. */ @@ -126,10 +126,14 @@ public static EnforcementFlow of(final ActorSystem actorSystem, final PolicyCacheLoader policyCacheLoader = PolicyCacheLoader.getNewInstance(askWithRetryConfig, scheduler, policiesShardRegion); - final ResolvedPolicyCacheLoader resolvedPolicyCacheLoader = new ResolvedPolicyCacheLoader(policyCacheLoader); + final CompletableFuture>>>> cacheFuture = + new CompletableFuture<>(); + final ResolvedPolicyCacheLoader resolvedPolicyCacheLoader = + new ResolvedPolicyCacheLoader(policyCacheLoader, cacheFuture); final Cache>>> policyEnforcerCache = CacheFactory.createCache(resolvedPolicyCacheLoader, policyCacheConfig, "things-search_enforcementflow_enforcer_cache_policy", policyCacheDispatcher); + cacheFuture.complete(policyEnforcerCache); final var thingCacheConfig = updaterStreamConfig.getThingCacheConfig(); final var thingCacheDispatcher = actorSystem.dispatchers() @@ -206,7 +210,7 @@ public Flow create(final SearchUpda return computeWriteModel(data.metadata(), thing); }) .flatMapConcat(writeModel -> mapper.processWriteModel(writeModel, data.lastWriteModel()) - .orElse(Source.lazily(() -> { + .orElse(Source.lazySource(() -> { data.metadata().sendWeakAck(null); return Source.empty(); }))) @@ -230,7 +234,7 @@ private Source, NotUsed> retrieveThingFromCachingFacad return retrieveThingFromCachingFacade(thingId, metadata, leftRetryAttempts - 1); } else { log.warn("No retries left, try to SudoRetrieveThing via cache, therefore giving " + - "up for thingId <{}>", thingId); + "up for thingId <{}>", thingId); return Source.empty(); } } @@ -285,7 +289,8 @@ private Source computeWriteModel(final Metadata met final Throwable fetchErrorCause = entry.getFetchErrorCause().orElse( new IllegalStateException("No fetch error cause present when it should be") ); - log.warn("Computed - due to fetch error <{}: {}> on policy cache - 'no op' ThingWriteModel " + + log.warn( + "Computed - due to fetch error <{}: {}> on policy cache - 'no op' ThingWriteModel " + "for metadata <{}> and thing <{}>", fetchErrorCause.getClass().getSimpleName(), fetchErrorCause.getMessage(), metadata, thing, fetchErrorCause @@ -294,7 +299,7 @@ private Source computeWriteModel(final Metadata met } else { // no enforcer; "empty out" thing in search index log.warn("Computed - due to missing enforcer - 'emptied out' ThingWriteModel for " + - "metadata <{}> and thing <{}>", metadata, thing); + "metadata <{}> and thing <{}>", metadata, thing); return ThingWriteModel.ofEmptiedOut(metadata); } } @@ -309,7 +314,8 @@ private Source computeWriteModel(final Metadata met * @param thing the thing * @return source of an enforcer or an empty source. */ - private Source>>, NotUsed> getPolicy(final Metadata metadata, final JsonObject thing) { + private Source>>, NotUsed> getPolicy(final Metadata metadata, + final JsonObject thing) { try { return thing.getValue(Thing.JsonFields.POLICY_ID) .map(PolicyId::of) @@ -330,6 +336,18 @@ private Source>>, NotUsed> readCachedEnforcer( if (shouldReloadCache(optionalEnforcerEntry.orElse(null), metadata, iteration)) { // invalid entry; invalidate and retry after delay policyEnforcerCache.invalidate(policyId); + + // only invalidate causing policy tag once, e.g. when a massively imported policy is changed: + metadata.getCausingPolicyTag() + .ifPresent(causingPolicyTag -> { + final boolean invalidated = policyEnforcerCache.invalidateConditionally( + causingPolicyTag.getEntityId(), + entry -> entry.exists() && + entry.getRevision() < causingPolicyTag.getRevision() + ); + log.debug("Causing policy tag was invalidated conditionally: <{}>", invalidated); + }); + return readCachedEnforcer(metadata, policyId, iteration + 1) .initialDelay(cacheRetryDelay); } else { diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/ResolvedPolicyCacheLoader.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/ResolvedPolicyCacheLoader.java index 1ccca8e553..987e8016ac 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/ResolvedPolicyCacheLoader.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/ResolvedPolicyCacheLoader.java @@ -18,6 +18,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import org.apache.pekko.japi.Pair; +import org.eclipse.ditto.internal.utils.cache.Cache; import org.eclipse.ditto.internal.utils.cache.entry.Entry; import org.eclipse.ditto.policies.api.PolicyTag; import org.eclipse.ditto.policies.enforcement.PolicyCacheLoader; @@ -27,14 +29,15 @@ import com.github.benmanes.caffeine.cache.AsyncCacheLoader; -import org.apache.pekko.japi.Pair; - final class ResolvedPolicyCacheLoader implements AsyncCacheLoader>>> { private final PolicyCacheLoader policyCacheLoader; + private final CompletableFuture>>>> cacheFuture; - ResolvedPolicyCacheLoader(final PolicyCacheLoader policyCacheLoader) { + ResolvedPolicyCacheLoader(final PolicyCacheLoader policyCacheLoader, + final CompletableFuture>>>> cacheFuture) { this.policyCacheLoader = policyCacheLoader; + this.cacheFuture = cacheFuture; } @Override @@ -47,12 +50,13 @@ public CompletableFuture>>> asyncLoa final Policy policy = policyEntry.getValueOrThrow(); final Set referencedPolicies = new HashSet<>(); return policy.withResolvedImports( - importedPolicyId -> policyCacheLoader.asyncLoad(importedPolicyId, executor) - .thenApply(Entry::get) + importedPolicyId -> cacheFuture + .thenCompose(cache -> cache.get(importedPolicyId)) + .thenApply(entry -> entry.flatMap(Entry::get)) .thenApply(optionalReferencedPolicy -> { if (optionalReferencedPolicy.isPresent()) { final Policy referencedPolicy = - optionalReferencedPolicy.get(); + optionalReferencedPolicy.get().first(); final Optional revision = referencedPolicy.getRevision(); final Optional entityId = @@ -64,7 +68,7 @@ public CompletableFuture>>> asyncLoa ); } } - return optionalReferencedPolicy; + return optionalReferencedPolicy.map(Pair::first); })) .thenApply(resolvedPolicy -> { final long revision = policy.getRevision().map(PolicyRevision::toLong) diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingUpdater.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingUpdater.java index 500c768e7f..d9f1feb366 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingUpdater.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingUpdater.java @@ -25,6 +25,24 @@ import javax.annotation.Nullable; +import org.apache.pekko.Done; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.AbstractFSMWithStash; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSelection; +import org.apache.pekko.actor.FSM; +import org.apache.pekko.actor.Props; +import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator; +import org.apache.pekko.japi.pf.FSMStateFunctionBuilder; +import org.apache.pekko.japi.pf.PFBuilder; +import org.apache.pekko.pattern.Patterns; +import org.apache.pekko.stream.KillSwitches; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.UniqueKillSwitch; +import org.apache.pekko.stream.javadsl.Flow; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; import org.bson.BsonDocument; import org.eclipse.ditto.base.api.common.ShutdownReasonType; import org.eclipse.ditto.base.model.acks.AcknowledgementRequest; @@ -33,13 +51,13 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.service.actors.ShutdownBehaviour; import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOff; -import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; -import org.eclipse.ditto.internal.utils.pekko.logging.DittoLogger; -import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.cluster.StopShardedActor; import org.eclipse.ditto.internal.utils.metrics.DittoMetrics; import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter; import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer; +import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; +import org.eclipse.ditto.internal.utils.pekko.logging.DittoLogger; +import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.tracing.DittoTracing; import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.policies.api.PolicyTag; @@ -59,25 +77,6 @@ import com.mongodb.client.model.DeleteOneModel; -import org.apache.pekko.Done; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.AbstractFSMWithStash; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSelection; -import org.apache.pekko.actor.FSM; -import org.apache.pekko.actor.Props; -import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator; -import org.apache.pekko.japi.pf.FSMStateFunctionBuilder; -import org.apache.pekko.japi.pf.PFBuilder; -import org.apache.pekko.pattern.Patterns; -import org.apache.pekko.stream.KillSwitches; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.UniqueKillSwitch; -import org.apache.pekko.stream.javadsl.Flow; -import org.apache.pekko.stream.javadsl.Keep; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; - /** * This Actor initiates persistence updates related to 1 thing. */ @@ -445,6 +444,8 @@ private FSM.State onPolicyReferenceTag(final PolicyReferenceTag pol affectedOldPolicyTag); } + // TODO TJ invalidate cache entry?? + final var policyTag = policyReferenceTag.getPolicyTag(); if (affectedOldPolicyTag == null || affectedOldPolicyTag.getRevision() < policyTag.getRevision()) { final PolicyTag thingPolicyTag = Optional.ofNullable(affectedOldPolicyTag) @@ -462,7 +463,7 @@ private FSM.State onPolicyReferenceTag(final PolicyReferenceTag pol final Set allReferencedPolicyTags = buildNewAllReferencedPolicyTags(data.metadata() .getAllReferencedPolicyTags(), policyTag); - final var newMetadata = Metadata.of(thingId, thingRevision, thingPolicyTag, allReferencedPolicyTags, null) + final var newMetadata = Metadata.of(thingId, thingRevision, thingPolicyTag, policyTag, allReferencedPolicyTags, null) .withUpdateReason(UpdateReason.POLICY_UPDATE) .invalidateCaches(false, true); @@ -610,6 +611,7 @@ private Metadata exportMetadataWithSender(final boolean shouldAcknowledge, final long thingRevision = event.getRevision(); if (shouldAcknowledge) { return Metadata.of(thingId, thingRevision, data.metadata().getThingPolicyTag().orElse(null), + data.metadata.getCausingPolicyTag().orElse(null), data.metadata().getAllReferencedPolicyTags(), List.of(event), consistencyLagTimer, ackRecipient); } else { @@ -620,6 +622,7 @@ private Metadata exportMetadataWithSender(final boolean shouldAcknowledge, private Metadata exportMetadata(@Nullable final ThingEvent event, final long thingRevision, @Nullable final StartedTimer timer, final Data data) { return Metadata.of(thingId, thingRevision, data.metadata().getThingPolicyTag().orElse(null), + data.metadata().getCausingPolicyTag().orElse(null), data.metadata().getAllReferencedPolicyTags(), event == null ? List.of() : List.of(event), timer, null); } diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingsMetadataSource.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingsMetadataSource.java index ef435911b5..d9f3ae3f79 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingsMetadataSource.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingsMetadataSource.java @@ -18,6 +18,11 @@ import java.util.Optional; import java.util.Set; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.pattern.Patterns; +import org.apache.pekko.stream.SourceRef; +import org.apache.pekko.stream.javadsl.Source; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.internal.models.streaming.LowerBound; import org.eclipse.ditto.internal.models.streaming.StreamedSnapshot; @@ -32,12 +37,6 @@ import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.pattern.Patterns; -import org.apache.pekko.stream.SourceRef; -import org.apache.pekko.stream.javadsl.Source; - /** * Source of metadata streamed from things-service. */ @@ -124,7 +123,8 @@ private static Optional toMetadata(final StreamedSnapshot streamedSnap final Instant modified = snapshot.getValue(Thing.JsonFields.MODIFIED).map(Instant::parse).orElse(null); // policy revision is not known from thing snapshot final Optional policyTag = optionalPolicyId.map(policyId -> PolicyTag.of(policyId, 0L)); - return Optional.of(Metadata.of(thingId, thingRevision, policyTag.orElse(null), Set.of(), modified, null)); + return Optional.of( + Metadata.of(thingId, thingRevision, policyTag.orElse(null), null, Set.of(), modified, null)); } catch (PolicyIdInvalidException e) { return Optional.empty(); } diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/read/SudoIT.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/read/SudoIT.java index f3dbc4b94b..7ee9d57b6b 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/read/SudoIT.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/read/SudoIT.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.Set; +import org.apache.pekko.stream.javadsl.Sink; import org.eclipse.ditto.base.service.config.limits.LimitsConfig; import org.eclipse.ditto.internal.models.streaming.LowerBound; import org.eclipse.ditto.json.JsonArray; @@ -40,8 +41,6 @@ import org.junit.Test; import org.mockito.Mockito; -import org.apache.pekko.stream.javadsl.Sink; - /** * Test sudo methods. */ @@ -99,9 +98,9 @@ public void sudoCount() { @Test public void sudoStreamMetadata() { final Metadata metadata1 = - Metadata.of(THING1_ID, 1L, PolicyTag.of(PolicyId.of(THING1_ID), 0L), Set.of(), TIMESTAMP1, null); + Metadata.of(THING1_ID, 1L, PolicyTag.of(PolicyId.of(THING1_ID), 0L), null, Set.of(), TIMESTAMP1, null); final Metadata metadata2 = - Metadata.of(THING2_ID, 2L, PolicyTag.of(PolicyId.of(THING2_ID), 0L), Set.of(), TIMESTAMP2, null); + Metadata.of(THING2_ID, 2L, PolicyTag.of(PolicyId.of(THING2_ID), 0L), null, Set.of(), TIMESTAMP2, null); assertThat(waitFor(readPersistence.sudoStreamMetadata(LowerBound.emptyEntityId(ThingConstants.ENTITY_TYPE)))) .containsExactly(metadata1, metadata2); diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/mapping/BsonDiffVisitorIT.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/mapping/BsonDiffVisitorIT.java index cade204a9f..82283c46ee 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/mapping/BsonDiffVisitorIT.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/mapping/BsonDiffVisitorIT.java @@ -18,6 +18,10 @@ import java.util.List; import java.util.Set; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.testkit.javadsl.TestKit; import org.bson.BsonDocument; import org.bson.Document; import org.bson.codecs.DecoderContext; @@ -42,11 +46,6 @@ import com.mongodb.reactivestreams.client.MongoCollection; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.testkit.javadsl.TestKit; - /** * Tests incremental update. */ @@ -105,7 +104,7 @@ public void testAggregationUpdate() { final Metadata metadata = Metadata.of(ThingId.of("solar.system:pluto"), 23L, - PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), Set.of(), null, null); + PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), null, Set.of(), null, null); final JsonObject prevThing = getThing1(); final JsonObject nextThing = getThing2(); // Thing1 with some fields updated @@ -140,7 +139,7 @@ public void testEnforcerChange() { final Metadata metadata = Metadata.of(ThingId.of("solar.system:pluto"), 23L, - PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), Set.of(), null, null); + PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), null, Set.of(), null, null); final JsonObject prevThing = getThing1(); final JsonObject nextThing = getThing1(); @@ -175,7 +174,7 @@ public void testEnforcerAndThingChange() { final Metadata metadata = Metadata.of(ThingId.of("solar.system:pluto"), 23L, - PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), Set.of(), null, null); + PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), null, Set.of(), null, null); final JsonObject prevThing = getThing1(); final JsonObject nextThing = getThing2(); @@ -210,7 +209,7 @@ public void testArrayDiffPropertyDeleted() { final Metadata metadata = Metadata.of(ThingId.of("solar.system:pluto"), 23L, - PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), Set.of(), null, null); + PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), null, Set.of(), null, null); final JsonObject prevThing = getThing1(); final JsonObject nextThing = getThing6(); // identical to Thing1 with property deleted @@ -247,7 +246,7 @@ public void testSetEmptyObject() { final Metadata metadata = Metadata.of(ThingId.of("solar.system:pluto"), 23L, - PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), Set.of(), null, null); + PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), null, Set.of(), null, null); final JsonObject prevThing = getThing1(); final JsonObject nextThing = getThing4(); // identical to Thing1 with an extra fields with empty object as value @@ -284,7 +283,7 @@ public void testStringExpressionInUpdate() { final Metadata metadata = Metadata.of(ThingId.of("solar.system:pluto"), 23L, - PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), Set.of(), null, null); + PolicyTag.of(PolicyId.of("solar.system:pluto"), 45L), null, Set.of(), null, null); final JsonObject prevThing = getThing1(); final JsonObject nextThing = getThing5(); // Thing1 with string field updated to begin with '$' and end with '.' diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/MetadataTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/MetadataTest.java index 08417d1246..44e670d5f8 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/MetadataTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/MetadataTest.java @@ -17,14 +17,14 @@ import java.util.List; import java.util.Set; +import org.apache.pekko.actor.ActorSelection; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.testkit.TestProbe; import org.eclipse.ditto.policies.api.PolicyTag; import org.eclipse.ditto.policies.model.PolicyId; import org.eclipse.ditto.things.model.ThingId; import org.junit.Test; -import org.apache.pekko.actor.ActorSelection; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.testkit.TestProbe; import nl.jqno.equalsverifier.EqualsVerifier; /** @@ -51,15 +51,15 @@ public void appendsThingPolicyTagToAllReferencedPolicies() { final Set expectedReferencedPolicyTags = Set.of(policyTag, referencedPolicyTag); - assertThat(Metadata.of(ThingId.generateRandom(), 1337L, policyTag, Set.of(referencedPolicyTag), null) + assertThat(Metadata.of(ThingId.generateRandom(), 1337L, policyTag, null, Set.of(referencedPolicyTag), null) .getAllReferencedPolicyTags()) .containsExactlyInAnyOrderElementsOf(expectedReferencedPolicyTags); - assertThat(Metadata.of(ThingId.generateRandom(), 1337L, policyTag, Set.of(referencedPolicyTag), List.of(), null, + assertThat(Metadata.of(ThingId.generateRandom(), 1337L, policyTag, null, Set.of(referencedPolicyTag), List.of(), null, null).getAllReferencedPolicyTags()) .containsExactlyInAnyOrderElementsOf(expectedReferencedPolicyTags); - assertThat(Metadata.of(ThingId.generateRandom(), 1337L, policyTag, Set.of(referencedPolicyTag), null, null) + assertThat(Metadata.of(ThingId.generateRandom(), 1337L, policyTag, null, Set.of(referencedPolicyTag), null, null) .getAllReferencedPolicyTags()) .containsExactlyInAnyOrderElementsOf(expectedReferencedPolicyTags); diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/ThingWriteModelIT.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/ThingWriteModelIT.java index b6a60deb31..a98a28d492 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/ThingWriteModelIT.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/model/ThingWriteModelIT.java @@ -18,6 +18,8 @@ import java.util.Arrays; import java.util.Set; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; import org.bson.BsonDocument; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLogger; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; @@ -32,9 +34,6 @@ import com.mongodb.bulk.BulkWriteResult; import com.mongodb.reactivestreams.client.MongoCollection; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; - /** * Tests MongoDB interaction of {@link ThingWriteModel}. */ @@ -111,7 +110,7 @@ private BulkWriteResult executeWrite(final ThingWriteModel... thingWriteModels) } private static ThingWriteModel getWriteModel(final long sn, final int counterValue) { - final Metadata metadata = Metadata.of(ThingId.of("thing:id"), sn, null, Set.of(), null); + final Metadata metadata = Metadata.of(ThingId.of("thing:id"), sn, null, null, Set.of(), null); final BsonDocument thingDocument = new BsonDocument() .append("_revision", BsonNumber.apply(sn)) .append("counter", BsonNumber.apply(counterValue)); diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BackgroundSyncStreamTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BackgroundSyncStreamTest.java index 2461a61188..2a54fb8bec 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BackgroundSyncStreamTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BackgroundSyncStreamTest.java @@ -20,6 +20,11 @@ import java.util.Set; import java.util.concurrent.CompletionStage; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.testkit.javadsl.TestKit; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.policies.api.PolicyTag; import org.eclipse.ditto.policies.api.commands.sudo.SudoRetrievePolicy; @@ -35,12 +40,6 @@ import org.junit.BeforeClass; import org.junit.Test; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.testkit.javadsl.TestKit; - /** * Tests {@link BackgroundSyncStream}. */ @@ -65,28 +64,28 @@ public void mergeMetadataStreams() { final Duration toleranceWindow = Duration.ofHours(1L); final Source persisted = Source.from(List.of( - Metadata.of(ThingId.of("x:0-only-persisted"), 1L, PolicyTag.of(PolicyId.of("x:0"), 0L), Set.of(), null), - Metadata.of(ThingId.of("x:2-within-tolerance"), 3L, null, Set.of(), null), - Metadata.of(ThingId.of("x:3-revision-mismatch"), 3L, PolicyTag.of(PolicyId.of("x:3"), 0L), Set.of(), + Metadata.of(ThingId.of("x:0-only-persisted"), 1L, PolicyTag.of(PolicyId.of("x:0"), 0L), null, Set.of(), null), + Metadata.of(ThingId.of("x:2-within-tolerance"), 3L, null, null, Set.of(), null), + Metadata.of(ThingId.of("x:3-revision-mismatch"), 3L, PolicyTag.of(PolicyId.of("x:3"), 0L), null, Set.of(), null), - Metadata.of(ThingId.of("x:4-policy-id-mismatch"), 3L, PolicyTag.of(PolicyId.of("x:4"), 0L), Set.of(), + Metadata.of(ThingId.of("x:4-policy-id-mismatch"), 3L, PolicyTag.of(PolicyId.of("x:4"), 0L), null, Set.of(), null), - Metadata.of(ThingId.of("x:5-policy-revision-mismatch"), 3L, PolicyTag.of(PolicyId.of("x:5"), 0L), + Metadata.of(ThingId.of("x:5-policy-revision-mismatch"), 3L, PolicyTag.of(PolicyId.of("x:5"), 0L), null, Set.of(), null), - Metadata.of(ThingId.of("x:6-all-up-to-date"), 3L, PolicyTag.of(PolicyId.of("x:6"), 0L), Set.of(), null), - Metadata.of(ThingId.of("x:7-policy-deleted"), 7L, PolicyTag.of(PolicyId.of("x:7"), 0L), Set.of(), null) + Metadata.of(ThingId.of("x:6-all-up-to-date"), 3L, PolicyTag.of(PolicyId.of("x:6"), 0L), null, Set.of(), null), + Metadata.of(ThingId.of("x:7-policy-deleted"), 7L, PolicyTag.of(PolicyId.of("x:7"), 0L), null, Set.of(), null) )); final Source indexed = Source.from(List.of( - Metadata.of(ThingId.of("x:1-only-indexed"), 1L, null, Set.of(), null), - Metadata.of(ThingId.of("x:2-within-tolerance"), 1L, null, Set.of(), Instant.now(), null), - Metadata.of(ThingId.of("x:3-revision-mismatch"), 2L, PolicyTag.of(PolicyId.of("x:3"), 1L), Set.of(), + Metadata.of(ThingId.of("x:1-only-indexed"), 1L, null, null, Set.of(), null), + Metadata.of(ThingId.of("x:2-within-tolerance"), 1L, null, null, Set.of(), Instant.now(), null), + Metadata.of(ThingId.of("x:3-revision-mismatch"), 2L, PolicyTag.of(PolicyId.of("x:3"), 1L), null, Set.of(), null), Metadata.of(ThingId.of("x:4-policy-id-mismatch"), 3L, PolicyTag.of(PolicyId.of("x:mismatched"), 0L), - Set.of(), null), + null, Set.of(), null), Metadata.of(ThingId.of("x:5-policy-revision-mismatch"), 3L, PolicyTag.of(PolicyId.of("x:5"), 3L), - Set.of(), null), - Metadata.of(ThingId.of("x:6-all-up-to-date"), 5L, PolicyTag.of(PolicyId.of("x:6"), 6L), Set.of(), null) + null, Set.of(), null), + Metadata.of(ThingId.of("x:6-all-up-to-date"), 5L, PolicyTag.of(PolicyId.of("x:6"), 6L), null, Set.of(), null) )); new TestKit(actorSystem) {{ diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BulkWriteResultAckFlowTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BulkWriteResultAckFlowTest.java index d364d65e73..da03304b93 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BulkWriteResultAckFlowTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/BulkWriteResultAckFlowTest.java @@ -20,6 +20,12 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.testkit.TestProbe; +import org.apache.pekko.testkit.javadsl.TestKit; import org.bson.BsonDocument; import org.bson.BsonString; import org.eclipse.ditto.base.model.common.HttpStatus; @@ -43,13 +49,6 @@ import com.mongodb.bulk.BulkWriteResult; import com.mongodb.bulk.BulkWriteUpsert; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.japi.Pair; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.testkit.TestProbe; -import org.apache.pekko.testkit.javadsl.TestKit; - /** * Tests {@link BulkWriteResultAckFlow}. */ @@ -202,7 +201,7 @@ private List generateWriteModels(final List probes) final long policyRevision = i * 100L; final PolicyTag policyTag = policyId == null ? null : PolicyTag.of(policyId, policyRevision); final Metadata metadata = - Metadata.of(thingId, thingRevision, policyTag, Set.of(), List.of(), null, + Metadata.of(thingId, thingRevision, policyTag, null, Set.of(), List.of(), null, actorSystem.actorSelection(probes.get(i).ref().path())); final AbstractWriteModel abstractModel; if (i % 2 == 0) { diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java index 319dbac615..0ebd54d6f3 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java @@ -22,6 +22,17 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.KillSwitches; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.testkit.TestPublisher; +import org.apache.pekko.stream.testkit.TestSubscriber; +import org.apache.pekko.stream.testkit.javadsl.TestSink; +import org.apache.pekko.stream.testkit.javadsl.TestSource; +import org.apache.pekko.testkit.TestActor; +import org.apache.pekko.testkit.TestProbe; +import org.apache.pekko.testkit.javadsl.TestKit; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.json.FieldType; import org.eclipse.ditto.json.JsonObject; @@ -57,17 +68,6 @@ import com.typesafe.config.ConfigFactory; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.stream.KillSwitches; -import org.apache.pekko.stream.javadsl.Keep; -import org.apache.pekko.stream.testkit.TestPublisher; -import org.apache.pekko.stream.testkit.TestSubscriber; -import org.apache.pekko.stream.testkit.javadsl.TestSink; -import org.apache.pekko.stream.testkit.javadsl.TestSource; -import org.apache.pekko.testkit.TestActor; -import org.apache.pekko.testkit.TestProbe; -import org.apache.pekko.testkit.javadsl.TestKit; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -105,7 +105,7 @@ public void updateThingAndPolicyRevisions() { final ThingId thingId = ThingId.of("thing:id"); final PolicyId policyId = PolicyId.of("policy:id"); final Metadata metadata = - Metadata.of(thingId, thingRev1, PolicyTag.of(policyId, policyRev1), Set.of(), null); + Metadata.of(thingId, thingRev1, PolicyTag.of(policyId, policyRev1), null, Set.of(), null); final Collection input = List.of(metadata); final TestProbe thingsProbe = TestProbe.apply(system); @@ -157,7 +157,7 @@ public void ignoreCacheWhenRequestedToUpdate() { final long policyRev2 = 98L; final ThingId thingId = ThingId.of("thing:id"); final PolicyId policyId = PolicyId.of("policy:id"); - final Metadata metadata1 = Metadata.of(thingId, thingRev1, PolicyTag.of(policyId, policyRev1), Set.of(), null); + final Metadata metadata1 = Metadata.of(thingId, thingRev1, PolicyTag.of(policyId, policyRev1), null, Set.of(), null); final TestProbe thingsProbe = TestProbe.apply(system); final TestProbe policiesProbe = TestProbe.apply(system); @@ -233,7 +233,7 @@ public void computeThingCacheValueFromThingEvents() { ); final Metadata metadata = - Metadata.of(thingId, 5L, PolicyTag.of(policyId, 1L), Set.of(), events, null, null); + Metadata.of(thingId, 5L, PolicyTag.of(policyId, 1L), null, Set.of(), events, null, null); final List inputMap = List.of(metadata); final TestProbe thingsProbe = TestProbe.apply(system); @@ -301,7 +301,7 @@ public void computeThingCacheValueFromThingEventsWhenLastEventWasDeleted() { ); final Metadata metadata = - Metadata.of(thingId, 5L, PolicyTag.of(policyId, 1L), Set.of(), events, null, null); + Metadata.of(thingId, 5L, PolicyTag.of(policyId, 1L), null, Set.of(), events, null, null); final List inputMap = List.of(metadata); final TestProbe thingsProbe = TestProbe.apply(system); @@ -357,7 +357,7 @@ public void forceRetrieveThing() { AttributeDeleted.of(thingId, JsonPointer.of("w"), 6, null, headers, null) ); - final Metadata metadata = Metadata.of(thingId, 6L, PolicyTag.of(policyId, 1L), Set.of(), events, null, null) + final Metadata metadata = Metadata.of(thingId, 6L, PolicyTag.of(policyId, 1L), null, Set.of(), events, null, null) .invalidateCaches(true, true); final List inputMap = List.of(metadata); @@ -404,7 +404,7 @@ public void eventSequenceNumberTooLow() { ); final Metadata metadata = - Metadata.of(thingId, 7L, PolicyTag.of(policyId, 1L), Set.of(), events, null, null); + Metadata.of(thingId, 7L, PolicyTag.of(policyId, 1L), null, Set.of(), events, null, null); final List inputMap = List.of(metadata); final TestProbe thingsProbe = TestProbe.apply(system); @@ -449,7 +449,7 @@ public void eventMissed() { ); final Metadata metadata = - Metadata.of(thingId, 6L, PolicyTag.of(policyId, 1L), Set.of(), events, null, null); + Metadata.of(thingId, 6L, PolicyTag.of(policyId, 1L), null, Set.of(), events, null, null); final List inputMap = List.of(metadata); final TestProbe thingsProbe = TestProbe.apply(system); @@ -493,7 +493,7 @@ public void noInitialCreatedOrDeletedEvent() { ); final Metadata metadata = - Metadata.of(thingId, 6L, PolicyTag.of(policyId, 1L), Set.of(), events, null, null); + Metadata.of(thingId, 6L, PolicyTag.of(policyId, 1L), null, Set.of(), events, null, null); final List inputMap = List.of(metadata); final TestProbe thingsProbe = TestProbe.apply(system); @@ -561,7 +561,7 @@ public void onlyApplyRelevantEvents() { ); final Metadata metadata = - Metadata.of(thingId, 6L, PolicyTag.of(policyId, 1L), Set.of(), events, null, null); + Metadata.of(thingId, 6L, PolicyTag.of(policyId, 1L), null, Set.of(), events, null, null); final List inputMap = List.of(metadata); final TestProbe thingsProbe = TestProbe.apply(system); @@ -618,7 +618,7 @@ public void thereCanBeMultipleUpdatesPerBulk() { final ThingId thingId = ThingId.of("thing:" + i); final Thing ithThing = thing.toBuilder().setId(thingId).setRevision(i).build(); final List> events = List.of(ThingModified.of(ithThing, i, null, headers, null)); - return Metadata.of(thingId, i, PolicyTag.of(policyId, 1L), Set.of(), events, null, null); + return Metadata.of(thingId, i, PolicyTag.of(policyId, 1L), null, Set.of(), events, null, null); }).toList()); final TestProbe thingsProbe = TestProbe.apply(system); diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/TestSearchUpdaterStream.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/TestSearchUpdaterStream.java index eecf79337c..8c520bdf8e 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/TestSearchUpdaterStream.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/TestSearchUpdaterStream.java @@ -16,6 +16,8 @@ import javax.annotation.Nullable; +import org.apache.pekko.NotUsed; +import org.apache.pekko.stream.javadsl.Source; import org.eclipse.ditto.base.model.json.FieldType; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.policies.api.PolicyTag; @@ -34,9 +36,6 @@ import com.mongodb.reactivestreams.client.MongoDatabase; import com.typesafe.config.ConfigFactory; -import org.apache.pekko.NotUsed; -import org.apache.pekko.stream.javadsl.Source; - /** * Run parts of the updater stream for unit tests. */ @@ -99,7 +98,7 @@ public Source write(final Thing thing, public Source delete(final ThingId thingId, final long revision, @Nullable final PolicyId policyId, final long policyRevision) { return delete(Metadata.of(thingId, revision, policyId == null ? null : PolicyTag.of(policyId, policyRevision), - Set.of(), null)); + null, Set.of(), null)); } /** diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActorTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActorTest.java index 234d92088d..55273eeda4 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActorTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActorTest.java @@ -30,6 +30,17 @@ import javax.annotation.Nullable; +import org.apache.pekko.Done; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.SourceRef; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.stream.javadsl.StreamRefs; +import org.apache.pekko.testkit.javadsl.TestKit; import org.awaitility.Awaitility; import org.eclipse.ditto.base.api.common.Shutdown; import org.eclipse.ditto.base.api.common.ShutdownReasonFactory; @@ -41,13 +52,13 @@ import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; import org.eclipse.ditto.internal.models.streaming.StreamedSnapshot; import org.eclipse.ditto.internal.models.streaming.SudoStreamSnapshots; -import org.eclipse.ditto.internal.utils.pekko.streaming.TimestampPersistence; import org.eclipse.ditto.internal.utils.health.ResetHealthEvents; import org.eclipse.ditto.internal.utils.health.ResetHealthEventsResponse; import org.eclipse.ditto.internal.utils.health.RetrieveHealth; import org.eclipse.ditto.internal.utils.health.RetrieveHealthResponse; import org.eclipse.ditto.internal.utils.health.StatusDetailMessage; import org.eclipse.ditto.internal.utils.health.StatusInfo; +import org.eclipse.ditto.internal.utils.pekko.streaming.TimestampPersistence; import org.eclipse.ditto.json.JsonArray; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.json.JsonValue; @@ -72,18 +83,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.apache.pekko.Done; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator; -import org.apache.pekko.japi.Pair; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.SourceRef; -import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.stream.javadsl.StreamRefs; -import org.apache.pekko.testkit.javadsl.TestKit; - /** * Unit test for {@link BackgroundSyncActor}. */ @@ -105,7 +104,7 @@ public final class BackgroundSyncActorTest { private static final List THINGS_INDEXED = KNOWN_IDs.stream() .map(id -> Metadata.of(ThingId.of(id), REVISION_INDEXED, - PolicyTag.of(PolicyId.of(id), REVISION_INDEXED), Set.of(), null)) + PolicyTag.of(PolicyId.of(id), REVISION_INDEXED), null, Set.of(), null)) .toList(); private static final List THINGS_PERSISTED = KNOWN_IDs.stream() .map(id -> createStreamedSnapshot(id, REVISION_PERSISTED)) @@ -219,7 +218,7 @@ public void providesHealthWarningWhenSyncStreamFails() { @Test public void resettingHealthEventsAfterSyncStreamFailureClearsErrors() { - final Metadata indexedThingMetadata = Metadata.of(THING_ID, 2, null, Set.of(), null); + final Metadata indexedThingMetadata = Metadata.of(THING_ID, 2, null, null, Set.of(), null); final long persistedRevision = indexedThingMetadata.getThingRevision() + 1; new TestKit(actorSystem) {{ @@ -246,9 +245,9 @@ public void resettingHealthEventsAfterSyncStreamFailureClearsErrors() { @Test public void noHealthWarningAfterSuccessfulStream() { - final Metadata indexedThingMetadata = Metadata.of(THING_ID, 2, null, Set.of(), null); + final Metadata indexedThingMetadata = Metadata.of(THING_ID, 2, null, null, Set.of(), null); final long persistedRevision = indexedThingMetadata.getThingRevision() + 1; - final Metadata persistedThingMetadata = Metadata.of(THING_ID, persistedRevision, null, Set.of(), null); + final Metadata persistedThingMetadata = Metadata.of(THING_ID, persistedRevision, null, null, Set.of(), null); final var streamedSnapshots = List.of(createStreamedSnapshot(THING_ID, persistedRevision)); final var streamedSnapshotsWithoutPolicyId = List.of(createStreamedSnapshotWithoutPolicyId(THING_ID, persistedRevision)); @@ -291,9 +290,9 @@ public void noHealthWarningAfterSuccessfulStream() { @Test public void staysHealthyWhenSameThingIsSynchronizedWithOtherRevision() { - final Metadata indexedThingMetadata = Metadata.of(THING_ID, 2, null, Set.of(), null); + final Metadata indexedThingMetadata = Metadata.of(THING_ID, 2, null, null, Set.of(), null); final long persistedRevision = indexedThingMetadata.getThingRevision() + 1; - final Metadata nextThingMetadata = Metadata.of(THING_ID, persistedRevision, null, Set.of(), null); + final Metadata nextThingMetadata = Metadata.of(THING_ID, persistedRevision, null, null, Set.of(), null); final long nextRevision = nextThingMetadata.getThingRevision() + 1; new TestKit(actorSystem) {{ diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingUpdaterTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingUpdaterTest.java index 004cfe45bd..9c3f6eb90a 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingUpdaterTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/updater/actors/ThingUpdaterTest.java @@ -20,6 +20,24 @@ import java.util.List; import java.util.Set; +import org.apache.pekko.Done; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Props; +import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator; +import org.apache.pekko.stream.javadsl.Flow; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.MergeHub; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.stream.scaladsl.BroadcastHub; +import org.apache.pekko.stream.testkit.TestPublisher; +import org.apache.pekko.stream.testkit.TestSubscriber; +import org.apache.pekko.stream.testkit.javadsl.TestSink; +import org.apache.pekko.stream.testkit.javadsl.TestSource; +import org.apache.pekko.testkit.TestProbe; +import org.apache.pekko.testkit.javadsl.TestKit; import org.bson.BsonArray; import org.bson.BsonDocument; import org.bson.BsonInt64; @@ -29,8 +47,8 @@ import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel; import org.eclipse.ditto.base.model.entity.id.EntityId; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.internal.utils.pekko.ActorSystemResource; import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; +import org.eclipse.ditto.internal.utils.pekko.ActorSystemResource; import org.eclipse.ditto.internal.utils.tracing.DittoTracingInitResource; import org.eclipse.ditto.json.JsonPointer; import org.eclipse.ditto.json.JsonValue; @@ -58,24 +76,6 @@ import com.mongodb.client.model.UpdateOneModel; import com.typesafe.config.ConfigFactory; -import org.apache.pekko.Done; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Props; -import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator; -import org.apache.pekko.stream.javadsl.Flow; -import org.apache.pekko.stream.javadsl.Keep; -import org.apache.pekko.stream.javadsl.MergeHub; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.stream.scaladsl.BroadcastHub; -import org.apache.pekko.stream.testkit.TestPublisher; -import org.apache.pekko.stream.testkit.TestSubscriber; -import org.apache.pekko.stream.testkit.javadsl.TestSink; -import org.apache.pekko.stream.testkit.javadsl.TestSource; -import org.apache.pekko.testkit.TestProbe; -import org.apache.pekko.testkit.javadsl.TestKit; import scala.concurrent.duration.FiniteDuration; /** @@ -169,7 +169,7 @@ public void updateFromEvent() { inletProbe.ensureSubscription(); inletProbe.request(16); final var data = inletProbe.expectNext(); - assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, Set.of(), null)); + assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, null, Set.of(), null)); assertThat(data.metadata().getTimers()).hasSize(1); assertThat(data.metadata().getAckRecipients()).isEmpty(); assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel()); @@ -193,7 +193,7 @@ public void updateFromEventWithAck() { inletProbe.ensureSubscription(); inletProbe.request(16); final var data = inletProbe.expectNext(); - assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, Set.of(), null)); + assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, null, Set.of(), null)); assertThat(data.metadata().getTimers()).hasSize(1); assertThat(data.metadata().getAckRecipients()).containsOnly(getSystem().actorSelection(getRef().path())); assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel()); @@ -266,7 +266,7 @@ public void updateFromEventAfterSkippedUpdate() { inletProbe.ensureSubscription(); inletProbe.request(16); final var data = inletProbe.expectNext(); - assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, Set.of(), null)); + assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, null, Set.of(), null)); assertThat(data.metadata().getTimers()).hasSize(1); assertThat(data.metadata().getAckRecipients()).isEmpty(); assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel()); @@ -281,7 +281,7 @@ public void updateFromEventAfterSkippedUpdate() { // THEN: next update is processed regularly final var data2 = inletProbe.expectNext(); - assertThat(data2.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 2, null, Set.of(), null)); + assertThat(data2.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 2, null, null, Set.of(), null)); assertThat(data2.metadata().getTimers()).hasSize(1); assertThat(data2.metadata().getAckRecipients()).isEmpty(); assertThat(data2.lastWriteModel()).isEqualTo( @@ -397,7 +397,7 @@ public void combineUpdatesFrom2Events() { inletProbe.ensureSubscription(); inletProbe.request(16); final var data = inletProbe.expectNext(); - assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 2, null, Set.of(), null)); + assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 2, null, null, Set.of(), null)); assertThat(data.metadata().getTimers()).hasSize(2); assertThat(data.metadata().getEvents()).hasSize(2); assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel()); @@ -429,7 +429,7 @@ public void stashEventsDuringPersistence() { inletProbe.ensureSubscription(); inletProbe.request(16); final var data1 = inletProbe.expectNext(); - assertThat(data1.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, Set.of(), null)); + assertThat(data1.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, null, Set.of(), null)); underTest.tell(event2, ActorRef.noSender()); // THEN: no second update is sent until the first event is persisted @@ -439,7 +439,7 @@ public void stashEventsDuringPersistence() { outletProbe.expectRequest(); outletProbe.sendNext(getOKResult(REVISION + 1)); final var data2 = inletProbe.expectNext(TEN_SECONDS); - assertThat(data2.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 2, null, Set.of(), null)); + assertThat(data2.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 2, null, null, Set.of(), null)); }}; } @@ -461,7 +461,7 @@ public void policyIdChangeTriggersSync() { inletProbe.request(16); final var data = inletProbe.expectNext(); assertThat(data.metadata().export()).isEqualTo( - Metadata.of(THING_ID, REVISION, null, Set.of(PolicyTag.of(policyId, 1L)), null)); + Metadata.of(THING_ID, REVISION, null, null, Set.of(PolicyTag.of(policyId, 1L)), null)); assertThat(data.metadata().getUpdateReasons()).contains(UpdateReason.POLICY_UPDATE); assertThat(data.metadata().getTimers()).isEmpty(); }}; @@ -480,7 +480,7 @@ public void triggerUpdateOnCommand() { inletProbe.ensureSubscription(); inletProbe.request(16); final var data = inletProbe.expectNext(); - assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION, null, Set.of(), null)); + assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION, null, null, Set.of(), null)); assertThat(data.metadata().getTimers()).isEmpty(); assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel()); }}; @@ -491,7 +491,7 @@ public void forceUpdateOnCommand() { new TestKit(system) {{ final Props props = ThingUpdater.props(flow, id -> Source.single(getThingWriteModel()), SEARCH_CONFIG, getTestActor()); - final var expectedMetadata = Metadata.of(THING_ID, REVISION, null, Set.of(), null); + final var expectedMetadata = Metadata.of(THING_ID, REVISION, null, null, Set.of(), null); final ActorRef underTest = watch(childActorOf(props, ACTOR_NAME)); final var command = SudoUpdateThing.of(THING_ID, UpdateReason.MANUAL_REINDEXING, DittoHeaders.newBuilder() @@ -527,7 +527,7 @@ public void shutdownOnThingDeletedCommand() { inletProbe.ensureSubscription(); inletProbe.request(16); final var data = inletProbe.expectNext(); - assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, Set.of(), null)); + assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, null, Set.of(), null)); assertThat(data.metadata().getUpdateReasons()).contains(UpdateReason.THING_UPDATE); assertThat(data.metadata().getEvents()).hasOnlyElementsOfType(ThingDeleted.class); assertThat(data.metadata().getTimers()).hasSize(1); @@ -622,7 +622,7 @@ public void initialUpdateSkipped() { ); final Props props = - ThingUpdater.props(flow, id -> Source.single(ThingDeleteModel.of(Metadata.of(THING_ID, -1, null, + ThingUpdater.props(flow, id -> Source.single(ThingDeleteModel.of(Metadata.of(THING_ID, -1, null, null, Set.of(), null))), SEARCH_CONFIG, TestProbe.apply(system).ref()); final ActorRef underTest = watch(childActorOf(props, ACTOR_NAME)); @@ -657,7 +657,7 @@ private static ThingWriteModel getThingWriteModel(final long revision) { .append("f", new BsonArray()) .append("t", new BsonDocument().append("attributes", new BsonDocument().append("x", BsonInt32.apply(5)))); - return ThingWriteModel.of(Metadata.of(THING_ID, revision, null, Set.of(), null), document); + return ThingWriteModel.of(Metadata.of(THING_ID, revision, null, null, Set.of(), null), document); } private static String getActorName(final String name) {