From 431b31a68dd5e166f7a76c41ccd0abd2d4e5a06f Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 16 May 2023 11:18:38 +0200 Subject: [PATCH] Internal: use a separate MQ channel for websockets Problem: the message websocket appears to close the channel in some situations. We want to avoid a contagion to the other endpoints using the MQ. Solution: use a separate channel. The WS issue will not propagate to other channels. --- src/aleph/api_entrypoint.py | 4 +++- src/aleph/web/controllers/app_state_getters.py | 8 ++++++++ src/aleph/web/controllers/messages.py | 4 ++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/aleph/api_entrypoint.py b/src/aleph/api_entrypoint.py index 0690b7b2d..b5031f8de 100644 --- a/src/aleph/api_entrypoint.py +++ b/src/aleph/api_entrypoint.py @@ -21,7 +21,7 @@ APP_STATE_NODE_CACHE, APP_STATE_P2P_CLIENT, APP_STATE_SESSION_FACTORY, - APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL, + APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL, APP_STATE_MQ_WS_CHANNEL, ) @@ -57,11 +57,13 @@ async def configure_aiohttp_app( # Channels are long-lived, so we create one at startup. Otherwise, we end up hitting # the channel limit if we create a channel for each operation. mq_channel = await mq_conn.channel() + mq_ws_channel = await mq_conn.channel() app[APP_STATE_CONFIG] = config app[APP_STATE_P2P_CLIENT] = p2p_client app[APP_STATE_MQ_CONN] = mq_conn app[APP_STATE_MQ_CHANNEL] = mq_channel + app[APP_STATE_MQ_WS_CHANNEL] = mq_ws_channel app[APP_STATE_NODE_CACHE] = node_cache app[APP_STATE_STORAGE_SERVICE] = storage_service app[APP_STATE_SESSION_FACTORY] = session_factory diff --git a/src/aleph/web/controllers/app_state_getters.py b/src/aleph/web/controllers/app_state_getters.py index 2adb6f6d5..776029ec7 100644 --- a/src/aleph/web/controllers/app_state_getters.py +++ b/src/aleph/web/controllers/app_state_getters.py @@ -19,6 +19,10 @@ APP_STATE_CONFIG = "config" APP_STATE_MQ_CONN = "mq_conn" APP_STATE_MQ_CHANNEL = "mq_channel" +# RabbitMQ channel dedicated to websocket operations. +# A yet to be understood issue causes the websocket channel to close unexpectedly. +# We use a dedicated channel to avoid propagation of the issue to other endpoints. +APP_STATE_MQ_WS_CHANNEL = "mq_ws_channel" APP_STATE_NODE_CACHE = "node_cache" APP_STATE_P2P_CLIENT = "p2p_client" APP_STATE_SESSION_FACTORY = "session_factory" @@ -50,6 +54,10 @@ def get_mq_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractCh return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_CHANNEL]) +def get_mq_ws_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel: + return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_WS_CHANNEL]) + + def get_node_cache_from_request(request: web.Request) -> NodeCache: return cast(NodeCache, request.app[APP_STATE_NODE_CACHE]) diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py index 34a6a3478..8dd64fca5 100644 --- a/src/aleph/web/controllers/messages.py +++ b/src/aleph/web/controllers/messages.py @@ -34,7 +34,7 @@ from aleph.web.controllers.app_state_getters import ( get_session_factory_from_request, get_mq_channel_from_request, - get_config_from_request, + get_config_from_request, get_mq_ws_channel_from_request, ) from aleph.web.controllers.utils import ( DEFAULT_MESSAGES_PER_PAGE, @@ -301,7 +301,7 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse: config = get_config_from_request(request) session_factory = get_session_factory_from_request(request) - mq_channel = get_mq_channel_from_request(request) + mq_channel = get_mq_ws_channel_from_request(request) try: query_params = WsMessageQueryParams.parse_obj(request.query)