
Conversation

@azagrebin
Contributor

@azagrebin azagrebin commented Jul 10, 2019

What is the purpose of the change

Add a unit test to #7757.

Currently, the TaskExecutor does not properly wait for the termination of Tasks when it shuts down in TaskExecutor#onStop. This patch changes TaskExecutor#onStop to fail all running tasks and wait for their termination before stopping all services.

Brief change log

  • add TaskCompletionTracker to track task termination futures in TaskExecutor
  • try to fail all running tasks and wait for their termination before stopping all services
  • add TaskExecutorTest.testTaskInterruptionAndTerminationOnShutdown
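The tracking approach described in the change log can be sketched roughly as follows. This is a hypothetical simplification, not the actual Flink code: the `Task` class, the exception message, and the method bodies are stand-ins; only the `TaskCompletionTracker` method names mirror the PR.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class TrackerSketch {

    // Stand-in for Flink's Task; only the parts relevant to tracking are sketched.
    static class Task {
        private final String executionId;
        private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        Task(String executionId) { this.executionId = executionId; }

        String getExecutionId() { return executionId; }
        CompletableFuture<Void> getTerminationFuture() { return terminationFuture; }

        void failExternally(Throwable cause) {
            // In Flink this would interrupt the task thread and run cleanup;
            // here we just complete the termination future.
            terminationFuture.complete(null);
        }
    }

    static class TaskCompletionTracker {
        private final Map<String, Task> incompleteTasks = new ConcurrentHashMap<>(8);

        void trackTaskCompletion(Task task) {
            incompleteTasks.put(task.getExecutionId(), task);
            // Remove the task only once its termination future completes, so a
            // task is never forgotten while it is still releasing resources.
            task.getTerminationFuture().thenRun(
                () -> incompleteTasks.remove(task.getExecutionId()));
        }

        CompletableFuture<Void> failIncompleteTasksAndGetTerminationFuture() {
            CompletableFuture<?>[] futures = incompleteTasks.values().stream()
                .map(task -> {
                    task.failExternally(new Exception("TaskExecutor is shutting down"));
                    return task.getTerminationFuture();
                })
                .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures);
        }
    }

    public static void main(String[] args) {
        TaskCompletionTracker tracker = new TaskCompletionTracker();
        tracker.trackTaskCompletion(new Task("task-1"));
        tracker.trackTaskCompletion(new Task("task-2"));
        CompletableFuture<Void> allDone = tracker.failIncompleteTasksAndGetTerminationFuture();
        System.out.println(allDone.isDone() ? "all tasks terminated" : "still running");
    }
}
```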

Verifying this change

run TaskExecutorTest.testTaskInterruptionAndTerminationOnShutdown

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Jul 10, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 181a7e4 (Fri Aug 23 10:20:12 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details

The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@azagrebin
Contributor Author

@flinkbot attention @tillrohrmann @kisimple

@flinkbot
Collaborator

CI report for commit 29515b7: SUCCESS Build

@flinkbot
Collaborator

CI report for commit cd5ad8d: SUCCESS Build

@ifndef-SleePy
Contributor

Hi @azagrebin ,

I'm following the unstable case https://issues.apache.org/jira/browse/FLINK-11631 which is relevant to this PR.

I guess you have taken over the old PR #7757. I share the same concern as @tillrohrmann: there are two different ways of tracking the task lifecycle. One is unregisterTaskAndNotifyFinalState with TaskSlotTable, the other is TaskCompletionTracker.

I think this PR works well at the moment. I'm just afraid we might encounter some subtle corner cases in the future. Maybe we should unify these two lifecycle tracking mechanisms. Or we could merge this PR first and think about unification afterwards. What do you think?


@azagrebin
Contributor Author

Hi @ifndef-SleePy
If you mean this concern #7757 (review) about forgetting to wait for some tasks: it arises because the old PR used TaskSlotTable to get the uncompleted tasks at some point in time. The problem is that TaskSlotTable might not always track tasks in sync with their completion futures, since it addresses other concerns, and there can be race conditions. I also left a more detailed explanation on the old PR #7757 (comment). This is why the old PR was changed and this PR uses separate tracking that removes a task only once it has completed. The completion future is completed at the very end of Task.run, on every exit path, so no action in Task.run is missed (here the release of resources is the important one).
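The ordering guarantee described above can be illustrated with a simplified sketch (the structure and the resource flag are illustrative, not Flink's actual Task implementation): completing the termination future in the innermost finally block means anyone waiting on that future observes all cleanup, on every exit path of run().

```java
import java.util.concurrent.CompletableFuture;

public class TaskRunSketch {
    private static final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    private static boolean resourcesReleased = false;

    static void run() {
        try {
            // ... invoke user code, which may throw or be interrupted ...
        } finally {
            try {
                resourcesReleased = true; // stand-in for releasing network buffers etc.
            } finally {
                // Completed last, after cleanup, on every exit path of run().
                terminationFuture.complete(null);
            }
        }
    }

    public static void main(String[] args) {
        terminationFuture.thenRun(() ->
            System.out.println("resources released before completion: " + resourcesReleased));
        run();
    }
}
```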

@ifndef-SleePy
Contributor

Hi @azagrebin
Thank you for the explanation. Nice work taking over the old PR!
Anyway, for this PR, it looks good to me.


@tillrohrmann tillrohrmann left a comment


Thanks for opening this PR @azagrebin. Overall the idea looks good. I had some comments concerning concurrency and how to test this feature. Would be great if you could take a look at my comments.

}
final Throwable throwableBeforeTasksCompletion = jobManagerDisconnectThrowable;
return taskCompletionTracker
.failIncompleteTasksAndWaitForAllCompleted()

Let's rename this method to failIncompleteTasksAndGetTerminationFuture().

private final Map<ExecutionAttemptID, Task> incompleteTasks;

private TaskCompletionTracker() {
incompleteTasks = new ConcurrentHashMap<>(8);

Why do we need a ConcurrentHashMap? Where does the concurrency come from?


void trackTaskCompletion(Task task) {
incompleteTasks.put(task.getExecutionId(), task);
task.getTerminationFuture().thenRun(() -> incompleteTasks.remove(task.getExecutionId()));

could we also rely on TaskExecutor#unregisterTaskAndNotifyFinalState to remove the task from the TaskCompletionTracker? That way we would not apply concurrent changes to incompleteTasks.


This is also my concern. We already have TaskExecutor#unregisterTaskAndNotifyFinalState and Task#getTerminationFuture. I think we should unify them somehow.


Andrey explained to me that we call unregisterTaskAndNotifyFinalState before the cleanup of the Task has completed. I think this is a problem in the Task's lifecycle. Hence, one could fix this as a follow-up and then simply use the termination future to signal the final state to the TaskExecutor. But this is out of scope for this PR.


Yes, Andrey explained this to me before.
Never mind, I just mentioned it when I saw this comment. I agree that we could follow up on it later.
BTW, we could also avoid concurrent modifications through thenRunAsync with the mainThreadExecutor if we prefer relying on task.getTerminationFuture().
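The thenRunAsync suggestion above could look roughly like this sketch, where a single-threaded executor stands in for the rpc main thread executor (all names here are illustrative, not Flink's actual API). Because both the registration and the removal run on the same executor, a plain HashMap suffices.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MainThreadSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the TaskExecutor's rpc main thread executor.
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        Map<String, String> incompleteTasks = new HashMap<>();

        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        // Register on the "main thread"; removal is also scheduled there, so
        // incompleteTasks is never touched concurrently.
        mainThreadExecutor.submit(() -> { incompleteTasks.put("task-1", "running"); }).get();
        terminationFuture.thenRunAsync(
            () -> incompleteTasks.remove("task-1"), mainThreadExecutor);

        terminationFuture.complete(null); // the task thread signals termination
        mainThreadExecutor.shutdown();
        mainThreadExecutor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("remaining tasks: " + incompleteTasks.size());
    }
}
```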

try {
taskExecutor.start();
taskSlotTableStarted.get();
taskSlotTable.allocateSlot(0, jobId, allocationId, timeout);

This is dangerous, as we are mutating state from the testing thread which is actually owned by the TaskExecutor's rpc main thread. Better to call TaskExecutorGateway#requestSlot after registering a ResourceManagerGateway.


Task task = taskSlotTable.getTask(taskDeploymentDescriptor.getExecutionAttemptId());
assertThat(task.getTerminationFuture().isDone(), is(true));
assertThat(TestInterruptableInvokable.INTERRUPTED_FUTURE.isDone(), is(true));

Can't we change the test so that we have control over the submitted task's termination future t? Given that, we could stop the TaskExecutor and check that it has not terminated while t is not completed. Then we complete t and check that the TaskExecutor terminates. There would then also be no need to interact directly with the TaskSlotTable, which is an implementation detail of the TaskExecutor.
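The suggested test shape could be sketched like this, with a plain CompletableFuture standing in for the task's termination future and a derived future standing in for TaskExecutor#onStop (both are stand-ins, not the real API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ShutdownTestSketch {
    public static void main(String[] args) throws Exception {
        // The test owns the task's termination future t.
        CompletableFuture<Void> taskTermination = new CompletableFuture<>();

        // Stand-in for TaskExecutor#onStop: shutdown finishes only after the task.
        CompletableFuture<Void> executorTermination =
            taskTermination.thenRun(() -> { /* stop remaining services */ });

        boolean terminatedEarly;
        try {
            executorTermination.get(100, TimeUnit.MILLISECONDS);
            terminatedEarly = true; // would indicate the executor did not wait
        } catch (TimeoutException e) {
            terminatedEarly = false; // expected: the task is still running
        }

        // Complete t; now the executor must terminate.
        taskTermination.complete(null);
        executorTermination.get(5, TimeUnit.SECONDS);
        System.out.println("terminated early: " + terminatedEarly
            + ", terminated after completion: true");
    }
}
```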

return jobManagerTable;
}

private TaskDeploymentDescriptor createTaskDeploymentDescriptor(

What about introducing a TaskDeploymentDescriptorBuilder and replacing the different test instantiations of TDDs with that?
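A builder along the suggested lines might look like this minimal sketch. The field names and defaults are placeholders, not the real TaskDeploymentDescriptor fields; the point is that each test only sets the fields it cares about.

```java
public class TddBuilderSketch {

    // Stand-in for the immutable descriptor; the real TDD has many more fields.
    static class TaskDeploymentDescriptor {
        final String jobId;
        final String executionAttemptId;
        final String taskName;

        TaskDeploymentDescriptor(String jobId, String executionAttemptId, String taskName) {
            this.jobId = jobId;
            this.executionAttemptId = executionAttemptId;
            this.taskName = taskName;
        }
    }

    static class TaskDeploymentDescriptorBuilder {
        // Sensible test defaults, so each test overrides only what it needs.
        private String jobId = "default-job";
        private String executionAttemptId = "attempt-0";
        private String taskName = "test-task";

        TaskDeploymentDescriptorBuilder setJobId(String jobId) {
            this.jobId = jobId;
            return this;
        }

        TaskDeploymentDescriptorBuilder setTaskName(String taskName) {
            this.taskName = taskName;
            return this;
        }

        TaskDeploymentDescriptor build() {
            return new TaskDeploymentDescriptor(jobId, executionAttemptId, taskName);
        }
    }

    public static void main(String[] args) {
        TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptorBuilder()
            .setJobId("job-42")
            .setTaskName("interruptible-task")
            .build();
        System.out.println(tdd.jobId + "/" + tdd.taskName);
    }
}
```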

@azagrebin
Contributor Author

Thanks for the review @tillrohrmann, I have addressed comments.


@tillrohrmann tillrohrmann left a comment


Thanks for addressing my comments @azagrebin. LGTM.

The PR contains some checkstyle violations. I will push an update and merge once Travis gives green light.

tillrohrmann pushed a commit to tillrohrmann/flink that referenced this pull request Aug 16, 2019
tillrohrmann pushed a commit that referenced this pull request Aug 18, 2019
…ting down TaskExecutor

This closes #9072.
This closes #7757.
