Skip to content

Commit

Permalink
Recover last write model when starting a ThingUpdater.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 25, 2021
1 parent 083146a commit 8fdc6a0
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayQueryTimeExceededException;
import org.eclipse.ditto.internal.models.streaming.LowerBound;
import org.eclipse.ditto.internal.utils.persistence.mongo.BsonUtil;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.indices.IndexInitializer;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.rql.query.Query;
import org.eclipse.ditto.rql.query.SortOption;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.internal.models.streaming.LowerBound;
import org.eclipse.ditto.thingsearch.api.SearchNamespaceReportResult;
import org.eclipse.ditto.thingsearch.api.SearchNamespaceResultEntry;
import org.eclipse.ditto.thingsearch.service.common.model.ResultList;
Expand All @@ -46,11 +50,11 @@
import org.eclipse.ditto.thingsearch.service.persistence.read.criteria.visitors.CreateBsonVisitor;
import org.eclipse.ditto.thingsearch.service.persistence.read.expression.visitors.GetSortBsonVisitor;
import org.eclipse.ditto.thingsearch.service.persistence.read.query.MongoQuery;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.internal.utils.persistence.mongo.BsonUtil;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.indices.IndexInitializer;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayQueryTimeExceededException;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingDeleteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingWriteModel;
import org.mongodb.scala.MongoClient;
import org.reactivestreams.Publisher;

import com.mongodb.MongoExecutionTimeoutException;
Expand Down Expand Up @@ -211,6 +215,23 @@ public Source<ThingId, NotUsed> findAllUnlimited(final Query query, final List<S
.idleTimeout(maxQueryTime);
}

/**
* Recover a write model from the persistence.
*
* @param thingId the thing ID.
* @return the last write model if the thing exists in the search index, or a {@code ThingDeleteModel} if the thing
* does not exist.
*/
public Source<AbstractWriteModel, NotUsed> recoverLastWriteModel(final ThingId thingId) {
final var metadata = Metadata.of(thingId, -1, null, null, null);
final var publisher = collection.find(Filters.eq(PersistenceConstants.FIELD_ID, thingId.toString())).limit(1);
final var emptySource =
Source.<AbstractWriteModel>single(ThingDeleteModel.of(metadata));
return Source.fromPublisher(publisher)
.map(document -> documentToWriteModel(metadata, document))
.orElse(emptySource);
}

