
Feature: Allow concurrent RPC calls on rabbit implementation #1318

Open
a14n opened this issue Mar 18, 2024 · 5 comments
Labels
enhancement New feature or request RabbitMQ Issues related to `faststream.rabbit` module and RabbitMQ broker features

Comments

@a14n

a14n commented Mar 18, 2024

Is your feature request related to a problem? Please describe.

Currently (with faststream 0.4.7), when multiple RPC publications are made simultaneously, the calls are executed one after the other.
IIRC this is due to `AioPikaFastProducer._rpc_lock`.

Describe the solution you'd like

It would be great if RPC calls could run concurrently (for instance, using a correlation id as documented at https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/6-rpc.html).

Feature code example

client.py:

import asyncio
import datetime
import uuid

from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")


async def send(msg: str) -> None:
    print("sending", msg, datetime.datetime.now())
    r = await broker.publish(
        msg,
        "test",
        rpc=True,
        rpc_timeout=None,
    )
    print("received", r, datetime.datetime.now())


async def main() -> None:
    await broker.connect()
    await asyncio.gather(*[send(f"{i}:{uuid.uuid4()}") for i in range(10)])

if __name__ == "__main__":
    asyncio.run(main())

server.py:

import asyncio

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/", max_consumers=10)


@broker.subscriber("test")
async def base_handler(body: str) -> str:
    print("received", body)
    await asyncio.sleep(1)
    return body


async def main() -> None:
    app = FastStream(broker)
    await app.run()  # blocking method

if __name__ == "__main__":
    asyncio.run(main())

Logs:

sending 0:92d1c227-5b4f-468a-8779-723fff743252 2024-03-18 16:12:43.587208
sending 1:3d3e1780-fe2b-40b2-9a97-f75498c6d5a2 2024-03-18 16:12:43.589087
sending 2:9f25e75e-856e-409e-98a7-89244ed71860 2024-03-18 16:12:43.589135
sending 3:f1b008b3-434f-4281-9a07-b027063f65b9 2024-03-18 16:12:43.589204
sending 4:05abb242-9880-4a84-8d67-c1cca88ee2c1 2024-03-18 16:12:43.589230
sending 5:75914f26-290a-4fa5-9606-50d6d5312344 2024-03-18 16:12:43.589250
sending 6:08f436fa-8238-4aa8-be19-9d8cab9fd637 2024-03-18 16:12:43.589269
sending 7:67db3517-c552-46e7-95e5-0f0d171790f3 2024-03-18 16:12:43.589286
sending 8:c558b771-7058-48f8-a82c-05de46ea4b18 2024-03-18 16:12:43.589304
sending 9:095ea2da-7e09-4d84-9adb-f36284d12f12 2024-03-18 16:12:43.589321
received 0:92d1c227-5b4f-468a-8779-723fff743252 2024-03-18 16:12:44.606000
received 1:3d3e1780-fe2b-40b2-9a97-f75498c6d5a2 2024-03-18 16:12:45.619650
received 2:9f25e75e-856e-409e-98a7-89244ed71860 2024-03-18 16:12:46.627189
received 3:f1b008b3-434f-4281-9a07-b027063f65b9 2024-03-18 16:12:47.634551
received 4:05abb242-9880-4a84-8d67-c1cca88ee2c1 2024-03-18 16:12:48.646457
received 5:75914f26-290a-4fa5-9606-50d6d5312344 2024-03-18 16:12:49.655983
received 6:08f436fa-8238-4aa8-be19-9d8cab9fd637 2024-03-18 16:12:50.667091
received 7:67db3517-c552-46e7-95e5-0f0d171790f3 2024-03-18 16:12:51.676912
received 8:c558b771-7058-48f8-a82c-05de46ea4b18 2024-03-18 16:12:52.685301
received 9:095ea2da-7e09-4d84-9adb-f36284d12f12 2024-03-18 16:12:53.695455

Note that the last answer arrives 10 s after sending (instead of 1 s): the calls were serialized.

Describe alternatives you've considered

A workaround is to bypass faststream on the client side and use RPC with a correlation id, as documented at https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/6-rpc.html.

Additional context
None

@a14n a14n added the enhancement New feature or request label Mar 18, 2024
@Lancetnik
Collaborator

Hello, @a14n!

Unfortunately, we can't use the aio-pika RPC approach. The main goal of FastStream RPC is to avoid creating a special queue to consume responses, so FastStream uses the RabbitMQ Direct Reply-To feature to send RPC requests. But this mechanism has a limitation: you can't send multiple requests at the same time from one channel-connection pair. So we can't send RPC requests concurrently until #975 is released.

That said, RPC is not really the right use case for RMQ: we already discussed it in #1252.

If you want to consume a persistent request-reply stream, please create a persistent subscriber and use the reply_to header instead of RPC. If you want to match replies with the original requests, you can use something like the following snippet (from Discord):

from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any
from uuid import uuid4

from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker


@asynccontextmanager
async def lifespan():
    context.set_global("replies_container", {})
    yield


RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]

broker = RabbitBroker()
app = FastStream(broker, lifespan=lifespan)


@broker.subscriber("replies")
def handle_reply(
    data,
    container: RepliesContainer,
    cor_id: str = Context("message.correlation_id"),
):
    if (future := container.pop(cor_id, None)) and not future.done():
        future.set_result(data)


async def custom_rpc(
    broker: RabbitBroker,
    msg: Any,
    subject: str,
    container: RepliesContainer,
) -> Any:
    cor_id = uuid4().hex
    container[cor_id] = result_future = Future()
    await broker.publish(msg, subject, correlation_id=cor_id)
    return await result_future


# Emulation

@broker.subscriber("in")
@broker.publisher("replies")
async def handle_request(data):
    """Your hardcoded not-FastStream service."""
    return data


@app.after_startup
async def t(container: RepliesContainer):
    response = await custom_rpc(broker, "test", "in", container)
    print(response)

This code does the same thing as the aio-pika example, so it should solve your problem.
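The heart of the snippet above is the future-matching step, and it can be exercised without a broker. In this sketch an in-memory dict stands in for `replies_container`, and `loop.call_soon` emulates the remote service echoing the payload back (the names here are illustrative, not FastStream API):

```python
import asyncio
from uuid import uuid4

replies: dict[str, asyncio.Future] = {}


def handle_reply(data, cor_id: str) -> None:
    # What the "replies" subscriber does: resolve the matching future.
    if (future := replies.pop(cor_id, None)) and not future.done():
        future.set_result(data)


async def custom_rpc(msg: str) -> str:
    loop = asyncio.get_running_loop()
    cor_id = uuid4().hex
    replies[cor_id] = future = loop.create_future()
    # In the real snippet this is broker.publish(msg, queue,
    # correlation_id=cor_id); here the echo happens in-process.
    loop.call_soon(handle_reply, msg, cor_id)
    return await future


async def main() -> list[str]:
    # Concurrent calls resolve independently, keyed by correlation id.
    return await asyncio.gather(*(custom_rpc(f"msg-{i}") for i in range(3)))


results = asyncio.run(main())
print(results)  # ['msg-0', 'msg-1', 'msg-2']
```

Because each reply is matched by its correlation id, the ten concurrent calls from the original report would all complete after ~1 s instead of 10 s.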

@a14n
Author

a14n commented Mar 18, 2024

Thanks for your quick reply.

@v2boardbot

@broker.subscriber("replies")

Hello

If there are multiple producer processes all subscribed to `replies`, a reply sent by a consumer may be received by a producer other than the one that published the task (the producers act as competing consumers), and in that case the publishing producer never receives the reply.

@v2boardbot

Producer code:

import os
import random
import uuid
from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any

from fastapi import FastAPI
from faststream import Context, context
from faststream.rabbit import RabbitBroker


RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]
broker = RabbitBroker()


@asynccontextmanager
async def fastapi_lifespan(fastapi_app: FastAPI):
    context.set_global("replies_container", {})
    await broker.start()
    yield
    await broker.close()


fastapi_app = FastAPI(lifespan=fastapi_lifespan)


@broker.subscriber("replies")
def handle_reply(
    data,
    container: RepliesContainer,
    cor_id: str = Context("message.correlation_id"),
):
    print('handle_reply receives message', data, cor_id, os.getpid())
    if (future := container.pop(cor_id, None)) and not future.done():
        future.set_result(data)


async def custom_rpc(
    broker: RabbitBroker,
    msg: Any,
    subject: str,
    container: RepliesContainer,
) -> Any:
    cor_id = uuid.uuid4().hex
    container[cor_id] = result_future = Future()
    print('Publish task', msg, cor_id, os.getpid())
    await broker.publish(msg, subject, correlation_id=cor_id)
    print('Publish task success', msg, cor_id, os.getpid())
    return await result_future


@fastapi_app.post('/push_task')
async def push_task():
    msg = 'test' + str(random.randint(100, 999))
    container = context.get('replies_container')
    response = await custom_rpc(broker, msg, "in", container)
    print('response received', msg, response, os.getpid())

Consumer code:

from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker

from asyncio import Future
from contextlib import asynccontextmanager
from typing import Annotated, Any

@asynccontextmanager
async def lifespan():
    context.set_global("replies_container", {})
    yield

RepliesContainer = Annotated[dict[str, Future[Any]], Context("replies_container")]

broker = RabbitBroker()
app = FastStream(broker, lifespan=lifespan)


@broker.subscriber("in")
@broker.publisher("replies")
async def handle_request(data):
    """Your hardcoded not-FastStream service."""
    print('received task', data)
    return data

Log:

Publish task test482 cc99276685bd4ac19227680b44743231 9744
Publish task success test482 cc99276685bd4ac19227680b44743231 9744
INFO:     Application startup complete.
2024-04-03 11:18:44,145 INFO     - default | replies | 1b9d8205b0 - Received
handle_reply receives message test482 cc99276685bd4ac19227680b44743231 9744
response received test482 test482 9744
INFO:     127.0.0.1:16378 - "POST /push_task HTTP/1.1" 200 OK
2024-04-03 11:18:44,155 INFO     - default | replies | 1b9d8205b0 - Processed
Publish task test225 46555d6651c943d5a835f10e88a79789 9744
Publish task success test225 46555d6651c943d5a835f10e88a79789 9744
2024-04-03 11:18:47,772 INFO     - default | replies | 95783da1b1 - Received
response received test225 46555d6651c943d5a835f10e88a79789 14688
2024-04-03 11:18:47,780 INFO     - default | replies | 95783da1b1 - Processed

When the process with pid 9744 publishes a task and the reply happens to be received by 9744, the response comes back normally. However, when a reply to a task published by 9744 is received by 14688 instead, the response is never obtained.

Are there any good ideas to solve this problem?

@v2boardbot

Are there any good ideas to solve this problem?

Already solved: the reply queue name is generated from the pid, and the consumer publishes replies to that specific queue.

@broker.subscriber("in")
async def handle_request(data: dict, cor_id: str = Context("message.correlation_id")):
    """Your hardcoded not-FastStream service."""
    print('received task', data, cor_id)
    reply_queue = data.pop('reply_queue', None)
    await broker.publish(message=data, queue=reply_queue, correlation_id=cor_id)
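The producer side of this fix is not shown; one way to close the loop is for each producer to subscribe to a queue named after its own pid and embed that name in the payload. A broker-free sketch of the routing (the `replies.<pid>` naming and the `reply_queue` field are assumptions mirroring the snippet above):

```python
import os

# Hypothetical per-process reply queue, as in the fix above.
reply_queue = f"replies.{os.getpid()}"

# In-memory stand-in for the broker's queues.
queues: dict[str, list] = {}


def handle_request(data: dict, cor_id: str) -> None:
    # Mirrors the consumer above: pop the target queue name from the
    # payload and route the reply to that specific queue.
    target = data.pop("reply_queue", None)
    queues.setdefault(target, []).append((cor_id, data))


handle_request({"task": 1, "reply_queue": reply_queue}, "c1")
print(queues[reply_queue])  # [('c1', {'task': 1})]
```

Since each producer process consumes only its own queue, replies can no longer be picked up by the wrong process.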

@Lancetnik Lancetnik added the RabbitMQ Issues related to `faststream.rabbit` module and RabbitMQ broker features label May 16, 2024