From b5c6ad3bf15023ceb6a1f02c15c0e3a8b6dc8124 Mon Sep 17 00:00:00 2001
From: Olivier Desenfans
Date: Fri, 28 Apr 2023 11:59:17 +0200
Subject: [PATCH] Fix: support a very large number of open websockets

Problem: opening more than ~2000 message websockets hits the RabbitMQ
channel limit and resets the connection.

Solution: create one channel per API process. Channels are long-lived
objects that, according to the documentation, should not be created for
each operation.
---
 src/aleph/api_entrypoint.py              | 12 +++++--
 .../web/controllers/app_state_getters.py |  5 +++
 src/aleph/web/controllers/messages.py    | 31 +++++++------------
 src/aleph/web/controllers/p2p.py         | 19 +++++++++---
 src/aleph/web/controllers/utils.py       |  3 +-
 5 files changed, 40 insertions(+), 30 deletions(-)

diff --git a/src/aleph/api_entrypoint.py b/src/aleph/api_entrypoint.py
index cb4834084..0690b7b2d 100644
--- a/src/aleph/api_entrypoint.py
+++ b/src/aleph/api_entrypoint.py
@@ -21,7 +21,7 @@
     APP_STATE_NODE_CACHE,
     APP_STATE_P2P_CLIENT,
     APP_STATE_SESSION_FACTORY,
-    APP_STATE_STORAGE_SERVICE,
+    APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL,
 )
 
 
@@ -52,10 +52,16 @@ async def configure_aiohttp_app(
 
     app = create_aiohttp_app()
 
+    # Reuse the connection of the P2P client to avoid opening two connections
+    mq_conn = p2p_client.mq_client.connection
+    # Channels are long-lived, so we create one at startup. Otherwise, we end up hitting
+    # the channel limit if we create a channel for each operation.
+    mq_channel = await mq_conn.channel()
+
     app[APP_STATE_CONFIG] = config
     app[APP_STATE_P2P_CLIENT] = p2p_client
-    # Reuse the connection of the P2P client to avoid opening two connections
-    app[APP_STATE_MQ_CONN] = p2p_client.mq_client.connection
+    app[APP_STATE_MQ_CONN] = mq_conn
+    app[APP_STATE_MQ_CHANNEL] = mq_channel
     app[APP_STATE_NODE_CACHE] = node_cache
     app[APP_STATE_STORAGE_SERVICE] = storage_service
     app[APP_STATE_SESSION_FACTORY] = session_factory
diff --git a/src/aleph/web/controllers/app_state_getters.py b/src/aleph/web/controllers/app_state_getters.py
index 5b29c7337..2adb6f6d5 100644
--- a/src/aleph/web/controllers/app_state_getters.py
+++ b/src/aleph/web/controllers/app_state_getters.py
@@ -18,6 +18,7 @@
 
 APP_STATE_CONFIG = "config"
 APP_STATE_MQ_CONN = "mq_conn"
+APP_STATE_MQ_CHANNEL = "mq_channel"
 APP_STATE_NODE_CACHE = "node_cache"
 APP_STATE_P2P_CLIENT = "p2p_client"
 APP_STATE_SESSION_FACTORY = "session_factory"
@@ -45,6 +46,10 @@ def get_mq_conn_from_request(request: web.Request) -> aio_pika.abc.AbstractConne
     return cast(aio_pika.abc.AbstractConnection, request.app[APP_STATE_MQ_CONN])
 
 
+def get_mq_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel:
+    return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_CHANNEL])
+
+
 def get_node_cache_from_request(request: web.Request) -> NodeCache:
     return cast(NodeCache, request.app[APP_STATE_NODE_CACHE])
 
diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py
index 10d1b145b..f232f0e96 100644
--- a/src/aleph/web/controllers/messages.py
+++ b/src/aleph/web/controllers/messages.py
@@ -31,7 +31,11 @@
 from aleph.types.db_session import DbSessionFactory, DbSession
 from aleph.types.message_status import MessageStatus
 from aleph.types.sort_order import SortOrder, SortBy
-from aleph.web.controllers.app_state_getters import get_session_factory_from_request
+from aleph.web.controllers.app_state_getters import (
+    get_session_factory_from_request,
+    get_mq_channel_from_request,
+    get_config_from_request,
+)
 from aleph.web.controllers.utils import (
     DEFAULT_MESSAGES_PER_PAGE,
     DEFAULT_PAGE,
@@ -229,29 +233,16 @@ async def view_messages_list(request: web.Request) -> web.Response:
     )
 
 
-async def declare_mq_queue(
-    mq_conn: aio_pika.abc.AbstractConnection, config: Config
-) -> aio_pika.abc.AbstractQueue:
-    channel = await mq_conn.channel()
-    mq_message_exchange = await channel.declare_exchange(
-        name=config.rabbitmq.message_exchange.value,
-        type=aio_pika.ExchangeType.TOPIC,
-        auto_delete=False,
-    )
-    mq_queue = await channel.declare_queue(auto_delete=True)
-    await mq_queue.bind(mq_message_exchange, routing_key="processed.*")
-    return mq_queue
-
-
-async def messages_ws(request: web.Request):
+async def messages_ws(request: web.Request) -> web.WebSocketResponse:
     ws = web.WebSocketResponse()
     await ws.prepare(request)
 
-    mq_conn: aio_pika.abc.AbstractConnection = request.app["mq_conn"]
-    session_factory: DbSessionFactory = request.app["session_factory"]
-    config = request.app["config"]
+    mq_channel: aio_pika.abc.AbstractChannel = get_mq_channel_from_request(request)
+    session_factory: DbSessionFactory = get_session_factory_from_request(request)
+    config = get_config_from_request(request)
+
     mq_queue = await mq_make_aleph_message_topic_queue(
-        mq_conn=mq_conn, config=config, routing_key="processed.*"
+        channel=mq_channel, config=config, routing_key="processed.*"
     )
 
     query_params = WsMessageQueryParams.parse_obj(request.query)
diff --git a/src/aleph/web/controllers/p2p.py b/src/aleph/web/controllers/p2p.py
index 10bae3a2b..6ee0ba88a 100644
--- a/src/aleph/web/controllers/p2p.py
+++ b/src/aleph/web/controllers/p2p.py
@@ -24,6 +24,7 @@
     get_ipfs_service_from_request,
     get_p2p_client_from_request,
     get_mq_conn_from_request,
+    get_mq_channel_from_request,
 )
 from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue
 
@@ -65,16 +66,22 @@ def _validate_request_data(config: Config, request_data: Dict) -> None:
     # Only accept publishing on the message topic.
     message_topic = config.aleph.queue_topic.value
     if topic != message_topic:
-        raise web.HTTPForbidden(reason=f"Unauthorized P2P topic: {topic}. Use {message_topic}.")
+        raise web.HTTPForbidden(
+            reason=f"Unauthorized P2P topic: {topic}. Use {message_topic}."
+        )
 
     data = request_data.get("data")
     if not isinstance(data, str):
-        raise web.HTTPUnprocessableEntity(reason="'data': expected a serialized JSON string.")
+        raise web.HTTPUnprocessableEntity(
+            reason="'data': expected a serialized JSON string."
+        )
 
     try:
         message_dict = json.loads(cast(str, request_data.get("data")))
     except ValueError:
-        raise web.HTTPUnprocessableEntity(reason="'data': must be deserializable as JSON.")
+        raise web.HTTPUnprocessableEntity(
+            reason="'data': must be deserializable as JSON."
+        )
 
     _validate_message_dict(message_dict)
 
@@ -187,9 +194,11 @@ async def pub_message(request: web.Request):
     config = get_config_from_request(request)
 
     if request_data.sync:
-        mq_conn = get_mq_conn_from_request(request)
+        mq_channel = get_mq_channel_from_request(request)
         mq_queue = await mq_make_aleph_message_topic_queue(
-            mq_conn=mq_conn, config=config, routing_key=f"*.{pending_message.item_hash}"
+            channel=mq_channel,
+            config=config,
+            routing_key=f"*.{pending_message.item_hash}",
         )
     else:
         mq_queue = None
diff --git a/src/aleph/web/controllers/utils.py b/src/aleph/web/controllers/utils.py
index dbfed1c5a..aec2bb9eb 100644
--- a/src/aleph/web/controllers/utils.py
+++ b/src/aleph/web/controllers/utils.py
@@ -143,11 +143,10 @@ def cond_output(request, context, template):
 
 
 async def mq_make_aleph_message_topic_queue(
-    mq_conn: aio_pika.abc.AbstractConnection,
+    channel: aio_pika.abc.AbstractChannel,
     config: Config,
     routing_key: Optional[str] = None,
 ) -> aio_pika.abc.AbstractQueue:
-    channel = await mq_conn.channel()
     mq_message_exchange = await channel.declare_exchange(
         name=config.rabbitmq.message_exchange.value,
         type=aio_pika.ExchangeType.TOPIC,
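
Note: the pattern this patch moves to can be reduced to the self-contained
sketch below. This is not pyaleph code; the broker URL, exchange name, route
and helper names (AMQP_URL, mq_setup, "messages", /ws/messages) are
illustrative assumptions. The AMQP connection and channel are opened once per
process in an aiohttp cleanup context, and each websocket handler only
declares a cheap auto-delete queue on the shared channel:

import aio_pika
import aio_pika.abc
from aiohttp import web

AMQP_URL = "amqp://guest:guest@localhost/"  # assumption: a local RabbitMQ broker


async def mq_setup(app: web.Application):
    # One long-lived connection and one long-lived channel per API process.
    connection = await aio_pika.connect_robust(AMQP_URL)
    app["mq_conn"] = connection
    app["mq_channel"] = await connection.channel()
    yield
    await connection.close()


async def messages_ws(request: web.Request) -> web.WebSocketResponse:
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Reuse the process-wide channel; only the queue is per-websocket.
    channel: aio_pika.abc.AbstractChannel = request.app["mq_channel"]
    exchange = await channel.declare_exchange(
        name="messages", type=aio_pika.ExchangeType.TOPIC, auto_delete=False
    )
    queue = await channel.declare_queue(auto_delete=True)
    await queue.bind(exchange, routing_key="processed.*")

    # Forward every matching broker message to the websocket client.
    async with queue.iterator() as messages:
        async for message in messages:
            async with message.process():
                try:
                    await ws.send_str(message.body.decode())
                except ConnectionResetError:
                    # Client went away; stop consuming so the queue auto-deletes.
                    break

    return ws


app = web.Application()
app.cleanup_ctx.append(mq_setup)
app.add_routes([web.get("/ws/messages", messages_ws)])

if __name__ == "__main__":
    web.run_app(app)

RabbitMQ's default channel_max is 2047 channels per connection, which lines up
with the ~2000-websocket ceiling described in the commit message; with a single
process-wide channel, each websocket only costs a server-named auto-delete
queue bound to the topic exchange.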