Skip to content

Commit

Permalink
[SPARK-22535][PYSPARK] Sleep before killing the python worker in Pyth…
Browse files Browse the repository at this point in the history
…Runner.MonitorThread (branch-2.2)

## What changes were proposed in this pull request?

Backport #19762 to 2.2

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19768 from zsxwing/SPARK-22535-2.2.
  • Loading branch information
zsxwing committed Nov 16, 2017
1 parent 0b51fd3 commit be68f86
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ private[spark] class PythonRunner(
class MonitorThread(env: SparkEnv, worker: Socket, context: TaskContext)
extends Thread(s"Worker Monitor for $pythonExec") {

/** How long to wait before killing the python worker if a task cannot be interrupted. */
private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")

setDaemon(true)

override def run() {
Expand All @@ -369,12 +372,18 @@ private[spark] class PythonRunner(
Thread.sleep(2000)
}
if (!context.isCompleted) {
try {
logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
} catch {
case e: Exception =>
logError("Exception when trying to kill worker", e)
Thread.sleep(taskKillTimeout)
if (!context.isCompleted) {
try {
// Mimic the task name used in `Executor` to help the user find out the task to blame.
val taskName = s"${context.partitionId}.${context.taskAttemptId} " +
s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
logWarning(s"Incomplete task $taskName interrupted: Attempting to kill Python Worker")
env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
} catch {
case e: Exception =>
logError("Exception when trying to kill worker", e)
}
}
}
}
Expand Down

0 comments on commit be68f86

Please sign in to comment.