Skip to content

Commit

Permalink
Fix test when open connection with automated logging activation
Browse files Browse the repository at this point in the history
Signed-off-by: Vadim Guenther <vadim.guenther@bosch.io>
  • Loading branch information
VadimGue committed Apr 27, 2021
1 parent 91f527f commit 16ba24a
Showing 1 changed file with 38 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.eclipse.ditto.signals.commands.connectivity.modify.DeleteConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.modify.EnableConnectionLogs;
import org.eclipse.ditto.signals.commands.connectivity.modify.EnableConnectionLogsResponse;
import org.eclipse.ditto.signals.commands.connectivity.modify.LoggingExpired;
import org.eclipse.ditto.signals.commands.connectivity.modify.ModifyConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.ModifyConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.modify.OpenConnection;
Expand Down Expand Up @@ -130,6 +131,8 @@ public final class ConnectionPersistenceActorTest extends WithMockServers {
private RetrieveConnectionResponse retrieveModifiedConnectionResponse;
private RetrieveConnectionStatusResponse retrieveConnectionStatusOpenResponse;
private ConnectionNotAccessibleException connectionNotAccessibleException;
private EnableConnectionLogs enableConnectionLogs;
private EnableConnectionLogsResponse enableConnectionLogsResponse;

// second actor system to test multiple client actors
private ActorSystem actorSystem2;
Expand Down Expand Up @@ -199,6 +202,11 @@ public void init() {
"target1", "publisher started")))
.build();
connectionNotAccessibleException = ConnectionNotAccessibleException.newBuilder(connectionId).build();

enableConnectionLogs = EnableConnectionLogs.of(connectionId, DittoHeaders.empty());
enableConnectionLogsResponse = EnableConnectionLogsResponse.of(connectionId,
enableConnectionLogs.getDittoHeaders());

}

