From 113eec4ae9437218ffd18f792571302e7667fc7e Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Sun, 23 Jul 2017 23:23:13 +0800 Subject: [PATCH] [SPARK-20904][CORE] Don't report task failures to driver during shutdown. Executors run a thread pool with daemon threads to run tasks. This means that those threads remain active when the JVM is shutting down, meaning those tasks are affected by code that runs in shutdown hooks. So if a shutdown hook messes with something that the task is using (e.g. an HDFS connection), the task will fail and will report that failure to the driver. That will make the driver mark the task as failed regardless of what caused the executor to shut down. So, for example, if YARN pre-empted that executor, the driver would consider that task failed when it should instead ignore the failure. This change avoids reporting failures to the driver when shutdown hooks are executing; this fixes the YARN preemption accounting, and doesn't really change things much for other scenarios, other than reporting a more generic error ("Executor lost") when the executor shuts down unexpectedly - which is arguably more correct. Tested with a hacky app running on spark-shell that tried to cause failures only when shutdown hooks were running, verified that preemption didn't cause the app to fail because of task failures exceeding the threshold. Author: Marcelo Vanzin Closes #18594 from vanzin/SPARK-20904. (cherry picked from commit cecd285a2aabad4e7db5a3d18944b87fbc4eee6c) Signed-off-by: Wenchen Fan --- .../org/apache/spark/executor/Executor.scala | 47 +++++++++++-------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d54dd2d464821..e53d91d4e468b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -467,29 +467,38 @@ private[spark] class Executor( // the default uncaught exception handler, which will terminate the Executor. logError(s"Exception in $taskName (TID $taskId)", t) - // Collect latest accumulator values to report back to the driver - val accums: Seq[AccumulatorV2[_, _]] = - if (task != null) { - task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) - task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) - task.collectAccumulatorUpdates(taskFailed = true) - } else { - Seq.empty - } + // SPARK-20904: Do not report failure to driver if if happened during shut down. Because + // libraries may set up shutdown hooks that race with running tasks during shutdown, + // spurious failures may occur and can result in improper accounting in the driver (e.g. + // the task failure would not be ignored if the shutdown happened because of premption, + // instead of an app issue). + if (!ShutdownHookManager.inShutdown()) { + // Collect latest accumulator values to report back to the driver + val accums: Seq[AccumulatorV2[_, _]] = + if (task != null) { + task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) + task.collectAccumulatorUpdates(taskFailed = true) + } else { + Seq.empty + } - val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None)) + val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None)) - val serializedTaskEndReason = { - try { - ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)) - } catch { - case _: NotSerializableException => - // t is not serializable so just send the stacktrace - ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) + val serializedTaskEndReason = { + try { + ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)) + } catch { + case _: NotSerializableException => + // t is not serializable so just send the stacktrace + ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) + } } + setTaskFinishedAndClearInterruptStatus() + execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) + } else { + logInfo("Not reporting error to driver during JVM shutdown.") } - setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) // Don't forcibly exit unless the exception was inherently fatal, to avoid // stopping other tasks unnecessarily.