Bug: Channel is not accessible when setup_state=False #989

Closed
kgadek93 opened this issue Nov 24, 2023 · 6 comments
Labels
bug Something isn't working

Comments

@kgadek93

kgadek93 commented Nov 24, 2023

Describe the bug

Channel is not accessible when setup_state=False.
When I try to publish a message, I see this error:
"RabbitBroker channel is not started yet". Here is the full traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/anyio/streams/memory.py", line 98, in receive
    return self.receive_nowait()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/streams/memory.py", line 93, in receive_nowait
    raise WouldBlock
anyio.WouldBlock

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 78, in call_next
    message = await recv_stream.receive()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/streams/memory.py", line 118, in receive
    raise EndOfStream
anyio.EndOfStream

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/code/src/main_fastapi.py", line 87, in http_middleware
    response = await call_next(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 84, in call_next
    raise app_exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 70, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "/usr/local/lib/python3.11/site-packages/starlette_context/middleware/raw_middleware.py", line 92, in __call__
    await self.app(scope, receive, send_wrapper)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
    response = await self.dispatch_func(request, call_next)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/src/middleware/add_spans_attrs_middleware.py", line 37, in dispatch
    response = await call_next(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 84, in call_next
    raise app_exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 70, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
    raise e
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 273, in app
    raw_response = await run_endpoint_function(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 190, in run_endpoint_function
    return await dependant.call(**values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/src/woflow/router.py", line 47, in create_woflow_import
    await rabbit_broker.publish(
  File "/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py", line 469, in publish
    assert self._producer, "RabbitBroker channel is not started yet"  # nosec B101
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: RabbitBroker channel is not started yet

How to reproduce
Include source code:

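from typing import Annotated

from fastapi import Depends, FastAPI, Request, status
from faststream.rabbit import RabbitBroker
from faststream.rabbit.fastapi import RabbitRouter
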
app = FastAPI(
    title="my-service",
    version=settings.RELEASE_VERSION,
)


router = RabbitRouter(
    f"amqp://{settings.RABBITMQ.USER}:{settings.RABBITMQ.PASS}@{settings.RABBITMQ.HOST}:{settings.RABBITMQ.PORT}",
    include_in_schema=True,
    setup_state=False,
)
app.include_router(router, prefix=BASE_URL, tags=["router"])


@router.post(
    f"{BASE_URL}/",
    summary="Create a new message",
    response_description=NO_CONTENT_RESPONSE_DESCRIPTION,
    status_code=status.HTTP_200_OK,
)
async def create_msg(
    rabbit_broker: Annotated[RabbitBroker, Depends(broker)],
    request: Request,
):  # type: ignore
    await rabbit_broker.publish(
        ImportMessage(
            msg="msg"
        ),
        queue=QUEUE_NAME,
        persist=True,
    )
    return "OK"

...

And/Or steps to reproduce the behavior:

  1. ...

Expected behavior
Explain what you expected to happen clearly and concisely.
Message should be added successfully to queue.
Observed behavior
The message wasn't added to the queue, and an error is raised.
Screenshots
If applicable, attach screenshots to help illustrate the problem.

Environment
Include the output of the faststream -v command to display your current project and system environment.
I'm using faststream, so I run the server like this:
uvicorn main_fastapi:app --host 0.0.0.0 --reload --port 9080

Additional context
Provide any other relevant context or information about the problem here.

kgadek93 added the bug label Nov 24, 2023
@Lancetnik
Member

@kgadek93 you just forgot to set up your router with the FastAPI lifespan. It is an important (and, unfortunately, required) part until tiangolo merges this PR

Correct code should look like:

from fastapi import FastAPI
from faststream.rabbit.fastapi import RabbitRouter

router = RabbitRouter(...)
app = FastAPI(..., lifespan=router.lifespan_context)  # you forgot about this one
app.include_router(router, ...)

@kgadek93
Author

kgadek93 commented Jan 4, 2024

Hi @Lancetnik, I've checked that and added the lifespan like this:

app = FastAPI(
    title="my-service",
    middleware=middlewares,
    version=settings.RELEASE_VERSION,
    dependencies=[Depends(initialize_context)],
    lifespan=Lifespans([database_lifespan, import_router.lifespan_context]),
)

but I still see this issue:

fastapi_1                | {"caller": "/usr/local/lib/python3.11/site-packages/uvicorn/lifespan/on.py:121", "ts": "2024-01-04T13:58:24.317656+00:00", "level": "ERROR", "message": "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 455, in connect\n    reader, writer = await asyncio.open_connection(\n                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/asyncio/streams.py\", line 48, in open_connection\n    transport, _ = await loop.create_connection(\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"uvloop/loop.pyx\", line 1978, in create_connection\nsocket.gaierror: [Errno -5] No address associated with hostname\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/starlette/routing.py\", line 677, in lifespan\n    async with self.lifespan_context(app) as maybe_state:\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 210, in __aenter__\n    return await anext(self.gen)\n           ^^^^^^^^^^^^^^^^^^^^^\n  File \"/code/src/config/lifespans.py\", line 30, in _lifespan_manager\n    await exit_stack.enter_async_context(lifespan(app))\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 650, in enter_async_context\n    result = await _enter(cm)\n             ^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 210, in __aenter__\n    return await anext(self.gen)\n           ^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/fastapi/router.py\", line 334, in start_broker_lifespan\n    await self.broker.start()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 287, in start\n    await super().start()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py\", line 89, in start\n    await self.connect()\n  File 
\"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 204, in connect\n    connection = await super().connect(*args, **kwargs)\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py\", line 320, in connect\n    self._connection = await self._connect(**_kwargs)\n                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 233, in _connect\n    await aio_pika.connect_robust(\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 338, in connect_robust\n    await connection.connect(timeout=timeout)\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 184, in connect\n    await self.__fail_fast_future\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 139, in __connection_factory\n    await Connection.connect(self, self.__connect_timeout)\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/connection.py\", line 118, in connect\n    self.transport = await UnderlayConnection.connect(\n                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/abc.py\", line 666, in connect\n    connection = await cls.make_connection(\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/abc.py\", line 654, in make_connection\n    connection: aiormq.abc.AbstractConnection = await asyncio.wait_for(\n                                                ^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/asyncio/tasks.py\", line 452, in wait_for\n    return await fut\n           ^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 918, in connect\n    await connection.connect(client_properties or {})\n  File 
\"/usr/local/lib/python3.11/site-packages/aiormq/base.py\", line 164, in wrap\n    return await self.create_task(func(self, *args, **kwargs))\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/abc.py\", line 44, in __inner\n    return await self.task\n           ^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 462, in connect\n    raise AMQPConnectionError(*e.args) from e\naiormq.exceptions.AMQPConnectionError: [Errno -5] No address associated with hostname\n", "error": "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 455, in connect\n    reader, writer = await asyncio.open_connection(\n                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/asyncio/streams.py\", line 48, in open_connection\n    transport, _ = await loop.create_connection(\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"uvloop/loop.pyx\", line 1978, in create_connection\nsocket.gaierror: [Errno -5] No address associated with hostname\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/starlette/routing.py\", line 677, in lifespan\n    async with self.lifespan_context(app) as maybe_state:\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 210, in __aenter__\n    return await anext(self.gen)\n           ^^^^^^^^^^^^^^^^^^^^^\n  File \"/code/src/config/lifespans.py\", line 30, in _lifespan_manager\n    await exit_stack.enter_async_context(lifespan(app))\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 650, in enter_async_context\n    result = await _enter(cm)\n             ^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 210, in __aenter__\n    return await anext(self.gen)\n           ^^^^^^^^^^^^^^^^^^^^^\n  File 
\"/usr/local/lib/python3.11/site-packages/faststream/broker/fastapi/router.py\", line 334, in start_broker_lifespan\n    await self.broker.start()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 287, in start\n    await super().start()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py\", line 89, in start\n    await self.connect()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 204, in connect\n    connection = await super().connect(*args, **kwargs)\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py\", line 320, in connect\n    self._connection = await self._connect(**_kwargs)\n                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 233, in _connect\n    await aio_pika.connect_robust(\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 338, in connect_robust\n    await connection.connect(timeout=timeout)\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 184, in connect\n    await self.__fail_fast_future\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 139, in __connection_factory\n    await Connection.connect(self, self.__connect_timeout)\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/connection.py\", line 118, in connect\n    self.transport = await UnderlayConnection.connect(\n                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/abc.py\", line 666, in connect\n    connection = await cls.make_connection(\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/abc.py\", line 654, in make_connection\n    connection: aiormq.abc.AbstractConnection = 
await asyncio.wait_for(\n                                                ^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/asyncio/tasks.py\", line 452, in wait_for\n    return await fut\n           ^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 918, in connect\n    await connection.connect(client_properties or {})\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/base.py\", line 164, in wrap\n    return await self.create_task(func(self, *args, **kwargs))\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/abc.py\", line 44, in __inner\n    return await self.task\n           ^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 462, in connect\n    raise AMQPConnectionError(*e.args) from e\naiormq.exceptions.AMQPConnectionError: [Errno -5] No address associated with hostname\n", "data": {"queue": "", "exchange": "", "message_id": ""}}
fastapi_1                | {"caller": "/usr/local/lib/python3.11/site-packages/uvicorn/lifespan/on.py:57", "ts": "2024-01-04T13:58:24.317850+00:00", "level": "ERROR", "message": "Application startup failed. Exiting.", "error": "Application startup failed. Exiting.", "data": {"queue": "", "exchange": "", "message_id": ""}}
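The traceback above passes through a project-local `lifespans.py` that enters each lifespan with `exit_stack.enter_async_context(lifespan(app))`. The `Lifespans` helper itself is not shown in the thread, so the following is only a minimal sketch of how such a combiner might look, assuming it is built on `contextlib.AsyncExitStack`. The names `combine_lifespans`, `recording_lifespan`, and `demo` are hypothetical. It illustrates why an exception raised while one lifespan starts (here, the broker failing to connect) propagates out of the combined lifespan and aborts application startup.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []  # records the order of startup/shutdown for the demo


def combine_lifespans(lifespans):
    """Hypothetical combiner: enter each lifespan in order, exit in reverse."""

    @asynccontextmanager
    async def combined(app):
        async with AsyncExitStack() as stack:
            for lifespan in lifespans:
                # Any exception raised here aborts the whole startup.
                await stack.enter_async_context(lifespan(app))
            yield

    return combined


def recording_lifespan(name):
    """Hypothetical stand-in for a real lifespan such as a database or broker."""

    @asynccontextmanager
    async def lifespan(app):
        events.append(f"start {name}")
        try:
            yield
        finally:
            events.append(f"stop {name}")

    return lifespan


async def demo():
    events.clear()
    combined = combine_lifespans(
        [recording_lifespan("database"), recording_lifespan("broker")]
    )
    async with combined(None):
        events.append("serving")
    return list(events)
```

Running `asyncio.run(demo())` shows the lifespans starting in list order and stopping in reverse order.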

@Lancetnik
Member

@kgadek93

No address associated with hostname

Read the error message, please. You are trying to connect to a non-existent RMQ instance.
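A quick way to confirm this kind of failure is to check whether the configured hostname resolves at all from inside the container. The sketch below uses only the standard library. The function name `hostname_resolves` is hypothetical, and the default port 5672 is the standard AMQP port, which may differ from the value of `settings.RABBITMQ.PORT` in the original setup.

```python
import socket


def hostname_resolves(host: str, port: int = 5672) -> bool:
    """Return True if `host` can be resolved to at least one address."""
    try:
        socket.getaddrinfo(host, port)
    except socket.gaierror:
        # Same error family as "No address associated with hostname".
        return False
    return True
```

Calling `hostname_resolves(settings.RABBITMQ.HOST)` before building the router would show whether the name in the connection URL is the problem, as opposed to credentials or the port.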

@kgadek93
Author

kgadek93 commented Jan 8, 2024

hi,
@Lancetnik perhaps I have formulated the problem incorrectly...
I would like to set up RabbitMQ when the app is starting, but if something is wrong, e.g. RMQ doesn't exist, I would like to log this and start the app anyway. RabbitMQ is not crucial for my application. Is there any param in lifespan_context for that?

@Lancetnik
Member

Nope, but you can create a custom lifespan and start the broker manually to get that behavior.
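A minimal sketch of that suggestion follows, written against a stand-in object so it runs without RabbitMQ. It assumes the broker exposes async `start()` and `close()` methods. The earlier traceback shows the router calling `self.broker.start()`, but `close()` and the use of `router.broker` from application code are assumptions here, and `optional_broker_lifespan`, `UnreachableBroker`, and `demo` are hypothetical names.

```python
import asyncio
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger("optional_broker")


def optional_broker_lifespan(broker):
    """Build a lifespan that starts `broker` but tolerates a failed start."""

    @asynccontextmanager
    async def lifespan(app):
        started = False
        try:
            await broker.start()
            started = True
        except Exception:
            # The broker is optional: log the failure and keep starting the app.
            logger.exception("Broker failed to start; continuing without it")
        try:
            yield
        finally:
            # Only close a broker that actually started.
            if started:
                await broker.close()

    return lifespan


class UnreachableBroker:
    """Hypothetical stand-in that fails like a broker with a bad hostname."""

    def __init__(self):
        self.closed = False

    async def start(self):
        raise ConnectionError("No address associated with hostname")

    async def close(self):
        self.closed = True


async def demo():
    broker = UnreachableBroker()
    async with optional_broker_lifespan(broker)(None):
        status = "app started"
    return status, broker.closed
```

In a real application this might be wired in as `FastAPI(lifespan=optional_broker_lifespan(router.broker))`, again as an assumption rather than a documented recipe. If the broker never starts, `publish` would presumably still fail with the "RabbitBroker channel is not started yet" assertion from the first traceback, so endpoints that publish would need to handle that case.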

@kgadek93
Author

kgadek93 commented Jan 9, 2024

ok @Lancetnik I get it, thank you for your reply

2 participants