[FLINK-4022] [kafka] Partition and topic pattern discovery for FlinkKafkaConsumer #3476
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR adds the required internals to allow partition and topic regex pattern discovery in the
FlinkKafkaConsumer
.It doesn't expose a new constructor that accepts regex topic patterns yet. I propose to expose that with https://issues.apache.org/jira/browse/FLINK-5704 (deprecate the original
FlinkKafkaConsumer
constructors in favor of new ones with new offset behaviours). For this reason, I also propose to update the Kafka documentation when the new constructors are added.Design
Some description to ease review:
AbstractPartitionDiscoverer
:A
AbstractPartitionDiscoverer
is a stateful utility instance that remembers what partitions are discovered already. It also wraps the logic for partition-to-subtask assignment. The mainrun()
method now has a discovery loop that callsAbstractPartitionDiscoverer#discoverPartitions()
on a fixed interval. This method returns only new partitions that should be subscribed by the subtask.The returned partitions are used to invoke
AbstractFetcher#addDiscoveredPartitions(...)
on the fetcher.On a fresh startup,
AbstractPartitionDiscoverer#discoverPartitions()
is also used to fetch the initial seed startup partitions inopen()
.AbstractFetcher#addDiscoveredPartitions(...)
The fetcher now has a
unassignedPartitionsQueue
that contains discovered partitions not yet consumed by concrete Kafka clients. WheneveraddDiscoveredPartitions(...)
is called on the fetcher, the fetcher will create the state holders for the partitions, and add the partitions to the queue.Concrete implementations of the fetcher should continuously poll this queue in the fetch loop. If partitions are found from the queue, they should be assigned for consuming.
Concrete fetchers continuously polls the queue in
runFetchLoop()
For 0.8, this simply means that the original
unassignedPartitionsQueue
inKafka08Fetcher
is moved to the base abstract fetcher class. Nothing else is touched.For 0.9+, queue polling and partition reassignment for the high-level consumer happens in
KafkaConsumerThread
.TODOs
This PR serves as a preview for the new functionality and additional internals. Below are some pending TODOs.
Currently, partition discovery will not work correctly after restore. The reason for this is explained with
TODO
comments within theFlinkKafkaConsumerBase#open()
method. For this to work correctly, @rmetzger and I are considering 2 options: 1) use broadcast state, or 2) assign partitions usingmaxParallelism
andassignedKeyGroupIds
instead of subtask index / number of subtasks.The PR still lacks exactly-once integration tests with Kafka repartitioning / dynamic topics.