Skip to content

Commit

Permalink
Stop consuming and unsubscribe from PubSub on Service-Unbind.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 28, 2022
1 parent f635d09 commit 4fa0a91
Show file tree
Hide file tree
Showing 16 changed files with 339 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.text.MessageFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
Expand All @@ -41,6 +42,7 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -135,6 +137,7 @@
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.CoordinatedShutdown;
import akka.actor.FSM;
import akka.actor.OneForOneStrategy;
Expand Down Expand Up @@ -199,6 +202,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
protected final ChildActorNanny childActorNanny;
private final DittoHeadersValidator dittoHeadersValidator;
private final boolean dryRun;
private final List<Cancellable> cancelOnStopTasks = new ArrayList<>();

private Sink<Object, NotUsed> inboundMappingSink;
private ActorRef outboundDispatchingActor;
Expand Down Expand Up @@ -298,16 +302,8 @@ public void preStart() throws Exception {
initialize();

// register this client actor at connection pub-sub
final var connectionId = connection.getId();
connectionPubSub.subscribe(connectionId, getSelf(), false).whenComplete((consistent, error) -> {
if (error != null) {
logger.error(error, "Failed to register client actor for connection <{}>", connectionId);
} else {
logger.info("Client actor registered: <{}>", connectionId);
}
});

closeConnectionBeforeTerminatingCluster();
registerConnectionPubSub();
addCoordinatedShutdownTasks();

init();
}
Expand Down Expand Up @@ -337,16 +333,39 @@ protected void init() {
getSelf().tell(Control.INIT_COMPLETE, ActorRef.noSender());
}

