Skip to content

Commit

Permalink
Adds a ClientReconnectingException which is thrown if the client is a…
Browse files Browse the repository at this point in the history
…ttempting a reconnect while a message should be sent.

Prior the sender of a message only got clues about dropped messages due to reconnect in the clients logs, but couldn't handle this case in code.

The ClientReconnectingException can be handled by the sender and an optional buffering/ retrying strategy can be implemented.
In methods returning a CompletionStage the stage will complete exceptionally conatining the error, in other methods the exception is thrown and has to be catched for custom handling.

Co-authored-by: Kalin Kostashki <kalin.kostashki@bosch.io>
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk and Kalin Kostashki committed Sep 30, 2022
1 parent 38c77fd commit 1f1e1f6
Show file tree
Hide file tree
Showing 17 changed files with 349 additions and 115 deletions.
4 changes: 3 additions & 1 deletion java/src/main/java/org/eclipse/ditto/client/DittoClient.java
Expand Up @@ -52,7 +52,9 @@ public interface DittoClient {
* Directly sends a Ditto Protocol {@link Adaptable} message to the established Ditto backend connection.
*
* @param dittoProtocolAdaptable the adaptable to send
* @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable}
* @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} or
* which failed with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client is
* in a reconnecting state.
* @throws IllegalStateException when no twin/live connection was configured for this client
*/
CompletionStage<Adaptable> sendDittoProtocol(Adaptable dittoProtocolAdaptable);
Expand Down
Expand Up @@ -18,6 +18,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -40,6 +41,7 @@
import org.eclipse.ditto.client.ack.internal.AcknowledgementRequestsValidator;
import org.eclipse.ditto.client.internal.bus.Classification;
import org.eclipse.ditto.client.management.AcknowledgementsFailedException;
import org.eclipse.ditto.client.management.ClientReconnectingException;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
Expand Down Expand Up @@ -138,6 +140,8 @@ protected Signal signalFromAdaptable(final Adaptable adaptable) {
* @param <R> type of the result.
* @return future of the result if the expected response arrives or a failed future on error.
* Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention.
* If the client is reconnecting while this method is called the future fails with a a
* {@link ClientReconnectingException}.
*/
protected <T extends PolicyCommand<T>, S extends PolicyCommandResponse<?>, R> CompletionStage<R> askPolicyCommand(
final T command,
Expand All @@ -159,11 +163,14 @@ protected <T extends PolicyCommand<T>, S extends PolicyCommandResponse<?>, R> Co
* @param <R> type of the result.
* @return future of the result if the expected response arrives or a failed future on error.
* Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention.
* If the client is reconnecting while this method is called the future fails with a a
* {@link ClientReconnectingException}.
*/
protected <T extends ThingCommand<T>, S extends CommandResponse<?>, R> CompletionStage<R> askThingCommand(
final T command,
final Class<S> expectedResponse,
final Function<S, R> onSuccess) {

final ThingCommand<?> commandWithChannel = validateAckRequests(setChannel(command, channel));
return sendSignalAndExpectResponse(commandWithChannel, expectedResponse, onSuccess, ErrorResponse.class,
ErrorResponse::getDittoRuntimeException);
Expand All @@ -180,33 +187,42 @@ protected <T extends ThingCommand<T>, S extends CommandResponse<?>, R> Completio
* @param <S> type of the expected success response.
* @param <E> type of the expected error response.
* @param <R> type of the result.
* @return future of the result.
* @return future of the result. The future can be exceptional with a {@link ClientReconnectingException} if the
* client is reconnecting while this method is called.
*/
protected <S, E, R> CompletionStage<R> sendSignalAndExpectResponse(final Signal<?> signal,
final Class<S> expectedResponseClass,
final Function<S, R> onSuccess,
final Class<E> expectedErrorResponseClass,
final Function<E, ? extends RuntimeException> onError) {

final CompletionStage<Adaptable> responseFuture = messagingProvider.getAdaptableBus()
.subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout());

messagingProvider.emit(signalToJsonString(signal));
return responseFuture.thenApply(responseAdaptable -> {
final Signal<?> response = signalFromAdaptable(responseAdaptable);
if (expectedErrorResponseClass.isInstance(response)) {
// extracted runtime exception will be wrapped in CompletionException.
throw onError.apply(expectedErrorResponseClass.cast(response));
} else if (response instanceof Acknowledgements) {
final CommandResponse<?> commandResponse =
extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response);
return onSuccess.apply(expectedResponseClass.cast(commandResponse));
} else if (expectedResponseClass.isInstance(response)) {
return onSuccess.apply(expectedResponseClass.cast(response));
} else {
throw new ClassCastException("Expect " + expectedResponseClass.getSimpleName() + ", got: " + response);
}
});
try {
final CompletionStage<Adaptable> responseFuture = messagingProvider.getAdaptableBus()
.subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout());

messagingProvider.emit(signalToJsonString(signal));
return responseFuture.thenApply(responseAdaptable -> {
final Signal<?> response = signalFromAdaptable(responseAdaptable);
if (expectedErrorResponseClass.isInstance(response)) {
// extracted runtime exception will be wrapped in CompletionException.
throw onError.apply(expectedErrorResponseClass.cast(response));
} else if (response instanceof Acknowledgements) {
final CommandResponse<?> commandResponse =
extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response);
return onSuccess.apply(expectedResponseClass.cast(commandResponse));
} else if (expectedResponseClass.isInstance(response)) {
return onSuccess.apply(expectedResponseClass.cast(response));
} else {
throw new ClassCastException(
"Expect " + expectedResponseClass.getSimpleName() + ", got: " + response);
}
});
} catch (final ClientReconnectingException cre) {
return CompletableFuture.supplyAsync(() -> {
throw cre;
});
}

}

