Skip to content

Commit

Permalink
extend UpdateThing with UpdateReason;
Browse files Browse the repository at this point in the history
moved UpdateReason into api module of things-search;
extract updateReason in ThingUpdater from UpdateThing command;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Dec 7, 2021
1 parent dd7d4b3 commit 1278363
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.persistence.write.model;
package org.eclipse.ditto.thingsearch.api;

/**
* Describes the reason why a thing is being updated in the search index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.WithThingId;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

Expand Down Expand Up @@ -68,21 +69,26 @@ public final class UpdateThing extends AbstractCommand<UpdateThing> implements S
JsonFactory.newBooleanFieldDefinition("invalidateThing", FieldType.REGULAR, V_2);
private static final JsonFieldDefinition<Boolean> JSON_INVALIDATE_POLICY =
JsonFactory.newBooleanFieldDefinition("invalidatePolicy", FieldType.REGULAR, V_2);
private static final JsonFieldDefinition<String> JSON_UPDATE_REASON =
JsonFactory.newStringFieldDefinition("updateReason", FieldType.REGULAR, V_2);


private final ThingId thingId;
private final boolean invalidateThing;
private final boolean invalidatePolicy;
private final UpdateReason updateReason;

private UpdateThing(final ThingId thingId,
final boolean invalidateThing,
final boolean invalidatePolicy,
final UpdateReason updateReason,
final DittoHeaders dittoHeaders) {

super(TYPE, dittoHeaders);
this.thingId = thingId;
this.invalidateThing = invalidateThing;
this.invalidatePolicy = invalidatePolicy;
this.updateReason = updateReason;
}

/**
Expand All @@ -92,8 +98,10 @@ private UpdateThing(final ThingId thingId,
* @param dittoHeaders Ditto headers of the command.
* @return the command.
*/
public static UpdateThing of(final ThingId thingId, final DittoHeaders dittoHeaders) {
return new UpdateThing(thingId, true, true, dittoHeaders);
public static UpdateThing of(final ThingId thingId,
final UpdateReason updateReason,
final DittoHeaders dittoHeaders) {
return new UpdateThing(thingId, true, true, updateReason, dittoHeaders);
}

/**
Expand All @@ -109,9 +117,9 @@ public static UpdateThing of(final ThingId thingId, final DittoHeaders dittoHead
public static UpdateThing of(final ThingId thingId,
final boolean invalidateThing,
final boolean invalidatePolicy,
final UpdateReason updateReason,
final DittoHeaders dittoHeaders) {

return new UpdateThing(thingId, invalidateThing, invalidatePolicy, dittoHeaders);
return new UpdateThing(thingId, invalidateThing, invalidatePolicy, updateReason, dittoHeaders);
}

/**
Expand All @@ -129,6 +137,7 @@ public static UpdateThing fromJson(final JsonObject jsonObject, final DittoHeade
ThingId.of(jsonObject.getValueOrThrow(JSON_THING_ID)),
jsonObject.getValueOrThrow(JSON_INVALIDATE_THING),
jsonObject.getValueOrThrow(JSON_INVALIDATE_POLICY),
UpdateReason.valueOf(jsonObject.getValueOrThrow(JSON_UPDATE_REASON)),
dittoHeaders
);
}
Expand All @@ -138,7 +147,8 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Js
final Predicate<JsonField> predicate) {
jsonObjectBuilder.set(JSON_THING_ID, thingId.toString(), predicate)
.set(JSON_INVALIDATE_THING, invalidateThing, predicate)
.set(JSON_INVALIDATE_POLICY, invalidatePolicy, predicate);
.set(JSON_INVALIDATE_POLICY, invalidatePolicy, predicate)
.set(JSON_UPDATE_REASON, updateReason.toString(), predicate);
}

@Override
Expand All @@ -153,7 +163,7 @@ public Category getCategory() {

@Override
public UpdateThing setDittoHeaders(final DittoHeaders dittoHeaders) {
return new UpdateThing(thingId, invalidateThing, invalidatePolicy, dittoHeaders);
return new UpdateThing(thingId, invalidateThing, invalidatePolicy, updateReason, dittoHeaders);
}

@Override
Expand Down Expand Up @@ -181,6 +191,16 @@ public boolean shouldInvalidatePolicy() {
return invalidatePolicy;
}

/**
* Return the update reason.
*
* @return the update reason.
* @since 2.3.0
*/
public UpdateReason getUpdateReason() {
return updateReason;
}

@Override
public JsonPointer getResourcePath() {
return JsonPointer.empty();
Expand All @@ -200,13 +220,14 @@ public boolean equals(final Object o) {
return Objects.equals(thingId, that.thingId) &&
invalidateThing == that.invalidateThing &&
invalidatePolicy == that.invalidatePolicy &&
updateReason == that.updateReason &&
super.equals(that);
}
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), thingId, invalidateThing, invalidatePolicy);
return Objects.hash(super.hashCode(), thingId, invalidateThing, invalidatePolicy, updateReason);
}

