KAFKA-7858: Replace JoinGroup request/response with automated protocol #6419
Force-pushed from 2ee5bdd to b68720a.
@cmccabe @hachikuji Could you give this diff a quick review? This would unblock progress on KIP-345, thanks!
Force-pushed from b68720a to fbdd1d1.
cc @vahidhashemian on this thread since the change is pretty straightforward and requires no background context.
Force-pushed from fbdd1d1 to 2472a8a.
Retest this please
Force-pushed from 8126990 to 85ee20f.
Retest this please
Force-pushed from 85ee20f to dbb400b.
@abbccdda: looks good, thanks! When processing older responses, I think generation ID needs to default to -1 to match the previous behavior. This should be changed in
@@ -202,7 +203,7 @@ public AbstractCoordinator(LogContext logContext,
      */
     protected abstract Map<String, ByteBuffer> performAssignment(String leaderId,
                                                                   String protocol,
-                                                                  Map<String, ByteBuffer> allMemberMetadata);
+                                                                  List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata);
Should this be a JoinGroupResponseDataSet?
I don't see this struct anywhere, but I think a list should be fine here?
Sorry, I meant JoinGroupResponseMemberSet. Anyway, a list is fine too; I don't feel that strongly about it.
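For readers following the Map-versus-List discussion above: code that previously looked up metadata by member id can rebuild that view from the list. The sketch below is only illustrative. `Member` is a hypothetical stand-in for the generated `JoinGroupResponseData.JoinGroupResponseMember` type, and `byMemberId` is an assumed helper name, not something from this PR.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: not the code from this PR.
final class MemberMetadataIndex {
    /** Hypothetical stand-in for the generated member type (member id plus raw metadata bytes). */
    static final class Member {
        final String memberId;
        final byte[] metadata;

        Member(String memberId, byte[] metadata) {
            this.memberId = memberId;
            this.metadata = metadata;
        }
    }

    /** Rebuilds a memberId -> metadata map from the list form, preserving list order. */
    static Map<String, ByteBuffer> byMemberId(List<Member> members) {
        Map<String, ByteBuffer> result = new LinkedHashMap<>();
        for (Member member : members) {
            result.put(member.memberId, ByteBuffer.wrap(member.metadata));
        }
        return result;
    }
}
```

A `LinkedHashMap` is used here so iteration order matches the order of the list; a plain `HashMap` would work if ordering does not matter to the assignor.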
Force-pushed from dbb400b to 99578dc.
Retest this please
@cmccabe @hachikuji mind taking another look?
@cmccabe Another look when you get time?
LGTM. Thanks, @abbccdda.
Thanks! @cmccabe
…p v0 (#7072)
The rebalance timeout was added to the JoinGroup protocol in version 1. Prior to 2.3, we handled version 0 JoinGroup requests by setting the rebalance timeout to be equal to the session timeout. We lost this logic when we converted the API to use the generated schema definition (#6419) which uses the default value of -1. The impact of this is that the group rebalance timeout becomes 0, so rebalances finish immediately after we enter the PrepareRebalance state and kick out all old members. This causes consumer groups to enter an endless rebalance loop. This patch restores the old behavior.
Reviewers: Ismael Juma <ismael@juma.me.uk>
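The fallback described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the actual patch: the class name, method name, and the `UNSET_TIMEOUT_MS` constant are assumptions made for the example. The only facts taken from the message are that version 0 carries no rebalance timeout, that the generated schema defaults the field to -1, and that the old behavior was to reuse the session timeout.

```java
// Illustrative only: names are hypothetical, not taken from the Kafka code base.
final class JoinGroupTimeouts {
    /** Default the generated schema assigns when the field is absent (per the commit message). */
    static final int UNSET_TIMEOUT_MS = -1;

    /**
     * Returns the rebalance timeout to use for a JoinGroup request.
     * Version 0 requests have no rebalance timeout field, so the session
     * timeout is used instead, matching the pre-2.3 behavior.
     */
    static int effectiveRebalanceTimeoutMs(short apiVersion, int sessionTimeoutMs, int rebalanceTimeoutMs) {
        if (apiVersion == 0 || rebalanceTimeoutMs == UNSET_TIMEOUT_MS) {
            return sessionTimeoutMs;
        }
        return rebalanceTimeoutMs;
    }
}
```

For example, a version 0 request with a 10000 ms session timeout would get a 10000 ms rebalance timeout, while a version 1 request that sets 30000 ms keeps its own value.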
…ession timeout for JoinGroup v0 (apache#7072)
TICKET = KAFKA-8653
LI_DESCRIPTION =
EXIT_CRITERIA = HASH [b725b3c]
ORIGINAL_DESCRIPTION =
The rebalance timeout was added to the JoinGroup protocol in version 1. Prior to 2.3, we handled version 0 JoinGroup requests by setting the rebalance timeout to be equal to the session timeout. We lost this logic when we converted the API to use the generated schema definition (apache#6419) which uses the default value of -1. The impact of this is that the group rebalance timeout becomes 0, so rebalances finish immediately after we enter the PrepareRebalance state and kick out all old members. This causes consumer groups to enter an endless rebalance loop. This patch restores the old behavior.
Reviewers: Ismael Juma <ismael@juma.me.uk>
(cherry picked from commit b725b3c)
add more validation during KRPC deserialization
When deserializing KRPC (which is used for RPCs sent to Kafka, Kafka Metadata records, and some other things), check that we have at least N bytes remaining before allocating an array of size N.
Remove DataInputStreamReadable since it was hard to make this class aware of how many bytes were remaining. Instead, when reading an individual record in the Raft layer, simply create a ByteBufferAccessor with a ByteBuffer containing just the bytes we're interested in.
Add SimpleArraysMessageTest and ByteBufferAccessorTest. Also add some additional tests in RequestResponseTest.
Co-author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio <jsancio@gmail.com>
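The "check that at least N bytes remain before allocating an array of size N" idea can be shown with plain `java.nio.ByteBuffer`. The sketch below is an assumption-laden illustration, not the ByteBufferAccessor code from that commit: `BoundedReads` and `readByteArray` are hypothetical names, and the exception type is chosen only for the example.

```java
import java.nio.ByteBuffer;

// Illustrative only: a stand-alone helper, not Kafka's ByteBufferAccessor.
final class BoundedReads {
    /**
     * Reads {@code length} bytes from the buffer, refusing to allocate when the
     * declared length is negative or larger than what the buffer still holds.
     */
    static byte[] readByteArray(ByteBuffer buffer, int length) {
        if (length < 0) {
            throw new IllegalArgumentException("Negative array length: " + length);
        }
        if (length > buffer.remaining()) {
            throw new IllegalArgumentException("Declared length " + length
                + " exceeds the " + buffer.remaining() + " bytes remaining");
        }
        // Safe to allocate: the size is bounded by the data actually present.
        byte[] out = new byte[length];
        buffer.get(out);
        return out;
    }
}
```

Checking before allocating means a corrupt or hostile length prefix fails fast with an exception instead of triggering a very large allocation.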
This migration is prioritized because it blocks a feature from KIP-345 part 1: #6177
Moving the JoinGroup request/response to the automated protocol should make it easier to add the group instance id to JoinGroupResponse.