
[QUESTION] How to deal with the background rebalance process #929

Closed
vmaurin opened this issue Oct 20, 2023 · 0 comments

vmaurin (Contributor) commented Oct 20, 2023

Hi!

In my company, we use aiokafka quite a lot: for basic producer/consumer loops, but we have also built our own streaming lib on top of it (with stateful transformers).

We quite often stumble on issues related to the rebalance logic running in a background coroutine, as explained here: https://aiokafka.readthedocs.io/en/stable/kafka-python_difference.html#rebalances-are-happening-in-the-background

This has a lot of consequences, and makes some classic patterns incorrect.

The simplest example is the "at least once" consumer loop.

The pattern we used, as with a Java client, was the one from the example https://aiokafka.readthedocs.io/en/stable/examples/manual_commit.html:

while True:
    records = await consumer.getmany(...)
    await some_processing(records)
    # Commit the current position of every assigned partition.
    await consumer.commit()

But this kind of loop will fail if the background group rebalance routine executes during the processing: when reaching the commit, you will get a CommitFailedError. If not caught, the worker will die, a new rebalance will be triggered, causing yet more rebalancing, and so on. We are also under the impression that commit without arguments could be risky, as the subscription could have changed in between.

We have ended up implementing the above pattern like this:

while True:
    records = await consumer.getmany(...)
    # Record, per partition, the offset to commit (last processed + 1),
    # instead of relying on whatever the assignment is at commit time.
    offsets = {}
    for tp, messages in records.items():
        offsets[tp] = messages[-1].offset + 1
    await some_processing(records)
    try:
        await consumer.commit(offsets)
    except (CommitFailedError, IllegalStateError) as err:
        logger.warning(
            "Commit failed due to rebalancing, circle back to consume new messages",
            exc_info=err,
        )

Is this pattern correct? If so, should the example in the documentation highlight this behavior? Should aiokafka provide helpers for such a pattern? The max_poll_interval_ms documentation and the related error messages are also wrong and confusing: the error above suggests changing max_poll_interval_ms, but that won't help, as the failure also happens when the processing time is below max_poll_interval_ms.
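
One direction we can imagine, hinted at on the linked documentation page, is to commit from a ConsumerRebalanceListener before the partitions are actually revoked. A rough sketch (CommitOnRevoke and the shared offsets dict are our own illustration, not an existing aiokafka helper):

from aiokafka.abc import ConsumerRebalanceListener
from aiokafka.errors import CommitFailedError

class CommitOnRevoke(ConsumerRebalanceListener):
    def __init__(self, consumer, offsets):
        self._consumer = consumer
        self._offsets = offsets  # dict updated by the consumer loop

    async def on_partitions_revoked(self, revoked):
        # Flush the offsets of the partitions we are about to lose,
        # while the current group generation is still valid.
        to_commit = {tp: off for tp, off in self._offsets.items() if tp in revoked}
        if to_commit:
            try:
                await self._consumer.commit(to_commit)
            except CommitFailedError:
                pass  # too late, accept the redelivery

    async def on_partitions_assigned(self, assigned):
        pass

consumer.subscribe(["my_topic"], listener=CommitOnRevoke(consumer, offsets))

But this only shifts the problem: the loop must keep the offsets dict up to date, and in-flight processing still has to be coordinated with the listener.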

In the past, we also tried to mutually exclude the rebalance process from the consumer loop with locks, as sketched below, but that also blocks the heartbeat and makes it complicated to keep a consumer alive.
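
Roughly, that locking attempt looked like this (a sketch for illustration, with a made-up BlockingListener sharing an asyncio.Lock with the loop):

import asyncio
from aiokafka.abc import ConsumerRebalanceListener

lock = asyncio.Lock()

class BlockingListener(ConsumerRebalanceListener):
    async def on_partitions_revoked(self, revoked):
        # Park the rebalance until the in-flight batch is processed and committed.
        async with lock:
            pass

    async def on_partitions_assigned(self, assigned):
        pass

while True:
    records = await consumer.getmany(timeout_ms=1000)
    async with lock:
        # While we hold the lock, the rebalance coroutine is stuck in
        # on_partitions_revoked, and in our experience that also stalls
        # the heartbeat, so the broker can kick the consumer out anyway.
        await some_processing(records)
        await consumer.commit()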

aio-libs locked and limited conversation to collaborators Oct 25, 2023
ods converted this issue into discussion #935 Oct 25, 2023

This issue was moved to a discussion. You can continue the conversation there.
