Skip to content

Commit

Permalink
Open and create connection with automated logging activation
Browse files Browse the repository at this point in the history
Signed-off-by: Vadim Guenther <vadim.guenther@bosch.io>
  • Loading branch information
VadimGue committed Apr 26, 2021
1 parent 9104c8d commit 91f527f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ private void startClientActorsIfRequired(final int clientCount) {

// start client actor without name so it does not conflict with its previous incarnation
clientActorRouter = getContext().actorOf(clusterRouterPoolProps);
updateLoggingIfEnabled();
} else if (clientActorRouter != null) {
log.debug("ClientActor already started.");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.eclipse.ditto.model.connectivity.ConnectionLifecycle.ACTIVE;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.BECOME_CREATED;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.ENABLE_LOGGING;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.OPEN_CONNECTION_IGNORE_ERRORS;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.PERSIST_AND_APPLY_EVENT;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.SEND_RESPONSE;
Expand Down Expand Up @@ -69,7 +70,8 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
.debug("Connection <{}> has status <{}> and will therefore be opened.",
connection.getId(), connection.getConnectionStatus());
final List<ConnectionAction> actions = List.of(
PERSIST_AND_APPLY_EVENT, BECOME_CREATED, UPDATE_SUBSCRIPTIONS, SEND_RESPONSE, OPEN_CONNECTION_IGNORE_ERRORS);
ENABLE_LOGGING, PERSIST_AND_APPLY_EVENT, BECOME_CREATED, UPDATE_SUBSCRIPTIONS, SEND_RESPONSE,
OPEN_CONNECTION_IGNORE_ERRORS);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
} else {
return newMutationResult(command, event, response, true, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.persistence.strategies.commands;

import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.ENABLE_LOGGING;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.OPEN_CONNECTION;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.PERSIST_AND_APPLY_EVENT;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.SEND_RESPONSE;
Expand Down Expand Up @@ -63,7 +64,8 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
final WithDittoHeaders response =
OpenConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
final List<ConnectionAction> actions =
Arrays.asList(PERSIST_AND_APPLY_EVENT, OPEN_CONNECTION, UPDATE_SUBSCRIPTIONS, SEND_RESPONSE);
Arrays.asList(ENABLE_LOGGING, PERSIST_AND_APPLY_EVENT, OPEN_CONNECTION, UPDATE_SUBSCRIPTIONS,
SEND_RESPONSE);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ public void deleteConnectionUpdatesSubscriptions() {

@Test
public void manageConnectionWith2Clients() throws Exception {
final EnableConnectionLogs enableConnectionLogs = EnableConnectionLogs.of(connectionId, DittoHeaders.empty());
final EnableConnectionLogsResponse enableConnectionLogsResponse =
EnableConnectionLogsResponse.of(connectionId, enableConnectionLogs.getDittoHeaders());

startSecondActorSystemAndJoinCluster();
new TestKit(actorSystem) {{
final TestProbe gossipProbe = TestProbe.apply("gossip", actorSystem);
Expand All @@ -335,6 +339,8 @@ public void manageConnectionWith2Clients() throws Exception {
underTest.tell(gossipProbe.expectMsgClass(ActorRef.class), ActorRef.noSender());
// one client actor receives the command
probe.expectMsg(openConnection);
probe.expectMsg(enableConnectionLogs);
expectMsg(enableConnectionLogsResponse);
expectMsgClass(OpenConnectionResponse.class);

// forward signal once
Expand Down Expand Up @@ -960,6 +966,10 @@ public void retrieveLogsIsAggregated() {
now.plusSeconds(123),
DittoHeaders.empty());

final EnableConnectionLogs enableConnectionLogs = EnableConnectionLogs.of(connectionId, DittoHeaders.empty());
final EnableConnectionLogsResponse enableConnectionLogsResponse =
EnableConnectionLogsResponse.of(connectionId, enableConnectionLogs.getDittoHeaders());

new TestKit(actorSystem) {{
final TestProbe probe = TestProbe.apply(actorSystem);
final ActorRef underTest =
Expand All @@ -971,6 +981,8 @@ public void retrieveLogsIsAggregated() {
// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(openConnection);
probe.expectMsg(enableConnectionLogs);
expectMsg(enableConnectionLogsResponse);
expectMsg(createConnectionResponse);

// retrieve logs
Expand Down Expand Up @@ -1053,6 +1065,9 @@ public void enabledConnectionLogsAreEnabledAgainAfterModify() {

@Test
public void disabledConnectionLogsAreNotEnabledAfterModify() {
final EnableConnectionLogs enableConnectionLogs = EnableConnectionLogs.of(connectionId, DittoHeaders.empty());
final EnableConnectionLogsResponse enableConnectionLogsResponse =
EnableConnectionLogsResponse.of(connectionId, enableConnectionLogs.getDittoHeaders());
new TestKit(actorSystem) {{

final TestProbe probe = TestProbe.apply(actorSystem);
Expand All @@ -1065,6 +1080,8 @@ public void disabledConnectionLogsAreNotEnabledAfterModify() {
// create connection
underTest.tell(createConnection, getRef());
probe.expectMsg(openConnection);
probe.expectMsg(enableConnectionLogs);
expectMsg(enableConnectionLogsResponse);
expectMsg(createConnectionResponse);

// modify connection
Expand Down

0 comments on commit 91f527f

Please sign in to comment.