Skip to content

Commit

Permalink
ThingUpdater: Skip outdated updates.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Mar 21, 2022
1 parent 2b18280 commit 30e83a7
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 15 deletions.
Expand Up @@ -64,7 +64,7 @@ protected AbstractWriteModel(final Metadata metadata) {
*
* @return either the MongoDB write model of this object or an incremental update converting
*/
public CompletionStage<Optional<MongoWriteModel>> toIncrementalMongo() {
public final CompletionStage<Optional<MongoWriteModel>> toIncrementalMongo() {
final var origin = metadata.getOrigin();
if (origin.isPresent()) {
return Patterns.ask(origin.orElseThrow(), this, Duration.ofSeconds(10L))
Expand Down
Expand Up @@ -168,21 +168,18 @@ public <T> SubSource<List<AbstractWriteModel>, T> create(
final Source<Collection<Metadata>, T> source,
final int parallelismPerBulkShard,
final int bulkShardCount,
final int maxBulkSize,
final ActorSystem system) {
final int maxBulkSize) {

return source.flatMapConcat(changes -> Source.fromIterator(changes::iterator)
.groupBy(bulkShardCount, metadata -> Math.floorMod(metadata.getThingId().hashCode(), bulkShardCount))
.mapAsync(parallelismPerBulkShard, changedMetadata ->
.flatMapMerge(parallelismPerBulkShard, changedMetadata ->
retrieveThingFromCachingFacade(changedMetadata.getThingId(), changedMetadata)
.flatMapConcat(pair -> {
final JsonObject thing = pair.second();
searchUpdateObserver.process(changedMetadata, thing);
return computeWriteModel(changedMetadata, thing);
})
.runWith(Sink.seq(), system)
)
.mapConcat(models -> models)
.grouped(maxBulkSize)
.mergeSubstreams())
.filterNot(List::isEmpty)
Expand Down
Expand Up @@ -143,8 +143,8 @@ private Source<SubSource<String, NotUsed>, NotUsed> createRestartResultSource()
mergedSource.via(filterMapKeysByBlockedNamespaces()),
retrievalConfig.getParallelism(),
persistenceConfig.getParallelism(),
persistenceConfig.getMaxBulkSize(),
actorSystem);
persistenceConfig.getMaxBulkSize()
);

final String logName = "SearchUpdaterStream/BulkWriteResult";
final SubSource<WriteResultAndErrors, NotUsed> persistenceSource = mongoSearchUpdaterFlow.start(
Expand Down
Expand Up @@ -239,6 +239,17 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
final WriteModel<BsonDocument> mongoWriteModel;
final boolean isPatchUpdate;

if (isNextWriteModelOutDated(forceNextUpdate, lastWriteModel, nextWriteModel)) {
final var lastMetadata = lastWriteModel.getMetadata();
final var nextMetadata = nextWriteModel.getMetadata();
skipNextUpdate(nextWriteModel, String.format("reordering of revisions <%d,%d> -> <%d,%d>",
lastMetadata.getThingRevision(),
lastMetadata.getPolicyRevision().orElse(0L),
nextMetadata.getThingRevision(),
nextMetadata.getPolicyRevision().orElse(0L)));
return;
}

final boolean forceUpdate = (forceUpdateProbability > 0 && Math.random() < forceUpdateProbability) ||
forceNextUpdate;

Expand All @@ -250,11 +261,7 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
if (diff.isPresent() && diff.get().isDiffSmaller()) {
final var aggregationPipeline = diff.get().consumeAndExport();
if (aggregationPipeline.isEmpty()) {
log.debug("Skipping update due to empty diff <{}>", nextWriteModel.getClass().getSimpleName());
LOGGER.trace("Skipping update due to empty diff <{}>", nextWriteModel);
getSender().tell(Done.getInstance(), getSelf());
PATCH_SKIP_COUNT.increment();

skipNextUpdate(nextWriteModel, "empty diff");
return;
}
final var filter = ((ThingWriteModel) nextWriteModel)
Expand Down Expand Up @@ -470,8 +477,37 @@ private void recoverLastWriteModel(final ThingId thingId) {
Patterns.pipe(writeModelFuture, getContext().getDispatcher()).to(getSelf());
}

private void skipNextUpdate(final AbstractWriteModel nextWriteModel, final String reason) {
    // Acknowledge the incoming write model without persisting it:
    // log the skip at both levels, reply Done to the sender, and record the skip in the metric.
    final var modelType = nextWriteModel.getClass().getSimpleName();
    log.debug("Skipping update due to {} <{}>", reason, modelType);
    LOGGER.trace("Skipping update due to {} <{}>", reason, nextWriteModel);
    getSender().tell(Done.getInstance(), getSelf());
    PATCH_SKIP_COUNT.increment();
}

private static Duration randomizeTimeout(final Duration minTimeout, final double randomFactor) {
    // Jitter the timeout: add a uniformly distributed extra delay of
    // [0, randomFactor * minTimeout) milliseconds on top of the minimum.
    final double jitterFraction = Math.random() * randomFactor;
    final long jitterMillis = (long) (jitterFraction * minTimeout.toMillis());
    return minTimeout.plusMillis(jitterMillis);
}

private static boolean isNextWriteModelOutDated(final boolean forceNextUpdate,
        @Nullable final AbstractWriteModel lastWriteModel,
        final AbstractWriteModel nextWriteModel) {

    // Without a known previous write model there is nothing to compare against.
    if (lastWriteModel == null) {
        return false;
    }
    final var lastMetadata = lastWriteModel.getMetadata();
    final var nextMetadata = nextWriteModel.getMetadata();
    final long lastThingRevision = lastMetadata.getThingRevision();
    final long nextThingRevision = nextMetadata.getThingRevision();

    // A strictly smaller thing revision is always outdated.
    if (nextThingRevision < lastThingRevision) {
        return true;
    }
    if (nextThingRevision == lastThingRevision) {
        // Same thing revision: compare policy revisions where both are present.
        final boolean policyStrictlyOlder = nextMetadata.getPolicyRevision()
                .flatMap(nextPolicyRevision -> lastMetadata.getPolicyRevision()
                        .map(lastPolicyRevision -> nextPolicyRevision < lastPolicyRevision))
                .orElse(false);
        if (policyStrictlyOlder) {
            return true;
        }
        // Identical thing and policy revisions are redundant unless the caller forces the update.
        if (nextMetadata.getPolicyRevision().equals(lastMetadata.getPolicyRevision())) {
            return !forceNextUpdate;
        }
    }
    return false;
}
}
Expand Up @@ -750,7 +750,7 @@ private void materializeTestProbes(final EnforcementFlow enforcementFlow, final
final int bulkShardCount, final int bulkSize) {
final var source = TestSource.<Collection<Metadata>>probe(system);
final var sink = TestSink.<List<AbstractWriteModel>>probe(system);
final var runnableGraph = enforcementFlow.create(source, parallelism, bulkShardCount, bulkSize, system)
final var runnableGraph = enforcementFlow.create(source, parallelism, bulkShardCount, bulkSize)
.mergeSubstreams()
.viaMat(KillSwitches.single(), Keep.both())
.toMat(sink, Keep.both());
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingDeleteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingWriteModel;
import org.eclipse.ditto.thingsearch.service.starter.actors.MongoClientExtension;
import org.junit.After;
Expand Down Expand Up @@ -243,7 +244,7 @@ public void recoverLastWriteModel() {
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, " +
"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
)));
final var writeModel = ThingWriteModel.of(Metadata.of(THING_ID, -1, null, null, null), document);
final var writeModel = ThingWriteModel.of(Metadata.of(THING_ID, 1234, null, null, null), document);

// GIVEN: updater is recovered with a write model
underTest.tell(writeModel, ActorRef.noSender());
Expand All @@ -257,6 +258,70 @@ public void recoverLastWriteModel() {
}};
}

@Test
public void refuseToPerformOutOfOrderUpdate() {
    new TestKit(actorSystem) {{
        // Spawn a ThingUpdater with deterministic settings (no force-update probability,
        // zero write interval) so that revision ordering alone drives the outcome.
        final Props updaterProps = Props.create(ThingUpdater.class,
                () -> new ThingUpdater(pubSubTestProbe.ref(), changeQueueTestProbe.ref(), 0.0,
                        Duration.ZERO, 0.0, mongoClientExtension, false, true,
                        writeModel -> {}));
        final var thingUpdater = childActorOf(updaterProps, THING_ID.toString());

        final var indexDocument = new BsonDocument()
                .append("_revision", new BsonInt64(1234))
                .append("d", new BsonArray())
                .append("s", new BsonDocument().append("Lorem ipsum", new BsonString(
                        "Lorem ipsum dolor sit amet, consectetur adipiscing elit, " +
                                "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
                )));
        final var recoveredModel =
                ThingWriteModel.of(Metadata.of(THING_ID, 1234, null, null, null), indexDocument);

        // GIVEN: updater is recovered with a write model at thing revision 1234
        thingUpdater.tell(recoveredModel, ActorRef.noSender());

        // WHEN: an update for the strictly older thing revision 1233 arrives
        thingUpdater.tell(UpdateThing.of(THING_ID, UpdateReason.UNKNOWN, DittoHeaders.empty()), getRef());
        final var staleModel = ThingDeleteModel.of(Metadata.of(THING_ID, 1233, null, null, null));
        thingUpdater.tell(staleModel, getRef());

        // THEN: the stale model is acknowledged with Done and no persistence is attempted.
        expectMsg(Done.done());
    }};
}

@Test
public void forceUpdateOnSameSequenceNumber() {
    new TestKit(actorSystem) {{
        final Props props = Props.create(ThingUpdater.class,
                () -> new ThingUpdater(pubSubTestProbe.ref(), changeQueueTestProbe.ref(), 0.0,
                        Duration.ZERO, 0.0, mongoClientExtension, false, true,
                        writeModel -> {}));
        final var underTest = childActorOf(props, THING_ID.toString());

        final var document = new BsonDocument()
                .append("_revision", new BsonInt64(1234))
                .append("d", new BsonArray())
                .append("s", new BsonDocument().append("Lorem ipsum", new BsonString(
                        "Lorem ipsum dolor sit amet, consectetur adipiscing elit, " +
                                "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
                )));
        final var writeModel = ThingWriteModel.of(Metadata.of(THING_ID, 1234, null, null, null), document);

        // GIVEN: updater is recovered with a write model at thing revision 1234
        underTest.tell(writeModel, ActorRef.noSender());

        // WHEN: updater is requested to compute an incremental update of a write model
        // carrying the SAME revisions, but with the force-update header set
        final var forceUpdateHeaders = DittoHeaders.newBuilder().putHeader("force-update", "true").build();
        underTest.tell(UpdateThing.of(THING_ID, UpdateReason.UNKNOWN, forceUpdateHeaders), getRef());
        final var olderWriteModel = ThingDeleteModel.of(Metadata.of(THING_ID, 1234, null, null, null));
        underTest.tell(olderWriteModel, getRef());

        // THEN: expect an update — identical revisions are not skipped when force-update is set.
        final var mongoWriteModel = expectMsgClass(MongoWriteModel.class);
        assertThat(mongoWriteModel.getDitto()).isEqualTo(olderWriteModel);
    }};
}

@Test
public void forceUpdateAfterInitialStart() throws InterruptedException {
new TestKit(actorSystem) {{
Expand Down

0 comments on commit 30e83a7

Please sign in to comment.