Skip to content

Commit

Permalink
Allow adding tabs to SparkUI dynamically + add example
Browse files Browse the repository at this point in the history
An example of how this is done is in org.apache.spark.ui.FooTab. Run
it through bin/spark-class to see what it looks like (which should
more or less match your expectations...).
  • Loading branch information
andrewor14 committed Apr 3, 2014
1 parent ed25dfc commit 9a48fa1
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 25 deletions.
Expand Up @@ -35,7 +35,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)

/** Initialize all components of the server. Must be called before bind(). */
/** Initialize all components of the server. */
def start() {
attachPage(new ApplicationPage(this))
attachPage(new IndexPage(this))
Expand All @@ -59,25 +59,13 @@ class MasterWebUI(val master: Master, requestedPort: Int)
/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.getHandlers) {
rootHandler.addHandler(handler)
if (!handler.isStarted) {
handler.start()
}
}
ui.getHandlers.foreach(attachHandler)
}

/** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
def detachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.getHandlers) {
if (handler.isStarted) {
handler.stop()
}
rootHandler.removeHandler(handler)
}
ui.getHandlers.foreach(detachHandler)
}
}

Expand Down
Expand Up @@ -38,7 +38,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
val timeout = AkkaUtils.askTimeout(worker.conf)

/** Initialize all components of the server. Must be called before bind(). */
/** Initialize all components of the server. */
def start() {
val logPage = new LogPage(this)
attachPage(logPage)
Expand Down
105 changes: 105 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/FooTab.scala
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui

import javax.servlet.http.HttpServletRequest

import scala.collection.mutable
import scala.xml.Node

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

/*
* This is an example of how to extend the SparkUI by adding new tabs to it. It is intended
* only as a demonstration and should be removed before merging into master!
*
* bin/spark-class org.apache.spark.ui.FooTab
*/

/** A tab that displays basic information about jobs seen so far. */
private[spark] class FooTab(parent: SparkUI) extends UITab("foo") {
val appName = parent.appName
val basePath = parent.basePath

def start() {
listener = Some(new FooListener)
attachPage(new IndexPage(this))
}

def fooListener: FooListener = {
assert(listener.isDefined, "ExecutorsTab has not started yet!")
listener.get.asInstanceOf[FooListener]
}

def headerTabs: Seq[UITab] = parent.getTabs
}

/** A foo page. Enough said. */
private[spark] class IndexPage(parent: FooTab) extends UIPage("") {
private val appName = parent.appName
private val basePath = parent.basePath
private val listener = parent.fooListener

override def render(request: HttpServletRequest): Seq[Node] = {
val results = listener.jobResultMap.toSeq.sortBy { case (k, _) => k }
val content =
<div class="row-fluid">
<div class="span12">
<strong>Foo Jobs: </strong>
<ul>
{results.map { case (k, v) => <li>Job {k}: <strong>{v}</strong></li> }}
</ul>
</div>
</div>
UIUtils.headerSparkPage(content, basePath, appName, "Foo", parent.headerTabs, parent)
}
}

/** A listener that maintains a mapping between job IDs and job results. */
private[spark] class FooListener extends SparkListener {
val jobResultMap = mutable.Map[Int, String]()

override def onJobEnd(end: SparkListenerJobEnd) {
jobResultMap(end.jobId) = end.jobResult.toString
}
}


/**
* Start a SparkContext and a SparkUI with a FooTab attached.
*/
private[spark] object FooTab {
def main(args: Array[String]) {
val sc = new SparkContext("local", "Foo Tab", new SparkConf)
val fooTab = new FooTab(sc.ui)
sc.ui.attachTab(fooTab)

// Run a few jobs
sc.parallelize(1 to 1000).count()
sc.parallelize(1 to 2000).persist().count()
sc.parallelize(1 to 3000).map(i => (i/2, i)).groupByKey().count()
sc.parallelize(1 to 4000).map(i => (i/2, i)).groupByKey().persist().count()
sc.parallelize(1 to 5000).map(i => (i/2, i)).groupByKey().persist().count()
sc.parallelize(1 to 6000).map(i => (i/2, i)).groupByKey().persist().count()
sc.parallelize(1 to 7000).map(i => (i/2, i)).groupByKey().persist().count()

readLine("\n> Started SparkUI with a Foo tab...")
}
}
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Expand Up @@ -52,8 +52,9 @@ private[spark] class SparkUI(

// Maintain executor storage status through Spark events
val storageStatusListener = new StorageStatusListener
listenerBus.addListener(storageStatusListener)

/** Initialize all components of the server. Must be called before bind(). */
/** Initialize all components of the server. */
def start() {
attachTab(new JobProgressTab(this))
attachTab(new BlockManagerTab(this))
Expand All @@ -64,14 +65,10 @@ private[spark] class SparkUI(
if (live) {
sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
}
// Storage status listener must receive events first, as other listeners depend on its state
listenerBus.addListener(storageStatusListener)
getListeners.foreach(listenerBus.addListener)
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
assert(!handlers.isEmpty, "SparkUI has not started yet!")
try {
serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort))
Expand All @@ -82,6 +79,12 @@ private[spark] class SparkUI(
}
}

/** Attach a tab to this UI, along with its corresponding listener if it exists. */
override def attachTab(tab: UITab) {
super.attachTab(tab)
tab.listener.foreach(listenerBus.addListener)
}

/** Stop the server behind this web interface. Only valid after bind(). */
override def stop() {
super.stop()
Expand Down
26 changes: 22 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
Expand Up @@ -35,7 +35,6 @@ import org.apache.spark.util.Utils
*
* Each WebUI represents a collection of tabs, each of which in turn represents a collection of
* pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
* All tabs and pages must be attached before bind()'ing the server.
*/
private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") {
protected val tabs = ArrayBuffer[UITab]()
Expand All @@ -46,14 +45,14 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath:
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
def getListeners: Seq[SparkListener] = tabs.flatMap(_.listener)

/** Attach a tab to this UI, along with all of its attached pages. Only valid before bind(). */
/** Attach a tab to this UI, along with all of its attached pages. */
def attachTab(tab: UITab) {
tab.start()
tab.pages.foreach(attachPage)
tabs += tab
}

/** Attach a page to this UI. Only valid before bind(). */
/** Attach a page to this UI. */
def attachPage(page: UIPage) {
val pagePath = "/" + page.prefix
attachHandler(createServletHandler(pagePath,
Expand All @@ -64,9 +63,26 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath:
}
}

/** Attach a handler to this UI. Only valid before bind(). */
/** Attach a handler to this UI. */
def attachHandler(handler: ServletContextHandler) {
handlers += handler
serverInfo.foreach { info =>
info.rootHandler.addHandler(handler)
if (!handler.isStarted) {
handler.start()
}
}
}

/** Detach a handler from this UI. */
def detachHandler(handler: ServletContextHandler) {
handlers -= handler
serverInfo.foreach { info =>
info.rootHandler.removeHandler(handler)
if (handler.isStarted) {
handler.stop()
}
}
}

/** Initialize all components of the server. Must be called before bind(). */
Expand All @@ -89,6 +105,7 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath:
}
}


/**
* A tab that represents a collection of pages and a unit of listening for Spark events.
* Associating each tab with a listener is arbitrary and need not be the case.
Expand All @@ -108,6 +125,7 @@ private[spark] abstract class UITab(val prefix: String) {
def start()
}


/**
* A page that represents the leaf node in the UI hierarchy.
*
Expand Down

0 comments on commit 9a48fa1

Please sign in to comment.