Skip to content

Commit

Permalink
Issue #ditto/757: un-deprecate 2 methods in MessageSender - they are …
Browse files Browse the repository at this point in the history
…actually used to send responses, which have no response of their own anyway.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 20, 2020
1 parent 1cb260b commit 83ac86e
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 90 deletions.
Expand Up @@ -42,7 +42,7 @@
public final class HandlerRegistry<T extends ThingHandle<F>, F extends FeatureHandle> {

private final PointerBus bus;
private final ConcurrentHashMap<String, Registration<Consumer<PointerWithData>>> registry;
private final Map<String, Registration<Consumer<PointerWithData>>> registry;
private final Map<ThingId, T> thingHandles;
private final Map<String, F> featureHandles;

Expand Down
Expand Up @@ -133,7 +133,7 @@ private static <U> Consumer<Message<U>> responsePublisher(

private static Consumer<Acknowledgement> acknowledgementPublisher(final ProtocolAdapter protocolAdapter,
final MessagingProvider messagingProvider) {
return ack -> messagingProvider.emitAdaptable(protocolAdapter.toAdaptable(ack));
return ack -> messagingProvider.emitAdaptable(protocolAdapter.toAdaptable((Signal<?>) ack));
}

private static <T> Message<T> eventToMessage(final PointerWithData e, final Class<T> type, final boolean
Expand Down
Expand Up @@ -56,26 +56,24 @@ public interface MessageSender<T> {

/**
* Sets the message as being sent <em>FROM</em> a {@code Thing} (or a Thing's {@code Feature}).
* No response is expected for the message.
*
* @param sendConsumer a Consumer which will at the end of the builder when invoking {@code send()} be called with
* the built Message.
* @return fluent api builder that provides the functionality to set the id of the Thing from which the Message will
* be sent.
* @deprecated since 1.2.0 - use {@code from(BiConsumer<Message<T>, ResponseConsumer<?>)} instead.
*/
@Deprecated
SetThingId<T> from(Consumer<Message<T>> sendConsumer);

/**
* Sets the message as being sent <em>TO</em> a {@code Thing} (or a Thing's {@code Feature}).
* No response is expected for the message.
*
* @param sendConsumer a Consumer which will at the end of the builder when invoking {@code send()} be called with
* the built Message.
* @return fluent api builder that provides the functionality to set the id of the Thing to which the Message will
* be sent.
* @deprecated since 1.2.0 - use {@code to(BiConsumer<Message<T>, ResponseConsumer<?>)} instead.
*/
@Deprecated
SetThingId<T> to(Consumer<Message<T>> sendConsumer);

/**
Expand Down
Expand Up @@ -21,7 +21,6 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.client.ack.ResponseConsumer;
Expand All @@ -33,15 +32,10 @@
import org.eclipse.ditto.model.messages.MessageDirection;
import org.eclipse.ditto.model.messages.MessageHeaders;
import org.eclipse.ditto.model.messages.MessageHeadersBuilder;
import org.eclipse.ditto.model.messages.MessageResponseConsumer;
import org.eclipse.ditto.model.messages.MessagesModelFactory;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.commands.messages.SendClaimMessageResponse;
import org.eclipse.ditto.signals.commands.messages.SendFeatureMessageResponse;
import org.eclipse.ditto.signals.commands.messages.SendMessageAcceptedResponse;
import org.eclipse.ditto.signals.commands.messages.SendThingMessageResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -108,15 +102,14 @@ public SetThingId<T> to(final BiConsumer<Message<T>, ResponseConsumer<?>> sendCo
return new SetThingIdImpl();
}

// TODO: test backward compatibility
@Override
public SetThingId<T> from(final Consumer<Message<T>> sendConsumer) {
return from(adjustSendConsumerForBackwardCompatibility(sendConsumer));
return from(ignoreResponse(sendConsumer));
}

@Override
public SetThingId<T> to(final Consumer<Message<T>> sendConsumer) {
return to(adjustSendConsumerForBackwardCompatibility(sendConsumer));
return to(ignoreResponse(sendConsumer));
}

private void buildAndSendMessage(final T payload) {
Expand Down Expand Up @@ -162,75 +155,10 @@ private void buildAndSendMessage(final T payload, final ResponseConsumer<?> resp
sendConsumer.accept(messageBuilder.build(), responseConsumer);
}

/* BEGIN Backward compatibility block */
private BiConsumer<Message<T>, ResponseConsumer<?>> adjustSendConsumerForBackwardCompatibility(
final Consumer<Message<T>> sendConsumer) {
return (message, responseConsumer) ->
sendConsumer.accept(combineForBackwardCompatibility(message, responseConsumer));
private BiConsumer<Message<T>, ResponseConsumer<?>> ignoreResponse(final Consumer<Message<T>> sendConsumer) {
return (message, responseConsumer) -> sendConsumer.accept(message);
}

@SuppressWarnings("deprecation")
private Message<T> combineForBackwardCompatibility(final Message<T> message,
final ResponseConsumer<?> responseConsumer) {
return MessagesModelFactory.<T>newMessageBuilder(message.getHeaders())
.payload(message.getPayload().orElse(null))
.rawPayload(message.getRawPayload().orElse(null))
.extra(message.getExtra().orElse(null))
.responseConsumer(convertToMessageResponseConsumerForBackwardCompatibility(responseConsumer))
.build();
}

@SuppressWarnings("deprecation")
private MessageResponseConsumer<?> convertToMessageResponseConsumerForBackwardCompatibility(
final ResponseConsumer<?> responseConsumer) {
return new MessageResponseConsumer<Object>() {
@Nonnull
@Override
public Class<Object> getResponseType() {
return Object.class;
}

@Override
@Nonnull
public BiConsumer<Message<Object>, Throwable> getResponseConsumer() {
return (message, error) -> {
if (message != null) {
acceptWrappedForBackwardCompatibility(message, responseConsumer);
} else {
responseConsumer.getResponseConsumer().accept(null, error);
}
};
}

};
}

@SuppressWarnings("ConstantConditions")
private void acceptWrappedForBackwardCompatibility(final Message<?> message,
final ResponseConsumer<?> responseConsumer) {

final Class<?> clazz = responseConsumer.getResponseType();
if (clazz.isAssignableFrom(SendClaimMessageResponse.class)) {
responseConsumer.accept(SendClaimMessageResponse.of(message.getThingEntityId(), message,
message.getStatusCode().orElse(null), message.getHeaders()));
} else if (clazz.isAssignableFrom(SendFeatureMessageResponse.class)) {
responseConsumer.accept(
SendFeatureMessageResponse.of(message.getThingEntityId(), message.getFeatureId().orElse(null),
message, message.getStatusCode().orElse(null), message.getHeaders()));
} else if (clazz.isAssignableFrom(SendMessageAcceptedResponse.class)) {
responseConsumer.accept(
SendFeatureMessageResponse.of(message.getThingEntityId(), message.getFeatureId().orElse(null),
message, message.getStatusCode().orElse(null), message.getHeaders()));
} else if (clazz.isAssignableFrom(SendThingMessageResponse.class)) {
responseConsumer.accept(SendThingMessageResponse.of(message.getThingEntityId(), message,
message.getStatusCode().orElse(null), message.getHeaders()));
} else {
// throw ClassCastException if response consumer does not expect a message.
responseConsumer.accept(message);
}
}
/* END Backward compatibility block */

private static <T> ResponseConsumer<?> createMessageCommandResponseConsumer(final Class<T> clazz,
final BiConsumer<Message<T>, Throwable> responseMessageHandler) {
return new ResponseConsumerImpl<>(MessageCommandResponse.class, (response, error) -> {
Expand Down
Expand Up @@ -16,6 +16,7 @@
import static org.eclipse.ditto.client.TestConstants.Policy.POLICY_ID;
import static org.eclipse.ditto.client.TestConstants.Thing.THING_ID;
import static org.eclipse.ditto.model.base.acks.AcknowledgementRequest.parseAcknowledgementRequest;
import static org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel.LIVE_RESPONSE;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -225,6 +226,49 @@ public void testFeatureMessageAcknowledgement() {
testMessageAcknowledgement(client.live().forId(THING_ID).forFeature(FEATURE_ID), featureMessage());
}

@Test
public void testFeatureMessageResponseAndAcknowledgement() {
assertEventualCompletion(startConsumption());
final CompletableFuture<Void> messageReplyFuture = new CompletableFuture<>();
client.live().forId(THING_ID).forFeature(FEATURE_ID)
.registerForMessage("Ackermann", "request", String.class, msg ->
msg.handleAcknowledgementRequests(handles -> {
try {
handles.forEach(handle -> {
if (LIVE_RESPONSE.equals(handle.getAcknowledgementLabel())) {
msg.reply().statusCode(HttpStatusCode.OK).payload("response").send();
} else {
handle.acknowledge(HttpStatusCode.forInt(
Integer.parseInt(handle.getAcknowledgementLabel().toString()))
.orElse(HttpStatusCode.EXPECTATION_FAILED));
}
});
messageReplyFuture.complete(null);
} catch (final Throwable error) {
messageReplyFuture.completeExceptionally(error);
}
}));

final Signal<?> featureMessage = (((Signal<?>) featureMessage()).setDittoHeaders(DittoHeaders.newBuilder()
.channel(TopicPath.Channel.LIVE.getName())
.correlationId("correlation-id")
.acknowledgementRequest(
parseAcknowledgementRequest("live-response"),
parseAcknowledgementRequest("100"),
parseAcknowledgementRequest("301"),
parseAcknowledgementRequest("403")
)
.build()
));
reply(featureMessage);
messageReplyFuture.join();

assertThat(expectMsgClass(SendFeatureMessageResponse.class).getStatusCode()).isEqualTo(HttpStatusCode.OK);
assertThat(expectMsgClass(Acknowledgement.class).getStatusCode()).isEqualTo(HttpStatusCode.CONTINUE);
assertThat(expectMsgClass(Acknowledgement.class).getStatusCode()).isEqualTo(HttpStatusCode.MOVED_PERMANENTLY);
assertThat(expectMsgClass(Acknowledgement.class).getStatusCode()).isEqualTo(HttpStatusCode.FORBIDDEN);
}

@Test
public void testThingCommandAcknowledgement() {
startConsumption();
Expand Down Expand Up @@ -257,14 +301,20 @@ public void testThingCommandAcknowledgement() {

private void testMessageAcknowledgement(final MessageRegistration registration, final Signal<?> message) {
assertEventualCompletion(startConsumption());
registration.registerForMessage("Ackermann", "request", String.class, msg ->
msg.handleAcknowledgementRequests(handles ->
handles.forEach(handle -> handle.acknowledge(
HttpStatusCode.forInt(Integer.parseInt(handle.getAcknowledgementLabel().toString()))
.orElse(HttpStatusCode.EXPECTATION_FAILED)
))
)
);
final CompletableFuture<Void> messageHandlingFuture = new CompletableFuture<>();
registration.registerForMessage("Ackermann", "request", String.class, msg -> {
msg.handleAcknowledgementRequests(handles -> {
try {
handles.forEach(handle -> handle.acknowledge(
HttpStatusCode.forInt(Integer.parseInt(handle.getAcknowledgementLabel().toString()))
.orElse(HttpStatusCode.EXPECTATION_FAILED)
));
messageHandlingFuture.complete(null);
} catch (final Throwable error) {
messageHandlingFuture.completeExceptionally(error);
}
});
});

reply(message.setDittoHeaders(DittoHeaders.newBuilder()
.channel(TopicPath.Channel.LIVE.getName())
Expand All @@ -275,6 +325,7 @@ private void testMessageAcknowledgement(final MessageRegistration registration,
)
.build()
));
messageHandlingFuture.join();

assertThat(expectMsgClass(Acknowledgement.class).getStatusCode()).isEqualTo(HttpStatusCode.CONTINUE);
assertThat(expectMsgClass(Acknowledgement.class).getStatusCode()).isEqualTo(HttpStatusCode.MOVED_PERMANENTLY);
Expand Down

0 comments on commit 83ac86e

Please sign in to comment.