Skip to content

Commit

Permalink
added separate ReportConnectionStatusError and ReportConnectionStatus…
Browse files Browse the repository at this point in the history
…Success

 - The ReportConnectionStatusError will send only the throwable to the BaseClientActor in order to pass on the error in statusDetails.
 - The ReportConnectionStatusSuccess is only a marker to trigger updateConnectionStatusSuccess()
 - also fixed a few places where inConnectionStatusSince wasn't properly updated

Signed-off-by: Kalin Kostashki <kalin.kostashki@bosch.io>
  • Loading branch information
Kalin Kostashki committed Jul 29, 2022
1 parent 9038269 commit 70dcc42
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
protected final ConnectionLogger connectionLogger;
protected final ConnectivityStatusResolver connectivityStatusResolver;

private static final String CONNECTION_STATUS_DETAILS_CONNECTED = "CONNECTED";
/**
* The name of the dispatcher that will be used for async mapping.
*/
Expand Down Expand Up @@ -723,14 +724,28 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedSt
.event(SshTunnelActor.TunnelClosed.class, this::tunnelClosed)
.event(OpenConnection.class, this::connectionAlreadyOpen)
.event(ConnectionFailure.class, this::connectedConnectionFailed)
.event(ReportConnectionStatus.class, this::updateConnectionStatusPartially)
.event(ReportConnectionStatusSuccess.class, this::updateConnectionStatusSuccess)
.event(ReportConnectionStatusError.class, this::updateConnectionStatusError)
.eventEquals(Control.RESUBSCRIBE, this::resubscribe);
}

private State<BaseClientState, BaseClientData> updateConnectionStatusPartially(
final ReportConnectionStatus reportConnectionStatus,
private State<BaseClientState, BaseClientData> updateConnectionStatusSuccess(final ReportConnectionStatusSuccess reportConnectionStatusSuccess,
BaseClientData baseClientData) {
BaseClientData nextClientData = baseClientData.setConnectionStatus(ConnectivityStatus.OPEN)
.setRecoveryStatus(RecoveryStatus.SUCCEEDED)
.setConnectionStatusDetails(CONNECTION_STATUS_DETAILS_CONNECTED)
.setInConnectionStatusSince(Instant.now());
return stay().using(nextClientData);
}

private State<BaseClientState, BaseClientData> updateConnectionStatusError(
final ReportConnectionStatusError reportConnectionStatus,
final BaseClientData baseClientData) {
BaseClientData nextClientData = baseClientData.setConnectionStatus(reportConnectionStatus.connectivityStatus());
BaseClientData nextClientData = baseClientData.setConnectionStatus(connectivityStatusResolver.resolve(reportConnectionStatus.cause()))
.setRecoveryStatus(RecoveryStatus.ONGOING)
.setConnectionStatusDetails(
ConnectionFailure.determineFailureDescription(null, reportConnectionStatus.cause(), null))
.setInConnectionStatusSince(Instant.now());
return stay().using(nextClientData);
}

Expand Down Expand Up @@ -1204,7 +1219,8 @@ private State<BaseClientState, BaseClientData> gotoConnectedAfterInitialization(
.resetFailureCount()
.setConnectionStatus(ConnectivityStatus.OPEN)
.setRecoveryStatus(RecoveryStatus.SUCCEEDED)
.setConnectionStatusDetails("Connected at " + Instant.now())
.setConnectionStatusDetails(CONNECTION_STATUS_DETAILS_CONNECTED)
.setInConnectionStatusSince(Instant.now())
);
} else {
logger.info("Initialization of consumers, publisher and subscriptions successful, but failures were " +
Expand Down Expand Up @@ -1415,12 +1431,11 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
} else {
retrieveAddressStatusFromChildren(command, sender, childrenToAsk);
}

final ResourceStatus clientStatus =
ConnectivityModelFactory.newClientStatus(getInstanceIdentifier(),
clientConnectionStatus,
data.getRecoveryStatus(),
"[" + stateName().name() + "] " + data.getConnectionStatusDetails().orElse(""),
data.getConnectionStatusDetails().orElse(""),
getInConnectionStatusSince());
sender.tell(clientStatus, getSelf());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ public BaseClientData setConnectionStatusDetails(@Nullable final String connecti
return BaseClientDataBuilder.from(this).setConnectionStatusDetails(connectionStatusDetails).build();
}

public BaseClientData setInConnectionStatusSince(final Instant inConnectionStatusSince) {
return BaseClientDataBuilder.from(this).setInConnectionStatusSince(inConnectionStatusSince).build();
}

/**
* Adds the passed {@code origin} sender with the passed {@code dittoHeaders} to the managed {@code sessionSenders}
* returning a new instance of BaseClientData.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

package org.eclipse.ditto.connectivity.service.messaging;

import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
/*
public record ReportConnectionStatus(ConnectivityStatus connectivityStatus) {
*/
public record ReportConnectionStatusError(Throwable cause) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright text:
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.connectivity.service.messaging;

// A placeholder indicating ConnectivityStatus.OPEN
public record ReportConnectionStatusSuccess() {
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.service.config.MqttConfig;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientActor;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientData;
import org.eclipse.ditto.connectivity.service.messaging.ReportConnectionStatus;
import org.eclipse.ditto.connectivity.service.messaging.ReportConnectionStatusError;
import org.eclipse.ditto.connectivity.service.messaging.ReportConnectionStatusSuccess;
import org.eclipse.ditto.connectivity.service.messaging.backoff.RetryTimeoutStrategy;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected;
Expand Down Expand Up @@ -267,7 +267,7 @@ private GenericMqttClientConnectedListener getClientConnectedListener() {
return (context, clientRole) -> {
logger.info("Connected client <{}>.",
getClientId(clientRole, getMqttClientIdentifierOrNull(context.getClientConfig())));
getSelf().tell(new ReportConnectionStatus(ConnectivityStatus.OPEN), ActorRef.noSender());
getSelf().tell(new ReportConnectionStatusSuccess(), ActorRef.noSender());
};
}

Expand Down Expand Up @@ -313,7 +313,8 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() {
clientId,
retryTimeoutStrategy.getCurrentTries(),
reconnectDelay);
getSelf().tell(new ReportConnectionStatus(connectivityStatusResolver.resolve(context.getCause())), ActorRef.noSender());
// This is sent because the status of the client isn't made explicit to the user.
getSelf().tell(new ReportConnectionStatusError(context.getCause()), ActorRef.noSender());
} else {
logger.info("Not reconnecting client <{}>.", clientId);
}
Expand Down

0 comments on commit 70dcc42

Please sign in to comment.