Skip to content

Commit

Permalink
Display ConnectionStatus correctly when client is disconnecting and r…
Browse files Browse the repository at this point in the history
…etrying

 - added logic to getClientDisconnectedListener() to send ReportConnectionStatus with the disconnect cause. This will ensure that an open SSH tunnel with a disconnecting MQTT client will have a better representation >> SSH == OPEN and MQTT == Disconnecting
 - made sure to propagate ConnectivityStatus.OPEN when MQTT is connected

Signed-off-by: Kalin Kostashki <kalin.kostashki@bosch.io>
  • Loading branch information
Kalin Kostashki committed Jul 28, 2022
1 parent 679ae0a commit 9038269
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 28 deletions.
Expand Up @@ -698,7 +698,7 @@ private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inDisconnectedS
*
* @return an FSM function builder
*/
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingState() {
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingState() {
return matchEventEquals(StateTimeout(), (event, data) -> connectionTimedOut(data))
.event(ConnectionFailure.class, this::connectingConnectionFailed)
.event(ClientConnected.class, this::clientConnectedInConnectingState)
Expand All @@ -723,9 +723,17 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedSt
.event(SshTunnelActor.TunnelClosed.class, this::tunnelClosed)
.event(OpenConnection.class, this::connectionAlreadyOpen)
.event(ConnectionFailure.class, this::connectedConnectionFailed)
.event(ReportConnectionStatus.class, this::updateConnectionStatusPartially)
.eventEquals(Control.RESUBSCRIBE, this::resubscribe);
}

private State<BaseClientState, BaseClientData> updateConnectionStatusPartially(
final ReportConnectionStatus reportConnectionStatus,
final BaseClientData baseClientData) {
BaseClientData nextClientData = baseClientData.setConnectionStatus(reportConnectionStatus.connectivityStatus());
return stay().using(nextClientData);
}

@Nullable
protected abstract ActorRef getPublisherActor();

Expand Down Expand Up @@ -793,14 +801,14 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inTestingStat

private State<BaseClientState, BaseClientData> onUnknownEvent(final Object event, final BaseClientData state) {
Object message = event;
if (event instanceof Failure) {
message = ((Failure) event).cause();
} else if (event instanceof Status.Failure) {
message = ((Status.Failure) event).cause();
if (event instanceof Failure failure) {
message = failure.cause();
} else if (event instanceof Status.Failure statusFailure) {
message = statusFailure.cause();
}

if (message instanceof Throwable) {
logger.error((Throwable) message, "received Exception {} in state {} - status: {} - sender: {}",
if (message instanceof Throwable throwable) {
logger.error(throwable, "received Exception {} in state {} - status: {} - sender: {}",
message,
stateName(),
state.getConnectionStatus() + ": " + state.getConnectionStatusDetails().orElse(""),
Expand Down Expand Up @@ -1566,8 +1574,8 @@ private ConnectionFailedException newConnectionFailedException(final DittoHeader

private DittoRuntimeException unhandledExceptionForSignalInState(final Object signal,
final BaseClientState state) {
final DittoHeaders headers = signal instanceof WithDittoHeaders
? ((WithDittoHeaders) signal).getDittoHeaders()
final DittoHeaders headers = signal instanceof WithDittoHeaders withDittoHeaders
? withDittoHeaders.getDittoHeaders()
: DittoHeaders.empty();
switch (state) {
case CONNECTING:
Expand Down Expand Up @@ -1648,8 +1656,7 @@ private FSM.State<BaseClientState, BaseClientData> handleInboundSignal(final Inb
}

private static Optional<EntityId> tryExtractEntityId(final Signal<?> signal) {
if (signal instanceof WithEntityId) {
final var withEntityId = (WithEntityId) signal;
if (signal instanceof final WithEntityId withEntityId) {
return Optional.of(withEntityId.getEntityId());
} else {
return Optional.empty();
Expand Down Expand Up @@ -2007,20 +2014,14 @@ private void startSubscriptionRefreshTimer() {
}

private static Optional<StreamingType> toStreamingTypes(final Topic topic) {
switch (topic) {
case POLICY_ANNOUNCEMENTS:
return Optional.of(StreamingType.POLICY_ANNOUNCEMENTS);
case LIVE_EVENTS:
return Optional.of(StreamingType.LIVE_EVENTS);
case LIVE_COMMANDS:
return Optional.of(StreamingType.LIVE_COMMANDS);
case LIVE_MESSAGES:
return Optional.of(StreamingType.MESSAGES);
case TWIN_EVENTS:
return Optional.of(StreamingType.EVENTS);
default:
return Optional.empty();
}
return switch (topic) {
case POLICY_ANNOUNCEMENTS -> Optional.of(StreamingType.POLICY_ANNOUNCEMENTS);
case LIVE_EVENTS -> Optional.of(StreamingType.LIVE_EVENTS);
case LIVE_COMMANDS -> Optional.of(StreamingType.LIVE_COMMANDS);
case LIVE_MESSAGES -> Optional.of(StreamingType.MESSAGES);
case TWIN_EVENTS -> Optional.of(StreamingType.EVENTS);
default -> Optional.empty();
};
}

private static Optional<Integer> parseHexString(final String hexString) {
Expand Down Expand Up @@ -2262,7 +2263,7 @@ private CloseConnectionAndShutdown() {

public enum HealthSignal implements AkkaJacksonCborSerializable {
PING,
PONG;
PONG
}

}
@@ -0,0 +1,21 @@
/*
* Copyright text:
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.connectivity.service.messaging;

import org.eclipse.ditto.connectivity.model.ConnectivityStatus;

public record ReportConnectionStatus(ConnectivityStatus connectivityStatus) {

}
Expand Up @@ -29,16 +29,19 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.service.config.MqttConfig;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientActor;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientData;
import org.eclipse.ditto.connectivity.service.messaging.ReportConnectionStatus;
import org.eclipse.ditto.connectivity.service.messaging.backoff.RetryTimeoutStrategy;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.ClientRole;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClient;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClientConnectedListener;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClientDisconnectedListener;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClientFactory;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.HiveMqttClientProperties;
Expand Down Expand Up @@ -250,8 +253,7 @@ private HiveMqttClientProperties getHiveMqttClientPropertiesOrThrow(final Connec
.withSshTunnelStateSupplier(this::getSshTunnelState)
.withConnectionLogger(connectionLogger)
.withActorUuid(actorUuid)
.withClientConnectedListener((context, clientRole) -> logger.info("Connected client <{}>.",
getClientId(clientRole, getMqttClientIdentifierOrNull(context.getClientConfig()))))
.withClientConnectedListener(getClientConnectedListener())
.withClientDisconnectedListener(getClientDisconnectedListener())
.build();
} catch (final NoMqttConnectionException e) {
Expand All @@ -261,6 +263,14 @@ private HiveMqttClientProperties getHiveMqttClientPropertiesOrThrow(final Connec
}
}

private GenericMqttClientConnectedListener getClientConnectedListener() {
return (context, clientRole) -> {
logger.info("Connected client <{}>.",
getClientId(clientRole, getMqttClientIdentifierOrNull(context.getClientConfig())));
getSelf().tell(new ReportConnectionStatus(ConnectivityStatus.OPEN), ActorRef.noSender());
};
}

@Nullable
private static MqttClientIdentifier getMqttClientIdentifierOrNull(final MqttClientConfig mqttClientConfig) {
return mqttClientConfig.getClientIdentifier().orElse(null);
Expand Down Expand Up @@ -303,6 +313,7 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() {
clientId,
retryTimeoutStrategy.getCurrentTries(),
reconnectDelay);
getSelf().tell(new ReportConnectionStatus(connectivityStatusResolver.resolve(context.getCause())), ActorRef.noSender());
} else {
logger.info("Not reconnecting client <{}>.", clientId);
}
Expand Down

0 comments on commit 9038269

Please sign in to comment.