Skip to content

Commit

Permalink
Issue #757: extend message sending to handle acknowledgements.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 15, 2020
1 parent dea26cb commit 300b524
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 178 deletions.
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.client.ack;

import java.util.function.BiConsumer;

/**
* Interface encapsulating a {@link java.util.function.BiConsumer} which is notified about responses with either the
* response of type {@link R} (if it was successful) or with an {@link Throwable} if there occurred an error.
* Does also hold the type of the expected Message response.
*
* @param <R> the type of the expected response.
*/
public interface ResponseConsumer<R> {

/**
* Returns the type of the expected response.
*
* @return the type of the expected response.
*/
Class<R> getResponseType();

/**
* The BiConsumer which is notified about responses with either
* the response of type {@link R} (if it was successful) or with an {@link Throwable} if there occurred an error.
*
* @return the BiConsumer notified about responses.
*/
BiConsumer<R, Throwable> getResponseConsumer();

/**
* Type-check the argument against the response type and call the response consumer with the right type or
* with an exception.
*
* @param argument the argument to consume.
*/
default void accept(final Object argument) {
if (getResponseType().isInstance(argument)) {
getResponseConsumer().accept(getResponseType().cast(argument), null);
} else if (argument != null) {
getResponseConsumer().accept(null, new ClassCastException(
"Expected: " + getResponseType().getCanonicalName() +
"; Actual: " + argument.getClass().getCanonicalName() +
" (" + argument + ")"
));
} else {
getResponseConsumer().accept(null, new NullPointerException(
"Expected: " + getResponseType().getCanonicalName() + "; Actual: null")
);
}
}
}
Expand Up @@ -235,6 +235,7 @@ public DeleteThing deleteThing(final ThingId thingId, final Option<?>... options

/**
* Build a command for creating a Policy.
*
* @param policy the policy to create.
* @param options options to be applied configuring behaviour of this method.
* @return The {@link CreatePolicy} command.
Expand Down Expand Up @@ -285,6 +286,7 @@ public ModifyPolicy updatePolicy(final Policy policy, final Option<?>... options

/**
* Builds a command to retrieve the policy with ID {@code policyId}.
*
* @param policyId the policy to retrieve.
* @return the {@link RetrievePolicy} command.
* @throws NullPointerException if the policyId is {@code null}.
Expand All @@ -296,6 +298,7 @@ public RetrievePolicy retrievePolicy(final PolicyId policyId) {

/**
* Builds a command to delete the policy with ID {@code policyId}.
*
* @param policyId the policy to delete.
* @param options options to be applied configuring behaviour of this method.
* @return the {@link DeletePolicy} command.
Expand All @@ -314,7 +317,8 @@ public ModifyAttribute setAttribute(final ThingId thingId,
return ModifyAttribute.of(thingId, path, value, buildDittoHeaders(true, options));
}

public ModifyAttributes setAttributes(final ThingId thingId, final JsonObject attributes, final Option<?>... options) {
public ModifyAttributes setAttributes(final ThingId thingId, final JsonObject attributes,
final Option<?>... options) {
return ModifyAttributes.of(thingId, ThingsModelFactory.newAttributes(attributes),
buildDittoHeaders(true, options));
}
Expand Down Expand Up @@ -493,7 +497,7 @@ public <T> Message<T> sendMessage(final MessageSerializerRegistry registry, fina
return builder;
}).orElseGet(() -> MessagesModelFactory.newMessageBuilder(messageHeaders));

message.getGenericResponseConsumer().ifPresent(messageBuilder::responseConsumer);
message.getResponseConsumer().ifPresent(messageBuilder::responseConsumer);
return messageBuilder.build();
}

Expand Down
Expand Up @@ -12,13 +12,11 @@
*/
package org.eclipse.ditto.client.live.internal;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.function.BiConsumer;

import javax.annotation.Nullable;

import org.eclipse.ditto.client.ack.ResponseConsumer;
import org.eclipse.ditto.client.internal.OutgoingMessageFactory;
import org.eclipse.ditto.client.internal.bus.Classification;
import org.eclipse.ditto.client.live.messages.MessageSender;
Expand All @@ -29,18 +27,15 @@
import org.eclipse.ditto.client.live.messages.internal.ImmutableMessageSender;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.model.messages.Message;
import org.eclipse.ditto.model.messages.ResponseConsumer;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.protocoladapter.ProtocolAdapter;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.slf4j.Logger;

final class PendingMessageImpl<T> implements PendingMessage<T> {

private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1L);

private final Logger logger;
private final OutgoingMessageFactory outgoingMessageFactory;
private final MessageSerializerRegistry messageSerializerRegistry;
Expand Down Expand Up @@ -101,79 +96,14 @@ private PendingMessageImpl<T> getSelf() {
return this;
}

@SuppressWarnings("unchecked")
private static void typeCheckAndConsume(final ResponseConsumer<?, ?> responseConsumer,
final Signal<?> response) {

final BiConsumer uncheckedResponseConsumer = responseConsumer.getResponseConsumer();
final Class<?> responseType = responseConsumer.getResponseType();

private static void typeCheckAndConsume(final ResponseConsumer<?> responseConsumer, final Signal<?> response) {
try {
// throw ClassCastException if response has incorrect type
final Object responseMessage;
if (response instanceof MessageCommand) {
responseMessage = ((MessageCommand) response).getMessage();
} else if (response instanceof MessageCommandResponse) {
responseMessage = ((MessageCommandResponse) response).getMessage();
} else if (response instanceof Acknowledgements) {
responseMessage = response;
} else if (response instanceof ErrorResponse) {
uncheckedResponseConsumer.accept(null, ((ErrorResponse<?>) response).getDittoRuntimeException());
return;
} else {
uncheckedResponseConsumer.accept(null, classCastException(responseType, response));
return;
}

if (responseMessage instanceof Message) {
consumeMessageResponse((Message<?>) responseMessage, responseConsumer);
} else {
if (responseConsumer.getResponseType().isInstance(responseMessage)) {
uncheckedResponseConsumer.accept(responseMessage, null);
} else {
// response has unexpected type
uncheckedResponseConsumer.accept(null, classCastException(responseType, responseMessage));
}
}
} catch (final RuntimeException e) {
uncheckedResponseConsumer.accept(null, e);
}

}

@SuppressWarnings("unchecked")
private static void consumeMessageResponse(final Message<?> responseMessage,
final ResponseConsumer<?, ?> responseConsumer) {
final BiConsumer uncheckedResponseConsumer = responseConsumer.getResponseConsumer();
final Class<?> responseType = responseConsumer.getResponseType();
if (responseType.isAssignableFrom(ByteBuffer.class)) {
uncheckedResponseConsumer.accept(asByteBufferMessage(responseMessage), null);
} else {
final Optional<?> payloadOptional = responseMessage.getPayload();
if (payloadOptional.isPresent()) {
final Object payload = payloadOptional.get();
if (responseType.isInstance(payload)) {
uncheckedResponseConsumer.accept(setMessagePayload(responseMessage, payload), null);
} else {
// response has unexpected type
uncheckedResponseConsumer.accept(setMessagePayload(responseMessage, null),
classCastException(responseType, payload));
}
} else {
// response has no payload; regard it as any message type
uncheckedResponseConsumer.accept(responseMessage, null);
}
responseConsumer.accept(response);
} catch (final Throwable e) {
responseConsumer.getResponseConsumer().accept(null, e);
}
}

private static ClassCastException classCastException(final Class<?> expectedClass, final Object actual) {
return new ClassCastException(
"Expected: " + expectedClass.getCanonicalName() +
"; Actual: " + actual.getClass().getCanonicalName() +
" (" + actual + ")"
);
}

@Override
public MessageSender.SetFeatureIdOrSubject<T> from(final ThingId thingId) {
return ImmutableMessageSender.<T>newInstance()
Expand All @@ -188,40 +118,28 @@ public MessageSender.SetFeatureIdOrSubject<T> to(final ThingId thingId) {
.thingId(thingId);
}

private void sendMessage(final Message<T> message) {
private void sendMessage(final Message<T> message, @Nullable final ResponseConsumer<?> responseConsumer) {
final Message<?> toBeSentMessage =
outgoingMessageFactory.sendMessage(messageSerializerRegistry, message);
final String correlationId = toBeSentMessage.getCorrelationId().orElse(null);
logger.trace("Message about to send: {}", toBeSentMessage);
message.getGenericResponseConsumer().ifPresent(consumer ->
messagingProvider.getAdaptableBus().subscribeOnceForAdaptable(
Classification.forCorrelationId(correlationId),
Duration.ofSeconds(60)
).handle((responseAdaptable, error) -> {
typeCheckAndConsume(consumer, protocolAdapter.fromAdaptable(responseAdaptable));
return null;
})
);
if (responseConsumer != null) {
toBeSentMessage.getCorrelationId().ifPresent(correlationId ->
messagingProvider.getAdaptableBus().subscribeOnceForAdaptable(
Classification.forCorrelationId(correlationId),
getCallbackTTL(message)
).handle((responseAdaptable, error) -> {
typeCheckAndConsume(responseConsumer, protocolAdapter.fromAdaptable(responseAdaptable));
return null;
})
);
}
messagingProvider.emitAdaptable(
LiveMessagesUtil.constructAdaptableFromMessage(toBeSentMessage, protocolAdapter));
}

private static Message<ByteBuffer> asByteBufferMessage(final Message<?> message) {
final ByteBuffer byteBufferPayload = message.getRawPayload()
.orElseGet(() -> message.getPayload()
.map(object -> ByteBuffer.wrap(object.toString().getBytes()))
.orElse(ByteBuffer.allocate(0))
);
return setMessagePayload(message, byteBufferPayload);
}

private static <S, T> Message<T> setMessagePayload(final Message<S> message, @Nullable final T payload) {
return Message.<T>newBuilder(message.getHeaders())
.payload(payload)
.rawPayload(message.getRawPayload().orElse(null))
.extra(message.getExtra().orElse(null))
.responseConsumer(message.getResponseConsumer().orElse(null))
.build();
private static Duration getCallbackTTL(final Message<?> message) {
// set handler timeout to some time after actual timeout to account for delay and latency in both directions.
return message.getTimeout().orElse(DEFAULT_TIMEOUT).plus(Duration.ofSeconds(10L));
}

}
Expand Up @@ -18,6 +18,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.eclipse.ditto.client.ack.ResponseConsumer;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.messages.Message;
Expand All @@ -33,14 +34,36 @@
*/
public interface MessageSender<T> {

/**
* Sets the message as being sent <em>FROM</em> a {@code Thing} (or a Thing's {@code Feature}).
*
* @param sendConsumer a Consumer which will at the end of the builder when invoking {@code send()} be called with
* the built Message and any callback for the message response.
* @return fluent api builder that provides the functionality to set the id of the Thing from which the Message will
* be sent.
*/
SetThingId<T> from(BiConsumer<Message<T>, ResponseConsumer<?>> sendConsumer);

/**
* Sets the message as being sent <em>TO</em> a {@code Thing} (or a Thing's {@code Feature}).
*
* @param sendConsumer a Consumer which will at the end of the builder when invoking {@code send()} be called with
* the built Message and any callback of the response.
* @return fluent api builder that provides the functionality to set the id of the Thing to which the Message will
* be sent.
*/
SetThingId<T> to(BiConsumer<Message<T>, ResponseConsumer<?>> sendConsumer);

/**
* Sets the message as being sent <em>FROM</em> a {@code Thing} (or a Thing's {@code Feature}).
*
* @param sendConsumer a Consumer which will at the end of the builder when invoking {@code send()} be called with
* the built Message.
* @return fluent api builder that provides the functionality to set the id of the Thing from which the Message will
* be sent.
* @deprecated since 1.2.0 - use {@code from(BiConsumer<Message<T>, ResponseConsumer<?>)} instead.
*/
@Deprecated
SetThingId<T> from(Consumer<Message<T>> sendConsumer);

/**
Expand All @@ -50,7 +73,9 @@ public interface MessageSender<T> {
* the built Message.
* @return fluent api builder that provides the functionality to set the id of the Thing to which the Message will
* be sent.
* @deprecated since 1.2.0 - use {@code to(BiConsumer<Message<T>, ResponseConsumer<?>)} instead.
*/
@Deprecated
SetThingId<T> to(Consumer<Message<T>> sendConsumer);

/**
Expand Down
Expand Up @@ -33,7 +33,7 @@
import org.eclipse.ditto.model.messages.MessageDirection;
import org.eclipse.ditto.model.messages.MessageHeaders;
import org.eclipse.ditto.model.messages.MessageResponseConsumer;
import org.eclipse.ditto.model.messages.ResponseConsumer;
import org.eclipse.ditto.client.ack.ResponseConsumer;
import org.eclipse.ditto.model.things.ThingId;

/**
Expand Down Expand Up @@ -167,11 +167,6 @@ public Optional<MessageResponseConsumer<?>> getResponseConsumer() {
return delegateMessage.getResponseConsumer();
}

@Override
public Optional<ResponseConsumer<?, ?>> getGenericResponseConsumer() {
return delegateMessage.getGenericResponseConsumer();
}

@Override
public Optional<Duration> getTimeout() {
return delegateMessage.getTimeout();
Expand Down

0 comments on commit 300b524

Please sign in to comment.