Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Take Thing ID into account in IndexLengthRestrictionEnforcer
Attempt to prevent creation of documents whose index entry
does not fit in 1024 bytes.

Index content is now restricted thus:

namespace + thing-id + feature-id + name-of-attribute-or-feature-property + value <= 950.

Values of (top-level) attributes and feature properties exceeding the limit are truncated.

Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed May 23, 2018
1 parent 4203c5d commit dd36946
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 109 deletions.
Expand Up @@ -27,30 +27,14 @@ public abstract class AbstractThingsSearchUpdaterPersistence implements ThingsSe
* The logger.
*/
protected final LoggingAdapter log;
/**
* The restriction helper.
*/
protected final IndexLengthRestrictionEnforcer indexLengthRestrictionEnforcer;

/**
* Default contructor.
*
* @param loggingAdapter the logger to use for logging.
*/
public AbstractThingsSearchUpdaterPersistence(final LoggingAdapter loggingAdapter) {
this(loggingAdapter, IndexLengthRestrictionEnforcer.newInstance(loggingAdapter));
}

/**
* Default contructor.
*
* @param loggingAdapter the logger to use for logging.
* @param indexLengthRestrictionEnforcer the restriction helper.
*/
public AbstractThingsSearchUpdaterPersistence(final LoggingAdapter loggingAdapter,
final IndexLengthRestrictionEnforcer indexLengthRestrictionEnforcer) {
this.log = loggingAdapter;
this.indexLengthRestrictionEnforcer = indexLengthRestrictionEnforcer;
log = loggingAdapter;
}

/**
Expand All @@ -60,7 +44,7 @@ public AbstractThingsSearchUpdaterPersistence(final LoggingAdapter loggingAdapte
public Source<Boolean, NotUsed> insertOrUpdate(final Thing thing, final long revision, final long
policyRevision) {
// enforce the restrictions on the data
final Thing toSave = indexLengthRestrictionEnforcer.enforceRestrictions(thing);
final Thing toSave = IndexLengthRestrictionEnforcer.enforceRestrictions(log, thing);
return save(toSave, revision, policyRevision)
.recoverWithRetries(1, errorRecovery(getThingId(toSave)));
}
Expand Down
Expand Up @@ -11,6 +11,8 @@
*/
package org.eclipse.ditto.services.thingsearch.persistence.write;

import static org.eclipse.ditto.services.thingsearch.persistence.PersistenceConstants.SLASH;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -32,7 +34,6 @@
import org.eclipse.ditto.model.things.FeaturesBuilder;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingBuilder;
import org.eclipse.ditto.services.thingsearch.persistence.PersistenceConstants;

import akka.event.LoggingAdapter;

Expand All @@ -43,42 +44,24 @@
public final class IndexLengthRestrictionEnforcer {

/**
* Max allowed length of feature properties.
*/
static final int MAX_FEATURE_PROPERTY_VALUE_LENGTH = 950;
/**
* Max allowed length of attributes.
*/
static final int MAX_ATTRIBUTE_VALUE_LENGTH = 950;

/**
* The overhead caused by the json key of attribute entries. Use {@link IndexLengthRestrictionEnforcer#attributeOverhead()}
* ()} for calculating the concrete overhead.
*/
private static final int ATTRIBUTE_KEY_OVERHEAD = ("attributes").length();

/**
* The overhead caused by the json key of feature properties. Use {@link IndexLengthRestrictionEnforcer#featurePropertyOverhead(String)}
* for calculating the concrete overhead.
* Max allowed length of index content.
*/
private static final int FEATURE_PROPERTY_KEY_OVERHEAD = ("features" +
PersistenceConstants.SLASH
// feature id
+ PersistenceConstants.SLASH
+ "properties").length();
static final int MAX_INDEX_CONTENT_LENGTH = 950;

/**
* The logging adapter used to log size restriction enforcements.
*/
private final LoggingAdapter log;
private final int thingIdNamespaceOverhead;

/**
* Default constructor.
*
* @param log the logging adapter used to log size restriction enforcements.
*/
private IndexLengthRestrictionEnforcer(final LoggingAdapter log) {
private IndexLengthRestrictionEnforcer(final LoggingAdapter log, final String thingId) {
this.log = log;
this.thingIdNamespaceOverhead = calculateThingIdNamespaceOverhead(thingId);
}

/**
Expand All @@ -87,8 +70,14 @@ private IndexLengthRestrictionEnforcer(final LoggingAdapter log) {
* @param loggingAdapter the logging adapter used to log size restriction enforcements.
* @return the instance.
*/
public static IndexLengthRestrictionEnforcer newInstance(final LoggingAdapter loggingAdapter) {
return new IndexLengthRestrictionEnforcer(loggingAdapter);
public static IndexLengthRestrictionEnforcer newInstance(final LoggingAdapter loggingAdapter,
final String thingId) {
return new IndexLengthRestrictionEnforcer(loggingAdapter, thingId);
}

public static Thing enforceRestrictions(final LoggingAdapter log, final Thing thing) {
return new IndexLengthRestrictionEnforcer(log, thing.getId().orElse(""))
.enforceRestrictionsWithoutCheckingIdOverhead(thing);
}

/**
Expand All @@ -98,6 +87,10 @@ public static IndexLengthRestrictionEnforcer newInstance(final LoggingAdapter lo
* @return The thing with content that satisfies the thresholds.
*/
public Thing enforceRestrictions(final Thing thing) {
return IndexLengthRestrictionEnforcer.enforceRestrictions(log, thing);
}

private Thing enforceRestrictionsWithoutCheckingIdOverhead(final Thing thing) {
// check if features exceed limits
final Map<Feature, Set<JsonField>> exceededFeatures =
calculateThresholdViolations(thing.getFeatures().orElse(Features.newBuilder().build()));
Expand All @@ -123,7 +116,7 @@ public Thing enforceRestrictions(final Thing thing) {
fixViolation(jsonField.getKey().asPointer(),
jsonField.getValue(),
featurePropertyOverhead(feature.getId()),
MAX_FEATURE_PROPERTY_VALUE_LENGTH))));
MAX_INDEX_CONTENT_LENGTH))));
intermediateThing = builder.build();
}

