[wip] remove substreams from updater flow.
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
yufei-cai committed Apr 1, 2022
1 parent 9a2956e commit 27e8fe1
Showing 15 changed files with 711 additions and 522 deletions.
======================================================================
@@ -222,7 +222,7 @@ public Source<ThingId, NotUsed> findAllUnlimited(final Query query, final List<S
      * does not exist.
      */
     public Source<AbstractWriteModel, NotUsed> recoverLastWriteModel(final ThingId thingId) {
-        final var metadata = Metadata.of(thingId, -1, null, null, null);
+        final var metadata = Metadata.ofDeleted(thingId);
         final var publisher = collection.find(Filters.eq(PersistenceConstants.FIELD_ID, thingId.toString())).limit(1);
         final var emptySource =
                 Source.<AbstractWriteModel>single(ThingDeleteModel.of(metadata));
======================================================================
@@ -80,6 +80,11 @@ public final CompletionStage<Optional<MongoWriteModel>> toIncrementalMongo() {
         }
     }
 
+    // TODO
+    public Optional<MongoWriteModel> toIncrementalMongo(@Nullable final AbstractWriteModel previousWriteModel) {
+        return Optional.of(MongoWriteModel.of(this, toMongo(), false));
+    }
+
     /**
      * @return Metadata of this write model.
      */
======================================================================
@@ -178,6 +178,16 @@ public static Metadata of(final ThingId thingId,
                 List.of(UpdateReason.UNKNOWN));
     }
 
+    /**
+     * Return a Metadata object for a deleted Thing.
+     *
+     * @param thingId the ID of the deleted thing.
+     * @return the Metadata object.
+     */
+    public static Metadata ofDeleted(final ThingId thingId) {
+        return Metadata.of(thingId, -1, null, null, null);
+    }
+
     /**
      * Recover the metadata from an UpdateThingResponse.
      *
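Note: ofDeleted is the sentinel used by recoverLastWriteModel in the first hunk — thing revision -1 and no policy data mark a thing that is absent from the search index. A minimal sketch of that fallback pattern, using only classes visible in this diff:

    import org.eclipse.ditto.things.model.ThingId;
    import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
    import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
    import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingDeleteModel;

    import akka.NotUsed;
    import akka.stream.javadsl.Source;

    // Sketch: the recovery fallback when no index document exists for the thing.
    final class RecoveryFallbackSketch {
        static Source<AbstractWriteModel, NotUsed> fallback(final ThingId thingId) {
            final Metadata metadata = Metadata.ofDeleted(thingId);  // thingRevision == -1, no policy info
            return Source.single(ThingDeleteModel.of(metadata));
        }
    }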
======================================================================
@@ -13,18 +13,23 @@
 package org.eclipse.ditto.thingsearch.service.persistence.write.model;
 
 import java.util.Objects;
+import java.util.Optional;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import org.bson.BsonDocument;
+import org.bson.BsonInvalidOperationException;
 import org.bson.conversions.Bson;
 import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
+import org.eclipse.ditto.thingsearch.service.persistence.write.mapping.BsonDiff;
+import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;
 import org.mongodb.scala.bson.BsonNumber;
 
 import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.ReplaceOneModel;
 import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateOneModel;
 import com.mongodb.client.model.WriteModel;
 
 /**
@@ -56,6 +61,15 @@ public static ThingWriteModel of(final Metadata metadata, final BsonDocument thi
         return new ThingWriteModel(metadata, thingDocument, false, 0L);
     }
 
+    @Override
+    public Optional<MongoWriteModel> toIncrementalMongo(@Nullable final AbstractWriteModel previousWriteModel) {
+        if (previousWriteModel instanceof ThingWriteModel thingWriteModel) {
+            return computeDiff(thingWriteModel);
+        } else {
+            return super.toIncrementalMongo(previousWriteModel);
+        }
+    }
+
     /**
      * Return a copy of this object as patch update.
      *
@@ -131,4 +145,73 @@ public String toString() {
                 "]";
     }
 
+    private Optional<MongoWriteModel> computeDiff(final ThingWriteModel lastWriteModel) {
+        final WriteModel<BsonDocument> mongoWriteModel;
+        final boolean isPatchUpdate;
+
+        if (isNextWriteModelOutDated(lastWriteModel, this)) {
+            // TODO: more informative error
+            throw new IllegalStateException("Received out-of-date write model");
+        }
+        final Optional<BsonDiff> diff = tryComputeDiff(getThingDocument(), lastWriteModel.getThingDocument());
+        if (diff.isPresent() && diff.get().isDiffSmaller()) {
+            final var aggregationPipeline = diff.get().consumeAndExport();
+            if (aggregationPipeline.isEmpty()) {
+                // TODO: logging + metrics
+                // skipNextUpdate(this, "empty diff");
+                return Optional.empty();
+            }
+            final var filter = asPatchUpdate(lastWriteModel.getMetadata().getThingRevision()).getFilter();
+            mongoWriteModel = new UpdateOneModel<>(filter, aggregationPipeline);
+            // TODO: logging + metrics
+            // log.debug("Using incremental update <{}>", mongoWriteModel.getClass().getSimpleName());
+            // LOGGER.trace("Using incremental update <{}>", mongoWriteModel);
+            // PATCH_UPDATE_COUNT.increment();
+            isPatchUpdate = true;
+        } else {
+            mongoWriteModel = this.toMongo();
+            // TODO: logging + metrics
+            // log.debug("Using replacement because diff is bigger or nonexistent: <{}>",
+            //         mongoWriteModel.getClass().getSimpleName());
+            // if (LOGGER.isTraceEnabled()) {
+            //     LOGGER.trace("Using replacement because diff is bigger or nonexistent. Diff=<{}>",
+            //             diff.map(BsonDiff::consumeAndExport));
+            // }
+            // FULL_UPDATE_COUNT.increment();
+            isPatchUpdate = false;
+        }
+        return Optional.of(MongoWriteModel.of(this, mongoWriteModel, isPatchUpdate));
+    }
+
+    private Optional<BsonDiff> tryComputeDiff(final BsonDocument minuend, final BsonDocument subtrahend) {
+        try {
+            return Optional.of(BsonDiff.minusThingDocs(minuend, subtrahend));
+        } catch (final BsonInvalidOperationException e) {
+            // TODO add logging
+            // log.error(e, "Failed to compute BSON diff between <{}> and <{}>", minuend, subtrahend);
+            return Optional.empty();
+        }
+    }
+
+    private static boolean isNextWriteModelOutDated(@Nullable final AbstractWriteModel lastWriteModel,
+            final AbstractWriteModel nextWriteModel) {
+
+        if (lastWriteModel == null) {
+            return false;
+        } else {
+            final var lastMetadata = lastWriteModel.getMetadata();
+            final var nextMetadata = nextWriteModel.getMetadata();
+            final boolean isStrictlyOlder = nextMetadata.getThingRevision() < lastMetadata.getThingRevision() ||
+                    nextMetadata.getThingRevision() == lastMetadata.getThingRevision() &&
+                            nextMetadata.getPolicyRevision().flatMap(nextPolicyRevision ->
+                                            lastMetadata.getPolicyRevision().map(lastPolicyRevision ->
+                                                    nextPolicyRevision < lastPolicyRevision))
+                                    .orElse(false);
+            final boolean hasSameRevisions = nextMetadata.getThingRevision() == lastMetadata.getThingRevision() &&
+                    nextMetadata.getPolicyRevision().equals(lastMetadata.getPolicyRevision());
+
+            return isStrictlyOlder || hasSameRevisions;
+        }
+    }
 }
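The ordering rule in isNextWriteModelOutDated is worth spelling out: a next write model is rejected if its thing revision is strictly older, if thing revisions tie and its policy revision is strictly older, or if both revision pairs are identical (a duplicate that would only cause a redundant write). A small self-contained sketch of the same rule over bare revisions, for illustration only (not part of the commit):

    import java.util.Optional;

    // Illustration: re-implements the ordering rule over bare revisions
    // to show which transitions count as out-of-date.
    final class RevisionOrderingSketch {

        static boolean isOutDated(final long lastThing, final Optional<Long> lastPolicy,
                final long nextThing, final Optional<Long> nextPolicy) {
            final boolean isStrictlyOlder = nextThing < lastThing ||
                    nextThing == lastThing &&
                            nextPolicy.flatMap(np -> lastPolicy.map(lp -> np < lp)).orElse(false);
            final boolean hasSameRevisions = nextThing == lastThing && nextPolicy.equals(lastPolicy);
            return isStrictlyOlder || hasSameRevisions;
        }

        public static void main(final String[] args) {
            // Older thing revision: out-of-date.
            System.out.println(isOutDated(5, Optional.of(1L), 4, Optional.of(1L))); // true
            // Same thing revision, older policy revision: out-of-date.
            System.out.println(isOutDated(5, Optional.of(2L), 5, Optional.of(1L))); // true
            // Identical revision pairs: duplicate, skip the write.
            System.out.println(isOutDated(5, Optional.of(1L), 5, Optional.of(1L))); // true
            // Strictly newer: accepted.
            System.out.println(isOutDated(5, Optional.of(1L), 6, Optional.of(1L))); // false
        }
    }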
======================================================================
@@ -46,7 +46,9 @@
 import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
 import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
 import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingDeleteModel;
+import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;
 import org.eclipse.ditto.thingsearch.service.updater.actors.SearchUpdateObserver;
+import org.eclipse.ditto.thingsearch.service.updater.actors.ThingUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +63,6 @@
 import akka.stream.javadsl.Flow;
 import akka.stream.javadsl.Keep;
 import akka.stream.javadsl.Source;
-import akka.stream.javadsl.SubSource;
 
 /**
  * Converts Thing changes into write models by retrieving data and applying enforcement via an enforcer cache.
@@ -158,17 +159,14 @@ private static boolean shouldReloadCache(@Nullable final Entry<?> entry, final M
      * @param source the source of change maps.
      * @param parallelismPerBulkShard how many thing retrieves to perform in parallel to the caching facade per bulk
      * shard.
-     * @param bulkShardCount the configured amount of shards to create substreams for.
      * @return the flow.
      */
-    public <T> SubSource<List<AbstractWriteModel>, T> create(
+    public <T> Source<List<AbstractWriteModel>, T> create(
             final Source<Collection<Metadata>, T> source,
             final int parallelismPerBulkShard,
-            final int bulkShardCount,
             final int maxBulkSize) {
 
         return source.flatMapConcat(changes -> Source.fromIterator(changes::iterator)
-                .groupBy(bulkShardCount, metadata -> Math.floorMod(metadata.getThingId().hashCode(), bulkShardCount))
                 .flatMapMerge(parallelismPerBulkShard, changedMetadata ->
                         retrieveThingFromCachingFacade(changedMetadata.getThingId(), changedMetadata)
                                 .flatMapConcat(pair -> {
@@ -177,11 +175,27 @@ public <T> SubSource<List<AbstractWriteModel>, T> create(
                                     return computeWriteModel(changedMetadata, thing);
                                 })
                 )
-                .grouped(maxBulkSize)
-                .mergeSubstreams())
-                .filterNot(List::isEmpty)
-                .groupBy(bulkShardCount, models ->
-                        Math.floorMod(models.get(0).getMetadata().getThingId().hashCode(), bulkShardCount));
+                .grouped(maxBulkSize))
+                .filterNot(List::isEmpty);
     }
 
+    // TODO
+    public Flow<ThingUpdater.Data, MongoWriteModel, NotUsed> create() {
+        return Flow.<ThingUpdater.Data>create()
+                .flatMapConcat(data -> retrieveThingFromCachingFacade(data.metadata().getThingId(), data.metadata())
+                        .flatMapConcat(pair -> {
+                            final JsonObject thing = pair.second();
+                            searchUpdateObserver.process(data.metadata(), thing);
+                            return computeWriteModel(data.metadata(), thing);
+                        })
+                        // TODO: searchUpdateMapper
+                        .flatMapConcat(writeModel -> writeModel.toIncrementalMongo(data.lastWriteModel().orElse(null))
+                                .map(Source::single)
+                                .orElseGet(() -> {
+                                    data.metadata().sendWeakAck(null);
+                                    return Source.empty();
+                                }))
+                );
+    }
+
     private Source<Pair<ThingId, JsonObject>, NotUsed> retrieveThingFromCachingFacade(final ThingId thingId,
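The new zero-argument create() composes with MongoSearchUpdaterFlow.create() into the single per-thing pipeline exposed as SearchUpdaterStream.flow() below. A hedged sketch of driving that flow for one thing — how the ThingUpdater.Data instance is obtained and the Sink.head() materialization are assumptions of this sketch, since the actor-side wiring is still WIP in this commit:

    // Sketch (assumptions marked): push one Data element through the composed flow
    // and await the single Result. "data" stands for a ThingUpdater.Data instance
    // carrying the metadata and the last-known write model; its construction here
    // is hypothetical.
    final CompletionStage<ThingUpdater.Result> result =
            Source.single(data)
                    .via(enforcementFlow.create())         // Data -> MongoWriteModel (patch or replacement)
                    .via(mongoSearchUpdaterFlow.create())  // MongoWriteModel -> Result of a one-element bulk write
                    .runWith(Sink.head(), actorSystem);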
======================================================================
@@ -28,6 +28,7 @@
 import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
 import org.eclipse.ditto.thingsearch.service.persistence.write.model.WriteResultAndErrors;
 import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;
+import org.eclipse.ditto.thingsearch.service.updater.actors.ThingUpdater;
 
 import com.mongodb.MongoBulkWriteException;
 import com.mongodb.client.model.BulkWriteOptions;
@@ -42,6 +43,7 @@
 
 import akka.NotUsed;
 import akka.japi.pf.PFBuilder;
+import akka.stream.javadsl.Flow;
 import akka.stream.javadsl.Source;
 import akka.stream.javadsl.SubSource;
 
@@ -98,15 +100,22 @@ public static MongoSearchUpdaterFlow of(final MongoDatabase database,
      * {@link org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel#SEARCH_PERSISTED} was required or not.
      * @return sub-source of write results.
      */
-    public SubSource<WriteResultAndErrors, NotUsed> start(
-            final SubSource<List<AbstractWriteModel>, NotUsed> subSource,
+    public Source<WriteResultAndErrors, NotUsed> start(
+            final Source<List<AbstractWriteModel>, NotUsed> subSource,
             final boolean shouldAcknowledge) {
 
         return subSource.map(MongoSearchUpdaterFlow::sortBySeqNr)
                 .flatMapConcat(searchUpdateMapper::processWriteModels)
                 .flatMapConcat(writeModels -> executeBulkWrite(shouldAcknowledge, writeModels));
     }
 
+    // TODO
+    public Flow<MongoWriteModel, ThingUpdater.Result, NotUsed> create() {
+        return Flow.<MongoWriteModel>create()
+                .flatMapConcat(writeModel -> executeBulkWrite(true, List.of(writeModel))
+                        .map(resultOrErrors -> new ThingUpdater.Result(writeModel, resultOrErrors)));
+    }
+
     private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(final boolean shouldAcknowledge,
             final Collection<MongoWriteModel> writeModels) {
======================================================================
@@ -25,6 +25,7 @@
 import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
 import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
 import org.eclipse.ditto.thingsearch.service.persistence.write.model.WriteResultAndErrors;
+import org.eclipse.ditto.thingsearch.service.updater.actors.ThingUpdater;
 
 import com.mongodb.reactivestreams.client.MongoDatabase;
 
@@ -34,13 +35,10 @@
 import akka.stream.Attributes;
 import akka.stream.KillSwitch;
 import akka.stream.KillSwitches;
-import akka.stream.RestartSettings;
 import akka.stream.javadsl.Flow;
 import akka.stream.javadsl.Keep;
-import akka.stream.javadsl.RestartSource;
 import akka.stream.javadsl.Sink;
 import akka.stream.javadsl.Source;
-import akka.stream.javadsl.SubSource;
 
 /**
  * Stream from the cache of Thing changes to the persistence of the search index.
@@ -120,14 +118,18 @@ public static SearchUpdaterStream of(final UpdaterConfig updaterConfig,
      * @return kill-switch to terminate the stream.
      */
     public KillSwitch start() {
-        return createRestartResultSource()
-                .flatMapConcat(SubSource::mergeSubstreams)
+        return createSource()
                 .viaMat(KillSwitches.single(), Keep.right())
                 .to(Sink.ignore())
                 .run(actorSystem);
     }
 
-    private Source<SubSource<String, NotUsed>, NotUsed> createRestartResultSource() {
+    // TODO
+    public Flow<ThingUpdater.Data, ThingUpdater.Result, NotUsed> flow() {
+        return enforcementFlow.create().via(mongoSearchUpdaterFlow.create());
+    }
+
+    private Source<String, NotUsed> createSource() {
         final var streamConfig = updaterConfig.getStreamConfig();
         final StreamStageConfig retrievalConfig = streamConfig.getRetrievalConfig();
         final PersistenceStreamConfig persistenceConfig = streamConfig.getPersistenceConfig();
@@ -140,30 +142,24 @@ private Source<SubSource<String, NotUsed>, NotUsed> createRestartResultSource()
         final var mergedSource =
                 acknowledgedSource.mergePrioritized(unacknowledgedSource, 1023, 1, true);
 
-        final SubSource<List<AbstractWriteModel>, NotUsed> enforcementSource = enforcementFlow.create(
+        final Source<List<AbstractWriteModel>, NotUsed> enforcementSource = enforcementFlow.create(
                 mergedSource.via(filterMapKeysByBlockedNamespaces()),
                 retrievalConfig.getParallelism(),
-                persistenceConfig.getParallelism(),
                 persistenceConfig.getMaxBulkSize()
         );
 
         final String logName = "SearchUpdaterStream/BulkWriteResult";
-        final SubSource<WriteResultAndErrors, NotUsed> persistenceSource = mongoSearchUpdaterFlow.start(
+        final Source<WriteResultAndErrors, NotUsed> persistenceSource = mongoSearchUpdaterFlow.start(
                 enforcementSource,
                 true
         );
-        final SubSource<String, NotUsed> loggingSource =
-                persistenceSource.via(bulkWriteResultAckFlow.start(persistenceConfig.getAckDelay()))
-                        .log(logName)
-                        .withAttributes(Attributes.logLevels(
-                                Attributes.logLevelInfo(),
-                                Attributes.logLevelWarning(),
-                                Attributes.logLevelError()));
-
-        final var backOffConfig = retrievalConfig.getExponentialBackOffConfig();
-        return RestartSource.withBackoff(
-                RestartSettings.create(backOffConfig.getMin(), backOffConfig.getMax(), backOffConfig.getRandomFactor()),
-                () -> Source.single(loggingSource));
+        return persistenceSource.via(bulkWriteResultAckFlow.start(persistenceConfig.getAckDelay()))
+                .log(logName)
+                .withAttributes(Attributes.logLevels(
+                        Attributes.logLevelInfo(),
+                        Attributes.logLevelWarning(),
+                        Attributes.logLevelError()));
     }
 
     private Flow<Collection<Metadata>, Collection<Metadata>, NotUsed> filterMapKeysByBlockedNamespaces() {
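Taken together with the EnforcementFlow and MongoSearchUpdaterFlow changes above, the updater pipeline is now one linear stream: the groupBy/mergeSubstreams sharding and the RestartSource backoff wrapper are gone from this layer. A generic, self-contained Akka Streams sketch (illustration only, not code from this commit) contrasting the two shapes:

    import java.util.Arrays;

    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    public final class SubstreamShapeSketch {

        public static void main(final String[] args) {
            final ActorSystem system = ActorSystem.create("sketch");
            final Source<Integer, NotUsed> events = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6));

            // Before: hash-partition into substreams, process each shard, merge back.
            events.groupBy(2, i -> Math.floorMod(i, 2))
                    .map(i -> i * 10)
                    .mergeSubstreams()
                    .runWith(Sink.foreach(System.out::println), system);

            // After: one linear stream with global ordering and backpressure;
            // per-thing concurrency is meant to move to the ThingUpdater actors instead.
            events.map(i -> i * 10)
                    .runWith(Sink.foreach(System.out::println), system)
                    .thenRun(system::terminate);
        }
    }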
======================================================================
@@ -75,10 +75,8 @@ public Source<WriteResultAndErrors, NotUsed> write(final Thing thing,
         final AbstractWriteModel writeModel = EnforcedThingMapper.toWriteModel(thingJson, policy, policyRevision, -1,
                 null);
 
-        final var source = Source.single(writeModel)
-                .groupBy(1, foo -> 0)
-                .grouped(1);
-        return mongoSearchUpdaterFlow.start(source, false).mergeSubstreams();
+        final var source = Source.single(writeModel).grouped(1);
+        return mongoSearchUpdaterFlow.start(source, false);
     }
 
     /**
@@ -103,10 +101,8 @@ public Source<WriteResultAndErrors, NotUsed> delete(final ThingId thingId, final
      */
     private Source<WriteResultAndErrors, NotUsed> delete(final Metadata metadata) {
         final AbstractWriteModel writeModel = ThingDeleteModel.of(metadata);
-        final var source = Source.single(writeModel)
-                .groupBy(1, foo -> 0)
-                .grouped(1);
-        return mongoSearchUpdaterFlow.start(source, false).mergeSubstreams();
+        final var source = Source.single(writeModel).grouped(1);
+        return mongoSearchUpdaterFlow.start(source, false);
     }
 
 }
======================================================================
@@ -93,7 +93,7 @@ private SearchUpdaterRootActor(final SearchConfig searchConfig,
             log.warning("Event processing is disabled!");
         }
 
-        final var thingUpdaterProps = ThingUpdater.props(pubSubMediator, changeQueueActor, updaterConfig);
+        final var thingUpdaterProps = ThingUpdaterOld.props(pubSubMediator, changeQueueActor, updaterConfig);
 
         final ActorRef thingsShard = shardRegionFactory.getThingsShardRegion(numberOfShards);
         final ActorRef policiesShard = shardRegionFactory.getPoliciesShardRegion(numberOfShards);