diff --git a/gravitee-exchange-api/src/main/java/io/gravitee/exchange/api/websocket/channel/AbstractWebSocketChannel.java b/gravitee-exchange-api/src/main/java/io/gravitee/exchange/api/websocket/channel/AbstractWebSocketChannel.java index acfc3f1..c264654 100644 --- a/gravitee-exchange-api/src/main/java/io/gravitee/exchange/api/websocket/channel/AbstractWebSocketChannel.java +++ b/gravitee-exchange-api/src/main/java/io/gravitee/exchange/api/websocket/channel/AbstractWebSocketChannel.java @@ -358,7 +358,7 @@ protected Single sendHelloCommand(final HelloCommand helloCommand) { return send(helloCommand, true); } - private , R extends Reply> Single send(final C command, final boolean ignoreActiveStatus) { + protected , R extends Reply> Single send(final C command, final boolean ignoreActiveStatus) { return Single .defer(() -> { if (!ignoreActiveStatus && !active) { @@ -377,13 +377,6 @@ private , R extends Reply> Single send(final C comman resultEmitters.put(decoratedCommand.getId(), emitter); writeCommand(decoratedCommand).doOnError(emitter::onError).onErrorComplete().subscribe(); }) - .doOnError(throwable -> - log.warn( - "Unable to send command or receive reply for command [{}, {}].", - decoratedCommand.getType(), - decoratedCommand.getId() - ) - ) .timeout( decoratedCommand.getReplyTimeoutMs(), TimeUnit.MILLISECONDS, diff --git a/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/channel/ChannelManager.java b/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/channel/ChannelManager.java index 31a262d..bc5b65a 100644 --- a/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/channel/ChannelManager.java +++ b/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/channel/ChannelManager.java @@ -206,7 +206,14 @@ private Completable sendHealthCheckCommand() { @Override protected void doStop() throws Exception { log.debug("[{}] Stopping channel manager", this.identifyConfiguration.id()); - super.doStop(); + + // Unregister all local channel + Flowable + .fromIterable(this.localChannelRegistry.getAll()) + .flatMapCompletable(controllerChannel -> unregister(controllerChannel).onErrorComplete()) + .doOnComplete(() -> log.debug("[{}] All local channel unregistered.", this.identifyConfiguration.id())) + .blockingAwait(); + primaryChannelManager.stop(); if (healthCheckDisposable != null) { healthCheckDisposable.dispose(); @@ -214,6 +221,7 @@ protected void doStop() throws Exception { if (primaryChannelElectedEventTopic != null && primaryChannelElectedSubscriptionId != null) { primaryChannelElectedEventTopic.removeMessageListener(primaryChannelElectedSubscriptionId); } + super.doStop(); } public Flowable targetsMetric() { @@ -316,12 +324,31 @@ public , R extends Reply> Single send(C command, Stri ) .switchIfEmpty(Single.error(new NoChannelFoundException())) .flatMap(controllerChannel -> { - log.debug("[{}] Sending command '{}' to channel '{}'", this.identifyConfiguration.id(), command, controllerChannel); + log.debug( + "[{}] Sending command '{}' with id '{}' to channel '{}'", + this.identifyConfiguration.id(), + command.getType(), + command.getId(), + controllerChannel + ); return controllerChannel.send(command); }) - .doOnSuccess(reply -> log.debug("[{}] Command '{}' successfully sent", this.identifyConfiguration.id(), command.getId())) + .doOnSuccess(reply -> + log.debug( + "[{}] Command '{}' with id '{}' successfully sent", + this.identifyConfiguration.id(), + command.getType(), + command.getId() + ) + ) .doOnError(throwable -> - log.warn("[{}] Unable to send command '{}'", this.identifyConfiguration.id(), command.getId(), throwable) + log.warn( + "[{}] Unable to send command or receive reply for command '{}' with id '{}'", + this.identifyConfiguration.id(), + command.getType(), + command.getId(), + throwable + ) ); } diff --git a/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/channel/primary/PrimaryChannelManager.java b/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/channel/primary/PrimaryChannelManager.java index e6dfdff..cba1e3a 100644 --- a/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/channel/primary/PrimaryChannelManager.java +++ b/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/channel/primary/PrimaryChannelManager.java @@ -92,10 +92,10 @@ protected void doStart() throws Exception { @Override protected void doStop() throws Exception { log.debug("[{}] Stopping primary channel manager", this.identifyConfiguration.id()); - super.doStop(); if (primaryChannelEventTopic != null && subscriptionListenerId != null) { primaryChannelEventTopic.removeMessageListener(subscriptionListenerId); } + super.doStop(); } public Flowable>> candidatesChannel() { diff --git a/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/cluster/ControllerClusterManager.java b/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/cluster/ControllerClusterManager.java index 5ba141a..c59359b 100644 --- a/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/cluster/ControllerClusterManager.java +++ b/gravitee-exchange-controller/gravitee-exchange-controller-core/src/main/java/io/gravitee/exchange/controller/core/cluster/ControllerClusterManager.java @@ -102,8 +102,6 @@ private void handleClusteredReply(Message> clusteredReplyMessa protected void doStop() throws Exception { log.debug("[{}] Stopping controller cluster manager", identifyConfiguration.id()); super.doStop(); - // Stop channel manager - channelManager.stop(); // Stop all command listeners. final List channels = subscriptionsListenersByChannel @@ -115,6 +113,9 @@ protected void doStop() throws Exception { channels.forEach(this::channelDisconnected); + // Stop channel manager + channelManager.stop(); + // Stop listening the reply queue. if (clusteredReplyQueue != null && clusteredReplySubscriptionId != null) { clusteredReplyQueue.removeMessageListener(clusteredReplySubscriptionId); diff --git a/gravitee-exchange-controller/gravitee-exchange-controller-websocket/src/main/java/io/gravitee/exchange/controller/websocket/WebSocketRequestHandler.java b/gravitee-exchange-controller/gravitee-exchange-controller-websocket/src/main/java/io/gravitee/exchange/controller/websocket/WebSocketRequestHandler.java index 48830f5..d103662 100644 --- a/gravitee-exchange-controller/gravitee-exchange-controller-websocket/src/main/java/io/gravitee/exchange/controller/websocket/WebSocketRequestHandler.java +++ b/gravitee-exchange-controller/gravitee-exchange-controller-websocket/src/main/java/io/gravitee/exchange/controller/websocket/WebSocketRequestHandler.java @@ -47,8 +47,6 @@ @RequiredArgsConstructor public class WebSocketRequestHandler implements io.vertx.core.Handler { - public static final String CTX_PROTOCOL_VERSION = "X-Gravitee-Exchange-Protocol"; - private final Vertx vertx; private final ExchangeController exchangeController; private final WebSocketControllerAuthentication controllerAuthentication; diff --git a/gravitee-exchange-controller/gravitee-exchange-controller-websocket/src/main/java/io/gravitee/exchange/controller/websocket/channel/WebSocketControllerChannel.java b/gravitee-exchange-controller/gravitee-exchange-controller-websocket/src/main/java/io/gravitee/exchange/controller/websocket/channel/WebSocketControllerChannel.java index cb481ff..41ca845 100644 --- a/gravitee-exchange-controller/gravitee-exchange-controller-websocket/src/main/java/io/gravitee/exchange/controller/websocket/channel/WebSocketControllerChannel.java +++ b/gravitee-exchange-controller/gravitee-exchange-controller-websocket/src/main/java/io/gravitee/exchange/controller/websocket/channel/WebSocketControllerChannel.java @@ -15,6 +15,7 @@ */ package io.gravitee.exchange.controller.websocket.channel; +import io.gravitee.exchange.api.channel.exception.ChannelClosedException; import io.gravitee.exchange.api.command.Command; import io.gravitee.exchange.api.command.CommandAdapter; import io.gravitee.exchange.api.command.CommandHandler; @@ -25,7 +26,6 @@ import io.gravitee.exchange.api.controller.ControllerChannel; import io.gravitee.exchange.api.websocket.channel.AbstractWebSocketChannel; import io.gravitee.exchange.api.websocket.protocol.ProtocolAdapter; -import io.gravitee.exchange.controller.core.channel.primary.PrimaryChannelManager; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.CompletableEmitter; import io.vertx.rxjava3.core.Vertx; @@ -66,7 +66,21 @@ public Completable close() { return Completable .defer(() -> { if (!webSocket.isClosed()) { - return send(new GoodByeCommand(new GoodByeCommandPayload(targetId, true))).ignoreElement(); + return send(new GoodByeCommand(new GoodByeCommandPayload(targetId, true)), true) + .ignoreElement() + .onErrorResumeNext(throwable -> { + if (throwable instanceof ChannelClosedException) { + log.debug( + "GoodBye command successfully sent for channel '{}' for target '{}' got closed normally", + id, + targetId + ); + return Completable.complete(); + } else { + log.debug("Unable to send GoodBye command for channel '{}' for target '{}'", id, targetId); + return Completable.error(throwable); + } + }); } return Completable.complete(); })