From a10b63fb90da3a7fd5c9ce7aaeb891b275507b93 Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Thu, 23 Oct 2014 17:25:56 -0400
Subject: [PATCH 1/3] refactor: move ExecutorUncaughtExceptionHandler to util
 as SparkUncaughtExceptionHandler

---
 .../src/main/scala/org/apache/spark/executor/Executor.scala | 6 +++---
 .../SparkUncaughtExceptionHandler.scala}                    | 6 +++---
 core/src/main/scala/org/apache/spark/util/Utils.scala       | 4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)
 rename core/src/main/scala/org/apache/spark/{executor/ExecutorUncaughtExceptionHandler.scala => util/SparkUncaughtExceptionHandler.scala} (93%)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 616c7e6a46368..10934f78b89ae 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -68,7 +68,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
   }
 
   val executorSource = new ExecutorSource(this, executorId)
@@ -253,7 +253,7 @@ private[spark] class Executor(
         // Don't forcibly exit unless the exception was inherently fatal, to avoid
         // stopping other tasks unnecessarily.
         if (Utils.isFatalError(t)) {
-          ExecutorUncaughtExceptionHandler.uncaughtException(t)
+          SparkUncaughtExceptionHandler.uncaughtException(t)
         }
       }
     } finally {
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
similarity index 93%
rename from core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
rename to core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index b0e984c03964c..91fc722f905fc 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.spark.executor
+package org.apache.spark.util
 
 import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.executor.ExecutorExitCode
 
 /**
  * The default uncaught exception handler for Executors terminates the whole process, to avoid
  * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
  * to fail fast when things go wrong.
 */
-private[spark] object ExecutorUncaughtExceptionHandler
+private[spark] object SparkUncaughtExceptionHandler
   extends Thread.UncaughtExceptionHandler with Logging {
 
   override def uncaughtException(thread: Thread, exception: Throwable) {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0aeff6455b3fe..5182519d2a55a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -43,7 +43,7 @@ import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark._
-import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
+import org.apache.spark.util.SparkUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
 /** CallSite represents a place in user code. It can have a short and a long form. */
@@ -906,7 +906,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }
 

From e62e4169de576e4cd65d5e545490e6baebf99274 Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Thu, 23 Oct 2014 18:02:04 -0400
Subject: [PATCH 2/3] add some general exit codes

---
 .../apache/spark/executor/ExecutorExitCode.scala | 12 ++----------
 .../org/apache/spark/util/SparkExitCode.scala    | 15 +++++++++++++++
 .../util/SparkUncaughtExceptionHandler.scala     |  8 ++++----
 3 files changed, 21 insertions(+), 14 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/util/SparkExitCode.scala

diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 38be2c58b333f..52862ae0ca5e4 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.executor
 
+import org.apache.spark.util.SparkExitCode._
+
 /**
  * These are exit codes that executors should use to provide the master with information about
  * executor failures assuming that cluster management framework can capture the exit codes (but
@@ -27,16 +29,6 @@ package org.apache.spark.executor
  */
 private[spark] object ExecutorExitCode {
 
-  /** The default uncaught exception handler was reached. */
-  val UNCAUGHT_EXCEPTION = 50
-
-  /** The default uncaught exception handler was called and an exception was encountered while
-      logging the exception. */
-  val UNCAUGHT_EXCEPTION_TWICE = 51
-
-  /** The default uncaught exception handler was reached, and the uncaught exception was an
-      OutOfMemoryError. */
-  val OOM = 52
 
   /** DiskStore failed to create a local temporary directory after many attempts. */
   val DISK_STORE_FAILED_TO_CREATE_DIR = 53
diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
new file mode 100644
index 0000000000000..06cb6a3adda79
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -0,0 +1,15 @@
+package org.apache.spark.util
+
+private[spark] object SparkExitCode {
+  /** The default uncaught exception handler was reached. */
+  val UNCAUGHT_EXCEPTION = 50
+
+  /** The default uncaught exception handler was called and an exception was encountered while
+      logging the exception. */
+  val UNCAUGHT_EXCEPTION_TWICE = 51
+
+  /** The default uncaught exception handler was reached, and the uncaught exception was an
+      OutOfMemoryError. */
+  val OOM = 52
+
+}
diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index 91fc722f905fc..7586e8c77ed29 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -36,14 +36,14 @@ private[spark] object SparkUncaughtExceptionHandler
       // (If we do, we will deadlock.)
       if (!Utils.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
-          System.exit(ExecutorExitCode.OOM)
+          System.exit(SparkExitCode.OOM)
         } else {
-          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+          System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
       }
     } catch {
-      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
     }
   }
 

From 035ee3dc64392054bdfb84ba38b43cb449d62d5d Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Thu, 23 Oct 2014 18:48:49 -0400
Subject: [PATCH 3/3] make RAT happy

---
 .../org/apache/spark/util/SparkExitCode.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
index 06cb6a3adda79..c93b1cca9f564 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.util
 
 private[spark] object SparkExitCode {
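
Note for reviewers, appended after the series rather than inside any hunk:
the stand-alone sketch below shows how the relocated handler is meant to be
reused once all three patches apply. The package and object names here
(org.apache.spark.demo, UncaughtHandlerDemo) are hypothetical, invented for
illustration only; the import and the Thread.setDefaultUncaughtExceptionHandler
call mirror what Executor.scala does after PATCH 1, and the exit codes are
the ones SparkExitCode defines after PATCH 2.

    // Hypothetical demo, not part of the patch series. It is placed under
    // org.apache.spark so the private[spark] objects are visible to it.
    package org.apache.spark.demo

    import org.apache.spark.util.{SparkExitCode, SparkUncaughtExceptionHandler}

    object UncaughtHandlerDemo {
      def main(args: Array[String]): Unit = {
        // Same wiring as Executor.scala: any thread that later dies from an
        // uncaught exception takes the whole JVM down with a SparkExitCode
        // value instead of leaving the process running with a dead thread.
        Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)

        // A worker thread that fails. Unless the JVM is already shutting
        // down, the handler calls System.exit(SparkExitCode.UNCAUGHT_EXCEPTION),
        // i.e. exit code 50; an OutOfMemoryError would map to
        // SparkExitCode.OOM (52) instead.
        val worker = new Thread(new Runnable {
          override def run(): Unit = throw new IllegalStateException("boom")
        })
        worker.start()
        worker.join()
      }
    }

The split in PATCH 2 keeps the JVM-generic codes (50-52) in util.SparkExitCode,
where any Spark daemon can reach them, while executor-specific codes (53 and
up) stay in ExecutorExitCode.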