Skip to content

Commit

Permalink
added unittest for connection RecoveryStatus
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Mar 8, 2022
1 parent 3bf2528 commit 43552ed
Showing 1 changed file with 88 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.RecoveryStatus;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.connectivity.model.Topic;
import org.eclipse.ditto.connectivity.model.signals.announcements.ConnectionClosedAnnouncement;
Expand All @@ -51,6 +53,7 @@
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CloseConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.OpenConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionStatus;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected;
Expand All @@ -69,7 +72,9 @@
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -181,6 +186,7 @@ private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean in
// verify that doConnectClient is called after correct backoff
thenExpectConnectClientCalledAfterTimeout(i + 2, connectivityConfig.getClientConfig().getMaxBackoff());
}

final long totalBackoffDurationMs = System.currentTimeMillis() - start;
final long tolerancePerBackoffMs = 100L; // allow 100ms tolerance per backoff until connectClient is called
assertThat(totalBackoffDurationMs).isGreaterThan(expectedTotalBackoffMs);
Expand All @@ -189,6 +195,63 @@ private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean in
}};
}

@Test
public void ensureRecoveryStateIsExpectedAfterReconnectsAndBackoff() {
new TestKit(actorSystem) {{
final ConnectionId randomConnectionId = TestConstants.createRandomConnectionId();
final Connection connection =
TestConstants.createConnection(randomConnectionId, new Target[0]);

final int connectingMaxTries = 5;
final long minBackOffAndTimeoutInMs = 50;
final Props props = DummyClientActor.props(connection, getRef(), getRef(), getRef(), delegate,
ConfigFactory.empty()
.withValue("ditto.connectivity.client.connecting-max-tries",
ConfigValueFactory.fromAnyRef(connectingMaxTries))
.withValue("ditto.connectivity.client.connecting-min-timeout",
ConfigValueFactory.fromAnyRef(minBackOffAndTimeoutInMs + "ms"))
.withValue("ditto.connectivity.client.min-backoff",
ConfigValueFactory.fromAnyRef(minBackOffAndTimeoutInMs + "ms"))
);
final ActorRef dummyClientActor = watch(actorSystem.actorOf(props));

whenOpeningConnection(dummyClientActor, OpenConnection.of(randomConnectionId, DittoHeaders.empty()),
getRef());
thenExpectConnectClientCalled();

ensureRecoveryStatus(dummyClientActor, randomConnectionId, RecoveryStatus.UNKNOWN);

andConnectionNotSuccessful(dummyClientActor);
thenExpectConnectClientCalledAfterTimeout(
connectivityConfig.getClientConfig().getConnectingMinTimeout());

long nextBackoffInMs = minBackOffAndTimeoutInMs;
final long staticWaitOverhead = 100;
for (int i = 0; i < connectingMaxTries; i++) {
ensureRecoveryStatus(dummyClientActor, randomConnectionId, RecoveryStatus.ONGOING);

try {
nextBackoffInMs = nextBackoffInMs * 2;
TimeUnit.MILLISECONDS.sleep(staticWaitOverhead + nextBackoffInMs);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}

ensureRecoveryStatus(dummyClientActor, randomConnectionId, RecoveryStatus.BACK_OFF_LIMIT_REACHED);
}

private void ensureRecoveryStatus(final ActorRef dummyClientActor, final ConnectionId randomConnectionId,
final RecoveryStatus unknown) {
dummyClientActor.tell(RetrieveConnectionStatus.of(randomConnectionId, DittoHeaders.empty()), getRef());
final ResourceStatus clientStatus = (ResourceStatus)
fishForMessage(Duration.ofSeconds(1), "Client status", o -> o instanceof ResourceStatus rs &&
rs.getResourceType() == ResourceStatus.ResourceType.CLIENT);
assertThat(clientStatus.getRecoveryStatus()).contains(unknown);
}
};
}

@Test
public void handlesCloseConnectionInConnectingState() {
new TestKit(actorSystem) {{
Expand Down Expand Up @@ -689,9 +752,10 @@ public DummyClientActor(final Connection connection,
final ActorRef connectionActor,
final ActorRef proxyActor,
final ActorRef publisherActor,
final BaseClientActor delegate) {
final BaseClientActor delegate,
final Config config) {

super(connection, proxyActor, connectionActor, DittoHeaders.empty(), ConfigFactory.empty());
super(connection, proxyActor, connectionActor, DittoHeaders.empty(), config);
this.publisherActor = publisherActor;
this.delegate = delegate;
}
Expand All @@ -707,9 +771,29 @@ public DummyClientActor(final Connection connection,
*/
public static Props props(final Connection connection, final ActorRef connectionActor,
final ActorRef proxyActor,
final ActorRef publisherActor, final BaseClientActor delegate) {
final ActorRef publisherActor,
final BaseClientActor delegate) {
return Props.create(DummyClientActor.class, connection, connectionActor, proxyActor,
publisherActor, delegate, ConfigFactory.empty());
}

/**
* Creates Akka configuration object for this actor.
*
* @param connection the connection.
* @param connectionActor the connectionPersistenceActor which created this client.
* @param proxyActor the actor used to send signals into the ditto cluster.
* @param publisherActor the actor that publishes to external system
* @param config the config to pass the BaseClientActor, e.g. containing test specific overrides.
* @return the Akka configuration Props object.
*/
public static Props props(final Connection connection, final ActorRef connectionActor,
final ActorRef proxyActor,
final ActorRef publisherActor,
final BaseClientActor delegate,
final Config config) {
return Props.create(DummyClientActor.class, connection, connectionActor, proxyActor,
publisherActor, delegate);
publisherActor, delegate, config);
}

@Override
Expand Down

0 comments on commit 43552ed

Please sign in to comment.