Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-7196: Remove heartbeat delayed operation for those removed consumers at the end of each rebalance #5556

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -775,6 +775,7 @@ class GroupCoordinator(val brokerId: Int,
group.inLock {
// remove any members who haven't joined the group yet
group.notYetRejoinedMembers.foreach { failedMember =>
removeHeartbeatForLeavingMember(group, failedMember)
group.remove(failedMember.memberId)
// TODO: cut the socket connection to the client
}
Expand Down
Expand Up @@ -527,11 +527,29 @@ class GroupCoordinatorTest extends JUnitSuite {
heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)

// now timeout the rebalance, which should kick the unjoined member out of the group
// and let the rebalance finish with only the new member
// now timeout the rebalance
timer.advanceClock(500)
val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
val otherMemberId = otherJoinResult.memberId
val otherGenerationId = otherJoinResult.generationId
EasyMock.reset(replicaManager)
val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, syncResult._2)

// the unjoined member should be kicked out from the group
assertEquals(Errors.NONE, otherJoinResult.error)
EasyMock.reset(replicaManager)
heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)

// the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while
// to verify that no new rebalance is triggered unexpectedly
for ( _ <- 1 to 20) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you help me understand why we choose 20 iteration and 500 ms per clock advance? What would be the minimum possible iteration to make sure that we catch this bug in the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The minimum possible number of iteration to make sure we catch this bug is 3. Because the session timeout is 1000 ms. That means that the heartbeat of the previous consumer should timeout in 1000 ms and that would trigger another unnecessary rebalance if the heartbeat delayed operation is not removed at the end of the first rebalance.

1000 ms is 2 iterations. So for the code to verify the existence or absence of the error, we just need another extra iteration, which is (2 + 1) = 3.

I chose to make it 20 iteration is to verify the consumer group stays stable in a much longer period of time after the rebalance. Plus that running each iteration takes a negligible amount of time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. Sounds good.

timer.advanceClock(500)
EasyMock.reset(replicaManager)
heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
assertEquals(Errors.NONE, heartbeatResult)
}
}

@Test
Expand Down