diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java index d5df1bdb7d..74a8445c42 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java @@ -167,6 +167,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash inConnectedSt .event(SshTunnelActor.TunnelClosed.class, this::tunnelClosed) .event(OpenConnection.class, this::connectionAlreadyOpen) .event(ConnectionFailure.class, this::connectedConnectionFailed) - .event(ReportConnectionStatus.class, this::updateConnectionStatusPartially) + .event(ReportConnectionStatusSuccess.class, this::updateConnectionStatusSuccess) + .event(ReportConnectionStatusError.class, this::updateConnectionStatusError) .eventEquals(Control.RESUBSCRIBE, this::resubscribe); } - private State updateConnectionStatusPartially( - final ReportConnectionStatus reportConnectionStatus, + private State updateConnectionStatusSuccess(final ReportConnectionStatusSuccess reportConnectionStatusSuccess, + BaseClientData baseClientData) { + BaseClientData nextClientData = baseClientData.setConnectionStatus(ConnectivityStatus.OPEN) + .setRecoveryStatus(RecoveryStatus.SUCCEEDED) + .setConnectionStatusDetails(CONNECTION_STATUS_DETAILS_CONNECTED) + .setInConnectionStatusSince(Instant.now()); + return stay().using(nextClientData); + } + + private State updateConnectionStatusError( + final ReportConnectionStatusError reportConnectionStatus, final BaseClientData baseClientData) { - BaseClientData nextClientData = baseClientData.setConnectionStatus(reportConnectionStatus.connectivityStatus()); + BaseClientData nextClientData = baseClientData.setConnectionStatus(connectivityStatusResolver.resolve(reportConnectionStatus.cause())) + .setRecoveryStatus(RecoveryStatus.ONGOING) + .setConnectionStatusDetails( + ConnectionFailure.determineFailureDescription(null, reportConnectionStatus.cause(), null)) + .setInConnectionStatusSince(Instant.now()); return stay().using(nextClientData); } @@ -1204,7 +1219,8 @@ private State gotoConnectedAfterInitialization( .resetFailureCount() .setConnectionStatus(ConnectivityStatus.OPEN) .setRecoveryStatus(RecoveryStatus.SUCCEEDED) - .setConnectionStatusDetails("Connected at " + Instant.now()) + .setConnectionStatusDetails(CONNECTION_STATUS_DETAILS_CONNECTED) + .setInConnectionStatusSince(Instant.now()) ); } else { logger.info("Initialization of consumers, publisher and subscriptions successful, but failures were " + @@ -1415,12 +1431,11 @@ private FSM.State retrieveConnectionStatus(fina } else { retrieveAddressStatusFromChildren(command, sender, childrenToAsk); } - final ResourceStatus clientStatus = ConnectivityModelFactory.newClientStatus(getInstanceIdentifier(), clientConnectionStatus, data.getRecoveryStatus(), - "[" + stateName().name() + "] " + data.getConnectionStatusDetails().orElse(""), + data.getConnectionStatusDetails().orElse(""), getInConnectionStatusSince()); sender.tell(clientStatus, getSelf()); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientData.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientData.java index 855c98c235..9f2a3a43fd 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientData.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientData.java @@ -192,6 +192,10 @@ public BaseClientData setConnectionStatusDetails(@Nullable final String connecti return BaseClientDataBuilder.from(this).setConnectionStatusDetails(connectionStatusDetails).build(); } + public BaseClientData setInConnectionStatusSince(final Instant inConnectionStatusSince) { + return BaseClientDataBuilder.from(this).setInConnectionStatusSince(inConnectionStatusSince).build(); + } + /** * Adds the passed {@code origin} sender with the passed {@code dittoHeaders} to the managed {@code sessionSenders} * returning a new instance of BaseClientData. diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ReportConnectionStatus.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ReportConnectionStatusError.java similarity index 77% rename from connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ReportConnectionStatus.java rename to connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ReportConnectionStatusError.java index d4e193efd5..1e636bb878 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ReportConnectionStatus.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ReportConnectionStatusError.java @@ -14,8 +14,9 @@ package org.eclipse.ditto.connectivity.service.messaging; -import org.eclipse.ditto.connectivity.model.ConnectivityStatus; +/* -public record ReportConnectionStatus(ConnectivityStatus connectivityStatus) { + */ +public record ReportConnectionStatusError(Throwable cause) { } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ReportConnectionStatusSuccess.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ReportConnectionStatusSuccess.java new file mode 100644 index 0000000000..1105256ba6 --- /dev/null +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ReportConnectionStatusSuccess.java @@ -0,0 +1,19 @@ +/* + * Copyright text: + * Copyright (c) 2022 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ + +package org.eclipse.ditto.connectivity.service.messaging; + +// A placeholder indicating ConnectivityStatus.OPEN +public record ReportConnectionStatusSuccess() { +} diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActor.java index 289c09eaf9..ea7dfae360 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActor.java @@ -29,12 +29,12 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.connectivity.api.BaseClientState; import org.eclipse.ditto.connectivity.model.Connection; -import org.eclipse.ditto.connectivity.model.ConnectivityStatus; import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection; import org.eclipse.ditto.connectivity.service.config.MqttConfig; import org.eclipse.ditto.connectivity.service.messaging.BaseClientActor; import org.eclipse.ditto.connectivity.service.messaging.BaseClientData; -import org.eclipse.ditto.connectivity.service.messaging.ReportConnectionStatus; +import org.eclipse.ditto.connectivity.service.messaging.ReportConnectionStatusError; +import org.eclipse.ditto.connectivity.service.messaging.ReportConnectionStatusSuccess; import org.eclipse.ditto.connectivity.service.messaging.backoff.RetryTimeoutStrategy; import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected; import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected; @@ -267,7 +267,7 @@ private GenericMqttClientConnectedListener getClientConnectedListener() { return (context, clientRole) -> { logger.info("Connected client <{}>.", getClientId(clientRole, getMqttClientIdentifierOrNull(context.getClientConfig()))); - getSelf().tell(new ReportConnectionStatus(ConnectivityStatus.OPEN), ActorRef.noSender()); + getSelf().tell(new ReportConnectionStatusSuccess(), ActorRef.noSender()); }; } @@ -313,7 +313,8 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() { clientId, retryTimeoutStrategy.getCurrentTries(), reconnectDelay); - getSelf().tell(new ReportConnectionStatus(connectivityStatusResolver.resolve(context.getCause())), ActorRef.noSender()); + // This is sent because the status of the client isn't made explicit to the user. + getSelf().tell(new ReportConnectionStatusError(context.getCause()), ActorRef.noSender()); } else { logger.info("Not reconnecting client <{}>.", clientId); }