From 52641db34bdb9fac5b5808fa2c6334837eeedbc3 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 20 Dec 2017 10:29:51 -0800 Subject: [PATCH] [SPARK-22836][ui] Show driver logs in UI when available. Port code from the old executors listener to the new one, so that the driver logs present in the application start event are kept. --- .../spark/status/AppStatusListener.scala | 11 +++++++++++ .../spark/status/AppStatusListenerSuite.scala | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 1fb7b76d43d04..f0de1e6cce1fb 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -114,6 +114,17 @@ private[spark] class AppStatusListener( kvstore.write(new ApplicationInfoWrapper(appInfo)) kvstore.write(appSummary) + + // Update the driver block manager with logs from this event. The SparkContext initialization + // code registers the driver before this event is sent. + event.driverLogs.foreach { logs => + val driver = liveExecutors.get(SparkContext.DRIVER_IDENTIFIER) + .orElse(liveExecutors.get(SparkContext.LEGACY_DRIVER_IDENTIFIER)) + driver.foreach { d => + d.executorLogs = logs.toMap + update(d, System.nanoTime()) + } + } } override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 9cf4f7efb24a8..d5345b933ed39 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -939,6 +939,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("driver logs") { + val listener = new AppStatusListener(store, conf, true) + + val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(time, driver, 42L)) + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + Some(Map("stdout" -> "file.txt")))) + + check[ExecutorSummaryWrapper](SparkContext.DRIVER_IDENTIFIER) { d => + assert(d.info.executorLogs("stdout") === "file.txt") + } + } + private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId) private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {