From cd2277069d355d2a9839c97265448b5a883feb89 Mon Sep 17 00:00:00 2001 From: Lincong Li Date: Thu, 4 Oct 2018 09:14:44 -0700 Subject: [PATCH] KAFKA-7196; Remove heartbeat delayed operation for those removed consumers at the end of each rebalance During the consumer group rebalance, when the joining group phase finishes, the heartbeat delayed operation of the consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the member ID of the consumer has been removed from the group, its heartbeat delayed operation is still registered in the purgatory and the heartbeat delayed operation is going to timeout and then another unnecessary rebalance is triggered because of it. Author: Lincong Li Reviewers: Dong Lin Closes #5556 from Lincong/remove_heartbeat_delayedOperation (cherry picked from commit 260b07a6da070e6312443fb7cc6b937bef2865ea) Signed-off-by: Dong Lin --- .../coordinator/group/GroupCoordinator.scala | 1 + .../group/GroupCoordinatorTest.scala | 22 +++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 25b5780952f2..c70d0e964150 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -727,6 +727,7 @@ class GroupCoordinator(val brokerId: Int, group.inLock { // remove any members who haven't joined the group yet group.notYetRejoinedMembers.foreach { failedMember => + removeHeartbeatForLeavingMember(group, failedMember) group.remove(failedMember.memberId) // TODO: cut the socket connection to the client } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 3592d6a62585..a68a3d81db08 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -469,11 +469,29 @@ class GroupCoordinatorTest extends JUnitSuite { heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) - // now timeout the rebalance, which should kick the unjoined member out of the group - // and let the rebalance finish with only the new member + // now timeout the rebalance timer.advanceClock(500) val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + val otherMemberId = otherJoinResult.memberId + val otherGenerationId = otherJoinResult.generationId + EasyMock.reset(replicaManager) + val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, syncResult._2) + + // the unjoined member should be kicked out from the group assertEquals(Errors.NONE, otherJoinResult.error) + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) + assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + + // the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while + // to verify that no new rebalance is triggered unexpectedly + for ( _ <- 1 to 20) { + timer.advanceClock(500) + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId) + assertEquals(Errors.NONE, heartbeatResult) + } } @Test