From e7039dc6dcf3ece047cf98f7f44024bc0c492e31 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 16 Apr 2015 14:39:26 -0700 Subject: [PATCH] [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races. This change adds some new utility code to handle shutdown hooks in Spark. The main goal is to take advantage of Hadoop 2.x's API for shutdown hooks, which allows Spark to register a hook that will run before the one that cleans up HDFS clients, and thus avoids some races that would cause exceptions to show up and other issues such as failure to properly close event logs. Unfortunately, Hadoop 1.x does not have such APIs, so in that case correctness is still left to chance. --- .../spark/deploy/history/HistoryServer.scala | 6 +- .../spark/deploy/worker/ExecutorRunner.scala | 12 +- .../spark/storage/DiskBlockManager.scala | 18 +-- .../spark/storage/TachyonBlockManager.scala | 24 ++-- .../scala/org/apache/spark/util/Utils.scala | 127 +++++++++++++++--- .../org/apache/spark/util/UtilsSuite.scala | 32 +++-- .../hive/thriftserver/HiveThriftServer2.scala | 9 +- .../hive/thriftserver/SparkSQLCLIDriver.scala | 9 +- .../spark/deploy/yarn/ApplicationMaster.scala | 65 ++++----- 9 files changed, 187 insertions(+), 115 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 72f6048239297..56bef57e55392 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.SignalLogger +import org.apache.spark.util.{SignalLogger, Utils} /** * A web server that renders SparkUIs of completed applications. @@ -194,9 +194,7 @@ object HistoryServer extends Logging { val server = new HistoryServer(conf, provider, securityManager, port) server.bind() - Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") { - override def run(): Unit = server.stop() - }) + Utils.addShutdownHook { () => server.stop() } // Wait until the end of the world... or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 7d5acabb95a48..7aa85b732fc87 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -28,6 +28,7 @@ import com.google.common.io.Files import org.apache.spark.{SparkConf, Logging} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged +import org.apache.spark.util.Utils import org.apache.spark.util.logging.FileAppender /** @@ -61,7 +62,7 @@ private[deploy] class ExecutorRunner( // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might // make sense to remove this in the future. - private var shutdownHook: Thread = null + private var shutdownHook: AnyRef = null private[worker] def start() { workerThread = new Thread("ExecutorRunner for " + fullId) { @@ -69,12 +70,7 @@ private[deploy] class ExecutorRunner( } workerThread.start() // Shutdown hook that kills actors on shutdown. - shutdownHook = new Thread() { - override def run() { - killProcess(Some("Worker shutting down")) - } - } - Runtime.getRuntime.addShutdownHook(shutdownHook) + shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) } } /** @@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner( workerThread = null state = ExecutorState.KILLED try { - Runtime.getRuntime.removeShutdownHook(shutdownHook) + Utils.removeShutdownHook(shutdownHook) } catch { case e: IllegalStateException => None } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 2883137872600..7ea5e54f9e1fe 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon } } - private def addShutdownHook(): Thread = { - val shutdownHook = new Thread("delete Spark local dirs") { - override def run(): Unit = Utils.logUncaughtExceptions { - logDebug("Shutdown hook called") - DiskBlockManager.this.doStop() - } + private def addShutdownHook(): AnyRef = { + Utils.addShutdownHook { () => + logDebug("Shutdown hook called") + DiskBlockManager.this.doStop() } - Runtime.getRuntime.addShutdownHook(shutdownHook) - shutdownHook } /** Cleanup local dirs and stop shuffle sender. */ private[spark] def stop() { // Remove the shutdown hook. It causes memory leaks if we leave it around. - try { - Runtime.getRuntime.removeShutdownHook(shutdownHook) - } catch { - case e: IllegalStateException => None - } + Utils.removeShutdownHook(shutdownHook) doStop() } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index af873034215a9..951897cead996 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager( private def addShutdownHook() { tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir)) - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") { - override def run(): Unit = Utils.logUncaughtExceptions { - logDebug("Shutdown hook called") - tachyonDirs.foreach { tachyonDir => - try { - if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) { - Utils.deleteRecursively(tachyonDir, client) - } - } catch { - case e: Exception => - logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) + Utils.addShutdownHook { () => + logDebug("Shutdown hook called") + tachyonDirs.foreach { tachyonDir => + try { + if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) { + Utils.deleteRecursively(tachyonDir, client) } + } catch { + case e: Exception => + logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) } - client.close() } - }) + client.close() + } } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1029b0f9fce1e..af33ae9b690ee 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -21,7 +21,7 @@ import java.io._ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer -import java.util.{Properties, Locale, Random, UUID} +import java.util.{PriorityQueue, Properties, Locale, Random, UUID} import java.util.concurrent._ import javax.net.ssl.HttpsURLConnection @@ -30,7 +30,7 @@ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.io.Source import scala.reflect.ClassTag -import scala.util.Try +import scala.util.{Failure, Success, Try} import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.{ByteStreams, Files} @@ -64,9 +64,15 @@ private[spark] object CallSite { private[spark] object Utils extends Logging { val random = new Random() + val DEFAULT_SHUTDOWN_PRIORITY = 100 + private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 @volatile private var localRootDirs: Array[String] = null + + private val shutdownHooks = new SparkShutdownHookManager() + shutdownHooks.install() + /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -176,18 +182,16 @@ private[spark] object Utils extends Logging { private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]() // Add a shutdown hook to delete the temp dirs when the JVM exits - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") { - override def run(): Unit = Utils.logUncaughtExceptions { - logDebug("Shutdown hook called") - shutdownDeletePaths.foreach { dirPath => - try { - Utils.deleteRecursively(new File(dirPath)) - } catch { - case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e) - } + addShutdownHook { () => + logDebug("Shutdown hook called") + shutdownDeletePaths.foreach { dirPath => + try { + Utils.deleteRecursively(new File(dirPath)) + } catch { + case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e) } } - }) + } // Register the path to be deleted via shutdown hook def registerShutdownDeleteDir(file: File) { @@ -613,7 +617,7 @@ private[spark] object Utils extends Logging { } Utils.setupSecureURLConnection(uc, securityMgr) - val timeoutMs = + val timeoutMs = conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000 uc.setConnectTimeout(timeoutMs) uc.setReadTimeout(timeoutMs) @@ -1172,7 +1176,7 @@ private[spark] object Utils extends Logging { /** * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the * default UncaughtExceptionHandler - * + * * NOTE: This method is to be called by the spark-started JVM process. */ def tryOrExit(block: => Unit) { @@ -1185,11 +1189,11 @@ private[spark] object Utils extends Logging { } /** - * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught + * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught * exception - * - * NOTE: This method is to be called by the driver-side components to avoid stopping the - * user-started JVM process completely; in contrast, tryOrExit is to be called in the + * + * NOTE: This method is to be called by the driver-side components to avoid stopping the + * user-started JVM process completely; in contrast, tryOrExit is to be called in the * spark-started JVM process . */ def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) { @@ -2132,6 +2136,93 @@ private[spark] object Utils extends Logging { .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName()) } + /** + * Adds a shutdown hook with default priority. + */ + def addShutdownHook(hook: () => Unit): AnyRef = { + addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook) + } + + /** + * Adds a shutdown hook with the given priority. Hooks with lower priority values run + * first. + */ + def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = { + shutdownHooks.add(priority, hook) + } + + /** + * Remove a previously installed shutdown hook. + */ + def removeShutdownHook(ref: AnyRef): Boolean = { + shutdownHooks.remove(ref) + } + +} + +private [util] class SparkShutdownHookManager { + + private val hooks = new PriorityQueue[SparkShutdownHook]() + private var shuttingDown = false + + /** + * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not + * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for + * the best. + */ + def install(): Unit = { + val hookTask = new Runnable() { + override def run(): Unit = runAll() + } + Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match { + case Success(shmClass) => + val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get() + .asInstanceOf[Int] + val shm = shmClass.getMethod("get").invoke(null) + shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int]) + .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30)) + + case Failure(_) => + Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook")); + } + } + + def runAll(): Unit = synchronized { + shuttingDown = true + while (!hooks.isEmpty()) { + Utils.logUncaughtExceptions(hooks.poll().run()) + } + } + + def add(priority: Int, hook: () => Unit): AnyRef = synchronized { + checkState() + val hookRef = new SparkShutdownHook(priority, hook) + hooks.add(hookRef) + hookRef + } + + def remove(ref: AnyRef): Boolean = synchronized { + checkState() + hooks.remove(ref) + } + + private def checkState(): Unit = { + if (shuttingDown) { + throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.") + } + } + +} + +private class SparkShutdownHook(private val priority: Int, hook: () => Unit) + extends Comparable[SparkShutdownHook] { + + override def compareTo(other: SparkShutdownHook): Int = { + other.priority - priority + } + + def run(): Unit = hook() + } /** diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index fb97e650ff95c..1ba99803f5a0e 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -17,14 +17,16 @@ package org.apache.spark.util -import scala.util.Random - import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream} import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} import java.text.DecimalFormatSymbols import java.util.concurrent.TimeUnit import java.util.Locale +import java.util.PriorityQueue + +import scala.collection.mutable.ListBuffer +import scala.util.Random import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files @@ -36,14 +38,14 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf class UtilsSuite extends FunSuite with ResetSystemProperties { - + test("timeConversion") { // Test -1 assert(Utils.timeStringAsSeconds("-1") === -1) - + // Test zero assert(Utils.timeStringAsSeconds("0") === 0) - + assert(Utils.timeStringAsSeconds("1") === 1) assert(Utils.timeStringAsSeconds("1s") === 1) assert(Utils.timeStringAsSeconds("1000ms") === 1) @@ -52,7 +54,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1)) assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1)) assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1)) - + assert(Utils.timeStringAsMs("1") === 1) assert(Utils.timeStringAsMs("1ms") === 1) assert(Utils.timeStringAsMs("1000us") === 1) @@ -61,7 +63,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1)) assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1)) assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1)) - + // Test invalid strings intercept[NumberFormatException] { Utils.timeStringAsMs("This breaks 600s") @@ -79,7 +81,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { Utils.timeStringAsMs("This 123s breaks") } } - + test("bytesToString") { assert(Utils.bytesToString(10) === "10.0 B") assert(Utils.bytesToString(1500) === "1500.0 B") @@ -466,4 +468,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { val newFileName = new File(testFileDir, testFileName) assert(newFileName.isFile()) } + + test("shutdown hook manager") { + val manager = new SparkShutdownHookManager() + val output = new ListBuffer[Int]() + + val hook1 = manager.add(1, () => output += 1) + manager.add(3, () => output += 3) + manager.add(2, () => output += 2) + manager.add(4, () => output += 4) + manager.remove(hook1) + + manager.runAll() + assert(output.toList === List(4, 3, 2)) + } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index c3a3f8c0f41df..832596fc8bee5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener} +import org.apache.spark.util.Utils /** * The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a @@ -57,13 +58,7 @@ object HiveThriftServer2 extends Logging { logInfo("Starting SparkContext") SparkSQLEnv.init() - Runtime.getRuntime.addShutdownHook( - new Thread() { - override def run() { - SparkSQLEnv.stop() - } - } - ) + Utils.addShutdownHook { () => SparkSQLEnv.stop() } try { val server = new HiveThriftServer2(SparkSQLEnv.hiveContext) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 85281c6d73a3b..7e307bb4ad1e8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -40,6 +40,7 @@ import org.apache.thrift.transport.TSocket import org.apache.spark.Logging import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.util.Utils private[hive] object SparkSQLCLIDriver { private var prompt = "spark-sql" @@ -101,13 +102,7 @@ private[hive] object SparkSQLCLIDriver { SessionState.start(sessionState) // Clean up after we exit - Runtime.getRuntime.addShutdownHook( - new Thread() { - override def run() { - SparkSQLEnv.stop() - } - } - ) + Utils.addShutdownHook { () => SparkSQLEnv.stop() } // "-h" option has been passed, so connect to Hive thrift server. if (sessionState.getHost != null) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index c357b7ae9d4da..a1e7d4c54acef 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -25,7 +25,6 @@ import java.net.{Socket, URL} import java.util.concurrent.atomic.AtomicReference import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -95,44 +94,38 @@ private[spark] class ApplicationMaster( logInfo("ApplicationAttemptId: " + appAttemptId) val fs = FileSystem.get(yarnConf) - val cleanupHook = new Runnable { - override def run() { - // If the SparkContext is still registered, shut it down as a best case effort in case - // users do not call sc.stop or do System.exit(). - val sc = sparkContextRef.get() - if (sc != null) { - logInfo("Invoking sc stop from shutdown hook") - sc.stop() - } - val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf) - val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts - - if (!finished) { - // This happens when the user application calls System.exit(). We have the choice - // of either failing or succeeding at this point. We report success to avoid - // retrying applications that have succeeded (System.exit(0)), which means that - // applications that explicitly exit with a non-zero status will also show up as - // succeeded in the RM UI. - finish(finalStatus, - ApplicationMaster.EXIT_SUCCESS, - "Shutdown hook called before final status was reported.") - } - if (!unregistered) { - // we only want to unregister if we don't want the RM to retry - if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) { - unregister(finalStatus, finalMsg) - cleanupStagingDir(fs) - } + Utils.addShutdownHook { () => + // If the SparkContext is still registered, shut it down as a best case effort in case + // users do not call sc.stop or do System.exit(). + val sc = sparkContextRef.get() + if (sc != null) { + logInfo("Invoking sc stop from shutdown hook") + sc.stop() + } + val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf) + val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts + + if (!finished) { + // This happens when the user application calls System.exit(). We have the choice + // of either failing or succeeding at this point. We report success to avoid + // retrying applications that have succeeded (System.exit(0)), which means that + // applications that explicitly exit with a non-zero status will also show up as + // succeeded in the RM UI. + finish(finalStatus, + ApplicationMaster.EXIT_SUCCESS, + "Shutdown hook called before final status was reported.") + } + + if (!unregistered) { + // we only want to unregister if we don't want the RM to retry + if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) { + unregister(finalStatus, finalMsg) + cleanupStagingDir(fs) } } } - // Use higher priority than FileSystem. - assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY) - ShutdownHookManager - .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY) - // Call this to force generation of secret so it gets populated into the // Hadoop UGI. This has to happen before the startUserApplication which does a // doAs in order for the credentials to be passed on to the executor containers. @@ -379,7 +372,7 @@ private[spark] class ApplicationMaster( logWarning( "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime") } - val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", + val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", s"${waitTries.getOrElse(100000L)}ms") val deadline = System.currentTimeMillis() + totalWaitTime @@ -553,8 +546,6 @@ private[spark] class ApplicationMaster( object ApplicationMaster extends Logging { - val SHUTDOWN_HOOK_PRIORITY: Int = 30 - // exit codes for different causes, no reason behind the values private val EXIT_SUCCESS = 0 private val EXIT_UNCAUGHT_EXCEPTION = 10