/**
Expand Down
Expand Up @@ -163,7 +163,8 @@ public CompletionStage<Void> startConsumption(final Option<?>... consumptionOpti
* Starts the consumption of twin events / messages / live events and commands.
*
* @param consumptionConfig the configuration Map to apply for the consumption.
* @return a CompletionStage that terminates when the start operation was successful.
* @return a CompletionStage that terminates when the start operation was successful or fails if the client is in
* a reconnecting state
*/
protected abstract CompletionStage<Void> doStartConsumption(Map<String, String> consumptionConfig);

Expand Down Expand Up @@ -648,6 +649,7 @@ public void registerForThingChanges(final String registrationId, final Consumer<
* @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement
* or not.
* @return the subscription ID.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting.
*/
protected AdaptableBus.SubscriptionId subscribe(
@Nullable final AdaptableBus.SubscriptionId previousSubscriptionId,
Expand Down Expand Up @@ -730,6 +732,7 @@ private static String appendCorrelationIdParameter(final String protocolCommand,
* @param protocolCommandAck the expected acknowledgement.
* @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement
* or not.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting.
*/
protected void unsubscribe(@Nullable final AdaptableBus.SubscriptionId subscriptionId,
final String protocolCommand,
Expand Down
Expand Up @@ -234,7 +234,6 @@ private static PoliciesImpl configurePolicyClient(final MessagingProvider messag
final String busName = TopicPath.Channel.NONE.getName();
final PointerBus bus = BusFactory.createPointerBus(busName, messagingProvider.getExecutorService());
init(bus, messagingProvider);
final MessagingConfiguration messagingConfiguration = messagingProvider.getMessagingConfiguration();
final OutgoingMessageFactory messageFactory = getOutgoingMessageFactoryForPolicies(messagingProvider);
return PoliciesImpl.newInstance(messagingProvider, messageFactory, bus);
}
Expand Down
Expand Up @@ -41,6 +41,7 @@ public interface LiveCommandProcessor {
* Publish a signal.
*
* @param signal the signal to publish.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting state.
*/
void publishLiveSignal(Signal<?> signal);

Expand Down
Expand Up @@ -30,6 +30,7 @@ public interface EventEmitter<F extends EventFactory> {
*
* @param eventFunction Function providing a EventFactory and requiring a Event as result.
* @throws NullPointerException if {@code eventFunction} is {@code null}.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting.
*/
void emitEvent(Function<F, Event<?>> eventFunction);

Expand Down
Expand Up @@ -50,6 +50,7 @@
import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry;
import org.eclipse.ditto.client.live.messages.PendingMessage;
import org.eclipse.ditto.client.live.messages.RepliableMessage;
import org.eclipse.ditto.client.management.ClientReconnectingException;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.json.JsonKey;
import org.eclipse.ditto.messages.model.KnownMessageSubjects;
Expand Down Expand Up @@ -158,44 +159,58 @@ protected CompletionStage<Void> doStartConsumption(final Map<String, String> con
CompletableFuture.allOf(completableFutureEvents, completableFutureMessages,
completableFutureLiveCommands);

// register message handler which handles live events:
subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT, (streamingType, previousSubscriptionId) -> {
final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
return subscribe(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureEvents
);
});

// register message handler which handles incoming messages:
subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE, (streamingType, previousSubscriptionId) -> {
final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
return subscribeAndPublishMessage(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureMessages,
adaptable -> bus -> bus.notify(getPointerBusKey(adaptable), adaptableAsLiveMessage(adaptable)));
});

// register message handler which handles live commands:
subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND, (streamingType, previousSubscriptionId) -> {
final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);

return subscribeAndPublishMessage(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureLiveCommands,
adaptable -> bus -> bus.getExecutor().submit(() -> handleLiveCommandOrResponse(adaptable))
);
});
return completableFutureCombined;
try {
// register message handler which handles live events:
subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT,
(streamingType, previousSubscriptionId) -> {
final String subscriptionMessage =
buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
return subscribe(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureEvents
);
});

// register message handler which handles incoming messages:
subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE,
(streamingType, previousSubscriptionId) -> {
final String subscriptionMessage =
buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
return subscribeAndPublishMessage(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureMessages,
adaptable -> bus -> bus.notify(getPointerBusKey(adaptable),
adaptableAsLiveMessage(adaptable)));
});

// register message handler which handles live commands:
subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND,
(streamingType, previousSubscriptionId) -> {
final String subscriptionMessage =
buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);

return subscribeAndPublishMessage(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureLiveCommands,
adaptable -> bus -> bus.getExecutor()
.submit(() -> handleLiveCommandOrResponse(adaptable))
);
});
return completableFutureCombined;
} catch (final ClientReconnectingException cre) {
return CompletableFuture.supplyAsync(() -> {
throw cre;
});
}
}

