From b5c6ad3bf15023ceb6a1f02c15c0e3a8b6dc8124 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Fri, 28 Apr 2023 11:59:17 +0200 Subject: [PATCH 1/2] Fix: support very large number of open websockets Problem: opening more than ~2000 message websockets hits the channel limit of RabbitMQ and resets the connection. Solution: create one channel per API process, as they are long-lived and should not be created for each operation according to the doc. --- src/aleph/api_entrypoint.py | 12 +++++-- .../web/controllers/app_state_getters.py | 5 +++ src/aleph/web/controllers/messages.py | 31 +++++++------------ src/aleph/web/controllers/p2p.py | 19 +++++++++--- src/aleph/web/controllers/utils.py | 3 +- 5 files changed, 40 insertions(+), 30 deletions(-) diff --git a/src/aleph/api_entrypoint.py b/src/aleph/api_entrypoint.py index cb4834084..0690b7b2d 100644 --- a/src/aleph/api_entrypoint.py +++ b/src/aleph/api_entrypoint.py @@ -21,7 +21,7 @@ APP_STATE_NODE_CACHE, APP_STATE_P2P_CLIENT, APP_STATE_SESSION_FACTORY, - APP_STATE_STORAGE_SERVICE, + APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL, ) @@ -52,10 +52,16 @@ async def configure_aiohttp_app( app = create_aiohttp_app() + # Reuse the connection of the P2P client to avoid opening two connections + mq_conn = p2p_client.mq_client.connection + # Channels are long-lived, so we create one at startup. Otherwise, we end up hitting + # the channel limit if we create a channel for each operation. + mq_channel = await mq_conn.channel() + app[APP_STATE_CONFIG] = config app[APP_STATE_P2P_CLIENT] = p2p_client - # Reuse the connection of the P2P client to avoid opening two connections - app[APP_STATE_MQ_CONN] = p2p_client.mq_client.connection + app[APP_STATE_MQ_CONN] = mq_conn + app[APP_STATE_MQ_CHANNEL] = mq_channel app[APP_STATE_NODE_CACHE] = node_cache app[APP_STATE_STORAGE_SERVICE] = storage_service app[APP_STATE_SESSION_FACTORY] = session_factory diff --git a/src/aleph/web/controllers/app_state_getters.py b/src/aleph/web/controllers/app_state_getters.py index 5b29c7337..2adb6f6d5 100644 --- a/src/aleph/web/controllers/app_state_getters.py +++ b/src/aleph/web/controllers/app_state_getters.py @@ -18,6 +18,7 @@ APP_STATE_CONFIG = "config" APP_STATE_MQ_CONN = "mq_conn" +APP_STATE_MQ_CHANNEL = "mq_channel" APP_STATE_NODE_CACHE = "node_cache" APP_STATE_P2P_CLIENT = "p2p_client" APP_STATE_SESSION_FACTORY = "session_factory" @@ -45,6 +46,10 @@ def get_mq_conn_from_request(request: web.Request) -> aio_pika.abc.AbstractConne return cast(aio_pika.abc.AbstractConnection, request.app[APP_STATE_MQ_CONN]) +def get_mq_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel: + return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_CHANNEL]) + + def get_node_cache_from_request(request: web.Request) -> NodeCache: return cast(NodeCache, request.app[APP_STATE_NODE_CACHE]) diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py index 10d1b145b..f232f0e96 100644 --- a/src/aleph/web/controllers/messages.py +++ b/src/aleph/web/controllers/messages.py @@ -31,7 +31,11 @@ from aleph.types.db_session import DbSessionFactory, DbSession from aleph.types.message_status import MessageStatus from aleph.types.sort_order import SortOrder, SortBy -from aleph.web.controllers.app_state_getters import get_session_factory_from_request +from aleph.web.controllers.app_state_getters import ( + get_session_factory_from_request, + get_mq_channel_from_request, + get_config_from_request, +) from aleph.web.controllers.utils import ( DEFAULT_MESSAGES_PER_PAGE, DEFAULT_PAGE, @@ -229,29 +233,16 @@ async def view_messages_list(request: web.Request) -> web.Response: ) -async def declare_mq_queue( - mq_conn: aio_pika.abc.AbstractConnection, config: Config -) -> aio_pika.abc.AbstractQueue: - channel = await mq_conn.channel() - mq_message_exchange = await channel.declare_exchange( - name=config.rabbitmq.message_exchange.value, - type=aio_pika.ExchangeType.TOPIC, - auto_delete=False, - ) - mq_queue = await channel.declare_queue(auto_delete=True) - await mq_queue.bind(mq_message_exchange, routing_key="processed.*") - return mq_queue - - -async def messages_ws(request: web.Request): +async def messages_ws(request: web.Request) -> web.WebSocketResponse: ws = web.WebSocketResponse() await ws.prepare(request) - mq_conn: aio_pika.abc.AbstractConnection = request.app["mq_conn"] - session_factory: DbSessionFactory = request.app["session_factory"] - config = request.app["config"] + mq_channel: aio_pika.abc.AbstractChannel = get_mq_channel_from_request(request) + session_factory: DbSessionFactory = get_session_factory_from_request(request) + config = get_config_from_request(request) + mq_queue = await mq_make_aleph_message_topic_queue( - mq_conn=mq_conn, config=config, routing_key="processed.*" + channel=mq_channel, config=config, routing_key="processed.*" ) query_params = WsMessageQueryParams.parse_obj(request.query) diff --git a/src/aleph/web/controllers/p2p.py b/src/aleph/web/controllers/p2p.py index 10bae3a2b..6ee0ba88a 100644 --- a/src/aleph/web/controllers/p2p.py +++ b/src/aleph/web/controllers/p2p.py @@ -24,6 +24,7 @@ get_ipfs_service_from_request, get_p2p_client_from_request, get_mq_conn_from_request, + get_mq_channel_from_request, ) from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue @@ -65,16 +66,22 @@ def _validate_request_data(config: Config, request_data: Dict) -> None: # Only accept publishing on the message topic. message_topic = config.aleph.queue_topic.value if topic != message_topic: - raise web.HTTPForbidden(reason=f"Unauthorized P2P topic: {topic}. Use {message_topic}.") + raise web.HTTPForbidden( + reason=f"Unauthorized P2P topic: {topic}. Use {message_topic}." + ) data = request_data.get("data") if not isinstance(data, str): - raise web.HTTPUnprocessableEntity(reason="'data': expected a serialized JSON string.") + raise web.HTTPUnprocessableEntity( + reason="'data': expected a serialized JSON string." + ) try: message_dict = json.loads(cast(str, request_data.get("data"))) except ValueError: - raise web.HTTPUnprocessableEntity(reason="'data': must be deserializable as JSON.") + raise web.HTTPUnprocessableEntity( + reason="'data': must be deserializable as JSON." + ) _validate_message_dict(message_dict) @@ -187,9 +194,11 @@ async def pub_message(request: web.Request): config = get_config_from_request(request) if request_data.sync: - mq_conn = get_mq_conn_from_request(request) + mq_channel = get_mq_channel_from_request(request) mq_queue = await mq_make_aleph_message_topic_queue( - mq_conn=mq_conn, config=config, routing_key=f"*.{pending_message.item_hash}" + channel=mq_channel, + config=config, + routing_key=f"*.{pending_message.item_hash}", ) else: mq_queue = None diff --git a/src/aleph/web/controllers/utils.py b/src/aleph/web/controllers/utils.py index dbfed1c5a..aec2bb9eb 100644 --- a/src/aleph/web/controllers/utils.py +++ b/src/aleph/web/controllers/utils.py @@ -143,11 +143,10 @@ def cond_output(request, context, template): async def mq_make_aleph_message_topic_queue( - mq_conn: aio_pika.abc.AbstractConnection, + channel: aio_pika.abc.AbstractChannel, config: Config, routing_key: Optional[str] = None, ) -> aio_pika.abc.AbstractQueue: - channel = await mq_conn.channel() mq_message_exchange = await channel.declare_exchange( name=config.rabbitmq.message_exchange.value, type=aio_pika.ExchangeType.TOPIC, From d9250bf6e8bfea0bca28a561e194f344d25eeddf Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Fri, 28 Apr 2023 12:00:48 +0200 Subject: [PATCH 2/2] Fix: delete RabbitMQ queues for unused websockets Problem: RabbitMQ queues can stay open indefinitely if the websocket filters only match few or no message. This can lead to increased resource consumption. Solution: add a large timeout (5 minutes) and check periodically if the websocket connection is still open. If the user is still listening, keep the queue open. Otherwise, delete the queue and terminate the request. --- src/aleph/web/controllers/messages.py | 45 ++++++++++++++++----------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py index f232f0e96..d7a49f84e 100644 --- a/src/aleph/web/controllers/messages.py +++ b/src/aleph/web/controllers/messages.py @@ -262,24 +262,33 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse: await ws.send_str(format_message(message).json()) try: - async with mq_queue.iterator() as queue_iter: - async for mq_message in queue_iter: - if ws.closed: - break - - await mq_message.ack() - item_hash = aleph_json.loads(mq_message.body)["item_hash"] - # A bastardized way to apply the filters on the message as well. - # TODO: put the filter key/values in the RabbitMQ message? - with session_factory() as session: - matching_messages = get_matching_messages( - session=session, - hashes=[item_hash], - include_confirmations=True, - **find_filters, - ) - for message in matching_messages: - await ws.send_str(format_message(message).json()) + # The timeout is necessary to check if the websocket connection is still open once in a while. + # Otherwise, we keep queues indefinitely if the user specifies filters that do not match + # a message. + async with mq_queue.iterator(timeout=300) as queue_iter: + while not ws.closed: + try: + async for mq_message in queue_iter: + if ws.closed: + break + + await mq_message.ack() + item_hash = aleph_json.loads(mq_message.body)["item_hash"] + # A bastardized way to apply the filters on the message as well. + # TODO: put the filter key/values in the RabbitMQ message? + with session_factory() as session: + matching_messages = get_matching_messages( + session=session, + hashes=[item_hash], + include_confirmations=True, + **find_filters, + ) + for message in matching_messages: + await ws.send_str(format_message(message).json()) + + except TimeoutError: + LOGGER.info("Message ws timeout. ws closed? %s", ws.closed) + pass except ConnectionResetError: pass