From 7d57444044296773a27fe4141520291f96b2137f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 2 Apr 2014 16:57:07 -0700 Subject: [PATCH] Refactoring the UI interface to add flexibility This commit introduces three (abstract) classes: WebUI, UITab, and UIPage. The top of the hierarchy is the WebUI, which contains many tabs and pages. Each tab in turn contains many pages. When a UITab is attached to a WebUI, the WebUI creates a handler for each of the tab's pages. Similarly, when a UIPage is attached to a WebUI, its handler is created. The server in WebUI is then ready to be bound to a host and a port. This commit also breaks down a couple of unnecessarily large files by moving certain classes to their own files. --- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 1 + .../deploy/master/ui/ApplicationPage.scala | 16 +- .../spark/deploy/master/ui/IndexPage.scala | 25 ++- .../spark/deploy/master/ui/MasterWebUI.scala | 50 ++---- .../apache/spark/deploy/worker/Worker.scala | 3 +- .../spark/deploy/worker/ui/IndexPage.scala | 8 +- .../spark/deploy/worker/ui/LogPage.scala | 147 ++++++++++++++++ .../spark/deploy/worker/ui/WorkerWebUI.scala | 160 ++---------------- .../scala/org/apache/spark/ui/SparkUI.scala | 95 ++++------- .../scala/org/apache/spark/ui/UIUtils.scala | 30 +++- .../scala/org/apache/spark/ui/WebUI.scala | 110 +++++++++--- .../apache/spark/ui/env/EnvironmentTab.scala | 56 ++++++ .../{EnvironmentUI.scala => IndexPage.scala} | 43 +---- .../apache/spark/ui/exec/ExecutorsTab.scala | 92 ++++++++++ .../{ExecutorsUI.scala => IndexPage.scala} | 90 ++-------- .../apache/spark/ui/jobs/ExecutorTable.scala | 7 +- .../org/apache/spark/ui/jobs/IndexPage.scala | 12 +- ...bProgressUI.scala => JobProgressTab.scala} | 37 ++-- .../org/apache/spark/ui/jobs/PoolPage.scala | 8 +- .../org/apache/spark/ui/jobs/PoolTable.scala | 7 +- .../org/apache/spark/ui/jobs/StagePage.scala | 34 ++-- .../org/apache/spark/ui/jobs/StageTable.scala | 8 +- ...kManagerUI.scala => BlockManagerTab.scala} | 27 +-- .../apache/spark/ui/storage/IndexPage.scala | 8 +- .../org/apache/spark/ui/storage/RDDPage.scala | 8 +- 26 files changed, 583 insertions(+), 501 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala rename core/src/main/scala/org/apache/spark/ui/env/{EnvironmentUI.scala => IndexPage.scala} (62%) create mode 100644 core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala rename core/src/main/scala/org/apache/spark/ui/exec/{ExecutorsUI.scala => IndexPage.scala} (60%) rename core/src/main/scala/org/apache/spark/ui/jobs/{JobProgressUI.scala => JobProgressTab.scala} (50%) rename core/src/main/scala/org/apache/spark/ui/storage/{BlockManagerUI.scala => BlockManagerTab.scala} (78%) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b23accbbb9410..495e9e485193c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -158,8 +158,8 @@ class SparkContext( // Initialize the Spark UI, registering all associated listeners private[spark] val ui = new SparkUI(this) - ui.bind() ui.start() + ui.bind() // Optionally log Spark events private[spark] val eventLogger: Option[EventLoggingListener] = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 95bd62e88db2b..4c12ab192079f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -85,6 +85,7 @@ private[spark] class Master( val masterSource = new MasterSource(this) val webUi = new MasterWebUI(this, webUiPort) + webUi.start() val masterPublicAddress = { val envVar = System.getenv("SPARK_PUBLIC_DNS") diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index cb092cb5d576b..24282048b842e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -28,15 +28,17 @@ import org.json4s.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.ExecutorInfo -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{UIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class ApplicationPage(parent: MasterWebUI) { - val master = parent.masterActorRef - val timeout = parent.timeout +private[spark] class ApplicationPage(parent: MasterWebUI) + extends UIPage("app", includeJson = true) { + + private val master = parent.masterActorRef + private val timeout = parent.timeout /** Executor details for a particular application */ - def renderJson(request: HttpServletRequest): JValue = { + override def renderJson(request: HttpServletRequest): JValue = { val appId = request.getParameter("appId") val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, timeout) @@ -47,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { } /** Executor details for a particular application */ - def render(request: HttpServletRequest): Seq[Node] = { + override def render(request: HttpServletRequest): Seq[Node] = { val appId = request.getParameter("appId") val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, timeout) @@ -96,7 +98,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { UIUtils.basicSparkPage(content, "Application: " + app.desc.name) } - def executorRow(executor: ExecutorInfo): Seq[Node] = { + private def executorRow(executor: ExecutorInfo): Seq[Node] = { {executor.id} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 8c1d6c7cce450..f011c830a02da 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -25,24 +25,24 @@ import scala.xml.Node import akka.pattern.ask import org.json4s.JValue -import org.apache.spark.deploy.{JsonProtocol} +import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} -import org.apache.spark.ui.{WebUI, UIUtils} +import org.apache.spark.ui.{UIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class IndexPage(parent: MasterWebUI) { - val master = parent.masterActorRef - val timeout = parent.timeout +private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) { + private val master = parent.masterActorRef + private val timeout = parent.timeout - def renderJson(request: HttpServletRequest): JValue = { + override def renderJson(request: HttpServletRequest): JValue = { val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, timeout) JsonProtocol.writeMasterState(state) } /** Index view listing applications and executors */ - def render(request: HttpServletRequest): Seq[Node] = { + override def render(request: HttpServletRequest): Seq[Node] = { val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, timeout) @@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) { UIUtils.basicSparkPage(content, "Spark Master at " + state.uri) } - def workerRow(worker: WorkerInfo): Seq[Node] = { + private def workerRow(worker: WorkerInfo): Seq[Node] = { {worker.id} @@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) { } - - def appRow(app: ApplicationInfo): Seq[Node] = { + private def appRow(app: ApplicationInfo): Seq[Node] = { {app.id} @@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) { {Utils.megabytesToString(app.desc.memoryPerSlave)} - {WebUI.formatDate(app.submitDate)} + {UIUtils.formatDate(app.submitDate)} {app.desc.user} {app.state.toString} - {WebUI.formatDuration(app.duration)} + {UIUtils.formatDuration(app.duration)} } - def driverRow(driver: DriverInfo): Seq[Node] = { + private def driverRow(driver: DriverInfo): Seq[Node] = { {driver.id} {driver.submitDate} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index bd75b2dfd0e07..fbcc76b3cc150 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,13 +17,9 @@ package org.apache.spark.deploy.master.ui -import javax.servlet.http.HttpServletRequest - -import org.eclipse.jetty.servlet.ServletContextHandler - import org.apache.spark.Logging import org.apache.spark.deploy.master.Master -import org.apache.spark.ui.{ServerInfo, SparkUI} +import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -31,50 +27,39 @@ import org.apache.spark.util.{AkkaUtils, Utils} * Web UI server for the standalone master. 
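The call order this patch establishes for every web UI is: construct, start() to attach pages and handlers, then bind() to launch the server. A minimal sketch of that lifecycle using the Master's UI as it appears in this patch (the surrounding values are assumed to be in scope):

    val webUi = new MasterWebUI(master, webUiPort)
    webUi.start()   // attach ApplicationPage, IndexPage, static and metrics handlers
    webUi.bind()    // start the Jetty server; boundPort is valid from here on
    // ... on shutdown:
    webUi.stop()    // only valid after bind()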
*/ private[spark] -class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { - val masterActorRef = master.self - val timeout = AkkaUtils.askTimeout(master.conf) +class MasterWebUI(val master: Master, requestedPort: Int) + extends WebUI(master.securityMgr) with Logging { private val host = Utils.localHostName() private val port = requestedPort - private val applicationPage = new ApplicationPage(this) - private val indexPage = new IndexPage(this) - private var serverInfo: Option[ServerInfo] = None + val masterActorRef = master.self + val timeout = AkkaUtils.askTimeout(master.conf) - private val handlers: Seq[ServletContextHandler] = { - master.masterMetricsSystem.getServletHandlers ++ - master.applicationMetricsSystem.getServletHandlers ++ - Seq[ServletContextHandler]( - createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"), - createServletHandler("/app/json", - (request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr), - createServletHandler("/app", - (request: HttpServletRequest) => applicationPage.render(request), master.securityMgr), - createServletHandler("/json", - (request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr), - createServletHandler("/", - (request: HttpServletRequest) => indexPage.render(request), master.securityMgr) - ) + def start() { + attachPage(new ApplicationPage(this)) + attachPage(new IndexPage(this)) + attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) + master.masterMetricsSystem.getServletHandlers.foreach(attachHandler) + master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler) } + /** Bind to the HTTP server behind this web interface. */ def bind() { try { serverInfo = Some(startJettyServer(host, port, handlers, master.conf)) logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) } catch { case e: Exception => - logError("Failed to create Master JettyUtils", e) + logError("Failed to create Master web UI", e) System.exit(1) } } - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - /** Attach a reconstructed UI to this Master UI. Only valid after bind(). 
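The attachUI/detachUI pair below lets the Master serve a finished application's reconstructed SparkUI from its own bound server. A hedged sketch using the patch's four-argument SparkUI constructor (replayBus and the "/history" base path are assumptions, not from this patch):

    val appUi = new SparkUI(conf, replayBus, appDesc.name, "/history/" + app.id)
    appUi.start()
    masterWebUi.attachUI(appUi)   // graft the UI's handlers onto the bound server
    // ... when the application is removed:
    masterWebUi.detachUI(appUi)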
*/ def attachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { + for (handler <- ui.getHandlers) { rootHandler.addHandler(handler) if (!handler.isStarted) { handler.start() @@ -86,18 +71,13 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { def detachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { + for (handler <- ui.getHandlers) { if (handler.isStarted) { handler.stop() } rootHandler.removeHandler(handler) } } - - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!") - serverInfo.get.server.stop() - } } private[spark] object MasterWebUI { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8a71ddda4cb5e..97b5a37f1439c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -123,8 +123,9 @@ private[spark] class Worker( logInfo("Spark home: " + sparkHome) createWorkDir() webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + webUi.start() webUi.bind() + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) registerWithMaster() metricsSystem.registerSource(workerSource) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index 85200ab0e102d..bf7d552101484 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -28,21 +28,21 @@ import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{UIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class IndexPage(parent: WorkerWebUI) { +private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) { val workerActor = parent.worker.self val worker = parent.worker val timeout = parent.timeout - def renderJson(request: HttpServletRequest): JValue = { + override def renderJson(request: HttpServletRequest): JValue = { val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, timeout) JsonProtocol.writeWorkerState(workerState) } - def render(request: HttpServletRequest): Seq[Node] = { + override def render(request: HttpServletRequest): Seq[Node] = { val stateFuture = (workerActor ? 
RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
     val workerState = Await.result(stateFuture, timeout)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
new file mode 100644
index 0000000000000..f57900c99ce3d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker.ui
+
+import java.io.File
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class LogPage(parent: WorkerWebUI) extends UIPage("logPage") {
+  private val worker = parent.worker
+  private val workDir = parent.workDir
+
+  def renderLog(request: HttpServletRequest): String = {
+    val defaultBytes = 100 * 1024
+
+    val appId = Option(request.getParameter("appId"))
+    val executorId = Option(request.getParameter("executorId"))
+    val driverId = Option(request.getParameter("driverId"))
+    val logType = request.getParameter("logType")
+    val offset = Option(request.getParameter("offset")).map(_.toLong)
+    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+
+    val path = (appId, executorId, driverId) match {
+      case (Some(a), Some(e), None) =>
+        s"${workDir.getPath}/$a/$e/$logType"
+      case (None, None, Some(d)) =>
+        s"${workDir.getPath}/$d/$logType"
+      case _ =>
+        throw new Exception("Request must specify either application or driver identifiers")
+    }
+
+    val (startByte, endByte) = getByteRange(path, offset, byteLength)
+    val file = new File(path)
+    val logLength = file.length
+
+    val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
+    pre + Utils.offsetBytes(path, startByte, endByte)
+  }
+
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val defaultBytes = 100 * 1024
+    val appId = Option(request.getParameter("appId"))
+    val executorId = Option(request.getParameter("executorId"))
+    val driverId = Option(request.getParameter("driverId"))
+    val logType = request.getParameter("logType")
+    val offset = Option(request.getParameter("offset")).map(_.toLong)
+    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+
+    val (path, params) = (appId, executorId, driverId) match {
+      case (Some(a), Some(e), None) =>
+        (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+      case (None, None, Some(d)) =>
+        (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+      case _ =>
+        throw new Exception("Request must specify either application or driver identifiers")
+    }
+
+    val (startByte, endByte) =
+      getByteRange(path, offset, byteLength)
+    val file = new File(path)
+    val logLength = file.length
+    val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+    val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
+    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
+
+    val backButton =
+      if (startByte > 0) {
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
+          .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
+          <button type="button" class="btn btn-default">
+            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
+          </button>
+        </a>
+      }
+      else {
+        <button type="button" class="btn btn-default" disabled="disabled">
+          Previous 0 B
+        </button>
+      }
+
+    val nextButton =
+      if (endByte < logLength) {
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
+          .format(params, logType, endByte, byteLength)}>
+          <button type="button" class="btn btn-default">
+            Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
+          </button>
+        </a>
+      }
+      else {
+        <button type="button" class="btn btn-default" disabled="disabled">
+          Next 0 B
+        </button>
+      }
+
+    val content =
+      <html>
+        <body>
+          {linkToMaster}
+          <div>
+            <div style="float:left; margin-right:10px">{backButton}</div>
+            <div style="float:left;">{range}</div>
+            <div style="float:right; margin-left:10px">{nextButton}</div>
+          </div>
+          <br />
+          <div style="height:500px; overflow:auto; padding:5px;">
+            <pre>{logText}</pre>
+          </div>
+        </body>
+      </html>
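The getByteRange helper defined just below clamps the requested window to the file. A standalone mirror of its arithmetic with worked values (illustrative only, not part of the patch):

    def clampRange(logLength: Long, offset: Option[Long], byteLength: Int): (Long, Long) = {
      val defaultBytes = 100 * 1024
      val maxBytes = 1024 * 1024
      val requested = offset.getOrElse(logLength - defaultBytes)
      val startByte = math.min(math.max(requested, 0L), logLength)  // clamp into [0, logLength]
      val endByte = math.min(startByte + math.min(byteLength, maxBytes), logLength)
      (startByte, endByte)
    }

    clampRange(1000000L, None, 100 * 1024)      // (897600, 1000000): tail of the log by default
    clampRange(1000000L, Some(-5L), 4096)       // (0, 4096): negative offsets clamp to zero
    clampRange(1000000L, Some(2000000L), 4096)  // (1000000, 1000000): offsets past EOF give an empty range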
+ + + UIUtils.basicSparkPage(content, logType + " log page for " + appId) + } + + /** Determine the byte range for a log or log page. */ + private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = { + val defaultBytes = 100 * 1024 + val maxBytes = 1024 * 1024 + val file = new File(path) + val logLength = file.length() + val getOffset = offset.getOrElse(logLength - defaultBytes) + val startByte = + if (getOffset < 0) 0L + else if (getOffset > logLength) logLength + else getOffset + val logPageLength = math.min(byteLength, maxBytes) + val endByte = math.min(startByte + logPageLength, logLength) + (startByte, endByte) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index de76a5d5eb7bc..e0a60121cb65a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -20,11 +20,9 @@ package org.apache.spark.deploy.worker.ui import java.io.File import javax.servlet.http.HttpServletRequest -import org.eclipse.jetty.servlet.ServletContextHandler - import org.apache.spark.Logging import org.apache.spark.deploy.worker.Worker -import org.apache.spark.ui.{JettyUtils, ServerInfo, SparkUI, UIUtils} +import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -33,164 +31,34 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) - extends Logging { - - val timeout = AkkaUtils.askTimeout(worker.conf) + extends WebUI(worker.securityMgr) with Logging { private val host = Utils.localHostName() private val port = requestedPort.getOrElse( worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) - private val indexPage = new IndexPage(this) - private var serverInfo: Option[ServerInfo] = None + val timeout = AkkaUtils.askTimeout(worker.conf) - private val handlers: Seq[ServletContextHandler] = { - worker.metricsSystem.getServletHandlers ++ - Seq[ServletContextHandler]( - createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"), - createServletHandler("/log", - (request: HttpServletRequest) => log(request), worker.securityMgr), - createServletHandler("/logPage", - (request: HttpServletRequest) => logPage(request), worker.securityMgr), - createServletHandler("/json", - (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr), - createServletHandler("/", - (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr) - ) + def start() { + val logPage = new LogPage(this) + attachPage(logPage) + attachPage(new IndexPage(this)) + attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static")) + attachHandler(createServletHandler("/log", + (request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr)) + worker.metricsSystem.getServletHandlers.foreach(attachHandler) } + /** Bind to the HTTP server behind this web interface. 
*/ def bind() { try { - serverInfo = Some(JettyUtils.startJettyServer(host, port, handlers, worker.conf)) + serverInfo = Some(startJettyServer(host, port, handlers, worker.conf)) logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) } catch { case e: Exception => - logError("Failed to create Worker JettyUtils", e) + logError("Failed to create Worker web UI", e) System.exit(1) } } - - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - - private def log(request: HttpServletRequest): String = { - val defaultBytes = 100 * 1024 - - val appId = Option(request.getParameter("appId")) - val executorId = Option(request.getParameter("executorId")) - val driverId = Option(request.getParameter("driverId")) - val logType = request.getParameter("logType") - val offset = Option(request.getParameter("offset")).map(_.toLong) - val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - - val path = (appId, executorId, driverId) match { - case (Some(a), Some(e), None) => - s"${workDir.getPath}/$appId/$executorId/$logType" - case (None, None, Some(d)) => - s"${workDir.getPath}/$driverId/$logType" - case _ => - throw new Exception("Request must specify either application or driver identifiers") - } - - val (startByte, endByte) = getByteRange(path, offset, byteLength) - val file = new File(path) - val logLength = file.length - - val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n" - pre + Utils.offsetBytes(path, startByte, endByte) - } - - private def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = { - val defaultBytes = 100 * 1024 - val appId = Option(request.getParameter("appId")) - val executorId = Option(request.getParameter("executorId")) - val driverId = Option(request.getParameter("driverId")) - val logType = request.getParameter("logType") - val offset = Option(request.getParameter("offset")).map(_.toLong) - val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - - val (path, params) = (appId, executorId, driverId) match { - case (Some(a), Some(e), None) => - (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e") - case (None, None, Some(d)) => - (s"${workDir.getPath}/$d/$logType", s"driverId=$d") - case _ => - throw new Exception("Request must specify either application or driver identifiers") - } - - val (startByte, endByte) = getByteRange(path, offset, byteLength) - val file = new File(path) - val logLength = file.length - val logText = {Utils.offsetBytes(path, startByte, endByte)} - val linkToMaster =
-      <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
-    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
-
-    val backButton =
-      if (startByte > 0) {
-        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
-          .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
-          <button type="button" class="btn btn-default">
-            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Previous 0 B
-        </button>
-      }
-
-    val nextButton =
-      if (endByte < logLength) {
-        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
-          .format(params, logType, endByte, byteLength)}>
-          <button type="button" class="btn btn-default">
-            Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Next 0 B
-        </button>
-      }
-
-    val content =
-      <html>
-        <body>
-          {linkToMaster}
-          <div>
-            <div style="float:left; margin-right:10px">{backButton}</div>
-            <div style="float:left;">{range}</div>
-            <div style="float:right; margin-left:10px">{nextButton}</div>
-          </div>
-          <br />
-          <div style="height:500px; overflow:auto; padding:5px;">
-            <pre>{logText}</pre>
-          </div>
-        </body>
-      </html>
- - - UIUtils.basicSparkPage(content, logType + " log page for " + appId) - } - - /** Determine the byte range for a log or log page. */ - private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = { - val defaultBytes = 100 * 1024 - val maxBytes = 1024 * 1024 - val file = new File(path) - val logLength = file.length() - val getOffset = offset.getOrElse(logLength - defaultBytes) - val startByte = - if (getOffset < 0) 0L - else if (getOffset > logLength) logLength - else getOffset - val logPageLength = math.min(byteLength, maxBytes) - val endByte = math.min(startByte + logPageLength, logLength) - (startByte, endByte) - } - - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!") - serverInfo.get.server.stop() - } } private[spark] object WorkerWebUI { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index ef1ad872c8ef7..d8ea1b13362e3 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -17,103 +17,78 @@ package org.apache.spark.ui -import org.eclipse.jetty.servlet.ServletContextHandler - -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.scheduler._ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.env.EnvironmentUI -import org.apache.spark.ui.exec.ExecutorsUI -import org.apache.spark.ui.jobs.JobProgressUI -import org.apache.spark.ui.storage.BlockManagerUI +import org.apache.spark.ui.env.EnvironmentTab +import org.apache.spark.ui.exec.ExecutorsTab +import org.apache.spark.ui.jobs.JobProgressTab +import org.apache.spark.ui.storage.BlockManagerTab import org.apache.spark.util.Utils -/** Top level user interface for Spark */ +/** + * Top level user interface for Spark. 
+ */ private[spark] class SparkUI( val sc: SparkContext, conf: SparkConf, + val securityManager: SecurityManager, val listenerBus: SparkListenerBus, val appName: String, val basePath: String = "") - extends Logging { + extends WebUI(securityManager, basePath) with Logging { - def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName) + def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName) def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = - this(null, conf, listenerBus, appName, basePath) + this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath) // If SparkContext is not provided, assume the associated application is not live val live = sc != null - val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf) - private val bindHost = Utils.localHostName() private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt - private var serverInfo: Option[ServerInfo] = None - private val storage = new BlockManagerUI(this) - private val jobs = new JobProgressUI(this) - private val env = new EnvironmentUI(this) - private val exec = new ExecutorsUI(this) + // Maintain executor storage status through Spark events + val storageStatusListener = new StorageStatusListener - val handlers: Seq[ServletContextHandler] = { - val metricsServletHandlers = if (live) { - SparkEnv.get.metricsSystem.getServletHandlers - } else { - Array[ServletContextHandler]() + /** Initialize all components of the server. Must be called before bind(). */ + def start() { + attachTab(new BlockManagerTab(this)) + attachTab(new JobProgressTab(this)) + attachTab(new EnvironmentTab(this)) + attachTab(new ExecutorsTab(this)) + attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) + attachHandler(createRedirectHandler("/", "/stages", basePath)) + if (live) { + sc.env.metricsSystem.getServletHandlers.foreach(attachHandler) } - storage.getHandlers ++ - jobs.getHandlers ++ - env.getHandlers ++ - exec.getHandlers ++ - metricsServletHandlers ++ - Seq[ServletContextHandler] ( - createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"), - createRedirectHandler("/", "/stages", basePath) - ) + // Storage status listener must receive events first, as other listeners depend on its state + listenerBus.addListener(storageStatusListener) + getListeners.foreach(listenerBus.addListener) } - // Maintain executor storage status through Spark events - val storageStatusListener = new StorageStatusListener - - /** Bind the HTTP server which backs this web interface */ + /** Bind to the HTTP server behind this web interface. 
*/ def bind() { + assert(!handlers.isEmpty, "SparkUI has not started yet!") try { serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) - logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort)) + logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort)) } catch { case e: Exception => - logError("Failed to create Spark JettyUtils", e) + logError("Failed to create Spark web UI", e) System.exit(1) } } - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - - /** Initialize all components of the server */ - def start() { - storage.start() - jobs.start() - env.start() - exec.start() - - // Storage status listener must receive events first, as other listeners depend on its state - listenerBus.addListener(storageStatusListener) - listenerBus.addListener(storage.listener) - listenerBus.addListener(jobs.listener) - listenerBus.addListener(env.listener) - listenerBus.addListener(exec.listener) - } - - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!") - serverInfo.get.server.stop() - logInfo("Stopped Spark Web UI at %s".format(appUIAddress)) + /** Stop the server behind this web interface. Only valid after bind(). */ + override def stop() { + super.stop() + logInfo("Stopped Spark web UI at %s".format(appUIAddress)) } private[spark] def appUIAddress = "http://" + publicHost + ":" + boundPort - } private[spark] object SparkUI { diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index a487924effbff..de4216849dc7d 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -17,6 +17,9 @@ package org.apache.spark.ui +import java.text.SimpleDateFormat +import java.util.Date + import scala.xml.Node /** Utility functions for generating XML pages with spark content. */ @@ -24,9 +27,32 @@ private[spark] object UIUtils { import Page._ + // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. + private val dateFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + } + + def formatDate(date: Date): String = dateFormat.get.format(date) + + def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp)) + + def formatDuration(milliseconds: Long): String = { + val seconds = milliseconds.toDouble / 1000 + if (seconds < 60) { + return "%.0f s".format(seconds) + } + val minutes = seconds / 60 + if (minutes < 10) { + return "%.1f min".format(minutes) + } else if (minutes < 60) { + return "%.0f min".format(minutes) + } + val hours = minutes / 60 + "%.1f h".format(hours) + } + // Yarn has to go through a proxy so the base uri is provided and has to be on all links - private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")). 
- getOrElse("") + val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("") def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index a7b872f3445a4..6f7385086b534 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -17,34 +17,100 @@ package org.apache.spark.ui -import java.text.SimpleDateFormat -import java.util.Date +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.servlet.ServletContextHandler + +import org.apache.spark.SecurityManager +import org.apache.spark.ui.JettyUtils._ +import org.apache.spark.util.Utils +import scala.collection.mutable.ArrayBuffer +import org.apache.spark.scheduler.SparkListener +import scala.xml.Node +import org.json4s.JsonAST.{JNothing, JValue} /** - * Utilities used throughout the web UI. + * The top level component of the UI hierarchy that contains the server. + * + * Each WebUI represents a collection of tabs, each of which in turn represents a collection of + * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly. + * All tabs and pages must be attached before bind()'ing the server. */ -private[spark] object WebUI { - // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. - private val dateFormat = new ThreadLocal[SimpleDateFormat]() { - override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") +private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") { + protected val tabs = ArrayBuffer[UITab]() + protected val handlers = ArrayBuffer[ServletContextHandler]() + protected var serverInfo: Option[ServerInfo] = None + + /** Attach a tab to this UI, along with all of its attached pages. Only valid before bind(). */ + def attachTab(tab: UITab) { + tab.start() + tab.pages.foreach(attachPage) + tabs += tab } - def formatDate(date: Date): String = dateFormat.get.format(date) + /** Attach a page to this UI. Only valid before bind(). */ + def attachPage(page: UIPage) { + val pagePath = "/" + page.prefix + attachHandler(createServletHandler(pagePath, + (request: HttpServletRequest) => page.render(request), securityManager, basePath)) + if (page.includeJson) { + attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json", + (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)) + } + } - def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp)) + /** Attach a handler to this UI. Only valid before bind(). */ + def attachHandler(handler: ServletContextHandler) { + handlers += handler + } - def formatDuration(milliseconds: Long): String = { - val seconds = milliseconds.toDouble / 1000 - if (seconds < 60) { - return "%.0f s".format(seconds) - } - val minutes = seconds / 60 - if (minutes < 10) { - return "%.1f min".format(minutes) - } else if (minutes < 60) { - return "%.0f min".format(minutes) - } - val hours = minutes / 60 - return "%.1f h".format(hours) + /** Return a list of listeners attached to this UI. */ + def getListeners = tabs.flatMap(_.listener) + + /** Return a list of handlers attached to this UI. */ + def getHandlers = handlers.toSeq + + /** + * Bind to the HTTP server behind this web interface. + * Overridden implementation should set serverInfo. 
+   */
+  def bind()
+
+  /** Return the actual port to which this server is bound. Only valid after bind(). */
+  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
+
+  /** Stop the server behind this web interface. Only valid after bind(). */
+  def stop() {
+    assert(serverInfo.isDefined,
+      "Attempted to stop %s before binding to a server!".format(Utils.getFormattedClassName(this)))
+    serverInfo.get.server.stop()
+  }
 }
+
+/**
+ * A tab that represents a collection of pages and a unit of listening for Spark events.
+ * Associating a tab with a listener is a convention only; a tab need not have one.
+ */
+private[spark] abstract class UITab(val prefix: String) {
+  val pages = ArrayBuffer[UIPage]()
+  var listener: Option[SparkListener] = None
+
+  /** Attach a page to this tab. This prepends the tab's prefix to the page's own prefix. */
+  def attachPage(page: UIPage) {
+    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
+    pages += page
+  }
+
+  def start()
+}
+
+/**
+ * A page that represents the leaf node in the UI hierarchy.
+ *
+ * If includeJson is true, the parent WebUI (direct or indirect) creates handlers for both the
+ * HTML and the JSON content, rather than just the former.
+ */
+private[spark] abstract class UIPage(var prefix: String, val includeJson: Boolean = false) {
+  def render(request: HttpServletRequest): Seq[Node] = Seq[Node]()
+  def renderJson(request: HttpServletRequest): JValue = JNothing
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
new file mode 100644
index 0000000000000..dd4ea2a2332a2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.ui.env + +import org.apache.spark.scheduler._ +import org.apache.spark.ui._ + +private[ui] class EnvironmentTab(parent: SparkUI) extends UITab("environment") { + val appName = parent.appName + val basePath = parent.basePath + + def start() { + listener = Some(new EnvironmentListener) + attachPage(new IndexPage(this)) + } + + def environmentListener = { + assert(listener.isDefined, "EnvironmentTab has not started yet!") + listener.get.asInstanceOf[EnvironmentListener] + } +} + +/** + * A SparkListener that prepares information to be displayed on the EnvironmentTab + */ +private[ui] class EnvironmentListener extends SparkListener { + var jvmInformation = Seq[(String, String)]() + var sparkProperties = Seq[(String, String)]() + var systemProperties = Seq[(String, String)]() + var classpathEntries = Seq[(String, String)]() + + override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { + synchronized { + val environmentDetails = environmentUpdate.environmentDetails + jvmInformation = environmentDetails("JVM Information") + sparkProperties = environmentDetails("Spark Properties") + systemProperties = environmentDetails("System Properties") + classpathEntries = environmentDetails("Classpath Entries") + } + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala similarity index 62% rename from core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala rename to core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala index 23e90c34d5b33..bf1872f18d54e 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala @@ -21,30 +21,15 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.eclipse.jetty.servlet.ServletContextHandler - -import org.apache.spark.scheduler._ -import org.apache.spark.ui._ -import org.apache.spark.ui.JettyUtils._ +import org.apache.spark.ui.{UIUtils, UIPage} import org.apache.spark.ui.Page.Environment -private[ui] class EnvironmentUI(parent: SparkUI) { +private[ui] class IndexPage(parent: EnvironmentTab) extends UIPage("") { private val appName = parent.appName private val basePath = parent.basePath - private var _listener: Option[EnvironmentListener] = None - - lazy val listener = _listener.get - - def start() { - _listener = Some(new EnvironmentListener) - } - - def getHandlers = Seq[ServletContextHandler]( - createServletHandler("/environment", - (request: HttpServletRequest) => render(request), parent.securityManager, basePath) - ) + private val listener = parent.environmentListener - def render(request: HttpServletRequest): Seq[Node] = { + override def render(request: HttpServletRequest): Seq[Node] = { val runtimeInformationTable = UIUtils.listingTable( propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true) val sparkPropertiesTable = UIUtils.listingTable( @@ -70,23 +55,3 @@ private[ui] class EnvironmentUI(parent: SparkUI) { private def propertyRow(kv: (String, String)) = {kv._1}{kv._2} private def classPathRow(data: (String, String)) = {data._1}{data._2} } - -/** - * A SparkListener that prepares information to be displayed on the EnvironmentUI - */ -private[ui] class EnvironmentListener extends SparkListener { - var jvmInformation = Seq[(String, String)]() - var sparkProperties = Seq[(String, String)]() - var systemProperties = Seq[(String, String)]() - var classpathEntries = Seq[(String, 
String)]() - - override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { - synchronized { - val environmentDetails = environmentUpdate.environmentDetails - jvmInformation = environmentDetails("JVM Information") - sparkProperties = environmentDetails("Spark Properties") - systemProperties = environmentDetails("System Properties") - classpathEntries = environmentDetails("Classpath Entries") - } - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala new file mode 100644 index 0000000000000..2b833c58c8e44 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.exec + +import scala.collection.mutable.HashMap + +import org.apache.spark.ExceptionFailure +import org.apache.spark.scheduler._ +import org.apache.spark.storage.StorageStatusListener +import org.apache.spark.ui.{SparkUI, UITab} + +private[ui] class ExecutorsTab(parent: SparkUI) extends UITab("executors") { + val appName = parent.appName + val basePath = parent.basePath + + def start() { + listener = Some(new ExecutorsListener(parent.storageStatusListener)) + attachPage(new IndexPage(this)) + } + + def executorsListener = { + assert(listener.isDefined, "ExecutorsTab has not started yet!") + listener.get.asInstanceOf[ExecutorsListener] + } +} + +/** + * A SparkListener that prepares information to be displayed on the ExecutorsTab + */ +private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener) + extends SparkListener { + + val executorToTasksActive = HashMap[String, Int]() + val executorToTasksComplete = HashMap[String, Int]() + val executorToTasksFailed = HashMap[String, Int]() + val executorToDuration = HashMap[String, Long]() + val executorToShuffleRead = HashMap[String, Long]() + val executorToShuffleWrite = HashMap[String, Long]() + + def storageStatusList = storageStatusListener.storageStatusList + + override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { + val eid = formatExecutorId(taskStart.taskInfo.executorId) + executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { + val info = taskEnd.taskInfo + if (info != null) { + val eid = formatExecutorId(info.executorId) + executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1 + executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration + taskEnd.reason match { + case e: ExceptionFailure => + executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1 + case _ => + executorToTasksComplete(eid) = 
executorToTasksComplete.getOrElse(eid, 0) + 1 + } + + // Update shuffle read/write + val metrics = taskEnd.taskMetrics + if (metrics != null) { + metrics.shuffleReadMetrics.foreach { shuffleRead => + executorToShuffleRead(eid) = + executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead + } + metrics.shuffleWriteMetrics.foreach { shuffleWrite => + executorToShuffleWrite(eid) = + executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten + } + } + } + } + + // This addresses executor ID inconsistencies in the local mode + private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId) +} diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala similarity index 60% rename from core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala rename to core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala index 031ed88a493a8..fbbba2f63878f 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala @@ -11,7 +11,7 @@ * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and +* See the License for the specific language governing permissions and * limitations under the License. */ @@ -19,36 +19,18 @@ package org.apache.spark.ui.exec import javax.servlet.http.HttpServletRequest -import scala.collection.mutable.HashMap import scala.xml.Node -import org.eclipse.jetty.servlet.ServletContextHandler - -import org.apache.spark.ExceptionFailure -import org.apache.spark.scheduler._ -import org.apache.spark.storage.StorageStatusListener -import org.apache.spark.ui.JettyUtils._ +import org.apache.spark.ui.{UIPage, UIUtils} import org.apache.spark.ui.Page.Executors -import org.apache.spark.ui.{SparkUI, UIUtils} import org.apache.spark.util.Utils -private[ui] class ExecutorsUI(parent: SparkUI) { +private[ui] class IndexPage(parent: ExecutorsTab) extends UIPage("") { private val appName = parent.appName private val basePath = parent.basePath - private var _listener: Option[ExecutorsListener] = None - - lazy val listener = _listener.get - - def start() { - _listener = Some(new ExecutorsListener(parent.storageStatusListener)) - } - - def getHandlers = Seq[ServletContextHandler]( - createServletHandler("/executors", - (request: HttpServletRequest) => render(request), parent.securityManager, basePath) - ) + private val listener = parent.executorsListener - def render(request: HttpServletRequest): Seq[Node] = { + override def render(request: HttpServletRequest): Seq[Node] = { val storageStatusList = listener.storageStatusList val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _) val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _) @@ -68,11 +50,11 @@ private[ui] class ExecutorsUI(parent: SparkUI) { -
-      <div class="row-fluid">
-        <div class="span12">
-          {execTable}
-        </div>
-      </div>;
+      <div class="row-fluid">
+        <div class="span12">
+          {execTable}
+        </div>
; UIUtils.headerSparkPage( content, basePath, appName, "Executors (" + execInfo.size + ")", Executors) @@ -158,55 +140,3 @@ private[ui] class ExecutorsUI(parent: SparkUI) { execFields.zip(execValues).toMap } } - -/** - * A SparkListener that prepares information to be displayed on the ExecutorsUI - */ -private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener) - extends SparkListener { - - val executorToTasksActive = HashMap[String, Int]() - val executorToTasksComplete = HashMap[String, Int]() - val executorToTasksFailed = HashMap[String, Int]() - val executorToDuration = HashMap[String, Long]() - val executorToShuffleRead = HashMap[String, Long]() - val executorToShuffleWrite = HashMap[String, Long]() - - def storageStatusList = storageStatusListener.storageStatusList - - override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { - val eid = formatExecutorId(taskStart.taskInfo.executorId) - executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { - val info = taskEnd.taskInfo - if (info != null) { - val eid = formatExecutorId(info.executorId) - executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1 - executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration - taskEnd.reason match { - case e: ExceptionFailure => - executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1 - case _ => - executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 - } - - // Update shuffle read/write - val metrics = taskEnd.taskMetrics - if (metrics != null) { - metrics.shuffleReadMetrics.foreach { shuffleRead => - executorToShuffleRead(eid) = - executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead - } - metrics.shuffleWriteMetrics.foreach { shuffleWrite => - executorToShuffleWrite(eid) = - executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten - } - } - } - } - - // This addresses executor ID inconsistencies in the local mode - private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId) -} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 73861ae6746da..31173e48d7a1e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -20,11 +20,12 @@ package org.apache.spark.ui.jobs import scala.collection.mutable import scala.xml.Node +import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils /** Page showing executor summary */ -private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) { - private lazy val listener = parent.listener +private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { + private lazy val listener = parent.jobProgressListener def toNodeSeq: Seq[Node] = { listener.synchronized { @@ -69,7 +70,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) { {k} {executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")} - {parent.formatDuration(v.taskTime)} + {UIUtils.formatDuration(v.taskTime)} {v.failedTasks + v.succeededTasks} {v.failedTasks} {v.succeededTasks} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 70d62b66a4829..c600e58af004d 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -23,23 +23,23 @@ import scala.xml.{Node, NodeSeq} import org.apache.spark.scheduler.Schedulable import org.apache.spark.ui.Page._ -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{UIPage, UIUtils} /** Page showing list of all ongoing and recently finished stages and pools */ -private[ui] class IndexPage(parent: JobProgressUI) { +private[ui] class IndexPage(parent: JobProgressTab) extends UIPage("") { private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc - private lazy val listener = parent.listener + private lazy val listener = parent.jobProgressListener private lazy val isFairScheduler = parent.isFairScheduler - def render(request: HttpServletRequest): Seq[Node] = { + override def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeStages = listener.activeStages.values.toSeq val completedStages = listener.completedStages.reverse.toSeq val failedStages = listener.failedStages.reverse.toSeq - val now = System.currentTimeMillis() + val now = System.currentTimeMillis val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) val completedStagesTable = @@ -57,7 +57,7 @@ private[ui] class IndexPage(parent: JobProgressUI) { // Total duration is not meaningful unless the UI is live
              <li>
                <strong>Total Duration: </strong>
-               {parent.formatDuration(now - sc.startTime)}
+               {UIUtils.formatDuration(now - sc.startTime)}
              </li>
            }}
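For reference, the thresholds in UIUtils.formatDuration (moved into UIUtils by this patch) produce output like the following:

    UIUtils.formatDuration(45 * 1000L)        // "45 s"    (under a minute: whole seconds)
    UIUtils.formatDuration(5 * 60 * 1000L)    // "5.0 min" (under 10 minutes: one decimal)
    UIUtils.formatDuration(30 * 60 * 1000L)   // "30 min"  (under an hour: whole minutes)
    UIUtils.formatDuration(2 * 3600 * 1000L)  // "2.0 h"   (otherwise: hours, one decimal)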
  • diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala similarity index 50% rename from core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala rename to core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index b2c67381cc3da..c40b75d684510 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -17,44 +17,29 @@ package org.apache.spark.ui.jobs -import javax.servlet.http.HttpServletRequest - -import org.eclipse.jetty.servlet.ServletContextHandler - import org.apache.spark.SparkConf import org.apache.spark.scheduler.SchedulingMode -import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.SparkUI -import org.apache.spark.util.Utils +import org.apache.spark.ui.{SparkUI, UITab} /** Web UI showing progress status of all jobs in the given SparkContext. */ -private[ui] class JobProgressUI(parent: SparkUI) { +private[ui] class JobProgressTab(parent: SparkUI) extends UITab("stages") { val appName = parent.appName val basePath = parent.basePath val live = parent.live val sc = parent.sc - lazy val listener = _listener.get - lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) - - private val indexPage = new IndexPage(this) - private val stagePage = new StagePage(this) - private val poolPage = new PoolPage(this) - private var _listener: Option[JobProgressListener] = None - def start() { val conf = if (live) sc.conf else new SparkConf - _listener = Some(new JobProgressListener(conf)) + listener = Some(new JobProgressListener(conf)) + attachPage(new IndexPage(this)) + attachPage(new StagePage(this)) + attachPage(new PoolPage(this)) } - def formatDuration(ms: Long) = Utils.msDurationToString(ms) + def jobProgressListener = { + assert(listener.isDefined, "JobProgressTab has not started yet!") + listener.get.asInstanceOf[JobProgressListener] + } - def getHandlers = Seq[ServletContextHandler]( - createServletHandler("/stages/stage", - (request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath), - createServletHandler("/stages/pool", - (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath), - createServletHandler("/stages", - (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath) - ) + def isFairScheduler = jobProgressListener.schedulingMode.exists(_ == SchedulingMode.FAIR) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index bd33182b70059..53200ecdd4fee 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -23,17 +23,17 @@ import scala.xml.Node import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.Page._ -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{UIPage, UIUtils} /** Page showing specific pool details */ -private[ui] class PoolPage(parent: JobProgressUI) { +private[ui] class PoolPage(parent: JobProgressTab) extends UIPage("pool") { private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc - private lazy val listener = parent.listener + private lazy val listener = parent.jobProgressListener - def render(request: HttpServletRequest): Seq[Node] = 
{ + override def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val poolName = request.getParameter("poolname") val poolToActiveStages = listener.poolToActiveStages diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index c5c8d8668740b..bb7a9c14f7761 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -24,10 +24,9 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.UIUtils /** Table showing list of pools */ -private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) { +private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) { private val basePath = parent.basePath - private val poolToActiveStages = listener.poolToActiveStages - private lazy val listener = parent.listener + private lazy val listener = parent.jobProgressListener def toNodeSeq: Seq[Node] = { listener.synchronized { @@ -48,7 +47,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) { SchedulingMode - {rows.map(r => makeRow(r, poolToActiveStages))} + {rows.map(r => makeRow(r, listener.poolToActiveStages))} } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 0c55f2ee7e944..bd3d878b1567f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,16 +23,16 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.ui.Page._ -import org.apache.spark.ui.{WebUI, UIUtils} +import org.apache.spark.ui.{UIPage, UIUtils} import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ -private[ui] class StagePage(parent: JobProgressUI) { +private[ui] class StagePage(parent: JobProgressTab) extends UIPage("stage") { private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.listener + private lazy val listener = parent.jobProgressListener - def render(request: HttpServletRequest): Seq[Node] = { + override def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val stageId = request.getParameter("id").toInt @@ -58,7 +58,7 @@ private[ui] class StagePage(parent: JobProgressUI) { val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0 var activeTime = 0L - val now = System.currentTimeMillis() + val now = System.currentTimeMillis val tasksActive = listener.stageIdToTasksActive(stageId).values tasksActive.foreach(activeTime += _.timeRunning(now)) @@ -68,7 +68,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
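Taken together, a complete leaf page under the new hierarchy reduces to a few lines. A hypothetical example (HelloPage and its markup are invented; only the UIPage contract is from the patch):

    import javax.servlet.http.HttpServletRequest
    import scala.xml.Node
    import org.json4s.JsonAST.{JString, JValue}
    import org.apache.spark.ui.UIPage

    // Served at <parent prefix>/hello, with a /json twin because includeJson = true.
    private[ui] class HelloPage extends UIPage("hello", includeJson = true) {
      override def render(request: HttpServletRequest): Seq[Node] =
        <h1>Hello from {request.getRequestURI}</h1>
      override def renderJson(request: HttpServletRequest): JValue = JString("hello")
    }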