Skip to content

Commit

Permalink
Fixed behaviour of doDisconnectClient.
Browse files Browse the repository at this point in the history
It is crucial that `ClientDisconnected` gets emitted even if the client was disconnected previously and is not available anymore.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Jun 1, 2022
1 parent 2253bd4 commit 87b8ca1
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 27 deletions.
Expand Up @@ -117,7 +117,7 @@ public Duration getReconnectForDeliveryDelay() {
/**
* Returns the delay how long to wait before reconnecting a consumer client for redelivery.
*
* @return the reconnect delay.
* @return the reconnect delay which is at least {@link ReconnectDelay#LOWER_BOUNDARY}.
*/
public ReconnectDelay getReconnectForDeliveryDelayNg() {
return ReconnectDelay.ofOrLowerBoundary(specificConfig.getDuration(RECONNECT_FOR_REDELIVERY_DELAY));
Expand Down
Expand Up @@ -22,7 +22,6 @@

/**
* Delay how long to wait before reconnecting an MQTT client.
* This delay is guaranteed
*/
@Immutable
public final class ReconnectDelay implements Comparable<ReconnectDelay> {
Expand Down
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -160,9 +161,9 @@ private State<BaseClientState, BaseClientData> scheduleConsumerClientReconnect(
return stay();
}

private State<BaseClientState, BaseClientData> reconnectConsumerClient(
final Control control,
private State<BaseClientState, BaseClientData> reconnectConsumerClient(final Control control,
final BaseClientData baseClientData) {

if (null != genericMqttClient) {
enableAutomaticReconnect();
genericMqttClient.disconnectClientRole(ClientRole.CONSUMER);
Expand Down Expand Up @@ -199,7 +200,7 @@ protected CompletionStage<Status.Status> doTestConnection(final TestConnection t
connectionLogger.failure("Connection test failed: {0}", error.getMessage());
return CompletableFuture.completedFuture(new Status.Failure(error));
},
v1 -> v1
statusCompletionStage -> statusCompletionStage
);
}

Expand Down Expand Up @@ -318,17 +319,17 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() {
} else {
final var mqttDisconnectSource = context.getSource();
final var reconnect = isReconnect();
final var reconnectDelayMillis = getReconnectDelayMillis(retryTimeoutStrategy, mqttDisconnectSource);
final var reconnectDelay = getReconnectDelay(retryTimeoutStrategy, mqttDisconnectSource);
logger.info("Client <{}> disconnected by <{}>.", clientId, mqttDisconnectSource);
if (reconnect) {
logger.info("Reconnecting client <{}> with current retries of <{}> and a delay of <{}> ms.",
logger.info("Reconnecting client <{}> with current tries <{}> and a delay of <{}>.",
clientId,
retryTimeoutStrategy.getCurrentTries(),
reconnectDelayMillis);
reconnectDelay);
} else {
logger.info("Not reconnecting client <{}>.", clientId);
}
mqttClientReconnector.delay(reconnectDelayMillis, TimeUnit.MILLISECONDS);
mqttClientReconnector.delay(reconnectDelay.toMillis(), TimeUnit.MILLISECONDS);
mqttClientReconnector.reconnect(reconnect);
}
};
Expand All @@ -348,17 +349,21 @@ private boolean isReconnect() {
return connection.isFailoverEnabled() && automaticReconnect.get();
}

private long getReconnectDelayMillis(final RetryTimeoutStrategy retryTimeoutStrategy,
private Duration getReconnectDelay(final RetryTimeoutStrategy retryTimeoutStrategy,
final MqttDisconnectSource mqttDisconnectSource) {

final long result;
final var nextTimeout = retryTimeoutStrategy.getNextTimeout();
final Duration result;
final var retryTimeoutReconnectDelay = retryTimeoutStrategy.getNextTimeout();
if (MqttDisconnectSource.SERVER == mqttDisconnectSource) {
final var reconnectMinTimeoutForMqttBrokerInitiatedDisconnect =
final var reconnectDelayForBrokerInitiatedDisconnect =
mqttConfig.getReconnectMinTimeoutForMqttBrokerInitiatedDisconnect();
result = Math.max(nextTimeout.toMillis(), reconnectMinTimeoutForMqttBrokerInitiatedDisconnect.toMillis());
if (0 <= retryTimeoutReconnectDelay.compareTo(reconnectDelayForBrokerInitiatedDisconnect)) {
result = retryTimeoutReconnectDelay;
} else {
result = reconnectDelayForBrokerInitiatedDisconnect;
}
} else {
result = nextTimeout.toMillis();
result = retryTimeoutReconnectDelay;
}
return result;
}
Expand All @@ -377,16 +382,18 @@ protected void doDisconnectClient(final Connection connection,
@Nullable final ActorRef origin,
final boolean shutdownAfterDisconnect) {

if (null != genericMqttClient) {
disableAutomaticReconnect();
Patterns.pipe(
genericMqttClient.disconnect()
.thenApply(aVoid -> ClientDisconnected.of(origin, shutdownAfterDisconnect)),
getContextDispatcher()
).to(getSelf(), origin);
final CompletionStage<Void> disconnectFuture;
if (null == genericMqttClient) {
disconnectFuture = CompletableFuture.completedFuture(null);
} else {
logger.warning("Cannot disconnect generic MQTT client as it is not yet initialised.");
disableAutomaticReconnect();
disconnectFuture = genericMqttClient.disconnect();
}

Patterns.pipe(disconnectFuture
.handle((aVoid, throwable) -> ClientDisconnected.of(origin, shutdownAfterDisconnect)),
getContextDispatcher()
).to(getSelf(), origin);
}

@Nullable
Expand Down
Expand Up @@ -87,7 +87,7 @@ private CompletionStage<Void> connectSubscribingClient(final GenericMqttConnect
if (null == throwable) {
logger.debug("Connected subscribing client <{}>.", subscribingClient);
} else {
logger.warn("Failed to connect subscribing client <{}>.", subscribingClient, throwable);
logger.info("Failed to connect subscribing client <{}>.", subscribingClient, throwable);
}
});
}
Expand All @@ -98,7 +98,7 @@ private CompletionStage<Void> connectPublishingClient(final GenericMqttConnect g
if (null == throwable) {
logger.debug("Connected publishing client <{}>.", publishingClient);
} else {
logger.warn("Failed to connect publishing client <{}>. Disconnecting subscribing client …",
logger.info("Failed to connect publishing client <{}>. Disconnecting subscribing client …",
publishingClient,
throwable);
disconnectSubscribingClient();
Expand All @@ -112,7 +112,7 @@ private CompletableFuture<Void> disconnectSubscribingClient() {
if (null == throwable) {
logger.debug("Disconnected subscribing client <{}>.", subscribingClient);
} else {
logger.warn("Failed to disconnect subscribing client <{}>.", subscribingClient, throwable);
logger.info("Failed to disconnect subscribing client <{}>.", subscribingClient, throwable);
}
})
.toCompletableFuture();
Expand All @@ -139,7 +139,7 @@ private CompletableFuture<Void> disconnectPublishingClient() {
if (null == throwable) {
logger.debug("Disconnected publishing client <{}>.", publishingClient);
} else {
logger.warn("Failed to disconnect publishing client <{}>.", publishingClient);
logger.info("Failed to disconnect publishing client <{}>.", publishingClient);
}
})
.toCompletableFuture();
Expand Down

0 comments on commit 87b8ca1

Please sign in to comment.