Skip to content

Commit

Permalink
Review:
Browse files Browse the repository at this point in the history
* Adapt to review changes in ditto

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Apr 12, 2021
1 parent 2e135a6 commit a0a9707
Show file tree
Hide file tree
Showing 150 changed files with 413 additions and 414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.entity.type.WithEntityType;
import org.eclipse.ditto.model.base.headers.DittoHeadersSettable;
import org.eclipse.ditto.signals.base.WithEntityId;
import org.eclipse.ditto.model.base.entity.id.WithEntityId;

/**
* Common interface for all Thing related changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public String getManifest() {
}

@Override
public ThingId getThingEntityId() {
public ThingId getEntityId() {
return (ThingId) ack.getEntityId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.model.things.ThingsModelFactory;
import org.eclipse.ditto.model.things.WithThingId;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.model.base.entity.id.WithEntityId;
import org.eclipse.ditto.signals.base.WithOptionalEntity;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
Expand Down Expand Up @@ -647,21 +647,18 @@ public void registerForThingChanges(final String registrationId, final Consumer<
* @param protocolCommandAck the expected acknowledgement.
* @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement
* or not.
* @param adaptableToMessage function to convert an adaptable into a message.
* @return the subscription ID.
*/
protected AdaptableBus.SubscriptionId subscribe(
@Nullable final AdaptableBus.SubscriptionId previousSubscriptionId,
final Classification.StreamingType streamingType,
final String protocolCommand,
final String protocolCommandAck,
final CompletableFuture<Void> futureToCompleteOrFailAfterAck,
final Function<Adaptable, Optional<Message<?>>> adaptableToMessage) {
final CompletableFuture<Void> futureToCompleteOrFailAfterAck) {

return subscribeAndPublishMessage(previousSubscriptionId, streamingType, protocolCommand, protocolCommandAck,
futureToCompleteOrFailAfterAck, adaptable -> bus -> {
adaptableToMessage.apply(adaptable)
.ifPresent(message -> bus.notify(message.getSubject(), message));
asThingMessage(adaptable).ifPresent(message -> bus.notify(message.getSubject(), message));
});
}

Expand Down Expand Up @@ -753,23 +750,25 @@ protected void unsubscribe(@Nullable final AdaptableBus.SubscriptionId subscript
* @param adaptable from which the things {@link Message} shall be build from.
* @return empty if the adaptable doesn't provide a thingId, or the build {@link Message}.
*/
protected static Optional<Message<?>> asThingMessage(final Adaptable adaptable) {
private static Optional<Message<?>> asThingMessage(final Adaptable adaptable) {
final Signal<?> signal = PROTOCOL_ADAPTER.fromAdaptable(adaptable);
if (signal instanceof WithThingId) {
final ThingId thingId = ((WithThingId) signal).getThingEntityId();
final Optional<ThingId> thingIdOptional = WithEntityId.getEntityIdOfType(ThingId.class, signal);
final Message<?> message;
if (thingIdOptional.isPresent()) {
final ThingId thingId = thingIdOptional.get();
final MessageHeaders messageHeaders = MessageHeaders
.newBuilder(MessageDirection.FROM, thingId, signal.getType())
.correlationId(signal.getDittoHeaders().getCorrelationId().orElse(null))
.build();
final Message<Object> message = Message.newBuilder(messageHeaders)
message = Message.newBuilder(messageHeaders)
.payload(signal)
.extra(adaptable.getPayload().getExtra().orElse(null))
.build();
return Optional.of(message);
} else {
LOGGER.warn("Cannot build ThingMessage out of Signal without an ThingId: <{}>", signal);
return Optional.empty();
message = null;
}
return Optional.ofNullable(message);
}

private static void adjoin(final CompletionStage<?> stage, final CompletableFuture<Void> future) {
Expand All @@ -794,7 +793,8 @@ private static void assertThatThingHasId(final Thing thing) {
}

private CompletionStage<List<Thing>> sendRetrieveThingsMessage(final RetrieveThings command) {
return sendSignalAndExpectResponse(command, RetrieveThingsResponse.class, RetrieveThingsResponse::getThings, ErrorResponse.class,
return sendSignalAndExpectResponse(command, RetrieveThingsResponse.class, RetrieveThingsResponse::getThings,
ErrorResponse.class,
ErrorResponse::getDittoRuntimeException);
}

Expand Down

0 comments on commit a0a9707

Please sign in to comment.