From db90679f2d204742e5a16e59520f0d6803e93359 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 16 May 2023 11:36:06 +0200 Subject: [PATCH] Fix: reopen the API MQ channels if they are closed Problem: the MQ channels can close for a variety of reasons. Once closed, a channel cannot be used anymore. Solution: detect if the channel is closed and reopen it if needed. --- .../web/controllers/app_state_getters.py | 41 ++++++++++++++++--- src/aleph/web/controllers/messages.py | 2 +- src/aleph/web/controllers/p2p.py | 2 +- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/src/aleph/web/controllers/app_state_getters.py b/src/aleph/web/controllers/app_state_getters.py index 776029ec7..d6a25b9bf 100644 --- a/src/aleph/web/controllers/app_state_getters.py +++ b/src/aleph/web/controllers/app_state_getters.py @@ -3,7 +3,7 @@ This module provides an abstraction layer over the dictionary keys used to address these objects. """ - +import logging from typing import Optional, cast, TypeVar import aio_pika.abc @@ -50,12 +50,43 @@ def get_mq_conn_from_request(request: web.Request) -> aio_pika.abc.AbstractConne return cast(aio_pika.abc.AbstractConnection, request.app[APP_STATE_MQ_CONN]) -def get_mq_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel: - return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_CHANNEL]) +async def _get_open_channel( + request: web.Request, channel_name: str, logger: logging.Logger +) -> aio_pika.abc.AbstractChannel: + channel = cast(aio_pika.abc.AbstractChannel, request.app[channel_name]) + if channel.is_closed: + # This should not happen, but does happen in practice because of RabbitMQ + # RPC timeouts. We need to figure out where this timeout comes from, + # but reopening the channel is mandatory to keep the endpoints using the MQ + # functional. 
+ logger.error("%s channel is closed, reopening it", channel_name) + await channel.reopen() + + return channel + + +async def get_mq_channel_from_request( + request: web.Request, logger: logging.Logger +) -> aio_pika.abc.AbstractChannel: + """ + Gets the MQ channel from the app state and reopens it if needed. + """ + + return await _get_open_channel( + request=request, channel_name=APP_STATE_MQ_CHANNEL, logger=logger + ) + +async def get_mq_ws_channel_from_request( + request: web.Request, logger: logging.Logger +) -> aio_pika.abc.AbstractChannel: + """ + Gets the websocket MQ channel from the app state and reopens it if needed. + """ -def get_mq_ws_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel: - return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_WS_CHANNEL]) + return await _get_open_channel( + request=request, channel_name=APP_STATE_MQ_WS_CHANNEL, logger=logger + ) def get_node_cache_from_request(request: web.Request) -> NodeCache: diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py index 8dd64fca5..bf5cf4883 100644 --- a/src/aleph/web/controllers/messages.py +++ b/src/aleph/web/controllers/messages.py @@ -301,7 +301,7 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse: config = get_config_from_request(request) session_factory = get_session_factory_from_request(request) - mq_channel = get_mq_ws_channel_from_request(request) + mq_channel = await get_mq_ws_channel_from_request(request=request, logger=LOGGER) try: query_params = WsMessageQueryParams.parse_obj(request.query) diff --git a/src/aleph/web/controllers/p2p.py b/src/aleph/web/controllers/p2p.py index 0e4c274f4..3abecb8bb 100644 --- a/src/aleph/web/controllers/p2p.py +++ b/src/aleph/web/controllers/p2p.py @@ -197,7 +197,7 @@ async def pub_message(request: web.Request): config = get_config_from_request(request) if request_data.sync: - mq_channel = get_mq_channel_from_request(request) + mq_channel = await 
get_mq_channel_from_request(request=request, logger=LOGGER) mq_queue = await mq_make_aleph_message_topic_queue( channel=mq_channel, config=config,