KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

During a consumer group rebalance, when the join-group phase finishes, the heartbeat delayed operation of any consumer that failed to rejoin the group should be removed from the purgatory. Otherwise, even though the consumer's member ID has been removed from the group, its heartbeat delayed operation remains registered in the purgatory, eventually times out, and triggers another unnecessary rebalance.

Author: Lincong Li <lcli@linkedin.com>

Reviewers: Dong Lin <lindong28@gmail.com>

Closes #5556 from Lincong/remove_heartbeat_delayedOperation

(cherry picked from commit 260b07a)
Signed-off-by: Dong Lin <lindong28@gmail.com>
Lincong Li authored and lindong28 committed Oct 4, 2018
1 parent 8e6ffd2 commit cd22770
Showing 2 changed files with 21 additions and 2 deletions.
@@ -727,6 +727,7 @@ class GroupCoordinator(val brokerId: Int,
 group.inLock {
   // remove any members who haven't joined the group yet
   group.notYetRejoinedMembers.foreach { failedMember =>
+    removeHeartbeatForLeavingMember(group, failedMember)
     group.remove(failedMember.memberId)
     // TODO: cut the socket connection to the client
   }
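
The effect of the one-line change above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyGroup`, `ToyPurgatory`, and `PurgatorySketch` are hypothetical names invented for illustration and are not Kafka classes, and the model assumes, as the commit message describes, that any heartbeat operation expiring in the purgatory forces a rebalance, even when its member has already been removed from the group.

```scala
import scala.collection.mutable

// Hypothetical toy model; none of these names are Kafka's real classes.
final class ToyGroup {
  val members = mutable.Set.empty[String]
  var rebalances = 0
}

// Stand-in for the purgatory: memberId -> deadline (ms) of its pending heartbeat.
final class ToyPurgatory {
  private val deadlines = mutable.Map.empty[String, Long]

  // Registers (or reschedules) the pending heartbeat of a member.
  def watch(memberId: String, deadlineMs: Long): Unit = {
    deadlines.update(memberId, deadlineMs)
  }

  // Drops the pending heartbeat of a member, if any.
  def cancel(memberId: String): Unit = {
    deadlines -= memberId
  }

  // Returns the members whose deadline has passed and forgets them.
  def expire(nowMs: Long): List[String] = {
    val expired = deadlines.collect { case (id, d) if d <= nowMs => id }.toList
    expired.foreach(id => deadlines.remove(id))
    expired
  }
}

object PurgatorySketch {
  // Simulates the end of the join phase followed by the old session deadline.
  // Returns how many rebalances were triggered by expired heartbeats.
  def spuriousRebalances(cancelOnRemoval: Boolean): Int = {
    val group = new ToyGroup
    val purgatory = new ToyPurgatory
    group.members ++= Seq("old", "new")
    purgatory.watch("old", deadlineMs = 1000L)
    purgatory.watch("new", deadlineMs = 1000L)

    // Join phase ends: "old" did not rejoin and is removed from the group.
    // The fix corresponds to cancelling its pending heartbeat first.
    if (cancelOnRemoval) purgatory.cancel("old")
    group.members -= "old"

    // "new" heartbeats in time, which reschedules its deadline.
    purgatory.watch("new", deadlineMs = 2000L)

    // Assumption of this model: every expired heartbeat forces a rebalance.
    purgatory.expire(nowMs = 1000L).foreach(_ => group.rebalances += 1)
    group.rebalances
  }

  def main(args: Array[String]): Unit = {
    println(s"without the fix: ${PurgatorySketch.spuriousRebalances(cancelOnRemoval = false)}")
    println(s"with the fix: ${PurgatorySketch.spuriousRebalances(cancelOnRemoval = true)}")
  }
}
```

In this model the stale entry for "old" expires and causes one rebalance when it is not cancelled, and none when it is cancelled before the member is removed. The real coordinator is more involved (locking, generation checks, delayed-operation keys), so this only mirrors the ordering of the two calls in the hunk above.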
@@ -469,11 +469,29 @@ class GroupCoordinatorTest extends JUnitSuite {
   heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
   assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)

-  // now timeout the rebalance, which should kick the unjoined member out of the group
-  // and let the rebalance finish with only the new member
+  // now timeout the rebalance
   timer.advanceClock(500)
   val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+  val otherMemberId = otherJoinResult.memberId
+  val otherGenerationId = otherJoinResult.generationId
+  EasyMock.reset(replicaManager)
+  val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
+  assertEquals(Errors.NONE, syncResult._2)
+
+  // the unjoined member should be kicked out from the group
   assertEquals(Errors.NONE, otherJoinResult.error)
+  EasyMock.reset(replicaManager)
+  heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+  assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+
+  // the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while
+  // to verify that no new rebalance is triggered unexpectedly
+  for ( _ <- 1 to 20) {
+    timer.advanceClock(500)
+    EasyMock.reset(replicaManager)
+    heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
+    assertEquals(Errors.NONE, heartbeatResult)
+  }
 }

 @Test
