Skip to content

Commit

Permalink
KAFKA-9885; Evict last members of a group when the maximum allowed is…
Browse files Browse the repository at this point in the history
… reached (#8525)

This PR updates the algorithm which limits the number of members within a group (`group.max.size`) to fix the following two issues:
1. As described in KAFKA-9885, we found out that multiple members of a group can be evicted if the leader of the consumer offset partition changes before the group is persisted. This happens because the current eviction logic always evict the first member rejoining the group.
2. We also found out that dynamic members, when required to have a known member id, are not always limited. The caveat is that the current logic only considers unknown members and uses the group size, which does not include the so called pending members, to accept or reject a member. In this case, when they rejoins, they are not unknown member anymore and thus could bypass the limit. See `testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember` for the whole scenario.

This PR changes the logic to address the above two issues and extends the tests coverage to cover all the member types.

Reviewers: Jason Gustafson <jason@confluent.io>
  • Loading branch information
dajac committed Apr 27, 2020
1 parent db9e55a commit c5d13dc
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 4 deletions.
36 changes: 32 additions & 4 deletions core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,35 @@ class GroupCoordinator(val brokerId: Int,
info("Shutdown complete.")
}

/**
* Verify if the group has space to accept the joining member. The various
* criteria are explained below.
*/
private def acceptJoiningMember(group: GroupMetadata, member: String): Boolean = {
group.currentState match {
// Always accept the request when the group is empty or dead
case Empty | Dead =>
true

// An existing member is accepted if it is already awaiting. New members are accepted
// up to the max group size. Note that the number of awaiting members is used here
// for two reasons:
// 1) the group size is not reliable as it could already be above the max group size
// if the max group size was reduced.
// 2) using the number of awaiting members allows to kick out the last rejoining
// members of the group.
case PreparingRebalance =>
(group.has(member) && group.get(member).isAwaitingJoin) ||
group.numAwaiting < groupConfig.groupMaxSize

// An existing member is accepted. New members are accepted up to the max group size.
// Note that the group size is used here. When the group transitions to CompletingRebalance,
// members which haven't rejoined are removed.
case CompletingRebalance | Stable =>
group.has(member) || group.size < groupConfig.groupMaxSize
}
}

def handleJoinGroup(groupId: String,
memberId: String,
groupInstanceId: Option[String],
Expand Down Expand Up @@ -152,9 +181,7 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
case Some(group) =>
group.inLock {
if ((groupIsOverCapacity(group)
&& group.has(memberId) && !group.get(memberId).isAwaitingJoin) // oversized group, need to shed members that haven't joined yet
|| (isUnknownMember && group.size >= groupConfig.groupMaxSize)) {
if (!acceptJoiningMember(group, memberId)) {
group.remove(memberId)
group.removeStaticMember(groupInstanceId)
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
Expand Down Expand Up @@ -1051,7 +1078,8 @@ class GroupCoordinator(val brokerId: Int,
}
}

private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
// package private for testing
private[group] def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
// if any members are awaiting sync, cancel their request and have them rejoin
if (group.is(CompletingRebalance))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState

def numPending = pendingMembers.size

def numAwaiting: Int = numMembersAwaitingJoin

def allMemberMetadata = members.values.toList

def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,206 @@ class GroupCoordinatorTest {
assertEquals(Errors.GROUP_MAX_SIZE_REACHED, await(errorFuture, 1).error)
}

@Test
def testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember(): Unit = {
val requiredKnownMemberId = true
val nbMembers = GroupMaxSize + 1

// First JoinRequests
var futures = 1.to(nbMembers).map { _ =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}

// Get back the assigned member ids
val memberIds = futures.map(await(_, 1).memberId)

// Second JoinRequests
futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}

// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)

// Awaiting results
val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)

assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))

// Members which were accepted can rejoin, others are rejected, while
// completing rebalance
futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}

// Awaiting results
val rejoinErrors = futures.map(await(_, 1).error)

assertEquals(errors, rejoinErrors)
}

@Test
def testDynamicMembersJoinGroupWithMaxSize(): Unit = {
val requiredKnownMemberId = false
val nbMembers = GroupMaxSize + 1

// JoinRequests
var futures = 1.to(nbMembers).map { _ =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}

// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)

// Awaiting results
val joinGroupResults = futures.map(await(_, DefaultRebalanceTimeout + 1))
val errors = joinGroupResults.map(_.error)

assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))

// Members which were accepted can rejoin, others are rejected, while
// completing rebalance
val memberIds = joinGroupResults.map(_.memberId)
futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}

// Awaiting results
val rejoinErrors = futures.map(await(_, 1).error)

assertEquals(errors, rejoinErrors)
}

@Test
def testStaticMembersJoinGroupWithMaxSize(): Unit = {
val nbMembers = GroupMaxSize + 1
val instanceIds = 1.to(nbMembers).map(i => Some(s"instance-id-$i"))

// JoinRequests
var futures = instanceIds.map { instanceId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
instanceId, DefaultSessionTimeout, DefaultRebalanceTimeout)
}

// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)

// Awaiting results
val joinGroupResults = futures.map(await(_, DefaultRebalanceTimeout + 1))
val errors = joinGroupResults.map(_.error)

assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))

// Members which were accepted can rejoin, others are rejected, while
// completing rebalance
val memberIds = joinGroupResults.map(_.memberId)
futures = instanceIds.zip(memberIds).map { case (instanceId, memberId) =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
instanceId, DefaultSessionTimeout, DefaultRebalanceTimeout)
}

// Awaiting results
val rejoinErrors = futures.map(await(_, 1).error)

assertEquals(errors, rejoinErrors)
}

@Test
def testDynamicMembersCanReJoinGroupWithMaxSizeWhileRebalancing(): Unit = {
val requiredKnownMemberId = true
val nbMembers = GroupMaxSize + 1

// First JoinRequests
var futures = 1.to(nbMembers).map { _ =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}

// Get back the assigned member ids
val memberIds = futures.map(await(_, 1).memberId)

// Second JoinRequests
memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}

// Members can rejoin while rebalancing
futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}

// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)

// Awaiting results
val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)

assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))
}

@Test
def testLastJoiningMembersAreKickedOutWhenReJoiningGroupWithMaxSize(): Unit = {
val nbMembers = GroupMaxSize + 2
val group = new GroupMetadata(groupId, Stable, new MockTime())
val memberIds = 1.to(nbMembers).map(_ => group.generateMemberId(ClientId, None))

memberIds.foreach { memberId =>
group.add(new MemberMetadata(memberId, groupId, None, ClientId, ClientHost,
DefaultRebalanceTimeout, GroupMaxSessionTimeout, protocolType, protocols))
}
groupCoordinator.groupManager.addGroup(group)

groupCoordinator.prepareRebalance(group, "")

val futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, GroupMaxSessionTimeout, DefaultRebalanceTimeout)
}

// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(DefaultRebalanceTimeout + 1)

// Awaiting results
val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)

assertEquals(Set(Errors.NONE), errors.take(GroupMaxSize).toSet)
assertEquals(Set(Errors.GROUP_MAX_SIZE_REACHED), errors.drop(GroupMaxSize).toSet)

memberIds.drop(GroupMaxSize).foreach { memberId =>
assertFalse(group.has(memberId))
}
}

@Test
def testJoinGroupSessionTimeoutTooSmall(): Unit = {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
Expand Down

0 comments on commit c5d13dc

Please sign in to comment.