Expand Down Expand Up @@ -187,14 +180,14 @@ public JsonValue enforceRestrictionsOnFeatureProperty(final String featureId, fi
if (violatesThreshold(featurePointer,
propertyValue,
featurePropertyOverhead(featureId),
MAX_FEATURE_PROPERTY_VALUE_LENGTH)) {
MAX_INDEX_CONTENT_LENGTH)) {
log.warning("Feature Property <{}> of Feature <{}> exceeds size restrictions.",
featurePointer.toString(),
featureId);
return fixViolation(featurePointer,
propertyValue,
featurePropertyOverhead(featureId),
MAX_FEATURE_PROPERTY_VALUE_LENGTH);
MAX_INDEX_CONTENT_LENGTH);
}
return propertyValue;
}
Expand Down Expand Up @@ -256,9 +249,9 @@ public JsonValue enforceRestrictionsOnAttributeValue(final JsonPointer attribute
final JsonValue attributeValue) {
if (violatesThreshold(attributeKey, attributeValue,
attributeOverhead(),
MAX_ATTRIBUTE_VALUE_LENGTH)) {
MAX_INDEX_CONTENT_LENGTH)) {
log.warning("Attribute <{}> exceeds size restrictions", attributeKey.toString());
return fixViolation(attributeKey, attributeValue, attributeOverhead(), MAX_ATTRIBUTE_VALUE_LENGTH);
return fixViolation(attributeKey, attributeValue, attributeOverhead(), MAX_INDEX_CONTENT_LENGTH);
}
return attributeValue;
}
Expand Down Expand Up @@ -290,7 +283,7 @@ private Attributes fixViolations(final Set<JsonField> exceededAttributes,
fixViolation(field.getKey().asPointer(),
field.getValue(),
attributeOverhead(),
MAX_ATTRIBUTE_VALUE_LENGTH)));
MAX_INDEX_CONTENT_LENGTH)));
return builder.build();
}

Expand All @@ -304,7 +297,7 @@ private FeatureProperties fixViolations(final String featureId,
jsonField.getKey().asPointer(),
jsonField.getValue(),
featurePropertyOverhead(featureId),
MAX_FEATURE_PROPERTY_VALUE_LENGTH);
MAX_INDEX_CONTENT_LENGTH);
featurePropertiesBuilder.set(jsonField.getKey().asPointer(), restrictedValue);
});

Expand All @@ -315,7 +308,7 @@ private JsonValue fixViolation(final JsonPointer key,
final JsonValue value,
final int overhead,
final int threshold) {
final int cutAt = threshold - totalOverhead(key, overhead);
final int cutAt = Math.max(0, threshold - totalOverhead(key, overhead));
return JsonValue.of(value.asString().substring(0, cutAt));
}

