Skip to content

Commit

Permalink
Adapt to the changes made in ditto regarding 'Fix WithId+WithIdButAct…
Browse files Browse the repository at this point in the history
…uallyNot'

* getId() doesn't exist anymore -> assertions adapted (hint: hasThingId is still there)
* WithId renamed -> WithEntityId
* Signal doesn't implement WithEntityId anymore
* Make sure received signals have a thingId. Signals without an id
should not arrive, in case it happend, log a warning and drop them.

Signed-off-by: Joel Bartelheimer <joel.bartelheimer@bosch.io>
  • Loading branch information
jbartelh committed Mar 18, 2021
1 parent 3f32158 commit e8c6ac1
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 36 deletions.
Expand Up @@ -23,14 +23,14 @@
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.entity.type.WithEntityType;
import org.eclipse.ditto.model.base.headers.DittoHeadersSettable;
import org.eclipse.ditto.signals.base.WithId;
import org.eclipse.ditto.signals.base.WithEntityId;

/**
* Common interface for all Thing related changes.
*
* @since 1.0.0
*/
public interface Change extends WithId, WithEntityType, DittoHeadersSettable<Change>, Acknowledgeable {
public interface Change extends WithEntityId, WithEntityType, DittoHeadersSettable<Change>, Acknowledgeable {

/**
* Returns the {@link ChangeAction} which caused this change.
Expand Down
Expand Up @@ -66,9 +66,11 @@
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.model.things.ThingsModelFactory;
import org.eclipse.ditto.model.things.WithThingId;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithEntityId;
import org.eclipse.ditto.signals.base.WithOptionalEntity;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
Expand Down Expand Up @@ -654,12 +656,12 @@ protected AdaptableBus.SubscriptionId subscribe(
final String protocolCommand,
final String protocolCommandAck,
final CompletableFuture<Void> futureToCompleteOrFailAfterAck,
final Function<Adaptable, Message<?>> adaptableToMessage) {
final Function<Adaptable, Optional<Message<?>>> adaptableToMessage) {

return subscribeAndPublishMessage(previousSubscriptionId, streamingType, protocolCommand, protocolCommandAck,
futureToCompleteOrFailAfterAck, adaptable -> bus -> {
final Message<?> message = adaptableToMessage.apply(adaptable);
bus.notify(message.getSubject(), message);
adaptableToMessage.apply(adaptable)
.ifPresent(message -> bus.notify(message.getSubject(), message));
});
}

Expand Down Expand Up @@ -745,17 +747,29 @@ protected void unsubscribe(@Nullable final AdaptableBus.SubscriptionId subscript
}
}

protected static Message<?> asThingMessage(final Adaptable adaptable) {
/**
* Build a {@link Message} out of the given {@link Adaptable}.
*
* @param adaptable from which the things {@link Message} shall be build from.
* @return empty if the adaptable doesn't provide a thingId, or the build {@link Message}.
*/
protected static Optional<Message<?>> asThingMessage(final Adaptable adaptable) {
final Signal<?> signal = PROTOCOL_ADAPTER.fromAdaptable(adaptable);
final ThingId thingId = ThingId.of(signal.getEntityId());
final MessageHeaders messageHeaders =
MessageHeaders.newBuilder(MessageDirection.FROM, thingId, signal.getType())
.correlationId(signal.getDittoHeaders().getCorrelationId().orElse(null))
.build();
return Message.newBuilder(messageHeaders)
.payload(signal)
.extra(adaptable.getPayload().getExtra().orElse(null))
.build();
if (signal instanceof WithThingId) {
final ThingId thingId = ((WithThingId) signal).getThingEntityId();
final MessageHeaders messageHeaders = MessageHeaders
.newBuilder(MessageDirection.FROM, thingId, signal.getType())
.correlationId(signal.getDittoHeaders().getCorrelationId().orElse(null))
.build();
final Message<Object> message = Message.newBuilder(messageHeaders)
.payload(signal)
.extra(adaptable.getPayload().getExtra().orElse(null))
.build();
return Optional.of(message);
} else {
LOGGER.warn("Cannot build ThingMessage out of Signal without an ThingId: <{}>", signal);
return Optional.empty();
}
}

private static void adjoin(final CompletionStage<?> stage, final CompletableFuture<Void> future) {
Expand Down
Expand Up @@ -113,7 +113,6 @@ public void thingCreatedReturnsExpected() {

assertThat(thingCreated)
.hasType(ThingCreated.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(THING_V2.toJson())
.hasRevision(-1);
Expand All @@ -128,7 +127,6 @@ public void thingDeletedReturnsExpected() {

assertThat(thingDeleted)
.hasType(ThingDeleted.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasNoEntity()
.hasRevision(-1);
Expand All @@ -143,7 +141,6 @@ public void thingModifiedReturnsExpected() {

assertThat(thingModified)
.hasType(ThingModified.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(THING_V2.toJson())
.hasRevision(-1);
Expand All @@ -159,7 +156,6 @@ public void attributeCreatedReturnsExpected() {

assertThat(attributeCreated)
.hasType(AttributeCreated.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(ATTRIBUTE_VALUE)
.hasResourcePath(JsonFactory.newPointer("/attributes" + ATTRIBUTE_JSON_POINTER))
Expand All @@ -175,7 +171,6 @@ public void attributeDeletedReturnsExpected() {

assertThat(attributeDeleted)
.hasType(AttributeDeleted.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasNoEntity()
.hasResourcePath(ATTRIBUTE_RESOURCE_PATH)
Expand All @@ -192,7 +187,6 @@ public void attributeModifiedReturnsExpected() {

assertThat(attributeModified)
.hasType(AttributeModified.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(ATTRIBUTE_VALUE)
.hasResourcePath(ATTRIBUTE_RESOURCE_PATH)
Expand All @@ -208,7 +202,6 @@ public void attributesCreatedReturnsExpected() {

assertThat(attributesCreated)
.hasType(AttributesCreated.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(ATTRIBUTES.toJson(SCHEMA_VERSION, FieldType.regularOrSpecial()))
.hasResourcePath(ATTRIBUTES_POINTER)
Expand All @@ -224,7 +217,6 @@ public void attributesDeletedReturnsExpected() {

assertThat(attributesDeleted)
.hasType(AttributesDeleted.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasNoEntity()
.hasResourcePath(ATTRIBUTES_POINTER)
Expand All @@ -240,7 +232,6 @@ public void attributesModifiedReturnsExpected() {

assertThat(attributesModified)
.hasType(AttributesModified.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(ATTRIBUTES.toJson(SCHEMA_VERSION, FieldType.regularOrSpecial()))
.hasResourcePath(ATTRIBUTES_POINTER)
Expand All @@ -256,7 +247,6 @@ public void featureCreatedReturnsExpected() {

assertThat(featureCreated)
.hasType(FeatureCreated.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(FLUX_CAPACITOR.toJson(SCHEMA_VERSION, FieldType.notHidden()))
.hasResourcePath(FEATURES_POINTER.append(JsonPointer.of(FLUX_CAPACITOR_ID)))
Expand All @@ -272,7 +262,6 @@ public void featureDeletedReturnsExpected() {

assertThat(featureDeleted)
.hasType(FeatureDeleted.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasNoEntity()
.hasResourcePath(FEATURES_POINTER.append(JsonPointer.of(FLUX_CAPACITOR_ID)))
Expand All @@ -288,7 +277,6 @@ public void featureModifiedReturnsExpected() {

assertThat(featureModified)
.hasType(FeatureModified.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(FLUX_CAPACITOR.toJson(SCHEMA_VERSION, FieldType.notHidden()))
.hasResourcePath(FEATURES_POINTER.append(JsonPointer.of(FLUX_CAPACITOR_ID)))
Expand All @@ -304,7 +292,6 @@ public void featuresCreatedReturnsExpected() {

assertThat(featuresCreated)
.hasType(FeaturesCreated.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(FEATURES.toJson(SCHEMA_VERSION, FieldType.notHidden()))
.hasResourcePath(FEATURES_POINTER)
Expand All @@ -320,7 +307,6 @@ public void featuresDeletedReturnsExpected() {

assertThat(featuresDeleted)
.hasType(FeaturesDeleted.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasNoEntity()
.hasResourcePath(FEATURES_POINTER)
Expand All @@ -336,7 +322,6 @@ public void featuresModifiedReturnsExpected() {

assertThat(featuresModified)
.hasType(FeaturesModified.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(FEATURES.toJson(SCHEMA_VERSION, FieldType.notHidden()))
.hasResourcePath(FEATURES_POINTER)
Expand All @@ -353,7 +338,6 @@ public void featurePropertiesCreatedReturnsExpected() {

assertThat(featurePropertiesCreated)
.hasType(FeaturePropertiesCreated.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(FLUX_CAPACITOR_PROPERTIES.toJson(SCHEMA_VERSION, FieldType.regularOrSpecial()))
.hasResourcePath(JsonFactory.newPointer(FEATURES_POINTER + "/" + FLUX_CAPACITOR_ID + "/properties"))
Expand All @@ -370,7 +354,6 @@ public void featurePropertiesDeletedReturnsExpected() {

assertThat(featurePropertiesDeleted)
.hasType(FeaturePropertiesDeleted.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasNoEntity()
.hasResourcePath(JsonFactory.newPointer(FEATURES_POINTER + "/" + FLUX_CAPACITOR_ID + "/properties"))
Expand All @@ -387,7 +370,6 @@ public void featurePropertiesModifiedReturnsExpected() {

assertThat(featurePropertiesModified)
.hasType(FeaturePropertiesModified.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(FLUX_CAPACITOR_PROPERTIES.toJson(SCHEMA_VERSION, FieldType.regularOrSpecial()))
.hasResourcePath(JsonFactory.newPointer(FEATURES_POINTER + "/" + FLUX_CAPACITOR_ID + "/properties"))
Expand All @@ -404,7 +386,6 @@ public void featurePropertyCreatedReturnsExpected() {

assertThat(featurePropertyCreated)
.hasType(FeaturePropertyCreated.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(PROPERTY_VALUE)
.hasResourcePath(JsonFactory.newPointer(FEATURES_POINTER + "/" + FLUX_CAPACITOR_ID + "/properties"
Expand All @@ -422,7 +403,6 @@ public void featurePropertyDeletedReturnsExpected() {

assertThat(featurePropertyDeleted)
.hasType(FeaturePropertyDeleted.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasNoEntity()
.hasResourcePath(JsonFactory.newPointer(FEATURES_POINTER + "/" + FLUX_CAPACITOR_ID + "/properties"
Expand All @@ -440,7 +420,6 @@ public void featurePropertyModifiedReturnsExpected() {

assertThat(featurePropertyModified)
.hasType(FeaturePropertyModified.TYPE)
.hasId(THING_ID)
.hasThingId(THING_ID)
.hasEntity(PROPERTY_VALUE)
.hasResourcePath(JsonFactory.newPointer(FEATURES_POINTER + "/" + FLUX_CAPACITOR_ID + "/properties"
Expand Down

0 comments on commit e8c6ac1

Please sign in to comment.