Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/aleph/api_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
APP_STATE_NODE_CACHE,
APP_STATE_P2P_CLIENT,
APP_STATE_SESSION_FACTORY,
APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL,
APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL, APP_STATE_MQ_WS_CHANNEL,
)


Expand Down Expand Up @@ -57,11 +57,13 @@ async def configure_aiohttp_app(
# Channels are long-lived, so we create one at startup. Otherwise, we end up hitting
# the channel limit if we create a channel for each operation.
mq_channel = await mq_conn.channel()
mq_ws_channel = await mq_conn.channel()

app[APP_STATE_CONFIG] = config
app[APP_STATE_P2P_CLIENT] = p2p_client
app[APP_STATE_MQ_CONN] = mq_conn
app[APP_STATE_MQ_CHANNEL] = mq_channel
app[APP_STATE_MQ_WS_CHANNEL] = mq_ws_channel
app[APP_STATE_NODE_CACHE] = node_cache
app[APP_STATE_STORAGE_SERVICE] = storage_service
app[APP_STATE_SESSION_FACTORY] = session_factory
Expand Down
8 changes: 8 additions & 0 deletions src/aleph/web/controllers/app_state_getters.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
APP_STATE_CONFIG = "config"
APP_STATE_MQ_CONN = "mq_conn"
APP_STATE_MQ_CHANNEL = "mq_channel"
# RabbitMQ channel dedicated to websocket operations.
# A yet to be understood issue causes the websocket channel to close unexpectedly.
# We use a dedicated channel to avoid propagation of the issue to other endpoints.
APP_STATE_MQ_WS_CHANNEL = "mq_ws_channel"
APP_STATE_NODE_CACHE = "node_cache"
APP_STATE_P2P_CLIENT = "p2p_client"
APP_STATE_SESSION_FACTORY = "session_factory"
Expand Down Expand Up @@ -50,6 +54,10 @@ def get_mq_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractCh
return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_CHANNEL])


def get_mq_ws_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel:
return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_WS_CHANNEL])


def get_node_cache_from_request(request: web.Request) -> NodeCache:
return cast(NodeCache, request.app[APP_STATE_NODE_CACHE])

Expand Down
4 changes: 2 additions & 2 deletions src/aleph/web/controllers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from aleph.web.controllers.app_state_getters import (
get_session_factory_from_request,
get_mq_channel_from_request,
get_config_from_request,
get_config_from_request, get_mq_ws_channel_from_request,
)
from aleph.web.controllers.utils import (
DEFAULT_MESSAGES_PER_PAGE,
Expand Down Expand Up @@ -301,7 +301,7 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse:

config = get_config_from_request(request)
session_factory = get_session_factory_from_request(request)
mq_channel = get_mq_channel_from_request(request)
mq_channel = get_mq_ws_channel_from_request(request)

try:
query_params = WsMessageQueryParams.parse_obj(request.query)
Expand Down