Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug: OffsetCommit failed for mixed subscribtion on Kafka topic #1571

Closed
dem214 opened this issue Jul 5, 2024 · 4 comments
Closed

Bug: OffsetCommit failed for mixed subscribtion on Kafka topic #1571

dem214 opened this issue Jul 5, 2024 · 4 comments
Labels
AioKafka Issues related to `faststream.kafka` module bug Something isn't working

Comments

@dem214
Copy link

dem214 commented Jul 5, 2024

Describe the bug
When subscribing to a topic through a coordinator and to partitions directly using one group_id, an error occurs, making consuming of messages is imposible.

How to reproduce
Include source code:

import logging

from faststream import FastStream
from faststream.kafka import KafkaRouter, TopicPartition, KafkaBroker

router = KafkaRouter()


@router.subscriber(
    't1',
    group_id='cg1',
)
async def handle_group(msg):
    print('handle_group')


@router.subscriber(
    group_id='cg1',
    partitions=[TopicPartition('t2', 0), TopicPartition('t2', 1)]
)
async def handle_partition(msg):
    print('handle_partition')


broker = KafkaBroker(
    log_level=logging.INFO,
    graceful_timeout=5.0,
)
broker.include_router(router, prefix='rtr_')

app = FastStream(broker)

And/Or steps to reproduce the behavior:

  1. ...

Expected behavior
Works fine and consume messages from both topics

Observed behavior
Got an error in logs:

24-07-05 16:52:04,221 INFO     - FastStream app starting...
2024-07-05 16:52:04,228 INFO     - rtr_t1            | cg1 |            - `HandleGroup` waiting for messages
Topic rtr_t1 not found in cluster metadata
2024-07-05 16:52:04,343 INFO     - rtr_t2-0,rtr_t2-1 | cg1 |            - `HandlePartition` waiting for messages
Topic rtr_t2 not found in cluster metadata
2024-07-05 16:52:04,451 INFO     - FastStream app started successfully! To exit, press CTRL+C
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
Auto offset commit failed: [Error 25] UnknownMemberIdError: cg1
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
Auto offset commit failed: [Error 25] UnknownMemberIdError: cg1
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
Auto offset commit failed: [Error 25] UnknownMemberIdError: cg1

Environment
Running FastStream 0.5.14 with CPython 3.11.4 on Linux

@dem214 dem214 added the bug Something isn't working label Jul 5, 2024
@Lancetnik Lancetnik added the AioKafka Issues related to `faststream.kafka` module label Jul 5, 2024
@spataphore1337
Copy link
Contributor

Your example is inherently flawed because you are specifying two different topics for the same consumer group. Use a single topic for one consumer group.

@Lancetnik
Copy link
Member

Seems like you are using Kafka in a wrong way and this problem does not related to FastStream iself. Please, reopen the Issue if you don't agree

@Lancetnik Lancetnik closed this as not planned Won't fix, can't repro, duplicate, stale Aug 27, 2024
@dem214
Copy link
Author

dem214 commented Aug 27, 2024

Is there a problem to use one consumer group ID for several Kafka topics? I didn't find any restrictions in the Kafka architecture or its documentation. If you have any other information, please let me know.

@kumaranvpl
Copy link
Contributor

Is there a problem to use one consumer group ID for several Kafka topics? I didn't find any restrictions in the Kafka architecture or its documentation. If you have any other information, please let me know.

This looks like an issue with aiokafka rather than an issue with FastStream or a restriction from Kafka. There are multiple issues from multiple time period indicating the same issue in aiokafka - aio-libs/aiokafka#880, aio-libs/aiokafka#575, aio-libs/aiokafka#727.

@dem214 The example you have mentioned works with confluent-kafka library without any change.

import asyncio
import logging

from faststream import FastStream
# import from faststream.confluent instead of faststream.kafka
from faststream.confluent import KafkaRouter, TopicPartition, KafkaBroker

router = KafkaRouter()


@router.subscriber(
    't1',
    group_id='cg1',
)
async def handle_group(msg):
    print('handle_group')


@router.subscriber(
    group_id='cg1',
    partitions=[TopicPartition('t2', 0), TopicPartition('t2', 1)]
)
async def handle_partition(msg):
    print('handle_partition')


broker = KafkaBroker(
    log_level=logging.INFO,
    graceful_timeout=5.0,
)
broker.include_router(router, prefix='rtr_')

app = FastStream(broker)

# Added to publish a message to topics rtr_t1 and rtr_t2
@app.after_startup
async def publish_msgs() -> None:
    async def _publish_msgs() -> None:
        await asyncio.sleep(5)
        print("Publishing messages")
        await broker.publish({"name": "Alice"}, topic="rtr_t1")
        await broker.publish({"name": "Bob"}, topic="rtr_t2")

    asyncio.create_task(_publish_msgs())

If you need the behaviour you have mentioned, you could use confluent-kafka instead of aiokafka.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AioKafka Issues related to `faststream.kafka` module bug Something isn't working
Projects
None yet
Development

No branches or pull requests

4 participants