New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-8220 & KIP-345 part-3: Avoid kicking out members through rebalance timeout #6666
Conversation
Retest this please |
@guozhangwang @hachikuji when you got time :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@abbccdda Thanks for the PR. Changes look good to me, but it will be good to get a review from @hachikuji before merging in case I missed something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@abbccdda Thanks for the PR. Changes look good to me, but it will be good to get a review from @hachikuji before merging in case I missed something.
@rajinisivaram Thanks a lot for the review! Yea, pinging @hachikuji @guozhangwang again since we got your green light lol. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @abbccdda. I just had a question.
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Outdated
Show resolved
Hide resolved
@@ -244,6 +244,25 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState | |||
leaderId = members.keys.headOption | |||
} | |||
|
|||
def maybeElectNewLeader() { | |||
leaderId match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use foreach
instead of match
if there is nothing to do in the None
case
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Outdated
Show resolved
Hide resolved
Retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, left a few more comments.
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Outdated
Show resolved
Hide resolved
// until session timeout removes all the non-responsive members. | ||
joinPurgatory.tryCompleteElseWatch( | ||
new DelayedJoin(this, group, group.rebalanceTimeoutMs), | ||
Seq(group.allMembers.headOption) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I get this. Why are we using the memberId as the purgatory key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could use a dummy key here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, I do realize the purpose here. If this is last delayed join expiration, we will not trigger onJoinComplete
again only depending on heartbeat timeout. So we have to constantly check for group emptiness
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Outdated
Show resolved
Hide resolved
assertGroupState(groupState = PreparingRebalance) | ||
|
||
timer.advanceClock(DefaultRebalanceTimeout + 1) | ||
// Only static leader is maintained, and group is stuck at PreparingRebalance stage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji This test case covers the case where all static members are not joined but haven't session timeout yet. No new generation shall be bumped.
ea8cfcc
to
11d8866
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. Just a few more comments.
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Outdated
Show resolved
Hide resolved
d1f4bca
to
bc9832d
Compare
Retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, just a couple more comments.
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
Show resolved
Hide resolved
assertEquals(2, getGroup(groupId).generationId) | ||
assertGroupState(groupState = PreparingRebalance) | ||
|
||
timer.advanceClock(DefaultRebalanceTimeout + 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess another case is if the static member rejoins instead of timing out. Do you think this is worth another test case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have many static member rejoining during PrepareRebalance
tests. This should be ok to skip.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, my thought is to exercise the "dummy" rebalance logic completely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, I got a new test case branching from the existing one @hachikuji
82bd222
to
16de0b4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Just some minor questions.
// of rebalance preparing stage, and send out another delayed operation | ||
// until session timeout removes all the non-responsive members. | ||
error(s"Group ${group.groupId} could not complete rebalance because no members rejoined") | ||
joinPurgatory.tryCompleteElseWatch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I'd prefer to have such logic in DelayedJoin instead, i.e. to re-write the current
override def onComplete() = coordinator.onCompleteJoin(group)
code. Similar to the extended InitialDelayedJoin#onComplete
.
@@ -480,11 +480,11 @@ class GroupCoordinatorTest extends JUnitSuite { | |||
|
|||
@Test | |||
def staticMemberRejoinWithLeaderIdAndKnownMemberId() { | |||
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) | |||
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, sessionTimeout = DefaultRebalanceTimeout / 2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not clear why we want to override to half of the value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are just trying to make sure we timeout the follower instance because default session timeout = default rebalance timeout
@@ -522,19 +522,21 @@ class GroupCoordinatorTest extends JUnitSuite { | |||
|
|||
@Test | |||
def staticMemberRejoinWithFollowerIdAndChangeOfProtocol() { | |||
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) | |||
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, sessionTimeout = DefaultSessionTimeout * 2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto, why double the value here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doubling is aiming to reproduce the scenario where rebalanceTImeout
< sessionTimeout
. I could write comments for both scenarios.
} | ||
|
||
@Test | ||
def testStaticMemberFollowerFailToRejoinBeforeRebalanceTimeout() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding these two test cases for improving the coverage!
EasyMock.reset(replicaManager) | ||
heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) | ||
assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) | ||
var expectedResultList = List(Errors.REBALANCE_IN_PROGRESS, Errors.REBALANCE_IN_PROGRESS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice refactoring.
…timeout (apache#6666) To make static consumer group members more persistent, we want to avoid kicking out unjoined members through rebalance timeout. Essentially we allow static members to participate in a rebalance using their old subscription without sending a JoinGroup. The only catch is that an unjoined static member might be the current group leader, and we may need to elect a different leader. Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
To make consumer group members more persist, we want to avoid kick-out unjoined members through rebalance timeout. The only exception is when leader fails to join, because we will at risk of no assignment computed during sync stage. The choice will be kicking off non-responsive leader and choose a new leader if possible.
Committer Checklist (excluded from commit message)