private Source<Document, NotUsed> findAllInternal(final Query query, final List<String> authorizationSubjectIds,
@Nullable final Set<String> namespaces,
@Nullable final Integer limit,
Expand Down Expand Up @@ -256,7 +277,8 @@ public Source<Metadata, NotUsed> sudoStreamMetadata(final EntityId lowerBound) {
? notDeletedFilter
: Filters.and(notDeletedFilter, Filters.gt(PersistenceConstants.FIELD_ID, lowerBound.toString()));
final Bson relevantFieldsProjection =
Projections.include(PersistenceConstants.FIELD_ID, PersistenceConstants.FIELD_REVISION, PersistenceConstants.FIELD_POLICY_ID, PersistenceConstants.FIELD_POLICY_REVISION,
Projections.include(PersistenceConstants.FIELD_ID, PersistenceConstants.FIELD_REVISION,
PersistenceConstants.FIELD_POLICY_ID, PersistenceConstants.FIELD_POLICY_REVISION,
PersistenceConstants.FIELD_PATH_MODIFIED);
final Bson sortById = Sorts.ascending(PersistenceConstants.FIELD_ID);
final Publisher<Document> publisher = collection.find(filter)
Expand Down Expand Up @@ -324,13 +346,21 @@ private static PartialFunction<Throwable, Throwable> handleMongoExecutionTimeExc

private static Metadata readAsMetadata(final Document document) {
final ThingId thingId = ThingId.of(document.getString(PersistenceConstants.FIELD_ID));
final long thingRevision = Optional.ofNullable(document.getLong(PersistenceConstants.FIELD_REVISION)).orElse(0L);
final long thingRevision =
Optional.ofNullable(document.getLong(PersistenceConstants.FIELD_REVISION)).orElse(0L);
final String policyIdInPersistence = document.getString(PersistenceConstants.FIELD_POLICY_ID);
final PolicyId policyId = policyIdInPersistence.isEmpty() ? null : PolicyId.of(policyIdInPersistence);
final long policyRevision = Optional.ofNullable(document.getLong(PersistenceConstants.FIELD_POLICY_REVISION)).orElse(0L);
final String nullableTimestamp = document.getEmbedded(List.of(PersistenceConstants.FIELD_SORTING, PersistenceConstants.FIELD_MODIFIED), String.class);
final long policyRevision =
Optional.ofNullable(document.getLong(PersistenceConstants.FIELD_POLICY_REVISION)).orElse(0L);
final String nullableTimestamp =
document.getEmbedded(List.of(PersistenceConstants.FIELD_SORTING, PersistenceConstants.FIELD_MODIFIED),
String.class);
final Instant modified = Optional.ofNullable(nullableTimestamp).map(Instant::parse).orElse(null);
return Metadata.of(thingId, thingRevision, policyId, policyRevision, modified, null);
}

private static AbstractWriteModel documentToWriteModel(final Metadata metadata, final Document document) {
final var bsonDocument = document.toBsonDocument(Document.class, MongoClient.DEFAULT_CODEC_REGISTRY());
return ThingWriteModel.of(metadata, bsonDocument);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink() {

private <T> Flow<Map<ThingId, T>, Map<ThingId, T>, NotUsed> filterMapKeysByBlockedNamespaces() {
return Flow.<Map<ThingId, T>>create()
.flatMapConcat(map ->
.<Map<ThingId, T>, NotUsed>flatMapConcat(map ->
Source.fromIterator(map.entrySet()::iterator)
.via(blockNamespaceFlow(entry -> entry.getKey().getNamespace()))
.<Map<ThingId, T>>fold(new HashMap<>(), (accumulator, entry) -> {
.fold(new HashMap<>(), (accumulator, entry) -> {
accumulator.put(entry.getKey(), entry.getValue());
return accumulator;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import java.text.MessageFormat;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import javax.annotation.Nullable;

import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
Expand All @@ -43,27 +45,32 @@
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThingResponse;
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.common.config.UpdaterConfig;
import org.eclipse.ditto.thingsearch.service.persistence.read.MongoThingsSearchPersistence;
import org.eclipse.ditto.thingsearch.service.persistence.write.mapping.BsonDiff;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingDeleteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.streaming.ConsistencyLag;
import org.eclipse.ditto.thingsearch.service.starter.actors.MongoClientExtension;

import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;

import akka.Done;
import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithStash;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.cluster.sharding.ShardRegion;
import akka.pattern.Patterns;
import akka.stream.javadsl.Sink;

/**
* This Actor initiates persistence updates related to 1 thing.
*/
final class ThingUpdater extends AbstractActor {
final class ThingUpdater extends AbstractActorWithStash {

private static final String FORCE_UPDATE = "force-update";

Expand All @@ -88,6 +95,15 @@ final class ThingUpdater extends AbstractActor {
private ThingUpdater(final ActorRef pubSubMediator,
final ActorRef changeQueueActor,
final double forceUpdateProbability) {
this(pubSubMediator, changeQueueActor, forceUpdateProbability, true, true);
}

ThingUpdater(final ActorRef pubSubMediator,
final ActorRef changeQueueActor,
final double forceUpdateProbability,
final boolean loadPreviousState,
final boolean awaitRecovery) {

log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
final var dittoSearchConfig = DittoSearchConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
Expand All @@ -98,6 +114,14 @@ private ThingUpdater(final ActorRef pubSubMediator,
this.forceUpdateProbability = forceUpdateProbability;

getContext().setReceiveTimeout(dittoSearchConfig.getUpdaterConfig().getMaxIdleTime());

if (loadPreviousState) {
recoverLastWriteModel(thingId);
} else if (!awaitRecovery) {
// Not loading the previous model is equivalent to initializing via a delete-one model.
final var noLastModel = ThingDeleteModel.of(Metadata.of(thingId, -1L, null, null, null));
getSelf().tell(noLastModel, getSelf());
}
}

/**
Expand All @@ -117,6 +141,21 @@ static Props props(final ActorRef pubSubMediator, final ActorRef changeQueueActo

@Override
public Receive createReceive() {
return shutdownBehaviour.createReceive()
.match(AbstractWriteModel.class, this::recoveryComplete)
.match(ReceiveTimeout.class, this::stopThisActor)
.matchAny(this::matchAnyDuringRecovery)
.build();
}

private void recoveryComplete(final AbstractWriteModel writeModel) {
log.debug("Recovered: <{}>", writeModel);
lastWriteModel = writeModel;
getContext().become(recoveredBehavior());
unstashAll();
}

private Receive recoveredBehavior() {
return shutdownBehaviour.createReceive()
.match(ThingEvent.class, this::processThingEvent)
.match(AbstractWriteModel.class, this::onNextWriteModel)
Expand All @@ -132,15 +171,20 @@ public Receive createReceive() {
.build();
}

private void matchAnyDuringRecovery(final Object message) {
log.debug("Stashing during initialization: <{}>", message);
stash();
}

private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
final WriteModel<BsonDocument> mongoWriteModel;
final boolean forceUpdate = forceUpdateProbability > 0 && Math.random() < forceUpdateProbability;
if (!forceUpdate && lastWriteModel instanceof ThingWriteModel && nextWriteModel instanceof ThingWriteModel) {
final var last = (ThingWriteModel) lastWriteModel;
final var next = (ThingWriteModel) nextWriteModel;
final var diff = BsonDiff.minusThingDocs(next.getThingDocument(), last.getThingDocument());
if (diff.isDiffSmaller()) {
final var aggregationPipeline = diff.consumeAndExport();
final Optional<BsonDiff> diff = tryComputeDiff(next.getThingDocument(), last.getThingDocument());
if (diff.isPresent() && diff.get().isDiffSmaller()) {
final var aggregationPipeline = diff.get().consumeAndExport();
if (aggregationPipeline.isEmpty()) {
log.debug("Skipping update due to empty diff <{}>", nextWriteModel);
getSender().tell(Done.getInstance(), getSelf());
Expand All @@ -151,7 +195,8 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
} else {
mongoWriteModel = nextWriteModel.toMongo();
if (log.isDebugEnabled()) {
log.debug("Using replacement because it is smaller. Diff=<{}>", diff.consumeAndExport());
log.debug("Using replacement because diff is bigger or nonexistent. Diff=<{}>",
diff.map(BsonDiff::consumeAndExport));
}
}
} else {
Expand All @@ -166,6 +211,15 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
lastWriteModel = nextWriteModel;
}

private Optional<BsonDiff> tryComputeDiff(final BsonDocument minuend, final BsonDocument subtrahend) {
try {
return Optional.of(BsonDiff.minusThingDocs(minuend, subtrahend));
} catch (BsonInvalidOperationException e) {
log.error(e, "Failed to compute BSON diff between <{}> and <{}>", minuend, subtrahend);
return Optional.empty();
}
}

private void stopThisActor(final ReceiveTimeout receiveTimeout) {
log.debug("stopping ThingUpdater <{}> due to <{}>", thingId, receiveTimeout);
getContext().getParent().tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
Expand Down Expand Up @@ -312,4 +366,13 @@ private void acknowledge(final IdentifiableStreamingMessage message) {
}
}

private void recoverLastWriteModel(final ThingId thingId) {
final var actorSystem = getContext().getSystem();
// using search client instead of updater client for READ to ensure consistency in case of shard migration
final var client = MongoClientExtension.get(actorSystem).getSearchClient();
final var searchPersistence = new MongoThingsSearchPersistence(client, actorSystem);
final var writeModelFuture = searchPersistence.recoverLastWriteModel(thingId).runWith(Sink.head(), actorSystem);
Patterns.pipe(writeModelFuture, getContext().getDispatcher()).to(getSelf());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import java.time.Instant;

import org.assertj.core.api.Assertions;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.ShutdownReasonFactory;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand All @@ -29,17 +33,19 @@
import org.eclipse.ditto.things.model.ThingsModelFactory;
import org.eclipse.ditto.things.model.signals.events.ThingCreated;
import org.eclipse.ditto.things.model.signals.events.ThingModified;
import org.eclipse.ditto.thingsearch.service.common.config.DefaultUpdaterConfig;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingWriteModel;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
Expand Down Expand Up @@ -276,10 +282,37 @@ public void shutdownOnCommand() {

}

@Test
public void recoverLastWriteModel() {
new TestKit(actorSystem) {{
final Props props = Props.create(ThingUpdater.class,
() -> new ThingUpdater(pubSubTestProbe.ref(), changeQueueTestProbe.ref(), 0.0, false, true));
final var underTest = childActorOf(props, THING_ID.toString());

final var document = new BsonDocument()
.append("_revision", new BsonInt64(1234))
.append("d", new BsonArray())
.append("s", new BsonDocument().append("Lorem ipsum", new BsonString(
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, " +
"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
)));
final var writeModel = ThingWriteModel.of(Metadata.of(THING_ID, -1, null, null, null), document);

// GIVEN: updater is recovered with a write model
underTest.tell(writeModel, ActorRef.noSender());

// WHEN: updater is requested to compute incremental update against the same write model
underTest.tell(writeModel, getRef());

// THEN: expect no update.
expectMsg(Done.done());
}};
}

private ActorRef createThingUpdaterActor() {
return actorSystem.actorOf(ThingUpdater.props(pubSubTestProbe.ref(), changeQueueTestProbe.ref(),
DefaultUpdaterConfig.of(ConfigFactory.empty())),
THING_ID.toString());
final Props props = Props.create(ThingUpdater.class,
() -> new ThingUpdater(pubSubTestProbe.ref(), changeQueueTestProbe.ref(), 0.0, false, false));
return actorSystem.actorOf(props, THING_ID.toString());
}

}

0 comments on commit 8fdc6a0

Please sign in to comment.