Skip to content

Commit

Permalink
added unit test ThingUpdaterTest.parallelUpdatesProvokingIncorrectPat…
Browse files Browse the repository at this point in the history
…chUpdate in order to test bugfix for parallelism problems in ThingUpdater

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Feb 23, 2022
1 parent 7679588 commit ce57d24
Showing 1 changed file with 97 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,103 @@ public void forceUpdateAfterInitialStart() throws InterruptedException {
}};
}

@Test
public void parallelUpdatesProvokingIncorrectPatchUpdate() throws InterruptedException {
new TestKit(actorSystem) {{
final PolicyId policyId = PolicyId.of(THING_ID);

final TestPublisher.Probe<Object> probe = TestPublisher.probe(1, actorSystem);
doAnswer(invocation -> {
probe.subscribe(invocation.getArgument(0, Subscriber.class));
return null;
}).when(findPublisher).subscribe(any());

DocumentCodec codec = new DocumentCodec();
DecoderContext decoderContext = DecoderContext.builder().build();
final BsonDocument existingIndexBsonDocument = new BsonDocument()
.append(PersistenceConstants.FIELD_ID, new BsonString(THING_ID.toString()))
.append(PersistenceConstants.FIELD_REVISION, new BsonInt64(1234L))
.append(PersistenceConstants.FIELD_POLICY_ID, new BsonString(THING_ID.toString()))
.append(PersistenceConstants.FIELD_POLICY_REVISION, new BsonInt64(1L))
.append(PersistenceConstants.FIELD_NAMESPACE, new BsonString(THING_ID.getNamespace()))
.append(PersistenceConstants.FIELD_GLOBAL_READ, new BsonString("pre:ditto"))
.append(PersistenceConstants.FIELD_SORTING, new BsonDocument().append("Lorem ipsum",
new BsonString("Lorem ipsum dolor sit amet, consectetur adipiscing elit, " +
"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.")))
.append(PersistenceConstants.FIELD_INTERNAL, new BsonArray());

final CountDownLatch recoveryCompleteLatch = new CountDownLatch(1);
final Consumer<AbstractWriteModel> recoveryCompleteConsumer = writeModel -> {
assertThat(writeModel).isEqualTo(
ThingWriteModel.of(
Metadata.of(THING_ID, 1234L, PolicyId.of(THING_ID), 1L, null),
existingIndexBsonDocument));
recoveryCompleteLatch.countDown();
};
final Props props = Props.create(ThingUpdater.class,
() -> new ThingUpdater(pubSubTestProbe.ref(), changeQueueTestProbe.ref(), 0.0,
Duration.ZERO, 0.0, mongoClientExtension, true, true,
recoveryCompleteConsumer));
final var underTest = childActorOf(props, THING_ID.toString());

probe.expectRequest();
final var existingIndexDocument = codec.decode(new BsonDocumentReader(existingIndexBsonDocument),
decoderContext);
probe.sendNext(existingIndexDocument);

// wait until Actor was recovered:
assertThat(recoveryCompleteLatch.await(5L, TimeUnit.SECONDS)).isTrue();

final var document = new BsonDocument()
.append("_revision", new BsonInt64(1235))
.append("d", new BsonArray())
.append("s", new BsonDocument().append("Lorem ipsum", new BsonString(
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, " +
"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
)));
final var writeModel = ThingWriteModel.of(Metadata.of(THING_ID, 1235L, policyId, 1L, null), document);

// WHEN: updater is requested to compute incremental update against an update
underTest.tell(UpdateThing.of(THING_ID, UpdateReason.UNKNOWN, DittoHeaders.empty()), getRef());
underTest.tell(writeModel, getRef());

final UpdateOneModel<?> updateOneModel = expectMsgClass(UpdateOneModel.class);
Assertions.assertThat(updateOneModel.getFilter()).isEqualTo(Filters.and(
Filters.eq(PersistenceConstants.FIELD_ID, new BsonString(THING_ID.toString())),
Filters.eq(PersistenceConstants.FIELD_REVISION, BsonNumber.apply(1234L))
));

// THEN: an update is triggered
changeQueueTestProbe.expectMsgClass(Metadata.class);

// WHEN: updater is requested to compute incremental update against another update
final var document2 = new BsonDocument()
.append("_revision", new BsonInt64(1236))
.append("d", new BsonArray())
.append("s", document.get("s").asDocument()
.append("The real lorem ipsum", new BsonString("Lorem ipsum!")));
final var writeModel2 = ThingWriteModel.of(Metadata.of(THING_ID, 1236L, policyId, 1L, null), document2);
underTest.tell(UpdateThing.of(THING_ID, UpdateReason.UNKNOWN, DittoHeaders.empty()), getRef());
underTest.tell(writeModel2, getRef());

// THEN: no UpdateOneModel/ReplaceOneModel is expected yet, as the bulkWrite did not yet complete
expectNoMessage(Duration.ofSeconds(2));

// WHEN: updater is notified about the completed bulkWrite
underTest.tell(BulkWriteComplete.of("correlation-id"), getRef());

// THEN: expect UpdateOneModel for writeModel2 is generated - including the expected revisionNumber in the filter
final UpdateOneModel<?> updateOneModel2 = expectMsgClass(UpdateOneModel.class);
Assertions.assertThat(updateOneModel2.getFilter()).isEqualTo(Filters.and(
Filters.eq(PersistenceConstants.FIELD_ID, new BsonString(THING_ID.toString())),
Filters.eq(PersistenceConstants.FIELD_REVISION, BsonNumber.apply(1235L))
));

// THEN: an update is triggered
changeQueueTestProbe.expectMsgClass(Metadata.class);
}};
}

private ActorRef createThingUpdaterActor() {
final Props props = Props.create(ThingUpdater.class,
() -> new ThingUpdater(pubSubTestProbe.ref(), changeQueueTestProbe.ref(), 0.0, Duration.ZERO,
Expand Down

0 comments on commit ce57d24

Please sign in to comment.