Skip to content

Commit

Permalink
Fix client shard region name; reintroduce CLOSE_CONNECTION stage to r…
Browse files Browse the repository at this point in the history
…estore connection connection announcements.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 6, 2022
1 parent 67c8f3c commit 8ef2450
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private static ActorRef startClientShardRegion(final ActorSystem actorSystem, fi
final ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(actorSystem)
.withRole(ConnectivityMessagingConstants.CLUSTER_ROLE);
return ClusterSharding.get(actorSystem)
.start(ConnectivityMessagingConstants.SHARD_REGION, props, shardingSettings,
.start(ConnectivityMessagingConstants.CLIENT_SHARD_REGION, props, shardingSettings,
ShardRegionExtractor.of(numberOfShards, actorSystem));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ private void interpretStagedCommand(final StagedCommand command) {
}
case OPEN_CONNECTION -> openConnection(command.next(), false);
case OPEN_CONNECTION_IGNORE_ERRORS -> openConnection(command.next(), true);
case CLOSE_CONNECTION -> closeConnection(command.next());
case STOP_CLIENT_ACTORS -> {
stopClientActors();
interpretStagedCommand(command.next());
Expand Down Expand Up @@ -821,6 +822,18 @@ private void handleOpenConnectionError(final Throwable error, final boolean igno
}
}

private void closeConnection(final StagedCommand command) {
final CloseConnection closeConnection = CloseConnection.of(entityId, command.getDittoHeaders());
askAllClientActors(closeConnection)
.thenAccept(response -> getSelf().tell(command, ActorRef.noSender()))
.exceptionally(error -> {
// stop client actors anyway --- the closed status is already persisted.
stopClientActors();
handleException("disconnect", command.getSender(), error);
return null;
});
}

private void logDroppedSignal(final WithDittoHeaders withDittoHeaders, final String type, final String reason) {
log.withCorrelationId(withDittoHeaders).debug("Signal ({}) dropped: {}", type, reason);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public enum ConnectionAction {
*/
OPEN_CONNECTION,

/**
* Tell client actors to close the connection.
*/
CLOSE_CONNECTION,

/**
* Stop client actors.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
CloseConnectionResponse.of(context.getState().id(), command.getDittoHeaders());
final List<ConnectionAction> actions =
Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.CHECK_LOGGING_ENABLED,
ConnectionAction.STOP_CLIENT_ACTORS, ConnectionAction.SEND_RESPONSE);
ConnectionAction.CLOSE_CONNECTION, ConnectionAction.STOP_CLIENT_ACTORS,
ConnectionAction.SEND_RESPONSE);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
// Not closing the connection asynchronously; rely on client actors to cleanup all resources when stopped.
final List<ConnectionAction> actions =
Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.CHECK_LOGGING_ENABLED,
ConnectionAction.STOP_CLIENT_ACTORS, ConnectionAction.DISABLE_LOGGING,
ConnectionAction.SEND_RESPONSE, ConnectionAction.BECOME_DELETED);
ConnectionAction.CLOSE_CONNECTION, ConnectionAction.STOP_CLIENT_ACTORS,
ConnectionAction.DISABLE_LOGGING, ConnectionAction.SEND_RESPONSE,
ConnectionAction.BECOME_DELETED);
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,15 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
if (isNextConnectionOpen) {
context.getLog().withCorrelationId(command)
.debug("Desired connection state is OPEN");
actions = Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.OPEN_CONNECTION,
ConnectionAction.CHECK_LOGGING_ENABLED, ConnectionAction.SEND_RESPONSE);
actions = Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.CLOSE_CONNECTION,
ConnectionAction.OPEN_CONNECTION, ConnectionAction.CHECK_LOGGING_ENABLED,
ConnectionAction.SEND_RESPONSE);
} else {
context.getLog().withCorrelationId(command)
.debug("Desired connection state is not OPEN");
actions = Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT,
ConnectionAction.CHECK_LOGGING_ENABLED, ConnectionAction.STOP_CLIENT_ACTORS,
ConnectionAction.SEND_RESPONSE);
ConnectionAction.CHECK_LOGGING_ENABLED, ConnectionAction.CLOSE_CONNECTION,
ConnectionAction.STOP_CLIENT_ACTORS, ConnectionAction.SEND_RESPONSE);
}
return newMutationResult(StagedCommand.of(command, event, response, actions), event, response);
} else {
Expand Down

0 comments on commit 8ef2450

Please sign in to comment.