KAFKA-16793: Heartbeat API for upgrading ConsumerGroup #15988
Conversation
"classic-group-heartbeat", | ||
topicPartitionFor(request.groupId()), | ||
(coordinator, __) -> coordinator.classicGroupHeartbeat(context, request) | ||
Duration.ofMillis(config.offsetCommitTimeoutMs), |
Not necessarily a comment for this PR, but I wonder if we should change the name of this config, since it's being used for all writes.
I think that we should actually not use offsetCommitTimeoutMs as the timeout for any operation except the one writing offsets. I think we used it because we had no others. We can address this separately.
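A rough sketch of the direction suggested here, assuming a hypothetical dedicated write timeout (the `GROUP_WRITE_TIMEOUT_MS` name and value are invented for illustration; the current code reuses offsetCommitTimeoutMs for every coordinator write):

```java
import java.time.Duration;

// Sketch only: these fields are illustrative, not the real Kafka GroupCoordinatorConfig.
public class TimeoutConfigSketch {
    // Today the same value is reused for every coordinator write.
    static final int OFFSET_COMMIT_TIMEOUT_MS = 5000;
    // Hypothetical dedicated timeout for non-offset writes, as suggested in the review.
    static final int GROUP_WRITE_TIMEOUT_MS = 30000;

    static Duration timeoutFor(String operation) {
        // Only the offset-commit write would keep using the offset commit timeout.
        if (operation.equals("commit-offsets")) {
            return Duration.ofMillis(OFFSET_COMMIT_TIMEOUT_MS);
        }
        return Duration.ofMillis(GROUP_WRITE_TIMEOUT_MS);
    }

    public static void main(String[] args) {
        System.out.println(timeoutFor("classic-group-heartbeat").toMillis());
    }
}
```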
member.state() == MemberState.UNREVOKED_PARTITIONS ||
(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.hasUnreleasedPartitions(member))) {
    error = Errors.REBALANCE_IN_PROGRESS;
    scheduleConsumerGroupJoinTimeout(groupId, memberId, member.rebalanceTimeoutMs());
We are saying that we cancel the join timeout when we first convert to a consumer group; then, when we have a group epoch bump, we tell the classic group member we're rebalancing and they should send a join request. Is my understanding correct?
we cancel the join timeout when we first convert to consumer group
We don't cancel the timeout in case the conversion fails and the state needs to be reverted. The classic group join timeout does nothing if the group is a consumer group.
when we have a group epoch bump we tell the classic group member we're rebalancing and they should send a join request
Yes correct, and the timeout here is for the member instead of the whole group. For each member, the rebalance will be something like
- heartbeat -- if there's an ongoing rebalance, schedule the join timeout
- join -- cancel the join timeout; schedule the sync timeout
- sync -- cancel the sync timeout; maybe schedule a join timeout if a new rebalance ongoing
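The per-member lifecycle above can be modeled roughly as follows. This is a toy model under stated assumptions: the method names and the map-based bookkeeping are illustrative stand-ins, not the real GroupMetadataManager timer API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of the per-member timeout dance described above; not coordinator code.
public class ClassicMemberTimeoutSketch {
    enum Timeout { JOIN, SYNC }

    private final Map<String, Timeout> pending = new HashMap<>();

    // heartbeat: if a rebalance is in progress, arm the join timeout for this member.
    void onHeartbeat(String memberId, boolean rebalanceInProgress) {
        if (rebalanceInProgress) pending.put(memberId, Timeout.JOIN);
    }

    // join: the member rejoined in time; cancel the join timeout, arm the sync timeout.
    void onJoin(String memberId) {
        pending.put(memberId, Timeout.SYNC);
    }

    // sync: cancel the sync timeout, unless another rebalance has already started.
    void onSync(String memberId, boolean newRebalanceOngoing) {
        if (newRebalanceOngoing) pending.put(memberId, Timeout.JOIN);
        else pending.remove(memberId);
    }

    Timeout pendingTimeout(String memberId) {
        return pending.get(memberId);
    }

    public static void main(String[] args) {
        ClassicMemberTimeoutSketch s = new ClassicMemberTimeoutSketch();
        s.onHeartbeat("member-1", true);
        s.onJoin("member-1");
        s.onSync("member-1", false);
        System.out.println(s.pendingTimeout("member-1")); // prints "null"
    }
}
```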
@dongnuo123 Thanks for the patch. I left a few comments for consideration.
"classic-group-heartbeat", | ||
topicPartitionFor(request.groupId()), | ||
(coordinator, __) -> coordinator.classicGroupHeartbeat(context, request) | ||
Duration.ofMillis(config.offsetCommitTimeoutMs), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should actually not use offsetCommitTimeoutMs
as timeout for any operations except the one writing offsets. I think that we used it because we had no others. We can address this separately.
LGTM. @dongnuo123 I left one nit about a typo. We can fix it in your next PR.
// 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment.
if (member.memberEpoch() < group.groupEpoch() ||
    member.state() == MemberState.UNREVOKED_PARTITIONS ||
    (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) {
I'm not sure I fully understand this part.
UNRELEASED_PARTITIONS means that the member is waiting on partitions. However, I'm guessing the helper checks that the latest state does in fact have all partitions released, but we want it to rejoin to get the updated assignment. Is this correct?
Will this member be updated to the STABLE state in the next CurrentAssignmentBuilder#computeNextAssignment?
the helper checks that the latest state does in fact have all partitions released but we want it to rejoin to get the updated assignment
Yes, this is correct.
Will this member be updated to STABLE state in the next CurrentAssignmentBuilder#computeNextAssignment
Yes, it will, in the reconciliation part of classicGroupJoinToConsumerGroup:
kafka/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Lines 1737 to 1745 in 27a6c15
updatedMember = maybeReconcile(
    groupId,
    updatedMember,
    group::currentPartitionEpoch,
    targetAssignmentEpoch,
    targetAssignment,
    ownedTopicPartitions,
    records
);
got it. thanks!
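The transition discussed in this thread can be sketched as a tiny state model. This is a simplification under assumptions: the predicate parameter is a stand-in for the group's unreleased-partition check, and the real logic lives in CurrentAssignmentBuilder#computeNextAssignment.

```java
// Simplified model of the member-state transition discussed above; not the real builder.
public class MemberStateSketch {
    enum MemberState { UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS, STABLE }

    // hasUnreleasedPartitions stands in for the group's partition-epoch check.
    static MemberState computeNextState(MemberState current, boolean hasUnreleasedPartitions) {
        if (current == MemberState.UNRELEASED_PARTITIONS && !hasUnreleasedPartitions) {
            // All pending partitions are free: the member can take its full assignment.
            return MemberState.STABLE;
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(computeNextState(MemberState.UNRELEASED_PARTITIONS, false));
    }
}
```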
@@ -12143,7 +12183,6 @@ public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions()
    // Consumer group with two members.
    // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
-       .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
Why were these removed?
It actually doesn't matter. This was first added because I copied and pasted it from the downgrade unit test.
.setName("range") | ||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( | ||
new ConsumerPartitionAssignor.Subscription( | ||
Arrays.asList("foo"), |
nit: I think we can use Collections.singletonList("foo"),
.setName("range") | ||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( | ||
new ConsumerPartitionAssignor.Subscription( | ||
Arrays.asList("foo"), |
nit: Collections.singletonList("foo")
.setName("range") | ||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( | ||
new ConsumerPartitionAssignor.Subscription( | ||
Arrays.asList("foo"), |
nit: singletonList
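For context on the nit: both calls produce lists that compare equal, but Collections.singletonList returns an immutable single-element list and avoids allocating a backing array.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Demonstrates the nit above: singletonList vs Arrays.asList for a one-element list.
public class SingletonListExample {
    public static void main(String[] args) {
        List<String> a = Arrays.asList("foo");               // fixed-size, backed by an array
        List<String> b = Collections.singletonList("foo");   // immutable, no backing array
        System.out.println(a.equals(b));                     // prints "true": equal by contents
    }
}
```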
CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId),
CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId),
CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId),
Not necessary for this PR, but I was wondering if we should have a helper for this. It's hard to know these are the records created when a member is removed without existing context.
Another possible way is to make groupMetadataManager#removeMember package-private for testing. Not sure which one is better.
kafka/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Lines 2049 to 2053 in 27a6c15
private void removeMember(List<CoordinatorRecord> records, String groupId, String memberId) {
    records.add(newCurrentAssignmentTombstoneRecord(groupId, memberId));
    records.add(newTargetAssignmentTombstoneRecord(groupId, memberId));
    records.add(newMemberSubscriptionTombstoneRecord(groupId, memberId));
}
that works for me
I would not do this because you end up with the same code to generate the records on both sides. If there is a bug in groupMetadataManager#removeMember, you don't catch it. So I would rather keep it explicit or have a helper in the test to reduce the duplication.
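One way to keep the expected records explicit while reducing duplication, along the lines of the suggestion above, is a small helper on the test side that builds the expected tombstones independently of the production code. This is a sketch: the string records stand in for CoordinatorRecord and the real CoordinatorRecordHelpers factory methods.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of a test-side helper; strings stand in for CoordinatorRecord tombstones.
public class RemoveMemberTestHelper {
    // Builds the three tombstones expected when a member is removed, in the order
    // the coordinator writes them, without calling into GroupMetadataManager.
    static List<String> expectedRemoveMemberRecords(String groupId, String memberId) {
        return Arrays.asList(
            "current-assignment-tombstone:" + groupId + ":" + memberId,
            "target-assignment-tombstone:" + groupId + ":" + memberId,
            "member-subscription-tombstone:" + groupId + ":" + memberId
        );
    }

    public static void main(String[] args) {
        System.out.println(expectedRemoveMemberRecords("grp", "member-1").size()); // prints "3"
    }
}
```

Because the helper builds its own expectation rather than delegating to removeMember, a bug in the production method still fails the test, which is the concern raised above.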
LGTM
commit 93238ae
Author: Antoine Pourchet <antoine@responsive.dev>
Date: Thu May 23 13:45:29 2024 -0600
KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified (apache#16034)
This PR uses the new TaskTopicPartition structure to simplify the build process for the ApplicationState, which is the input to the new TaskAssignor#assign call.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>

commit 4020307
Author: Kuan-Po (Cooper) Tseng <brandboat@gmail.com>
Date: Fri May 24 02:51:26 2024 +0800
KAFKA-16795: Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter (apache#16020)
This commit allows users to apply the Scala version Formatters, but users will receive warning messages about deprecation. This compatibility support will be removed in 4.0.0.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit c3018ef
Author: TingIāu "Ting" Kì <51072200+frankvicky@users.noreply.github.com>
Date: Fri May 24 01:15:56 2024 +0800
KAFKA-16804: Replace archivesBaseName with archivesName (apache#16016)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Greg Harris <greg.harris@aiven.io>

commit 0ba15ad
Author: Edoardo Comar <ecomar@uk.ibm.com>
Date: Thu May 23 17:17:56 2024 +0100
KAFKA-15905: Restarts of MirrorCheckpointTask should not permanently i… (apache#15910)
MirrorCheckpointTask reloads the last checkpoint at start; OffsetSyncStore stores OffsetSyncs before reading till end. If the CheckpointTask cannot read checkpoints at startup, it uses the previous OffsetSyncStore load logic, with a warning log message about degraded offset translation. Also addresses KAFKA-16622 (MirrorMaker2's first checkpoint not emitted until the consumer group fully catches up once), because the OffsetSyncStore is populated before reading to the log end.
Co-authored-by: Adrian Preston <prestona@uk.ibm.com>
Reviewers: Greg Harris <greg.harris@aiven.io>

commit 5a48984
Author: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Date: Thu May 23 17:36:39 2024 +0200
KAFKA-15649: Handle directory failure timeout (apache#15697)
A broker that is unable to communicate with the controller will shut down after the configurable log.dir.failure.timeout.ms. The implementation adds a new event to the Kafka EventQueue. This event is deferred by the configured timeout and will execute the shutdown if the heartbeat communication containing the failed log dir is still pending with the controller.
Reviewers: Igor Soarez <soarez@apple.com>

commit 8d117a1
Author: Mickael Maison <mimaison@users.noreply.github.com>
Date: Thu May 23 17:03:24 2024 +0200
KAFKA-16825: Update netty/jetty/jackson/zstd dependencies (apache#16038)
Reviewers: Luke Chen <showuon@gmail.com>

commit ab0cc72
Author: Mickael Maison <mimaison@users.noreply.github.com>
Date: Thu May 23 16:01:45 2024 +0200
MINOR: Move parseCsvList to server-common (apache#16029)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 14b5c4d
Author: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com>
Date: Thu May 23 02:27:00 2024 -0400
KAFKA-16793; Heartbeat API for upgrading ConsumerGroup (apache#15988)
This patch implements the heartbeat API for members that use the classic protocol in a ConsumerGroup.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>

commit e692fee
Author: Jeff Kim <kimkb2011@gmail.com>
Date: Thu May 23 02:24:23 2024 -0400
MINOR: fix flaky testRecordThreadIdleRatio (apache#15987)
DelayEventAccumulator should return immediately if there are no events in the queue. Also removed some unused fields inside EventProcessorThread.
Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>

commit bef83ce
Author: Nick Telford <nick.telford@gmail.com>
Date: Thu May 23 05:34:31 2024 +0100
KAFKA-15541: Add iterator-duration metrics (apache#16028)
Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw). This new `StateStore` metric tracks the average and maximum amount of time between creating and closing Iterators. Iterators with very high durations can indicate to users performance problems that should be addressed. If a store reports no data for these metrics, despite the user opening Iterators on the store, it suggests those iterators are not being closed, and have therefore leaked.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This patch implements the heartbeat API for members that use the classic protocol in a ConsumerGroup. Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>