Skip to content

Commit

Permalink
Move client actor shutdown to shard region shutdown phase to prevent …
Browse files Browse the repository at this point in the history
…restarts during coordinated shutdown.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 7, 2022
1 parent 892aa4a commit fd4524b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -353,20 +354,9 @@ private void registerConnectionPubSub() {
private void addCoordinatedShutdownTasks() {
final var system = getContext().getSystem();
final var coordinatedShutdown = CoordinatedShutdown.get(system);
if (shouldAnyTargetSendConnectionAnnouncements()) {
// only add clients of connections to Coordinated shutdown having connection announcements configured
coordinatedShutdown.addActorTerminationTask(
CoordinatedShutdown.PhaseServiceRequestsDone(),
"closeConnectionAndShutdown",
getSelf(),
Optional.of(CloseConnectionAndShutdown.INSTANCE)
);
}
final var id = getSelf().path().toString();
cancelOnStopTasks.add(coordinatedShutdown.addCancellableTask(CoordinatedShutdown.PhaseServiceUnbind(),
"service-unbind-" + id, askSelfShutdownTask(Control.SERVICE_UNBIND)));
coordinatedShutdown.addActorTerminationTask(CoordinatedShutdown.PhaseServiceRequestsDone(),
"service-requests-done" + id, getSelf(), Optional.of(Control.SERVICE_REQUESTS_DONE));
"service-unbind-" + id, askSelfServiceUnbind()));
}

private boolean shouldAnyTargetSendConnectionAnnouncements() {
Expand Down Expand Up @@ -519,7 +509,7 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inAnyState()
.event(FatalPubSubException.class, this::failConnectionDueToPubSubException)
.eventEquals(Control.ACKREGATOR_STARTED, this::ackregatorStarted)
.eventEquals(Control.ACKREGATOR_STOPPED, this::ackregatorStopped)
.eventEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.eventEquals(Control.STOP_SHARDED_ACTOR, this::serviceRequestsDone)
.eventEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownTimeout);
}

