New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-15467][task] Wait for invokable cancellation when stopping Task #12714
[FLINK-15467][task] Wait for invokable cancellation when stopping Task #12714
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 5d78f3b (Fri Jun 19 07:35:35 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the bug fix, I've left couple of comments & questions.
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
Outdated
Show resolved
Hide resolved
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
Outdated
Show resolved
Hide resolved
5d78f3b
to
35c875a
Compare
Thanks for the review @pnowojski, I've addressed your comments (1 unresolved). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the quick update. Left one comment, besides I think LGTM.
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
Show resolved
Hide resolved
35c875a
to
2a05ea5
Compare
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
Outdated
Show resolved
Hide resolved
@@ -139,6 +142,33 @@ public void testRegularExecution() throws Exception { | |||
taskManagerActions.validateListenerMessage(ExecutionState.FINISHED, task, null); | |||
} | |||
|
|||
@Test | |||
public void testTaskTerminatesOnlyAfterInvokable() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it covering for the user reported bug? It's not using SourceStreamTask
anywhere, so it looks like it's not? If that's the case, couldn't we add a test in SourceStreamTaskTest
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added SourceStreamTaskTest.testWatForCancelCompletion
.
For that, I extracted SourceStreamTask.LegacySourceFunctionRunner
interface in a preceding hotfix commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.
extracted SourceStreamTask.LegacySourceFunctionRunner interface in a preceding hotfix commit.
this commit is not super pretty, maybe we could make it a bit better, but I'm not sure if that's the right direction.
SourceStreamTaskTest.testWatForCancelCompletion
this doesn't seem to be testing much, it's almost a no-op test for passing the returned value of one getter. It's definitely not testing for the reported bug.
What about creating a mock source that waits on a latch in a loop ignoring interrupts and the test thread fires cancellation & ensures task is not completed? Setup should be quite easy like in SourceStreamTaskTest#finishingIgnoresExceptions
? What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's definitely not testing for the reported bug.
A proper test for the reported bug would be an end-to-end test involving running a remote cluster, syncing source thread with class unloading and then checking the logs.
I think we agreed offline that such automated test would be an overkill and could rather be done manually. And the rest should be covered with unit tests.
Now, this unit test checks that SourceStreamTask
returns completion future of its thread.
This is what changed - and this is what I think should be tested.
If we also want to test LegacySourceFunctionThread
future completion, it can be done independently now after extraction (however, I think this is out of the scope of this PR).
Furthermore, it's not possible to reliably test that a program does not perform some action if there is no time boundary (in this case test that task never completes if source thread never completes).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, e2e test would be an overkill, but semi integration test in SourceStreamTask
seems to be quite easy.
it's not possible to reliably test that a program does not perform some action if there is no time boundary (in this case test that task never completes if source thread never completes).
What's the problem with waiting on the latch as I proposed? It should provide automated test coverage for that bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the problem with waiting on the latch as I proposed?
For how long should the test thread wait on the latch (assuming SourceStreamTask.sourceThread
never exits)?
Or did I misunderstood your proposal?
It should provide automated test coverage for that bug.
It provides only partial coverage because it doesn't cover integration with Task
, StreamTask
and the code that actually unloads the classes. So in terms of coverage, it's equivalent to a set of unit tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant that source thread, running mock source, would be waiting on the latch (with some timeout), not the test thread. Test thread would:
- trigger cancellation
- maybe wait 10ms
- assert source task's thread hasn't completed
- release the latch
- assert that source task has completed (with a timeout)
It provides only partial coverage because it doesn't cover integration with Task, StreamTask and the code that actually unloads the classes. So in terms of coverage, it's equivalent to a set of unit tests.
It would provide a coverage for task thread shutting down prematurely, before source thread completes.
eaf54b8
to
1d9d6ab
Compare
…terface to facilitate testing
Task.invokableCancelFuture is added instead of just joining the source thread in cancelTask. This is because cancelTask can be called from TaskCanceller thread, while resources are freed by Task.executingThread or upon it's death.
1d9d6ab
to
0d85f9e
Compare
Superseded by #13000. |
What is the purpose of the change
Wait for the
LegacySourceFunctionThread
to finish when cancelling the task.This prevents premature freeing of resources, particularly LibraryCache (see FLINK-15467 discussion).
Implemented by adding
Task. invokableCancelFuture
instead of just joining the source thread incancelTask
. This is becausecancelTask
can be called from TaskCanceller, while resources are freed byTask.executingThread
or upon its death.Brief change log
There are three commits that should be squashed after reviewing:
StreamTask.cancelTask
and finishTask - only change of signatures except forSourceStreamTask
AbstractInvokable
cancel - only change of signatures except forStreamTask
Task.cancelInvokable
for completionVerifying this change
TaskTest.testTaskTerminatesOnlyAfterInvokable
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation