diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d54dd2d464821..e53d91d4e468b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -467,29 +467,38 @@ private[spark] class Executor( // the default uncaught exception handler, which will terminate the Executor. logError(s"Exception in $taskName (TID $taskId)", t) - // Collect latest accumulator values to report back to the driver - val accums: Seq[AccumulatorV2[_, _]] = - if (task != null) { - task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) - task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) - task.collectAccumulatorUpdates(taskFailed = true) - } else { - Seq.empty - } + // SPARK-20904: Do not report failure to driver if if happened during shut down. Because + // libraries may set up shutdown hooks that race with running tasks during shutdown, + // spurious failures may occur and can result in improper accounting in the driver (e.g. + // the task failure would not be ignored if the shutdown happened because of premption, + // instead of an app issue). + if (!ShutdownHookManager.inShutdown()) { + // Collect latest accumulator values to report back to the driver + val accums: Seq[AccumulatorV2[_, _]] = + if (task != null) { + task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) + task.collectAccumulatorUpdates(taskFailed = true) + } else { + Seq.empty + } - val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None)) + val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None)) - val serializedTaskEndReason = { - try { - ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)) - } catch { - case _: NotSerializableException => - // t is not serializable so just send the stacktrace - ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) + val serializedTaskEndReason = { + try { + ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)) + } catch { + case _: NotSerializableException => + // t is not serializable so just send the stacktrace + ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) + } } + setTaskFinishedAndClearInterruptStatus() + execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) + } else { + logInfo("Not reporting error to driver during JVM shutdown.") } - setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) // Don't forcibly exit unless the exception was inherently fatal, to avoid // stopping other tasks unnecessarily.