Expand Down Expand Up @@ -682,7 +672,6 @@ private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inUnknownState(
private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inInitializedState() {
return matchEvent(OpenConnection.class, this::openConnection)
.event(CloseConnection.class, this::closeConnection)
.event(CloseConnectionAndShutdown.class, this::closeConnectionAndShutdown)
.event(TestConnection.class, this::testConnection)
.eventEquals(Control.SERVICE_UNBIND, this::serviceUnbindWhenWaitingForCommand);
}
Expand Down Expand Up @@ -710,7 +699,6 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingS
.event(ClientConnected.class, this::clientConnectedInConnectingState)
.event(InitializationResult.class, this::handleInitializationResult)
.event(CloseConnection.class, this::closeConnection)
.event(CloseConnectionAndShutdown.class, this::closeConnectionAndShutdown)
.event(SshTunnelActor.TunnelStarted.class, this::tunnelStarted)
.eventEquals(Control.CONNECT_AFTER_TUNNEL_ESTABLISHED, this::connectAfterTunnelStarted)
.eventEquals(Control.GOTO_CONNECTED_AFTER_INITIALIZATION, this::gotoConnectedAfterInitialization)
Expand All @@ -726,7 +714,6 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingS
*/
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedState() {
return matchEvent(CloseConnection.class, this::closeConnection)
.event(CloseConnectionAndShutdown.class, this::closeConnectionAndShutdown)
.event(SshTunnelActor.TunnelClosed.class, this::tunnelClosed)
.event(OpenConnection.class, this::connectionAlreadyOpen)
.event(ConnectionFailure.class, this::connectedConnectionFailed)
Expand Down Expand Up @@ -870,13 +857,12 @@ private FSM.State<BaseClientState, BaseClientData> serviceUnbindWhenConnected(fi
final var dittoProtocolUnsub = dittoProtocolSub.removeSubscriber(getSelf(), getTargetAuthSubjects());
final var connectionUnsub = connectionPubSub.unsubscribe(connectionId(), getSelf());
final var stopConsumers = stopConsuming();
final var sendAnnouncement = sendDisconnectAnnouncementInCoordinatedShutdown();
final var resultFuture = dittoProtocolUnsub.thenCompose(_void -> connectionUnsub)
.thenCompose(_void -> stopConsumers)
.thenCompose(_void -> sendAnnouncement)
.handle((result, error) -> Done.getInstance());
Patterns.pipe(resultFuture, getContext().dispatcher()).to(getSender());
if (shouldAnyTargetSendConnectionAnnouncements()) {
getSelf().tell(SEND_DISCONNECT_ANNOUNCEMENT, getSender());
}
return stay();
}

Expand All @@ -901,20 +887,8 @@ private FSM.State<BaseClientState, BaseClientData> serviceUnbindWhenTesting(fina
return stay();
}

private FSM.State<BaseClientState, BaseClientData> closeConnectionAndShutdown(
final CloseConnectionAndShutdown closeConnectionAndShutdown,
final BaseClientData data) {

return closeConnection(CloseConnection.of(connectionId(), DittoHeaders.empty()), data, true);
}

private FSM.State<BaseClientState, BaseClientData> closeConnection(final WithDittoHeaders closeConnection,
final BaseClientData data) {
return closeConnection(closeConnection, data, false);
}

private FSM.State<BaseClientState, BaseClientData> closeConnection(final WithDittoHeaders closeConnection,
final BaseClientData data, final boolean shutdownAfterDisconnect) {

final ActorRef sender = getSender();

Expand All @@ -925,11 +899,11 @@ private FSM.State<BaseClientState, BaseClientData> closeConnection(final WithDit
timeoutUntilDisconnectCompletes =
clientConfig.getDisconnectingMaxTimeout().plus(disconnectAnnouncementTimeout);
getSelf().tell(SEND_DISCONNECT_ANNOUNCEMENT, sender);
startSingleTimer("startDisconnect", new Disconnect(sender, shutdownAfterDisconnect),
startSingleTimer("startDisconnect", new Disconnect(sender, false),
disconnectAnnouncementTimeout);
} else {
timeoutUntilDisconnectCompletes = clientConfig.getDisconnectingMaxTimeout();
getSelf().tell(new Disconnect(sender, shutdownAfterDisconnect), sender);
getSelf().tell(new Disconnect(sender, false), sender);
}

dittoProtocolSub.removeSubscriber(getSelf());
Expand Down Expand Up @@ -2074,9 +2048,9 @@ private void startSubscriptionRefreshTimer() {
startSingleTimer(Control.RESUBSCRIBE.name(), Control.RESUBSCRIBE, randomizedDelay);
}

private Supplier<CompletionStage<Done>> askSelfShutdownTask(final Object question) {
private Supplier<CompletionStage<Done>> askSelfServiceUnbind() {
final var shutdownTimeout = Duration.ofMinutes(2);
return () -> Patterns.ask(getSelf(), question, shutdownTimeout).thenApply(answer -> Done.done());
return () -> Patterns.ask(getSelf(), Control.SERVICE_UNBIND, shutdownTimeout).thenApply(answer -> Done.done());
}

private FSM.State<BaseClientState, BaseClientData> ackregatorStarted(final Control event,
Expand All @@ -2089,7 +2063,7 @@ private FSM.State<BaseClientState, BaseClientData> ackregatorStopped(final Contr
final BaseClientData data) {
--ackregatorCount;
if (shuttingDown && ackregatorCount == 0) {
logger.info("{}: finished waiting for ackregators", Control.SERVICE_REQUESTS_DONE);
logger.info("{}: finished waiting for ackregators", Control.STOP_SHARDED_ACTOR);
return stop();
} else {
return stay();
Expand All @@ -2100,7 +2074,7 @@ private FSM.State<BaseClientState, BaseClientData> serviceRequestsDone(final Con
final BaseClientData data) {

shuttingDown = true;
logger.info("{}: ackregatorCount={}", Control.SERVICE_REQUESTS_DONE, ackregatorCount);
logger.info("{}: ackregatorCount={}", Control.STOP_SHARDED_ACTOR, ackregatorCount);
if (ackregatorCount == 0) {
return stop();
} else {
Expand All @@ -2117,6 +2091,19 @@ private FSM.State<BaseClientState, BaseClientData> shutdownTimeout(final Control
return stop();
}

private CompletionStage<?> sendDisconnectAnnouncementInCoordinatedShutdown() {
if (shouldAnyTargetSendConnectionAnnouncements()) {
final var disconnectAnnouncementTimeout =
connectivityConfig.getClientConfig().getDisconnectAnnouncementTimeout().toMillis();
getSelf().tell(SEND_DISCONNECT_ANNOUNCEMENT, ActorRef.noSender());
final CompletableFuture<?> waitForAnnouncementFuture = new CompletableFuture<>();
waitForAnnouncementFuture.completeOnTimeout(null, disconnectAnnouncementTimeout, TimeUnit.MILLISECONDS);
return waitForAnnouncementFuture;
} else {
return CompletableFuture.completedStage(null);
}
}

private static Optional<StreamingType> toStreamingTypes(final Topic topic) {
return switch (topic) {
case POLICY_ANNOUNCEMENTS -> Optional.of(StreamingType.POLICY_ANNOUNCEMENTS);
Expand Down Expand Up @@ -2341,15 +2328,6 @@ private boolean shutdownAfterDisconnect() {
}
}

private static final class CloseConnectionAndShutdown {

private static final CloseConnectionAndShutdown INSTANCE = new CloseConnectionAndShutdown();

private CloseConnectionAndShutdown() {
// no-op
}
}

/**
* Messages to control the life cycles of the actor.
*/
Expand All @@ -2359,7 +2337,7 @@ public enum Control {
GOTO_CONNECTED_AFTER_INITIALIZATION,
RESUBSCRIBE,
SERVICE_UNBIND,
SERVICE_REQUESTS_DONE,
STOP_SHARDED_ACTOR,
ACKREGATOR_STARTED,
ACKREGATOR_STOPPED,
SHUTDOWN_TIMEOUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,12 @@ private void restartIfOpen(final StopShardedActor stopShardedActor) {
.shardRegion(ConnectivityMessagingConstants.CLIENT_SHARD_REGION)
.tell(envelope, ActorRef.noSender());
}
getContext().stop(getSelf());
if (clientActor != null) {
// wait for ongoing ackregators in the client actor; terminate when the client actor terminates
clientActor.tell(BaseClientActor.Control.STOP_SHARDED_ACTOR, ActorRef.noSender());
} else {
getContext().stop(getSelf());
}
}

private enum Control {
Expand Down

0 comments on commit fd4524b

Please sign in to comment.