Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/aleph/api_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
APP_STATE_NODE_CACHE,
APP_STATE_P2P_CLIENT,
APP_STATE_SESSION_FACTORY,
APP_STATE_STORAGE_SERVICE,
APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL,
)


Expand Down Expand Up @@ -52,10 +52,16 @@ async def configure_aiohttp_app(

app = create_aiohttp_app()

# Reuse the connection of the P2P client to avoid opening two connections
mq_conn = p2p_client.mq_client.connection
# Channels are long-lived, so we create one at startup. Otherwise, we end up hitting
# the channel limit if we create a channel for each operation.
mq_channel = await mq_conn.channel()

app[APP_STATE_CONFIG] = config
app[APP_STATE_P2P_CLIENT] = p2p_client
# Reuse the connection of the P2P client to avoid opening two connections
app[APP_STATE_MQ_CONN] = p2p_client.mq_client.connection
app[APP_STATE_MQ_CONN] = mq_conn
app[APP_STATE_MQ_CHANNEL] = mq_channel
app[APP_STATE_NODE_CACHE] = node_cache
app[APP_STATE_STORAGE_SERVICE] = storage_service
app[APP_STATE_SESSION_FACTORY] = session_factory
Expand Down
5 changes: 5 additions & 0 deletions src/aleph/web/controllers/app_state_getters.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

APP_STATE_CONFIG = "config"
APP_STATE_MQ_CONN = "mq_conn"
APP_STATE_MQ_CHANNEL = "mq_channel"
APP_STATE_NODE_CACHE = "node_cache"
APP_STATE_P2P_CLIENT = "p2p_client"
APP_STATE_SESSION_FACTORY = "session_factory"
Expand Down Expand Up @@ -45,6 +46,10 @@ def get_mq_conn_from_request(request: web.Request) -> aio_pika.abc.AbstractConne
return cast(aio_pika.abc.AbstractConnection, request.app[APP_STATE_MQ_CONN])


def get_mq_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel:
return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_CHANNEL])


def get_node_cache_from_request(request: web.Request) -> NodeCache:
return cast(NodeCache, request.app[APP_STATE_NODE_CACHE])

Expand Down
31 changes: 11 additions & 20 deletions src/aleph/web/controllers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
from aleph.types.db_session import DbSessionFactory, DbSession
from aleph.types.message_status import MessageStatus
from aleph.types.sort_order import SortOrder, SortBy
from aleph.web.controllers.app_state_getters import get_session_factory_from_request
from aleph.web.controllers.app_state_getters import (
get_session_factory_from_request,
get_mq_channel_from_request,
get_config_from_request,
)
from aleph.web.controllers.utils import (
DEFAULT_MESSAGES_PER_PAGE,
DEFAULT_PAGE,
Expand Down Expand Up @@ -229,29 +233,16 @@ async def view_messages_list(request: web.Request) -> web.Response:
)


async def declare_mq_queue(
mq_conn: aio_pika.abc.AbstractConnection, config: Config
) -> aio_pika.abc.AbstractQueue:
channel = await mq_conn.channel()
mq_message_exchange = await channel.declare_exchange(
name=config.rabbitmq.message_exchange.value,
type=aio_pika.ExchangeType.TOPIC,
auto_delete=False,
)
mq_queue = await channel.declare_queue(auto_delete=True)
await mq_queue.bind(mq_message_exchange, routing_key="processed.*")
return mq_queue


async def messages_ws(request: web.Request):
async def messages_ws(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)

mq_conn: aio_pika.abc.AbstractConnection = request.app["mq_conn"]
session_factory: DbSessionFactory = request.app["session_factory"]
config = request.app["config"]
mq_channel: aio_pika.abc.AbstractChannel = get_mq_channel_from_request(request)
session_factory: DbSessionFactory = get_session_factory_from_request(request)
config = get_config_from_request(request)

mq_queue = await mq_make_aleph_message_topic_queue(
mq_conn=mq_conn, config=config, routing_key="processed.*"
channel=mq_channel, config=config, routing_key="processed.*"
)

query_params = WsMessageQueryParams.parse_obj(request.query)
Expand Down
19 changes: 14 additions & 5 deletions src/aleph/web/controllers/p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
get_ipfs_service_from_request,
get_p2p_client_from_request,
get_mq_conn_from_request,
get_mq_channel_from_request,
)
from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue

Expand Down Expand Up @@ -65,16 +66,22 @@ def _validate_request_data(config: Config, request_data: Dict) -> None:
# Only accept publishing on the message topic.
message_topic = config.aleph.queue_topic.value
if topic != message_topic:
raise web.HTTPForbidden(reason=f"Unauthorized P2P topic: {topic}. Use {message_topic}.")
raise web.HTTPForbidden(
reason=f"Unauthorized P2P topic: {topic}. Use {message_topic}."
)

data = request_data.get("data")
if not isinstance(data, str):
raise web.HTTPUnprocessableEntity(reason="'data': expected a serialized JSON string.")
raise web.HTTPUnprocessableEntity(
reason="'data': expected a serialized JSON string."
)

try:
message_dict = json.loads(cast(str, request_data.get("data")))
except ValueError:
raise web.HTTPUnprocessableEntity(reason="'data': must be deserializable as JSON.")
raise web.HTTPUnprocessableEntity(
reason="'data': must be deserializable as JSON."
)

_validate_message_dict(message_dict)

Expand Down Expand Up @@ -187,9 +194,11 @@ async def pub_message(request: web.Request):
config = get_config_from_request(request)

if request_data.sync:
mq_conn = get_mq_conn_from_request(request)
mq_channel = get_mq_channel_from_request(request)
mq_queue = await mq_make_aleph_message_topic_queue(
mq_conn=mq_conn, config=config, routing_key=f"*.{pending_message.item_hash}"
channel=mq_channel,
config=config,
routing_key=f"*.{pending_message.item_hash}",
)
else:
mq_queue = None
Expand Down
3 changes: 1 addition & 2 deletions src/aleph/web/controllers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,10 @@ def cond_output(request, context, template):


async def mq_make_aleph_message_topic_queue(
mq_conn: aio_pika.abc.AbstractConnection,
channel: aio_pika.abc.AbstractChannel,
config: Config,
routing_key: Optional[str] = None,
) -> aio_pika.abc.AbstractQueue:
channel = await mq_conn.channel()
mq_message_exchange = await channel.declare_exchange(
name=config.rabbitmq.message_exchange.value,
type=aio_pika.ExchangeType.TOPIC,
Expand Down