Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-26634]Do not allow task of FetchFailureStage commit in OutputCommitCoordinator #23563

Closed

Conversation

liupc
Copy link

@liupc liupc commented Jan 16, 2019

What changes were proposed in this pull request?

What's the problem?

canCommit of OutputCommitCoordinator would allow the task of FetchFailure stage commit, which result in TaskCommitDenied for the task(with the same partition as the commit task of the fetchfailure stage) of retry stage. Because of TaskCommitDenied is not counting towards failure, So the scheduler will constantly scheduling task and got TaskCommitDenied, thus causing the application hangs forever.

How does it happen?

A detailed explaination for this:
Let's say we have:
stage 0.0 . (stage id 0, attempt 0)

  • task 1.0 (task 1, attempt 0)

Stage 0.1 (stage id 0, attempt 1) started due to fetch failure for instance

  • task 1.0 (task 1, attempt 0) . Equivalent with task 1.0 in stage 0.0
  1. Task 1.0 in stage 0.0 is successfuly compeleted after the the launch of Stage 0.1, it will hold the commit lock for partition 1. (Sure, because AuthorizedCommiters for partition 1 is not exist and the attempt is not failed.)

    case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>



    val existing = state.authorizedCommitters(partition)


    state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)

  2. Task 1.0 in stage 0.1 compeleted, it can not get the commit lock. (Sure, already hold by task 1.0 in stage 0.0)

    val existing = state.authorizedCommitters(partition)



    logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +

  3. Because of TaskCommitDenied not counting towards failure, TaskSetManager.handleFailedTask would not abort despite the consecutive failure of task1.x for parition 1 in stage0.1.

    override def countTowardsTaskFailures: Boolean = false

    if (!isZombie && reason.countTowardsTaskFailures) {

  4. task 1 will be readded to pendingTasks and scheduler will schedule Task1.1 later.

  5. Task 1.1 in stage 0.1 completed and also can not get the commit lock. and so back and forth

Logs:

2019-01-09,08:39:53,676 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 138.0 in stage 5.1 (TID 31437, zjy-hadoop-prc-st159.bj, executor 456, partition 138, PROCESS_LOCAL, 5829 bytes)
2019-01-09,08:43:37,514 INFO org.apache.spark.scheduler.TaskSetManager: Finished task 138.0 in stage 5.0 (TID 30634) in 466958 ms on zjy-hadoop-prc-st1212.bj (executor 1632) (674/5000)
 2019-01-09,08:45:56,284 INFO org.apache.spark.scheduler.OutputCommitCoordinator: Denying attemptNumber=1 to commit for stage=5, partition=138; existingCommitter = 0
2019-01-09,08:45:57,372 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 138.0 in stage 5.1 (TID 31437, zjy-hadoop-prc-st159.bj, executor 456): TaskCommitDenied (Driver denied task commit) for job: 5, partition: 138, attemptNumber: 1
166483 2019-01-09,08:45:57,373 INFO org.apache.spark.scheduler.OutputCommitCoordinator: Task was denied committing, stage: 5, partition: 138, attempt number: 0, attempt number(counting failed stage): 1

How does this PR fix?

This PR will forbidden task of failed stage commit in the term of the new stage and thus solve the problem.

How was this patch tested?

unittest

Please review http://spark.apache.org/contributing.html before opening a pull request.

@HyukjinKwon
Copy link
Member

@liupc, those logics are pretty core logics. It's not required but I would like to suggest elaborate it with examples, and pointers for codes so that reviewers can take a look easily. One example can be ... #21815 which has similar amount of change with it.

@liupc
Copy link
Author

liupc commented Jan 17, 2019

@HyukjinKwon Ok, thanks, I will try to explain it with examples in the description.

Copy link
Contributor

@vanzin vanzin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes sense, but in this code it's good to get more eyes. @squito @tgravescs @cloud-fan

* @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
* the maximum possible value of `context.partitionId`).
*/
private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
private[scheduler] def stageStart(
stage: Int, stageAttemptNumber: Int, maxPartitionId: Int): Unit = synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: one parameter per line, double indented

@vanzin
Copy link
Contributor

vanzin commented Feb 13, 2019

ok to test

stageStates.get(stage) match {
case Some(state) =>
require(state.authorizedCommitters.length == maxPartitionId + 1)
state.latestStageAttempt = stageAttemptNumber
logInfo(s"Reusing state from previous attempt of stage $stage.")

case _ =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to assign stageAttemptNumber to latestStageAttempt of newly create StageState too.

stageStates.get(stage) match {
case Some(state) =>
require(state.authorizedCommitters.length == maxPartitionId + 1)
state.latestStageAttempt = stageAttemptNumber
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to check if current latestStageAttempt is less than stageAttemptNumber or not?

@viirya
Copy link
Member

viirya commented Feb 14, 2019

  1. Task 1.0 in stage 0.0 is successfuly compeleted after the the launch of Stage 0.1, it will hold the commit lock for partition 1. (Sure, because AuthorizedCommiters for partition 1 is not exist and the attempt is not failed.)

hmm, doesn't Task 1.0 in stage 0.0 hits FetchFailure? Or it can be successfully completed after FetchFailure? When the stage 0.1 gets launched? After FetchFailure of stage 0.0 but before it is completed?

@vanzin
Copy link
Contributor

vanzin commented Feb 14, 2019

hmm, doesn't Task 1.0 in stage 0.0 hits FetchFailure?

No, some other task fails and causes the stage to be recomputed. That's what causes the scenario where task 1.0 commits after the new stage attempt starts.

@viirya
Copy link
Member

viirya commented Feb 14, 2019

Oh, I see. Thanks @vanzin.

Won't task 1.0 in stage 0.0 release the commit lock later? Will it hold the lock forever?

@vanzin
Copy link
Contributor

vanzin commented Feb 14, 2019

No, once you commit, that's it. The partition stays "locked" until the stage finishes.

@SparkQA
Copy link

SparkQA commented Feb 14, 2019

Test build #102310 has finished for PR 23563 at commit 5967f11.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

  1. task 1 will be readded to pendingTasks and scheduler will schedule Task1.1 later.

the partition 1 of stage 0 is marked as completed, why the scheduler need to schedule this task?

@liupc
Copy link
Author

liupc commented Feb 14, 2019

@cloud-fan @vanzin @viirya @HyukjinKwon
Sorry, I think this issue is resolved by #21131, and this PR allow markPartitionCompletedInAllTaskSets which is a better way to fix the issue we encountered in spark2.1, as it improves performance by letting the partition being marked as successful by the first completion attempt.

@vanzin
Copy link
Contributor

vanzin commented Feb 14, 2019

the partition 1 of stage 0 is marked as completed, why the scheduler need to schedule this task?

Because it completes after the next stage attempt starts, so the scheduler creates new tasks for all the tasks that did not finish in the previous attempt up to that point.

Anyway, if Imran's fix covers this case, that's good, no need to make this more complicated.

@squito
Copy link
Contributor

squito commented Feb 14, 2019

btw that fix also had a bug, see followup in #22806 (which unfortunately still needs to be figured out). But that is needed for more than OutputCommitters, so that issue should still cover this.

@vanzin
Copy link
Contributor

vanzin commented Mar 15, 2019

Based on the above I'm closing this.

@vanzin vanzin closed this Mar 15, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
7 participants