Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions src/aleph/web/controllers/app_state_getters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This module provides an abstraction layer over the dictionary keys used to
address these objects.
"""

import logging
from typing import Optional, cast, TypeVar

import aio_pika.abc
Expand Down Expand Up @@ -50,12 +50,43 @@ def get_mq_conn_from_request(request: web.Request) -> aio_pika.abc.AbstractConne
return cast(aio_pika.abc.AbstractConnection, request.app[APP_STATE_MQ_CONN])


def get_mq_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel:
return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_CHANNEL])
async def _get_open_channel(
request: web.Request, channel_name: str, logger: logging.Logger
) -> aio_pika.abc.AbstractChannel:
channel = cast(aio_pika.abc.AbstractChannel, request.app[channel_name])
if channel.is_closed:
# This should not happen, but does happen in practice because of RabbitMQ
# RPC timeouts. We need to figure out where this timeout comes from,
# but reopening the channel is mandatory to keep the endpoints using the MQ
# functional.
logger.error("%s channel is closed, reopening it", channel_name)
await channel.reopen()

return channel


async def get_mq_channel_from_request(
request: web.Request, logger: logging.Logger
) -> aio_pika.abc.AbstractChannel:
"""
Gets the MQ channel from the app state and reopens it if needed.
"""

return await _get_open_channel(
request=request, channel_name=APP_STATE_MQ_CHANNEL, logger=logger
)


async def get_mq_ws_channel_from_request(
request: web.Request, logger: logging.Logger
) -> aio_pika.abc.AbstractChannel:
"""
Gets the websocket MQ channel from the app state and reopens it if needed.
"""

def get_mq_ws_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel:
return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_WS_CHANNEL])
return await _get_open_channel(
request=request, channel_name=APP_STATE_MQ_WS_CHANNEL, logger=logger
)


def get_node_cache_from_request(request: web.Request) -> NodeCache:
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/web/controllers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse:

config = get_config_from_request(request)
session_factory = get_session_factory_from_request(request)
mq_channel = get_mq_ws_channel_from_request(request)
mq_channel = await get_mq_ws_channel_from_request(request=request, logger=LOGGER)

try:
query_params = WsMessageQueryParams.parse_obj(request.query)
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/web/controllers/p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def pub_message(request: web.Request):
config = get_config_from_request(request)

if request_data.sync:
mq_channel = get_mq_channel_from_request(request)
mq_channel = await get_mq_channel_from_request(request=request, logger=LOGGER)
mq_queue = await mq_make_aleph_message_topic_queue(
channel=mq_channel,
config=config,
Expand Down