[SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
This change adds new utility code to handle shutdown hooks in
Spark. The main goal is to take advantage of Hadoop 2.x's shutdown
hook API, which lets Spark register a hook that runs before the one
that cleans up HDFS clients. This avoids races that would otherwise
surface as exceptions and other problems, such as event logs not
being closed properly.

Unfortunately, Hadoop 1.x does not have such APIs, so in that case
correctness is still left to chance.
Marcelo Vanzin committed Apr 17, 2015
1 parent c84d916 commit e7039dc
Showing 9 changed files with 187 additions and 115 deletions.
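
The per-file diffs below replace direct uses of Runtime.getRuntime.addShutdownHook with the new Utils.addShutdownHook / Utils.removeShutdownHook helpers, which are backed by the SparkShutdownHookManager added to Utils.scala. As a minimal sketch of how a caller is meant to use the new API, the Scala snippet below registers two hooks and unregisters them on a clean stop. The package, the ScratchDirCleaner class, and its helper methods are hypothetical and not part of this commit; the ordering comment reflects the priority semantics exercised by the new UtilsSuite test, where higher-priority hooks run first.

package org.apache.spark.demo  // hypothetical package, only so the private[spark] Utils object is visible

import java.io.File

import org.apache.spark.util.Utils

// Hypothetical component sketching the shutdown hook API introduced by this commit.
class ScratchDirCleaner(scratchDir: File) {

  // Register a hook at the default priority (100); the handle is an opaque AnyRef.
  private val hookRef: AnyRef = Utils.addShutdownHook { () =>
    cleanUp()
  }

  // Higher priority values run first (see the "shutdown hook manager" test in
  // UtilsSuite), so this hook runs before default-priority hooks such as the
  // temp-dir cleanup that Utils registers for itself.
  private val earlyHookRef: AnyRef =
    Utils.addShutdownHook(Utils.DEFAULT_SHUTDOWN_PRIORITY + 50, () => flushState())

  private def cleanUp(): Unit = Utils.deleteRecursively(scratchDir)

  private def flushState(): Unit = { /* flush any buffered state */ }

  // On a clean stop, unregister both hooks so the manager does not run them later.
  def stop(): Unit = {
    cleanUp()
    Utils.removeShutdownHook(hookRef)
    Utils.removeShutdownHook(earlyHookRef)
  }
}

Because SparkShutdownHookManager rejects additions and removals once runAll() has started, callers are expected to unregister their hooks in their normal stop or kill paths, as ExecutorRunner and DiskBlockManager do in the diffs below.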
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.SignalLogger
import org.apache.spark.util.{SignalLogger, Utils}

/**
* A web server that renders SparkUIs of completed applications.
@@ -194,9 +194,7 @@ object HistoryServer extends Logging {
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()

Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
override def run(): Unit = server.stop()
})
Utils.addShutdownHook { () => server.stop() }

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,6 +28,7 @@ import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.util.Utils
import org.apache.spark.util.logging.FileAppender

/**
@@ -61,20 +62,15 @@ private[deploy] class ExecutorRunner(

// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
private var shutdownHook: Thread = null
private var shutdownHook: AnyRef = null

private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
}

/**
@@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner(
workerThread = null
state = ExecutorState.KILLED
try {
Runtime.getRuntime.removeShutdownHook(shutdownHook)
Utils.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
Expand Up @@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
}

private def addShutdownHook(): Thread = {
val shutdownHook = new Thread("delete Spark local dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
DiskBlockManager.this.doStop()
}
private def addShutdownHook(): AnyRef = {
Utils.addShutdownHook { () =>
logDebug("Shutdown hook called")
DiskBlockManager.this.doStop()
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
shutdownHook
}

/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
// Remove the shutdown hook. It causes memory leaks if we leave it around.
try {
Runtime.getRuntime.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
Utils.removeShutdownHook(shutdownHook)
doStop()
}

core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager(

private def addShutdownHook() {
tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
case e: Exception =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
Utils.addShutdownHook { () =>
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
case e: Exception =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
client.close()
}
})
client.close()
}
}
}
127 changes: 109 additions & 18 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection

@@ -30,7 +30,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
import scala.util.Try
import scala.util.{Failure, Success, Try}
import scala.util.control.{ControlThrowable, NonFatal}

import com.google.common.io.{ByteStreams, Files}
@@ -64,9 +64,15 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()

val DEFAULT_SHUTDOWN_PRIORITY = 100

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null


private val shutdownHooks = new SparkShutdownHookManager()
shutdownHooks.install()

/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -176,18 +182,16 @@ private[spark] object Utils extends Logging {
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()

// Add a shutdown hook to delete the temp dirs when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
shutdownDeletePaths.foreach { dirPath =>
try {
Utils.deleteRecursively(new File(dirPath))
} catch {
case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
addShutdownHook { () =>
logDebug("Shutdown hook called")
shutdownDeletePaths.foreach { dirPath =>
try {
Utils.deleteRecursively(new File(dirPath))
} catch {
case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
}
})
}

// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
@@ -613,7 +617,7 @@
}
Utils.setupSecureURLConnection(uc, securityMgr)

val timeoutMs =
val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
@@ -1172,7 +1176,7 @@
/**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
*
*
* NOTE: This method is to be called by the spark-started JVM process.
*/
def tryOrExit(block: => Unit) {
@@ -1185,11 +1189,11 @@
}

/**
* Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
* Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
* exception
*
* NOTE: This method is to be called by the driver-side components to avoid stopping the
* user-started JVM process completely; in contrast, tryOrExit is to be called in the
*
* NOTE: This method is to be called by the driver-side components to avoid stopping the
* user-started JVM process completely; in contrast, tryOrExit is to be called in the
* spark-started JVM process .
*/
def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
@@ -2132,6 +2136,93 @@
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}

/**
* Adds a shutdown hook with default priority.
*/
def addShutdownHook(hook: () => Unit): AnyRef = {
addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
}

/**
* Adds a shutdown hook with the given priority. Hooks with higher priority values run
* first.
*/
def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
}

/**
* Remove a previously installed shutdown hook.
*/
def removeShutdownHook(ref: AnyRef): Boolean = {
shutdownHooks.remove(ref)
}

}

private [util] class SparkShutdownHookManager {

private val hooks = new PriorityQueue[SparkShutdownHook]()
private var shuttingDown = false

/**
* Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
* have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
* the best.
*/
def install(): Unit = {
val hookTask = new Runnable() {
override def run(): Unit = runAll()
}
Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get(null)
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
.invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))

case Failure(_) =>
Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
}
}

def runAll(): Unit = synchronized {
shuttingDown = true
while (!hooks.isEmpty()) {
Utils.logUncaughtExceptions(hooks.poll().run())
}
}

def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
checkState()
val hookRef = new SparkShutdownHook(priority, hook)
hooks.add(hookRef)
hookRef
}

def remove(ref: AnyRef): Boolean = synchronized {
checkState()
hooks.remove(ref)
}

private def checkState(): Unit = {
if (shuttingDown) {
throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
}
}

}

private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
extends Comparable[SparkShutdownHook] {

override def compareTo(other: SparkShutdownHook): Int = {
other.priority - priority
}

def run(): Unit = hook()

}

/**
32 changes: 24 additions & 8 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -17,14 +17,16 @@

package org.apache.spark.util

import scala.util.Random

import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
import java.util.concurrent.TimeUnit
import java.util.Locale
import java.util.PriorityQueue

import scala.collection.mutable.ListBuffer
import scala.util.Random

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -36,14 +38,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf

class UtilsSuite extends FunSuite with ResetSystemProperties {

test("timeConversion") {
// Test -1
assert(Utils.timeStringAsSeconds("-1") === -1)

// Test zero
assert(Utils.timeStringAsSeconds("0") === 0)

assert(Utils.timeStringAsSeconds("1") === 1)
assert(Utils.timeStringAsSeconds("1s") === 1)
assert(Utils.timeStringAsSeconds("1000ms") === 1)
@@ -52,7 +54,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))

assert(Utils.timeStringAsMs("1") === 1)
assert(Utils.timeStringAsMs("1ms") === 1)
assert(Utils.timeStringAsMs("1000us") === 1)
@@ -61,7 +63,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))

// Test invalid strings
intercept[NumberFormatException] {
Utils.timeStringAsMs("This breaks 600s")
@@ -79,7 +81,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
Utils.timeStringAsMs("This 123s breaks")
}
}

test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")
@@ -466,4 +468,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val newFileName = new File(testFileDir, testFileName)
assert(newFileName.isFile())
}

test("shutdown hook manager") {
val manager = new SparkShutdownHookManager()
val output = new ListBuffer[Int]()

val hook1 = manager.add(1, () => output += 1)
manager.add(3, () => output += 3)
manager.add(2, () => output += 2)
manager.add(4, () => output += 4)
manager.remove(hook1)

manager.runAll()
assert(output.toList === List(4, 3, 2))
}
}
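
A note on why the new test expects List(4, 3, 2): SparkShutdownHook.compareTo returns other.priority - priority, which inverts the natural ordering, so the min-heap java.util.PriorityQueue used by SparkShutdownHookManager polls the hook with the largest priority value first (the priority-1 hook was removed before runAll). The standalone Scala sketch below reproduces the same comparison trick; the Hook case class and ReversedPriorityDemo object are illustrative and not part of this commit.

import java.util.PriorityQueue

// Same comparison trick as SparkShutdownHook: a higher priority compares as "smaller",
// so the min-heap PriorityQueue hands back higher-priority elements first.
case class Hook(priority: Int) extends Comparable[Hook] {
  override def compareTo(other: Hook): Int = other.priority - priority
}

object ReversedPriorityDemo {
  def main(args: Array[String]): Unit = {
    val queue = new PriorityQueue[Hook]()
    Seq(1, 3, 2, 4).foreach(p => queue.add(Hook(p)))
    while (!queue.isEmpty) {
      print(queue.poll().priority + " ")  // prints: 4 3 2 1
    }
    println()
  }
}

One caveat with this style of comparator is that plain integer subtraction can overflow for extreme priority values; it is harmless for the small constants used here.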