KAFKA-18117; KAFKA-18729: Use assigned topic IDs to avoid full metadata requests on broker-side regex #19814
Conversation
@lianetm Thanks for the patch. I left some comments/questions.
 * Check if the topic ID has been received in an assignment
 * from the coordinator after subscribing to a broker-side regex.
 */
public synchronized boolean isAssignedFromRe2j(Uuid topic) {
nit: topicId?
/**
 * Topic IDs received in an assignment from the coordinator when using the Consumer rebalance protocol.
 * This will be used to include assigned topic IDs in metadata requests when the consumer
 * does not know the topic names (ex. when the user subscribes to a RE2J regex computed on the broker).
 */
private Set<Uuid> assignedTopicIds;
I think that this is fine for addressing the bug. However, it would be great if we could try to consolidate it with the assignment in the future. I haven't looked into it, but it seems feasible to consolidate all the various assignment states that we have (here and in the membership manager) into a single view. Have you considered it?
Yeap, totally agree. I gave it a try, but it seemed like too much refactoring of the MembershipManager and SubscriptionState at this point (close to the 4.1 cut), so I went with the small fix to ensure it lands.
But I totally agree that we could extract the assignment state that now lives in the SubscriptionState for the classic consumer, and in the SubscriptionState and MembershipManager for the new consumer. Filing a Jira, and I'll address that myself after this if that's ok.
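For context, a minimal sketch of how such a set of assigned topic IDs can be kept up to date from coordinator assignments and exposed for metadata requests. Names and structure here are illustrative assumptions, not the actual SubscriptionState code:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.common.Uuid;

    // Illustrative only; the real consumer keeps this state inside
    // SubscriptionState and related internals, not in a standalone class.
    public class AssignedTopicIdTracker {

        private Set<Uuid> assignedTopicIds = Collections.emptySet();

        // Called when the coordinator sends a new assignment, e.g. in a
        // heartbeat response under the new consumer rebalance protocol.
        public synchronized void updateAssignedTopicIds(Set<Uuid> topicIds) {
            this.assignedTopicIds = new HashSet<>(topicIds);
        }

        // Topic IDs to include in metadata requests when only IDs are known
        // (broker-side RE2J regex) and the topic names are not yet resolved.
        public synchronized Set<Uuid> assignedTopicIds() {
            return Collections.unmodifiableSet(assignedTopicIds);
        }
    }

The point is simply that the IDs arrive with the assignment and are the only identifier the consumer has until a metadata response resolves the names.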
return MetadataRequest.Builder.allTopics();
List<String> topics = new ArrayList<>();
topics.addAll(subscription.metadataTopics());
topics.addAll(transientTopics);
return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
return new MetadataRequest.Builder(topics, subscription.assignedTopicIds(), allowAutoTopicCreation);
I am a bit confused here. The javadoc says that the metadata request will include all the topics, but the builder does not do that internally. Why is that? I suppose that we could actually have both transient topics and topic IDs at the same time, no?
Good catch (mismatch between the javadoc and the code: I did add the code at some point, but it seems I ended up removing it unintentionally).
You're right that we could need metadata for assigned topic IDs and transient topics (names), but since the metadata request handling on the broker does not support that, we have no choice but to request them all. Added the fix.
Since we're here, curious: would it make sense to review the broker-side logic for processing metadata requests so it takes both names and IDs if present? This is the bit that at the moment only considers topic names if no topic IDs are provided:
kafka/core/src/main/scala/kafka/server/KafkaApis.scala, lines 899 to 902 in c4a769b
else if (useTopicId)
  knownTopicNames
else
  metadataRequest.topics.asScala.toSet
Thinking more about this, I simplified it a bit around the case where the consumer may be using a broker-side regex and transient topics. The consumer would need metadata for topic IDs and names at the same time only if all of the following happen together:
- a call to the offsets-related APIs (beginning/endOffsets or offsetsForTimes)
- the topic is not in the metadata cache
- a new partition assigned via RE2J (received in the background in a HB), for a topic not in the metadata cache

Probably an edge case, but I went ahead with the change to simply "pipeline" the requests, resolving the reconciliation first (it seemed sensible to prioritize not blocking reconciliation/poll over a single API call to beginningOffsets/endOffsets/offsetsForTimes, thoughts?).
So with the current change, a consumer that needs metadata for a topic ID from RE2J and for transient topics will first send a request for the assigned topic ID metadata. Once that resolves, it will send the request for the transient topics, if any, on the next background poll iteration (background poll -> network client poll -> request metadata).
I'm adding more tests but pushed the change so we're aligned.
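A rough sketch of that pipelining, for illustration only: the method name, parameters, and decision structure below are assumptions rather than the actual ConsumerMetadata code, and the Builder overload taking topic IDs is the one shown in the diff above, with its exact signature assumed.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.requests.MetadataRequest;

    public class MetadataRequestSketch {

        /**
         * One metadata request carries either topic names or topic IDs, never both
         * (the broker doesn't accept names and IDs in the same request). Unresolved
         * assigned topic IDs (from a broker-side RE2J regex) are requested first so
         * reconciliation is not blocked; transient topic names are requested on the
         * next background poll once the IDs are resolved.
         */
        public static MetadataRequest.Builder nextRequest(Set<Uuid> unresolvedAssignedTopicIds,
                                                          Set<String> metadataTopics,
                                                          Set<String> transientTopics,
                                                          boolean allowAutoTopicCreation) {
            if (!unresolvedAssignedTopicIds.isEmpty()) {
                // Request by topic ID only (empty list of names).
                return new MetadataRequest.Builder(List.of(), unresolvedAssignedTopicIds, allowAutoTopicCreation);
            }
            // Request by name: subscribed metadata topics plus any transient topics
            // (e.g. needed for beginningOffsets/endOffsets/offsetsForTimes).
            List<String> topics = new ArrayList<>(metadataTopics);
            topics.addAll(transientTopics);
            return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
        }
    }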
lgtm.
This PR uses topic IDs received in assignment (under new protocol) to
ensure that only these assigned topics are included in the consumer
metadata requests performed when the user subscribes to broker-side
regex (RE2J).
For handling the edge case of the consumer needing metadata for topic IDs
(from RE2J) and topic names (from transient topics), the approach is to
send a request for the transient topics needed temporarily, and once
those are resolved, the request for the topic IDs needed for RE2J will
follow (this is because the broker doesn't accept requests for names
and IDs at the same time).
With these changes we also end up fixing another issue (KAFKA-18729),
aimed at avoiding iterating over the full set of assigned partitions
when checking whether a topic should be retained from the metadata
response when using RE2J.
Reviewers: David Jacot <djacot@confluent.io>
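On the KAFKA-18729 part, the gain is essentially replacing a scan of all assigned partitions with a set lookup on the topic ID when deciding whether to retain a topic from a metadata response under a broker-side regex. A minimal sketch with illustrative names, not the actual consumer code:

    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;

    public class TopicRetentionSketch {

        // Before: checking a topic from the metadata response meant scanning all
        // assigned partitions, O(#assigned partitions) per topic.
        static boolean retainTopicByScan(String topicName, Set<TopicPartition> assignedPartitions) {
            return assignedPartitions.stream().anyMatch(tp -> tp.topic().equals(topicName));
        }

        // After: a constant-time lookup against the topic IDs received in the
        // assignment (cf. isAssignedFromRe2j in the diff above).
        static boolean retainTopicById(Uuid topicId, Set<Uuid> assignedTopicIds) {
            return assignedTopicIds.contains(topicId);
        }
    }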