Skip to content

Commit

Permalink
Send connection announcement after opening and before closing a conne…
Browse files Browse the repository at this point in the history
…ction

Signed-off-by: Florian Fendt <Florian.Fendt@bosch.io>
  • Loading branch information
ffendt committed May 7, 2021
1 parent e67b80d commit f3cbc12
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 36 deletions.
Expand Up @@ -42,6 +42,21 @@ public interface ClientConfig {
*/
Duration getConnectingMaxTimeout();

/**
* Max timeout (when actually disconnecting) until we're assuming disconnecting failed.
*
* @return the maximum disconnecting timeout.
* @see ClientConfig#getDisconnectAnnouncementTimeout()
*/
Duration getDisconnectingMaxTimeout();

/**
* Time that will be waited between sending a disconnect announcement and actually disconnecting.
*
* @return the timeout.
*/
Duration getDisconnectAnnouncementTimeout();

/**
* Max timeout {@code SubscriptionManager} waits for search commands from {@code BaseClientActor}
*
Expand Down Expand Up @@ -108,6 +123,16 @@ enum ClientConfigValue implements KnownConfigValue {
*/
CONNECTING_MAX_TIMEOUT("connecting-max-timeout", Duration.ofMinutes(60L)),

/**
* See documentation on {@link ClientConfig#getDisconnectingMaxTimeout()} ()}.
*/
DISCONNECTING_MAX_TIMEOUT("disconnecting-max-timeout", CONNECTING_MIN_TIMEOUT.getDefaultValue()),

/**
* See documentation on {@link ClientConfig#getDisconnectAnnouncementTimeout()} ()} ()}.
*/
DISCONNECT_ANNOUNCEMENT_TIMEOUT("disconnect-announcement-timeout", Duration.ofSeconds(3L)),

/**
* See documentation on {@link ClientConfig#getSubscriptionManagerTimeout()}.
*/
Expand Down
Expand Up @@ -34,6 +34,8 @@ public final class DefaultClientConfig implements ClientConfig {
private final Duration initTimeout;
private final Duration connectingMinTimeout;
private final Duration connectingMaxTimeout;
private final Duration disconnectingMaxTimeout;
private final Duration disconnectAnnouncementTimeout;
private final Duration subscriptionManagerTimeout;
private final int connectingMaxTries;
private final Duration testingTimeout;
Expand All @@ -45,6 +47,9 @@ private DefaultClientConfig(final ScopedConfig config) {
initTimeout = config.getDuration(ClientConfigValue.INIT_TIMEOUT.getConfigPath());
connectingMinTimeout = config.getDuration(ClientConfigValue.CONNECTING_MIN_TIMEOUT.getConfigPath());
connectingMaxTimeout = config.getDuration(ClientConfigValue.CONNECTING_MAX_TIMEOUT.getConfigPath());
disconnectingMaxTimeout = config.getDuration(ClientConfigValue.DISCONNECTING_MAX_TIMEOUT.getConfigPath());
disconnectAnnouncementTimeout = config.getDuration(
ClientConfigValue.DISCONNECT_ANNOUNCEMENT_TIMEOUT.getConfigPath());
subscriptionManagerTimeout = config.getDuration(ClientConfigValue.SUBSCRIPTION_MANAGER_TIMEOUT.getConfigPath());
connectingMaxTries = config.getInt(ClientConfigValue.CONNECTING_MAX_TRIES.getConfigPath());
testingTimeout = config.getDuration(ClientConfigValue.TESTING_TIMEOUT.getConfigPath());
Expand Down Expand Up @@ -75,6 +80,16 @@ public Duration getConnectingMaxTimeout() {
return connectingMaxTimeout;
}

@Override
public Duration getDisconnectingMaxTimeout() {
return disconnectingMaxTimeout;
}

@Override
public Duration getDisconnectAnnouncementTimeout() {
return disconnectAnnouncementTimeout;
}

@Override
public Duration getSubscriptionManagerTimeout() {
return subscriptionManagerTimeout;
Expand Down Expand Up @@ -117,6 +132,8 @@ public boolean equals(@Nullable final Object o) {
return Objects.equals(initTimeout, that.initTimeout) &&
Objects.equals(connectingMinTimeout, that.connectingMinTimeout) &&
Objects.equals(connectingMaxTimeout, that.connectingMaxTimeout) &&
Objects.equals(disconnectingMaxTimeout, that.disconnectingMaxTimeout) &&
Objects.equals(disconnectAnnouncementTimeout, that.disconnectAnnouncementTimeout) &&
Objects.equals(connectingMaxTries, that.connectingMaxTries) &&
Objects.equals(testingTimeout, that.testingTimeout) &&
Objects.equals(minBackoff, that.minBackoff) &&
Expand All @@ -127,8 +144,9 @@ public boolean equals(@Nullable final Object o) {

@Override
public int hashCode() {
return Objects.hash(initTimeout, connectingMinTimeout, connectingMaxTimeout, connectingMaxTries,
testingTimeout, minBackoff, maxBackoff, subscriptionManagerTimeout, clientActorRefsNotificationDelay);
return Objects.hash(initTimeout, connectingMinTimeout, connectingMaxTimeout, disconnectingMaxTimeout,
disconnectAnnouncementTimeout, connectingMaxTries, testingTimeout, minBackoff, maxBackoff,
subscriptionManagerTimeout, clientActorRefsNotificationDelay);
}

@Override
Expand All @@ -137,6 +155,8 @@ public String toString() {
", initTimeout=" + initTimeout +
", connectingMinTimeout=" + connectingMinTimeout +
", connectingMaxTimeout=" + connectingMaxTimeout +
", disconnectingMaxTimeout=" + disconnectingMaxTimeout +
", disconnectAnnouncementTimeout=" + disconnectAnnouncementTimeout +
", connectingMaxTries=" + connectingMaxTries +
", testingTimeout=" + testingTimeout +
", minBackoff=" + minBackoff +
Expand Down
Expand Up @@ -54,6 +54,10 @@
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.api.InboundSignal;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionMetrics;
Expand All @@ -67,6 +71,25 @@
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.connectivity.model.TargetMetrics;
import org.eclipse.ditto.connectivity.model.Topic;
import org.eclipse.ditto.connectivity.model.signals.announcements.ConnectionClosedAnnouncement;
import org.eclipse.ditto.connectivity.model.signals.announcements.ConnectionOpenedAnnouncement;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionSignalIllegalException;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CheckConnectionLogsActive;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CloseConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CreateConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.EnableConnectionLogs;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.LoggingExpired;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.OpenConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.ResetConnectionLogs;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.ResetConnectionMetrics;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionLogs;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionLogsResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionMetrics;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionMetricsResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionStatus;
import org.eclipse.ditto.connectivity.service.config.ClientConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfigModifiedBehavior;
Expand All @@ -88,11 +111,7 @@
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelActor;
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.api.InboundSignal;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
Expand All @@ -103,24 +122,7 @@
import org.eclipse.ditto.internal.utils.pubsub.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionSignalIllegalException;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CheckConnectionLogsActive;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CloseConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CreateConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.EnableConnectionLogs;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.LoggingExpired;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.OpenConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.ResetConnectionLogs;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.ResetConnectionMetrics;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionLogs;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionLogsResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionMetrics;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionMetricsResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionStatus;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.thingsearch.model.signals.commands.WithSubscriptionId;

Expand Down Expand Up @@ -649,6 +651,7 @@ private void onTransition(final BaseClientState from, final BaseClientState to)
if (to == CONNECTED) {
clientGauge.set(1L);
reconnectTimeoutStrategy.reset();
publishConnectionOpenedAnnouncement();
}
if (to == DISCONNECTED) {
clientGauge.reset();
Expand All @@ -666,6 +669,10 @@ private void onTransition(final BaseClientState from, final BaseClientState to)
}
}

private void publishConnectionOpenedAnnouncement() {
getSelf().tell(ConnectionOpenedAnnouncement.of(connectionId(), Instant.now(), DittoHeaders.empty()), getSelf());
}

/*
* For each volatile state, use the special goTo methods for timer management.
*/
Expand All @@ -674,8 +681,8 @@ private FSM.State<BaseClientState, BaseClientData> goToConnecting(final Duration
return goTo(CONNECTING);
}

private FSM.State<BaseClientState, BaseClientData> goToDisconnecting() {
scheduleStateTimeout(clientConfig.getConnectingMinTimeout());
private FSM.State<BaseClientState, BaseClientData> goToDisconnecting(final Duration timeout) {
scheduleStateTimeout(timeout);
return goTo(DISCONNECTING);
}

Expand Down Expand Up @@ -760,6 +767,8 @@ private FSM.State<BaseClientState, BaseClientData> publishMappedMessage(final Pu
*/
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inDisconnectingState() {
return matchEventEquals(StateTimeout(), (event, data) -> connectionTimedOut(data))
.eventEquals(SendDisconnectAnnouncement, (event, data) -> this.sendDisconnectAnnouncement(data))
.event(Disconnect.class, this::disconnect)
.event(ConnectionFailure.class, this::connectingConnectionFailed)
.event(ClientDisconnected.class, this::clientDisconnected);
}
Expand Down Expand Up @@ -830,12 +839,35 @@ private FSM.State<BaseClientState, BaseClientData> closeConnection(final CloseCo
final BaseClientData data) {

final ActorRef sender = getSender();
doDisconnectClient(data.getConnection(), sender);

final Duration disconnectAnnouncementTimeout = clientConfig.getDisconnectAnnouncementTimeout();
final Duration timeoutUntilDisconnectCompletes =
clientConfig.getDisconnectingMaxTimeout().plus(disconnectAnnouncementTimeout);

getSelf().tell(SendDisconnectAnnouncement, sender);
startSingleTimer("startDisconnect", new Disconnect(sender), disconnectAnnouncementTimeout);

dittoProtocolSub.removeSubscriber(getSelf());
return goToDisconnecting().using(setSession(data, sender, closeConnection.getDittoHeaders())
.setDesiredConnectionStatus(ConnectivityStatus.CLOSED)
.setConnectionStatusDetails("closing or deleting connection at " + Instant.now()));

return goToDisconnecting(timeoutUntilDisconnectCompletes).using(
setSession(data, sender, closeConnection.getDittoHeaders())
.setDesiredConnectionStatus(ConnectivityStatus.CLOSED)
.setConnectionStatusDetails(
"cleaning up before closing or deleting connection at " + Instant.now()));
}

private State<BaseClientState, BaseClientData> sendDisconnectAnnouncement(final BaseClientData data) {
final ConnectionClosedAnnouncement announcement =
ConnectionClosedAnnouncement.of(data.getConnectionId(), Instant.now(), DittoHeaders.empty());
// need to tell the announcement directly to the dispatching actor since the state == DISCONNECTING
outboundDispatchingActor.tell(announcement, getSender());
return stay();
}

private State<BaseClientState, BaseClientData> disconnect(final Disconnect disconnect, final BaseClientData data) {
doDisconnectClient(connection(), disconnect.getSender());
return stay()
.using(data.setConnectionStatusDetails("disconnecting connection at " + Instant.now()));
}

private FSM.State<BaseClientState, BaseClientData> openConnection(final OpenConnection openConnection,
Expand Down Expand Up @@ -872,7 +904,6 @@ private FSM.State<BaseClientState, BaseClientData> doOpenConnection(final BaseCl
}
}


private FSM.State<BaseClientState, BaseClientData> connectionAlreadyOpen(final OpenConnection openConnection,
final BaseClientData data) {

Expand Down Expand Up @@ -1460,7 +1491,7 @@ private FSM.State<BaseClientState, BaseClientData> handleInboundSignal(final Inb
return stay();
}

private static Optional<EntityId> tryExtractEntityId(Signal<?> signal){
private static Optional<EntityId> tryExtractEntityId(Signal<?> signal) {
if (signal instanceof WithEntityId) {
final WithEntityId withEntityId = (WithEntityId) signal;
return Optional.of(withEntityId.getEntityId());
Expand Down Expand Up @@ -1679,7 +1710,7 @@ private void cancelStateTimeout() {
}

private void scheduleStateTimeout(final Duration duration) {
setTimer(DITTO_STATE_TIMEOUT_TIMER, StateTimeout(), duration, false);
startSingleTimer(DITTO_STATE_TIMEOUT_TIMER, StateTimeout(), duration);
}

/**
Expand Down Expand Up @@ -2019,6 +2050,7 @@ public boolean isSuccess() {
public String toString() {
return isSuccess() ? "Success" : failure.toString();
}

}

private enum Control {
Expand All @@ -2042,6 +2074,7 @@ private ReplaceInboundMappingProcessor(final InboundMappingProcessor inboundMapp
InboundMappingProcessor getInboundMappingProcessor() {
return inboundMappingProcessor;
}

}

/**
Expand All @@ -2060,5 +2093,25 @@ private ReplaceOutboundMappingProcessor(
OutboundMappingProcessor getOutboundMappingProcessor() {
return outboundMappingProcessor;
}

}

private static final Object SendDisconnectAnnouncement = new Object();

private static final class Disconnect {

@Nullable
private final ActorRef sender;

private Disconnect(@Nullable final ActorRef sender) {
this.sender = sender;
}

@Nullable
private ActorRef getSender() {
return sender;
}

}

}
12 changes: 12 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Expand Up @@ -362,6 +362,18 @@ ditto {
# max time ~= connecting-max-tries * connecting-max-timeout = 50 * 60m = 50h
connecting-max-tries = 50
connecting-max-tries = ${?CONNECTIVITY_CLIENT_CONNECTING_MAX_TRIES}
# Max timeout when actually disconnecting until we're assuming disconnecting failed. The timeout will start
# after a disconnect announcment was sent (see disconnect-announcement-timeout) and disconnecting is actually
# triggered. Thus, the total amount of timeout that might elapse between closing a connection and running into
# the timeout, is: disconnect-announcement-timeout + disconnecting-max-timeout.
disconnecting-max-timeout = ${ditto.connectivity.client.connecting-min-timeout}
disconnecting-max-timeout = ${?CONNECTIVITY_CLIENT_DISCONNECTING_MAX_TIMEOUT}
# Time that will be waited between sending a disconnect announcement and actually disconnecting.
# When closing a connection, a disconnect announcement (special topic for connection targets) will be sent to
# the client. The connection will wait for disconnect-announcement-timeout before actually disconnecting
# the client.
disconnect-announcement-timeout = 3s
disconnect-announcement-timeout = ${?CONNECTIVITY_CLIENT_DISCONNECT_ANNOUNCEMENT_TIMEOUT}
# Max timeout until waiting for search commands.
subscription-manager-timeout = 60s
subscription-manager-timeout = ${?CONNECTIVITY_CLIENT_SUBSCRIPTION_MANAGER_TIMEOUT}
Expand Down

0 comments on commit f3cbc12

Please sign in to comment.