Skip to content

Commit

Permalink
do not change write model of ThingUpdater after skipped update, other…
Browse files Browse the repository at this point in the history
…wise the current revision number is used in the filter of subsequent patch updates instead of the revision number of the last applied update

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Aug 31, 2022
1 parent 3d4293d commit d4ff1d3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
Expand Up @@ -12,11 +12,9 @@
*/
package org.eclipse.ditto.thingsearch.service.updater.actors;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -343,7 +341,7 @@ private FSM.State<State, Data> onDone(final Done done, final Data data) {
log.info("Initial update was skipped - stopping thing updater for <{}>.", thingId);
return stop();
} else {
return goTo(State.READY).using(new Data(nextMetadata, data.lastWriteModel().setMetadata(nextMetadata)));
return goTo(State.READY).using(new Data(nextMetadata, data.lastWriteModel()));
}
}

Expand Down Expand Up @@ -522,12 +520,8 @@ private FSM.State<State, Data> stashAndStay(final Object message, final Data ini

private ThingId tryToGetThingId() {
final Charset utf8 = StandardCharsets.UTF_8;
try {
final String actorName = self().path().name();
return ThingId.of(URLDecoder.decode(actorName, utf8.name()));
} catch (final UnsupportedEncodingException e) {
throw new IllegalStateException(MessageFormat.format("Charset <{0}> is unsupported!", utf8.name()), e);
}
final String actorName = self().path().name();
return ThingId.of(URLDecoder.decode(actorName, utf8));
}

private UniqueKillSwitch recoverLastWriteModel(final ThingId thingId,
Expand Down
Expand Up @@ -163,7 +163,7 @@ public void updateFromEvent() {
inletProbe.request(16);
final var data = inletProbe.expectNext();
assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, null, null));
assertThat(data.metadata().getTimers().size()).isEqualTo(1);
assertThat(data.metadata().getTimers()).hasSize(1);
assertThat(data.metadata().getAckRecipients()).isEmpty();
assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel());
}};
Expand All @@ -187,7 +187,7 @@ public void updateFromEventWithAck() {
inletProbe.request(16);
final var data = inletProbe.expectNext();
assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, null, null));
assertThat(data.metadata().getTimers().size()).isEqualTo(1);
assertThat(data.metadata().getTimers()).hasSize(1);
assertThat(data.metadata().getAckRecipients()).containsOnly(getSystem().actorSelection(getRef().path()));
assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel());
}};
Expand Down Expand Up @@ -260,7 +260,7 @@ public void updateFromEventAfterSkippedUpdate() {
inletProbe.request(16);
final var data = inletProbe.expectNext();
assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, null, null));
assertThat(data.metadata().getTimers().size()).isEqualTo(1);
assertThat(data.metadata().getTimers()).hasSize(1);
assertThat(data.metadata().getAckRecipients()).isEmpty();
assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel());

Expand All @@ -275,9 +275,11 @@ public void updateFromEventAfterSkippedUpdate() {
// THEN: next update is processed regularly
final var data2 = inletProbe.expectNext();
assertThat(data2.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 2, null, null, null));
assertThat(data2.metadata().getTimers().size()).isEqualTo(1);
assertThat(data2.metadata().getTimers()).hasSize(1);
assertThat(data2.metadata().getAckRecipients()).isEmpty();
assertThat(data2.lastWriteModel()).isEqualTo(getThingWriteModel().setMetadata(data.metadata().export()));
assertThat(data2.lastWriteModel()).isEqualTo(
// write model was not changed because previous update was skipped
getThingWriteModel());
}};
}

Expand Down Expand Up @@ -389,8 +391,8 @@ public void combineUpdatesFrom2Events() {
inletProbe.request(16);
final var data = inletProbe.expectNext();
assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 2, null, null, null));
assertThat(data.metadata().getTimers().size()).isEqualTo(2);
assertThat(data.metadata().getEvents().size()).isEqualTo(2);
assertThat(data.metadata().getTimers()).hasSize(2);
assertThat(data.metadata().getEvents()).hasSize(2);
assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel());

// THEN: no other updates are sent
Expand Down Expand Up @@ -520,7 +522,7 @@ public void shutdownOnThingDeletedCommand() {
assertThat(data.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 1, null, null, null));
assertThat(data.metadata().getUpdateReasons()).contains(UpdateReason.THING_UPDATE);
assertThat(data.metadata().getEvents()).hasOnlyElementsOfType(ThingDeleted.class);
assertThat(data.metadata().getTimers().size()).isEqualTo(1);
assertThat(data.metadata().getTimers()).hasSize(1);

outletProbe.sendNext(getOKResult(REVISION + 1));

Expand Down

0 comments on commit d4ff1d3

Please sign in to comment.