/*
Expand Down
Expand Up @@ -228,6 +228,8 @@ interface MessageSendable<T> {
* by its potential targets. </p>
*
* @throws IllegalStateException if the {@code Message} to be sent is in an invalid state.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting
* state.
*/
void send();

Expand All @@ -238,6 +240,8 @@ interface MessageSendable<T> {
* @param responseConsumer the Consumer which should be notified with the response ot the Throwable in case of
* an error.
* @throws IllegalStateException if the {@code Message} to be sent is in an invalid state.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting
* state.
*/
default void send(final BiConsumer<Message<ByteBuffer>, Throwable> responseConsumer) {
send(ByteBuffer.class, responseConsumer);
Expand All @@ -253,6 +257,8 @@ default void send(final BiConsumer<Message<ByteBuffer>, Throwable> responseConsu
* an error.
* @param <R> the type of the response message's payload.
* @throws IllegalStateException if the {@code Message} to be sent is in an invalid state.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting
* state.
* @since 1.0.0
*/
<R> void send(Class<R> responseType, BiConsumer<Message<R>, Throwable> responseConsumer);
Expand Down
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.client.management;

import javax.annotation.concurrent.Immutable;

/**
* This exception is thrown in the Ditto client if the client is in a reconnecting state and thus can't send messages
* to the backend.
*
* @since 3.0.0
*/
@Immutable
public class ClientReconnectingException extends RuntimeException {

private static final long serialVersionUID = -4578923424099138760L;

private static final String MESSAGE = "Message could not be sent, because the client is currently " +
"reconnecting.";

private ClientReconnectingException() {
super(MESSAGE);
}

public static ClientReconnectingException newInstance() {
return new ClientReconnectingException();
}

}

0 comments on commit 1f1e1f6

Please sign in to comment.