Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-16709][CORE] Kill the running task if stage failed #14557

Closed
wants to merge 4 commits into from

Conversation

shenh062326
Copy link
Contributor

@shenh062326 shenh062326 commented Aug 9, 2016

What changes were proposed in this pull request?

At SPARK-16709, when a stage failed, but the running task is still running, the retry stage will rerun the running task, it could cause TaskCommitDeniedException and task retry forever.

Here is the log:

16/07/28 05:22:15 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 175, 10.215.146.81, partition 1,PROCESS_LOCAL, 1930 bytes)
16/07/28 05:28:35 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.1 (TID 207, 10.196.147.232, partition 1,PROCESS_LOCAL, 1930 bytes)
16/07/28 05:28:48 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 175) in 393261 ms on 10.215.146.81 (3/50)
16/07/28 05:34:11 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.1 (TID 207, 10.196.147.232): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1, attemptNumber: 207
16/07/28 05:34:40 INFO scheduler.TaskSetManager: Starting task 1.1 in stage 1.1 (TID 227, 10.196.129.22, partition 1,PROCESS_LOCAL, 1930 bytes)
16/07/28 05:40:08 WARN scheduler.TaskSetManager: Lost task 1.1 in stage 1.1 (TID 227, 10.196.129.22): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1, attemptNumber: 227
16/07/28 05:42:28 INFO scheduler.TaskSetManager: Starting task 1.2 in stage 1.1 (TID 250, 10.215.146.82, partition 1,PROCESS_LOCAL, 1930 bytes)
...

1 task 1.0 in stage1.0 start
2 stage1.0 failed, start stage1.1.
3 task 1.0 in stage1.1 start
4 task 1.0 in stage1.0 finished.
5 task 1.0 in stage1.1 failed with TaskCommitDenied Exception, then retry forever.

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

@SparkQA
Copy link

SparkQA commented Aug 9, 2016

Test build #63421 has finished for PR 14557 at commit 1a1ea2f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1564,6 +1564,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

def killTasks(tasks: HashSet[Long], taskInfo: HashMap[Long, TaskInfo]): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not suitable to add a public method here in SparkContext, SparkContext is a public entry point, any method adds to here should be considered carefully. In your case looks like only Spark internally will use this method, why not directly change the TaskSetManager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jerryshao, Thanks for your prompt. I will move the method to TaskSetManager.

@SparkQA
Copy link

SparkQA commented Aug 9, 2016

Test build #63424 has finished for PR 14557 at commit 9263678.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 9, 2016

Test build #63434 has finished for PR 14557 at commit 9ea08e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -798,6 +798,19 @@ private[spark] class TaskSetManager(
}
}
maybeFinishTaskSet()

// kill running task if stage failed
if(reason.isInstanceOf[FetchFailed]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A space between if and (.

@jaceklaskowski
Copy link
Contributor

LGTM

@SparkQA
Copy link

SparkQA commented Aug 15, 2016

Test build #63768 has finished for PR 14557 at commit fbe31eb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markhamstra
Copy link
Contributor

There are multiple issues with this PR. Some are at a more stylistic level, but some include deeper issues -- e.g. see SPARK-17064. Most fundamentally, this PR is the wrong solution at least in the sense that it does not implement a minimal fix without other side effects. The problem is that TaskCommitDenied is not being handled properly when a duplicate Task tries to commit a result that has already been successfully committed by another attempt of this Task. The proper fix needs to be at that point of committing duplicate results, not by making the larger, unnecessary change in how we handle cancellation/interruption of other Tasks in a TaskSet when one of them produces a FetchFailed.

@HyukjinKwon
Copy link
Member

@shenh062326 I would rather like to propose to close this if there is no argument against ^.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants