New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-2832: Add a consumer config option to exclude internal topics #932
Conversation
@granthenke Thanks for your review, and fair comments. I'll update the patch. |
558e49a
to
ca4c47f
Compare
I would be interested in what others think about the default value. @rajinisivaram Mentioned on my comment on the old commit here that default behavior for the new consumer in 0.9.0.0 and 0.9.0.1 is false because excluding the internal topics did not exist. Is that behavior we need to maintain or can we set the default to true for consistency and to cover the common case? |
ca4c47f
to
1b03ad9
Compare
1b03ad9
to
8829be1
Compare
private final boolean excludeInternalTopics; | ||
|
||
static { | ||
INTERNAL_TOPICS.add("__consumer_offsets"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One issue with this is that older clients are meant to work with newer brokers and if additional internal topics are added, this could get out of sync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it's a problem in practice, but I thought I'd mention it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that would be a nicer way to do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ijuma @granthenke Thank you for the feedback. What I understand is we should wait for KIP-4 to merge so we can leverage the boolean field here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@granthenke Would you please make the JIRA ticket for this PR dependent on the proper sub-task of KIP-4 (that would implement the boolean field you mentioned)? That way I can update this PR as soon as the KIP-4 sub-task is finished. Thanks.
cc @hachikuji |
@@ -172,6 +172,10 @@ | |||
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records"; | |||
private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()."; | |||
|
|||
/** <code>exclude.internal.topics</code> */ | |||
public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics"; | |||
private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether messages from internal topics (such as offsets) should be exposed to the consumer."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you clarify this description a little bit? It seems like it's still possible for the user to subscribe to internal topics explicitly. Also, don't we still return internal topics from the listTopics() API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji I actually copied the description from the old consumer config (here) to remain consistent. If "true" internal topics will not be exposed to the new consumer; if "false" they will.
With respect to listTopic() API nothing is changed, which is, if I'm not mistaken, internal topics are still returned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed the part about "messages," so yeah, no confusion about the listTopics() API. I think it still may be worth clarifying that messages from internal topics will only be included if those topics are explicitly subscribed to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One other minor note: for whatever reason, the new consumer uses the term "records" instead of "messages." We flop back and forth between these terms even in this class, but it would be nice to start to settle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji Sure, I'll update the text, and mention that if this config is "true" the only way to receive messages from internal topics is subscribing to them. Also, I'll replace references to "message" with "record". Thanks for the feedback.
Did the necessary updates get completed here? Re: default, I think we can view this as buggy behavior that nobody should be relying on, although I guess it could cause a bit of compatibility pain since the flag didn't previously exist. But the unexpected results of, e.g., consumer offsets being caught by MM is probably much worse. @ijuma Is this possibly a blocker for 0.10 instead of just major? Seems like a pretty important fix to get in now, before adoption of the new consumer really picks up. |
@ewencp I marked it as a blocker for now. What are your thoughts on the hard-coded approach used here versus getting the data from the server via the topic metadata? |
@ijuma Getting the list via topic metadata is definitely the right solution. If it depends on some other patch, the hard coded approach might be a good solution to address the immediate problem, and a follow up could always fill in the complete fix. |
I'll submit an update shortly to take care of the other comments on the PR. |
220dce0
to
0578214
Compare
Sorry if this was already discussed - but we have Topics.internalTopics which already contains a list of internal topics and it is used in about 3 places. We probably don't want to duplicate this information and since we can't have the new consumer depend on the scala objects, can we move this definition to o.a.k.common somewhere and refactor the scala code to use this list? Even if we can't use it in the server yet, we probably want to push the list outside the internal AbstractCoordinator and into somewhere in Common package, so it will be visible for reuse in future client-side tools (KIP-4 comes to mind...) |
Sounds good to me @gwenshap. |
0578214
to
09628f3
Compare
A new consumer config option 'exclude.internal.topics' was added to allow excluding internal topics when wildcards are used to specify consumers. The new option takes a boolean value, with a default 'false' value (i.e. no exclusion). This patch is co-authored with @rajinisivaram.
09628f3
to
c054153
Compare
@gwenshap @granthenke @ijuma @ewencp Thanks for your feedback. I updated the patch according to @gwenshap's suggestion. I hope the update is close to what she had in mind. |
I actually thought of something less extensive... I didn't intend to move GroupMetadataTopicName (although that also looks ok to me), what I did intend is to get rid of Topic.InternalTopics. |
public final class CommonDefs { | ||
// TODO: we store both group metadata and offset data here despite the topic name being offsets only | ||
public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets"; | ||
public static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(GROUP_METADATA_TOPIC_NAME))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really want anything related to internal topics to be client side? This could change in brokers from version to version and the clients should still work. I understand that for now we have no way to get that information, but we will soon (KAFKA-3306). I imagine removing the client side list would be part of the cleanup once thats available. So whatever exists in the mean time should be private so we don't need a deprecation cycle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this should on an internal package (eg common.internals
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ijuma I assume you mean moving the whole class under common.internals
. I can do that.
@granthenke Sorry for the naive question. INTERNAL_TOPICS
is being used in ConsumerCoordinator
; so making it private would break how it's currently being used there. Making it private could also cause issues with implementing @gwenshap's comment to remove Topic.InternalTopics
and use this new INTERNAL_TOPICS
instead. Could you please clarify? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vahidhashemian, yes, that's what I mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gwenshap meant that kafka.common.Topic.InternalTopics
should be removed in favour of the INTERNAL_TOPICS
defined in this PR.
@gwenshap You are right. We can remove |
/** | ||
* Common definitions used in client-side tools | ||
*/ | ||
public final class CommonDefs { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of the overly generic CommonDefs
, why not call this Topics
or something like that?
PR #1082 resumes this PR. |
For anyone else reading this later, the default is to IGNORE internal topics, not include them (contrary to what the top comment says). |
A new consumer config option 'exclude.internal.topics' was added to allow excluding internal topics when wildcards are used to specify consumers.
The new option takes a boolean value, with a default 'false' value (i.e. no exclusion).
This patch is co-authored with @rajinisivaram.