Bug: AsyncConfluentProducer / AsyncConfluentConsumer are not that async #1363

Closed
lecko-cngroup opened this issue Apr 11, 2024 · 1 comment · Fixed by #1556
Labels
bug Something isn't working Confluent Issues related to `faststream.confluent` module

Comments

@lecko-cngroup

Both classes in `faststream.confluent.client` call blocking code in their constructors: `self.producer.list_topics()` and `create_topics(topics=self.topics, config=self.config)`, respectively. When the Kafka cluster is unreachable (DNS issues, firewall, ...), the FastAPI application never reaches the started state (uvicorn does not print `INFO: Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit)`) and its endpoints do not respond to requests.
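To illustrate why a blocking call inside async code stalls the whole application, here is a minimal sketch using only the standard library. `blocking_metadata_call` is a hypothetical stand-in for a call like `list_topics()` against an unreachable broker, and the heartbeat task stands in for serving HTTP requests. None of this is faststream code; it only shows the general effect and one common mitigation (`asyncio.to_thread`).

```python
import asyncio
import time


def blocking_metadata_call(delay: float = 0.3) -> str:
    """Hypothetical stand-in for a blocking client call such as list_topics()."""
    time.sleep(delay)  # simulates waiting on an unreachable broker
    return "metadata"


async def heartbeat(ticks: list, interval: float = 0.05) -> None:
    """Appends a timestamp on every tick; stands in for handling requests."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def count_ticks(offload: bool) -> int:
    """Returns how many heartbeat ticks happen while the blocking call runs."""
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    if offload:
        # Runs the call in a worker thread, so the event loop stays free.
        await asyncio.to_thread(blocking_metadata_call)
    else:
        # Runs the call on the event loop thread; nothing else can run.
        blocking_metadata_call()
    during = len(ticks) - before
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return during


if __name__ == "__main__":
    print("ticks while blocked:  ", asyncio.run(count_ticks(offload=False)))
    print("ticks while offloaded:", asyncio.run(count_ticks(offload=True)))
```

When the call runs directly on the event loop, the heartbeat cannot tick at all until it returns, which matches the symptom of uvicorn never finishing startup.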

How to reproduce
Start the example at https://faststream.airt.ai/latest/getting-started/integrations/fastapi/#__tabbed_1_2 without a broker, then try to fetch http://localhost:8080/docs (I get a timeout).

With the broker still unreachable, commenting out

self.producer.list_topics()
and
create_topics(topics=self.topics, config=self.config)
allows a successful startup, with connection-refused errors in the log. Then start the broker:

docker run -d -p 9092:9092 --name kafka \
-e KAFKA_ENABLE_KRAFT=yes \
-e KAFKA_CFG_NODE_ID=1 \
-e KAFKA_CFG_PROCESS_ROLES=broker,controller \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
--add-host kafka:127.0.0.1 bitnami/kafka:3.7.0

and observe that the connection-refused error messages from the faststream consumer stop and /docs still responds.
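The behaviour described above (startup succeeds, errors are logged while the broker is down, and they stop once it comes up) can be sketched as a background retry loop. This is an illustrative assumption, not how faststream or confluent-kafka implement reconnection; `FlakyBroker` and `connect_with_retry` are hypothetical names.

```python
import asyncio


class FlakyBroker:
    """Hypothetical broker that refuses the first few connection attempts."""

    def __init__(self, failures_before_up: int) -> None:
        self.remaining_failures = failures_before_up

    def connect(self) -> str:
        # A real client call would block here; this one fails fast.
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise ConnectionError("connection refused")
        return "connected"


async def connect_with_retry(broker: FlakyBroker, errors: list, retry_delay: float = 0.01) -> str:
    """Retries until the broker accepts, recording each failure instead of raising."""
    while True:
        try:
            # Offload the (potentially blocking) call to a worker thread.
            return await asyncio.to_thread(broker.connect)
        except ConnectionError as exc:
            errors.append(str(exc))  # stands in for an error log line
            await asyncio.sleep(retry_delay)


async def main() -> tuple:
    errors: list = []
    broker = FlakyBroker(failures_before_up=2)
    task = asyncio.create_task(connect_with_retry(broker, errors))
    # The application would keep serving requests here while the task retries.
    result = await task
    return result, errors


if __name__ == "__main__":
    print(asyncio.run(main()))
```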

@lecko-cngroup lecko-cngroup added the bug Something isn't working label Apr 11, 2024
@Lancetnik Lancetnik added the Confluent Issues related to `faststream.confluent` module label May 16, 2024
@kumaranvpl kumaranvpl self-assigned this May 24, 2024
@kumaranvpl
Contributor

Hello @lecko-cngroup,

This is fixed in version 0.5.14.

from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.confluent.fastapi import KafkaRouter, Logger

router = KafkaRouter("localhost:9099")

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming, logger: Logger, d=Depends(call)):
    logger.info(m)
    return {"response": "Hello, Kafka!"}

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)

In the code example above, I intentionally create a router that connects to the wrong Kafka port, 9099. If you start the example with uvicorn, the log shows connection-refused errors, but FastAPI remains accessible and /docs loads.
