Skip to content

Commit

Permalink
Fix endless update due to skipped updates.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed May 26, 2022
1 parent 8bd95bb commit 8864b76
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ protected AbstractWriteModel(final Metadata metadata) {
*/
public abstract WriteModel<BsonDocument> toMongo();

/**
* Create a copy of this write model with a new metadata.
*
* @param metadata The new metadata.
* @return The copy.
*/
public abstract AbstractWriteModel setMetadata(final Metadata metadata);

/**
* Convert this into a MongoDB write model taking the previous update into consideration.
*
Expand Down Expand Up @@ -81,15 +89,6 @@ public Bson getFilter() {
return Filters.eq(PersistenceConstants.FIELD_ID, new BsonString(metadata.getThingId().toString()));
}

/**
* Check whether this update is a patch update based on a specific sequence number.
*
* @return Whether this is a patch update.
*/
public boolean isPatchUpdate() {
return false;
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public WriteModel<BsonDocument> toMongo() {
return new DeleteOneModel<>(filter);
}

@Override
public ThingDeleteModel setMetadata(final Metadata metadata) {
return new ThingDeleteModel(metadata);
}

@Override
public String toString() {
return super.toString() + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public WriteModel<BsonDocument> toMongo() {
return new ReplaceOneModel<>(getFilter(), thingDocument, upsertOption());
}

@Override
public ThingWriteModel setMetadata(final Metadata metadata) {
return new ThingWriteModel(metadata, thingDocument, isPatchUpdate, previousRevision);
}

/**
* @return the Thing document to be written in the persistence.
*/
Expand All @@ -116,11 +121,6 @@ public Bson getFilter() {
}
}

@Override
public boolean isPatchUpdate() {
return isPatchUpdate;
}

private ReplaceOptions upsertOption() {
return new ReplaceOptions().upsert(!isPatchUpdate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private FSM.State<State, Data> onDone(final Done done, final Data data) {
killSwitch = null;
final var nextMetadata = data.metadata().export();
log.debug("Update skipped: <{}>", nextMetadata);
return goTo(State.READY).using(new Data(nextMetadata, data.lastWriteModel()));
return goTo(State.READY).using(new Data(nextMetadata, data.lastWriteModel().setMetadata(nextMetadata)));
}

private FSM.State<State, Data> tick(final Control tick, final Data data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.Instant;
import java.util.List;

import org.bson.BsonArray;
Expand Down Expand Up @@ -65,6 +66,7 @@
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.MergeHub;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.scaladsl.BroadcastHub;
import akka.stream.testkit.TestPublisher;
Expand Down Expand Up @@ -277,7 +279,7 @@ public void updateFromEventAfterSkippedUpdate() {
assertThat(data2.metadata().export()).isEqualTo(Metadata.of(THING_ID, REVISION + 2, null, null, null));
assertThat(data2.metadata().getTimers().size()).isEqualTo(1);
assertThat(data2.metadata().getSenders()).isEmpty();
assertThat(data2.lastWriteModel()).isEqualTo(getThingWriteModel());
assertThat(data2.lastWriteModel()).isEqualTo(getThingWriteModel().setMetadata(data.metadata().export()));
}};
}

Expand Down Expand Up @@ -576,6 +578,31 @@ public void shutdownOnCommand() {
}};
}

@Test
public void updateSkipped() {
new TestKit(system) {{
// GIVEN: search mapper decides to skip updates
final TestProbe inputProbe = TestProbe.apply(system);
final Flow<ThingUpdater.Data, ThingUpdater.Result, NotUsed> flow = Flow.fromSinkAndSource(
Sink.foreach(data -> inputProbe.ref().tell(data, ActorRef.noSender())),
Source.empty()
);

final Props props =
ThingUpdater.props(flow, id -> Source.single(getThingWriteModel()), SEARCH_CONFIG,
TestProbe.apply(system).ref());
final ActorRef underTest = watch(childActorOf(props, ACTOR_NAME));

// WHEN: An event of the next revision arrives
underTest.tell(AttributeModified.of(THING_ID, JsonPointer.of("x"), JsonValue.of(5), REVISION + 1, null,
DittoHeaders.empty(), null), ActorRef.noSender());

// THEN: Update is triggered only once
inputProbe.expectMsgClass(ThingUpdater.Data.class);
inputProbe.expectNoMessage(FiniteDuration.apply(5, "s"));
}};
}

private static ThingUpdater.Result getOKResult(final long revision) {
final var mongoWriteModel =
MongoWriteModel.of(getThingWriteModel(revision),
Expand Down

0 comments on commit 8864b76

Please sign in to comment.