Skip to content

Commit

Permalink
BaseClientActor: replace state timeout by own timers.
Browse files Browse the repository at this point in the history
Reason: state timeout gets reset by any message and can't
be longer than 5 minutes.

Deja vu. Did this happen before?

Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Aug 9, 2019
1 parent 5f0d024 commit 3992a46
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@
*/
public abstract class BaseClientActor extends AbstractFSM<BaseClientState, BaseClientData> {

private static final String DITTO_STATE_TIMEOUT_TIMER = "dittoStateTimeout";

private static final int SOCKET_CHECK_TIMEOUT_MS = 2000;

protected final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
Expand Down Expand Up @@ -156,26 +158,27 @@ protected BaseClientActor(final Connection connection,
final BaseClientData startingData = new BaseClientData(connection.getId(), connection,
ConnectivityStatus.UNKNOWN, desiredConnectionStatus, "initialized", Instant.now(), null, null);


clientGauge = DittoMetrics.gauge("connection_client")
.tag("id", connection.getId())
.tag("type", connection.getConnectionType().getName());
clientConnectingGauge = DittoMetrics.gauge("connecting_client")
.tag("id", connection.getId())
.tag("type", connection.getConnectionType().getName());

startWith(UNKNOWN, startingData, clientConfig.getInitTimeout());

// stable states
when(UNKNOWN, inUnknownState());
when(CONNECTED, inConnectedState());
when(DISCONNECTED, inDisconnectedState());

// volatile states that time out
final Duration connectingTimeout = clientConfig.getConnectingMinTimeout();
when(CONNECTING, connectingTimeout, inConnectingState());
when(DISCONNECTING, connectingTimeout, inDisconnectingState());
when(TESTING, clientConfig.getTestingTimeout(), inTestingState());
// volatile states
//
// DO NOT use state timeout:
// FSM state timeout gets reset by any message, AND cannot be longer than 5 minutes (Akka v2.5.23).
when(DISCONNECTING, inDisconnectingState());
when(CONNECTING, inConnectingState());
when(TESTING, inTestingState());

startWith(UNKNOWN, startingData, clientConfig.getInitTimeout());

onTransition(this::onTransition);

Expand Down Expand Up @@ -419,6 +422,31 @@ private void onTransition(final BaseClientState from, final BaseClientState to)
if (from == CONNECTING) {
clientConnectingGauge.decrement();
}
// cancel our own state timeout if target state is stable
switch (to) {
case UNKNOWN:
case CONNECTED:
case DISCONNECTED:
cancelStateTimeout();
}
}

/*
* For each volatile state, use the special goTo methods for timer management.
*/
private FSM.State<BaseClientState, BaseClientData> goToConnecting(final Duration timeout) {
scheduleStateTimeout(timeout);
return goTo(CONNECTING);
}

private FSM.State<BaseClientState, BaseClientData> goToDisconnecting() {
scheduleStateTimeout(clientConfig.getConnectingMinTimeout());
return goTo(DISCONNECTING);
}

private FSM.State<BaseClientState, BaseClientData> goToTesting() {
scheduleStateTimeout(clientConfig.getTestingTimeout());
return goTo(TESTING);
}

private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inUnknownState() {
Expand Down Expand Up @@ -550,7 +578,7 @@ private FSM.State<BaseClientState, BaseClientData> closeConnection(final CloseCo
final BaseClientData data) {
final ActorRef sender = getSender();
doDisconnectClient(data.getConnection(), sender);
return goTo(DISCONNECTING).using(setSession(data, sender, closeConnection.getDittoHeaders())
return goToDisconnecting().using(setSession(data, sender, closeConnection.getDittoHeaders())
.setDesiredConnectionStatus(ConnectivityStatus.CLOSED)
.setConnectionStatusDetails("closing or deleting connection at " + Instant.now()));
}
Expand All @@ -562,14 +590,15 @@ private FSM.State<BaseClientState, BaseClientData> openConnection(final OpenConn
final Connection connection = data.getConnection();
final DittoHeaders dittoHeaders = openConnection.getDittoHeaders();
reconnectTimeoutStrategy.reset();
final Duration connectingTimeout = clientConfig.getConnectingMinTimeout();
if (canConnectViaSocket(connection)) {
doConnectClient(connection, sender);
return goTo(CONNECTING).using(setSession(data, sender, dittoHeaders));
return goToConnecting(connectingTimeout).using(setSession(data, sender, dittoHeaders));
} else {
cleanupResourcesForConnection();
final DittoRuntimeException error = newConnectionFailedException(data.getConnection(), dittoHeaders);
sender.tell(new Status.Failure(error), getSelf());
return goTo(CONNECTING)
return goToConnecting(connectingTimeout)
.using(data.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(error.getMessage())
.resetSession());
Expand Down Expand Up @@ -631,10 +660,9 @@ private FSM.State<BaseClientState, BaseClientData> testConnection(final TestConn
});
}

return goTo(TESTING)
.using(setSession(data, sender, testConnection.getDittoHeaders())
.setConnection(connection)
.setConnectionStatusDetails("Testing connection since " + Instant.now()));
return goToTesting().using(setSession(data, sender, testConnection.getDittoHeaders())
.setConnection(connection)
.setConnectionStatusDetails("Testing connection since " + Instant.now()));
}

private FSM.State<BaseClientState, BaseClientData> connectionTimedOut(final BaseClientData data) {
Expand All @@ -653,23 +681,20 @@ private FSM.State<BaseClientState, BaseClientData> connectionTimedOut(final Base
if (ConnectivityStatus.OPEN.equals(data.getDesiredConnectionStatus())) {
if (reconnectTimeoutStrategy.canReconnect()) {
reconnect(data);
return goTo(CONNECTING).forMax(reconnectTimeoutStrategy.getNextTimeout())
.using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(timeoutMessage + " Will try to reconnect."));
return goToConnecting(reconnectTimeoutStrategy.getNextTimeout()).using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(timeoutMessage + " Will try to reconnect."));
} else {
connectionLogger.failure(
"Connection timed out. Reached maximum tries and thus will no longer try to reconnect.");
log.info(
"Connection <{}> reached maximum retries for reconnecting and thus will no longer try to reconnect.",
connectionId());

return goTo(UNKNOWN)
.forMax(scala.concurrent.duration.Duration.Inf())
.using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(timeoutMessage
+ " Reached maximum retries and thus will not try to reconnect any longer."));
return goTo(UNKNOWN).using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(timeoutMessage +
" Reached maximum retries and thus will not try to reconnect any longer."));
}
}

