Skip to content

Commit

Permalink
Close connection before deleting it (which will automatically send a …
Browse files Browse the repository at this point in the history
…connection announcement)

Signed-off-by: Florian Fendt <Florian.Fendt@bosch.io>
  • Loading branch information
ffendt committed May 7, 2021
1 parent d80c088 commit 7d84a3e
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;

/**
* This strategy handles the {@link org.eclipse.ditto.connectivity.model.signals.commands.modify.DeleteConnection} command.
* This strategy handles the {@link org.eclipse.ditto.connectivity.model.signals.commands.modify.DeleteConnection}
* command.
*/
final class DeleteConnectionStrategy extends AbstractConnectivityCommandStrategy<DeleteConnection> {

Expand All @@ -53,8 +54,11 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
DeleteConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
// Not closing the connection asynchronously; rely on client actors to cleanup all resources when stopped.
final List<ConnectionAction> actions =
Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.UPDATE_SUBSCRIPTIONS, ConnectionAction.STOP_CLIENT_ACTORS,
ConnectionAction.DISABLE_LOGGING, ConnectionAction.SEND_RESPONSE, ConnectionAction.BECOME_DELETED);
Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.UPDATE_SUBSCRIPTIONS,
ConnectionAction.CLOSE_CONNECTION, ConnectionAction.STOP_CLIENT_ACTORS,
ConnectionAction.DISABLE_LOGGING, ConnectionAction.SEND_RESPONSE,
ConnectionAction.BECOME_DELETED);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.eclipse.ditto.connectivity.model.signals.commands.modify.DeleteConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.DeleteConnectionResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.OpenConnection;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -43,10 +45,17 @@ public class ErrorHandlingActorTest extends WithMockServers {
private static ActorSystem actorSystem;
private static ActorRef proxyActor;
private static ActorRef pubSubMediator;
private static Duration CONNECT_TIMEOUT;
private static Duration DISCONNECT_TIMEOUT;

@BeforeClass
public static void setUp() {
actorSystem = ActorSystem.create("AkkaTestSystem", TestConstants.CONFIG);
final DittoConnectivityConfig connectivityConfig =
DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config()));
CONNECT_TIMEOUT = connectivityConfig.getClientConfig().getConnectingMinTimeout();
DISCONNECT_TIMEOUT = connectivityConfig.getClientConfig().getDisconnectAnnouncementTimeout()
.plus(connectivityConfig.getClientConfig().getDisconnectingMaxTimeout());
pubSubMediator = DistributedPubSub.get(actorSystem).mediator();
proxyActor = actorSystem.actorOf(TestConstants.ProxyActorMock.props());
}
Expand All @@ -67,7 +76,7 @@ public void tryCreateConnectionExpectSuccessResponseIndependentOfConnectionStatu
final ActorRef underTest = TestConstants.createConnectionSupervisorActor(connectionId, actorSystem,
pubSubMediator, proxyActor,
(connection1, connectionActor, proxyActor) ->
FaultyClientActor.props(false));
FaultyClientActor.props(false, false));
watch(underTest);

// create connection
Expand All @@ -92,22 +101,29 @@ public void tryDeleteConnectionExpectErrorResponse() {
new TestKit(actorSystem) {{
final ConnectionId connectionId = TestConstants.createRandomConnectionId();
final Connection connection = TestConstants.createConnection(connectionId);

// need to allow close commands, since the ConnectionActor will send a CloseConnection to the clients upon
// receiving a DeleteConnection
final ClientActorPropsFactory faultyClientActorWithCloseCommands =
(c, cA, pA) -> FaultyClientActor.props(true, true);

final ActorRef underTest =
TestConstants.createConnectionSupervisorActor(connectionId, actorSystem, pubSubMediator,
proxyActor, FaultyClientActor.faultyClientActorPropsFactory);
proxyActor, faultyClientActorWithCloseCommands);
watch(underTest);

// create connection
final CreateConnection createConnection = CreateConnection.of(connection, DittoHeaders.empty());
underTest.tell(createConnection, getRef());
final CreateConnectionResponse createConnectionResponse =
CreateConnectionResponse.of(connection, DittoHeaders.empty());
expectMsg(dilated(Duration.ofSeconds(5)), createConnectionResponse);
expectMsg(dilated(CONNECT_TIMEOUT), createConnectionResponse);

// delete connection
final ConnectivityModifyCommand<?> command = DeleteConnection.of(connectionId, DittoHeaders.empty());
underTest.tell(command, getRef());
expectMsg(dilated(Duration.ofSeconds(5)), DeleteConnectionResponse.of(connectionId, DittoHeaders.empty()));
expectMsg(dilated(DISCONNECT_TIMEOUT),
DeleteConnectionResponse.of(connectionId, DittoHeaders.empty()));
}};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,25 @@
import akka.event.DiagnosticLoggingAdapter;

