
[SPARK-11334][Core] Handle maximum task failure situation in dynamic allocation #11205

Closed
wants to merge 1 commit into from

Conversation

jerryshao
Contributor

Currently there are two problems in dynamic allocation when the maximum task failure limit is reached:

  1. The number of running tasks can become negative, which throws off the calculation of the number of executors needed.
  2. Executors may never become idle. We currently use the executor-to-tasks mapping to determine an executor's status; when the maximum task failure limit is reached, some TaskEnd events may never be delivered, so the related executor is considered busy forever.

This patch tries to fix these two issues. Please review, thanks a lot.
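To illustrate the first issue, here is a minimal, self-contained sketch (simplified names; not the actual ExecutorAllocationManager code) of how the running-task count can go negative when a TaskEnd arrives after the per-stage counters were reset:

import scala.collection.mutable

// Minimal sketch of the listener bookkeeping, not Spark's real code.
object NegativeRunningTasksSketch {
  private var numRunningTasks: Int = 0
  private val stageIdToNumTasks = new mutable.HashMap[Int, Int]

  def onStageSubmitted(stageId: Int, numTasks: Int): Unit = stageIdToNumTasks(stageId) = numTasks
  def onTaskStart(stageId: Int): Unit = numRunningTasks += 1
  def onTaskEnd(stageId: Int): Unit = numRunningTasks -= 1
  def onStageCompleted(stageId: Int): Unit = {
    stageIdToNumTasks -= stageId
    if (stageIdToNumTasks.isEmpty) numRunningTasks = 0 // reset when no stages are active
  }

  def main(args: Array[String]): Unit = {
    onStageSubmitted(stageId = 0, numTasks = 2)
    onTaskStart(0); onTaskStart(0)
    onTaskEnd(0)
    onStageCompleted(0) // stage aborted after max task failures; counter reset to 0
    onTaskEnd(0)        // straggling TaskEnd delivered afterwards
    println(numRunningTasks) // -1: executor sizing now works from a negative count
  }
}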

CC @andrewor14 and @tgravescs .

@SparkQA

SparkQA commented Feb 15, 2016

Test build #51298 has finished for PR 11205 at commit 966eb89.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

Just to add a link here to the previous PR #9288

// Number of tasks currently running on the cluster. Should be 0 when no stages are active.
private var numRunningTasks: Int = _

// Executor ID -> (stage ID -> number of tasks from that stage running on that executor).
private val executorIdToStageAndNumTasks =
  new mutable.HashMap[String, mutable.HashMap[Int, Int]]
Contributor

This is a very complicated data structure. Why do we need to keep track of which stages each executor is running tasks for? Is there a simpler way to fix this?

@andrewor14
Contributor

@jerryshao I took a look at this and it looks overly complicated. It seems the problem is that we sometimes end up with a negative totalRunningTasks, which leads to undesirable behavior. Can't we fix this by expressing totalRunningTasks in terms of stageIdToTaskIndices.map(_.values.size).sum?
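Roughly, the idea would look like the following sketch (hypothetical stand-in types; the listener's stageIdToTaskIndices is approximately a HashMap[Int, HashSet[Int]], so the exact expression may differ slightly):

import scala.collection.mutable

// Hedged sketch of the suggestion above, with a stand-in for the listener's
// stage -> started-task-indices map.
object DerivedRunningTasksSketch {
  val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
  stageIdToTaskIndices(0) = mutable.HashSet(0, 1, 2)
  stageIdToTaskIndices(1) = mutable.HashSet(7)

  // Derive the total from the per-stage sets instead of maintaining a separate
  // counter that can drift negative.
  def totalRunningTasks: Int = stageIdToTaskIndices.values.map(_.size).sum // 4 here
}

(Note that, as discussed further down the thread, this set is not trimmed when tasks finish, so it actually counts started tasks rather than running ones.)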

@jerryshao
Contributor Author

Hi @andrewor14, thanks a lot for your comments.

The reason I introduced another data structure to track each executor's stages and task counts was mentioned above; I'll paste it here again:

Executors may never become idle. We currently use the executor-to-tasks mapping to determine an executor's status; when the maximum task failure limit is reached, some TaskEnd events may never be delivered, so the related executor is considered busy forever.

In my testing, fewer TaskEnd events may be delivered than expected, which means the executor is never released. So compared to the old implementation, I changed the code to clean up the related task counts when the stage completes. That's why I introduced a more complicated data structure.
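To make the cleanup idea concrete, here is a rough, self-contained sketch (simplified names and logic; it is not the actual patch):

import scala.collection.mutable

// Track, per executor, how many tasks of each stage are running, and drop a stage's
// counts for every executor when that stage completes, so missed TaskEnd events
// cannot leave an executor marked busy forever.
object StageScopedCleanupSketch {
  private val executorIdToStageAndNumTasks =
    new mutable.HashMap[String, mutable.HashMap[Int, Int]]

  def onTaskStart(execId: String, stageId: Int): Unit = {
    val perStage = executorIdToStageAndNumTasks
      .getOrElseUpdate(execId, new mutable.HashMap[Int, Int])
    perStage(stageId) = perStage.getOrElse(stageId, 0) + 1
  }

  def onTaskEnd(execId: String, stageId: Int): Unit =
    executorIdToStageAndNumTasks.get(execId).foreach { perStage =>
      perStage(stageId) = perStage.getOrElse(stageId, 1) - 1
      if (perStage(stageId) <= 0) perStage -= stageId
      if (perStage.isEmpty) executorIdToStageAndNumTasks -= execId
    }

  // Even if some TaskEnd events are never delivered for an aborted stage,
  // completing the stage wipes its counts from every executor.
  def onStageCompleted(stageId: Int): Unit = {
    executorIdToStageAndNumTasks.values.foreach(_ -= stageId)
    val nowIdle = executorIdToStageAndNumTasks.collect {
      case (execId, perStage) if perStage.isEmpty => execId
    }.toList
    nowIdle.foreach(executorIdToStageAndNumTasks -= _)
  }

  def isExecutorIdle(execId: String): Boolean =
    !executorIdToStageAndNumTasks.contains(execId)
}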

@andrewor14
Contributor

Also cc @vanzin @srowen

@tgravescs
Contributor

Sorry, somehow I missed this go by; I haven't looked at the code changes in detail yet. The TaskEnd event should always be sent now; we fixed that bug a while back. Or is it because it's delivered out of order?

Can you describe in more detail the exact issue and how this change fixes it?

@rustagi

rustagi commented Sep 3, 2016

I am seeing this issue quite frequently. I'm not sure what is causing it, but we frequently get an onTaskEnd event after a stage has ended. This causes numRunningTasks to become negative. When the executor count is then updated, the number of required executors (maxNumExecutorsNeeded) becomes negative, which breaks new executor allocation and deallocation (see the sketch at the end of this comment). In the best case you get executors that cannot be deallocated, and over time Spark stops allocating new executors even when tasks are pending.
There is a simple hacky patch in #9288, and this one is an attempt to correct it with more accountability.
I am seeing this issue so frequently that I am not sure it's possible to run Spark with dynamic allocation successfully for a long duration without fixing it. I'll try the hacky patch and confirm.
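For context, the executor-target arithmetic is roughly of this shape (a simplified sketch, not the exact ExecutorAllocationManager code); it shows how a negative running-task count propagates into a negative target:

// Simplified sketch of the executor-target arithmetic (not the exact Spark code).
object MaxExecutorsSketch {
  def maxNumExecutorsNeeded(totalPendingTasks: Int,
                            totalRunningTasks: Int,
                            tasksPerExecutor: Int): Int = {
    val numRunningOrPendingTasks = totalPendingTasks + totalRunningTasks
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }

  def main(args: Array[String]): Unit = {
    // With a running-task count that has drifted negative, the computed target is
    // negative too, which breaks the allocation/deallocation decisions described above.
    println(maxNumExecutorsNeeded(totalPendingTasks = 0, totalRunningTasks = -8, tasksPerExecutor = 4)) // -1
  }
}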

@rustagi

rustagi commented Sep 6, 2016

I can confirm that disabling speculation and setting the max task failures to 1 eliminates this problem. I will try the patch and confirm.

@HyukjinKwon
Member

HyukjinKwon commented Jun 19, 2017

Gentle ping @rustagi, have you maybe had some time to confirm this patch? It sounds like the only thing we need here is that confirmation.

@HyukjinKwon
Member

gentle ping @rustagi

@jiangxb1987
Contributor

What's the status of this PR? @jerryshao

@jerryshao
Contributor Author

I guess the issue still exists. Let me verify it again; if it still does, I will bring the PR up to date with the latest code. Thanks!

@rustagi

rustagi commented Oct 9, 2017

Sorry, I haven't been able to confirm this patch because I have not seen the issue in production for quite some time.
It was much more persistent with 2.0 than with 2.1.
I'm not sure of the cause.

@vanzin
Contributor

vanzin commented Oct 23, 2017

This PR is pretty old and a lot has changed since then, but it looks like this can now be fixed by just changing the code to look at stageIdToTaskIndices instead of keeping numRunningTasks around? (Or maybe use numRunningTasks as a cache for stageIdToTaskIndices.values.sum.)

Also, doesn't isExecutorIdle take care of the second bullet in your description?

@jerryshao
Contributor Author

@vanzin, in the current code stageIdToTaskIndices cannot be used to track the number of running tasks, because that structure doesn't remove a task's index when the task finishes successfully.

Yes, isExecutorIdle is meant to take care of executor idleness, but the way it identifies whether an executor is idle is not robust enough. In this scenario, when a stage is aborted because of max task failures, some TaskEnd events go missing, so using the number of tasks per executor leaves residual data behind and makes the executor appear busy forever.
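To make that concrete, here is a simplified sketch (not the exact listener code) of how a missed TaskEnd leaves an executor marked busy:

import scala.collection.mutable

// Idleness is judged from the executor -> running-task-IDs map,
// and only a TaskEnd ever removes entries from it.
object IdleTrackingSketch {
  private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]

  def onTaskStart(execId: String, taskId: Long): Unit =
    executorIdToTaskIds.getOrElseUpdate(execId, new mutable.HashSet[Long]) += taskId

  def onTaskEnd(execId: String, taskId: Long): Unit =
    executorIdToTaskIds.get(execId).foreach { ids =>
      ids -= taskId
      if (ids.isEmpty) executorIdToTaskIds -= execId
    }

  def isExecutorIdle(execId: String): Boolean = !executorIdToTaskIds.contains(execId)

  def main(args: Array[String]): Unit = {
    onTaskStart("exec-1", taskId = 42L)
    // Stage aborted after max task failures: the TaskEnd for task 42 is never
    // delivered, so exec-1 is reported busy indefinitely and never released.
    println(isExecutorIdle("exec-1")) // false, and it stays false
  }
}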

@jerryshao
Contributor Author

I verified again, and it looks like the 2nd bullet is no longer valid; I cannot reproduce it on the latest master branch. It might have already been fixed by SPARK-13054.

So only the first issue still exists, and I think @sitalkedia's PR is enough to handle it. I'm going to close this one. @sitalkedia, would you please reopen your PR? Sorry for the noise.

@jerryshao jerryshao closed this Oct 26, 2017
@SparkQA

SparkQA commented Oct 26, 2017

Test build #83067 has finished for PR 11205 at commit 59f9c15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@da-liii
Contributor

da-liii commented Apr 26, 2018

@jerryshao I think the 2nd bullet has not been fixed by SPARK-13054.

I use Spark 2.1.1, and I still find that finished tasks remain in executorIdToTaskIds (declared as private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]).

But numRunningTasks equals 0, because of this reset:

if (numRunningTasks != 0) {
  logWarning("No stages are running, but numRunningTasks != 0")
  numRunningTasks = 0
}
