Skip to content

Commit

Permalink
also notify client actor about failure if mqtt client is not reconnec…
Browse files Browse the repository at this point in the history
…ting

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Aug 9, 2022
1 parent 8201c19 commit 553226d
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/*
* Copyright text:
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
Expand All @@ -11,12 +10,10 @@
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.connectivity.service.messaging;

/*
/**
* Reports a connection error with the given {@code cause}. The receiver should update its own status but should not
* initiate reconnection as it is handled by the sender.
*/
public record ReportConnectionStatusError(Throwable cause) {

}
public record ReportConnectionStatusError(Throwable cause) {}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/*
* Copyright text:
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
Expand All @@ -11,9 +10,9 @@
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.connectivity.service.messaging;

// A placeholder indicating ConnectivityStatus.OPEN
public record ReportConnectionStatusSuccess() {
}
/**
* Reports a successful connection. The receiver should update its own status to e.g. {@code ConnectionStatus#OPEN}.
*/
public record ReportConnectionStatusSuccess() {}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.ditto.connectivity.service.messaging.backoff.RetryTimeoutStrategy;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.ClientRole;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClient;
Expand Down Expand Up @@ -303,6 +304,8 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() {
*/
logger.info("Initial connect of client <{}> failed. Disabling automatic reconnect.", clientId);
mqttClientReconnector.reconnect(false);
getSelf().tell(ConnectionFailure.of(null, context.getCause(), "MQTT client got disconnected."),
ActorRef.noSender());
} else {
final var mqttDisconnectSource = context.getSource();
final var reconnect = isReconnect();
Expand All @@ -313,10 +316,13 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() {
clientId,
retryTimeoutStrategy.getCurrentTries(),
reconnectDelay);

// This is sent because the status of the client isn't made explicit to the user.
getSelf().tell(new ReportConnectionStatusError(context.getCause()), ActorRef.noSender());
} else {
logger.info("Not reconnecting client <{}>.", clientId);
getSelf().tell(ConnectionFailure.of(null, context.getCause(), "MQTT client got disconnected."),
ActorRef.noSender());
}
mqttClientReconnector.delay(reconnectDelay.toMillis(), TimeUnit.MILLISECONDS);
mqttClientReconnector.reconnect(reconnect);
Expand Down Expand Up @@ -344,16 +350,15 @@ private Duration getReconnectDelay(final RetryTimeoutStrategy retryTimeoutStrate
final var retryTimeoutReconnectDelay = retryTimeoutStrategy.getNextTimeout();
if (MqttDisconnectSource.SERVER == mqttDisconnectSource) {
// wait at least the configured duration for server initiated disconnect to not overload the server with reconnect attempts
final var reconnectDelayForBrokerInitiatedDisconnect =
mqttConfig.getReconnectMinTimeoutForMqttBrokerInitiatedDisconnect();
result = max(retryTimeoutReconnectDelay, reconnectDelayForBrokerInitiatedDisconnect);
result = getMaxDuration(retryTimeoutReconnectDelay,
mqttConfig.getReconnectMinTimeoutForMqttBrokerInitiatedDisconnect());
} else {
result = retryTimeoutReconnectDelay;
}
return result;
}

private static Duration max(Duration d1, Duration d2) {
private static Duration getMaxDuration(final Duration d1, final Duration d2) {
return d1.compareTo(d2) >= 0 ? d1 : d2;
}

Expand Down

0 comments on commit 553226d

Please sign in to comment.