Skip to content

Commit

Permalink
Fix close-connection behavior of ClientSupervisor; fix ConnectionPers…
Browse files Browse the repository at this point in the history
…istenceActorTest.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 6, 2022
1 parent 8ef2450 commit dd63d4c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.eclipse.ditto.connectivity.api.commands.sudo.SudoRetrieveConnectionStatusResponse;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionNotAccessibleException;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CloseConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CloseConnectionResponse;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionExtractor;
Expand Down Expand Up @@ -109,6 +111,7 @@ public Receive createReceive() {
.match(SudoRetrieveConnectionStatusResponse.class, this::checkConnectionStatus)
.match(ConnectionNotAccessibleException.class, this::connectionNotAccessible)
.match(Terminated.class, this::childTerminated)
.match(CloseConnection.class, this::isNoClientActorStarted, this::respondAndStop)
.match(ConsistentHashingRouter.ConsistentHashableEnvelope.class, this::extractFromEnvelope)
.matchAny(this::forwardToClientActor)
.build();
Expand Down Expand Up @@ -184,6 +187,15 @@ private void connectionNotAccessible(final Object trigger) {
getContext().stop(getSelf());
}

private boolean isNoClientActorStarted(final Object message) {
return clientActor == null;
}

private void respondAndStop(final CloseConnection command) {
getSender().tell(CloseConnectionResponse.of(command.getEntityId(), command.getDittoHeaders()), getSelf());
getContext().stop(getSelf());
}

private enum Control {
STATUS_CHECK
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ public void manageConnection() {
// close connection
final CloseConnection closeConnection = CloseConnection.of(connectionId, dittoHeadersWithCorrelationId);
underTest.tell(closeConnection, testProbe.ref());
expectMockClientActorMessage(closeConnection);
mockClientActorProbe.reply(new Status.Success("mock"));
clientWatcher.expectTerminated(clientActor, FiniteDuration.apply(3, "s"));
testProbe.expectMsg(CloseConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));

Expand All @@ -305,6 +307,8 @@ public void deleteConnectionTerminatesClientActors() {

// delete connection
underTest.tell(DeleteConnection.of(connectionId, dittoHeadersWithCorrelationId), testProbe.ref());
expectMockClientActorMessage(CloseConnection.of(connectionId, dittoHeadersWithCorrelationId));
mockClientActorProbe.reply(new Status.Success("mock"));
clientActorWatcher.expectTerminated(clientActor, FiniteDuration.apply(3, TimeUnit.SECONDS));
testProbe.expectMsg(DeleteConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));
testProbe.expectTerminated(underTest, FiniteDuration.apply(3, TimeUnit.SECONDS));
Expand All @@ -328,7 +332,8 @@ public TestActor.AutoPilot run(final ActorRef sender, final Object msg) {
});
final var underTest = createSupervisor();
final var testProbe = actorSystemResource1.newTestProbe();
testProbe.watch(underTest);
final var connectionWatcher = actorSystemResource1.newTestProbe();
connectionWatcher.watch(underTest);

// create closed connection
underTest.tell(createConnection(closedConnectionWith2Clients), testProbe.ref());
Expand Down Expand Up @@ -356,12 +361,16 @@ public TestActor.AutoPilot run(final ActorRef sender, final Object msg) {
// close connection: at least 1 client actor gets the command; the other may or may not be started.
final CloseConnection closeConnection = CloseConnection.of(connectionId, dittoHeadersWithCorrelationId);
underTest.tell(closeConnection, testProbe.ref());
for (int i = 0; i < 2; ++i) {
expectMockClientActorMessage(closeConnection);
mockClientActorProbe.reply(new Status.Success("mock"));
}
testProbe.expectMsg(CloseConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));

// delete connection
underTest.tell(DeleteConnection.of(connectionId, dittoHeadersWithCorrelationId), testProbe.ref());
connectionWatcher.expectTerminated(underTest, FiniteDuration.apply(5, TimeUnit.SECONDS));
testProbe.expectMsg(DeleteConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));
testProbe.expectTerminated(underTest, FiniteDuration.apply(5, TimeUnit.SECONDS));
}