@Override
Expand All @@ -216,6 +237,8 @@ public String toString() {
",thingId=" + thingId +
",invalidateThing=" + invalidateThing +
",invalidatePolicy=" + invalidatePolicy +
",updateReason=" + updateReason +
"]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
import org.junit.Test;
import org.mutabilitydetector.unittesting.AllowedReason;
import org.mutabilitydetector.unittesting.MutabilityAssert;
Expand Down Expand Up @@ -46,9 +47,11 @@ public void testHashCodeAndEquals() {
@Test
public void testSerialization() {
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().randomCorrelationId().build();
final UpdateThing command = UpdateThing.of(ThingId.of("namespace", "name"), true, false, dittoHeaders);
final UpdateThing command =
UpdateThing.of(ThingId.of("namespace", "name"), true, false, UpdateReason.UNKNOWN, dittoHeaders);
final String jsonString = command.toJsonString();
final UpdateThing deserializedCommand = UpdateThing.fromJson(JsonObject.of(jsonString), dittoHeaders);

assertThat(deserializedCommand).isEqualTo(command);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@
import javax.annotation.Nonnull;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonParsableEvent;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
Expand All @@ -30,14 +37,7 @@
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonParsableEvent;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.base.model.signals.events.Event;

/**
* Event to report out-of-sync things in the search index.
Expand All @@ -64,8 +64,7 @@ public final class ThingsOutOfSync implements Event<ThingsOutOfSync> {
public static final String TYPE = TYPE_PREFIX + NAME;

private static final JsonFieldDefinition<JsonArray> JSON_THING_IDS =
JsonFactory.newJsonArrayFieldDefinition("thingIds", FieldType.REGULAR,
JsonSchemaVersion.V_2);
JsonFactory.newJsonArrayFieldDefinition("thingIds", FieldType.REGULAR, JsonSchemaVersion.V_2);

private final Collection<ThingId> thingIds;
private final DittoHeaders dittoHeaders;
Expand Down Expand Up @@ -103,6 +102,7 @@ public static ThingsOutOfSync fromJson(final JsonObject jsonObject, final DittoH
.map(JsonValue::asString)
.map(ThingId::of)
.collect(Collectors.toList());

return of(thingIds, dittoHeaders);
}

Expand Down Expand Up @@ -186,6 +186,7 @@ public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<
.map(NamespacedEntityId::toString)
.map(JsonFactory::newValue)
.collect(JsonCollectors.valuesToArray());

return JsonFactory.newObjectBuilder()
// TYPE is included unconditionally
.set(JsonFields.TYPE, TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThingResponse;

import akka.actor.ActorRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.things.model.ThingConstants;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThing;
import org.eclipse.ditto.thingsearch.service.common.config.BackgroundSyncConfig;
import org.eclipse.ditto.thingsearch.service.common.config.DefaultBackgroundSyncConfig;
Expand Down Expand Up @@ -251,8 +252,9 @@ private void doBookmarkThingId(final String bookmark) {

private void handleInconsistency(final Metadata metadata) {
final var thingId = metadata.getThingId();
final var command = UpdateThing.of(thingId, metadata.shouldInvalidateThing(), metadata.shouldInvalidatePolicy(),
SEARCH_PERSISTED_HEADERS);
final var command =
UpdateThing.of(thingId, metadata.shouldInvalidateThing(), metadata.shouldInvalidatePolicy(),
UpdateReason.BACKGROUND_SYNC, SEARCH_PERSISTED_HEADERS);
final var askFuture = Patterns.ask(thingsUpdater, command, UPDATER_TIMEOUT)
.handle((result, error) -> {
if (result instanceof Acknowledgement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.eclipse.ditto.things.api.ThingTag;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThing;
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThingResponse;
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
Expand All @@ -51,7 +52,6 @@
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingDeleteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.UpdateReason;
import org.eclipse.ditto.thingsearch.service.persistence.write.streaming.ConsistencyLag;
import org.eclipse.ditto.thingsearch.service.starter.actors.MongoClientExtension;

Expand Down Expand Up @@ -285,8 +285,7 @@ private void updateThing(final UpdateThing updateThing) {
}
final Metadata metadata = exportMetadata(null, null)
.invalidateCaches(updateThing.shouldInvalidateThing(), updateThing.shouldInvalidatePolicy())
// TODO background sync or namespace indexing?
.withUpdateReason(UpdateReason.MANUAL_REINDEXING);
.withUpdateReason(updateThing.getUpdateReason());
if (updateThing.getDittoHeaders().getAcknowledgementRequests().contains(SEARCH_PERSISTED_REQUEST)) {
enqueueMetadata(metadata.withSender(getSender()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.ditto.things.api.ThingTag;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThing;
import org.eclipse.ditto.thingsearch.model.signals.events.ThingsOutOfSync;
import org.eclipse.ditto.thingsearch.service.common.config.UpdaterConfig;
Expand Down Expand Up @@ -175,7 +176,8 @@ private void updateThings(final ThingsOutOfSync updateThings) {
.info("Out-of-sync things are reported: <{}>", updateThings);
updateThings.getThingIds().forEach(thingId ->
forwardToShardRegion(
UpdateThing.of(ThingId.of(thingId), updateThings.getDittoHeaders()),
// TODO define update reason
UpdateThing.of(ThingId.of(thingId), UpdateReason.THING_UPDATE, updateThings.getDittoHeaders()),
UpdateThing::getEntityId,
UpdateThing::getType,
UpdateThing::toJson,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.thingsearch.api.SearchNamespaceReportResult;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThing;
import org.eclipse.ditto.thingsearch.service.common.config.BackgroundSyncConfig;
import org.eclipse.ditto.thingsearch.service.common.config.DefaultBackgroundSyncConfig;
Expand Down Expand Up @@ -181,7 +182,7 @@ public void resettingHealthEventsAfterSyncStreamFailureClearsErrors() {
thenRespondWithPersistedThingsStream(pubSub,
List.of(createStreamedSnapshot(THING_ID, persistedRevision + 1)));
expectSyncActorToRequestThingUpdatesInSearch(thingsUpdater,
List.of(UpdateThing.of(THING_ID, true, false, HEADERS)));
List.of(UpdateThing.of(THING_ID, true, false, UpdateReason.BACKGROUND_SYNC, HEADERS)));

expectSyncActorToBeUpWithWarning(underTest, this);

Expand Down Expand Up @@ -209,14 +210,14 @@ public void providesHealthWarningWhenSameThingIsSynchronizedTwice() {
expectSyncActorToStartStreaming(pubSub);
thenRespondWithPersistedThingsStream(pubSub, List.of(createStreamedSnapshot(THING_ID, persistedRevision)));
expectSyncActorToRequestThingUpdatesInSearch(thingsUpdater,
List.of(UpdateThing.of(THING_ID, true, false, HEADERS)));
List.of(UpdateThing.of(THING_ID, true, false, UpdateReason.BACKGROUND_SYNC, HEADERS)));

// second synchronization stream
whenSearchPersistenceHasIndexedThings(List.of(indexedThingMetadata));
expectSyncActorToStartStreaming(pubSub, backgroundSyncConfig.getIdleTimeout());
thenRespondWithPersistedThingsStream(pubSub, List.of(createStreamedSnapshot(THING_ID, persistedRevision)));
expectSyncActorToRequestThingUpdatesInSearch(thingsUpdater,
List.of(UpdateThing.of(THING_ID, true, false, HEADERS)));
List.of(UpdateThing.of(THING_ID, true, false, UpdateReason.BACKGROUND_SYNC, HEADERS)));

// expect health to have events for both runs
syncActorShouldHaveHealth(underTest, this, StatusInfo.Status.UP, List.of(StatusDetailMessage.Level.WARN),
Expand Down Expand Up @@ -249,14 +250,14 @@ public void noHealthWarningAfterSuccessfulStream() {
expectSyncActorToStartStreaming(pubSub);
thenRespondWithPersistedThingsStream(pubSub, streamedSnapshots);
expectSyncActorToRequestThingUpdatesInSearch(thingsUpdater,
List.of(UpdateThing.of(THING_ID, true, false, HEADERS)));
List.of(UpdateThing.of(THING_ID, true, false, UpdateReason.BACKGROUND_SYNC, HEADERS)));

// second synchronization stream
whenSearchPersistenceHasIndexedThings(List.of(indexedThingMetadata));
expectSyncActorToStartStreaming(pubSub, backgroundSyncConfig.getIdleTimeout());
thenRespondWithPersistedThingsStream(pubSub, streamedSnapshots);
expectSyncActorToRequestThingUpdatesInSearch(thingsUpdater,
List.of(UpdateThing.of(THING_ID, true, false, HEADERS)));
List.of(UpdateThing.of(THING_ID, true, false, UpdateReason.BACKGROUND_SYNC, HEADERS)));

// third synchronization stream
whenSearchPersistenceHasIndexedThings(List.of(persistedThingMetadata));
Expand Down Expand Up @@ -292,14 +293,14 @@ public void staysHealthyWhenSameThingIsSynchronizedWithOtherRevision() {
expectSyncActorToStartStreaming(pubSub);
thenRespondWithPersistedThingsStream(pubSub, List.of(createStreamedSnapshot(THING_ID, persistedRevision)));
expectSyncActorToRequestThingUpdatesInSearch(thingsUpdater, List.of(
UpdateThing.of(THING_ID, true, false, HEADERS)));
UpdateThing.of(THING_ID, true, false, UpdateReason.BACKGROUND_SYNC, HEADERS)));

// second synchronization stream
whenSearchPersistenceHasIndexedThings(List.of(nextThingMetadata));
expectSyncActorToStartStreaming(pubSub, backgroundSyncConfig.getIdleTimeout());
thenRespondWithPersistedThingsStream(pubSub, List.of(createStreamedSnapshot(THING_ID, nextRevision)));
expectSyncActorToRequestThingUpdatesInSearch(thingsUpdater, List.of(
UpdateThing.of(THING_ID, true, false, HEADERS)));
UpdateThing.of(THING_ID, true, false, UpdateReason.BACKGROUND_SYNC, HEADERS)));

// expect health to have events for both runs
syncActorShouldHaveHealth(underTest, this, StatusInfo.Status.UP, List.of(StatusDetailMessage.Level.INFO),
Expand Down Expand Up @@ -354,10 +355,10 @@ private void thenRespondWithPersistedThingsStream(final TestKit pubSub, final Li

private void expectSyncActorToRequestThingUpdatesInSearch(final TestKit thingsUpdater) {
expectSyncActorToRequestThingUpdatesInSearch(thingsUpdater, List.of(
UpdateThing.of(KNOWN_IDs.get(0), true, false, HEADERS),
UpdateThing.of(KNOWN_IDs.get(1), true, false, HEADERS),
UpdateThing.of(KNOWN_IDs.get(2), true, false, HEADERS),
UpdateThing.of(KNOWN_IDs.get(3), true, false, HEADERS)
UpdateThing.of(KNOWN_IDs.get(0), true, false, UpdateReason.BACKGROUND_SYNC, HEADERS),
UpdateThing.of(KNOWN_IDs.get(1), true, false, UpdateReason.BACKGROUND_SYNC, HEADERS),
UpdateThing.of(KNOWN_IDs.get(2), true, false, UpdateReason.BACKGROUND_SYNC, HEADERS),
UpdateThing.of(KNOWN_IDs.get(3), true, false, UpdateReason.BACKGROUND_SYNC, HEADERS)
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.eclipse.ditto.things.model.ThingsModelFactory;
import org.eclipse.ditto.things.model.signals.events.ThingCreated;
import org.eclipse.ditto.things.model.signals.events.ThingModified;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.UpdateReason;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down

0 comments on commit 1278363

Please sign in to comment.