/**
* A ClientActor implementation that fails for every command received and answers with an exception.
* If {@code allowCreate} is {@code true} the first create command will return success (required to test open/close/delete).
* A ClientActor implementation that fails for every command received and answers with an exception. If {@code
* allowCreate} is {@code true} the first create command will return success (required to test open/close/delete).
*/
public class FaultyClientActor extends AbstractActor {

static final ClientActorPropsFactory faultyClientActorPropsFactory =
(connection, connectionActor, proxyActor) -> FaultyClientActor.props(true);
(connection, connectionActor, proxyActor) -> FaultyClientActor.props(true, false);

private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
private final boolean allowClose;
private boolean allowCreate;

private FaultyClientActor(final boolean allowCreate) {
private FaultyClientActor(final boolean allowCreate, final boolean allowClose) {
this.allowCreate = allowCreate;
this.allowClose = allowClose;
}

public static Props props(final boolean allowFirstCreateCommand) {
return Props.create(FaultyClientActor.class, allowFirstCreateCommand);
public static Props props(final boolean allowFirstCreateCommand, final boolean allowCloseCommands) {
return Props.create(FaultyClientActor.class, allowFirstCreateCommand, allowCloseCommands);
}

@Override
Expand All @@ -60,11 +62,18 @@ public Receive createReceive() {
oc -> sender().tell(new Status.Failure(new IllegalStateException("error message")),
getSelf()))
.match(CloseConnection.class,
cc -> sender().tell(new Status.Failure(new IllegalStateException("error message")),
getSelf()))
cc -> {
if (allowClose) {
sender().tell(new Status.Success("mock"), getSelf());
} else {
sender().tell(new Status.Success(new IllegalStateException("error message")),
getSelf());
}
})
.match(DeleteConnection.class,
dc -> sender().tell(new Status.Failure(new IllegalStateException("error message")),
getSelf()))
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -296,26 +296,26 @@ public void manageConnection() {
}

@Test
public void deleteConnectionUpdatesSubscriptions() {
public void deleteConnectionUpdatesSubscriptionsAndClosesConnection() {
new TestKit(actorSystem) {{
final TestProbe probe = TestProbe.apply(actorSystem);
final TestProbe clientActorMock = TestProbe.apply(actorSystem);
final ActorRef underTest = TestConstants.createConnectionSupervisorActor(
connectionId, actorSystem, proxyActor,
(connection, concierge, connectionActor) -> MockClientActor.props(probe.ref()),
(connection, concierge, connectionActor) -> MockClientActor.props(clientActorMock.ref()),
pubSubMediator
);
watch(underTest);

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
clientActorMock.expectMsg(enableConnectionLogs);
clientActorMock.expectMsg(openConnection);
expectMsg(createConnectionResponse);

// delete connection
underTest.tell(deleteConnection, getRef());
clientActorMock.expectMsg(closeConnection);
expectMsg(deleteConnectionResponse);
probe.expectNoMessage();
expectTerminated(underTest);
}};

Expand Down Expand Up @@ -381,6 +381,7 @@ public void createConnectionAfterDeleted() {

// delete connection
underTest.tell(deleteConnection, getRef());
clientActorMock.expectMsg(closeConnection);
expectMsg(deleteConnectionResponse);

// create connection again (while ConnectionActor is in deleted state)
Expand Down
5 changes: 3 additions & 2 deletions connectivity/service/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
</appender>

<!-- Akka and Ditto log levels are reduced to reduce log verbosity. Increase them to debug tests. -->
<logger name="akka" level="WARN"/>
<logger name="org.eclipse.ditto" level="OFF"/>
<!-- TODO ff reset to WARN and OFF -->
<logger name="akka" level="INFO"/>
<logger name="org.eclipse.ditto" level="DEBUG"/>
<logger name="org.apache.qpid.jms.provider.ProviderFactory" level="OFF"/>
<logger name="org.apache.kafka" level="WARN"/>

Expand Down

0 comments on commit 7d84a3e

Please sign in to comment.