
[SPARK-11334] numRunningTasks can't be less than 0, or it will affect executor allocation #9288

Closed
wants to merge 1 commit into from

Conversation

XuTingjun (Contributor)

With dynamic allocation enabled, when a task fails more than maxFailures times, all of the dependent jobs, stages, and tasks are killed or aborted. In this process, the SparkListenerTaskEnd event arrives after SparkListenerStageCompleted and SparkListenerJobEnd, as in the event log below:

{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":20,"Stage Attempt ID":0,"Stage Name":"run at AccessController.java:-2","Number of Tasks":200}
{"Event":"SparkListenerJobEnd","Job ID":9,"Completion Time":1444914699829}
{"Event":"SparkListenerTaskEnd","Stage ID":20,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"TaskKilled"},"Task Info":{"Task ID":1955,"Index":88,"Attempt":2,"Launch Time":1444914699763,"Executor ID":"5","Host":"linux-223","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1444914699864,"Failed":true,"Accumulables":[]}}

Because of this, numRunningTasks in the ExecutorAllocationManager class can drop below 0, which affects executor allocation.
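
For illustration, here is a minimal, simplified sketch (not the actual Spark source) of why a plain counter goes negative when the TaskEnd event arrives after the stage has already been marked complete:

class CounterListener {
  private var numRunningTasks = 0

  def onTaskStart(): Unit = { numRunningTasks += 1 }

  // When the aborted stage is marked complete, the counter is reset because
  // no stages are believed to be running any more.
  def onStageCompleted(): Unit = { numRunningTasks = 0 }

  // A TaskKilled event that arrives after the reset drives the counter below
  // zero, which then skews the executor-allocation math.
  def onTaskEnd(): Unit = { numRunningTasks -= 1 }
}

// Ordering from the event log above:
// onTaskStart(); onStageCompleted(); onTaskEnd()  =>  numRunningTasks == -1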

@SparkQA commented Oct 27, 2015

Test build #44397 has finished for PR 9288 at commit e32e684.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao (Contributor)

IMHO, would it be better to fix this unexpected ordering of events? From my understanding, SparkListenerTaskEnd should be triggered before SparkListenerStageCompleted, right? I think that root cause should be fixed, rather than adding guard code in dynamic allocation.

@@ -615,7 +615,11 @@ private[spark] class ExecutorAllocationManager(
       val taskIndex = taskEnd.taskInfo.index
       val stageId = taskEnd.stageId
       allocationManager.synchronized {
-        numRunningTasks -= 1
+        if (numRunningTasks > 0) {
Member

Hm, this seems like a band-aid though. Is this because the task-end event arrives in the wrong order relative to the job-end event? Can we catch and guard against that case more directly?

@XuTingjun (Contributor, Author)

Yeah, I know the root cause is the wrong ordering of events.
In the code the events are ordered: kill task, StageCompleted, JobEnd.
Because TaskEnd is not posted serially with StageCompleted and JobEnd, I think we may not be able to control the ordering.
Could you look at the code and suggest a better approach? Thanks.

@srowen (Member) commented Oct 27, 2015

Hm, is it better to check the status of the task, and only decrement if it wasn't already dead? (I may not know what I'm talking about here.)

@jerryshao (Contributor)

Could we do this by adding the pending-to-kill tasks to a list, and only calling markStageAsFinished and posting SparkListenerJobEnd once all of those tasks are marked as finished? AFAIK, this is how executor killing is managed in CoarseGrainedSchedulerBackend; at the code level (not sure about the complexity) this could also be achieved in DAGScheduler.
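
A rough sketch of what this could look like (the names StageKillTracker, pendingKillTasks, onTasksKillRequested, and readyToComplete are hypothetical, not the real DAGScheduler API): track the tasks that were asked to be killed, and only mark the stage finished and post the job-end event once they have all reported back.

import scala.collection.mutable

class StageKillTracker {
  // taskId -> stageId for tasks the driver asked the executors to kill
  private val pendingKillTasks = new mutable.HashMap[Long, Int]

  def onTasksKillRequested(stageId: Int, taskIds: Seq[Long]): Unit = {
    taskIds.foreach(id => pendingKillTasks(id) = stageId)
  }

  // Called from the TaskEnd handler; returns true once the stage has no
  // outstanding killed tasks, i.e. it is now safe to post
  // SparkListenerStageCompleted and SparkListenerJobEnd.
  def readyToComplete(taskId: Long, stageId: Int): Boolean = {
    pendingKillTasks.remove(taskId)
    !pendingKillTasks.valuesIterator.contains(stageId)
  }
}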

@@ -575,7 +575,7 @@ private[spark] class ExecutorAllocationManager(
       if (stageIdToNumTasks.isEmpty) {
         allocationManager.onSchedulerQueueEmpty()
         if (numRunningTasks != 0) {
-          logWarning("No stages are running, but numRunningTasks != 0")
+          logWarning(s"No stages are running, but numRunningTasks = $numRunningTasks")
Contributor

I would say

... but numRunningTasks ($numRunningTasks) != 0

@andrewor14 (Contributor)

It's unclear how the wrong ordering arose, because the DAGScheduler supposedly posts events from a single thread. I think investigating this will take much longer, since the scheduler is quite complicated. I'm OK with doing a fix on the ExecutorAllocationManager side.

@XuTingjun (Contributor, Author)

@jerryshao, I tried to implement your suggestion, but many unit tests failed. I think it's too difficult for me. If you can help, I would be very grateful.

@XuTingjun (Contributor, Author)

@andrewor14, yeah, the DAGScheduler posts events from a single thread, but the root cause is that SparkListenerTaskEnd is received after SparkListenerStageCompleted.
When the DAGScheduler asks executors to cancel running tasks, it immediately marks the stage finished rather than waiting for the tasks to complete, so SparkListenerStageCompleted is posted before SparkListenerTaskEnd.

I tried to resolve the root cause, but as you said the scheduler is quite complicated, and my attempt caused many unit test failures. So if you are OK with a fix on the ExecutorAllocationManager side, I will update the ExecutorAllocationManager code.

@jerryshao (Contributor)

@XuTingjun, I'm OK with the current approach if finding the root cause is too complicated. Just add more comments explaining why we have to do it this way.

@jerryshao (Contributor)

@andrewor14, from my understanding, this wrong ordering happens when the driver explicitly aborts the running stages because max failures is exceeded. So SparkListenerStageCompleted is received first, then SparkListenerJobEnd, and SparkListenerTaskEnd is received last because the task has to be killed and its result returned remotely.

@andrewor14 (Contributor)

I see, thanks for the explanation.

@XuTingjun A more robust way to do this may be to keep track of

private val stageIdToRunningTaskIds = new mutable.HashMap[Int, mutable.HashSet[Int]]

then you can just remove all the tasks associated with the stage when the stage is completed.

def totalRunningTasks(): Int = {
  stageIdToRunningTaskIds.mapValues(_.size).sum
}

Can you address the comments?
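
For what it's worth, a simplified sketch of how that map could be maintained (the surrounding allocation-manager plumbing is omitted; the method names merely mirror the SparkListener callbacks):

import scala.collection.mutable

class RunningTaskTracker {
  private val stageIdToRunningTaskIds = new mutable.HashMap[Int, mutable.HashSet[Int]]

  def onTaskStart(stageId: Int, taskId: Int): Unit = {
    stageIdToRunningTaskIds.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskId
  }

  // A late TaskEnd for an already-completed stage is a no-op here, so the
  // total can never go negative.
  def onTaskEnd(stageId: Int, taskId: Int): Unit = {
    stageIdToRunningTaskIds.get(stageId).foreach(_ -= taskId)
  }

  // Dropping the whole entry on stage completion also cleans up any tasks
  // whose TaskEnd events arrive out of order.
  def onStageCompleted(stageId: Int): Unit = {
    stageIdToRunningTaskIds.remove(stageId)
  }

  def totalRunningTasks(): Int = stageIdToRunningTaskIds.values.map(_.size).sum
}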

@tgravescs (Contributor)

This seems very similar to what I'm seeing in https://issues.apache.org/jira/browse/SPARK-11701. That one deals with speculation, but it also causes the numbers to be off. This change only fixes executor allocation, though. Is the executor page also showing a negative number of active tasks? I found that with speculation it affects multiple places that do accounting based on the taskEnd event arriving after the stage is finished. If we can't fix the root cause of the out-of-order events, we should check the other places it might affect.

@tgravescs (Contributor)

ping @XuTingjun are you still working on this?

@andrewor14 (Contributor)

Let's close this PR for now since it's been inactive for many months.
@tgravescs @jerryshao would one of you have time to pick up this patch so we can make progress on it?

@jerryshao (Contributor)

Let me take a crack at this issue :).
