Skip to content

Commit

Permalink
fix: properly manage controller shutdown process
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumelamirand committed Mar 5, 2024
1 parent 3ba8c76 commit dd59ba2
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ protected Single<HelloReply> sendHelloCommand(final HelloCommand helloCommand) {
return send(helloCommand, true);
}

private <C extends Command<?>, R extends Reply<?>> Single<R> send(final C command, final boolean ignoreActiveStatus) {
protected <C extends Command<?>, R extends Reply<?>> Single<R> send(final C command, final boolean ignoreActiveStatus) {
return Single
.defer(() -> {
if (!ignoreActiveStatus && !active) {
Expand All @@ -377,13 +377,6 @@ private <C extends Command<?>, R extends Reply<?>> Single<R> send(final C comman
resultEmitters.put(decoratedCommand.getId(), emitter);
writeCommand(decoratedCommand).doOnError(emitter::onError).onErrorComplete().subscribe();
})
.doOnError(throwable ->
log.warn(
"Unable to send command or receive reply for command [{}, {}].",
decoratedCommand.getType(),
decoratedCommand.getId()
)
)
.timeout(
decoratedCommand.getReplyTimeoutMs(),
TimeUnit.MILLISECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,22 @@ private Completable sendHealthCheckCommand() {
@Override
protected void doStop() throws Exception {
log.debug("[{}] Stopping channel manager", this.identifyConfiguration.id());
super.doStop();

// Unregister all local channel
Flowable
.fromIterable(this.localChannelRegistry.getAll())
.flatMapCompletable(controllerChannel -> unregister(controllerChannel).onErrorComplete())
.doOnComplete(() -> log.debug("[{}] All local channel unregistered.", this.identifyConfiguration.id()))
.blockingAwait();

primaryChannelManager.stop();
if (healthCheckDisposable != null) {
healthCheckDisposable.dispose();
}
if (primaryChannelElectedEventTopic != null && primaryChannelElectedSubscriptionId != null) {
primaryChannelElectedEventTopic.removeMessageListener(primaryChannelElectedSubscriptionId);
}
super.doStop();
}

public Flowable<TargetMetric> targetsMetric() {
Expand Down Expand Up @@ -316,12 +324,31 @@ public <C extends Command<?>, R extends Reply<?>> Single<R> send(C command, Stri
)
.switchIfEmpty(Single.error(new NoChannelFoundException()))
.<R>flatMap(controllerChannel -> {
log.debug("[{}] Sending command '{}' to channel '{}'", this.identifyConfiguration.id(), command, controllerChannel);
log.debug(
"[{}] Sending command '{}' with id '{}' to channel '{}'",
this.identifyConfiguration.id(),
command.getType(),
command.getId(),
controllerChannel
);
return controllerChannel.send(command);
})
.doOnSuccess(reply -> log.debug("[{}] Command '{}' successfully sent", this.identifyConfiguration.id(), command.getId()))
.doOnSuccess(reply ->
log.debug(
"[{}] Command '{}' with id '{}' successfully sent",
this.identifyConfiguration.id(),
command.getType(),
command.getId()
)
)
.doOnError(throwable ->
log.warn("[{}] Unable to send command '{}'", this.identifyConfiguration.id(), command.getId(), throwable)
log.warn(
"[{}] Unable to send command or receive reply for command '{}' with id '{}'",
this.identifyConfiguration.id(),
command.getType(),
command.getId(),
throwable
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ protected void doStart() throws Exception {
@Override
protected void doStop() throws Exception {
log.debug("[{}] Stopping primary channel manager", this.identifyConfiguration.id());
super.doStop();
if (primaryChannelEventTopic != null && subscriptionListenerId != null) {
primaryChannelEventTopic.removeMessageListener(subscriptionListenerId);
}
super.doStop();
}

public Flowable<Map.Entry<String, List<String>>> candidatesChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ private void handleClusteredReply(Message<ClusteredReply<?>> clusteredReplyMessa
protected void doStop() throws Exception {
log.debug("[{}] Stopping controller cluster manager", identifyConfiguration.id());
super.doStop();
// Stop channel manager
channelManager.stop();

// Stop all command listeners.
final List<ControllerChannel> channels = subscriptionsListenersByChannel
Expand All @@ -115,6 +113,9 @@ protected void doStop() throws Exception {

channels.forEach(this::channelDisconnected);

// Stop channel manager
channelManager.stop();

// Stop listening the reply queue.
if (clusteredReplyQueue != null && clusteredReplySubscriptionId != null) {
clusteredReplyQueue.removeMessageListener(clusteredReplySubscriptionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
@RequiredArgsConstructor
public class WebSocketRequestHandler implements io.vertx.core.Handler<io.vertx.rxjava3.ext.web.RoutingContext> {

public static final String CTX_PROTOCOL_VERSION = "X-Gravitee-Exchange-Protocol";

private final Vertx vertx;
private final ExchangeController exchangeController;
private final WebSocketControllerAuthentication<?> controllerAuthentication;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.gravitee.exchange.controller.websocket.channel;

import io.gravitee.exchange.api.channel.exception.ChannelClosedException;
import io.gravitee.exchange.api.command.Command;
import io.gravitee.exchange.api.command.CommandAdapter;
import io.gravitee.exchange.api.command.CommandHandler;
Expand All @@ -25,7 +26,6 @@
import io.gravitee.exchange.api.controller.ControllerChannel;
import io.gravitee.exchange.api.websocket.channel.AbstractWebSocketChannel;
import io.gravitee.exchange.api.websocket.protocol.ProtocolAdapter;
import io.gravitee.exchange.controller.core.channel.primary.PrimaryChannelManager;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.vertx.rxjava3.core.Vertx;
Expand Down Expand Up @@ -66,7 +66,21 @@ public Completable close() {
return Completable
.defer(() -> {
if (!webSocket.isClosed()) {
return send(new GoodByeCommand(new GoodByeCommandPayload(targetId, true))).ignoreElement();
return send(new GoodByeCommand(new GoodByeCommandPayload(targetId, true)), true)
.ignoreElement()
.onErrorResumeNext(throwable -> {
if (throwable instanceof ChannelClosedException) {
log.debug(
"GoodBye command successfully sent for channel '{}' for target '{}' got closed normally",
id,
targetId
);
return Completable.complete();
} else {
log.debug("Unable to send GoodBye command for channel '{}' for target '{}'", id, targetId);
return Completable.error(throwable);
}
});
}
return Completable.complete();
})
Expand Down

0 comments on commit dd59ba2

Please sign in to comment.