From f7741a0d343ec633a38316e489b709e84d1ccf96 Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Fri, 22 Jul 2016 23:04:03 +0800 Subject: [PATCH] add session support in the web UI --- .../apache/spark/SparkFirehoseListener.java | 5 +++ .../scheduler/EventLoggingListener.scala | 2 + .../spark/scheduler/SparkListener.scala | 11 +++++ .../spark/scheduler/SparkListenerBus.scala | 2 + .../scala/org/apache/spark/ui/SparkUI.scala | 11 +++-- .../apache/spark/ui/session/SessionPage.scala | 43 ++++++++++++++++++ .../apache/spark/ui/session/SessionTab.scala | 45 +++++++++++++++++++ .../org/apache/spark/util/JsonProtocol.scala | 16 +++++++ .../scheduler/EventLoggingListenerSuite.scala | 1 + .../apache/spark/util/JsonProtocolSuite.scala | 29 ++++++++++++ .../org/apache/spark/sql/RuntimeConfig.scala | 24 +++++++++- .../org/apache/spark/sql/SparkSession.scala | 9 ++-- 12 files changed, 190 insertions(+), 8 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/ui/session/SessionPage.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/session/SessionTab.scala diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java index 97eed611e8f9a..965600135bcb1 100644 --- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java +++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java @@ -72,6 +72,11 @@ public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environment onEvent(environmentUpdate); } + @Override + public final void onSessionUpdate(SparkListenerSessionUpdate sessionUpdate) { + onEvent(sessionUpdate); + } + @Override public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { onEvent(blockManagerAdded); diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index a7d06391176d2..505326a3d1c1c 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -158,6 +158,8 @@ private[spark] class EventLoggingListener( override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = logEvent(event) + override def onSessionUpdate(event: SparkListenerSessionUpdate): Unit = logEvent(event) + // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { logEvent(event, flushLogger = true) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 7618dfeeedf8d..d2dbcb99c9d6a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -86,6 +86,10 @@ case class SparkListenerJobEnd( case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]]) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerSessionUpdate(sessionDetails: Map[String, String]) + extends SparkListenerEvent + @DeveloperApi case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long) extends SparkListenerEvent @@ -198,6 +202,11 @@ private[spark] trait SparkListenerInterface { */ def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit + /** + * Called when session properties have been updated + */ + def onSessionUpdate(sessionUpdate: SparkListenerSessionUpdate): Unit + /** * Called when a new block manager has joined */ @@ -275,6 +284,8 @@ abstract class SparkListener extends SparkListenerInterface { override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { } + override def onSessionUpdate(sessionUpdate: SparkListenerSessionUpdate): Unit = { } + override def onBlockManagerAdded(blockManagerAdded: 
SparkListenerBlockManagerAdded): Unit = { } override def onBlockManagerRemoved( diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 471586ac0852a..8d6c5d521a02b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -45,6 +45,8 @@ private[spark] trait SparkListenerBus listener.onTaskEnd(taskEnd) case environmentUpdate: SparkListenerEnvironmentUpdate => listener.onEnvironmentUpdate(environmentUpdate) + case sessionUpdate: SparkListenerSessionUpdate => + listener.onSessionUpdate(sessionUpdate) case blockManagerAdded: SparkListenerBlockManagerAdded => listener.onBlockManagerAdded(blockManagerAdded) case blockManagerRemoved: SparkListenerBlockManagerRemoved => diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 39155ff2649ec..8c9d33fddb88b 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -32,6 +32,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab} import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab} import org.apache.spark.ui.scope.RDDOperationGraphListener +import org.apache.spark.ui.session.{SessionListener, SessionTab} import org.apache.spark.ui.storage.{StorageListener, StorageTab} import org.apache.spark.util.Utils @@ -43,6 +44,7 @@ private[spark] class SparkUI private ( val conf: SparkConf, securityManager: SecurityManager, val environmentListener: EnvironmentListener, + val sessionListener: SessionListener, val storageStatusListener: StorageStatusListener, val executorsListener: ExecutorsListener, val jobProgressListener: JobProgressListener, @@ -69,6 +71,7 @@ private[spark] class 
SparkUI private ( attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) + attachTab(new SessionTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath)) @@ -201,19 +204,21 @@ private[spark] object SparkUI { } val environmentListener = new EnvironmentListener + val sessionListener = new SessionListener val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener, conf) val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(environmentListener) + listenerBus.addListener(sessionListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) listenerBus.addListener(operationGraphListener) - new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath, startTime) + new SparkUI(sc, conf, securityManager, environmentListener, sessionListener, + storageStatusListener, executorsListener, _jobProgressListener, storageListener, + operationGraphListener, appName, basePath, startTime) } } diff --git a/core/src/main/scala/org/apache/spark/ui/session/SessionPage.scala b/core/src/main/scala/org/apache/spark/ui/session/SessionPage.scala new file mode 100644 index 0000000000000..752725b10a91b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/session/SessionPage.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.session + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.ui.{UIUtils, WebUIPage} + +private[ui] class SessionPage(parent: SessionTab) extends WebUIPage("") { + private val listener = parent.listener + + def render(request: HttpServletRequest): Seq[Node] = { + val sessionPropertiesTable = UIUtils.listingTable( + propertyHeader, propertyRow, listener.sessionDetails.toSeq.sorted, fixedWidth = true) + val content = + +

+      <span>
+        <h4>Session Properties</h4>
+        {sessionPropertiesTable}
+      </span>
+
+ + UIUtils.headerSparkPage("Session", content, parent) + } + + private def propertyHeader = Seq("Name", "Value") + private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr> +} + diff --git a/core/src/main/scala/org/apache/spark/ui/session/SessionTab.scala b/core/src/main/scala/org/apache/spark/ui/session/SessionTab.scala new file mode 100644 index 0000000000000..d8a0534e8cdc9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/session/SessionTab.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui.session + +import scala.collection.Map + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.scheduler.{SparkListener, SparkListenerSessionUpdate, _} +import org.apache.spark.ui.{SparkUI, SparkUITab, _} + +private[ui] class SessionTab(parent: SparkUI) extends SparkUITab(parent, "session") { + val listener = parent.sessionListener + attachPage(new SessionPage(this)) +} + +/** + * :: DeveloperApi :: + * A SparkListener that prepares information to be displayed on the SessionTab + */ +@DeveloperApi +class SessionListener extends SparkListener { + var sessionDetails = Map[String, String]() + + override def onSessionUpdate(sessionUpdate: SparkListenerSessionUpdate) { + synchronized { + sessionDetails = sessionUpdate.sessionDetails + } + } +} + diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 022b226894105..b1f79eff23355 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -80,6 +80,8 @@ private[spark] object JsonProtocol { jobEndToJson(jobEnd) case environmentUpdate: SparkListenerEnvironmentUpdate => environmentUpdateToJson(environmentUpdate) + case sessionUpdate: SparkListenerSessionUpdate => + sessionUpdateToJson(sessionUpdate) case blockManagerAdded: SparkListenerBlockManagerAdded => blockManagerAddedToJson(blockManagerAdded) case blockManagerRemoved: SparkListenerBlockManagerRemoved => @@ -177,6 +179,13 @@ private[spark] object JsonProtocol { ("Classpath Entries" -> classpathEntries) } + def sessionUpdateToJson(sessionUpdate: SparkListenerSessionUpdate): JValue = { + val sessionDetails = sessionUpdate.sessionDetails + val sessionConf = mapToJson(sessionDetails.toMap) + ("Event" -> Utils.getFormattedClassName(sessionUpdate)) ~ + ("Session Properties" -> sessionConf) + } + def blockManagerAddedToJson(blockManagerAdded: 
SparkListenerBlockManagerAdded): JValue = { val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId) ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~ @@ -491,6 +500,7 @@ private[spark] object JsonProtocol { val jobStart = Utils.getFormattedClassName(SparkListenerJobStart) val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd) val environmentUpdate = Utils.getFormattedClassName(SparkListenerEnvironmentUpdate) + val sessionUpdate = Utils.getFormattedClassName(SparkListenerSessionUpdate) val blockManagerAdded = Utils.getFormattedClassName(SparkListenerBlockManagerAdded) val blockManagerRemoved = Utils.getFormattedClassName(SparkListenerBlockManagerRemoved) val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD) @@ -510,6 +520,7 @@ private[spark] object JsonProtocol { case `jobStart` => jobStartFromJson(json) case `jobEnd` => jobEndFromJson(json) case `environmentUpdate` => environmentUpdateFromJson(json) + case `sessionUpdate` => sessionUpdateFromJson(json) case `blockManagerAdded` => blockManagerAddedFromJson(json) case `blockManagerRemoved` => blockManagerRemovedFromJson(json) case `unpersistRDD` => unpersistRDDFromJson(json) @@ -590,6 +601,11 @@ private[spark] object JsonProtocol { SparkListenerEnvironmentUpdate(environmentDetails) } + def sessionUpdateFromJson(json: JValue): SparkListenerSessionUpdate = { + val sessionDetails = mapFromJson(json \ "Session Properties") + SparkListenerSessionUpdate(sessionDetails) + } + def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") val maxMem = (json \ "Maximum Memory").extract[Long] diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index c4c80b5b57daa..49b3321be3f11 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -209,6 +209,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit SparkListenerBlockManagerAdded, SparkListenerExecutorAdded, SparkListenerEnvironmentUpdate, + SparkListenerSessionUpdate, SparkListenerJobStart, SparkListenerJobEnd, SparkListenerStageSubmitted, diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 85ca9d39d4a3f..6ce603af4e129 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -67,6 +67,10 @@ class JsonProtocolSuite extends SparkFunSuite { "System Properties" -> Seq(("Username", "guest"), ("Password", "guest")), "Classpath Entries" -> Seq(("Super library", "/tmp/super_library")) )) + val sessionUpdate = SparkListenerSessionUpdate(Map[String, String]( + "spark.app.name" -> "Spark shell", + "spark.driver.port" -> "1234" + )) val blockManagerAdded = SparkListenerBlockManagerAdded(1L, BlockManagerId("Stars", "In your multitude...", 300), 500) val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L, @@ -100,6 +104,7 @@ class JsonProtocolSuite extends SparkFunSuite { testEvent(jobStart, jobStartJsonString) testEvent(jobEnd, jobEndJsonString) testEvent(environmentUpdate, environmentUpdateJsonString) + testEvent(sessionUpdate, sessionUpdateJsonString) testEvent(blockManagerAdded, blockManagerAddedJsonString) testEvent(blockManagerRemoved, blockManagerRemovedJsonString) testEvent(unpersistRdd, unpersistRDDJsonString) @@ -528,6 +533,8 @@ private[spark] object JsonProtocolSuite extends Assertions { assertEquals(e1.jobResult, e2.jobResult) case (e1: SparkListenerEnvironmentUpdate, e2: SparkListenerEnvironmentUpdate) => assertEquals(e1.environmentDetails, 
e2.environmentDetails) + case (e1: SparkListenerSessionUpdate, e2: SparkListenerSessionUpdate) => + assertEquals_(e1.sessionDetails, e2.sessionDetails) case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) => assert(e1.executorId === e1.executorId) assertEquals(e1.executorInfo, e2.executorInfo) @@ -684,6 +691,17 @@ private[spark] object JsonProtocolSuite extends Assertions { } } + // TODO: Anyway to overwrite the assertEquals(Map[String, Seq[(String, String)]], + // Map[String, Seq[(String, String)]]) above? It reports that assertEquals(Map, Map) + // is already defined in the scope, if I use assertEquals as the function name here. + private def assertEquals_(details1: Map[String, String], details2: Map[String, String]) { + details1.zip(details2).foreach { + case ((key1, values1), (key2, values2)) => + assert(key1 === key2) + assert(values1 === values2) + } + } + private def assertEquals(exception1: Exception, exception2: Exception) { assert(exception1.getMessage === exception2.getMessage) assertSeqEquals( @@ -1666,6 +1684,17 @@ private[spark] object JsonProtocolSuite extends Assertions { |} """.stripMargin + private val sessionUpdateJsonString = + """ + |{ + | "Event": "SparkListenerSessionUpdate", + | "Session Properties": { + | "spark.app.name": "Spark shell", + | "spark.driver.port": "1234" + | } + |} + """.stripMargin + private val blockManagerAddedJsonString = """ |{ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala index 7e07e0cb84a87..b4700dd6b9ce0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala @@ -17,7 +17,10 @@ package org.apache.spark.sql +import scala.collection.mutable.HashMap + import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerSessionUpdate} import 
org.apache.spark.sql.internal.SQLConf @@ -28,7 +31,8 @@ import org.apache.spark.sql.internal.SQLConf * * @since 2.0.0 */ -class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { +class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf, + listenerBus: Option[LiveListenerBus] = None) { /** * Sets the given Spark runtime configuration property. @@ -37,6 +41,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { */ def set(key: String, value: String): Unit = { sqlConf.setConfString(key, value) + postSessionUpdate() } /** @@ -123,6 +128,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { */ def unset(key: String): Unit = { sqlConf.unsetConf(key) + postSessionUpdate() } /** @@ -132,4 +138,20 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { sqlConf.contains(key) } + /** + * Sets the sqlConf multiple times while calling postSessionUpdate only once. + * This is faster than calling set multiple times, which posts a session update every time. 
+ */ + protected[sql] def setBatch(options: HashMap[String, String]): Unit = { + options.foreach { case (k, v) => sqlConf.setConfString(k, v) } + postSessionUpdate() + } + + private def postSessionUpdate() { + if (listenerBus.isDefined) { + val sessionUpdate = SparkListenerSessionUpdate(getAll) + listenerBus.get.post(sessionUpdate) + } + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 2ade36d075027..627839fd4001f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -130,7 +130,8 @@ class SparkSession private( * * @since 2.0.0 */ - @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf) + @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf, + Some(sparkContext.listenerBus)) /** * :: Experimental :: @@ -794,7 +795,7 @@ object SparkSession { // Get the session from current thread's active session. var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { - options.foreach { case (k, v) => session.conf.set(k, v) } + session.conf.setBatch(options) if (options.nonEmpty) { logWarning("Use an existing SparkSession, some configuration may not take effect.") } @@ -806,7 +807,7 @@ object SparkSession { // If the current thread does not have an active session, get it from the global session. 
session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { - options.foreach { case (k, v) => session.conf.set(k, v) } + session.conf.setBatch(options) if (options.nonEmpty) { logWarning("Use an existing SparkSession, some configuration may not take effect.") } @@ -829,7 +830,7 @@ object SparkSession { sc } session = new SparkSession(sparkContext) - options.foreach { case (k, v) => session.conf.set(k, v) } + session.conf.setBatch(options) defaultSession.set(session) // Register a successfully instantiated context to the singleton. This should be at the