private void closeConnectionBeforeTerminatingCluster() {
/**
 * Subscribes this client actor at the connection pub-sub under the ID of its connection and logs whether the
 * subscription succeeded. Does nothing when this actor is a dry run.
 */
private void registerConnectionPubSub() {
    // a dry run has no need to modify distributed data
    if (!dryRun) {
        final var id = connection.getId();
        final var subscription = connectionPubSub.subscribe(id, getSelf(), false);
        subscription.whenComplete((consistent, failure) -> {
            if (failure == null) {
                logger.info("Client actor registered: <{}>", id);
            } else {
                logger.error(failure, "Failed to register client actor for connection <{}>", id);
            }
        });
    }
}

/**
 * Registers the tasks of this actor at the coordinated shutdown of the actor system.
 * <ul>
 * <li>If any target has connection announcements configured, {@code CloseConnectionAndShutdown} is sent to this
 * actor as an actor termination task in the service-requests-done phase.</li>
 * <li>{@code Control.SERVICE_UNBIND} is asked of this actor in the service-unbind phase.</li>
 * <li>{@code Control.SERVICE_REQUESTS_DONE} is asked of this actor in the service-requests-done phase.</li>
 * </ul>
 * The two cancellable tasks are remembered in {@code cancelOnStopTasks}, which {@code postStop} cancels.
 */
private void addCoordinatedShutdownTasks() {
    final var system = getContext().getSystem();
    final var coordinatedShutdown = CoordinatedShutdown.get(system);
    if (shouldAnyTargetSendConnectionAnnouncements()) {
        // only add clients of connections to Coordinated shutdown having connection announcements configured.
        // The termination task is registered exactly once, in the service-requests-done phase; an additional
        // registration in the before-service-unbind phase would deliver CloseConnectionAndShutdown ahead of
        // the SERVICE_UNBIND question registered below.
        coordinatedShutdown.addActorTerminationTask(
                CoordinatedShutdown.PhaseServiceRequestsDone(),
                "closeConnectionAndShutdown",
                getSelf(),
                Optional.of(CloseConnectionAndShutdown.INSTANCE)
        );
    }
    // the actor path makes the task names unique per client actor
    final var id = getSelf().path().toString();
    cancelOnStopTasks.add(coordinatedShutdown.addCancellableTask(CoordinatedShutdown.PhaseServiceUnbind(),
            "service-unbind-" + id, askSelfShutdownTask(Control.SERVICE_UNBIND)));
    cancelOnStopTasks.add(coordinatedShutdown.addCancellableTask(CoordinatedShutdown.PhaseServiceRequestsDone(),
            "service-requests-done-" + id, askSelfShutdownTask(Control.SERVICE_REQUESTS_DONE)));
}

private boolean shouldAnyTargetSendConnectionAnnouncements() {
Expand All @@ -358,6 +377,7 @@ private boolean shouldAnyTargetSendConnectionAnnouncements() {

@Override
public void postStop() {
cancelOnStopTasks.forEach(Cancellable::cancel);
clientGauge.reset();
clientConnectingGauge.reset();
stopChildActor(tunnelActor);
Expand Down Expand Up @@ -423,6 +443,13 @@ private FSM.State<BaseClientState, BaseClientData> completeInitialization() {
*/
protected abstract CompletionStage<Status.Status> doTestConnection(TestConnection testConnectionCommand);

/**
 * Stop consuming messages from the connection.
 * Invoked when {@code Control.SERVICE_UNBIND} is handled in the connected state; the returned stage is chained
 * into the reply that is piped back to the sender of that request.
 *
 * @return the CompletionStage that completes or fails according to whether consumer disconnection is successful.
 */
protected abstract CompletionStage<Void> stopConsuming();

/**
* Subclasses should allocate resources (publishers and consumers) in the implementation. This method is called once
* this {@code Client} connected successfully.
Expand Down Expand Up @@ -639,6 +666,7 @@ private FSM.State<BaseClientState, BaseClientData> goToTesting() {

private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inUnknownState() {
return matchEventEquals(Control.INIT_COMPLETE, (init, baseClientData) -> completeInitialization())
.eventEquals(Control.SERVICE_UNBIND, this::serviceUnbindWhenDisconnected)
.event(RuntimeException.class, BaseClientActor::failInitialization)
.anyEvent((o, baseClientData) -> {
stash();
Expand All @@ -650,7 +678,8 @@ private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inInitializedSt
return matchEvent(OpenConnection.class, this::openConnection)
.event(CloseConnection.class, this::closeConnection)
.event(CloseConnectionAndShutdown.class, this::closeConnectionAndShutdown)
.event(TestConnection.class, this::testConnection);
.event(TestConnection.class, this::testConnection)
.eventEquals(Control.SERVICE_UNBIND, this::serviceUnbindWhenWaitingForCommand);
}

/**
Expand All @@ -661,7 +690,8 @@ private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inInitializedSt
private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inDisconnectedState() {
return matchEvent(OpenConnection.class, this::openConnection)
.event(CloseConnection.class, this::connectionAlreadyClosed)
.event(TestConnection.class, this::testConnection);
.event(TestConnection.class, this::testConnection)
.eventEquals(Control.SERVICE_UNBIND, this::serviceUnbindWhenDisconnected);
}

/**
Expand All @@ -680,7 +710,8 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingS
.eventEquals(Control.CONNECT_AFTER_TUNNEL_ESTABLISHED, this::connectAfterTunnelStarted)
.eventEquals(Control.GOTO_CONNECTED_AFTER_INITIALIZATION, this::gotoConnectedAfterInitialization)
.event(SshTunnelActor.TunnelClosed.class, this::tunnelClosed)
.event(OpenConnection.class, this::openConnectionInConnectingState);
.event(OpenConnection.class, this::openConnectionInConnectingState)
.eventEquals(Control.SERVICE_UNBIND, this::serviceUnbindWhenDisconnected);
}

/**
Expand All @@ -696,7 +727,9 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedSt
.event(ConnectionFailure.class, this::connectedConnectionFailed)
.event(ReportConnectionStatusSuccess.class, this::updateConnectionStatusSuccess)
.event(ReportConnectionStatusError.class, this::updateConnectionStatusError)
.eventEquals(Control.RESUBSCRIBE, this::resubscribe);
.eventEquals(Control.RESUBSCRIBE, this::resubscribe)
.eventEquals(Control.SERVICE_UNBIND, this::serviceUnbindWhenConnected)
.eventEquals(SEND_DISCONNECT_ANNOUNCEMENT, (event, data) -> sendDisconnectAnnouncement(data));
}

private State<BaseClientState, BaseClientData> updateConnectionStatusSuccess(
Expand Down Expand Up @@ -747,6 +780,7 @@ private FSM.State<BaseClientState, BaseClientData> publishMappedMessage(final Pu
private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inDisconnectingState() {
return matchEventEquals(StateTimeout(), (event, data) -> connectionTimedOut(data))
.eventEquals(SEND_DISCONNECT_ANNOUNCEMENT, (event, data) -> sendDisconnectAnnouncement(data))
.eventEquals(Control.SERVICE_UNBIND, this::serviceUnbindWhenDisconnected)
.event(Disconnect.class, this::disconnect)
.event(ConnectionFailure.class, this::connectingConnectionFailed)
.event(ClientDisconnected.class, this::clientDisconnected);
Expand Down Expand Up @@ -784,7 +818,8 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inTestingStat
sender.first().tell(new Status.Failure(error), getSelf());
});
return stop();
});
})
.eventEquals(Control.SERVICE_UNBIND, this::serviceUnbindWhenTesting);
}

private State<BaseClientState, BaseClientData> onUnknownEvent(final Object event, final BaseClientData state) {
Expand Down Expand Up @@ -824,6 +859,43 @@ private static FSM.State<BaseClientState, BaseClientData> failInitialization(fin
throw error;
}

/**
 * Handles {@code Control.SERVICE_UNBIND} in the connected state: unsubscribes this actor from the Ditto protocol
 * pub-sub and from the connection pub-sub, stops the consumers, and replies {@code Done} to the sender once all
 * three operations have finished. Failures are logged but never prevent the reply, so that the asking shutdown
 * task is not left waiting for its timeout. If any target sends connection announcements, the disconnect
 * announcement is triggered as well.
 *
 * @param serviceUnbind the trigger; not inspected.
 * @param data the current client data; not inspected.
 * @return the instruction to stay in the current state.
 */
private FSM.State<BaseClientState, BaseClientData> serviceUnbindWhenConnected(final Control serviceUnbind,
        final BaseClientData data) {
    log().info("ServiceUnbind State=<{}> unsubscribing from pubsub and stopping consumers", stateName());
    // capture actor-bound values on the actor thread; the callback below runs on a future's thread
    final var sender = getSender();
    final var connectionId = connectionId();
    final var dittoProtocolUnsub = dittoProtocolSub.removeSubscriber(getSelf(), getTargetAuthSubjects());
    final var connectionUnsub = connectionPubSub.unsubscribe(connectionId, getSelf());
    final var stopConsumers = stopConsuming();
    // allOf completes only after every given future has completed, even if one of them failed. A thenCompose
    // chain would short-circuit on the first failure and reply while consumers may still be stopping.
    final var resultFuture = java.util.concurrent.CompletableFuture.allOf(
                    dittoProtocolUnsub.toCompletableFuture(),
                    connectionUnsub.toCompletableFuture(),
                    stopConsumers.toCompletableFuture())
            .handle((result, error) -> {
                if (error != null) {
                    logger.error(error,
                            "ServiceUnbind failed to unsubscribe or stop consumers for connection <{}>",
                            connectionId);
                }
                return Done.getInstance();
            });
    Patterns.pipe(resultFuture, getContext().dispatcher()).to(sender);
    if (shouldAnyTargetSendConnectionAnnouncements()) {
        getSelf().tell(SEND_DISCONNECT_ANNOUNCEMENT, sender);
    }
    return stay();
}

/**
 * Handles {@code Control.SERVICE_UNBIND} while waiting for a command: logs a warning, acknowledges the sender
 * with {@code Done} and stops this actor.
 *
 * @param serviceUnbind the trigger; not inspected.
 * @param data the current client data; not inspected.
 * @return the instruction to stop.
 */
private FSM.State<BaseClientState, BaseClientData> serviceUnbindWhenWaitingForCommand(final Control serviceUnbind,
        final BaseClientData data) {
    final var requester = getSender();
    log().warning("ServiceUnbind waiting for command. Ongoing command is discarded.");
    requester.tell(Done.getInstance(), getSelf());
    return stop();
}

/**
 * Handles {@code Control.SERVICE_UNBIND} when nothing needs to be torn down: logs the current state,
 * acknowledges the sender with {@code Done} and stops this actor.
 *
 * @param serviceUnbind the trigger; not inspected.
 * @param data the current client data; not inspected.
 * @return the instruction to stop.
 */
private FSM.State<BaseClientState, BaseClientData> serviceUnbindWhenDisconnected(final Control serviceUnbind,
        final BaseClientData data) {
    log().info("ServiceUnbind State=<{}>", stateName());
    final ActorRef self = getSelf();
    getSender().tell(Done.getInstance(), self);
    return stop();
}

/**
 * Handles {@code Control.SERVICE_UNBIND} while testing: acknowledges the sender with {@code Done} right away
 * and stays in the current state.
 *
 * @param serviceUnbind the trigger; not inspected.
 * @param data the current client data; not inspected.
 * @return the instruction to stay in the current state.
 */
private FSM.State<BaseClientState, BaseClientData> serviceUnbindWhenTesting(final Control serviceUnbind,
        final BaseClientData data) {
    log().info("ServiceUnbind testing. Wait for ongoing test.");
    final Done acknowledgement = Done.getInstance();
    getSender().tell(acknowledgement, getSelf());
    return stay();
}

private FSM.State<BaseClientState, BaseClientData> closeConnectionAndShutdown(
final CloseConnectionAndShutdown closeConnectionAndShutdown,
final BaseClientData data) {
Expand Down Expand Up @@ -882,6 +954,7 @@ private FSM.State<BaseClientState, BaseClientData> openConnection(final WithDitt

final ActorRef sender = getSender();
final var dittoHeaders = openConnection.getDittoHeaders();

reconnectTimeoutStrategy.reset();
final Duration connectingTimeout = connectivityConfig().getClientConfig().getConnectingMinTimeout();

Expand Down Expand Up @@ -1833,10 +1906,6 @@ private FSM.State<BaseClientState, BaseClientData> resubscribe(final Control tri
return stay();
}

/**
 * @return the value of the {@code dryRun} flag of this client actor.
 */
protected boolean isDryRun() {
    return this.dryRun;
}

private BaseClientData setSession(final BaseClientData data, @Nullable final ActorRef sender,
final DittoHeaders headers) {

Expand Down Expand Up @@ -2000,6 +2069,11 @@ private void startSubscriptionRefreshTimer() {
startSingleTimer(Control.RESUBSCRIBE.name(), Control.RESUBSCRIBE, randomizedDelay);
}

/**
 * Creates a shutdown task that, when run, asks this actor the given question with a timeout of 2 minutes and
 * completes with {@code Done} as soon as any answer arrives.
 *
 * @param question the message to ask this actor.
 * @return the supplier of the ask result mapped to {@code Done}.
 */
private Supplier<CompletionStage<Done>> askSelfShutdownTask(final Object question) {
    final Duration askTimeout = Duration.ofMinutes(2);
    return () -> {
        final CompletionStage<Object> reply = Patterns.ask(getSelf(), question, askTimeout);
        return reply.thenApply(ignored -> Done.done());
    };
}

private static Optional<StreamingType> toStreamingTypes(final Topic topic) {
return switch (topic) {
case POLICY_ANNOUNCEMENTS -> Optional.of(StreamingType.POLICY_ANNOUNCEMENTS);
Expand All @@ -2011,14 +2085,6 @@ private static Optional<StreamingType> toStreamingTypes(final Topic topic) {
};
}

/**
 * Parses a string as an unsigned integer in hexadecimal notation.
 *
 * @param hexString the string to parse.
 * @return the parsed value, or an empty optional if the string cannot be parsed as an unsigned hexadecimal
 * integer.
 */
private static Optional<Integer> parseHexString(final String hexString) {
    Optional<Integer> parsed;
    try {
        final int value = Integer.parseUnsignedInt(hexString, 16);
        parsed = Optional.of(value);
    } catch (final NumberFormatException notHexadecimal) {
        parsed = Optional.empty();
    }
    return parsed;
}

/*
* Reconnect timeout strategy that provides increasing timeouts for reconnecting the client.
* On timeout, increase the next timeout so that backoff happens when connecting to a drop-all firewall.
Expand Down Expand Up @@ -2209,13 +2275,6 @@ public String toString() {

}

/**
 * Internal control signals handled by the state functions of this actor.
 */
private enum Control {
// sent to self at the end of init(); handled in the unknown state by completing the initialization
INIT_COMPLETE,
// handled in the connecting state by connectAfterTunnelStarted
CONNECT_AFTER_TUNNEL_ESTABLISHED,
// handled in the connecting state by gotoConnectedAfterInitialization
GOTO_CONNECTED_AFTER_INITIALIZATION,
// fired by the single timer started in startSubscriptionRefreshTimer; handled in the connected state by resubscribe
RESUBSCRIBE
}

private static final Object SEND_DISCONNECT_ANNOUNCEMENT = new Object();

private static final class Disconnect {
Expand Down Expand Up @@ -2248,4 +2307,13 @@ private CloseConnectionAndShutdown() {
}
}

/**
 * Internal control signals handled by the state functions of this actor.
 */
private enum Control {
// sent to self at the end of init(); handled in the unknown state by completing the initialization
INIT_COMPLETE,
// handled in the connecting state by connectAfterTunnelStarted
CONNECT_AFTER_TUNNEL_ESTABLISHED,
// handled in the connecting state by gotoConnectedAfterInitialization
GOTO_CONNECTED_AFTER_INITIALIZATION,
// fired by the single timer started in startSubscriptionRefreshTimer; handled in the connected state by resubscribe
RESUBSCRIBE,
// asked of this actor by the coordinated-shutdown task registered for the service-unbind phase
SERVICE_UNBIND,
// asked of this actor by the coordinated-shutdown task registered for the service-requests-done phase
SERVICE_REQUESTS_DONE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
Expand Down Expand Up @@ -299,6 +300,16 @@ protected CompletionStage<Status.Status> doTestConnection(final TestConnection t
});
}

/**
 * Asks every AMQP consumer actor to stop consuming, each with an ask timeout of 2 minutes.
 *
 * @return a stage that completes when all asks have completed; it completes immediately if there is no
 * consumer actor.
 */
@Override
protected CompletionStage<Void> stopConsuming() {
    final Duration askTimeout = Duration.ofMinutes(2L);
    final Stream<CompletableFuture<Object>> stopRequests = streamConsumerActors()
            .map(consumerActor ->
                    Patterns.ask(consumerActor, AmqpConsumerActor.Control.STOP_CONSUMER, askTimeout)
                            .toCompletableFuture());
    return CompletableFuture.allOf(stopRequests.toArray(CompletableFuture[]::new));
}

@Override
protected void doConnectClient(final Connection connection, @Nullable final ActorRef origin) {
// delegate to child actor because the QPID JMS client is blocking until connection is opened/closed
Expand Down Expand Up @@ -437,12 +448,7 @@ private ActorRef startCommandConsumer(final ConsumerData consumer, final Sink<Ob
}

/**
 * Stops all AMQP consumer child actors and clears the registry of consumers.
 */
private void stopCommandConsumers() {
    // streamConsumerActors() already restricts the registered children to those whose name starts with
    // AmqpConsumerActor.ACTOR_NAME_PREFIX, so a separate manual loop with the same filter would only stop
    // every consumer a second time.
    streamConsumerActors().forEach(this::stopChildActor);
    consumerByNamePrefix.clear();
}

Expand Down Expand Up @@ -611,6 +617,12 @@ private JmsConnect jmsConnect(@Nullable final ActorRef sender, final Connection
return new JmsConnect(sender, getClientId(connection.getId()));
}

/**
 * Streams the actors registered in {@code consumerByNamePrefix} whose actor name starts with
 * {@code AmqpConsumerActor.ACTOR_NAME_PREFIX}.
 *
 * @return the stream of AMQP consumer actors.
 */
private Stream<ActorRef> streamConsumerActors() {
    final var registeredActors = consumerByNamePrefix.values();
    return registeredActors.stream()
            .filter(actor -> {
                final String actorName = actor.path().name();
                return actorName.startsWith(AmqpConsumerActor.ACTOR_NAME_PREFIX);
            });
}

/**
* {@code Connect} message for internal communication with {@link JMSConnectionHandlingActor}.
*/
Expand Down
Loading

0 comments on commit 4fa0a91

Please sign in to comment.