Skip to content

Commit

Permalink
Allow background sync stream to determine whether to invalidate cache…
Browse files Browse the repository at this point in the history
…d things or policies.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 25, 2021
1 parent 486d663 commit 86bc303
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
*/
package org.eclipse.ditto.thingsearch.api.commands.sudo;

import static org.eclipse.ditto.base.model.json.JsonSchemaVersion.V_2;

import java.util.Objects;
import java.util.function.Predicate;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
Expand Down Expand Up @@ -60,12 +64,24 @@ public final class UpdateThing extends AbstractCommand<UpdateThing> implements S
public static final String TYPE = TYPE_PREFIX + NAME;

private static final JsonFieldDefinition<String> JSON_THING_ID = Thing.JsonFields.ID;
private static final JsonFieldDefinition<Boolean> JSON_INVALIDATE_THING =
JsonFactory.newBooleanFieldDefinition("invalidateThing", FieldType.REGULAR, V_2);
private static final JsonFieldDefinition<Boolean> JSON_INVALIDATE_POLICY =
JsonFactory.newBooleanFieldDefinition("invalidatePolicy", FieldType.REGULAR, V_2);


private final ThingId thingId;
private final boolean invalidateThing;
private final boolean invalidatePolicy;

private UpdateThing(final ThingId thingId, final DittoHeaders dittoHeaders) {
private UpdateThing(final ThingId thingId,
final boolean invalidateThing,
final boolean invalidatePolicy,
final DittoHeaders dittoHeaders) {
super(TYPE, dittoHeaders);
this.thingId = thingId;
this.invalidateThing = invalidateThing;
this.invalidatePolicy = invalidatePolicy;
}

/**
Expand All @@ -76,7 +92,24 @@ private UpdateThing(final ThingId thingId, final DittoHeaders dittoHeaders) {
* @return the command.
*/
public static UpdateThing of(final ThingId thingId, final DittoHeaders dittoHeaders) {
return new UpdateThing(thingId, dittoHeaders);
return new UpdateThing(thingId, true, true, dittoHeaders);
}

/**
* Create an UpdateThing command.
*
* @param thingId the ID of the thing whose search index should be updated.
* @param invalidateThing whether the cached thing should be invalidated.
* @param invalidatePolicy whether the cached policy should be invalidated.
* @param dittoHeaders Ditto headers of the command.
* @return the command.
* @since 2.1.0
*/
public static UpdateThing of(final ThingId thingId,
final boolean invalidateThing,
final boolean invalidatePolicy,
final DittoHeaders dittoHeaders) {
return new UpdateThing(thingId, invalidateThing, invalidatePolicy, dittoHeaders);
}

/**
Expand All @@ -90,13 +123,20 @@ public static UpdateThing of(final ThingId thingId, final DittoHeaders dittoHead
* "thingId".
*/
public static UpdateThing fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
return of(ThingId.of(jsonObject.getValueOrThrow(JSON_THING_ID)), dittoHeaders);
return of(
ThingId.of(jsonObject.getValueOrThrow(JSON_THING_ID)),
jsonObject.getValueOrThrow(JSON_INVALIDATE_THING),
jsonObject.getValueOrThrow(JSON_INVALIDATE_POLICY),
dittoHeaders
);
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> predicate) {
jsonObjectBuilder.set(JSON_THING_ID, thingId.toString(), predicate);
jsonObjectBuilder.set(JSON_THING_ID, thingId.toString(), predicate)
.set(JSON_INVALIDATE_THING, invalidateThing, predicate)
.set(JSON_INVALIDATE_POLICY, invalidatePolicy, predicate);
}

@Override
Expand All @@ -111,14 +151,34 @@ public Category getCategory() {

@Override
public UpdateThing setDittoHeaders(final DittoHeaders dittoHeaders) {
return new UpdateThing(thingId, dittoHeaders);
return new UpdateThing(thingId, invalidateThing, invalidatePolicy, dittoHeaders);
}

@Override
public ThingId getEntityId() {
return thingId;
}

/**
* Return whether to invalidate the cached thing.
*
* @return whether to invalidate the cached thing.
* @since 2.1.0
*/
public boolean shouldInvalidateThing() {
return invalidateThing;
}

/**
* Return whether to invalidate the cached policy.
*
* @return whether to invalidate the cached policy.
* @since 2.1.0
*/
public boolean shouldInvalidatePolicy() {
return invalidatePolicy;
}

@Override
public JsonPointer getResourcePath() {
return JsonPointer.empty();
Expand All @@ -135,17 +195,25 @@ public boolean equals(final Object o) {
return false;
} else {
final UpdateThing that = (UpdateThing) o;
return Objects.equals(thingId, that.thingId) && super.equals(that);
return Objects.equals(thingId, that.thingId) &&
invalidateThing == that.invalidateThing &&
invalidatePolicy == that.invalidatePolicy &&
super.equals(that);
}
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), thingId);
return Objects.hash(super.hashCode(), thingId, invalidateThing, invalidatePolicy);
}

