Commit c68eb06: Fix some TODOs for search update.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
yufei-cai committed Apr 9, 2022
1 parent a736eca commit c68eb06

Showing 8 changed files with 109 additions and 100 deletions.
@@ -54,7 +54,13 @@ protected AbstractWriteModel(final Metadata metadata) {
*/
public abstract WriteModel<BsonDocument> toMongo();

-    // TODO
+    /**
+     * Convert this into a MongoDB write model taking the previous update into consideration.
+     *
+     * @param previousWriteModel The previous write model.
+     * @return Either the MongoDB write model of this object or an incremental update converting the document of
+     * the previous model into this one.
+     */
public Optional<MongoWriteModel> toIncrementalMongo(@Nullable final AbstractWriteModel previousWriteModel) {
return Optional.of(MongoWriteModel.of(this, toMongo(), false));
}
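
The default implementation above always produces a full write; subclasses such as ThingWriteModel (next file) override it to return an incremental patch, or an empty Optional when nothing needs to be written at all. A minimal caller sketch — the enqueueNextWrite helper and the bulkWriter recipient are assumptions for illustration, not part of this commit:

    // Sketch only: an empty Optional means the previous document already
    // matches this write model, so the update can be skipped entirely.
    private void enqueueNextWrite(final AbstractWriteModel nextModel,
            @Nullable final AbstractWriteModel lastModel,
            final akka.actor.ActorRef bulkWriter) {
        nextModel.toIncrementalMongo(lastModel)
                .ifPresent(writeModel -> bulkWriter.tell(writeModel, akka.actor.ActorRef.noSender()));
    }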
@@ -21,9 +21,14 @@
import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;
import org.bson.conversions.Bson;
+ import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
+ import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
+ import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
+ import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
import org.eclipse.ditto.thingsearch.service.persistence.write.mapping.BsonDiff;
import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;
+ import org.eclipse.ditto.thingsearch.service.updater.actors.ThingUpdater;
import org.mongodb.scala.bson.BsonNumber;

import com.mongodb.client.model.Filters;
@@ -38,6 +43,12 @@
@NotThreadSafe
public final class ThingWriteModel extends AbstractWriteModel {

+ private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(ThingUpdater.class);
+
+ private static final Counter PATCH_UPDATE_COUNT = DittoMetrics.counter("search_patch_updates");
+ private static final Counter PATCH_SKIP_COUNT = DittoMetrics.counter("search_patch_skips");
+ private static final Counter FULL_UPDATE_COUNT = DittoMetrics.counter("search_full_updates");
+
private final BsonDocument thingDocument;
private final boolean isPatchUpdate;
private final long previousRevision;
@@ -150,34 +161,34 @@ private Optional<MongoWriteModel> computeDiff(final ThingWriteModel lastWriteModel,
final boolean isPatchUpdate;

if (isNextWriteModelOutDated(lastWriteModel, this)) {
-     // TODO: more informative error
-     throw new IllegalStateException("Received out-of-date write model");
+     throw new IllegalStateException(
+             String.format("Received out-of-date write model. this=<%s>, lastWriteModel=<%s>", this,
+                     lastWriteModel));
}
final Optional<BsonDiff> diff = tryComputeDiff(getThingDocument(), lastWriteModel.getThingDocument());
if (diff.isPresent() && diff.get().isDiffSmaller()) {
final var aggregationPipeline = diff.get().consumeAndExport();
if (aggregationPipeline.isEmpty()) {
-     // TODO: logging + metrics
-     // skipNextUpdate(this, "empty diff");
+     LOGGER.debug("Skipping update due to {} <{}>", "empty diff", ((AbstractWriteModel) this).getClass().getSimpleName());
+     LOGGER.trace("Skipping update due to {} <{}>", "empty diff", this);
+     PATCH_SKIP_COUNT.increment();
return Optional.empty();
}
final var filter = asPatchUpdate(lastWriteModel.getMetadata().getThingRevision()).getFilter();
mongoWriteModel = new UpdateOneModel<>(filter, aggregationPipeline);
-     // TODO: logging + metrics
-     // log.debug("Using incremental update <{}>", mongoWriteModel.getClass().getSimpleName());
-     // LOGGER.trace("Using incremental update <{}>", mongoWriteModel);
-     // PATCH_UPDATE_COUNT.increment();
+     LOGGER.debug("Using incremental update <{}>", mongoWriteModel.getClass().getSimpleName());
+     LOGGER.trace("Using incremental update <{}>", mongoWriteModel);
+     PATCH_UPDATE_COUNT.increment();
isPatchUpdate = true;
} else {
mongoWriteModel = this.toMongo();
-     // TODO: logging + metrics
-     // log.debug("Using replacement because diff is bigger or nonexistent: <{}>",
-     //         mongoWriteModel.getClass().getSimpleName());
-     // if (LOGGER.isTraceEnabled()) {
-     //     LOGGER.trace("Using replacement because diff is bigger or nonexistent. Diff=<{}>",
-     //             diff.map(BsonDiff::consumeAndExport));
-     // }
-     // FULL_UPDATE_COUNT.increment();
+     LOGGER.debug("Using replacement because diff is bigger or nonexistent: <{}>",
+             mongoWriteModel.getClass().getSimpleName());
+     if (LOGGER.isTraceEnabled()) {
+         LOGGER.trace("Using replacement because diff is bigger or nonexistent. Diff=<{}>",
+                 diff.map(BsonDiff::consumeAndExport));
+     }
+     FULL_UPDATE_COUNT.increment();
isPatchUpdate = false;
}
return Optional.of(MongoWriteModel.of(this, mongoWriteModel, isPatchUpdate));
@@ -187,31 +198,25 @@ private Optional<BsonDiff> tryComputeDiff(final BsonDocument minuend, final Bson
try {
return Optional.of(BsonDiff.minusThingDocs(minuend, subtrahend));
} catch (final BsonInvalidOperationException e) {
-     // TODO add logging
-     // log.error(e, "Failed to compute BSON diff between <{}> and <{}>", minuend, subtrahend);
-
+     LOGGER.error("Failed to compute BSON diff between <{}> and <{}>", minuend, subtrahend, e);
return Optional.empty();
}
}

- private static boolean isNextWriteModelOutDated(@Nullable final AbstractWriteModel lastWriteModel,
+ private static boolean isNextWriteModelOutDated(final AbstractWriteModel lastWriteModel,
final AbstractWriteModel nextWriteModel) {

-     if (lastWriteModel == null) {
-         return false;
-     } else {
-         final var lastMetadata = lastWriteModel.getMetadata();
-         final var nextMetadata = nextWriteModel.getMetadata();
-         final boolean isStrictlyOlder = nextMetadata.getThingRevision() < lastMetadata.getThingRevision() ||
-                 nextMetadata.getThingRevision() == lastMetadata.getThingRevision() &&
-                         nextMetadata.getPolicyRevision().flatMap(nextPolicyRevision ->
-                                         lastMetadata.getPolicyRevision().map(lastPolicyRevision ->
-                                                 nextPolicyRevision < lastPolicyRevision))
-                                 .orElse(false);
-         final boolean hasSameRevisions = nextMetadata.getThingRevision() == lastMetadata.getThingRevision() &&
-                 nextMetadata.getPolicyRevision().equals(lastMetadata.getPolicyRevision());
-
-         return isStrictlyOlder || hasSameRevisions;
-     }
+     final var lastMetadata = lastWriteModel.getMetadata();
+     final var nextMetadata = nextWriteModel.getMetadata();
+     final boolean isStrictlyOlder = nextMetadata.getThingRevision() < lastMetadata.getThingRevision() ||
+             nextMetadata.getThingRevision() == lastMetadata.getThingRevision() &&
+                     nextMetadata.getPolicyRevision().flatMap(nextPolicyRevision ->
+                                     lastMetadata.getPolicyRevision().map(lastPolicyRevision ->
+                                             nextPolicyRevision < lastPolicyRevision))
+                             .orElse(false);
+     final boolean hasSameRevisions = nextMetadata.getThingRevision() == lastMetadata.getThingRevision() &&
+             nextMetadata.getPolicyRevision().equals(lastMetadata.getPolicyRevision());
+
+     return isStrictlyOlder || hasSameRevisions;
}
}
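
The flattened predicate keeps the prior semantics; the null check is gone, presumably because callers now guarantee a non-null previous model. Restated over plain revision numbers as an illustration — the Optional-wrapped policy revisions are simplified to longs, so this is a sketch, not the production code:

    // Illustrative only: the staleness rule above with policy revisions as plain longs.
    final class OutDatedSketch {

        static boolean isOutDated(final long lastThingRev, final long lastPolicyRev,
                final long nextThingRev, final long nextPolicyRev) {
            final boolean isStrictlyOlder = nextThingRev < lastThingRev
                    || nextThingRev == lastThingRev && nextPolicyRev < lastPolicyRev;
            final boolean hasSameRevisions =
                    nextThingRev == lastThingRev && nextPolicyRev == lastPolicyRev;
            return isStrictlyOlder || hasSameRevisions;
        }

        public static void main(final String[] args) {
            // last = (thing 5, policy 2):
            System.out.println(isOutDated(5, 2, 4, 9)); // true  - older thing revision
            System.out.println(isOutDated(5, 2, 5, 1)); // true  - same thing, older policy
            System.out.println(isOutDated(5, 2, 5, 2)); // true  - exact duplicate
            System.out.println(isOutDated(5, 2, 5, 3)); // false - newer policy revision
            System.out.println(isOutDated(5, 2, 6, 0)); // false - newer thing revision
        }
    }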
@@ -20,12 +20,10 @@
import java.util.List;
import java.util.Optional;

- import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
- import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThingResponse;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingDeleteModel;
@@ -37,7 +35,6 @@
import com.mongodb.bulk.BulkWriteResult;

import akka.NotUsed;
- import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.stream.javadsl.Flow;

@@ -51,17 +48,18 @@ public final class BulkWriteResultAckFlow {
private static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(BulkWriteResultAckFlow.class);

- private static final DittoHeaders INCORRECT_PATCH_HEADERS = DittoHeaders.newBuilder()
-         .putHeader(SearchUpdaterStream.FORCE_UPDATE_INCORRECT_PATCH, "true")
-         .build();
-
private BulkWriteResultAckFlow() {}

static Flow<WriteResultAndErrors, Pair<Status, List<String>>, NotUsed> start() {
return Flow.<WriteResultAndErrors>create().map(BulkWriteResultAckFlow::checkBulkWriteResult);
}

- // TODO
+ /**
+  * Check the result of an update operation, acknowledge successes and failures, and generate a report.
+  *
+  * @param writeResultAndErrors The result of an update operation.
+  * @return The report.
+  */
public static Pair<Status, List<String>> checkBulkWriteResult(final WriteResultAndErrors writeResultAndErrors) {

if (wasNotAcknowledged(writeResultAndErrors)) {
@@ -131,14 +129,13 @@ private static List<String> acknowledgeSuccessesAndFailures(final WriteResultAnd
}
}
acknowledgeFailures(failedMetadata);
-     acknowledgeSuccesses(failedIndices, writeResultAndErrors.getBulkWriteCorrelationId(),
+     acknowledgeSuccesses(failedIndices,
writeResultAndErrors.getWriteModels());

return logEntries;
}

- private static void acknowledgeSuccesses(final BitSet failedIndices, final String bulkWriteCorrelationId,
-         final List<MongoWriteModel> writeModels) {
+ private static void acknowledgeSuccesses(final BitSet failedIndices, final List<MongoWriteModel> writeModels) {
for (int i = 0; i < writeModels.size(); ++i) {
if (!failedIndices.get(i)) {
writeModels.get(i).getDitto().getMetadata().sendAck();
@@ -153,21 +150,6 @@ private static void acknowledgeFailures(final Collection<Metadata> metadataList)
}
}

- private static Flow<WriteResultAndErrors, WriteResultAndErrors, NotUsed> getDelayFlow() {
-     return Flow.create();
- }
-
- private static UpdateThingResponse createFailureResponse(final Metadata metadata, final DittoHeaders dittoHeaders) {
-     return UpdateThingResponse.of(
-             metadata.getThingId(),
-             metadata.getThingRevision(),
-             metadata.getPolicyId().orElse(null),
-             metadata.getPolicyId().flatMap(policyId -> metadata.getPolicyRevision()).orElse(null),
-             false,
-             dittoHeaders
-     );
- }
-
private static boolean wasNotAcknowledged(final WriteResultAndErrors writeResultAndErrors) {
return !writeResultAndErrors.getBulkWriteResult().wasAcknowledged();
}
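
A wiring sketch for the now-documented entry point, assuming a caller in the same package (start() is package-private) and an existing stream of bulk-write results; the reportResults helper is hypothetical and not part of this commit:

    // Sketch: run the result-check stage over bulk-write results and log each report.
    static void reportResults(final akka.actor.ActorSystem system,
            final akka.stream.javadsl.Source<WriteResultAndErrors, akka.NotUsed> results) {
        results.via(BulkWriteResultAckFlow.start())
                .runForeach(pair -> LOGGER.info("ack status=<{}>, log entries=<{}>",
                        pair.first(), pair.second()), system);
    }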
@@ -35,15 +35,10 @@
*/
public static final String TAG_SHOULD_ACK = "should_ack";

- /**
-  * Name of the segment spent before leaving search updater
-  */
- public static final String S0_IN_UPDATER = "s0_in_updater";
-
/**
* Name of the segment in change queue.
*/
- public static final String S1_IN_CHANGE_QUEUE = "s1_in_change_queue";
+ public static final String S1_IN_UPDATER = "s1_in_updater";

/**
* Name of the segment waiting for demand after leaving the change queue actor before downstream demand.
@@ -74,22 +69,13 @@ private ConsistencyLag() {
throw new AssertionError();
}

- /**
-  * Start the segment for time spent in a thing updater.
-  *
-  * @param timer the timer.
-  */
- public static void startS0InUpdater(final StartedTimer timer) {
-     timer.startNewSegment(S0_IN_UPDATER);
- }
-
/**
* Start the segment for time spent in change queue.
*
-  * @param metadata the metadata.
+  * @param timer the timer.
*/
- public static void startS1InChangeQueue(final Metadata metadata) {
-     stopAndStartSegments(metadata, S0_IN_UPDATER, S1_IN_CHANGE_QUEUE);
+ public static void startS1InUpdater(final StartedTimer timer) {
+     timer.startNewSegment(S1_IN_UPDATER);
}

/**
@@ -98,7 +84,7 @@ public static void startS1InChangeQueue(final Metadata metadata) {
* @param metadata the metadata.
*/
public static void startS2WaitForDemand(final Metadata metadata) {
-     stopAndStartSegments(metadata, S1_IN_CHANGE_QUEUE, S2_WAIT_FOR_DEMAND);
+     stopAndStartSegments(metadata, S1_IN_UPDATER, S2_WAIT_FOR_DEMAND);
}
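
The renamed segment chain in use — an illustrative sketch, assuming Ditto's usual pattern of a started consistency-lag timer carried in the metadata; the timer name here is hypothetical and the timer creation is not part of this commit:

    // Sketch: s1 now covers time spent inside the updater, then hands over to s2.
    final StartedTimer timer = DittoMetrics.timer("search_consistency_lag").start(); // hypothetical name
    ConsistencyLag.startS1InUpdater(timer);        // opens segment "s1_in_updater"
    // ... when the update leaves the updater for the stream:
    ConsistencyLag.startS2WaitForDemand(metadata); // stops s1, starts "s2_wait_for_demand"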

/**
@@ -75,23 +75,20 @@ final class EnforcementFlow {
private final CachingSignalEnrichmentFacade thingsFacade;
private final Cache<EnforcementCacheKey, Entry<Policy>> policyEnforcerCache;
private final Duration cacheRetryDelay;
- private final int maxArraySize;
private final SearchUpdateObserver searchUpdateObserver;

private EnforcementFlow(final ActorSystem actorSystem,
final ActorRef thingsShardRegion,
final Cache<EnforcementCacheKey, Entry<Policy>> policyEnforcerCache,
final AskWithRetryConfig askWithRetryConfig,
final StreamCacheConfig thingCacheConfig,
-         final int maxArraySize,
final Executor thingCacheDispatcher) {

thingsFacade = createThingsFacade(actorSystem, thingsShardRegion, askWithRetryConfig.getAskTimeout(),
thingCacheConfig, thingCacheDispatcher);
this.policyEnforcerCache = policyEnforcerCache;
searchUpdateObserver = SearchUpdateObserver.get(actorSystem);
cacheRetryDelay = thingCacheConfig.getRetryDelay();
-     this.maxArraySize = maxArraySize;
}

/**
@@ -125,7 +122,7 @@ public static EnforcementFlow of(final ActorSystem actorSystem,
final var thingCacheDispatcher = actorSystem.dispatchers()
.lookup(thingCacheConfig.getDispatcherName());
return new EnforcementFlow(actorSystem, thingsShardRegion, policyEnforcerCache, askWithRetryConfig,
-         thingCacheConfig, updaterStreamConfig.getMaxArraySize(), thingCacheDispatcher);
+         thingCacheConfig, thingCacheDispatcher);
}

private static EnforcementCacheKey getPolicyCacheKey(final PolicyId policyId) {
@@ -179,7 +176,12 @@ public <T> Source<List<AbstractWriteModel>, T> create(
.filterNot(List::isEmpty);
}

- // TODO
+ /**
+  * Create an enforcement flow for a thing-updater.
+  *
+  * @param mapper The search-update mapper.
+  * @return The enforcement flow.
+  */
public Flow<ThingUpdater.Data, MongoWriteModel, NotUsed> create(final SearchUpdateMapper mapper) {
return Flow.<ThingUpdater.Data>create()
.flatMapConcat(data -> retrieveThingFromCachingFacade(data.metadata().getThingId(), data.metadata())
@@ -13,7 +13,6 @@
package org.eclipse.ditto.thingsearch.service.persistence.write.streaming;

import java.util.Collection;
- import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -25,7 +24,6 @@
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.thingsearch.service.common.config.PersistenceStreamConfig;
import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
- import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.WriteResultAndErrors;
import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;
import org.eclipse.ditto.thingsearch.service.updater.actors.ThingUpdater;
@@ -82,7 +80,11 @@ public static MongoSearchUpdaterFlow of(final MongoDatabase database,
);
}

- // TODO
+ /**
+  * Create a flow that performs the database operation described by a MongoWriteModel.
+  *
+  * @return The flow.
+  */
public Flow<MongoWriteModel, ThingUpdater.Result, NotUsed> create() {
return Flow.<MongoWriteModel>create()
.flatMapConcat(writeModel -> executeBulkWrite(List.of(writeModel))
@@ -34,11 +34,6 @@
*/
public final class SearchUpdaterStream {

- /**
-  * Header to request this actor to perform a force-update due to a previous patch not being applied.
-  */
- public static final String FORCE_UPDATE_INCORRECT_PATCH = "force-update-incorrect-patch";
-
private final EnforcementFlow enforcementFlow;
private final MongoSearchUpdaterFlow mongoSearchUpdaterFlow;
private final BlockedNamespaces blockedNamespaces;
@@ -85,7 +80,11 @@ public static SearchUpdaterStream of(final UpdaterConfig updaterConfig,
return new SearchUpdaterStream(enforcementFlow, mongoSearchUpdaterFlow, blockedNamespaces, searchUpdateMapper);
}

- // TODO
+ /**
+  * Create a flow for a thing-updater.
+  *
+  * @return The flow.
+  */
public Flow<ThingUpdater.Data, ThingUpdater.Result, NotUsed> flow() {
final Flow<ThingUpdater.Data, ThingUpdater.Data, NotUsed> blockNamespace =
blockNamespaceFlow(data -> data.metadata().getThingId().getNamespace());
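
Taken together, the three documented entry points compose per thing-updater roughly as in this sketch; the searchUpdaterStream instance and the actor that materializes the flow are assumed, not shown in the diff:

    // Sketch: Data passes through namespace blocking, enforcement, and persistence.
    //   ThingUpdater.Data  --EnforcementFlow.create(mapper)-->   MongoWriteModel
    //   MongoWriteModel    --MongoSearchUpdaterFlow.create()-->  ThingUpdater.Result
    final Flow<ThingUpdater.Data, ThingUpdater.Result, NotUsed> updaterFlow =
            searchUpdaterStream.flow();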
