[SPARK-25139][SPARK-18406][CORE][BRANCH-2.3] Avoid NonFatals to kill the Executor in PythonRunner

## What changes were proposed in this pull request?

PySpark uses a prefetch approach: the result is read from upstream and served in a separate thread. It is therefore possible, if the child operator does not consume all of the data, for task cleanup to happen before the Python-side read finishes. This creates a race condition: the block read locks are freed during task cleanup, and when the reader then tries to release the read lock it holds, it finds the lock has already been released and hits an AssertionError.

We should catch the AssertionError in PythonRunner and prevent it from killing the Executor.
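
For context: `AssertionError` extends `java.lang.Error`, not `java.lang.Exception`, so the old `case e: Exception` clauses in the writer thread never matched it, and it escaped to the thread's uncaught-exception handler. Scala's `NonFatal` extractor does treat it as recoverable, which is what the new guard relies on. A minimal sketch of that distinction (the error message is illustrative, not taken from the patch):

```scala
import scala.util.control.NonFatal

object AssertionErrorClassification extends App {
  // Illustrative stand-in for the error hit when the reader finds its
  // block read lock already released during task cleanup.
  val err = new AssertionError("read lock already released")

  // Not an Exception, so `case e: Exception` lets it escape to the
  // thread's uncaught-exception handler, which kills the executor.
  println(err.isInstanceOf[Exception]) // false

  // Still non-fatal by Scala's definition, so the patched guard
  // `NonFatal(t) || t.isInstanceOf[Exception]` matches it.
  println(NonFatal(err)) // true
}
```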

## How was this patch tested?

It is hard to write a unit test for this case; the fix was verified manually against a previously failing job.

Closes #24670 from rezasafi/branch-2.3.

Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
jiangxb1987 authored and dongjoon-hyun committed May 22, 2019
1 parent db33bd2 commit e56a9b19685b0ba089a0d6979d74f6592b9029d1
@@ -88,7 +88,7 @@ private[spark] case class PythonFunction(
 private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction])
 
 /** Thrown for exceptions in user Python code. */
-private[spark] class PythonException(msg: String, cause: Exception)
+private[spark] class PythonException(msg: String, cause: Throwable)
   extends RuntimeException(msg, cause)
 
 /**
/**
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
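
The newly imported `scala.util.control.NonFatal` extractor matches every `Throwable` except those the Scala library considers fatal to the JVM: `VirtualMachineError` (including `OutOfMemoryError` and `StackOverflowError`), `ThreadDeath`, `InterruptedException`, `LinkageError`, and `ControlThrowable`. A small sketch of that boundary:

```scala
import scala.util.control.NonFatal

object NonFatalBoundaryDemo extends App {
  // Recoverable: ordinary exceptions and most Errors.
  println(NonFatal(new RuntimeException("boom"))) // true

  // Fatal: errors that should be allowed to bring the JVM down.
  println(NonFatal(new OutOfMemoryError("oom")))  // false
  println(NonFatal(new InterruptedException()))   // false
}
```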
@@ -143,15 +144,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       context: TaskContext)
     extends Thread(s"stdout writer for $pythonExec") {
 
-    @volatile private var _exception: Exception = null
+    @volatile private var _exception: Throwable = null
 
     private val pythonIncludes = funcs.flatMap(_.funcs.flatMap(_.pythonIncludes.asScala)).toSet
     private val broadcastVars = funcs.flatMap(_.funcs.flatMap(_.broadcastVars.asScala))
 
     setDaemon(true)
 
-    /** Contains the exception thrown while writing the parent iterator to the Python process. */
-    def exception: Option[Exception] = Option(_exception)
+    /** Contains the throwable thrown while writing the parent iterator to the Python process. */
+    def exception: Option[Throwable] = Option(_exception)
 
     /** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */
     def shutdownOnTaskCompletion() {
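
The `@volatile` field plus `Option(...)` accessor is how the writer thread hands a failure to the task thread: `Option(null)` collapses to `None`, so callers see `None` while the writer is healthy and `Some(t)` once a failure was recorded. A tiny sketch of just that accessor semantics (the demo object is mine):

```scala
object OptionAccessorDemo extends App {
  @volatile var _exception: Throwable = null

  // Option(null) is None, so this is None until a failure is recorded.
  def exception: Option[Throwable] = Option(_exception)
  println(exception) // None

  _exception = new AssertionError("boom") // illustrative failure
  println(exception) // Some(java.lang.AssertionError: boom)

  // A caller would rethrow on its own thread, e.g.:
  //   exception.foreach(t => throw t)
}
```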
@@ -251,18 +252,21 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         dataOut.writeInt(SpecialLengths.END_OF_STREAM)
         dataOut.flush()
       } catch {
-        case e: Exception if context.isCompleted || context.isInterrupted =>
-          logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-          if (!worker.isClosed) {
-            Utils.tryLog(worker.shutdownOutput())
-          }
-
-        case e: Exception =>
-          // We must avoid throwing exceptions here, because the thread uncaught exception handler
-          // will kill the whole executor (see org.apache.spark.executor.Executor).
-          _exception = e
-          if (!worker.isClosed) {
-            Utils.tryLog(worker.shutdownOutput())
-          }
+        case t: Throwable if (NonFatal(t) || t.isInstanceOf[Exception]) =>
+          if (context.isCompleted || context.isInterrupted) {
+            logDebug("Exception/NonFatal Error thrown after task completion (likely due to " +
+              "cleanup)", t)
+            if (!worker.isClosed) {
+              Utils.tryLog(worker.shutdownOutput())
+            }
+          } else {
+            // We must avoid throwing exceptions/NonFatals here, because the thread uncaught
+            // exception handler will kill the whole executor (see
+            // org.apache.spark.executor.Executor).
+            _exception = t
+            if (!worker.isClosed) {
+              Utils.tryLog(worker.shutdownOutput())
+            }
+          }
       }
     }
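
Why record `t` instead of rethrowing: a throwable that escapes `run()` reaches the thread's `UncaughtExceptionHandler`, and the executor installs one that shuts the whole JVM down. A self-contained sketch of the difference, using a harmless print-only handler as a stand-in for Spark's (all names here are mine):

```scala
import scala.util.control.NonFatal

object UncaughtHandlerDemo extends App {
  // Stand-in for Spark's executor handler, which would System.exit;
  // this one only prints so the sketch stays harmless.
  Thread.setDefaultUncaughtExceptionHandler(
    new Thread.UncaughtExceptionHandler {
      def uncaughtException(t: Thread, e: Throwable): Unit =
        println(s"handler on ${t.getName}: would kill the executor ($e)")
    })

  // A throwable escaping run() reaches the handler above...
  val escaping = new Thread(new Runnable {
    def run(): Unit = throw new AssertionError("escaped run()")
  })
  escaping.start(); escaping.join()

  // ...whereas catching NonFatal keeps the failure local, to be
  // surfaced later through the task thread.
  @volatile var recorded: Throwable = null
  val contained = new Thread(new Runnable {
    def run(): Unit =
      try throw new AssertionError("contained")
      catch { case t: Throwable if NonFatal(t) => recorded = t }
  })
  contained.start(); contained.join()
  println(s"recorded instead of escalating: $recorded")
}
```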
