Skip to content

Commit

Permalink
fixed trying to process empty "currentWriteModel" leading to BsonExce…
Browse files Browse the repository at this point in the history
…ption

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jan 25, 2024
1 parent 22bc3ed commit 222de26
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 33 deletions.
Expand Up @@ -35,10 +35,10 @@
import org.bson.BsonString;
import org.bson.conversions.Bson;
import org.eclipse.ditto.internal.models.streaming.AbstractEntityIdWithRevision;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
import org.eclipse.ditto.thingsearch.service.persistence.write.mapping.BsonDiff;
import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;
Expand Down Expand Up @@ -222,39 +222,41 @@ private Optional<MongoWriteModel> computeDiff(final ThingWriteModel lastWriteMod
return Optional.empty();
}
final BsonDocument currentWriteModel = getThingDocument();
final var diff = tryComputeDiff(currentWriteModel, lastWriteModel.getThingDocument(), maxWireVersion);
if (currentWriteModel.isEmpty()) {
LOGGER.debug("Skipping update due to empty currentWriteModel <{}>",
((AbstractWriteModel) this).getClass().getSimpleName());
PATCH_SKIP_COUNT.increment();
return Optional.empty();
} else if (diff.isPresent() && diff.get().isDiffSmaller()) {
final var aggregationPipeline = diff.get().consumeAndExport();
if (aggregationPipeline.isEmpty()) {
LOGGER.debug("Skipping update due to {} <{}>", "empty diff",
((AbstractWriteModel) this).getClass().getSimpleName());
LOGGER.trace("Skipping update due to {} <{}>", "empty diff", this);
PATCH_SKIP_COUNT.increment();
return Optional.empty();
}
thingWriteModel = asPatchUpdate(lastWriteModel.getMetadata().getThingRevision());
final var filter = thingWriteModel.getFilter();
mongoWriteModel = new UpdateOneModel<>(filter, aggregationPipeline);
LOGGER.debug("Using incremental update <{}>", mongoWriteModel.getClass().getSimpleName());
LOGGER.trace("Using incremental update <{}>", mongoWriteModel);
PATCH_UPDATE_COUNT.increment();
isPatchUpdate1 = true;
} else {
thingWriteModel = this;
mongoWriteModel = this.toMongo();
LOGGER.debug("Using replacement because diff is bigger or nonexistent: <{}>",
mongoWriteModel.getClass().getSimpleName());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Using replacement because diff is bigger or nonexistent. Diff=<{}>",
diff.map(BsonDiff::consumeAndExport));
final var diff = tryComputeDiff(currentWriteModel, lastWriteModel.getThingDocument(), maxWireVersion);
if (diff.isPresent() && diff.get().isDiffSmaller()) {
final var aggregationPipeline = diff.get().consumeAndExport();
if (aggregationPipeline.isEmpty()) {
LOGGER.debug("Skipping update due to {} <{}>", "empty diff",
((AbstractWriteModel) this).getClass().getSimpleName());
LOGGER.trace("Skipping update due to {} <{}>", "empty diff", this);
PATCH_SKIP_COUNT.increment();
return Optional.empty();
}
thingWriteModel = asPatchUpdate(lastWriteModel.getMetadata().getThingRevision());
final var filter = thingWriteModel.getFilter();
mongoWriteModel = new UpdateOneModel<>(filter, aggregationPipeline);
LOGGER.debug("Using incremental update <{}>", mongoWriteModel.getClass().getSimpleName());
LOGGER.trace("Using incremental update <{}>", mongoWriteModel);
PATCH_UPDATE_COUNT.increment();
isPatchUpdate1 = true;
} else {
thingWriteModel = this;
mongoWriteModel = this.toMongo();
LOGGER.debug("Using replacement because diff is bigger or nonexistent: <{}>",
mongoWriteModel.getClass().getSimpleName());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Using replacement because diff is bigger or nonexistent. Diff=<{}>",
diff.map(BsonDiff::consumeAndExport));
}
FULL_UPDATE_COUNT.increment();
isPatchUpdate1 = false;
}
FULL_UPDATE_COUNT.increment();
isPatchUpdate1 = false;
}
return Optional.of(MongoWriteModel.of(thingWriteModel, mongoWriteModel, isPatchUpdate1));
}
Expand Down
Expand Up @@ -14,8 +14,11 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
Expand All @@ -25,10 +28,6 @@

import com.typesafe.config.Config;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Source;

/**
* Search Update Mapper to be loaded by reflection.
* Can be used as an extension point to use custom map search updates.
Expand Down

0 comments on commit 222de26

Please sign in to comment.