
KAFKA-14016: Revoke more partitions than expected in Cooperative rebalance #12348

Closed

Conversation

@aiquestion (Contributor) commented Jun 26, 2022

With the latest trunk branch's code, we found that in a cooperative rebalance a consumer will revoke more partitions than expected. Details here: https://issues.apache.org/jira/browse/KAFKA-14016

So I want to start a PR to discuss the fix. Tests will be added later.


@showuon (Contributor) left a comment


A consumer gets REBALANCE_IN_PROGRESS when the current rebalance round has completed join_group and the group state has changed to CompletingRebalance, and during this window something triggers another rebalance (e.g. a new member joins), so the group state changes to PreparingRebalance. Any sync_group sent at this point will get REBALANCE_IN_PROGRESS. This error cannot happen when the consumer sends sync_group with an old generation id; in that case it would receive ILLEGAL_GENERATION instead.
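
For illustration, here is a minimal, self-contained sketch of that window. The names (GroupState, SyncResult, handleSyncGroup) are hypothetical; the real checks live in the broker-side GroupCoordinator, which is written in Scala.

```java
// Illustrative sketch only; not the actual GroupCoordinator code.
enum GroupState { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }
enum SyncResult { NONE, REBALANCE_IN_PROGRESS, ILLEGAL_GENERATION }

final class SyncGroupSketch {
    static SyncResult handleSyncGroup(GroupState state, int requestGeneration, int groupGeneration) {
        if (requestGeneration != groupGeneration) {
            // The member is syncing with an old generation id: it must re-join.
            return SyncResult.ILLEGAL_GENERATION;
        }
        if (state == GroupState.PREPARING_REBALANCE) {
            // Same generation, but a new rebalance already began (e.g. a new
            // member joined) before this member managed to sync.
            return SyncResult.REBALANCE_IN_PROGRESS;
        }
        // CompletingRebalance or Stable: the assignment can be returned.
        return SyncResult.NONE;
    }
}
```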

So we want to make sure the ownedPartitions in the consumers are up to date, and if the consumer leader has already computed the assignment for this round, we should distribute it to the consumers even though the next round of rebalance has already started.

Is my understanding correct?

If so, I think I agree with the solution. But I'd like to hear @guozhangwang's opinion.

Thanks.

@aiquestion (Contributor, author):

@showuon Yes.
And cooperative rebalance makes REBALANCE_IN_PROGRESS happen more often: after every consumer has joined the group (joinGroupResponse) and the leader has sent syncGroup, the group state changes to Stable. If a consumer then sends syncGroup and receives an assignment that requires it to revoke partitions, it re-joins according to the cooperative protocol, and the group state changes back to PreparingRebalance.
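
For illustration, a sketch of the cooperative-protocol step in question, using a hypothetical needsRejoin helper rather than the actual ConsumerCoordinator code: a member whose new assignment drops partitions it currently owns must revoke them and re-join.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only; the real logic lives in the Java client's
// ConsumerCoordinator.
final class CooperativeSyncSketch {
    static boolean needsRejoin(Set<String> owned, Set<String> assigned) {
        Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(assigned);   // partitions this member must give up
        // Cooperative protocol: revoke them, then re-join, which flips the
        // group from Stable back to PreparingRebalance.
        return !revoked.isEmpty();
    }
}
```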

@dajac (Contributor) commented Jun 28, 2022

@aiquestion Thanks for the patch. Before settling on the best approach to fix this, could we start by adding a unit test that reproduces the issue?

Could you explain why we need to revert https://issues.apache.org/jira/browse/KAFKA-13891?

In the scenario you describe in the Jira:

  • consumers A1-A10 (ten consumers) joined and synced the group successfully with generation 1
  • a new consumer B1 joined and started a rebalance
  • all consumers joined successfully, and then A1 needed to revoke a partition to transfer it to B1
  • A1 did a very quick syncGroup and re-join, because it revoked a partition
  • A2-A10 didn't send syncGroup before A1 re-joined, so when they send syncGroup they will get REBALANCE_IN_PROGRESS
  • A2-A10 will revoke their partitions and re-join

I suppose that this scenario only works if A1 is the leader, right? Otherwise, A1 would not have received the sync group response.

@aiquestion (Contributor, author):

@dajac Okay, will add a unit test for it first.
It doesn't only happen when A1 is the leader and re-joins quickly. If A1 is the leader and A2 re-joins quickly, the other consumers will still revoke their partitions. I refined my example to include a leader (traced in the sketch after the list):

  • consumers A1-A10 (ten consumers) joined and synced the group successfully with generation 1 (A1 is the leader)
  • a new consumer B1 joined and started a rebalance
  • all consumers joined successfully, then A1 sent its syncGroup request and the group changed to Stable
  • A2 sent its syncGroup request and got an assignment that requires it to revoke a partition to transfer it to B1
  • A2 did a very quick syncGroup and re-join because it revoked a partition; the group state then changed to PreparingRebalance
  • A3-A10 didn't send syncGroup before A2 re-joined, so when they send syncGroup they will get REBALANCE_IN_PROGRESS
  • A3-A10 will revoke their partitions and re-join
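
Tracing this refined scenario through the earlier SyncGroupSketch (same illustrative types as above; the concrete generation numbers are assumptions):

```java
final class ScenarioTraceSketch {
    public static void main(String[] args) {
        // After all eleven members complete joinGroup, the generation becomes 2
        // and the group is CompletingRebalance. A1 (leader) syncs -> Stable.
        // A2 syncs, revokes, re-joins -> PreparingRebalance. A3-A10 now sync,
        // still with generation 2 (the id is only bumped when the next join
        // phase completes):
        SyncResult r = SyncGroupSketch.handleSyncGroup(GroupState.PREPARING_REBALANCE, 2, 2);
        // The generation check passes but the state check does not, so A3-A10
        // get REBALANCE_IN_PROGRESS, revoke all their partitions, and re-join.
        System.out.println(r);  // REBALANCE_IN_PROGRESS
    }
}
```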

@dajac (Contributor) commented Jun 30, 2022

@aiquestion Yeah, this is what I thought. All the members re-joining after the leader are affected.

I think that the fundamental issue here is that we don't really enforce the synchronization barrier after each rebalance in the cooperative mode. We consider the rebalance completed as soon as the assignment provided by the leader is persisted, and transition the group to Stable. The barrier is loose, in a sense.

So an alternative to your current proposal would be to really enforce that synchronization barrier. We could release the sync-group responses only when all the members are there, instead of doing it when the assignment is persisted. The downside is that it would also impact the eager mode, which does not really require this.
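
For illustration, a rough sketch of that alternative, under the assumption that the broker could track pending sync responses per group. All names here are hypothetical, and a real implementation would also need timeouts for members that never sync.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative sketch of a strict sync barrier; not actual broker code.
final class StrictSyncBarrierSketch {
    private final Map<String, Consumer<byte[]>> pendingSyncs = new HashMap<>();
    private final Map<String, byte[]> assignments = new HashMap<>();
    private final int groupSize;

    StrictSyncBarrierSketch(int groupSize) { this.groupSize = groupSize; }

    void setAssignments(Map<String, byte[]> fromLeader) { assignments.putAll(fromLeader); }

    // Instead of answering as soon as the leader's assignment is persisted,
    // park every response until *all* members have called syncGroup.
    synchronized void onSyncGroup(String memberId, Consumer<byte[]> responder) {
        pendingSyncs.put(memberId, responder);
        if (pendingSyncs.size() == groupSize) {
            pendingSyncs.forEach((id, r) -> r.accept(assignments.get(id)));
            pendingSyncs.clear();   // barrier released for everyone at once
        }
    }
}
```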

@dajac (Contributor) commented Jul 14, 2022

@aiquestion Any update on this one?

@aiquestion (Contributor, author):

@dajac Sorry for the delay.
We don't have a synchronization barrier for syncGroup, but we do have one for joinGroup. So if a consumer's syncGroup gets REBALANCE_IN_PROGRESS, it must have completed joinGroup successfully, which means it misses at most one round of assignment (if the leader already sent syncGroup) or none at all (if the next round begins before the leader's syncGroup). If it had not completed joinGroup successfully, its syncGroup would get UNKNOWN_MEMBER_ID instead.

So I think returning the assignment along with REBALANCE_IN_PROGRESS could be a fix for it.
But as @guozhangwang said in KAFKA-14016, that would require a broker upgrade. Since you are refactoring the rebalance protocol, a client-side change that works around it may be a better choice.

I think we can just update the assignment's generation when a consumer gets a REBALANCE_IN_PROGRESS error in syncGroup (sketched after the list below).

  • No partition will be assigned to two consumers, because the partitions owned by this consumer will not be assigned to others before it revokes them, and this change only skips a revocation that wasn't needed.
  • In normal cases only the consumers that need to revoke partitions trigger a re-join, so the total number of partitions left to revoke keeps decreasing and the group finally reaches Stable.
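
For illustration, a minimal sketch of that idea, with hypothetical names (onSyncGroupRebalanceInProgress is not a real client API, and this is not necessarily what the eventual fix did):

```java
import java.util.Set;

// Illustrative sketch of the proposed client-side workaround.
final class RebalanceInProgressWorkaroundSketch {
    int ownedGeneration;
    Set<String> ownedPartitions;

    void onSyncGroupRebalanceInProgress(int joinedGeneration) {
        // We passed the joinGroup barrier for this generation, so our owned
        // partitions are still valid: stamp them with the new generation
        // rather than resetting it, so the next rebalance round does not see
        // stale ownership and force an unnecessary revocation.
        this.ownedGeneration = joinedGeneration;
        // ownedPartitions stays as-is; no revocation callback is invoked.
    }
}
```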

WDYT?

@dajac (Contributor) commented May 23, 2023

Fixed by c6ad151. Closing it.

@dajac closed this May 23, 2023