Expand Down Expand Up @@ -752,7 +777,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
String.format("Connection failed due to: {0}. Will reconnect after %s.", nextBackoff);
connectionLogger.failure(errorMessage, event.getFailureDescription());
log.info("Connection failed: {}. Reconnect after {}.", event, nextBackoff);
return goTo(CONNECTING).forMax(nextBackoff).using(data.resetSession()
return goToConnecting(nextBackoff).using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(event.getFailureDescription()));
} else {
Expand All @@ -764,12 +789,10 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
connectionId());

// stay in UNKNOWN state until re-opened manually
return goTo(UNKNOWN)
.forMax(scala.concurrent.duration.Duration.Inf())
.using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(event.getFailureDescription()
+ " Reached maximum retries and thus will not try to reconnect any longer."));
return goTo(UNKNOWN).using(data.resetSession()
.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(event.getFailureDescription()
+ " Reached maximum retries and thus will not try to reconnect any longer."));
}

}
Expand Down Expand Up @@ -1106,6 +1129,14 @@ private BaseClientData setSession(final BaseClientData data, @Nullable final Act
}
}

private void cancelStateTimeout() {
cancelTimer(DITTO_STATE_TIMEOUT_TIMER);
}

private void scheduleStateTimeout(final Duration duration) {
setTimer(DITTO_STATE_TIMEOUT_TIMER, StateTimeout(), duration, false);
}

/**
* Add meaningful message to status for reporting.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,23 @@ public void doesNotReconnectIfConnectionSuccessful() {
}};
}

@Test
public void connectsAutomaticallyAfterActorStart() {
new TestKit(actorSystem) {{
final String randomConnectionId = TestConstants.createRandomConnectionId();
final Connection connection =
TestConstants.createConnection(randomConnectionId,new Target[0]);
final Props props = DummyClientActor.props(connection, getRef(), delegate);

final ActorRef dummyClientActor = actorSystem.actorOf(props);
watch(dummyClientActor);

thenExpectConnectClientCalledAfterTimeout(Duration.ofSeconds(5L));
Mockito.clearInvocations(delegate);
andConnectionSuccessful(dummyClientActor, getRef());
}};
}

private void thenExpectConnectClientCalled() {
thenExpectConnectClientCalledAfterTimeout(Duration.ZERO);
}
Expand Down

0 comments on commit 3992a46

Please sign in to comment.