-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14365: Extract common logic from Fetcher #13425
KAFKA-14365: Extract common logic from Fetcher #13425
Conversation
Extract logic from Fetcher into AbstractFetcher. Also introduce FetchConfig as a more concise way to delineate state from incoming configuration. Formalized the defaults in CommonClientConfigs and ConsumerConfig to be accessible elsewhere.
@guozhangwang @hachikuji @rajinisivaram @philipnee This is ready for a review from whomever has the time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me. Except the question for synchronizing the super's functions I think we can move forward to merge.
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
Show resolved
Hide resolved
@@ -797,24 +799,15 @@ public KafkaConsumer(Map<String, Object> configs, | |||
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED), | |||
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG)); | |||
} | |||
FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nice improvement, +1!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel); | |
FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, this.keyDeserializer, this.valueDeserializer, isolationLevel); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kirktrue , these should be this.keyDeserializer
and this.valueDeserializer
. Otherwise the original values may be null and will cause an NPE on line 300 of CompletedFetch.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
Show resolved
Hide resolved
* | ||
* @return true if there are completed fetches, false otherwise | ||
*/ | ||
boolean hasCompletedFetches() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a quick browse on this class assuming that all the logic are not changed, i.e. mostly copy-paste here except the FetchConfig wrapping. Please LMK otherwise and I will give it a deeper look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intentionally did my best to keep the code I moved from Fetcher
into AbstractFetch
identical. There was some minor refactoring which could technically introduce bugs.
I've checked the failed tests, which are irrelevant. |
LGTM, merged to trunk. |
@guozhangwang , @kirktrue , this PR is causing NPEs downstream. I've added a comment above that indicates the problematic lines. Thanks. |
The
Fetcher
class is used internally by theKafkaConsumer
to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactoredFetcher
.This task includes refactoring
Fetcher
by extracting out some common logic to allow forthcoming implementations to leverage it.Fetcher
intoAbstractFetch
FetchConfig
as a concise way to delineate incoming configuration from stateCommonClientConfigs
andConsumerConfig
to be accessible elsewhereCommitter Checklist (excluded from commit message)