Skip to content

Commit

Permalink
Use $unsetField only for MongoDB 5.0 or above; turn on array indexing…
Browse files Browse the repository at this point in the history
… by default.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 10, 2022
1 parent 2c82756 commit e1a415b
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,11 @@ public interface DittoMongoClient extends MongoClient {
*/
MongoClientSettings getClientSettings();

/**
* Returns the max wire version of the MongoDB server, or 0 if the client is disconnected.
*
* @return the max wire version.
*/
int getMaxWireVersion();

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ServerDescription;
import com.mongodb.connection.netty.NettyStreamFactoryFactory;
import com.mongodb.event.CommandListener;
import com.mongodb.event.ConnectionPoolListener;
Expand Down Expand Up @@ -223,6 +224,16 @@ public MongoClientSettings getClientSettings() {
return clientSettings;
}

@Override
public int getMaxWireVersion() {
return mongoClient.getClusterDescription()
.getServerDescriptions()
.stream()
.mapToInt(ServerDescription::getMaxWireVersion)
.max()
.orElse(0);
}

@Override
public ClusterDescription getClusterDescription() {
return mongoClient.getClusterDescription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class MongoDbUriSupplierTest {

private static final String SOURCE_URI = "mongodb://user-name-1234-5678-abcdefg:password12345@" +
"first.hostname.com:10000,second.hostname.com:20000,third.hostname.com:30000,fourth.hostname" +
"fifth.hostname.com:50000,sixth.hostname.com:60000,seventh:hostname.com:70000" +
"fifth.hostname.com:50000,sixth.hostname.com:60000,seventh.hostname.com:65000" +
"/database-name?replicaSet=streched-0003&maxIdleTimeMS=240000&w=majority" +
"&readPreference=primaryPreferred&ssl=true&sslInvalidHostNameAllowed=true";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,25 @@
*/
final class BsonArrayDiff {

/**
* The minimum max-wire-version of a MongoDB server to support the $unsetField operator.
* Wire version 13 corresponds to MongoDB 5.0.
*/
private static final int MIN_UNSET_WIRE_VERSION = 13;

private static final String ARRAY_ELEM_AT = "$arrayElemAt";
private static final String CONCAT_ARRAYS = "$concatArrays";
private static final String SLICE = "$slice";

static BsonValue diff(final JsonPointer key,
final BsonArray minuend,
final BsonArray subtrahend) {
final BsonArray subtrahend,
final int maxWireVersion) {

return diff(key, minuend, subtrahend, (v, j) -> j);
return diff(key, minuend, subtrahend, maxWireVersion, (v, j) -> j);
}

static BsonDiff diffFeaturesArray(final BsonArray minuend, final BsonArray subtrahend) {
static BsonDiff diffFeaturesArray(final BsonArray minuend, final BsonArray subtrahend, final int maxWireVersion) {
final BsonSizeVisitor bsonSizeVisitor = new BsonSizeVisitor();
final int replacementSize = bsonSizeVisitor.eval(subtrahend);
if (minuend.equals(subtrahend)) {
Expand All @@ -68,7 +75,7 @@ static BsonDiff diffFeaturesArray(final BsonArray minuend, final BsonArray subtr
final BiFunction<BsonDocument, Integer, Integer> kMapGet =
// use 0 as default value to re-use root grant/revoke
(doc, j) -> kMap.getOrDefault(doc.get(FIELD_FEATURE_ID), 0);
final BsonValue difference = diff(internalArrayKey, minuend, subtrahend, kMapGet);
final BsonValue difference = diff(internalArrayKey, minuend, subtrahend, maxWireVersion, kMapGet);
return new BsonDiff(
replacementSize,
bsonSizeVisitor.eval(difference),
Expand All @@ -80,8 +87,9 @@ static BsonDiff diffFeaturesArray(final BsonArray minuend, final BsonArray subtr
private static BsonValue diff(final JsonPointer key,
final BsonArray minuend,
final BsonArray subtrahend,
final int maxWireVersion,
final BiFunction<BsonDocument, Integer, Integer> mostSimilarIndex) {
final List<Element> elements = diffAsElementList(key, minuend, subtrahend, mostSimilarIndex);
final List<Element> elements = diffAsElementList(key, minuend, subtrahend, maxWireVersion, mostSimilarIndex);
final List<ElementGroup> aggregatedElements = aggregate(elements);
if (elements.size() - aggregatedElements.size() > 1 && aggregatedElements.size() > 1) {
// aggregated element groups are suitable for array concatenation syntax.
Expand All @@ -105,6 +113,7 @@ private static BsonValue diff(final JsonPointer key,
private static List<Element> diffAsElementList(final JsonPointer key,
final BsonArray minuend,
final BsonArray subtrahend,
final int maxWireVersion,
final BiFunction<BsonDocument, Integer, Integer> mostSimilarIndex) {

final BsonSizeVisitor bsonSizeVisitor = new BsonSizeVisitor();
Expand All @@ -122,9 +131,12 @@ private static List<Element> diffAsElementList(final JsonPointer key,
final int k = mostSimilarIndex.apply(elementDoc, j);
if (isMostSimilarElementADocument(subtrahend, k)) {
final int replaceSize = bsonSizeVisitor.eval(elementDoc);
final BsonDiff diff = BsonDiff.minus(element.asDocument(), subtrahend.get(k).asDocument(), false);
final BsonDiff diff =
BsonDiff.minus(element.asDocument(), subtrahend.get(k).asDocument(), false, maxWireVersion);
final BsonDiffList diffList = diff.consumeAndExportToList();
final var diffInPipeline = diffList.toBsonInPipeline(getSubtrahendElement(subtrahendExpr, k));
final boolean isUnsetAllowed = maxWireVersion >= MIN_UNSET_WIRE_VERSION;
final var diffInPipeline =
diffList.toBsonInPipeline(getSubtrahendElement(subtrahendExpr, k), isUnsetAllowed);
final boolean diffSizeIsBetter = diffInPipeline.map(bsonSizeVisitor::eval)
.map(diffSize -> diffSize < replaceSize)
.orElse(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ static BsonDiff set(final int replacementSize, final int diffSize, final JsonPoi
* @param recurseIntoArrays whether diff computation should descend into arrays.
* @return the change to edit the starting BSON document into the target BSON document.
*/
public static BsonDiff minus(final BsonDocument minuend,
final BsonDocument subtrahend,
final boolean recurseIntoArrays) {
public static BsonDiff minus(final BsonDocument minuend, final BsonDocument subtrahend,
final boolean recurseIntoArrays, final int maxWireVersion) {

return new BsonDiffVisitor(recurseIntoArrays).eval(minuend).apply(subtrahend);
return new BsonDiffVisitor(recurseIntoArrays, maxWireVersion).eval(minuend).apply(subtrahend);
}

/**
Expand All @@ -94,8 +93,19 @@ public static BsonDiff minus(final BsonDocument minuend,
* @param subtrahend the subtrahend document.
* @return the difference.
*/
public static BsonDiff minusThingDocs(final BsonDocument minuend, final BsonDocument subtrahend) {
return minus(minuend, subtrahend, true);
public static BsonDiff minusThingDocs(final BsonDocument minuend, final BsonDocument subtrahend,
final int maxWireVersion) {
// compute the internal array diff especially to find similar elements by internal key
final var minuendFeatures = minuend.getArray(FIELD_F_ARRAY);
final var subtrahendFeatures = subtrahend.getArray(FIELD_F_ARRAY);
final var diffFeatures = BsonArrayDiff.diffFeaturesArray(minuendFeatures, subtrahendFeatures, maxWireVersion);
// compute the rest of the diff without the internal array
final var minuendWithoutInternal = minuend.clone();
final var subtrahendWithoutInternal = subtrahend.clone();
minuendWithoutInternal.remove(FIELD_F_ARRAY);
subtrahendWithoutInternal.remove(FIELD_F_ARRAY);
final var diffWithoutInternal = minus(minuendWithoutInternal, subtrahendWithoutInternal, true, maxWireVersion);
return diffWithoutInternal.concat(diffFeatures);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ public final class BsonDiffList {
/**
* $unsetField only possible for top-level fields
*/
private static final String SET_FIELD = "$setField";
private static final BsonString SET_FIELD_REMOVE = new BsonString("$$REMOVE");
private static final String UNSET_FIELD = "$unsetField";
private static final String FIELD = "field";
private static final String INPUT = "input";
private static final String VALUE = "value";
Expand All @@ -55,9 +54,10 @@ public final class BsonDiffList {
*
* @return the diff document, or an empty optional if we gave up.
*/
Optional<BsonValue> toBsonInPipeline(final BsonValue previousDocumentExpression) {
Optional<BsonValue> toBsonInPipeline(final BsonValue previousDocumentExpression, final boolean isUnsetAllowed) {
final boolean hasNestedUnset = unset.stream().anyMatch(pointer -> pointer.getLevelCount() > 1);
if (hasNestedUnset) {
final boolean unsetNotAllowed = !isUnsetAllowed && !unset.isEmpty();
if (hasNestedUnset || unsetNotAllowed) {
return Optional.empty();
}
final BsonValue beforeUnset;
Expand All @@ -84,10 +84,9 @@ private static BsonValue buildUnsetDocument(final BsonValue beforeUnset, final I
if (keys.hasNext()) {
final String key = keys.next().getRoot().map(JsonKey::toString).orElseThrow();
final BsonDocument nextDoc = new BsonDocument()
.append(SET_FIELD, new BsonDocument()
.append(UNSET_FIELD, new BsonDocument()
.append(FIELD, new BsonString(key))
.append(INPUT, beforeUnset)
.append(VALUE, SET_FIELD_REMOVE)
);
return buildUnsetDocument(nextDoc, keys);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ final class BsonDiffVisitor implements BsonValueVisitor<Function<BsonValue, Bson

private final BsonSizeVisitor bsonSizeVisitor = new BsonSizeVisitor();
private final boolean recurseIntoArrays;
private final int maxWireVersion;

BsonDiffVisitor(final boolean recurseIntoArrays) {
BsonDiffVisitor(final boolean recurseIntoArrays, final int maxWireVersion) {
this.recurseIntoArrays = recurseIntoArrays;
this.maxWireVersion = maxWireVersion;
}

@Override
Expand Down Expand Up @@ -66,7 +68,7 @@ public Function<BsonValue, BsonDiff> array(final JsonPointer key, final BsonArra
if (value.equals(oldValue)) {
return BsonDiff.empty(replacementSize);
} else if (oldValue.isArray()) {
final var bsonArrayDiff = BsonArrayDiff.diff(key, value, oldValue.asArray());
final var bsonArrayDiff = BsonArrayDiff.diff(key, value, oldValue.asArray(), maxWireVersion);
final var diffSize = key.length() + bsonSizeVisitor.eval(bsonArrayDiff);
if (diffSize <= replacementSize) {
return BsonDiff.set(replacementSize, diffSize, key, bsonArrayDiff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ protected AbstractWriteModel(final Metadata metadata) {
* @return Either the MongoDB write model of this object or an incremental update converting the document of
* the previous model into this one.
*/
public Optional<MongoWriteModel> toIncrementalMongo(@Nullable final AbstractWriteModel previousWriteModel) {
public Optional<MongoWriteModel> toIncrementalMongo(@Nullable final AbstractWriteModel previousWriteModel,
final int maxWireVersion) {
return Optional.of(MongoWriteModel.of(this, toMongo(), false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ public static ThingWriteModel of(final Metadata metadata, final BsonDocument thi
}

@Override
public Optional<MongoWriteModel> toIncrementalMongo(@Nullable final AbstractWriteModel previousWriteModel) {
public Optional<MongoWriteModel> toIncrementalMongo(@Nullable final AbstractWriteModel previousWriteModel,
final int maxWireVersion) {
if (previousWriteModel instanceof ThingWriteModel thingWriteModel) {
return computeDiff(thingWriteModel);
return computeDiff(thingWriteModel, maxWireVersion);
} else {
return super.toIncrementalMongo(previousWriteModel);
return super.toIncrementalMongo(previousWriteModel, maxWireVersion);
}
}

Expand Down Expand Up @@ -156,7 +157,7 @@ public String toString() {
"]";
}

private Optional<MongoWriteModel> computeDiff(final ThingWriteModel lastWriteModel) {
private Optional<MongoWriteModel> computeDiff(final ThingWriteModel lastWriteModel, final int maxWireVersion) {
final WriteModel<BsonDocument> mongoWriteModel;
final boolean isPatchUpdate;

Expand All @@ -166,38 +167,40 @@ private Optional<MongoWriteModel> computeDiff(final ThingWriteModel lastWriteMod
PATCH_SKIP_COUNT.increment();
return Optional.empty();
}
final Optional<BsonDiff> diff = tryComputeDiff(getThingDocument(), lastWriteModel.getThingDocument());
final var diff = tryComputeDiff(getThingDocument(), lastWriteModel.getThingDocument(), maxWireVersion);
if (diff.isPresent() && diff.get().isDiffSmaller()) {
final var aggregationPipeline = diff.get().consumeAndExport();
if (aggregationPipeline.isEmpty()) {
LOGGER.debug("Skipping update due to {} <{}>", "empty diff", ((AbstractWriteModel) this).getClass().getSimpleName());
LOGGER.debug("Skipping update due to {} <{}>", "empty diff",
((AbstractWriteModel) this).getClass().getSimpleName());
LOGGER.trace("Skipping update due to {} <{}>", "empty diff", this);
PATCH_SKIP_COUNT.increment();
return Optional.empty();
}
final var filter = asPatchUpdate(lastWriteModel.getMetadata().getThingRevision()).getFilter();
mongoWriteModel = new UpdateOneModel<>(filter, aggregationPipeline);
LOGGER.debug("Using incremental update <{}>", mongoWriteModel.getClass().getSimpleName());
LOGGER.trace("Using incremental update <{}>", mongoWriteModel);
PATCH_UPDATE_COUNT.increment();
LOGGER.debug("Using incremental update <{}>", mongoWriteModel.getClass().getSimpleName());
LOGGER.trace("Using incremental update <{}>", mongoWriteModel);
PATCH_UPDATE_COUNT.increment();
isPatchUpdate = true;
} else {
mongoWriteModel = this.toMongo();
LOGGER.debug("Using replacement because diff is bigger or nonexistent: <{}>",
LOGGER.debug("Using replacement because diff is bigger or nonexistent: <{}>",
mongoWriteModel.getClass().getSimpleName());
if (LOGGER.isTraceEnabled()) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Using replacement because diff is bigger or nonexistent. Diff=<{}>",
diff.map(BsonDiff::consumeAndExport));
}
FULL_UPDATE_COUNT.increment();
}
FULL_UPDATE_COUNT.increment();
isPatchUpdate = false;
}
return Optional.of(MongoWriteModel.of(this, mongoWriteModel, isPatchUpdate));
}

private Optional<BsonDiff> tryComputeDiff(final BsonDocument minuend, final BsonDocument subtrahend) {
private Optional<BsonDiff> tryComputeDiff(final BsonDocument minuend, final BsonDocument subtrahend,
final int maxWireVersion) {
try {
return Optional.of(BsonDiff.minusThingDocs(minuend, subtrahend));
return Optional.of(BsonDiff.minusThingDocs(minuend, subtrahend, maxWireVersion));
} catch (final BsonInvalidOperationException e) {
LOGGER.error("Failed to compute BSON diff between <{}> and <{}>", minuend, subtrahend, e);
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ public final class DefaultSearchUpdateMapper extends SearchUpdateMapper {
*/
@SuppressWarnings("unused")
private DefaultSearchUpdateMapper(final ActorSystem actorSystem) {
super(actorSystem);
// Nothing to initialize.
this(actorSystem, 0);
}

private DefaultSearchUpdateMapper(final ActorSystem actorSystem, final Integer maxWireVersion) {
super(actorSystem, maxWireVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.starter.actors.MongoClientExtension;
import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;
import org.slf4j.Logger;

Expand All @@ -42,9 +44,11 @@ public abstract class SearchUpdateMapper implements Extension {
private static final ExtensionId EXTENSION_ID = new ExtensionId();

protected final ActorSystem actorSystem;
protected final int maxWireVersion;

protected SearchUpdateMapper(final ActorSystem actorSystem) {
protected SearchUpdateMapper(final ActorSystem actorSystem, final Integer maxWireVersion) {
this.actorSystem = actorSystem;
this.maxWireVersion = maxWireVersion;
}

/**
Expand Down Expand Up @@ -75,10 +79,10 @@ public static SearchUpdateMapper get(final ActorSystem actorSystem) {
* @return a singleton list of write model together with its update document, or an empty list if there is no
* change.
*/
protected static Source<MongoWriteModel, NotUsed>
protected Source<MongoWriteModel, NotUsed>
toIncrementalMongo(final AbstractWriteModel model, final AbstractWriteModel lastWriteModel, final Logger logger) {
try {
final var mongoWriteModelOpt = model.toIncrementalMongo(lastWriteModel);
final var mongoWriteModelOpt = model.toIncrementalMongo(lastWriteModel, maxWireVersion);
if (mongoWriteModelOpt.isEmpty()) {
logger.debug("Write model is unchanged, skipping update: <{}>", model);
model.getMetadata().sendWeakAck(null);
Expand All @@ -89,8 +93,7 @@ public static SearchUpdateMapper get(final ActorSystem actorSystem) {
logger.debug("MongoWriteModel={}", result);
return Source.single(result);
}
}
catch (final Exception error) {
} catch (final Exception error) {
logger.error("Failed to compute write model " + model, error);
try {
model.getMetadata().getTimers().forEach(StartedTimer::stop);
Expand All @@ -112,10 +115,13 @@ public SearchUpdateMapper createExtension(final ExtendedActorSystem system) {
DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(
system.settings().config()));

final DittoMongoClient client = MongoClientExtension.get(system).getUpdaterClient();
final int maxWireVersion = client.getMaxWireVersion();

return AkkaClassLoader.instantiate(system, SearchUpdateMapper.class,
searchConfig.getSearchUpdateMapperImplementation(),
List.of(ActorSystem.class),
List.of(system));
List.of(ActorSystem.class, Integer.class),
List.of(system, maxWireVersion));
}
}

Expand Down
Loading

0 comments on commit e1a415b

Please sign in to comment.