private void startSecondActorSystemAndJoinCluster() throws Exception {
Expand All @@ -379,7 +388,7 @@ private void startSecondActorSystemAndJoinCluster() throws Exception {
public void createConnectionAfterDeleted() {
final var underTest = createSupervisor();
final var testProbe = actorSystemResource1.newTestProbe();
final var clientWatcher = actorSystemResource1.newTestProbe();
final var clientWatcher = actorSystemResource1.newTestProbe();
testProbe.watch(underTest);

// create connection
Expand All @@ -390,6 +399,8 @@ public void createConnectionAfterDeleted() {

// delete connection
underTest.tell(DeleteConnection.of(connectionId, dittoHeadersWithCorrelationId), testProbe.ref());
expectMockClientActorMessage(CloseConnection.of(connectionId, dittoHeadersWithCorrelationId));
mockClientActorProbe.reply(new Status.Success("mock"));
clientWatcher.expectTerminated(clientActor, FiniteDuration.apply(3, "s"));
testProbe.expectMsg(DeleteConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));

Expand All @@ -413,6 +424,8 @@ public void openConnectionAfterDeletedFails() {

// delete connection
underTest.tell(DeleteConnection.of(connectionId, dittoHeadersWithCorrelationId), testProbe.ref());
expectMockClientActorMessage(CloseConnection.of(connectionId, dittoHeadersWithCorrelationId));
mockClientActorProbe.reply(new Status.Success("mock"));
testProbe.expectMsg(DeleteConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));

// open connection should fail
Expand Down Expand Up @@ -599,7 +612,10 @@ public void modifyConnectionInClosedState() {
final var clientActor = clientWatcher.watch(gossipProbe.expectMsgClass(ActorRef.class));

// close connection
underTest.tell(CloseConnection.of(connectionId, dittoHeadersWithCorrelationId), testProbe.ref());
final var closeConnection = CloseConnection.of(connectionId, dittoHeadersWithCorrelationId);
underTest.tell(closeConnection, testProbe.ref());
expectMockClientActorMessage(closeConnection);
mockClientActorProbe.reply(new Status.Success("mock"));
clientWatcher.expectTerminated(clientActor, FiniteDuration.apply(3, "s"));
testProbe.expectMsg(CloseConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));

Expand Down Expand Up @@ -649,6 +665,8 @@ public void modifyConnectionClosesAndRestartsClientActor() {
// modify connection | Implicitly validates the restart by waiting for pubsub subscribe from client actor.
final var newHeaders = refresh(dittoHeadersWithCorrelationId);
underTest.tell(ModifyConnection.of(connection, newHeaders), testProbe.ref());
expectMockClientActorMessage(CloseConnection.of(connectionId, newHeaders));
mockClientActorProbe.reply(new Status.Success("mock"));
clientWatcher.expectTerminated(firstClientActor, FiniteDuration.apply(3, "s"));

// and sends an open connection (if desired state is open). Since logging is enabled from creation
Expand Down Expand Up @@ -730,6 +748,8 @@ public void recoverModifiedConnection() {
.targets(Collections.singletonList(TestConstants.Targets.MESSAGE_TARGET))
.build();
underTest.tell(ModifyConnection.of(modifiedConnection, dittoHeadersWithCorrelationId), testProbe.ref());
expectMockClientActorMessage(CloseConnection.of(connectionId, dittoHeadersWithCorrelationId));
mockClientActorProbe.reply(new Status.Success("mock"));
simulateSuccessfulOpenConnectionInClientActor();
testProbe.expectMsg(ModifyConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));

Expand Down Expand Up @@ -757,7 +777,10 @@ public void recoverClosedConnection() {
testProbe.expectMsg(createConnectionResponse());

// close connection
underTest.tell(CloseConnection.of(connectionId, dittoHeadersWithCorrelationId), testProbe.ref());
final var closeConnection = CloseConnection.of(connectionId, dittoHeadersWithCorrelationId);
underTest.tell(closeConnection, testProbe.ref());
expectMockClientActorMessage(closeConnection);
mockClientActorProbe.reply(new Status.Success("mock"));
testProbe.expectMsg(CloseConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));

// stop actor
Expand Down Expand Up @@ -798,6 +821,8 @@ public void recoverDeletedConnection() {

// delete connection
underTest.tell(DeleteConnection.of(connectionId, dittoHeadersWithCorrelationId), testProbe.ref());
expectMockClientActorMessage(CloseConnection.of(connectionId, dittoHeadersWithCorrelationId));
mockClientActorProbe.reply(new Status.Success("mock"));
testProbe.expectMsg(DeleteConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));
testProbe.expectTerminated(underTest, FiniteDuration.apply(3, TimeUnit.SECONDS));

Expand Down Expand Up @@ -1024,6 +1049,8 @@ public void enabledConnectionLogsAreEnabledAgainAfterModify() {

// modify connection
underTest.tell(ModifyConnection.of(connection, dittoHeadersWithCorrelationId), testProbe.ref());
expectMockClientActorMessage(CloseConnection.of(connectionId, dittoHeadersWithCorrelationId));
mockClientActorProbe.reply(new Status.Success("mock"));
simulateSuccessfulOpenConnectionInClientActor();
testProbe.expectMsg(ModifyConnectionResponse.of(connectionId, dittoHeadersWithCorrelationId));

Expand Down Expand Up @@ -1051,6 +1078,8 @@ public void disabledConnectionLogsAreNotEnabledAfterModify() {
// modify connection
final var newHeaders = refresh(dittoHeadersWithCorrelationId);
underTest.tell(ModifyConnection.of(connection, newHeaders), testProbe.ref());
expectMockClientActorMessage(CloseConnection.of(connectionId, newHeaders));
mockClientActorProbe.reply(new Status.Success("mock"));

clientWatcher.expectTerminated(clientActor1, FiniteDuration.apply(3, "s"));
final var clientActor2 = gossipProbe.expectMsgClass(ActorRef.class);
Expand Down

0 comments on commit dd63d4c

Please sign in to comment.