-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint #14814
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 71e3f69 (Fri May 28 08:16:53 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
bc2277c
to
53237d3
Compare
d77d30a
to
123e752
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix @rkhachatryan . As we discussed in the Jira ticket, I guess this is the best approach for the time being. I've left a couple of smaller comments.
One meta comment. First we added two test cases in SavepointITCase
next to test network tasks better, we added a couple of more in StreamTaskTest
. How much are those tests overlapping? Maybe we should have all end of input tests on the test harness level?
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
Outdated
Show resolved
Hide resolved
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
Show resolved
Hide resolved
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
Outdated
Show resolved
Hide resolved
EndOfInput was used to handle any stopping of the job. When stopping with savepoint the input is not actually ended. This causes issues with some sinks (e.g. Iceberg). With this change, endInput is not call for stop-with-savepoint. To differentiate stop-with-savepoint from other cases only checkpoint (RPC/barriers) are considered and not network EOP. That's enough because EOP is only injected after the CP completion (i.e. when the downstream is also notified by sync savepoint by CP barriers).
…h chaining strategy
c4c98aa
to
b458c15
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for reviewing @pnowojski !
The test failure was unrelated (FLINK-21216), rebased.
While debugging, I found a missing synchronization in StreamSourceTask
(see below).
PTAL.
...-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes! Can you squash the fixup commits?
06de5ea
to
71e3f69
Compare
What is the purpose of the change
(1.11 backport)
Verifying this change
SavepointITCase
(testStopSavepointWithBoundedInput
,testStopSavepointWithBoundedInputConcurrently
)StreamTaskTest
(testSyncSavepointAborted
,testSyncSavepointAbortedAsync
,testSyncSavepointCompleted
)Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: yes (endInput
is not called anymore for stop-with-savepoint)Documentation