@Override
public String toString() {
return getClass().getSimpleName() + "[" + super.toString() + ",thingId=" + thingId + "]";
return getClass().getSimpleName() +
"[" + super.toString() +
",thingId=" + thingId +
",invalidateThing=" + invalidateThing +
",invalidatePolicy=" + invalidatePolicy +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

import static org.assertj.core.api.Assertions.assertThat;

import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.things.model.ThingId;
import org.junit.Test;
import org.mutabilitydetector.unittesting.AllowedReason;
Expand Down Expand Up @@ -46,7 +46,7 @@ public void testHashCodeAndEquals() {
@Test
public void testSerialization() {
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().randomCorrelationId().build();
final UpdateThing command = UpdateThing.of(ThingId.of("namespace", "name"), dittoHeaders);
final UpdateThing command = UpdateThing.of(ThingId.of("namespace", "name"), true, false, dittoHeaders);
final String jsonString = command.toJsonString();
final UpdateThing deserializedCommand = UpdateThing.fromJson(JsonObject.of(jsonString), dittoHeaders);
assertThat(deserializedCommand).isEqualTo(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public final class Metadata {
private final List<ThingEvent<?>> events;
private final List<StartedTimer> timers;
private final List<ActorRef> senders;
private final boolean invalidateCache;
private final boolean invalidateThing;
private final boolean invalidatePolicy;
@Nullable final ActorRef origin;

private Metadata(final ThingId thingId,
Expand All @@ -58,8 +59,8 @@ private Metadata(final ThingId thingId,
final List<ThingEvent<?>> events,
final Collection<StartedTimer> timers,
final Collection<ActorRef> senders,
final boolean invalidateCache,
@Nullable final ActorRef origin) {
final boolean invalidateThing,
final boolean invalidatePolicy, @Nullable final ActorRef origin) {

this.thingId = thingId;
this.thingRevision = thingRevision;
Expand All @@ -69,7 +70,8 @@ private Metadata(final ThingId thingId,
this.events = events;
this.timers = List.copyOf(timers);
this.senders = List.copyOf(senders);
this.invalidateCache = invalidateCache;
this.invalidateThing = invalidateThing;
this.invalidatePolicy = invalidatePolicy;
this.origin = origin;
}

Expand All @@ -90,7 +92,7 @@ public static Metadata of(final ThingId thingId,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, null,
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, null);
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false, null);
}

/**
Expand All @@ -114,7 +116,7 @@ public static Metadata of(final ThingId thingId,

return new Metadata(thingId, thingRevision, policyId, policyRevision, null, events,
null != timer ? List.of(timer) : List.of(),
null != sender ? List.of(sender) : List.of(), false, null);
null != sender ? List.of(sender) : List.of(), false, false, null);
}

/**
Expand All @@ -138,7 +140,7 @@ public static Metadata of(final ThingId thingId,
final Collection<ActorRef> senders) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, List.of(), timers, senders,
false, null);
false, false, null);
}

/**
Expand All @@ -160,7 +162,7 @@ public static Metadata of(final ThingId thingId,
@Nullable final StartedTimer timer) {

return new Metadata(thingId, thingRevision, policyId, policyRevision, modified,
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, null);
List.of(), null != timer ? List.of(timer) : List.of(), List.of(), false, false, null);
}

/**
Expand All @@ -179,11 +181,13 @@ public static Metadata fromResponse(final UpdateThingResponse updateThingRespons
/**
* Create a copy of this metadata requesting cache invalidation.
*
* @param invalidateThing whether to invalidate the cached thing.
* @param invalidatePolicy whether to invalidate the cached policy enforcer.
* @return the copy.
*/
public Metadata invalidateCache() {
return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders, true,
origin);
public Metadata invalidateCaches(final boolean invalidateThing, final boolean invalidatePolicy) {
return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders,
invalidateThing, invalidatePolicy, origin);
}

/**
Expand All @@ -193,7 +197,7 @@ public Metadata invalidateCache() {
*/
public Metadata withOrigin(@Nullable final ActorRef origin) {
return new Metadata(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders,
invalidateCache, origin);
invalidateThing, invalidatePolicy, origin);
}

/**
Expand Down Expand Up @@ -296,13 +300,22 @@ public boolean isShouldAcknowledge() {
return !senders.isEmpty();
}

/**
* Returns whether this metadata should invalidate the cached thing.
*
* @return whether to invalidate the cached thing.
*/
public boolean shouldInvalidateThing() {
return invalidateThing;
}

/**
* Returns whether this metadata should invalidate the enforcer cache.
*
* @return whether to invalidate the enforcer cache.
*/
public boolean shouldInvalidateCache() {
return invalidateCache;
public boolean shouldInvalidatePolicy() {
return invalidatePolicy;
}

/**
Expand All @@ -320,7 +333,7 @@ public Metadata append(final Metadata newMetadata) {
Stream.concat(senders.stream(), newMetadata.senders.stream()).collect(Collectors.toList());
return new Metadata(newMetadata.thingId, newMetadata.thingRevision, newMetadata.policyId,
newMetadata.policyRevision, newMetadata.modified, newEvents, newTimers, newSenders,
invalidateCache || newMetadata.invalidateCache, newMetadata.origin);
invalidateThing || newMetadata.invalidateThing, invalidatePolicy, newMetadata.origin);
}

/**
Expand Down Expand Up @@ -365,14 +378,14 @@ public boolean equals(final Object o) {
Objects.equals(events, that.events) &&
Objects.equals(timers, that.timers) &&
Objects.equals(senders, that.senders) &&
invalidateCache == that.invalidateCache &&
invalidateThing == that.invalidateThing &&
Objects.equals(origin, that.origin);
}

@Override
public int hashCode() {
return Objects.hash(thingId, thingRevision, policyId, policyRevision, modified, events, timers, senders,
invalidateCache, origin);
invalidateThing, origin);
}

@Override
Expand All @@ -386,7 +399,7 @@ public String toString() {
", events=" + events +
", timers=[" + timers.size() + " timers]" +
", senders=" + senders +
", invalidateCache=" + invalidateCache +
", invalidateCache=" + invalidateThing +
", origin=" + origin +
"]";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.models.streaming.LowerBound;
import org.eclipse.ditto.internal.utils.akka.controlflow.MergeSortedAsPair;
import org.eclipse.ditto.policies.api.commands.sudo.SudoRetrievePolicyRevision;
import org.eclipse.ditto.policies.api.commands.sudo.SudoRetrievePolicyRevisionResponse;
import org.eclipse.ditto.policies.model.PolicyConstants;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingConstants;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.policies.api.commands.sudo.SudoRetrievePolicyRevision;
import org.eclipse.ditto.policies.api.commands.sudo.SudoRetrievePolicyRevisionResponse;
import org.eclipse.ditto.internal.models.streaming.LowerBound;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.internal.utils.akka.controlflow.MergeSortedAsPair;

import akka.NotUsed;
import akka.actor.ActorRef;
Expand Down Expand Up @@ -162,14 +162,14 @@ private Source<Metadata, NotUsed> confirmPersistedAndNotIndexed(final Metadata p
*/
private Source<Metadata, NotUsed> emitUnlessConsistent(final Metadata persisted, final Metadata indexed) {
if (persisted.getThingRevision() > indexed.getThingRevision()) {
return Source.single(indexed).log("RevisionMismatch");
return Source.single(indexed.invalidateCaches(true, false)).log("RevisionMismatch");
} else {
final Optional<PolicyId> persistedPolicyId = persisted.getPolicyId();
final Optional<PolicyId> indexedPolicyId = indexed.getPolicyId();
// policy IDs are equal and nonempty; retrieve and compare policy revision
// policy IDs are empty - the entries are consistent.
if (!persistedPolicyId.equals(indexedPolicyId)) {
return Source.single(indexed).log("PolicyIdMismatch");
return Source.single(indexed.invalidateCaches(false, true)).log("PolicyIdMismatch");
} else {
return persistedPolicyId.map(policyId -> retrievePolicyRevisionAndEmitMismatch(policyId, indexed))
.orElseGet(Source::empty);
Expand Down Expand Up @@ -213,17 +213,18 @@ private Source<Metadata, NotUsed> retrievePolicyRevisionAndEmitMismatch(final Po
if (error != null) {
return Source.single(error)
.log("ErrorRetrievingPolicyRevision " + policyId)
.map(e -> indexed);
.map(e -> indexed.invalidateCaches(true, true));
} else if (response instanceof SudoRetrievePolicyRevisionResponse) {
final long revision = ((SudoRetrievePolicyRevisionResponse) response).getRevision();
return indexed.getPolicyRevision()
.filter(indexedPolicyRevision -> indexedPolicyRevision.equals(revision))
.map(indexedPolicyRevision -> Source.<Metadata>empty())
.orElseGet(() -> Source.single(indexed).log("PolicyRevisionMismatch"));
.orElseGet(() -> Source.single(indexed.invalidateCaches(false, true))
.log("PolicyRevisionMismatch"));
} else {
return Source.single(response)
.log("UnexpectedPolicyResponse")
.map(r -> indexed);
.map(r -> indexed.invalidateCaches(true, true));
}
});
return Source.completionStageSource(sourceCompletionStage)
Expand All @@ -243,7 +244,8 @@ private static int compareMetadata(final Metadata metadata1, final Metadata meta
* thing ID is bigger, and 0 if both are equal.
*/
public static int compareThingIds(final ThingId thingId1, final ThingId thingId2) {
final int emptyThingComparison = Boolean.compare(thingId1.equals(EMPTY_THING_ID), thingId2.equals(EMPTY_THING_ID));
final int emptyThingComparison =
Boolean.compare(thingId1.equals(EMPTY_THING_ID), thingId2.equals(EMPTY_THING_ID));
return emptyThingComparison != 0 ? emptyThingComparison : thingId1.compareTo(thingId2);
}

Expand Down
Loading

0 comments on commit 86bc303

Please sign in to comment.