Skip to content

Commit

Permalink
#288 add merge to client
Browse files Browse the repository at this point in the history
Signed-off-by: Vadim Guenther <vadim.guenther@bosch.io>
  • Loading branch information
VadimGue committed Jan 18, 2021
1 parent d35cda8 commit 874790c
Show file tree
Hide file tree
Showing 15 changed files with 749 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public enum ChangeAction {
/**
* An already existing entry was deleted.
*/
DELETED
DELETED,

/**
* An already existing entry was merged.
*/
MERGED

}
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ private CompletableFuture<Thing> processCreate(final Thing thing, @Nullable fina


@Override
public CompletableFuture<Thing> merge(final ThingId thingId, final JsonObject jsonObject,
public CompletableFuture<Void> merge(final ThingId thingId, final JsonObject jsonObject,
final Option<?>... options) {
argumentNotNull(jsonObject);

Expand All @@ -395,12 +395,13 @@ public CompletableFuture<Thing> merge(final ThingId thingId, final JsonObject js
}

@Override
public CompletableFuture<Thing> merge(final ThingId thingId, final Thing thing,
public CompletableFuture<Void> merge(final ThingId thingId, final Thing thing,
final Option<?>... options) {
argumentNotNull(thing);

return askThingCommand(outgoingMessageFactory.mergeThing(thingId, thing, options),
CommandResponse.class,
this::transformModifyResponse).toCompletableFuture();
this::toVoid).toCompletableFuture();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.protocoladapter.PayloadPathMatcher;
import org.eclipse.ditto.protocoladapter.ProtocolAdapter;
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.protocoladapter.UnknownPathException;
import org.eclipse.ditto.protocoladapter.things.ThingMergePayloadPathMatcher;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
Expand Down Expand Up @@ -87,6 +90,7 @@
import org.eclipse.ditto.signals.events.things.FeaturesModified;
import org.eclipse.ditto.signals.events.things.ThingCreated;
import org.eclipse.ditto.signals.events.things.ThingDeleted;
import org.eclipse.ditto.signals.events.things.ThingMerged;
import org.eclipse.ditto.signals.events.things.ThingModified;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -336,6 +340,15 @@ private static void registerKeyBasedHandlersForIncomingEvents(final PointerBus b
DittoHeaderDefinition.RESPONSE_REQUIRED
);

/*
* Merge Events are separated by theire path. There is only one command
*/
SelectorUtil.addHandlerForThingEvent(LOGGER, bus, ThingMerged.TYPE, ThingMerged.class,
DefaultDittoClient::mapEventPathToAddress,
(e, extra) -> new ImmutableChange(e.getThingEntityId(), ChangeAction.MERGED,
e.getResourcePath(), e.getValue(), e.getRevision(), e.getTimestamp().orElse(null),
extra, e.getDittoHeaders(), emitAcknowledgement));

/*
* Thing
*/
Expand Down Expand Up @@ -719,4 +732,94 @@ private static Throwable asDittoRuntimeException(final Adaptable adaptable) {
ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable).toJsonString());
}
}

private static String mapEventPathToAddress(final ThingMerged mergedEvent) {
final ThingId thingEntityId = mergedEvent.getThingEntityId();

final PayloadPathMatcher payloadPathMatcher = ThingMergePayloadPathMatcher.getInstance();
final String errorMessage = "No address pointer was found";
switch (payloadPathMatcher.match(mergedEvent.getResourcePath())) {
case "thing":
return MessageFormat.format(
DefaultDittoClient.THING_PATTERN, thingEntityId);
case "attribute":
return filterForAttribute(mergedEvent)
.map(attribute -> MessageFormat.format(
DefaultDittoClient.ATTRIBUTE_PATTERN, thingEntityId, attribute))
.orElseThrow(() -> {
LOGGER.trace(errorMessage);
return new UnknownPathException.Builder(mergedEvent.getResourcePath()).build();
});
case "attributes":
return MessageFormat.format(DefaultDittoClient.ATTRIBUTES_PATTERN,
thingEntityId);
case "feature":
return filterForFeatureId(mergedEvent)
.map(featureId -> MessageFormat.format(DefaultDittoClient.FEATURE_PATTERN,
thingEntityId, featureId))
.orElseThrow(() -> {
LOGGER.trace(errorMessage);
return new UnknownPathException.Builder(mergedEvent.getResourcePath()).build();
});
case "features":
return MessageFormat.format(DefaultDittoClient.FEATURES_PATTERN,
thingEntityId);
case "featureProperty":
return filterForFeatureId(mergedEvent)
.flatMap(featureId -> filterForFeatureProperty(mergedEvent)
.map(featureProperty -> MessageFormat.format(
DefaultDittoClient.FEATURE_PROPERTY_PATTERN,
thingEntityId, featureId, featureProperty))
)
.orElseThrow(() -> {
LOGGER.trace(errorMessage);
return new UnknownPathException.Builder(mergedEvent.getResourcePath()).build();
});
case "featureProperties":
return filterForFeatureId(mergedEvent)
.map(featureId -> MessageFormat.format(DefaultDittoClient.FEATURE_PROPERTIES_PATTERN,
thingEntityId, featureId))
.orElseThrow(() -> {
LOGGER.trace(errorMessage);
return new UnknownPathException.Builder(mergedEvent.getResourcePath()).build();
});
case "featureDesiredProperty":
return filterForFeatureId(mergedEvent)
.flatMap(desiredFeatureId -> filterForFeatureProperty(mergedEvent)
.map(featureProperty -> MessageFormat.format(
DefaultDittoClient.FEATURE_DESIRED_PROPERTY_PATTERN,
thingEntityId, desiredFeatureId, featureProperty)))
.orElseThrow(() -> {
LOGGER.trace(errorMessage);
return new UnknownPathException.Builder(mergedEvent.getResourcePath()).build();
});
case "featureDesiredProperties":
return filterForFeatureId(mergedEvent)
.map(desiredFeatureId -> MessageFormat.format(
DefaultDittoClient.FEATURE_DESIRED_PROPERTIES_PATTERN,
thingEntityId, desiredFeatureId))
.orElseThrow(() -> {
LOGGER.trace(errorMessage);
return new UnknownPathException.Builder(mergedEvent.getResourcePath()).build();
});
default:
LOGGER.trace(errorMessage);
throw new UnknownPathException.Builder(mergedEvent.getResourcePath()).build();
}
}

private static Optional<String> filterForFeatureProperty(final ThingMerged mergedEvent) {
return mergedEvent.getResourcePath().getSubPointer(3)
.map(CharSequence::toString);
}

private static Optional<String> filterForFeatureId(final ThingMerged mergedEvent) {
return mergedEvent.getResourcePath().get(1)
.map(CharSequence::toString);
}

private static Optional<String> filterForAttribute(final ThingMerged mergedEvent) {
return mergedEvent.getResourcePath().getSubPointer(1)
.map(CharSequence::toString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,25 @@ public ModifyAttribute setAttribute(final ThingId thingId,
return ModifyAttribute.of(thingId, path, value, buildDittoHeaders(true, options));
}

public MergeThing mergeAttribute(final ThingId thingId,
final JsonPointer path,
final JsonValue value,
final Option<?>... options) {

return MergeThing.withAttribute(thingId, path, value, buildDittoHeaders(true, options));
}

public ModifyAttributes setAttributes(final ThingId thingId, final JsonObject attributes,
final Option<?>... options) {
return ModifyAttributes.of(thingId, ThingsModelFactory.newAttributes(attributes),
buildDittoHeaders(true, options));
}

public MergeThing mergeAttributes(final ThingId thingId, final JsonObject attributes, final Option<?>[] options) {
return MergeThing.withAttributes(thingId, ThingsModelFactory.newAttributes(attributes),
buildDittoHeaders(true, options));
}

public DeleteAttribute deleteAttribute(final ThingId thingId, final JsonPointer path, final Option<?>... options) {
return DeleteAttribute.of(thingId, path, buildDittoHeaders(false, options));
}
Expand All @@ -357,14 +370,26 @@ public ModifyFeature setFeature(final ThingId thingId, final Feature feature, fi
return ModifyFeature.of(thingId, feature, buildDittoHeaders(true, options));
}

public MergeThing mergeFeature(final ThingId thingId, final Feature feature, final Option<?>... options) {
return MergeThing.withFeature(thingId, feature, buildDittoHeaders(true, options));
}

public ModifyFeatures setFeatures(final ThingId thingId, final Features features, final Option<?>... options) {
return ModifyFeatures.of(thingId, features, buildDittoHeaders(true, options));
}

public MergeThing mergeFeatures(final ThingId thingId, final Features features, final Option<?>[] options) {
return MergeThing.withFeatures(thingId, features, buildDittoHeaders(true, options));
}

public ModifyPolicyId setPolicyId(final ThingId thingId, final PolicyId policyId, final Option<?>... options) {
return ModifyPolicyId.of(thingId, policyId, buildDittoHeaders(true, options));
}

public MergeThing mergePolicyId(final ThingId thingId, final PolicyId policyId, final Option<?>... options) {
return MergeThing.withPolicyId(thingId, policyId, buildDittoHeaders(true, options));
}

public RetrieveFeature retrieveFeature(final ThingId thingId, final String featureId, final Option<?>... options) {
return RetrieveFeature.of(thingId, featureId, buildDittoHeaders(false, options));
}
Expand Down Expand Up @@ -404,6 +429,25 @@ public ModifyFeatureDefinition setFeatureDefinition(final ThingId thingId,
return ModifyFeatureDefinition.of(thingId, featureId, featureDefinition, buildDittoHeaders(true, options));
}

/**
* Creates a new {@link MergeThing} object to merge FeatureDefinition.
*
* @param thingId ID of the thing to which the feature belongs to.
* @param featureId ID of the feature to set the definition for.
* @param featureDefinition the FeatureDefinition to be merged.
* @param options options to be applied configuring behaviour of this method, see {@link Option}s.
* @return the command object.
* @throws NullPointerException if any argument is {@code null}.
*/
public MergeThing mergeFeatureDefinition(final ThingId thingId,
final String featureId,
final FeatureDefinition featureDefinition,
final Option<?>... options) {

return MergeThing.withFeatureDefinition(thingId, featureId, featureDefinition,
buildDittoHeaders(true, options));
}

/**
* Creates a new {@link DeleteFeatureDefinition} object.
*
Expand All @@ -428,6 +472,15 @@ public ModifyFeatureProperty setFeatureProperty(final ThingId thingId,
return ModifyFeatureProperty.of(thingId, featureId, path, value, buildDittoHeaders(true, options));
}

public MergeThing mergeFeatureProperty(final ThingId thingId,
final String featureId,
final JsonPointer path,
final JsonValue value,
final Option<?>... options) {

return MergeThing.withFeatureProperty(thingId, featureId, path, value, buildDittoHeaders(true, options));
}

public ModifyFeatureProperties setFeatureProperties(final ThingId thingId,
final String featureId,
final JsonObject properties,
Expand All @@ -437,6 +490,15 @@ public ModifyFeatureProperties setFeatureProperties(final ThingId thingId,
buildDittoHeaders(true, options));
}

public MergeThing mergeFeatureProperties(final ThingId thingId,
final String featureId,
final JsonObject properties,
final Option<?>... options) {

return MergeThing.withFeatureProperties(thingId, featureId, ThingsModelFactory.newFeatureProperties(properties),
buildDittoHeaders(true, options));
}

public DeleteFeatureProperty deleteFeatureProperty(final ThingId thingId,
final String featureId,
final JsonPointer path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public static <T extends org.eclipse.ditto.signals.events.base.Event> void addHa
"Payload of event was not present"))
.getClass()));

final String address = addressBuilderFunction.apply(event);
final Change change = changeBuilderFunction.apply(event, message.getExtra().orElse(null));

final List<JsonPointer> jsonPointers = change.getValue()
Expand All @@ -142,6 +141,7 @@ public static <T extends org.eclipse.ditto.signals.events.base.Event> void addHa
.orElse(Collections.singletonList(JsonPointer.empty()));

// notify the address where the Change actually happened:
String address = addressBuilderFunction.apply(event);
final JsonPointer jsonPointer = JsonPointer.of(address);
final JsonPointerWithChangePaths
jsonPointerWithChangePaths = new JsonPointerWithChangePaths(jsonPointer, jsonPointers);
Expand Down Expand Up @@ -172,7 +172,7 @@ private static List<JsonPointer> calculateJsonPointerHierarchy(final JsonPointer
if (value.isObject()) {
final List<JsonPointer> pointersOnLevel = new ArrayList<>();
final JsonObject objectOnLevel = value.asObject();
pointersOnLevel.add(pointerOnLevel);
pointersOnLevel.add(pointerOnLevel);

// recurse further "down":
pointersOnLevel.addAll(calculateJsonPointerHierarchy(pointerOnLevel, objectOnLevel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,16 +308,12 @@ public interface CommonManagement<T extends ThingHandle, F extends FeatureHandle
* @param thing which should be used for merged.
* @param options options to be applied configuring behaviour of this method, see {@link
* org.eclipse.ditto.client.options.Options}.
* @return completable future providing the merged Thing object or a specific {@link
* org.eclipse.ditto.model.base.exceptions.DittoRuntimeException} if the operation failed
* @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named
* {@code "thingId"}, or if {@code initialPolicy} is {@code null}.
* @throws org.eclipse.ditto.model.base.exceptions.DittoJsonException if {@code thing} cannot be parsed to a {@link
* Thing}.
* @throws org.eclipse.ditto.model.things.ThingIdInvalidException if the {@code thingId} was invalid.
* @return completable future providing {@code null} in case of success or a specific {@link
* org.eclipse.ditto.model.base.exceptions.DittoRuntimeException} if the operation failed.
* @throws IllegalArgumentException if {@code argument} is {@code null}.
* @since 2.0.0
*/
CompletableFuture<Thing> merge(ThingId thingId, Thing thing, Option<?>... options);
CompletableFuture<Void> merge(ThingId thingId, Thing thing, Option<?>... options);

/**
* Merges a {@link Thing} if it does exist based on the given {@link JsonObject}.
Expand All @@ -326,16 +322,12 @@ public interface CommonManagement<T extends ThingHandle, F extends FeatureHandle
* @param thing a JSON object representation of the Thing which should be used for merged.
* @param options options to be applied configuring behaviour of this method, see {@link
* org.eclipse.ditto.client.options.Options}.
* @return completable future providing the created Thing object or a specific {@link
* org.eclipse.ditto.model.base.exceptions.DittoRuntimeException} if the operation failed
* @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named
* {@code "thingId"}, or if {@code initialPolicy} is {@code null}.
* @throws org.eclipse.ditto.model.base.exceptions.DittoJsonException if {@code thing} cannot be parsed to a {@link
* Thing}.
* @throws org.eclipse.ditto.model.things.ThingIdInvalidException if the {@code thingId} was invalid.
* @return completable future providing {@code null} in case of success or a specific {@link
* org.eclipse.ditto.model.base.exceptions.DittoRuntimeException} if the operation failed.
* @throws IllegalArgumentException if {@code argument} is {@code null}.
* @since 2.0.0
*/
CompletableFuture<Thing> merge(ThingId thingId, JsonObject thing, Option<?>... options);
CompletableFuture<Void> merge(ThingId thingId, JsonObject thing, Option<?>... options);

/**
* Puts the given {@link Thing}, which means that the Thing might be created or updated. The behaviour can be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ public interface FeatureDefinitionManagement {
*/
CompletableFuture<Void> setDefinition(FeatureDefinition definition, Option<?>... options);

/**
* Merge the given definition to the Feature.
*
* @param definition the FeatureDefinition to be merged.
* @param options options to be applied configuring behaviour of this method, see {@link
* org.eclipse.ditto.client.options.Options}.
* @return a CompletableFuture providing the result of this operation or a specific {@link
* org.eclipse.ditto.model.base.exceptions.DittoRuntimeException} if the operation failed.
* @throws NullPointerException if any argument is {@code null}.
*/
CompletableFuture<Void> mergeDefinition(FeatureDefinition definition, Option<?>... options);

/**
* Deletes the definition of the Feature.
*
Expand Down

0 comments on commit 874790c

Please sign in to comment.