
Rebalancing causes infinite loop in CommitAsync #10

Open
Gonyoda opened this issue Feb 8, 2022 · 3 comments

@Gonyoda (Contributor) commented Feb 8, 2022

Using the Confluent Kafka adapter, certain errors during a commit cause an infinite loop:

  • Broker: Unknown member
  • Broker: Specified group generation id is not valid

These errors seem to occur when the consumer group is rebalanced while the Parallafka code is attempting to commit a message offset. Because the error will never resolve, the MessageCommitter.CommitNow method loops endlessly, retrying with a 99ms delay.
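
For reference, here is a minimal sketch of how the retry loop could treat these errors as non-retryable. The names are illustrative rather than Parallafka's actual MessageCommitter code, and it assumes the two broker messages above correspond to ErrorCode.UnknownMemberId and ErrorCode.IllegalGeneration in Confluent.Kafka:

```csharp
using System.Threading.Tasks;
using Confluent.Kafka;

static class CommitRetrySketch
{
    // Rebalance-related errors can never succeed on retry: the consumer's
    // group membership or generation is already gone.
    static bool IsFatalRebalanceError(KafkaException e) =>
        e.Error.Code == ErrorCode.UnknownMemberId ||
        e.Error.Code == ErrorCode.IllegalGeneration;

    public static async Task CommitWithRetryAsync<TKey, TValue>(
        IConsumer<TKey, TValue> consumer, TopicPartitionOffset offset)
    {
        while (true)
        {
            try
            {
                consumer.Commit(new[] { offset });
                return;
            }
            catch (KafkaException e) when (IsFatalRebalanceError(e))
            {
                throw; // surface to the caller instead of retrying forever
            }
            catch (KafkaException)
            {
                await Task.Delay(99); // transient error: retry after the same delay
            }
        }
    }
}
```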

@Gonyoda (Contributor, Author) commented Feb 8, 2022

Possible solutions when a commit failure occurs (both options are contrasted in the sketch below):

  • Purge the queue of messages for the partition whose commit failed, or
  • Purge all queued messages.

It is not yet known whether, after a rebalance, a consumer can keep queued messages for the partitions it did not lose, or whether it needs to start over. Say a consumer is assigned 3 partitions. A new consumer joins the group, a rebalance occurs, and this consumer is now assigned 2 of the 3 original partitions. Should the consumer remove all queued messages, or just the messages from the 3rd partition it is no longer assigned?
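
A minimal sketch contrasting the two options, assuming a hypothetical per-partition queue structure (not Parallafka's actual internals):

```csharp
using System.Collections.Concurrent;
using Confluent.Kafka;

// Hypothetical per-partition queue; Parallafka's real internals differ.
class QueuedMessages<TKey, TValue>
{
    private readonly ConcurrentDictionary<TopicPartition, ConcurrentQueue<ConsumeResult<TKey, TValue>>>
        _byPartition = new();

    // Option 1: drop only the partition whose commit failed.
    public void PurgePartition(TopicPartition partition) =>
        _byPartition.TryRemove(partition, out _);

    // Option 2: drop everything and start over after the rebalance.
    public void PurgeAll() => _byPartition.Clear();
}
```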

@Gonyoda (Contributor, Author) commented Feb 17, 2022

I believe the correct solution is: when a commit failure occurs due to a repartitioning event, discard all queued messages for that partition. This can mean that messages from the partition are re-processed by the new consumer.
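
A rough sketch of what that could look like, reusing the hypothetical QueuedMessages type from the earlier comment and again assuming the rebalance surfaces as ErrorCode.UnknownMemberId / ErrorCode.IllegalGeneration:

```csharp
using Confluent.Kafka;

static class CommitFailureSketch
{
    public static void CommitOrDiscard<TKey, TValue>(
        IConsumer<TKey, TValue> consumer,
        TopicPartitionOffset offset,
        QueuedMessages<TKey, TValue> queuedMessages)
    {
        try
        {
            consumer.Commit(new[] { offset });
        }
        catch (KafkaException e) when (
            e.Error.Code == ErrorCode.UnknownMemberId ||
            e.Error.Code == ErrorCode.IllegalGeneration)
        {
            // The partition's new owner resumes from the last committed
            // offset, so the discarded messages may be processed twice.
            queuedMessages.PurgePartition(offset.TopicPartition);
        }
    }
}
```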

@alexreidy alexreidy self-assigned this Apr 12, 2022
@alexreidy (Owner) commented Apr 18, 2022

SetPartitionsRevokedHandler should make the graceful-rebalance happy path straightforward enough, but we'll still need to support edge cases where we detect partition loss at commit time.
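
A sketch of that happy path with Confluent.Kafka's ConsumerBuilder; the config values and the QueuedMessages type are illustrative carry-overs from the sketches above:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // illustrative
    GroupId = "example-group",           // illustrative
};

var queuedMessages = new QueuedMessages<string, string>(); // sketch type from above

using var consumer = new ConsumerBuilder<string, string>(config)
    .SetPartitionsRevokedHandler((c, revokedOffsets) =>
    {
        // Expected to run inside the Consume() call while the group
        // rebalances, before ownership actually transfers (to be verified).
        foreach (var tpo in revokedOffsets)
        {
            queuedMessages.PurgePartition(tpo.TopicPartition);
        }
    })
    .Build();
```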

It's not good if it ends up having to detect and purge at commit time: that means potential out-of-order handling, so it's something of a last resort. Picture a bunch of messages "in the pipes" with a shared key. For some reason, maybe a network blip, the consumer no longer owns the partition, and commits fail. Maybe half the messages are handled before we purge that partition's queued records, and/or the consumer simply stops receiving them. Meanwhile, the new partition owner across the cloud might have already consumed all the messages for that key, but the original, slower consumer had no idea and "won," handling a middle message last: perhaps an Upsert that should have been followed by a Delete, were it not for the rebalance and purge.

I think there is a similar theoretical risk in the normal serial poll->handle->commit consumer flow, in the edge case where the consumer loses its connection during a slow handle step, its partitions are reassigned, and it finishes handling anyway. Hopefully, with the PartitionsRevokedHandler, we're at no more risk of that kind of race condition than the default Kafka consumer flow.

Need to verify that this callback is called as part of the Poll/Consume call, as in Java, and that subsequent ConsumeResults are never stale, i.e. never from a still-revoked partition.
