From 147d81a507da469edb096c34aeba51dd81c12cba Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Wed, 25 Feb 2015 11:18:46 -0800 Subject: [PATCH 1/4] Fix "file system closed" error thrown when closing spark-sql on YARN --- .../scheduler/EventLoggingListener.scala | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 30075c172bdb1..ce68f4c0bbbaa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -132,7 +132,9 @@ private[spark] class EventLoggingListener( writer.foreach(_.println(compact(render(eventJson)))) if (flushLogger) { writer.foreach(_.flush()) - hadoopDataStream.foreach(hadoopFlushMethod.invoke(_)) + if (!Utils.inShutdown()) { + hadoopDataStream.foreach(hadoopFlushMethod.invoke(_)) + } } if (testing) { loggedEvents += eventJson @@ -183,16 +185,18 @@ private[spark] class EventLoggingListener( def stop() = { writer.foreach(_.close()) - val target = new Path(logPath) - if (fileSystem.exists(target)) { - if (shouldOverwrite) { - logWarning(s"Event log $target already exists. Overwriting...") - fileSystem.delete(target, true) - } else { - throw new IOException("Target log file already exists (%s)".format(logPath)) + if (!Utils.inShutdown()) { + val target = new Path(logPath) + if (fileSystem.exists(target)) { + if (shouldOverwrite) { + logWarning(s"Event log $target already exists. Overwriting...") + fileSystem.delete(target, true) + } else { + throw new IOException("Target log file already exists (%s)".format(logPath)) + } } + fileSystem.rename(new Path(logPath + IN_PROGRESS), target) } - fileSystem.rename(new Path(logPath + IN_PROGRESS), target) } } From 0ded59a07f5af7c00680fc47c0319fdebf70ca64 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Wed, 25 Feb 2015 20:29:10 -0800 Subject: [PATCH 2/4] Use ShutdownHookManager to add shutdown hook and set the priority higher than file system shutdown hook --- .../scheduler/EventLoggingListener.scala | 22 ++++++++----------- .../hive/thriftserver/SparkSQLCLIDriver.scala | 18 ++++++++++----- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index ce68f4c0bbbaa..30075c172bdb1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -132,9 +132,7 @@ private[spark] class EventLoggingListener( writer.foreach(_.println(compact(render(eventJson)))) if (flushLogger) { writer.foreach(_.flush()) - if (!Utils.inShutdown()) { - hadoopDataStream.foreach(hadoopFlushMethod.invoke(_)) - } + hadoopDataStream.foreach(hadoopFlushMethod.invoke(_)) } if (testing) { loggedEvents += eventJson @@ -185,18 +183,16 @@ private[spark] class EventLoggingListener( def stop() = { writer.foreach(_.close()) - if (!Utils.inShutdown()) { - val target = new Path(logPath) - if (fileSystem.exists(target)) { - if (shouldOverwrite) { - logWarning(s"Event log $target already exists. Overwriting...") - fileSystem.delete(target, true) - } else { - throw new IOException("Target log file already exists (%s)".format(logPath)) - } + val target = new Path(logPath) + if (fileSystem.exists(target)) { + if (shouldOverwrite) { + logWarning(s"Event log $target already exists. Overwriting...") + fileSystem.delete(target, true) + } else { + throw new IOException("Target log file already exists (%s)".format(logPath)) } - fileSystem.rename(new Path(logPath + IN_PROGRESS), target) } + fileSystem.rename(new Path(logPath + IN_PROGRESS), target) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 401e97b162dea..9711429cd8fa6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -27,6 +27,7 @@ import jline.{ConsoleReader, History} import org.apache.commons.lang.StringUtils import org.apache.commons.logging.LogFactory import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor} import org.apache.hadoop.hive.common.LogUtils.LogInitializationException import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils, LogUtils} @@ -36,12 +37,14 @@ import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.processors.{SetProcessor, CommandProcessor, CommandProcessorFactory} import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.shims.ShimLoader +import org.apache.hadoop.util.ShutdownHookManager import org.apache.thrift.transport.TSocket import org.apache.spark.Logging import org.apache.spark.sql.hive.HiveShim private[hive] object SparkSQLCLIDriver { + val SHUTDOWN_HOOK_PRIORITY: Int = 30 private var prompt = "spark-sql" private var continuedPrompt = "".padTo(prompt.length, ' ') private var transport:TSocket = _ @@ -101,13 +104,16 @@ private[hive] object SparkSQLCLIDriver { SessionState.start(sessionState) // Clean up after we exit - Runtime.getRuntime.addShutdownHook( - new Thread() { - override def run() { - SparkSQLEnv.stop() - } + val cleanupHook = new Runnable { + override def run() { + SparkSQLEnv.stop() } - ) + } + + // Use higher priority than FileSystem. + assert(SparkSQLCLIDriver.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY) + ShutdownHookManager + .get().addShutdownHook(cleanupHook, SparkSQLCLIDriver.SHUTDOWN_HOOK_PRIORITY) // "-h" option has been passed, so connect to Hive thrift server. if (sessionState.getHost != null) { From 86d1baab36713e970fdbdb18dce0656c0584eeb1 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Thu, 26 Feb 2015 07:34:18 -0800 Subject: [PATCH 3/4] Minor coding style improvement --- .../hive/thriftserver/SparkSQLCLIDriver.scala | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 9711429cd8fa6..d1f72b108b742 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -103,17 +103,16 @@ private[hive] object SparkSQLCLIDriver { SessionState.start(sessionState) - // Clean up after we exit - val cleanupHook = new Runnable { - override def run() { - SparkSQLEnv.stop() - } - } - - // Use higher priority than FileSystem. - assert(SparkSQLCLIDriver.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY) - ShutdownHookManager - .get().addShutdownHook(cleanupHook, SparkSQLCLIDriver.SHUTDOWN_HOOK_PRIORITY) + // Clean up after we exit. Use higher priority than FileSystem. + assert(SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY) + ShutdownHookManager.get().addShutdownHook( + new Runnable { + override def run() { + SparkSQLEnv.stop() + } + }, + SHUTDOWN_HOOK_PRIORITY + ) // "-h" option has been passed, so connect to Hive thrift server. if (sessionState.getHost != null) { From 46e73b38ac973247c2112af0d3e19a264a746220 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Thu, 26 Feb 2015 08:09:14 -0800 Subject: [PATCH 4/4] Apply the same fix to HiveThriftServer2 --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 6e07df18b0e15..da857ba685bab 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.hive.thriftserver import org.apache.commons.logging.LogFactory +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.util.ShutdownHookManager import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor} @@ -34,6 +36,7 @@ import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener} * `HiveThriftServer2` thrift server. */ object HiveThriftServer2 extends Logging { + val SHUTDOWN_HOOK_PRIORITY: Int = 30 var LOG = LogFactory.getLog(classOf[HiveServer2]) /** @@ -57,12 +60,15 @@ object HiveThriftServer2 extends Logging { logInfo("Starting SparkContext") SparkSQLEnv.init() - Runtime.getRuntime.addShutdownHook( - new Thread() { + // Clean up after we exit. Use higher priority than FileSystem. + assert(SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY) + ShutdownHookManager.get().addShutdownHook( + new Runnable { override def run() { SparkSQLEnv.stop() } - } + }, + SHUTDOWN_HOOK_PRIORITY ) try {