No tasks have started yet
@@ -45,23 +47,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
"Details for Stage %s".format(stageId), parent.headerTabs, parent)
}
- val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
+ val stageData = stageDataOption.get
+ val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
- val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
- val hasInput = inputBytes > 0
- val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
- val hasShuffleRead = shuffleReadBytes > 0
- val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
- val hasShuffleWrite = shuffleWriteBytes > 0
- val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
- val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
- val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
-
- var activeTime = 0L
- val now = System.currentTimeMillis
- val tasksActive = listener.stageIdToTasksActive(stageId).values
- tasksActive.foreach(activeTime += _.timeRunning(now))
+ val hasInput = stageData.inputBytes > 0
+ val hasShuffleRead = stageData.shuffleReadBytes > 0
+ val hasShuffleWrite = stageData.shuffleWriteBytes > 0
+ val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
// scalastyle:off
val summary =
@@ -69,34 +62,34 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index fd8d0b5cdde00..5f45c0ced5ec5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,12 +17,11 @@
package org.apache.spark.ui.jobs
-import java.util.Date
-
-import scala.collection.mutable.HashMap
import scala.xml.Node
-import org.apache.spark.scheduler.{StageInfo, TaskInfo}
+import java.util.Date
+
+import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.util.Utils
@@ -71,14 +70,14 @@ private[ui] class StageTableBase(
}
- private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] =
+ private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] =
{
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
- {completed}/{total} {failed}
+ {completed}/{total} { if (failed > 0) s"($failed failed)" else "" }
@@ -108,13 +107,23 @@ private[ui] class StageTableBase(
{s.details}
}
- listener.stageIdToDescription.get(s.stageId)
- .map(d =>
{d}
{nameLink} {killLink}
)
- .getOrElse(
{nameLink} {killLink} {details}
)
+ val stageDataOption = listener.stageIdToData.get(s.stageId)
+ // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
+ if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
+ val desc = stageDataOption.get.description
+
{desc}
{nameLink} {killLink}
+ } else {
+
{killLink} {nameLink} {details}
+ }
}
protected def stageRow(s: StageInfo): Seq[Node] = {
- val poolName = listener.stageIdToPool.get(s.stageId)
+ val stageDataOption = listener.stageIdToData.get(s.stageId)
+ if (stageDataOption.isEmpty) {
+ return
{s.stageId} | No data available for this stage |
+ }
+
+ val stageData = stageDataOption.get
val submissionTime = s.submissionTime match {
case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
@@ -124,35 +133,20 @@ private[ui] class StageTableBase(
if (finishTime > t) finishTime - t else System.currentTimeMillis - t
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
- val startedTasks =
- listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
- val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
- val failedTasks = listener.stageIdToTasksFailed.getOrElse(s.stageId, 0) match {
- case f if f > 0 => "(%s failed)".format(f)
- case _ => ""
- }
- val totalTasks = s.numTasks
- val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
- val inputRead = inputSortable match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
- val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
- val shuffleRead = shuffleReadSortable match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
- val shuffleWriteSortable = listener.stageIdToShuffleWrite.getOrElse(s.stageId, 0L)
- val shuffleWrite = shuffleWriteSortable match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
+
+ val inputRead = stageData.inputBytes
+ val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
+ val shuffleRead = stageData.shuffleReadBytes
+ val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
+ val shuffleWrite = stageData.shuffleWriteBytes
+ val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
+
{s.stageId} | ++
{if (isFairScheduler) {
- {poolName.get}
+ .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}>
+ {stageData.schedulingPool}
|
} else {
@@ -162,11 +156,12 @@ private[ui] class StageTableBase(
{submissionTime} |
{formattedDuration} |
- {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
+ {makeProgressBar(stageData.numActiveTasks, stageData.numCompleteTasks,
+ stageData.numFailedTasks, s.numTasks)}
|
-
{inputRead} |
-
{shuffleRead} |
-
{shuffleWrite} |
+
{inputReadWithUnit} |
+
{shuffleReadWithUnit} |
+
{shuffleWriteWithUnit} |
}
/** Render an HTML row that represents a stage */
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
new file mode 100644
index 0000000000000..be11a11695b01
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.TaskInfo
+
+import scala.collection.mutable.HashMap
+
+private[jobs] object UIData {
+
+ class ExecutorSummary {
+ var taskTime : Long = 0
+ var failedTasks : Int = 0
+ var succeededTasks : Int = 0
+ var inputBytes : Long = 0
+ var shuffleRead : Long = 0
+ var shuffleWrite : Long = 0
+ var memoryBytesSpilled : Long = 0
+ var diskBytesSpilled : Long = 0
+ }
+
+ class StageUIData {
+ var numActiveTasks: Int = _
+ var numCompleteTasks: Int = _
+ var numFailedTasks: Int = _
+
+ var executorRunTime: Long = _
+
+ var inputBytes: Long = _
+ var shuffleReadBytes: Long = _
+ var shuffleWriteBytes: Long = _
+ var memoryBytesSpilled: Long = _
+ var diskBytesSpilled: Long = _
+
+ var schedulingPool: String = ""
+ var description: Option[String] = None
+
+ var taskData = new HashMap[Long, TaskUIData]
+ var executorSummary = new HashMap[String, ExecutorSummary]
+ }
+
+ case class TaskUIData(
+ taskInfo: TaskInfo,
+ taskMetrics: Option[TaskMetrics] = None,
+ errorMessage: Option[String] = None)
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index fa43b66c6cb5a..a8556624804bb 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -47,11 +47,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
}
listener.completedStages.size should be (5)
- listener.completedStages.filter(_.stageId == 50).size should be (1)
- listener.completedStages.filter(_.stageId == 49).size should be (1)
- listener.completedStages.filter(_.stageId == 48).size should be (1)
- listener.completedStages.filter(_.stageId == 47).size should be (1)
- listener.completedStages.filter(_.stageId == 46).size should be (1)
+ listener.completedStages.count(_.stageId == 50) should be (1)
+ listener.completedStages.count(_.stageId == 49) should be (1)
+ listener.completedStages.count(_.stageId == 48) should be (1)
+ listener.completedStages.count(_.stageId == 47) should be (1)
+ listener.completedStages.count(_.stageId == 46) should be (1)
}
test("test executor id to summary") {
@@ -59,9 +59,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val listener = new JobProgressListener(conf)
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
-
- // nothing in it
- assert(listener.stageIdToExecutorSummaries.size == 0)
+ assert(listener.stageIdToData.size === 0)
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
@@ -71,8 +69,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
var task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
- .shuffleRead == 1000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+ .shuffleRead === 1000)
// finish a task with unknown executor-id, nothing should happen
taskInfo =
@@ -80,7 +78,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.size == 1)
+ assert(listener.stageIdToData.size === 1)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
@@ -89,8 +87,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
- .shuffleRead == 2000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+ .shuffleRead === 2000)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
@@ -99,8 +97,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
- .shuffleRead == 1000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
+ .shuffleRead === 1000)
}
test("test task success vs failure counting for different task end reasons") {
@@ -121,13 +119,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
TaskKilled,
ExecutorLostFailure,
UnknownReason)
+ var failCount = 0
for (reason <- taskFailedReasons) {
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
- assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+ failCount += 1
+ assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
+ assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}
// Make sure we count success as success.
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
- assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+ assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
+ assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}
}