New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic #13476
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mainly looked at GroupMetadataManager.java and left some minor comments, will take a bit more to digest it
* a member. | ||
* | ||
* @param updatedMemberId The member id. | ||
* @param updatedMemberSubscriptions The updated member. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be The member's updated topic subscriptions?
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); | ||
} | ||
|
||
groupEpoch += 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to confirm, we will reach this line when a new member joins since the new ConsumerGroupMember's topic subscription is initialized as empty on group.getOrMaybeCreateMember()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct.
import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
import static org.junit.jupiter.api.Assertions.assertThrows; | ||
|
||
public class GroupMetadataManagerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a test case for when PartitionAssignor.assign() throws PartitionAssignorException or will this be handled elsewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
List<org.apache.kafka.coordinator.group.Record> records = new ArrayList<>(); | ||
Map<String, ConsumerGroupMemberAssignment> newTargetAssignment = new HashMap<>(); | ||
|
||
// Compute delta from previous to new assignment and create the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we actually compute the delta here? it seems like we're comparing old vs. new and add the entire new assignment to the log if they're different.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The delta means that we only write a record for a member if its assignment is different from the previous one.
.build(); | ||
|
||
// Checking the reference is enough here because a new instance | ||
// is created only the state has changed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: only when the state has changed
) { | ||
return new Record( | ||
new ApiMessageAndVersion( | ||
new ConsumerGroupCurrentMemberAssignmentKey() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Assignment" and "Member" are swapped in ConsumerGroupCurrentMemberAssignmentKey
and ConsumerGroupTargetAssignmentMemberKey
which seems to be because we also have ConsumerGroupTargetAssignmentMetadataKey
. Is it weird to use ConsumerGroupCurrentAssignmentMemberKey
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see... The reason for putting Member
and Metadata
second in the target assignment case is because Member
and Metadata
are attributes of the target assignment. For the current assignment, we don't have this so I went with ConsumerGroupCurrentMemberAssignmentKey/Value
which is a bit more readable in my opinion.
That being said, I am not attached to it at all. Should we just use ConsumerGroupCurrentAssignmentKey/Value
without Member
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I prefer to keep the Member keyword because it suggests that the key is unique per member.
i agree ConsumerGroupCurrentMemberAssignmentKey/Value
is more readable, but it is also confusing that the ordering is different
* @param ownedTopicPartitions The partitions provided by the client in the request. | ||
* @param target The partitions that they client should have. | ||
* | ||
* @return A boolean indicating whether the owned partitions are a subset of not. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: subset or not
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions, | ||
Map<Uuid, Set<Integer>> target | ||
) { | ||
if (ownedTopicPartitions == null) return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To confirm, the consumer will send an empty list instead of null when heartbeating with no owned partitions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct. null
means that there is not change since the last heartbeat.
} | ||
} else if (request.memberEpoch() == 0) { | ||
if (request.rebalanceTimeoutMs() == -1) { | ||
throw new InvalidRequestException("RebalanceTimeoutMs must in first request."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: must be provided
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi David, I started to read the PR and left very minor comments (mostly nits) while I am working on understanding the protocol. Thanks!
* the hard and the soft state of the groups. This class has two kinds of methods: | ||
* 1) The request handlers which handle the requests and generate a response and records to | ||
* mutate the hard state. Those records will be written by the runtime and applied to the | ||
* hard stay via the replay methods. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: stay -> state
|
||
@Override | ||
public int hashCode() { | ||
int result = memberId != null ? memberId.hashCode() : 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe Objects.hashCode()
could be used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just relied on the auto-generated code for the hashcode. Let me try to clean this up.
); | ||
} | ||
|
||
public static Map.Entry<Uuid, Set<Integer>> mkOrderedTopicAssignment( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: ordered -> sorted
* a list of records to update the state machine. | ||
*/ | ||
private Result<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat( | ||
String groupId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe a class could encapsulate all these parameters? Or maybe a DTO could be re-used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I considered whether passing RequestContext
and ConsumerGroupHeartbeatRequestData
would be better here. Would it be better?
groupEpoch += 1; | ||
records.add(newGroupEpochRecord(groupId, groupEpoch)); | ||
|
||
log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + "."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe this line and the one above could be merged (to avoid interleaving logs)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will take a look.
) { | ||
Objects.requireNonNull(records); | ||
Objects.requireNonNull(assignments); | ||
this.records = records; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - this.records = Objects.requireNonNull(records);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack.
throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); | ||
|
||
if (memberEpoch == 0) { | ||
log.info("[GroupId " + groupId + "] Member " + memberId + " re-joins the consumer group."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this log provided when a new member joins the group? (or is it always re-joins?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is provided in both cases.
} | ||
|
||
if (group == null) { | ||
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the consumer group creation be logged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that it is necessary.
throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.", | ||
memberId, groupId)); | ||
} | ||
member = new ConsumerGroupMember.Builder(memberId).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the new member creation be logged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have the log printed when the member joins or re-joins. Based on this, we know that a member is implicitly created.
) { | ||
List<Record> records = new ArrayList<>(); | ||
|
||
ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for my understanding, does the consumer group protocol allows the use case where the coordinator receives a LeaveGroup
but the consumer group and/or member is not registered with the coordinator? Can it happen in case of heartbeat request loss, an invalid client request, or after a coordinator change?
If it is the case the group did not exist, do we need to compute a new assignment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the member or the group does not exist, the request is rejected with an unknown group id or an unknown member id error.
Extracted a few simple classes into #13520. |
We will also do a separate PR for the changes in the assignor and the records. |
) { | ||
return new Record( | ||
new ApiMessageAndVersion( | ||
new ConsumerGroupCurrentMemberAssignmentKey() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I prefer to keep the Member keyword because it suggests that the key is unique per member.
i agree ConsumerGroupCurrentMemberAssignmentKey/Value
is more readable, but it is also confusing that the ordering is different
|
||
/** | ||
* The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds | ||
* the hard and the soft state of the groups. This class has two kinds of methods: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the soft state and how is it mutated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is actually no soft state at the moment. This will come with the implementation of the old protocol.
/** | ||
* The generic and consumer groups keyed by their subscribed topics. | ||
*/ | ||
private final TimelineHashMap<String, TimelineHashSet<Group>> groupsByTopicName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't see this being used right now. Should we update this once we accept a heartbeat request with SubscribedTopicNames
?
i assume this will be used to listen to trigger new assignment computation when there are changes to a topic's metadata?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it for now. You assumption is correct but this will come in another PR.
/** | ||
* The topics metadata (or image). | ||
*/ | ||
private TopicsImage topicsImage; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will this be updated when we read changes from the metadata log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct.
/** | ||
* Verifies that the partitions currently owned by the member (the ones set in the | ||
* request) matches the ones that the member should own. It matches if the client | ||
* has a least of subset of them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: has at least
* has a least of subset of them. | ||
* | ||
* @param ownedTopicPartitions The partitions provided by the client in the request. | ||
* @param target The partitions that they client should have. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: that the client/member
* @return The new subscription metadata as an immutable Map. | ||
*/ | ||
public Map<String, TopicMetadata> computeSubscriptionMetadata( | ||
String updatedMemberId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i realize it should be read as updated member's id but initially i read it as the updated memberId. can we just use memberId?
} | ||
|
||
/** | ||
* Replays ConsumerGroupPartitionMetadataKey/Value to update the hard state of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious as to why the name includes Partition and not Topics. my understanding is this record holds all subscribed topics per group id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. I don't remember why we called it like this. Once we have merged all these PRs, I will do another one to rename records.
members.forEach((memberId, member) -> { | ||
if (!memberId.equals(updatedMemberId)) { | ||
updateSubscription.accept(member.subscribedTopicNames()); | ||
} | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a bit unfortunate that we are iterating through the entire members map every time a single member changes its subscription. i wonder if we can keep a map of TopicMetadata to a set of member ids
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let me see if i can do this.
Closing this PR as I have opened smaller ones with the same code. @jeffkbkim I have addressed some of your last comments in #13639. |
This PR adds the GroupMetadataManager to the group-coordinator module. This manager is responsible for handling the groups management, the members management and the entire reconciliation process. At this point, only the new consumer group type/protocol is implemented.
As you will see, the new manager is based on an architecture inspired from the quorum controller. A request can access/read the state but can't mutate it directly. Instead, a list of records is generated together with the response and those records will be applied to the state by the runtime framework. We use timeline data structures. Note that the runtime framework is not part of this PR. It will come in a following one.
For the reviewers, I suggest starting from the GroupMetadataManager.consumerGroupHeartbeat method. From there, you will how the consumer group heartbeat is handled and how all the classes fit together. Then, it is possible to review the classes independently, I suppose.
Note that I have diverged from the KIP in a few places. Firstly, I have adapted to assignor interface to be more convenient with the internals. This is something that we could still change. Secondly, I have adapted a few records. Especially, I have changed how the current state of the member is persisted. The way that we had in the KIP was a but naive and not practical at all for the implementation.
Committer Checklist (excluded from commit message)