@Test
Expand Down Expand Up @@ -272,6 +280,7 @@ public void manageConnection() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand Down Expand Up @@ -301,6 +310,7 @@ public void deleteConnectionUpdatesSubscriptions() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand All @@ -315,10 +325,6 @@ public void deleteConnectionUpdatesSubscriptions() {

@Test
public void manageConnectionWith2Clients() throws Exception {
final EnableConnectionLogs enableConnectionLogs = EnableConnectionLogs.of(connectionId, DittoHeaders.empty());
final EnableConnectionLogsResponse enableConnectionLogsResponse =
EnableConnectionLogsResponse.of(connectionId, enableConnectionLogs.getDittoHeaders());

startSecondActorSystemAndJoinCluster();
new TestKit(actorSystem) {{
final TestProbe gossipProbe = TestProbe.apply("gossip", actorSystem);
Expand All @@ -338,9 +344,9 @@ public void manageConnectionWith2Clients() throws Exception {
underTest.tell(gossipProbe.expectMsgClass(ActorRef.class), ActorRef.noSender());
underTest.tell(gossipProbe.expectMsgClass(ActorRef.class), ActorRef.noSender());
// one client actor receives the command
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
probe.expectMsg(enableConnectionLogs);
expectMsg(enableConnectionLogsResponse);
expectMsgClass(OpenConnectionResponse.class);

// forward signal once
Expand Down Expand Up @@ -371,6 +377,7 @@ public void createConnectionAfterDeleted() {

// create connection
underTest.tell(createConnection, getRef());
clientActorMock.expectMsg(enableConnectionLogs);
clientActorMock.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand All @@ -381,6 +388,7 @@ public void createConnectionAfterDeleted() {
// create connection again (while ConnectionActor is in deleted state)
underTest.tell(createConnection, getRef());
expectMsg(createConnectionResponse);
clientActorMock.expectMsg(enableConnectionLogs);
clientActorMock.expectMsg(openConnection);
}};
}
Expand All @@ -397,6 +405,7 @@ public void openConnectionAfterDeletedFails() {

// create connection
underTest.tell(createConnection, getRef());
clientActorMock.expectMsg(enableConnectionLogs);
clientActorMock.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand Down Expand Up @@ -574,6 +583,7 @@ public void modifyConnectionInClosedState() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand Down Expand Up @@ -636,6 +646,7 @@ public void modifyConnectionClosesAndRestartsClientActor() {

// create connection
underTest.tell(createConnection, commandSender.ref());
mockClientProbe.expectMsg(enableConnectionLogs);
mockClientProbe.expectMsg(FiniteDuration.create(5, TimeUnit.SECONDS), openConnection);
commandSender.expectMsg(createConnectionResponse);

Expand All @@ -649,7 +660,9 @@ public void modifyConnectionClosesAndRestartsClientActor() {
// unsubscribe is called for topics of unmodified connection
expectTerminated(clientActor);

// and sends an open connection (if desired state is open)
// and sends an open connection (if desired state is open). Since logging is enabled from creation
// enabledConnectionLogs is also expected
mockClientProbe.expectMsg(enableConnectionLogs);
mockClientProbe.expectMsg(openConnection);
// finally the response is sent
commandSender.expectMsg(modifyConnectionResponse);
Expand All @@ -675,7 +688,8 @@ public void recoverOpenConnection() {
expectMsg(createConnectionResponse);

// wait for open connection of initial creation
mockClientProbe.expectMsg(OpenConnection.of(connectionId, DittoHeaders.empty()));
mockClientProbe.expectMsg(enableConnectionLogs);
mockClientProbe.expectMsg(openConnection);

// stop actor
getSystem().stop(underTest);
Expand All @@ -689,7 +703,7 @@ public void recoverOpenConnection() {
recoveredMockClientProbe.ref())));

// connection is opened after recovery -> recovered client actor receives OpenConnection command
recoveredMockClientProbe.expectMsg(OpenConnection.of(connectionId, DittoHeaders.empty()));
recoveredMockClientProbe.expectMsg(openConnection);

// poll connection status until status is OPEN
final ActorRef recoveredActor = underTest;
Expand Down Expand Up @@ -864,6 +878,7 @@ public void testResetConnectionMetrics() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand All @@ -890,6 +905,7 @@ public void testConnectionActorRespondsToCleanupCommand() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand All @@ -902,9 +918,6 @@ public void testConnectionActorRespondsToCleanupCommand() {

@Test
public void enableConnectionLogs() {
final EnableConnectionLogs enableConnectionLogs = EnableConnectionLogs.of(connectionId, DittoHeaders.empty());
final EnableConnectionLogsResponse enableConnectionLogsResponse =
EnableConnectionLogsResponse.of(connectionId, enableConnectionLogs.getDittoHeaders());
new TestKit(actorSystem) {{

final TestProbe probe = TestProbe.apply(actorSystem);
Expand All @@ -916,6 +929,7 @@ public void enableConnectionLogs() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand Down Expand Up @@ -966,10 +980,6 @@ public void retrieveLogsIsAggregated() {
now.plusSeconds(123),
DittoHeaders.empty());

final EnableConnectionLogs enableConnectionLogs = EnableConnectionLogs.of(connectionId, DittoHeaders.empty());
final EnableConnectionLogsResponse enableConnectionLogsResponse =
EnableConnectionLogsResponse.of(connectionId, enableConnectionLogs.getDittoHeaders());

new TestKit(actorSystem) {{
final TestProbe probe = TestProbe.apply(actorSystem);
final ActorRef underTest =
Expand All @@ -980,15 +990,15 @@ public void retrieveLogsIsAggregated() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(openConnection);
probe.expectMsg(enableConnectionLogs);
expectMsg(enableConnectionLogsResponse);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

// retrieve logs
final RetrieveConnectionLogs retrieveConnectionLogs =
RetrieveConnectionLogs.of(connectionId, DittoHeaders.empty());
underTest.tell(retrieveConnectionLogs, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(retrieveConnectionLogs);

// send answer to aggregator actor
Expand All @@ -1015,6 +1025,7 @@ public void resetConnectionLogs() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand All @@ -1028,9 +1039,6 @@ public void resetConnectionLogs() {

@Test
public void enabledConnectionLogsAreEnabledAgainAfterModify() {
final EnableConnectionLogs enableConnectionLogs = EnableConnectionLogs.of(connectionId, DittoHeaders.empty());
final EnableConnectionLogsResponse enableConnectionLogsResponse =
EnableConnectionLogsResponse.of(connectionId, enableConnectionLogs.getDittoHeaders());
new TestKit(actorSystem) {{

final TestProbe probe = TestProbe.apply(actorSystem);
Expand All @@ -1042,6 +1050,7 @@ public void enabledConnectionLogsAreEnabledAgainAfterModify() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

Expand All @@ -1055,6 +1064,7 @@ public void enabledConnectionLogsAreEnabledAgainAfterModify() {
// modify connection
underTest.tell(modifyConnection, getRef());
probe.expectMsg(closeConnection);
probe.expectMsg(enableConnectionLogs);
probe.expectMsg(openConnection);
expectMsg(modifyConnectionResponse);

Expand All @@ -1065,9 +1075,6 @@ public void enabledConnectionLogsAreEnabledAgainAfterModify() {

@Test
public void disabledConnectionLogsAreNotEnabledAfterModify() {
final EnableConnectionLogs enableConnectionLogs = EnableConnectionLogs.of(connectionId, DittoHeaders.empty());
final EnableConnectionLogsResponse enableConnectionLogsResponse =
EnableConnectionLogsResponse.of(connectionId, enableConnectionLogs.getDittoHeaders());
new TestKit(actorSystem) {{

final TestProbe probe = TestProbe.apply(actorSystem);
Expand All @@ -1079,11 +1086,13 @@ public void disabledConnectionLogsAreNotEnabledAfterModify() {

// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(openConnection);
probe.expectMsg(enableConnectionLogs);
expectMsg(enableConnectionLogsResponse);
probe.expectMsg(openConnection);
expectMsg(createConnectionResponse);

//Close logging which are automatically enabled via create connection
underTest.tell(LoggingExpired.of(connectionId, DittoHeaders.empty()), getRef());

// modify connection
underTest.tell(modifyConnection, getRef());
probe.expectMsg(closeConnection);
Expand Down Expand Up @@ -1131,6 +1140,10 @@ public void preStart() {
underTest.tell(createClosedConnectionWith2Clients, getRef());
expectMsgClass(CreateConnectionResponse.class);
underTest.tell(OpenConnection.of(myConnectionId, DittoHeaders.empty()), getRef());
assertThat(clientActorsProbe.expectMsgClass(WithSender.class).getMessage())
.isInstanceOf(EnableConnectionLogs.class);
assertThat(clientActorsProbe.expectMsgClass(WithSender.class).getMessage())
.isInstanceOf(EnableConnectionLogs.class);
assertThat(clientActorsProbe.expectMsgClass(WithSender.class).getMessage())
.isInstanceOf(OpenConnection.class);
clientActorsProbe.reply(new Status.Success("connected"));
Expand Down

0 comments on commit 16ba24a

Please sign in to comment.