From 715b4ffd63c9a43f10ca8c8ae5ad653d862e5d49 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Thu, 8 Oct 2015 18:39:28 -0700
Subject: [PATCH 1/4] [SPARK-11020] [core] Wait for HDFS to leave safe mode
 before initializing HS.

Large HDFS clusters may take a while to leave safe mode when starting;
this change makes the HS wait for that before doing checks about its
configuration.

This means the HS won't stop right away if HDFS is in safe mode and the
configuration is not correct, but that should be a very uncommon
situation.
---
 .../deploy/history/FsHistoryProvider.scala    | 104 +++++++++++++++++-
 .../history/FsHistoryProviderSuite.scala      |  65 +++++++++++
 2 files changed, 166 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 5eb8adf97d90b..fceb53ae8a648 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
+import org.apache.hadoop.hdfs.DistributedFileSystem
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -52,6 +53,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private val NOT_STARTED = "<Not Started>"
 
+  // Interval between safemode checks.
+  private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
+    "spark.history.fs.safemodeCheck.interval", "5s")
+
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
 
@@ -107,9 +112,57 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  initialize()
+  // Conf option used for testing the initialization code.
+  val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) {
+    initialize(None)
+  } else {
+    null
+  }
+
+  private[history] def initialize(errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = {
+    if (!isFsInSafeMode()) {
+      startPolling()
+      return null
+    }
+
+    // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
+    // for the FS to leave safe mode before enabling polling. This allows the main history server
+    // UI to be shown (so that the user can see the HDFS status).
+    //
+    // The synchronization in the run() method is needed because of the tests; mockito can
+    // misbehave if the test is modifying the mocked methods while the thread is calling
+    // them.
+    val initThread = new Thread(new Runnable() {
+      override def run(): Unit = {
+        try {
+          clock.synchronized {
+            while (isFsInSafeMode()) {
+              logInfo("HDFS is still in safe mode. Waiting...")
+              val deadline = clock.getTimeMillis() +
+                TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+              clock.waitTillTime(deadline)
+            }
+          }
+          startPolling()
+        } catch {
+          case _: InterruptedException =>
+        }
+      }
+    })
+    initThread.setDaemon(true)
+    initThread.setName(s"${getClass().getSimpleName()}-init")
+    initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
+      new Thread.UncaughtExceptionHandler() {
+        override def uncaughtException(t: Thread, e: Throwable): Unit = {
+          logError("Error initializing FsHistoryProvider.", e)
+          System.exit(1)
+        }
+      }))
+    initThread.start()
+    initThread
+  }
 
-  private def initialize(): Unit = {
+  private def startPolling(): Unit = {
     // Validate the log directory.
     val path = new Path(logDir)
     if (!fs.exists(path)) {
@@ -170,7 +223,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)
+  override def getConfig(): Map[String, String] = { 
+    val safeMode = if (isFsInSafeMode()) {
+      Map("HDFS State" -> "In safe mode.")
+    } else {
+      Map()
+    }
+    Map("Event log directory" -> logDir.toString) ++ safeMode
+  }
+
+  override def stop(): Unit = {
+    if (initThread != null && initThread.isAlive()) {
+      initThread.interrupt()
+      initThread.join()
+    }
+  }
 
   /**
    * Builds the application list based on the current contents of the log directory.
@@ -585,6 +652,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  /**
+   * Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
+   * so we have to resort to ugly reflection (as usual...).
+   *
+   * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
+   * makes it more public than not.
+   */
+  private[history] def isFsInSafeMode(): Boolean = {
+    if (!fs.isInstanceOf[DistributedFileSystem]) {
+      return false
+    }
+    isFsInSafeMode(fs.asInstanceOf[DistributedFileSystem])
+  }
+
+  // For testing.
+  private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
+    val hadoop1Class = "org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"
+    val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
+    val actionClass: Class[_] =
+      try {
+        getClass().getClassLoader().loadClass(hadoop2Class)
+      } catch {
+        case _: ClassNotFoundException =>
+          getClass().getClassLoader().loadClass(hadoop1Class)
+      }
+
+    val action = actionClass.getField("SAFEMODE_GET").get(null)
+    val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
+    method.invoke(dfs, action).asInstanceOf[Boolean]
+  }
+
 }
 
 private[history] object FsHistoryProvider {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 73cff89544dc3..507faec54d973 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -24,13 +24,19 @@ import java.util.concurrent.TimeUnit
 import java.util.zip.{ZipInputStream, ZipOutputStream}
 
 import scala.io.Source
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import com.google.common.base.Charsets
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doReturn, mock, spy, verify, when}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 import org.apache.spark.io._
@@ -407,6 +413,65 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
+  test("provider correctly checks whether fs is in safe mode") {
+    val provider = spy(new FsHistoryProvider(createTestConf()))
+    val dfs = mock(classOf[DistributedFileSystem])
+    // Asserts that safe mode is falsebecause we can't really control the return value of the mock,
+    // since the API is different between hadoop 1 and 2.
+    assert(!provider.isFsInSafeMode(dfs))
+  }
+
+  test("provider waits for safe mode to finish before initializing") {
+    val clock = new ManualClock()
+    val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+    val provider = spy(new FsHistoryProvider(conf, clock))
+    doReturn(true).when(provider).isFsInSafeMode()
+
+    val initThread = provider.initialize(None)
+    try {
+      provider.getConfig().keys should contain ("HDFS State")
+
+      clock.setTime(5000)
+      provider.getConfig().keys should contain ("HDFS State")
+
+      // Synchronization needed because of mockito.
+      clock.synchronized {
+        doReturn(false).when(provider).isFsInSafeMode()
+        clock.setTime(10000)
+      }
+
+      eventually(timeout(1 second), interval(10 millis)) {
+        provider.getConfig().keys should not contain ("HDFS State")
+      }
+    } finally {
+      provider.stop()
+    }
+  }
+
+  test("provider reports error after FS leaves safe mode") {
+    testDir.delete()
+    val clock = new ManualClock()
+    val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+    val provider = spy(new FsHistoryProvider(conf, clock))
+    doReturn(true).when(provider).isFsInSafeMode()
+
+    val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
+    val initThread = provider.initialize(Some(errorHandler))
+    try {
+      // Synchronization needed because of mockito.
+      clock.synchronized {
+        doReturn(false).when(provider).isFsInSafeMode()
+        clock.setTime(10000)
+      }
+
+      eventually(timeout(1 second), interval(10 millis)) {
+        verify(errorHandler).uncaughtException(any(), any())
+      }
+    } finally {
+      provider.stop()
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:

From b07e1960ddd2560ad74bb4e6488dd206b7296572 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 9 Oct 2015 10:38:24 -0700
Subject: [PATCH 2/4] Style.

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index fceb53ae8a648..85d0930c67779 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -223,7 +223,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getConfig(): Map[String, String] = { 
+  override def getConfig(): Map[String, String] = {
     val safeMode = if (isFsInSafeMode()) {
       Map("HDFS State" -> "In safe mode.")
     } else {

From c56b645f6d54b3fdfb814c545ab6dcbe87fab645 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 9 Oct 2015 12:35:49 -0700
Subject: [PATCH 3/4] Small nit.

---
 .../apache/spark/deploy/history/FsHistoryProviderSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 507faec54d973..833aab14ca2da 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -416,7 +416,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   test("provider correctly checks whether fs is in safe mode") {
     val provider = spy(new FsHistoryProvider(createTestConf()))
     val dfs = mock(classOf[DistributedFileSystem])
-    // Asserts that safe mode is falsebecause we can't really control the return value of the mock,
+    // Asserts that safe mode is false because we can't really control the return value of the mock,
     // since the API is different between hadoop 1 and 2.
     assert(!provider.isFsInSafeMode(dfs))
   }

From 91968442f071a351a897088ca9513dee38a40c09 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Mon, 12 Oct 2015 13:43:11 -0700
Subject: [PATCH 4/4] Feedback.

---
 .../spark/deploy/history/FsHistoryProvider.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 85d0930c67779..749e2957a62de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -225,7 +225,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   override def getConfig(): Map[String, String] = {
     val safeMode = if (isFsInSafeMode()) {
-      Map("HDFS State" -> "In safe mode.")
+      Map("HDFS State" -> "In safe mode, application logs not available.")
     } else {
       Map()
     }
@@ -659,11 +659,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
    * makes it more public than not.
    */
-  private[history] def isFsInSafeMode(): Boolean = {
-    if (!fs.isInstanceOf[DistributedFileSystem]) {
-      return false
-    }
-    isFsInSafeMode(fs.asInstanceOf[DistributedFileSystem])
-  }
+  private[history] def isFsInSafeMode(): Boolean = fs match {
+    case dfs: DistributedFileSystem =>
+      isFsInSafeMode(dfs)
+    case _ =>
+      false
+  }
 
   // For testing.
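
Note: on a Hadoop 2 classpath, the reflective setSafeMode(SAFEMODE_GET)
probe in isFsInSafeMode(dfs) resolves to the direct call sketched below.
This is an illustrative sketch only, not part of the patch; it assumes
Hadoop 2's HdfsConstants.SafeModeAction is available at compile time,
which is exactly what the reflection avoids having to assume (on Hadoop 1
the same probe goes through FSConstants.SafeModeAction instead).

    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.hdfs.DistributedFileSystem
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction

    // Equivalent of the reflective probe when compiled directly against
    // Hadoop 2. SAFEMODE_GET only queries the NameNode's safe mode flag;
    // it does not enter or leave safe mode.
    def isInSafeMode(fs: FileSystem): Boolean = fs match {
      case dfs: DistributedFileSystem =>
        dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)
      case _ =>
        false // non-HDFS filesystems have no notion of safe mode
    }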