
KAFKA-7317: Use collections subscription for main consumer to reduce metadata #7969

Merged 6 commits into apache:trunk on Jan 24, 2020

Conversation

ableegoldman
Contributor

Also addresses KAFKA-8821

Note that we still have to fall back to using pattern subscription if the user has added any regex-based source nodes to the topology. This also includes some minor cleanup on the side.
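
For context, here is a minimal sketch of the two cases from the user's point of view; the topic names are illustrative, and only the second topology forces Streams to keep using pattern subscription:

import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;

public class SubscriptionModeExample {
    public static void main(final String[] args) {
        // Named topics only: the main consumer can now subscribe with a plain collection.
        final StreamsBuilder collectionBased = new StreamsBuilder();
        collectionBased.stream("topic-foo").to("output-topic");

        // A regex-based source node: Streams still falls back to pattern subscription.
        final StreamsBuilder patternBased = new StreamsBuilder();
        patternBased.stream(Pattern.compile("topic-[A-C]")).to("output-topic");
    }
}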

@@ -87,10 +85,6 @@
// map from sink processor names to sink topic (without application-id prefix for internal topics)
private final Map<String, String> nodeToSinkTopic = new HashMap<>();

// map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node
// even if it can be matched by multiple regex patterns
private final Map<String, Pattern> topicToPatterns = new HashMap<>();
Contributor Author

This is just moved since it's only used by a single class

final List<String> sourceTopics = ((SourceNodeFactory) nodeFactories.get(nodeName))
.getTopics(subscriptionUpdates);
nodeToSourceTopics.put(nodeName, sourceTopics);
sourceTopicNames.addAll(sourceTopics);
Contributor Author

Also update sourceTopicNames to keep in sync with nodeToSourceTopics

Contributor

Do we need to do this at this point? I guess so, as it makes sense to have sourceTopicNames match what's in nodeToSourceTopics. I'm only asking as we never had this before and I'm curious as to why.

Contributor Author

Well I did switch #sourceTopicPattern and the new #sourceTopicCollection to use sourceTopicNames instead of nodeToSourceTopics.values, in which case we do still need to do this.
It shouldn't matter, although I will say the hardest part of working with this code/class was figuring out which data structures did/did not contain what contents or updates. I'm inclined to leave this in so that the two similar data structures are kept in sync, but I'm fine with removing it just to keep the changes minimal/necessary

if (topicPattern == null) {
final List<String> allSourceTopics = new ArrayList<>();
Contributor Author

This can be simplified by just using sourceTopicNames, which is identical to nodeToSourceTopics.values() except that it excludes the global topics (which we remove here).

}

synchronized void updateSubscribedTopics(final Set<String> topics,
Contributor Author

The SubscriptionUpdates class and various updateXXX methods were unnecessarily complex given they all boiled down to just the 4 lines of actual actions below
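
Pieced together from the snippets quoted later in this thread, the collapsed method looks roughly like the sketch below; the regex-to-source-node bookkeeping that follows the addAll call is elided:

synchronized void updateSubscribedTopics(final Set<String> topics,
                                         final String logPrefix) {
    log.debug("{}found {} topics possibly matching subscription", logPrefix, topics);
    subscriptionUpdates.clear();
    subscriptionUpdates.addAll(topics);
    // ... then refresh nodeToSourceTopics and sourceTopicNames for the regex-matched topics
}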

}

private void subscribeConsumer() {
if (builder.usesPatternSubscription()) {
Contributor Author
@ableegoldman ableegoldman Jan 16, 2020

This is the actual fix: if the user has not themselves added any pattern-based source topics, we go back to using regular collection subscription (having safely disabled auto topic creation).
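
A minimal sketch of that decision, assuming the builder exposes the pattern and collection discussed above and that the thread keeps its main consumer and rebalance listener in fields (names are inferred from this discussion, not necessarily the merged signatures):

private void subscribeConsumer() {
    if (builder.usesPatternSubscription()) {
        consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
    } else {
        consumer.subscribe(builder.sourceTopicCollection(), rebalanceListener);
    }
}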

Contributor
@bbejeck bbejeck left a comment

Thanks for the PR @ableegoldman. I've made a quick pass.
The build didn't run, but when I ran the tests locally I got some checkstyle failures for InternalTopologyBuilder.

@@ -1064,6 +1064,9 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));

// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
Contributor

qq - what does this mean for users who have enabled auto topic creation? Although it's not a best practice, this seems like it could lead to unexpected behavior.

Contributor Author

Do you mean on the server side or on the client? I agree we should consider checking for this in the user supplied configs before overriding it, but I think the reasonable default behavior is to disable it (by client config, since it's enabled server-side by default).
If users really do want it then it's on them to enable it through the main consumer config. On the other hand, prior to this it was effectively disabled permanently by the workaround of using pattern subscription. WDYT?

Contributor

Yeah, that SGTM. I forgot that auto topic creation is enabled by default server-side.
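
For reference, opting back in through the main consumer config, as suggested above, might look roughly like the sketch below (assuming the user-supplied override is honored for the main consumer):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class MainConsumerOverrideExample {
    public static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Re-enable client-side auto topic creation for the main consumer only,
        // overriding the "false" default that Streams now sets.
        props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");
        return props;
    }
}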

@bbejeck
Contributor

bbejeck commented Jan 16, 2020

Retest this please.

@bbejeck
Contributor

bbejeck commented Jan 16, 2020

Strange, the PR builder didn't run on PR submission but I was able to trigger the build with "Retest..."

@bbejeck
Contributor

bbejeck commented Jan 18, 2020

@ableegoldman the failures are related: checkstyle failures.

@bbejeck
Contributor

bbejeck commented Jan 22, 2020

retest this please

Contributor
@bbejeck bbejeck left a comment

Thanks for this fix @ableegoldman. LGTM, I only have minor comments. We need to get a build running before merging though.

builder.addSource(null, "source-1", null, null, null, "topic-foo");
builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-[A-C]"));
builder.addSource(null, "source-3", null, null, null, Pattern.compile("topic-\\d"));

final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates();
Contributor

Nice to get rid of the use of reflection in the tests here and below

Contributor Author
@ableegoldman ableegoldman Jan 23, 2020

😄Agreed!
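
For illustration, the kind of direct call the tests can now make instead of reaching into the builder via reflection (a sketch; the topic names and exact merged test code may differ):

// "builder" is the InternalTopologyBuilder under test, as in the snippet above;
// java.util.Arrays and java.util.HashSet are assumed to be imported.
builder.updateSubscribedTopics(
    new HashSet<>(Arrays.asList("topic-foo", "topic-A", "topic-1")),
    "test ");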

@@ -215,6 +212,10 @@ Processor describe() {
}
}

// Map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node
// even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory
private final Map<String, Pattern> topicToPatterns = new HashMap<>();
Contributor

I think in a previous commit this was static and I had concerns that it wouldn't work, so I'm glad to see you changed this.
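
A hedged sketch of the dedup behavior that comment describes, i.e. a topic already claimed by one pattern is not handed to a second regex source node; the method name and exact logic are illustrative rather than the merged code:

private boolean topicMatchesPattern(final String topic, final Pattern pattern) {
    final Pattern previousMatch = topicToPatterns.get(topic);
    if (previousMatch != null) {
        // The topic was already claimed; only the source node owning that pattern keeps it.
        return previousMatch.equals(pattern);
    }
    if (pattern.matcher(topic).matches()) {
        topicToPatterns.put(topic, pattern);
        return true;
    }
    return false;
}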

if (hasSubscriptionUpdates()) {
for (final String nodeName : nodeToSourcePatterns.keySet()) {
//need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex
final List<String> sourceTopics = ((SourceNodeFactory) nodeFactories.get(nodeName))
Contributor

nit: I understand the change here for compactness, but I find it a little hard to follow. This is subjective, however, so feel free to keep it as-is.

Contributor Author

I kind of thought this way was easier to understand, but I did go back and forth on it. I'm happy to change it back


@bbejeck
Contributor

bbejeck commented Jan 22, 2020

Retest this please.


@bbejeck
Contributor

bbejeck commented Jan 22, 2020

Java 8 build https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/321/
Java 11 build https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4333/

For some reason the GitHub page doesn't always update when a build is kicked off.

EDIT: Builds just showed up now. ???

@ableegoldman
Contributor Author

retest this please

@bbejeck
Contributor

bbejeck commented Jan 23, 2020

Retest this please.

@bbejeck
Contributor

bbejeck commented Jan 23, 2020

Java 8 passed
Java 11 failed with "Build step 'Execute shell' marked build as failure"

Retest this please.

@bbejeck
Contributor

bbejeck commented Jan 24, 2020

Retest this please.

@bbejeck
Contributor

bbejeck commented Jan 24, 2020

Java 8 failed with

20:39:56 # There is insufficient memory for the Java Runtime Environment to continue.
20:39:56 # Cannot create GC thread. Out of system resources.

Java 11 passed https://builds.apache.org/view/All/job/kafka-pr-jdk11-scala2.13/4355/

@bbejeck
Contributor

bbejeck commented Jan 24, 2020

Java 11 failed with kafka.admin.DeleteConsumerGroupsTest.testDeleteCmdAllGroups
Java 8 passed

@bbejeck
Contributor

bbejeck commented Jan 24, 2020

Retest this please.

@bbejeck
Contributor

bbejeck commented Jan 24, 2020

Java 8 passed on the previous PR build. On this run it failed on an environmental issue: Caused by: java.lang.OutOfMemoryError: unable to create new native thread

Java 11 passed this run.

So I'm merging this PR.

@bbejeck bbejeck merged commit 57b2f68 into apache:trunk Jan 24, 2020
@bbejeck
Contributor

bbejeck commented Jan 24, 2020

Merged #7969 into trunk.

}

synchronized void updateSubscribedTopics(final Set<String> topics,
final String logPrefix) {
log.debug("{}found {} topics possibly matching subscription", logPrefix, topics);
Contributor

nit: missing space between logPrefix and 'found'

Contributor

topics is a Set.
What's your intention for the second parameter?
If you want the number of topics logged, you should use topics.size().

Contributor Author

The spacing kept bothering me too but it's actually correct (logPrefix has a space at the end) -- we should actually clean up this class at some point to use log = new LogContext(logPrefix).logger(getClass()); rather than explicitly inserting the prefix everywhere.
I didn't write this log message, just moved things around, but I think the intention was to list the actual topics, not just the size. I probably could've improved the wording of it, though.
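
A minimal sketch of the LogContext cleanup suggested above, so the prefix is baked into the logger once instead of being passed to every call (assumed usage, not part of this PR):

import java.util.Set;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class PrefixedLoggerSketch {
    private final Logger log;

    PrefixedLoggerSketch(final String logPrefix) {
        // The prefix (already ending in a space) is baked into the logger once here...
        this.log = new LogContext(logPrefix).logger(PrefixedLoggerSketch.class);
    }

    void logMatches(final Set<String> topics) {
        // ...so call sites no longer pass it as the first format argument.
        log.debug("found {} topics possibly matching subscription", topics);
    }
}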

Contributor

Since the topics Set can be quite large, I doubt the intention was to show the contents.
'{} topics' reads like the count of entries should be shown.

Contributor Author

Well, the topics set should only include the matching topics that the Streams app is actually subscribed to, but I agree the wording suggests it should be the size/count. Since this PR is already merged, do you want to submit a quick follow-up PR?

Contributor

Created #8005

subscriptionUpdates.clear();
subscriptionUpdates.addAll(topics);

log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)",
Contributor

Similar comment to the two above.

ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 25, 2020
Conflicts and/or compiler errors due to the fact that we
temporarily reverted the commit that removes
Scala 2.11 support:

* Exit.scala: replace SAMs with anonymous inner classes.
* MiniKdc.scala: take upstream changes.

# By A. Sophie Blee-Goldman (1) and others
# Via Jason Gustafson
* apache-github/trunk:
  KAFKA-9254; Overridden topic configs are reset after dynamic default change (apache#7870)
  MINOR: MiniKdc JVM shutdown hook fix (apache#7946)
  KAFKA-9152; Improve Sensor Retrieval (apache#7928)
  Correct exception message in DistributedHerder (apache#7995)
  KAFKA-7317: Use collections subscription for main consumer to reduce metadata (apache#7969)
  KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (apache#7941)
  KAFKA-7737; Use single path in producer for initializing the producerId (apache#7920)

# Conflicts:
#	core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@ableegoldman ableegoldman deleted the 7317-use-regular-subscription branch June 26, 2020 22:38