Expand All @@ -325,7 +318,7 @@ private Set<JsonField> calculateThresholdViolations(final Attributes attributes)
.filter(field -> violatesThreshold(field.getKey().asPointer(),
field.getValue(),
attributeOverhead(),
MAX_ATTRIBUTE_VALUE_LENGTH))
MAX_INDEX_CONTENT_LENGTH))
.collect(Collectors.toSet());
}

Expand All @@ -352,7 +345,7 @@ private Set<JsonField> calculateThresholdViolations(final String featureId,
field.getKey().asPointer(),
field.getValue(),
featurePropertyOverhead(featureId),
MAX_FEATURE_PROPERTY_VALUE_LENGTH))
MAX_INDEX_CONTENT_LENGTH))
.collect(Collectors.toSet());
}

Expand All @@ -367,7 +360,7 @@ private boolean violatesThreshold(final JsonPointer key,
}

private int totalOverhead(final JsonPointer key, final int additionalOverhead) {
return key.toString().length() + additionalOverhead;
return jsonPointerLengthWithoutStartingSlash(key) + additionalOverhead;
}

/**
Expand All @@ -376,7 +369,7 @@ private int totalOverhead(final JsonPointer key, final int additionalOverhead) {
* @return The overhead as a positive int.
*/
private int attributeOverhead() {
return ATTRIBUTE_KEY_OVERHEAD;
return thingIdNamespaceOverhead;
}

/**
Expand All @@ -386,7 +379,7 @@ private int attributeOverhead() {
* @return The overhead as a positive int.
*/
private int featurePropertyOverhead(final String featureKey) {
return FEATURE_PROPERTY_KEY_OVERHEAD + featureKey.length();
return thingIdNamespaceOverhead + featureKey.length();
}


Expand All @@ -400,5 +393,17 @@ private String getThingId(final Thing thing) {
return thing.getId().orElseThrow(() -> new IllegalArgumentException("The thing has no ID!"));
}

private static int calculateThingIdNamespaceOverhead(final String thingId) {
final int namespaceLength = Math.max(0, thingId.indexOf(':'));
return thingId.length() + namespaceLength;
}

private static int jsonPointerLengthWithoutStartingSlash(final JsonPointer jsonPointer) {
final String stringRepresentation = jsonPointer.toString();
return stringRepresentation.startsWith(SLASH)
? stringRepresentation.length() - 1
: stringRepresentation.length();
}

}

Expand Up @@ -45,6 +45,7 @@
import org.eclipse.ditto.services.thingsearch.persistence.mapping.ThingDocumentMapper;
import org.eclipse.ditto.services.thingsearch.persistence.write.AbstractThingsSearchUpdaterPersistence;
import org.eclipse.ditto.services.thingsearch.persistence.write.EventToPersistenceStrategyFactory;
import org.eclipse.ditto.services.thingsearch.persistence.write.IndexLengthRestrictionEnforcer;
import org.eclipse.ditto.services.thingsearch.persistence.write.ThingMetadata;
import org.eclipse.ditto.services.thingsearch.persistence.write.ThingsSearchUpdaterPersistence;
import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientWrapper;
Expand Down Expand Up @@ -139,7 +140,8 @@ private static Document toUpdate(final Document document) {
@Override
protected final Source<Boolean, NotUsed> save(final Thing thing, final long revision, final long policyRevision) {
log.debug("Saving Thing with revision <{}> and policy revision <{}>: <{}>", revision, policyRevision, thing);
final Bson filter = filterWithLowerThingRevisionOrLowerPolicyRevision(getThingId(thing), revision, policyRevision);
final Bson filter =
filterWithLowerThingRevisionOrLowerPolicyRevision(getThingId(thing), revision, policyRevision);
final Document document = toUpdate(ThingDocumentMapper.toDocument(thing), revision, policyRevision);
return Source.fromPublisher(collection.updateOne(filter, document, new UpdateOptions().upsert(true)))
.map(updateResult -> updateResult.getMatchedCount() > 0 || null != updateResult.getUpsertedId());
Expand Down Expand Up @@ -257,7 +259,7 @@ private <T extends ThingEvent> List<WriteModel<Document>> createThingUpdates(fin
final T thingEvent) {
final List<Bson> updates = persistenceStrategyFactory
.getStrategy(thingEvent)
.thingUpdates(thingEvent, indexLengthRestrictionEnforcer);
.thingUpdates(thingEvent, IndexLengthRestrictionEnforcer.newInstance(log, thingEvent.getThingId()));
return updates
.stream()
.map(update -> new UpdateOneModel<Document>(filter, update, new UpdateOptions().upsert(true)))
Expand Down

0 comments on commit dd36946

Please sign in to comment.