[SPARK-29177][CORE] fix zombie tasks after stage abort
### What changes were proposed in this pull request?
Handle the task result even when it exceeds the configured `spark.driver.maxResultSize`: instead of returning without any bookkeeping, the driver now calls `scheduler.handleFailedTask` with `TaskState.KILLED`, so the task is killed rather than left as a zombie. More details are in the JIRA description: https://issues.apache.org/jira/browse/SPARK-29177 .
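
For orientation, below is a paraphrased, self-contained sketch of the driver-side size check that feeds this path; the `ResultSizeTracker` name is hypothetical, and the real logic lives in `TaskSetManager.canFetchMoreResults`. The key point is that exceeding the cap aborts the stage, but the task whose result was just rejected is never marked finished unless the caller also reports it, which is why `TaskResultGetter` now calls `handleFailedTask`:

```scala
// Hypothetical stand-in for TaskSetManager's bookkeeping, not verbatim Spark
// source: canFetchMoreResults accumulates serialized-result sizes and trips
// once spark.driver.maxResultSize is exceeded.
class ResultSizeTracker(maxResultSize: Long) {
  private var totalResultSize = 0L
  private var calculatedTasks = 0

  // Returns false once the accumulated size exceeds the cap. The real method
  // also aborts the task set, but the scheduler still needs handleFailedTask
  // for this particular task, or it is never marked finished and lingers as
  // a zombie (the gap this patch closes).
  def canFetchMoreResults(size: Long): Boolean = synchronized {
    totalResultSize += size
    calculatedTasks += 1
    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
      println(s"Total size of serialized results of $calculatedTasks tasks " +
        s"($totalResultSize bytes) is bigger than maxResultSize ($maxResultSize bytes)")
      false
    } else {
      true
    }
  }
}
```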

### Why are the changes needed?
Without this patch, zombie tasks prevent YARN from reclaiming the containers that run them, which affects other applications on the cluster.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
A unit test, plus a production test with a very large `SELECT` in the Spark Thrift Server.
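
The failure mode is also easy to reproduce outside the Thrift Server. A minimal sketch, assuming a local master and a deliberately tiny `spark.driver.maxResultSize` (the object name is made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone repro: collect far more data than the 1 MB cap.
// Before this patch the driver aborted the stage but could leave the
// oversized tasks unhandled as zombies; with it they are explicitly killed.
object MaxResultSizeRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("maxResultSize-repro")
      .set("spark.driver.maxResultSize", "1m") // default is 1g
    val sc = new SparkContext(conf)
    try {
      // Roughly 80 MB of longs, well above the cap: the job fails with
      // "Total size of serialized results ... is bigger than spark.driver.maxResultSize".
      sc.range(0, 10000000L, numSlices = 8).collect()
    } finally {
      sc.stop()
    }
  }
}
```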

Closes #25850 from adrian-wang/zombie.

Authored-by: Daoyuan Wang <me@daoyuan.wang>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
adrian-wang authored and cloud-fan committed Sep 23, 2019
1 parent 655356e commit c08bc37
Showing 2 changed files with 42 additions and 0 deletions.

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -64,6 +64,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
         val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
           case directResult: DirectTaskResult[_] =>
             if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
+              // kill the task so that it will not become zombie task
+              scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
+                "Tasks result size has exceeded maxResultSize"))
               return
             }
             // deserialize "value" without holding any lock so that it won't block other threads.
@@ -75,6 +78,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
             if (!taskSetManager.canFetchMoreResults(size)) {
               // dropped by executor if size is larger than maxResultSize
               sparkEnv.blockManager.master.removeBlock(blockId)
+              // kill the task so that it will not become zombie task
+              scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
+                "Tasks result size has exceeded maxResultSize"))
               return
             }
             logDebug("Fetching indirect task result for TID %s".format(tid))

core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.TaskState.TaskState
 import org.apache.spark.TestUtils.JavaSourceFromString
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.storage.TaskResultBlockId
@@ -78,6 +79,16 @@ private class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   }
 }
 
+private class DummyTaskSchedulerImpl(sc: SparkContext)
+  extends TaskSchedulerImpl(sc, 1, true) {
+  override def handleFailedTask(
+      taskSetManager: TaskSetManager,
+      tid: Long,
+      taskState: TaskState,
+      reason: TaskFailedReason): Unit = {
+    // do nothing
+  }
+}
 
 /**
  * A [[TaskResultGetter]] that stores the [[DirectTaskResult]]s it receives from executors
@@ -130,6 +141,31 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
       "Expect result to be removed from the block manager.")
   }
 
+  test("handling total size of results larger than maxResultSize") {
+    sc = new SparkContext("local", "test", conf)
+    val scheduler = new DummyTaskSchedulerImpl(sc)
+    val spyScheduler = spy(scheduler)
+    val resultGetter = new TaskResultGetter(sc.env, spyScheduler)
+    scheduler.taskResultGetter = resultGetter
+    val myTsm = new TaskSetManager(spyScheduler, FakeTask.createTaskSet(2), 1) {
+      // always returns false
+      override def canFetchMoreResults(size: Long): Boolean = false
+    }
+    val indirectTaskResult = IndirectTaskResult(TaskResultBlockId(0), 0)
+    val directTaskResult = new DirectTaskResult(ByteBuffer.allocate(0), Nil, Array())
+    val ser = sc.env.closureSerializer.newInstance()
+    val serializedIndirect = ser.serialize(indirectTaskResult)
+    val serializedDirect = ser.serialize(directTaskResult)
+    resultGetter.enqueueSuccessfulTask(myTsm, 0, serializedDirect)
+    resultGetter.enqueueSuccessfulTask(myTsm, 1, serializedIndirect)
+    eventually(timeout(1.second)) {
+      verify(spyScheduler, times(1)).handleFailedTask(
+        myTsm, 0, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))
+      verify(spyScheduler, times(1)).handleFailedTask(
+        myTsm, 1, TaskState.KILLED, TaskKilled("Tasks result size has exceeded maxResultSize"))
+    }
+  }
+
   test("task retried if result missing from block manager") {
     // Set the maximum number of task failures to > 0, so that the task set isn't aborted
     // after the result is missing.
