From fe47402c83232f5db64ddfb496a542f3efe145ad Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 16 Jun 2014 18:26:56 -0700 Subject: [PATCH 1/7] Show exited executors on the Master UI Previously, executors disappeared if the application exited cleanly (i.e. if sc.stop() was called). --- .../apache/spark/deploy/master/ApplicationInfo.scala | 11 ++++++++++- .../spark/deploy/master/ui/ApplicationPage.scala | 6 +++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 46b9f4dc7d3ba..d704c42fd8ace 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -20,10 +20,11 @@ package org.apache.spark.deploy.master import java.util.Date import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import akka.actor.ActorRef -import org.apache.spark.deploy.ApplicationDescription +import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} private[spark] class ApplicationInfo( val startTime: Long, @@ -41,6 +42,7 @@ private[spark] class ApplicationInfo( @transient var appSource: ApplicationSource = _ @transient private var nextExecutorId: Int = _ + @transient private var exitedExecutors: ArrayBuffer[ExecutorInfo] = _ init() @@ -51,6 +53,7 @@ private[spark] class ApplicationInfo( endTime = -1L appSource = new ApplicationSource(this) nextExecutorId = 0 + exitedExecutors = new ArrayBuffer[ExecutorInfo] } private def newExecutorId(useID: Option[Int] = None): Int = { @@ -74,11 +77,17 @@ private[spark] class ApplicationInfo( def removeExecutor(exec: ExecutorInfo) { if (executors.contains(exec.id)) { + if (exec.state == ExecutorState.EXITED) { + exitedExecutors += executors(exec.id) + } executors -= exec.id coresGranted -= exec.cores } } + /** Return the information for all live and exited 
executors. */ + def executorInfo: Seq[ExecutorInfo] = (executors.values ++ exitedExecutors).toSet.toSeq + private val myMaxCores = desc.maxCores.getOrElse(defaultCores) def coresLeft: Int = myMaxCores - coresGranted diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index b5cd4d2ea963f..e35d3b4eb3f98 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -57,7 +57,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app }) val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs") - val executors = app.executors.values.toSeq + val executors = app.executorInfo val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors) val content = @@ -68,14 +68,14 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
  • Name: {app.desc.name}
  • User: {app.desc.user}
  • Cores: - { + { if (app.desc.maxCores.isEmpty) { "Unlimited (%s granted)".format(app.coresGranted) } else { "%s (%s granted, %s left)".format( app.desc.maxCores.get, app.coresGranted, app.coresLeft) } - } + }
  • Executor Memory: From c89bb6e90af3ceb9e1332b76fd416e0b543c2a2e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 16 Jun 2014 19:20:46 -0700 Subject: [PATCH 2/7] Add table for removed executors in MasterWebUI --- .../spark/deploy/master/ApplicationInfo.scala | 13 ++- .../deploy/master/ui/ApplicationPage.scala | 81 +++++++++++-------- 2 files changed, 54 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index d704c42fd8ace..46e6aaadccdcd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -37,12 +37,12 @@ private[spark] class ApplicationInfo( @transient var state: ApplicationState.Value = _ @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _ + @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _ @transient var coresGranted: Int = _ @transient var endTime: Long = _ @transient var appSource: ApplicationSource = _ @transient private var nextExecutorId: Int = _ - @transient private var exitedExecutors: ArrayBuffer[ExecutorInfo] = _ init() @@ -53,7 +53,7 @@ private[spark] class ApplicationInfo( endTime = -1L appSource = new ApplicationSource(this) nextExecutorId = 0 - exitedExecutors = new ArrayBuffer[ExecutorInfo] + removedExecutors = new ArrayBuffer[ExecutorInfo] } private def newExecutorId(useID: Option[Int] = None): Int = { @@ -77,16 +77,15 @@ private[spark] class ApplicationInfo( def removeExecutor(exec: ExecutorInfo) { if (executors.contains(exec.id)) { - if (exec.state == ExecutorState.EXITED) { - exitedExecutors += executors(exec.id) - } + removedExecutors += executors(exec.id) executors -= exec.id coresGranted -= exec.cores } } - /** Return the information for all live and exited executors. 
*/ - def executorInfo: Seq[ExecutorInfo] = (executors.values ++ exitedExecutors).toSet.toSeq + def exitedExecutors: Seq[ExecutorInfo] = { + removedExecutors.filter(_.state == ExecutorState.EXITED) + } private val myMaxCores = desc.maxCores.getOrElse(defaultCores) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index e35d3b4eb3f98..80fe7344174f5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -25,7 +25,7 @@ import scala.xml.Node import akka.pattern.ask import org.json4s.JValue -import org.apache.spark.deploy.JsonProtocol +import org.apache.spark.deploy.{ExecutorState, JsonProtocol} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.ExecutorInfo import org.apache.spark.ui.{WebUIPage, UIUtils} @@ -57,43 +57,58 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app }) val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs") - val executors = app.executorInfo - val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors) + val allExecutors = (app.executors.values ++ app.removedExecutors).toSet.toSeq + val executors = allExecutors.filter { exec => + !ExecutorState.isFinished(exec.state) || exec.state == ExecutorState.EXITED + } + val removedExecutors = allExecutors.diff(executors) + val executorsTable = UIUtils.listingTable(executorHeaders, executorRow, executors) + val removedExecutorsTable = UIUtils.listingTable(executorHeaders, executorRow, removedExecutors) val content = -
    -
    -
      -
    • ID: {app.id}
    • -
    • Name: {app.desc.name}
    • -
    • User: {app.desc.user}
    • -
    • Cores: - { - if (app.desc.maxCores.isEmpty) { - "Unlimited (%s granted)".format(app.coresGranted) - } else { - "%s (%s granted, %s left)".format( - app.desc.maxCores.get, app.coresGranted, app.coresLeft) - } +
      +
      +
        +
      • ID: {app.id}
      • +
      • Name: {app.desc.name}
      • +
      • User: {app.desc.user}
      • +
      • Cores: + { + if (app.desc.maxCores.isEmpty) { + "Unlimited (%s granted)".format(app.coresGranted) + } else { + "%s (%s granted, %s left)".format( + app.desc.maxCores.get, app.coresGranted, app.coresLeft) } -
      • -
      • - Executor Memory: - {Utils.megabytesToString(app.desc.memoryPerSlave)} -
      • -
      • Submit Date: {app.submitDate}
      • -
      • State: {app.state}
      • -
      • Application Detail UI
      • -
      -
      + } +
    • +
    • + Executor Memory: + {Utils.megabytesToString(app.desc.memoryPerSlave)} +
    • +
    • Submit Date: {app.submitDate}
    • +
    • State: {app.state}
    • +
    • Application Detail UI
    • +
    +
    -
    -
    -

    Executor Summary

    - {executorTable} -
    -
    ; +
    +
    +

    Executor Summary

    + { + executorsTable ++ + { + if (removedExecutors.nonEmpty) { +

    Removed Executors

    ++ + removedExecutorsTable + } else { + Seq.empty[Node] + } + } + } +
    +
    ; UIUtils.basicSparkPage(content, "Application: " + app.desc.name) } From fbb65b8df0165b84e1a42a7191139eeb8cf1a14c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 16 Jun 2014 19:24:46 -0700 Subject: [PATCH 3/7] Removed unused method --- .../org/apache/spark/deploy/master/ApplicationInfo.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 46e6aaadccdcd..72d0589689e71 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import akka.actor.ActorRef -import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} +import org.apache.spark.deploy.ApplicationDescription private[spark] class ApplicationInfo( val startTime: Long, @@ -83,10 +83,6 @@ private[spark] class ApplicationInfo( } } - def exitedExecutors: Seq[ExecutorInfo] = { - removedExecutors.filter(_.state == ExecutorState.EXITED) - } - private val myMaxCores = desc.maxCores.getOrElse(defaultCores) def coresLeft: Int = myMaxCores - coresGranted From 161f8a2a574f89a49b90ee2eb0a12b86a489a0a8 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 16 Jun 2014 19:59:46 -0700 Subject: [PATCH 4/7] Display finished executors table (fix bug) The table was previously hidden by a comment, which disconnected the HTML from the rest of the content. Yay Scala DSL. 
--- .../deploy/master/ui/ApplicationPage.scala | 12 +-- .../spark/deploy/worker/ui/WorkerPage.scala | 92 ++++++++----------- 2 files changed, 44 insertions(+), 60 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 80fe7344174f5..c1aee1d1fb427 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -96,15 +96,11 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app

    Executor Summary

    + {executorsTable} { - executorsTable ++ - { - if (removedExecutors.nonEmpty) { -

    Removed Executors

    ++ - removedExecutorsTable - } else { - Seq.empty[Node] - } + if (removedExecutors.nonEmpty) { +

    Removed Executors

    ++ + removedExecutorsTable } }
    diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index d4513118ced05..4e8b46b43628c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -47,73 +47,61 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { val workerState = Await.result(stateFuture, timeout) val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") + val runningExecutors = workerState.executors val runningExecutorTable = - UIUtils.listingTable(executorHeaders, executorRow, workerState.executors) + UIUtils.listingTable(executorHeaders, executorRow, runningExecutors) + val finishedExecutors = workerState.finishedExecutors val finishedExecutorTable = - UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors) + UIUtils.listingTable(executorHeaders, executorRow, finishedExecutors) val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes") val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers) val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse - def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers) + val finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers) // For now we only show driver information if the user has submitted drivers to the cluster. // This is until we integrate the notion of drivers and applications in the UI. - def hasDrivers = runningDrivers.length > 0 || finishedDrivers.length > 0 val content = -
    -
    -
      -
    • ID: {workerState.workerId}
    • -
    • - Master URL: {workerState.masterUrl} -
    • -
    • Cores: {workerState.cores} ({workerState.coresUsed} Used)
    • -
    • Memory: {Utils.megabytesToString(workerState.memory)} - ({Utils.megabytesToString(workerState.memoryUsed)} Used)
    • -
    -

    Back to Master

    -
    +
    +
    +
      +
    • ID: {workerState.workerId}
    • +
    • + Master URL: {workerState.masterUrl} +
    • +
    • Cores: {workerState.cores} ({workerState.coresUsed} Used)
    • +
    • Memory: {Utils.megabytesToString(workerState.memory)} + ({Utils.megabytesToString(workerState.memoryUsed)} Used)
    • +
    +

    Back to Master

    - -
    -
    -

    Running Executors {workerState.executors.size}

    - {runningExecutorTable} -
    -
    - // scalastyle:off -
    - {if (hasDrivers) -
    -
    -

    Running Drivers {workerState.drivers.size}

    - {runningDriverTable} -
    -
    +
    +
    +
    +

    Running Executors ({runningExecutors.size})

    + {runningExecutorTable} + { + if (runningDrivers.nonEmpty) { +

    Running Drivers ({runningDrivers.size})

    ++ + runningDriverTable + } } -
    - -
    -
    -

    Finished Executors

    - {finishedExecutorTable} -
    -
    - -
    - {if (hasDrivers) -
    -
    -

    Finished Drivers

    - {finishedDriverTable} -
    -
    + { + if (finishedExecutors.nonEmpty) { +

    Finished Executors ({finishedExecutors.size})

    ++ + finishedExecutorTable + } } -
    ; - // scalastyle:on + { + if (finishedDrivers.nonEmpty) { +

    Finished Drivers ({finishedDrivers.size})

    ++ + finishedDriverTable + } + } +
    +
    ; UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format( workerState.host, workerState.port)) } From 3390b49b2ac4c810ff5f2d58f7ed228b156acd25 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 16 Jun 2014 20:04:15 -0700 Subject: [PATCH 5/7] Add executor state column to WorkerPage --- .../scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index 4e8b46b43628c..327b905032800 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -46,7 +46,7 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, timeout) - val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") + val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs") val runningExecutors = workerState.executors val runningExecutorTable = UIUtils.listingTable(executorHeaders, executorRow, runningExecutors) @@ -110,6 +110,7 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { {executor.execId} {executor.cores} + {executor.state} {Utils.megabytesToString(executor.memory)} From 792f992e71f6145bc9c0f1dd63ae3ca36df98e6f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 17 Jun 2014 10:28:24 -0700 Subject: [PATCH 6/7] Add missing equals method in ExecutorInfo --- .../apache/spark/deploy/master/ExecutorInfo.scala | 13 +++++++++++++ .../spark/deploy/master/ui/ApplicationPage.scala | 1 + 2 files changed, 14 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala index 76db61dd619c6..7e54a3f8fe024 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala @@ -34,4 +34,17 @@ private[spark] class ExecutorInfo( } def fullId: String = application.id + "/" + id + + override def equals(other: Any): Boolean = { + other match { + case info: ExecutorInfo => + fullId == info.fullId && + worker.id == info.worker.id && + cores == info.cores && + memory == info.memory + case _ => false + } + } + + override def toString: String = fullId } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index c1aee1d1fb427..34fa1429c86de 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -58,6 +58,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs") val allExecutors = (app.executors.values ++ app.removedExecutors).toSet.toSeq + // This includes executors that are either still running or have exited cleanly val executors = allExecutors.filter { exec => !ExecutorState.isFinished(exec.state) || exec.state == ExecutorState.EXITED } From 2e2298fdffb5fed46354f81b2e1b344ed1bb36cf Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 17 Jun 2014 11:23:04 -0700 Subject: [PATCH 7/7] Add hash code method to ExecutorInfo (minor) --- .../scala/org/apache/spark/deploy/master/ExecutorInfo.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala index 7e54a3f8fe024..d417070c51016 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala @@ -47,4 +47,6 @@ private[spark] class ExecutorInfo( } override def toString: String = fullId + + override def hashCode: Int = toString.hashCode() }