[FLINK-28853][connector-base] Add FLIP-217 support for watermark alignment of source splits #20485
308488c to
2a4ae3e
Compare
mas-chen
left a comment
I did a quick first pass and still need to look deeper into the SplitFetcher changes. Lots of code changes! Do we have performance tests with analysis on the split fetcher changes, even without split alignment enabled?
<td>Whether name of vertex includes topological index or not. When it is true, the name will have a prefix of index of the vertex, like '[vertex-0]Source: source'. It is false by default</td>
</tr>
<tr>
<td><h5>pipeline.watermark-alignment.allow-unaligned-source-splits</h5></td>
I think allow-aligned-source-splits is more intuitive. The emphasis is on alignment, rather than unalignment like in checkpoints.
IMHO something like source.reader.synchronize.splits would be more understandable for users.
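Whichever name wins, it may help to spell out what the flag is assumed to control. The following is a minimal sketch, not code from this PR: `PausableReader` and `AlignmentGuard` are hypothetical names, and only the option key is taken from the diff. The assumption is that a reader without pause/resume support throws `UnsupportedOperationException`, and that the flag decides whether this is fatal.

```java
import java.util.Collection;

/** Hypothetical stand-in for a reader that may or may not support pausing splits. */
interface PausableReader {
    void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume);
}

/** Hypothetical helper, not a Flink class: shows what the flag is assumed to control. */
final class AlignmentGuard {
    // Option key as it appears in the diff above.
    static final String KEY = "pipeline.watermark-alignment.allow-unaligned-source-splits";

    private final boolean allowUnalignedSourceSplits;

    AlignmentGuard(boolean allowUnalignedSourceSplits) {
        this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
    }

    /** Returns true if the reader accepted the request, false if it was skipped. */
    boolean tryAlign(
            PausableReader reader, Collection<String> toPause, Collection<String> toResume) {
        try {
            reader.pauseOrResumeSplits(toPause, toResume);
            return true;
        } catch (UnsupportedOperationException e) {
            if (!allowUnalignedSourceSplits) {
                // Strict mode: fail instead of silently running with unaligned splits.
                throw e;
            }
            // Lenient mode: keep reading without split-level alignment.
            return false;
        }
    }
}
```

Read this way, the flag is about tolerating readers that cannot align, which is an argument for keeping "unaligned" in the name.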
public void addPrefilledSplitsSingleReader(int numSplits, int numRecords) {
    sourceReader.addSplits(createPrefilledSplits(numSplits, numRecords));
    sourceReader.notifyNoMoreSplits();
Is this a required assumption? We should also exercise the case where splits are dynamically discovered (e.g. Kafka source topic partition discovery).
This is not required. The signal simply allows for a clean shutdown of the source.
mxm
left a comment
Thank you @smattheis @AHeise @dawidwys! The changes definitely fill a gap in the source framework. There are a lot of changes here which are not always directly related to FLIP-217, like the refactoring of the threading and locking logic. Ideally, this could have been done more incrementally, but I understand that was difficult due to the multiple authors.
I've left some comments and suggestions. All in all, the changes look good.
I wonder about the way the SourceOperator calls through the stack via pauseOrResumeSplits while also being called back with watermarks. My intuition would have been to do the pausing directly at the splits based on the current max watermark lag, e.g. by putting splits into a suspended mode where no more output is accepted from a split. I guess this could be tricky because the implementation of the source doesn't necessarily know it's paused and could time out or otherwise misbehave. So it looks like the current implementation is simpler.
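To make the two directions concrete (watermarks flowing up, pause/resume requests flowing down), here is a rough sketch of the bookkeeping as I read it. It is not the SourceOperator code: `SplitAlignmentSketch`, its fields, and the string split IDs are hypothetical, and the real operator may track this differently.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch, not the SourceOperator code: decides which splits to pause or resume. */
final class SplitAlignmentSketch {
    private final Map<String, Long> splitWatermarks = new LinkedHashMap<>();
    private final Set<String> pausedSplits = new HashSet<>();
    final List<String> toPause = new ArrayList<>();
    final List<String> toResume = new ArrayList<>();

    /** Callback path: a split reports a new watermark; only the maximum is kept. */
    void onSplitWatermark(String splitId, long watermark) {
        splitWatermarks.merge(splitId, watermark, Long::max);
    }

    /** Call-down path: recompute the pause/resume sets against the allowed maximum. */
    void checkAlignment(long maxAllowedWatermark) {
        toPause.clear();
        toResume.clear();
        for (Map.Entry<String, Long> e : splitWatermarks.entrySet()) {
            boolean ahead = e.getValue() > maxAllowedWatermark;
            if (ahead && pausedSplits.add(e.getKey())) {
                toPause.add(e.getKey()); // newly ahead of the allowed watermark
            } else if (!ahead && pausedSplits.remove(e.getKey())) {
                toResume.add(e.getKey()); // was paused, the allowed watermark caught up
            }
        }
        // A real operator would now pass both lists down to the reader's pauseOrResumeSplits.
    }
}
```

Only state changes are reported, so a split that stays ahead is not paused twice; the reader is told explicitly when it is paused, which is the property the last two sentences above are concerned with.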
...va/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
        Collection<PulsarPartitionSplit> splitsToPause,
        Collection<PulsarPartitionSplit> splitsToResume) {
    if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
        throw new IllegalStateException("This pulsar split reader only support one split.");
I don't understand why we can't pause single splits. Even if so, I don't know why that should leak to the Pulsar reader.
Good question...
Have you found an answer for that @mxm ?
Yes, Pulsar splits are always started in a separate fetcher thread. We still want to be able to pause these split threads. So the code makes sense.
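For anyone else puzzled by the check quoted above, a small sketch of the idea, assuming each reader instance owns exactly one split because every split gets its own fetcher thread. `SingleSplitReaderSketch` and the string split IDs are hypothetical; this is not the Pulsar reader.

```java
import java.util.Collection;
import java.util.List;

/** Hypothetical single-split reader; it only mirrors the shape of the check quoted above. */
final class SingleSplitReaderSketch {
    private final String splitId;
    private boolean paused;

    SingleSplitReaderSketch(String splitId) {
        this.splitId = splitId;
    }

    void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        // One reader per split, so a request naming several splits indicates a wiring bug.
        if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
            throw new IllegalStateException("This split reader only supports one split.");
        }
        if (splitsToPause.contains(splitId)) {
            paused = true;
        } else if (splitsToResume.contains(splitId)) {
            paused = false;
        }
    }

    /** Returns no records while paused; a real reader would poll its consumer here. */
    List<String> fetch() {
        return paused ? List.of() : List.of("record-from-" + splitId);
    }
}
```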
Any thoughts on how and when to proceed with this PR? We'd like to help to bring this over the finish line.
Currently Splits are just pojos, data carriers, without much logic. If I understand you correctly, what you are suggesting would require us to push some availability logic down to the splits and it would still require handling of this availability in the Given that most of the users are expected to just use
Indeed there is an issue now, that I think @smattheis doesn't work on Flink anymore, so he cannot finish this PR. I will go through the PR myself and do a quick pass/respond to the inlined questions from you @mxm and @mas-chen, but it would be great if someone could take over applying the actual changes after the review. Would you be up to do that @mxm?
mxm
left a comment
@pnowojski Thank you for the comments. If none of the authors currently has time to work on this, I can take over the PR for now. I'd be glad to get another review from @mas-chen and you once I've addressed the comments.
pnowojski
left a comment
> If none of the authors currently has time to work on this, I can take over the PR for now.
That would be great :)
f9b2e3b to
a9246bd
Compare
Thanks for the update @mxm , I've left a couple of minor comments.
The next question would be how to squash the commits. If you want to preserve authorship of your changes @mxm, without overwriting the authors of the base commits, we would need either a single fixup commit from you, or one fixup of yours per original commit.
I'd prefer this to be a single squashed commit with the use of Co-Authored-By. IMHO there is no benefit in merging the individual commits because they are very much tied to each other. This keeps the history clean while retaining authorship. What do you think?
I wouldn't squash all commits into one. The original commits authored by Sebastian, Dawid and Arvid I think are nicely separated. But sure, squashing your fixups to those commits and co-authoring them is also perfectly fine.
If the commits are nicely separated, why were they not handled in separate PRs? Anyhow, I'm ok with just putting my changes in a single commit on top of those.
cd9ce51 to
c2408e3
Compare
@flinkbot run azure
We often split larger features into smaller commits for the sake of easier reviewing. As long as the commits work independently and are coherent (one needs to find a sensible division), that's fine.
It gets more complicated in those rare cases where there are various authors, especially if someone takes over an orphaned PR. 🤷
LGTM assuming green azure.
Thanks a lot @mxm for driving this forward! Much appreciated. Thank you @dawidwys, @AHeise , @smattheis for your contributions as well!
@mxm Thanks a lot also from my side for your contributions. However, @pnowojski this PR was not orphaned! (I wrote to you that I'm on vacation.)
Thank you @pnowojski for your thoughtful comments and guidance. And of course thanks to the original authors @AHeise @dawidwys @smattheis. Whoops @smattheis, I had assumed you wouldn't continue on this PR but I think no harm has been done. Let me know if anything is open on your side. I still need to revise the code with respect to a test failure.
Yes, sorry for the mental shortcut. There was a delay in the communication between first my vacation and then yours. Around the time @mxm was asking how to continue with this PR, you @smattheis were not responding, and I got a message from you that you could work on it only after @mxm had already volunteered to take it over. After that I didn't want to create more confusion. But sorry for my inaccurate statement that it was "orphaned" :)
@mxm @pnowojski All good. Thanks for driving it.
ff462cd to
22db9b0
Compare
(Rebased)
@flinkbot run azure
There are still some related test failures:
and
Thanks @pnowojski. Will fix those. On another note, do you think it makes sense to add a configuration option to disable using split alignment entirely? I'm a bit worried this feature might cause issues with the already integrated Kafka/Pulsar sources that we are not foreseeing right now. The added configuration option only allows disabling split alignment for non-compatible connectors. We could add another option to disable it entirely to retain the old behavior (disabled by default).
22db9b0 to
d6aed81
Compare
I had to remove the |
d6aed81 to
546fc79
Compare
@flinkbot run azure
Does it make sense to use and enable watermark alignment via |
Instead of mixing 4 different concepts of synchronization, this commit aims to have one lock for everything and go through with it.
When there is only one SplitFetcher per split, the split advanced too much, and aligned event time is enabled, then the complete SplitFetcher can be paused by SplitFetcherManager#alignSplits, such that temporarily no more records are read from the respective split.
…dual splits This provides support for pausing and resuming individual splits in SourceReaderBase. This is used to align individual splits based on emitted watermarks in SourceOperator.
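As a rough illustration of the "one lock for everything" idea from the commit messages above, the sketch below pauses a whole fetcher loop with a single lock and condition. It is a simplified model, not the SplitFetcher code: `PausableFetcherSketch` and its members are hypothetical, the fetch is reduced to a counter, and the wait is uninterruptible only to keep the example free of checked exceptions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch, not the SplitFetcher code: one lock guards all fetcher state. */
final class PausableFetcherSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resumed = lock.newCondition();
    private boolean paused;
    private boolean closed;
    private long recordsFetched;

    void pause() {
        setPaused(true);
    }

    void resume() {
        setPaused(false);
    }

    private void setPaused(boolean value) {
        lock.lock();
        try {
            paused = value;
            resumed.signalAll(); // wake the fetcher thread so it re-checks the flag
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            closed = true;
            resumed.signalAll(); // a paused fetcher must still be able to shut down
        } finally {
            lock.unlock();
        }
    }

    /** One loop iteration; returns false once the fetcher is closed. */
    boolean runOnce() {
        lock.lock();
        try {
            while (paused && !closed) {
                resumed.awaitUninterruptibly(); // block instead of reading from the split
            }
            if (closed) {
                return false;
            }
            recordsFetched++; // stands in for one fetch from the split
            return true;
        } finally {
            lock.unlock();
        }
    }

    long recordsFetched() {
        lock.lock();
        try {
            return recordsFetched;
        } finally {
            lock.unlock();
        }
    }
}
```

A fetcher thread would call `runOnce()` in a loop, while the alignment logic calls `pause()` and `resume()` from another thread. Because closing also signals the condition, a paused fetcher cannot hang on shutdown.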
546fc79 to
991230a
Compare
Fair point, users may modify the job to disable alignment. It doesn't make sense to override the watermark alignment settings from the config. But the same argument applies to the other configuration option to disable split alignment for not-ready sources. I think it would make sense to provide a transitory option to disable split alignment altogether until this feature has proven to be stable.
@flinkbot run azure
Tests are passing now.
Thank you everyone!
Thank you for helping get it over the line! @mxm
Exciting to see this make the finish line! This was a long-awaited feature (we implemented alignment for the Kinesis source ~4 years ago). Let's see how it holds up in production use cases! Thanks @mxm @pnowojski @smattheis @dawidwys, @AHeise for your contributions!
Yes, it took longer than it should have. Good that it's finally been merged 🎉
What is the purpose of the change
Brief change log
- SplitFetcher threading model and add support to pause/resume SplitFetcher
- SourceOperator
Verifying this change
This change added and extended tests and can be verified as follows:
- SplitFetcherTest extended to test pause/resume of SplitFetcher
- SourceOperatorSplitWatermarkAlignmentTest added to test split alignment based on split reader watermarks
- SplitFetcherPauseResumeSplitReaderITCase added to test pause/resume of individual split readers
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): yes (according to FLIP-217)
Documentation