[FLINK-33360] HybridSourceSplitEnumerator clear finishedReaders when … #23593
@flinkbot run azure
Can we add a UT to reproduce the problem?
assertThat(getCurrentSourceIndex(enumerator))
        .as(
                "only reader-0 has finished reading, reader-1 is not yet done, so do not switch to the next source")
        .isEqualTo(1);
Before this PR's change, the assertion fails here: the enumerator switches to the next source after only one reader has finished reading.
Thanks @fengjiajie for the contribution and @Apache9 for the review. I left one comment.
...src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
Thanks @fengjiajie for the update. LGTM, waiting for the CI to turn green.
CI passed, merging, @fengjiajie Would you like to backport the fix to |
… to next Enumerator This closes apache#23593.
Thanks @leonardBang @Apache9. I have already submitted the backport: |
…switchEnumerator
What is the purpose of the change
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
I think finishedReaders keeps track of the subtask IDs that have finished reading the current source. It should therefore be cleared in the switchEnumerator function.
If it is not cleared, then while the next source is being read, as soon as any SourceReader reports a SourceReaderFinishedEvent (even though other SourceReaders may still be reading in parallel), the condition finishedReaders.size() == context.currentParallelism() is satisfied. This triggers sendSwitchSourceEvent(i, currentSourceIndex), which sends a SwitchSourceEvent to all SourceReaders.
If a SourceReader receives a SwitchSourceEvent before it has finished reading the previous source, it executes currentReader.close(), so some data may never be read and part of the source's data is lost.
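To make the failure mode concrete, here is a minimal, self-contained sketch of the bookkeeping described above. It is not Flink's actual code: the class FinishedReadersSketch, its clearOnSwitch flag, and the indexAfterScenario helper are hypothetical names invented for illustration, and everything else the real enumerator does (split assignment, event sending, checkpointing) is left out. Only the finishedReaders set and the size-equals-parallelism check are taken from the description.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the enumerator's bookkeeping (an assumption, not Flink code).
class FinishedReadersSketch {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final boolean clearOnSwitch; // true models the fixed behavior
    private int currentSourceIndex = 0;

    FinishedReadersSketch(int parallelism, boolean clearOnSwitch) {
        this.parallelism = parallelism;
        this.clearOnSwitch = clearOnSwitch;
    }

    // Models receiving a SourceReaderFinishedEvent from one subtask.
    void onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism) {
            switchEnumerator();
        }
    }

    // Models the switch to the next source; the fix is the clear() call.
    private void switchEnumerator() {
        currentSourceIndex++;
        if (clearOnSwitch) {
            finishedReaders.clear();
        }
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    // Two readers finish source 0, then only reader 0 finishes source 1.
    static int indexAfterScenario(boolean clearOnSwitch) {
        FinishedReadersSketch sketch = new FinishedReadersSketch(2, clearOnSwitch);
        sketch.onReaderFinished(0);
        sketch.onReaderFinished(1); // both done: switch to source 1
        sketch.onReaderFinished(0); // reader 1 is still reading source 1
        return sketch.currentSourceIndex();
    }
}
```

In this model, indexAfterScenario(false) ends at source index 2, because the stale entries from the first round make the size check pass as soon as one reader reports. indexAfterScenario(true) stays at index 1, which matches the expectation in the unit test discussed in the review.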
Brief change log
Fix the issue of HybridSource prematurely closing the reader and missing data.
Verifying this change
Without the fix, data may be lost when each shard takes a long time to process. I validated the fix in my scenario, where it proved effective.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation