New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup #13639
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the logs may be misleading if clients join/leave via heartbeat requests but the coordinator fails to append records right?
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
Outdated
Show resolved
Hide resolved
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1") | ||
.setMemberEpoch(1) | ||
.setPreviousMemberEpoch(0) | ||
.setNextMemberEpoch(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there ever a time the next member epoch is not the same as the member epoch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am curious as well. i notice 3 places
CurrentAssignmentBuilder#maybeTransitionFromAssigningToAssigningOrStable
CurrentAssignmentBuilder#transitionToNewTargetAssignmentState
CurrentAssignmentBuilder#maybeTransitionFromRevokingToAssigningOrStable
that both set member epoch and next member epoch to targetAssignmentEpoch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right. There is only one case where it is not. When a member must revoke partitions, it stays in its current epoch so member epoch is different from the next epoch in this case.
The rational of keeping track of the next epoch here is to basically prevent recomputing the state while the member is in revoking state. Without it, we would have to recompute it on every heartbeat.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is nextMemberEpoch a bit confusing here? It seems more like a target epoch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree. Let me use targetMemberEpoch
.
...coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
took a look at ConsumerGroup and ConsumerGroupTest
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Show resolved
Hide resolved
...coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
Outdated
Show resolved
Hide resolved
...coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
Show resolved
Hide resolved
...coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
Outdated
Show resolved
Hide resolved
...coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
Show resolved
Hide resolved
...coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
@jolshan @jeffkbkim Thanks for your comments. I have addressed them, I think. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left minor comments, i think the rest look good.
the comments really help with understanding the code. thanks!
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
throw new FencedMemberEpochException("The consumer group member has a greater member " | ||
+ "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" | ||
+ member.memberEpoch() + "). The member must abandon all its partitions and rejoin."); | ||
} else if (receivedMemberEpoch < member.memberEpoch()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have a test case for this?
i expect that when we accept a request with the previous epoch, we compute the diff from the request's owned partitions and the target assignment and respond to the consumer (assignedTopicPartitions, pending assignment if exists). which would be identical to what we do for the expected member epoch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is covered in testConsumerGroupMemberEpochValidation
. If the member comes with the previous epoch and its owned partitions is a subset of its assigned partitions, we accept it and it goes through the regular process. If nothing has changed since the last heartbeat, it will just receive the current assignment/epoch.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left minor comments, i think the rest look good.
the comments really help with understanding the code. thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks David :)
This PR adds the GroupMetadataManager to the group-coordinator module. This manager is responsible for handling the groups management, the members management and the entire reconciliation process. At this point, only the new consumer group type/protocol is implemented.
As you will see, the new manager is based on an architecture inspired from the quorum controller. A request can access/read the state but can't mutate it directly. Instead, a list of records is generated together with the response and those records will be applied to the state by the runtime framework. We use timeline data structures. Note that the runtime framework is not part of this PR. It will come in a following one.
For the reviewers, I suggest starting from the GroupMetadataManager.consumerGroupHeartbeat method. From there, you will how the consumer group heartbeat is handled and how all the classes fit together. Then, it is possible to review the classes independently, I suppose.
Committer Checklist (excluded from commit message)