From 7271308de670c582fa97ff1db13401db379d4cb2 Mon Sep 17 00:00:00 2001
From: GuoQiang Li
Date: Wed, 2 Sep 2015 16:10:32 +0800
Subject: [PATCH] Fix: when an OOM is thrown, the executor does not stop
 properly.

---
 .../org/apache/spark/TaskEndReason.scala      | 11 +++++++
 .../org/apache/spark/executor/Executor.scala  | 31 +++++++++++--------
 .../spark/scheduler/TaskSetManager.scala      |  5 +++
 3 files changed, 34 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 934d00dc708b9..2a657d53185ee 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -215,3 +215,14 @@ case class ExecutorLostFailure(execId: String) extends TaskFailedReason {
 case object UnknownReason extends TaskFailedReason {
   override def toErrorString: String = "UnknownReason"
 }
+
+/**
+ * :: DeveloperApi ::
+ * Task failed because the executor exited.
+ */
+@DeveloperApi
+case class ExecutorExitFailure(
+    exitCode: Option[Int],
+    metrics: Option[TaskMetrics]) extends TaskFailedReason {
+  override def toErrorString: String = "ExecutorExitFailure (executor exit)"
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index c3491bb8b1cf3..01e7561994b34 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -305,21 +305,26 @@ private[spark] class Executor(
               m
             }
           }
-          val serializedTaskEndReason = {
-            try {
-              ser.serialize(new ExceptionFailure(t, metrics))
-            } catch {
-              case _: NotSerializableException =>
-                // t is not serializable so just send the stacktrace
-                ser.serialize(new ExceptionFailure(t, metrics, false))
+          if (ShutdownHookManager.inShutdown()) {
+            val reason = ExecutorExitFailure(None, metrics)
+            execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
+          } else {
+            val serializedTaskEndReason = {
+              try {
+                ser.serialize(new ExceptionFailure(t, metrics))
+              } catch {
+                case _: NotSerializableException =>
+                  // t is not serializable so just send the stacktrace
+                  ser.serialize(new ExceptionFailure(t, metrics, false))
+              }
             }
-          }
-          execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+            execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
 
-          // Don't forcibly exit unless the exception was inherently fatal, to avoid
-          // stopping other tasks unnecessarily.
-          if (Utils.isFatalError(t)) {
-            SparkUncaughtExceptionHandler.uncaughtException(t)
+            // Don't forcibly exit unless the exception was inherently fatal, to avoid
+            // stopping other tasks unnecessarily.
+            if (Utils.isFatalError(t)) {
+              SparkUncaughtExceptionHandler.uncaughtException(t)
+            }
           }
 
       } finally {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 818b95d67f6be..1c7e308200a69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -709,6 +709,11 @@ private[spark] class TaskSetManager(
         }
         ef.exception
 
+      case e: ExecutorExitFailure =>
+        taskMetrics = e.metrics.orNull
+        logWarning(failureReason)
+        None
+
       case e: TaskFailedReason =>  // TaskResultLost, TaskKilled, and others
         logWarning(failureReason)
         None
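
Note (not part of the patch): the fix hinges on checking whether the JVM is
already shutting down before trying to serialize the failure, and reporting a
shutdown-specific reason instead. Below is a minimal, self-contained Scala
sketch of that guard pattern; it does not use Spark. `inShutdown()` here is an
approximation of Spark's ShutdownHookManager.inShutdown(), implemented with an
AtomicBoolean flipped by a JVM shutdown hook, and `reportFailure` is a
hypothetical stand-in for the executor's status update.

// Sketch of the shutdown-guard pattern introduced by this patch.
// All names are illustrative; Spark's real check lives in ShutdownHookManager.
import java.util.concurrent.atomic.AtomicBoolean

object ShutdownGuardSketch {
  private val shuttingDown = new AtomicBoolean(false)

  // Flip the flag once the JVM begins exiting, mirroring the state that
  // Spark's ShutdownHookManager tracks internally.
  sys.addShutdownHook { shuttingDown.set(true) }

  def inShutdown(): Boolean = shuttingDown.get()

  // A failing task reports a shutdown-specific reason instead of trying to
  // serialize the raw exception, so the scheduler can tell "the executor is
  // exiting" apart from "the task itself failed".
  def reportFailure(t: Throwable): String =
    if (inShutdown()) "ExecutorExitFailure (executor exit)"
    else s"ExceptionFailure: ${t.getMessage}"

  def main(args: Array[String]): Unit =
    println(reportFailure(new OutOfMemoryError("sketch only")))
}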