diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 25b5780952f2..c70d0e964150 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -727,6 +727,7 @@ class GroupCoordinator(val brokerId: Int, group.inLock { // remove any members who haven't joined the group yet group.notYetRejoinedMembers.foreach { failedMember => + removeHeartbeatForLeavingMember(group, failedMember) group.remove(failedMember.memberId) // TODO: cut the socket connection to the client } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 3592d6a62585..a68a3d81db08 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -469,11 +469,29 @@ class GroupCoordinatorTest extends JUnitSuite { heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) - // now timeout the rebalance, which should kick the unjoined member out of the group - // and let the rebalance finish with only the new member + // now timeout the rebalance timer.advanceClock(500) val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + val otherMemberId = otherJoinResult.memberId + val otherGenerationId = otherJoinResult.generationId + EasyMock.reset(replicaManager) + val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, syncResult._2) + + // the unjoined member should be kicked out from the group assertEquals(Errors.NONE, otherJoinResult.error) + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) + assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + + // the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while + // to verify that no new rebalance is triggered unexpectedly + for ( _ <- 1 to 20) { + timer.advanceClock(500) + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId) + assertEquals(Errors.NONE, heartbeatResult) + } } @Test