KAFKA-7641: Introduce "group.max.size" config to limit group sizes #6163

Merged: 16 commits into apache:trunk on Feb 2, 2019

Conversation

stanislavkozlovski
Contributor

This patch introduces a new config - "consumer.group.max.size" - which caps the maximum size any consumer group can reach. It has a default value of Int.MAX_VALUE.
Once a consumer group is at the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error.

In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced.

I have added two integration tests, one for each scenario: a member joining an already-full group, and a rolling restart with a new config.
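Since this is a broker-side setting with a default of Int.MAX_VALUE, an operator opts in by lowering it in the broker configuration. A minimal sketch (values illustrative, using the final config name from the PR title):

```scala
import java.util.Properties

// Sketch only: "group.max.size" is the final name from the PR title; every other key and the
// value 250 are illustrative, not part of this patch.
object GroupMaxSizeConfigSketch {
  def brokerProps(): Properties = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // assumed minimal broker setting
    props.put("group.max.size", "250")               // cap any single consumer group at 250 members
    props
  }
}
```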

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor

@gwenshap gwenshap left a comment

Left quite a few comments. Two high level things that worry me:

  1. I had lots of unrelated consumer test failures with this PR, leading me to guess that something about the logic isn't robust yet.
  2. Shrinking seems like the trickiest bit, both to implement and to test. I think we don't really need it - if a group is really so big that it causes issues, it will eventually time out and rebalance... and then we'll enforce the new size with "join group". WDYT?
    If we do keep shrinking, I left some suggestions for simpler implementation. But I'm not confident that I understand the group reload logic enough, so asked @hachikuji to weigh in.

/**
* Indicates that a consumer group is already at its configured maximum capacity and cannot accommodate more members
*/
public class GroupMaxSizeReachedException extends ApiException {
Contributor

Should this be Retriable? In theory, others could leave the group while we are retrying. I'm not 100% sure we want any retriable exception in join group, which is time-sensitive.

cc @hachikuji for second opinion :)

Contributor Author

In the KIP discussions we settled on fatally failing a consumer that encounters this error.

Does the consumer retry Retriable exceptions up to a certain point? I'm not sure. If we could retry up to 3 times and then fatally fail, that would be somewhat better

I think it probably should be fatal. If a user is bumping up against the limit, we want them to know about it. Not sure there is much benefit trying to operate too finely at the edge.
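Given the consensus here that hitting the limit should be fatal rather than retriable, a minimal sketch of a guard along those lines (assuming the Errors entry and exception introduced by this PR) might look like:

```scala
import org.apache.kafka.common.errors.RetriableException
import org.apache.kafka.common.protocol.Errors
import org.junit.Assert.assertFalse
import org.junit.Test

class GroupMaxSizeErrorSketchTest {
  @Test
  def testGroupMaxSizeReachedIsNotRetriable(): Unit = {
    // GROUP_MAX_SIZE_REACHED should map to a fatal (non-retriable) exception on the client.
    assertFalse(Errors.GROUP_MAX_SIZE_REACHED.exception.isInstanceOf[RetriableException])
  }
}
```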

import org.apache.kafka.common.errors.DelegationTokenAuthorizationException;
import org.apache.kafka.common.errors.DelegationTokenDisabledException;
import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
import org.apache.kafka.common.errors.DuplicateSequenceException;
Contributor

nit: Can you configure your IDE not to reorder imports?

Contributor Author

Sure, reverted these back

@@ -48,6 +48,7 @@ import scala.math.max
* since the delayed operation is completed only if the group lock can be acquired.
*/
class GroupCoordinator(val brokerId: Int,
val groupMaxSize: Int,
Contributor

Any reason this isn't in GroupConfig?

Contributor Author

Completely missed it, my bad
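For reference, a rough sketch (field names assumed; the real definition lives in GroupCoordinator.scala) of carrying the limit in GroupConfig alongside the other group settings instead of as a separate constructor parameter:

```scala
// Sketch only; the actual GroupConfig may differ slightly.
case class GroupConfig(groupMinSessionTimeoutMs: Int,
                       groupMaxSessionTimeoutMs: Int,
                       groupMaxSize: Int,
                       groupInitialRebalanceDelayMs: Int)
```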

@@ -159,8 +160,12 @@ class GroupCoordinator(val brokerId: Int,
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else {
val newMemberId = clientId + "-" + group.generateMemberIdSuffix
if (groupIsFull(group)) {
Contributor

few style nits:

  • the if-else style here is not consistent
  • use of NoMemberId vs JoinGroupRequest.UNKNOWN_MEMBER_ID. I'm not sure why we have both, but we probably want to be consistent with the other two places where we return an error here.
  • Do we really need the "return" here? Scala style recommends against multiple exit points.

Contributor Author

re: NoMemberId, I had seen it used in removeMemberAndUpdateGroup. I also think it should use JoinGroupRequest.UNKNOWN_MEMBER_ID, especially now that the lines are right next to each other after the rebase.

Agreed on the style; it reads better as an else if, for sure.

@@ -651,6 +656,11 @@ class GroupCoordinator(val brokerId: Int,
group.inLock {
info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
assert(group.is(Stable) || group.is(Empty))
if (groupIsOverCapacity(group)) {
Contributor

Do we really need to shrink? If we detect a size issue and force a rebalance, everyone is forced to re-join and then the "group too large" logic in the JoinGroup path will kick in.

Personally, I'd leave existing groups alone. If the size causes issues, they'll timeout and rebalance themselves anyway. And if there are no issues, why introduce new issues.

@hachikuji should probably check the logic here, since reloading of groups has a bunch of logic (including transactions) that I'm not comfortable with.

Contributor Author

Yeah, this is the trickiest part of the KIP. My naive initial reasoning was that we might want to keep the same leader, but now that I think of it that is more or less pointless (unless it loads some state in its initial PartitionAssignor), and we will most likely pick another leader in the subsequent rebalance anyway.


It's not a bad idea to preserve the leader, but I think Gwen's suggestion seems simpler. I think the main thing I was hoping is that the resizing could be graceful. The existing members should be able to commit their offsets before getting kicked out. It seems like we don't get that with this approach since we are forcefully removing some members.

Contributor

By kicking out the extra members, like Jason said those members lose their opportunities to commit the last batch. However on the other hand, this should only happen when the server operator believes that an absurd group size is configured that should be prevented. In this sense, the group is interpreted as over capacity, so allowing rest of consumers take over the jobs should be very light lifting, which I don't see an obvious negative impact. Thoughts?

Contributor Author

In this sense, the group is interpreted as over capacity, so allowing rest of consumers take over the jobs should be very light lifting
Sorry, I couldn't understand this exactly. Do you mean that it's reasonable to "shrink" the group in one way or another and not let it continue over capacity?

My thinking is to go with a forced rebalance

*/
def shrinkTo(maxSize: Int): Unit = {
if (members.size < maxSize)
throw new IllegalArgumentException(s"Cannot shrink group $groupId to $maxSize as it's current count ${members.size} is smaller.")
Contributor

do we really want to throw? Rather than just silently do nothing?

Contributor Author

I don't have a strong opinion about this

Contributor

Perhaps a log message (warning) would suffice?

Contributor

Should we just use an assert here? As long as the change is robust this should never happen, which makes me think an assertion is the better approach to guard against future changes affecting the group max size logic.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument name maxSize suggests this should be a no-op if the group is already smaller. I'd just remove the check.

val newMembers = new mutable.HashMap[String, MemberMetadata]
val maxSizeWithoutLeader = maxSize - 1

members.foreach(memberEntry => {
Contributor

I'm worried about iterating the entire member map and not sure it is necessary.
I think we can assume that the number of members we remove is much smaller than the total number of members (i.e. the group is just slightly oversized).

Maybe this is cleaner:

  • calculate how many members to remove.
  • find N members out of members.keys
  • call remove() on each (thereby not rewriting lots of logic, not moving around lots of stuff in memory, etc).

WDYT?
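A small, self-contained sketch of the approach suggested above: compute how many members exceed the limit and remove only those, never the leader. MemberMetadata here is a stand-in, not the real class:

```scala
import scala.collection.mutable

object ShrinkSketch {
  final case class MemberMetadata(memberId: String) // stand-in; the real class carries much more state

  def shrinkTo(members: mutable.Map[String, MemberMetadata],
               maxSize: Int,
               leaderId: Option[String]): Unit = {
    val numToRemove = members.size - maxSize
    if (numToRemove > 0) {
      // Materialize the victims first so we don't remove entries while iterating the key view.
      val victims = members.keys.filterNot(leaderId.contains).take(numToRemove).toList
      victims.foreach(members.remove)
    }
  }
}
```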

Contributor Author

Yeah this is pretty inefficient.
My thinking is that we should run through this code path only on broker rolls with a new config. But regarding your other comment, shrinking may turn out to be completely unnecessary. I'll wait for @hachikuji to give his thoughts as well before removing it.

Contributor

I guess this part of the logic is to add as many members as possible while making sure the leader ends up in the newMembers map. How about we just put the leader into newMembers first and then continue the iteration without worrying about it? Note that double-putting the leader shouldn't be a problem. This should make the logic simpler, as I see it.

Contributor

I agree that @gwenshap's idea is more efficient. Otherwise, if we are sticking to the existing logic, we might be able to leverage the retain(...) function and avoid creating a second hash map.

Contributor Author

retain() is a cool function but I will go with @gwenshap's suggestion as it makes the updating of supportedProtocols easier

val GroupMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources."
val GroupMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures."
val GroupInitialRebalanceDelayMsDoc = "The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins."
val GroupMaxSizeDoc = "The maximum amount of consumers that a single consumer group can accommodate."
Contributor

super nit: "maximum number of..." amount usually refers to things that aren't counted individually.

Contributor Author

English! :)

@@ -188,14 +200,14 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
}

sendRecords(numRecords, newtopic)
receiveRecords(consumer, numRecords, newtopic, 10000)
receiveRecords(consumer, numRecords, 10000)
Contributor

I'm not sure I understand what the changes to receive records are doing here...

Contributor Author

receiveRecords used to accept a topic parameter which it did not use at all, so I removed it

Contributor Author

If you're asking in a more general sense - I added a way to handle exceptions in receiveRecords through a callback. This was to ease implementation, since I needed to run receiveRecords() in parallel. I played around with these tests a lot, and now I clearly see that the callback logic isn't needed.

@@ -190,6 +193,27 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.NOT_COORDINATOR, joinGroupError)
}

@Test
def testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() {
Contributor

seems to be consistently failing for me with:
java.util.concurrent.TimeoutException: Futures timed out after [1 millisecond]

Contributor

I do not see any failures. Passed 100/100.

Contributor

This is now failing for me too.

@stanislavkozlovski
Contributor Author

Hmm, the rebase with KIP-394 caused the test failures. I'll need to investigate further

@@ -743,7 +753,8 @@ class GroupCoordinator(val brokerId: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback): MemberMetadata = {
callback: JoinCallback): Unit = {
val memberId = clientId + "-" + group.generateMemberIdSuffix
Contributor Author

Aha, this is why the tests are failing - left over from the merge with KIP-394 ;)

Contributor

@abbccdda abbccdda left a comment

Great job Stanislav! The unit tests you created are well thought out and make me confident about the changes here. Left some comments.

@@ -560,6 +561,8 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
AbstractCoordinator.this.state = MemberState.UNJOINED;
}
future.raise(Errors.MEMBER_ID_REQUIRED);
} else if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException("Consumer group " + groupId + " is at full capacity. There is no room for this consumer."));
Contributor

Would this be easier if we name a constructor as GroupMaxSizeReachedException(String groupId)?

Contributor

I also like what @abbccdda suggests (even though most of the existing exception constructors just take a message string).

Contributor

I also wonder if the message ... at full capacity ... provides enough context to users who may not be aware of this new feature. I think a clearer message that emphasizes on number of consumers in the group already at the configured maximum would be less confusing.

Contributor Author

I also wonder if the message ... at full capacity ... provides enough context to users who may not be aware of this new feature. I think a clearer message that emphasizes on number of consumers in the group already at the configured maximum would be less confusing.

This sounds good to me.

I was thinking about taking the groupId as a parameter as well, and was wondering how to wire that into Errors.java for when the server uses it. I've updated that, please take another look.

val maxSizeWithoutLeader = maxSize - 1

members.foreach(memberEntry => {
val memberId = memberEntry._1
Contributor

Could we just use a two-element (memberId, member) pattern in the foreach iteration to avoid the extra definitions here?

members.foreach(memberEntry => {
val memberId = memberEntry._1
val member = memberEntry._2
if (!leaderWasAdded && isLeader(memberId)) {
Contributor

Since leaderWasAdded is only used twice, we might just want to use !leader.isEmpty for a more straightforward check.

})
members.clear()
supportedProtocols.clear()
members ++= newMembers
Contributor

Just want to understand here (I'm not 100% sure): would a plain assignment, members = newMembers, change the final result?

/**
* Shrinks the consumer group to the given size by removing members while maintaining the leader
*/
def shrinkTo(maxSize: Int): Unit = {
Contributor

s/maxSize/groupMaxSize

* Creates N+1 consumers in the same consumer group, where N is the maximum size of a single consumer group.
* Asserts that the N+1th consumer receives a fatal error when it tries to join the group
*/
private def checkExceptionDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = {
Contributor

Do we need to create a separate function? Since checkExceptionDuringRebalance is used only once.

Contributor Author

I initially had it as a separate function because it had a lot of lines of code, but now that it's less than 10 lines I think it's a good idea to merge it back in.


@@ -743,7 +749,7 @@ class GroupCoordinator(val brokerId: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback): MemberMetadata = {
callback: JoinCallback): Unit = {
Contributor

Let's just remove the return type.

Contributor

@vahidhashemian vahidhashemian left a comment

Thanks @stanislavkozlovski for the PR. I left a few comments.


* Then, 1 consumer should be left out of the group
*/
@Test
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
Contributor

This test fails for me about 50% of the time when I run it locally. I wonder if you see this flakiness on your side.
Sample error messages:

java.lang.AssertionError: expected:<102> but was:<105>

org.scalatest.junit.JUnitTestFailedError: Expected to only receive one exception of type
class org.apache.kafka.common.errors.GroupMaxSizeReachedException during consumption.
Received: ArrayBuffer(org.apache.kafka.clients.consumer.CommitFailedException: Commit
cannot be completed since the group has already rebalanced and assigned the partitions to
another member. This means that the time between subsequent calls to poll() was longer
than the configured max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either by increasing
max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with
max.poll.records.)

java.lang.AssertionError: expected:<102> but was:<136>

Contributor Author

Not 50% of the time, but I do see some flakiness now. What do you think would be a good way to get around this? Checking for at least X records rather than an exact count?

Contributor Author

After the changes, this test has passed 50/50 times for me

Contributor

Yes, it now passes for me too (20/20). Thanks!


@stanislavkozlovski
Contributor Author

I addressed some of the style comments with the latest commit. I will now work on removing the shrinking logic by forcing a rebalance instead. I feel that might be the better and simpler approach

@stanislavkozlovski
Contributor Author

Okay I have revisited the group migration and shrinkTo logic - upon loading a big group, the coordinator triggers a rebalance. This gives a chance for the consumers to commit offsets while the Coordinator is collecting JoinGroup requests.

In the way I have written it, I believe there may be a very slight race condition between the response to JoinGroup and the removal from the group.
I wanted to first shrink the group, then trigger a rebalance, and only then respond to JoinGroup, but I could not get the tests to pass with that combination after banging my head against it.

We also saw the last build fail with the test flakiness that @vahidhashemian pointed out. I have changed the test to assert that at least X records have been consumed. It is very peculiar.
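For reference, a rough sketch of the "at least X records" assertion described here; the helper name follows the receiveAtLeastRecords that shows up in the build output further down, while the body is an assumption:

```scala
import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.junit.Assert.assertTrue

object ReceiveAtLeastSketch {
  def receiveAtLeastRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                            numRecords: Int,
                            timeoutMs: Long): Unit = {
    var received = 0
    val deadline = System.currentTimeMillis() + timeoutMs
    // Keep polling until we have seen at least numRecords or the timeout expires; extra
    // records consumed around a rebalance no longer fail the assertion.
    while (received < numRecords && System.currentTimeMillis() < deadline)
      received += consumer.poll(Duration.ofMillis(100)).count()
    assertTrue(s"Received $received, expected at least $numRecords", received >= numRecords)
  }
}
```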

Contributor

@vahidhashemian vahidhashemian left a comment

Thanks for the updates. It seems to me the latest commit has broken some of the unit tests (while fixing the flaky one). Without that commit all unit tests seem to pass for me.

private static final long serialVersionUID = 1L;

public GroupMaxSizeReachedException(String groupId) {
super("Consumer group " + groupId + " already has as much members as the configured maximum.");
Contributor

nit: ... as many members, or even ... already has the configured maximum number of members.

* When we have the consumer group max size configured to X, the X+1th consumer trying to join should receive a fatal exception
*/
@Test
def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = {
Contributor

It seems this test doesn't properly handle the failure.


@stanislavkozlovski
Contributor Author

@vahidhashemian
Good thing the tests fail - they caught a hole in the logic. We would not block a JoinGroup in the scenario where the group is full and a new member wants to join.

The latest commit should fix everything - the tests pass fine locally

Contributor

@abbccdda abbccdda left a comment

Thanks for the updates. Left more comments.

@@ -560,6 +561,8 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
AbstractCoordinator.this.state = MemberState.UNJOINED;
}
future.raise(Errors.MEMBER_ID_REQUIRED);
} else if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException(groupId));
Contributor

Since this is a non-retriable exception, we could consolidate it into code block L546 as a fatal error.

doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
group.inLock {
if (groupIsOverCapacity(group) || (isUnknownMember && groupIsFull(group))) {
if (!isUnknownMember && group.has(memberId)) // oversized group, need to shed members
Contributor

We don't need to check anything here, I guess; we should just attempt to remove the member id (because no member.id match will be a no-op, IMO).

} else {
doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
group.inLock {
if (groupIsOverCapacity(group) || (isUnknownMember && groupIsFull(group))) {
Contributor

I think we just need to kick out real members until groupIsFull is satisfied. We could probably just use groupIsFull to keep rejecting new members, without caring about pending members, to make the handling logic simpler.

Contributor Author

I don't think that would work.
If we were to detect that groupIsOverCapacity() and start kicking out members until groupIsFull, we might kick out members before they had a chance to commit offsets.

When we receive a JoinGroup request from a member, we know that it has attempted to commit offsets (see ConsumerCoordinator#onJoinPrepare).
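A toy, self-contained illustration of the distinction being drawn here (the names are illustrative, not the coordinator's API): while the group is at or over the limit, unknown members are rejected outright, but a known member is only shed on its own JoinGroup, i.e. after onJoinPrepare has given it a chance to commit offsets.

```scala
object JoinGroupCapacitySketch {
  sealed trait Decision
  case object Accept extends Decision
  case object RejectNewMember extends Decision      // unknown member, group already at the limit
  case object EvictExistingMember extends Decision  // over capacity; member re-joined after committing offsets

  def decide(isKnownMember: Boolean, groupSize: Int, groupMaxSize: Int): Decision =
    if (isKnownMember && groupSize > groupMaxSize) EvictExistingMember
    else if (!isKnownMember && groupSize >= groupMaxSize) RejectNewMember
    else Accept
}
```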

@stanislavkozlovski
Contributor Author

Grr, we saw JDK8 fail with the new test:

12:19:07 kafka.api.ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup FAILED
12:19:07     java.lang.AssertionError: Received 0, expected at least 102
12:19:07         at org.junit.Assert.fail(Assert.java:88)
12:19:07         at org.junit.Assert.assertTrue(Assert.java:41)
12:19:07         at kafka.api.ConsumerBounceTest.kafka$api$ConsumerBounceTest$$receiveAtLeastRecords(ConsumerBounceTest.scala:554)
12:19:07         at kafka.api.ConsumerBounceTest$$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$3.apply(ConsumerBounceTest.scala:378)
12:19:07         at kafka.api.ConsumerBounceTest$$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$3.apply(ConsumerBounceTest.scala:377)
12:19:07         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
12:19:07         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
12:19:07         at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:377)
12:19:07 

Meaning it can be somewhat flaky... Does anybody have any recommendations on how to reduce the flakiness here? We could catch such errors and retry at least once

log.error("Attempt to join group failed due to fatal error: {}", error.message());
future.raise(error);
if (error == Errors.GROUP_MAX_SIZE_REACHED) {


If we have to handle this separately, maybe we could move the check to the top level like the GROUP_AUTHORIZATION check.

Contributor Author

I think it would be good to have the log

log.error("Attempt to join group failed due to fatal error: {}", error.message());

Maybe we could move the GROUP_AUTHORIZATION to this level?

@@ -921,6 +931,14 @@ class GroupCoordinator(val brokerId: Int,

def partitionFor(group: String): Int = groupManager.partitionFor(group)

private def groupIsFull(group: GroupMetadata): Boolean = {
group.size == groupConfig.groupMaxSize || groupIsOverCapacity(group)


nit: this seems a verbose way of saying group.size >= groupConfig.groupMaxSize

Contributor Author

Maybe, I thought it reads better in the if check. I've removed this.
WDYT about groupIsOverCapacity?
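For what it's worth, the difference between the two predicates discussed in this thread boils down to the following (simplified sketch; the real methods take a GroupMetadata and read groupConfig):

```scala
object CapacitySketch {
  // "Full" blocks new members from joining; "over capacity" means existing members must be shed.
  def groupIsFull(size: Int, groupMaxSize: Int): Boolean = size >= groupMaxSize
  def groupIsOverCapacity(size: Int, groupMaxSize: Int): Boolean = size > groupMaxSize
}
```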

} else {
doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
group.inLock {
if (groupIsOverCapacity(group) // oversized group, need to shed members


Technically this JoinGroup could be a retry from a member that was already accepted. One way to detect this case would be to check the member's awaitingJoinCallback.

Contributor Author

Great catch

Contributor

@abbccdda abbccdda left a comment

Great work so far! The algorithm part LGTM now. One meta comment: do we have logic to check whether GROUP_MAX_SIZE_REACHED is a fatal exception? (Hint from you in KIP-345 lol)

@stanislavkozlovski
Contributor Author

@abbccdda @hachikuji I think I've addressed your comments.
I'm interested if you have any suggestions on how to reduce the flakiness of integration tests in ConsumerBounceTest.scala

@abbccdda
Contributor

Thanks a lot for the updates! @stanislavkozlovski The implementation LGTM for now.

@hachikuji hachikuji left a comment

Thanks, looks good overall. I left small comments on the test cases.


RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
assertEquals(String.format("Consumer group %s already has the configured maximum number of members.", GROUP_ID),


nit: this seems a little brittle. Would it be enough to just check the exception type?

@@ -201,6 +201,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def not(groupState: GroupState) = state != groupState
def has(memberId: String) = members.contains(memberId)
def get(memberId: String) = members(memberId)
def size() = members.size


nit: since it's just an accessor, maybe we could drop the parenthesis?

// assert group continues to live
val producer = createProducer()
sendRecords(producer, maxGroupSize * 100, topic, numPartitions = Some(maxGroupSize))
stableConsumers.foreach(cons => {


nit: usually we favor

stableConsumers.foreach { cons =>

val stableConsumers = createConsumersWithGroupId(groupId, maxGroupSize, executor, topic)
val newConsumer = createConsumerWithGroupId(groupId)
var failedRebalance = false
waitForRebalance(5000, subscribeAndPoll(newConsumer, executor = executor, onException = _ => {failedRebalance = true}),


Could we assert the expected exception?

executor.submit(CoreUtils.runnable {
try {
consumer.subscribe(Collections.singletonList(topic))
consumer.poll(0)


nit: do we need to add new code using the deprecated poll api?
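A small sketch of the non-deprecated variant suggested by this nit, using poll(Duration) rather than poll(long); the helper name is made up:

```scala
import java.time.Duration
import java.util.Collections
import org.apache.kafka.clients.consumer.KafkaConsumer

object PollSketch {
  def subscribeAndPollOnce(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topic: String): Unit = {
    consumer.subscribe(Collections.singletonList(topic))
    // The Duration overload replaces the deprecated poll(0); the group join still proceeds
    // across subsequent polls.
    consumer.poll(Duration.ofMillis(100))
  }
}
```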

stableConsumers
}

def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], executor: ExecutorService, revokeSemaphore: Option[Semaphore] = None,


We have similar logic in PlaintextConsumerTest.subscribePollers. Not sure how easy it is to factor out the common logic, but it would be preferable if possible.

assertEquals(numRecords, received)
}

private def receiveAtLeastRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long): Unit = {


Maybe we could call this receiveAndCommit or something so that the offset commit expectation is clear.

kickedOutConsumerIdx = Some(idx)
}

CoreUtils.inLock(lock) { receivedExceptions += e }


Seems we might be able to simplify this. If we receive anything other than GroupMaxSizeReachedException, couldn't we fail directly? Then it could be a simple AtomicBoolean tracking whether or not we have received exactly one exception.
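A short sketch of the simplification suggested here: fail immediately on anything unexpected, and track with a single flag that exactly one GroupMaxSizeReachedException was seen. The surrounding test wiring is assumed:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kafka.common.errors.GroupMaxSizeReachedException
import org.junit.Assert.fail

object ExceptionTrackingSketch {
  private val sawMaxSizeError = new AtomicBoolean(false)

  // Call this from the consumer poll loop's error callback.
  def onConsumerException(e: Throwable): Unit = e match {
    case _: GroupMaxSizeReachedException =>
      // Exactly one consumer should be rejected; a second occurrence is a test failure.
      if (!sawMaxSizeError.compareAndSet(false, true))
        fail("Received more than one GroupMaxSizeReachedException during consumption")
    case other =>
      fail(s"Unexpected exception during consumption: $other")
  }

  def sawExactlyOneMaxSizeError: Boolean = sawMaxSizeError.get()
}
```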

…mer group sizes

This patch introduces a new config - "consumer.group.max.size", which caps the maximum size any consumer group can reach. It has a default value of Int.MAX_VALUE.
Once a consumer group is of the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error.

In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced.
The Coordinator now triggers a rebalance when it encounters an over-sized group. This gives a chance for the group members to commit offsets. Once the rebalance collects all the `JoinGroup` requests, the Coordinator shrinks the group, triggers another rebalance and responds to the JoinGroup requests to unblock consumers.
…roup is over capacity

Add test to ensure Errors.GROUP_MAX_SIZE_REACHED isn't retriable
Small refactorings
@hachikuji hachikuji left a comment

LGTM. I will merge once the build completes (supposing no errors).

@hachikuji hachikuji changed the title KAFKA-7641: Introduce "consumer.group.max.size" config to limit consumer group sizes KAFKA-7641: Introduce "group.max.size" config to limit group sizes Feb 2, 2019
@hachikuji hachikuji merged commit 4420d9e into apache:trunk Feb 2, 2019
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…pache#6163)

This patch introduces a new config - "group.max.size", which caps the maximum size any group can reach. It has a default value of Int.MAX_VALUE. Once a group is of the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error.

In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced.

Reviewers: Vahid Hashemian <vahid.hashemian@gmail.com>, Boyang Chen <bchen11@outlook.com>, Gwen Shapira <cshapi@gmail.com>, Jason Gustafson <jason@confluent.io>