Skip to content

Commit

Permalink
stop thing updater if initial update was skipped to avoid endless upd…
Browse files Browse the repository at this point in the history
…ate loop

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed May 31, 2022
1 parent efb7639 commit 7d0adb2
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,14 @@ private FSM.State<State, Data> onDone(final Done done, final Data data) {
killSwitch = null;
final var nextMetadata = data.metadata().export();
log.debug("Update skipped: <{}>", nextMetadata);
return goTo(State.READY).using(new Data(nextMetadata, data.lastWriteModel().setMetadata(nextMetadata)));

// initial update was skipped, stop updater to avoid endless skipped updates
if (data.metadata().getThingRevision() <= 0 && data.lastWriteModel().getMetadata().getThingRevision() <= 0) {
log.info("Initial update was skipped - stopping thing updater for <{}>.", thingId);
return stop();
} else {
return goTo(State.READY).using(new Data(nextMetadata, data.lastWriteModel().setMetadata(nextMetadata)));
}
}

private FSM.State<State, Data> tick(final Control tick, final Data data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.Instant;
import java.util.List;

import org.bson.BsonArray;
Expand Down Expand Up @@ -603,6 +602,32 @@ public void updateSkipped() {
}};
}

@Test
public void initialUpdateSkipped() {
new TestKit(system) {{
// GIVEN: search mapper decides to skip updates
final TestProbe inputProbe = TestProbe.apply(system);
final Flow<ThingUpdater.Data, ThingUpdater.Result, NotUsed> flow = Flow.fromSinkAndSource(
Sink.foreach(data -> inputProbe.ref().tell(data, ActorRef.noSender())),
Source.empty()
);

final Props props =
ThingUpdater.props(flow, id -> Source.single(ThingDeleteModel.of(Metadata.of(THING_ID, -1, null,
null, null))), SEARCH_CONFIG,
TestProbe.apply(system).ref());
final ActorRef underTest = watch(childActorOf(props, ACTOR_NAME));

// WHEN: An event of the next revision arrives
underTest.tell(UpdateThing.of(THING_ID, UpdateReason.BACKGROUND_SYNC, DittoHeaders.empty()),
ActorRef.noSender());

// THEN: Update is triggered only once
inputProbe.expectMsgClass(ThingUpdater.Data.class);
inputProbe.expectNoMessage(FiniteDuration.apply(5, "s"));
}};
}

private static ThingUpdater.Result getOKResult(final long revision) {
final var mongoWriteModel =
MongoWriteModel.of(getThingWriteModel(revision),
Expand Down

0 comments on commit 7d0adb2

Please sign in to comment.