diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index add0e9878a546..dc0b010f97841 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -17,6 +17,8 @@ package org.apache.spark.ui.jobs +import org.apache.spark.ui.jobs.UIData.StageUIData + import scala.collection.mutable import scala.xml.Node @@ -63,11 +65,9 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { executorIdToAddress.put(executorId, address) } - val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId) - executorIdToSummary match { - case Some(x) => - x.toSeq.sortBy(_._1).map { case (k, v) => { - // scalastyle:off + listener.stageUIData.get(stageId) match { + case Some(stageData: StageUIData) => + stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) => {k} {executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")} @@ -75,15 +75,18 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { {v.failedTasks + v.succeededTasks} {v.failedTasks} {v.succeededTasks} - {Utils.bytesToString(v.shuffleRead)} - {Utils.bytesToString(v.shuffleWrite)} - {Utils.bytesToString(v.memoryBytesSpilled)} - {Utils.bytesToString(v.diskBytesSpilled)} + + {Utils.bytesToString(v.shuffleRead)} + + {Utils.bytesToString(v.shuffleWrite)} + + {Utils.bytesToString(v.memoryBytesSpilled)} + + {Utils.bytesToString(v.diskBytesSpilled)} - // scalastyle:on } - } - case _ => Seq[Node]() + case None => + Seq.empty[Node] } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 381a5443df8b5..21316afc3130f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId +import org.apache.spark.ui.jobs.UIData._ /** * :: DeveloperApi :: @@ -35,7 +36,7 @@ import org.apache.spark.storage.BlockManagerId * updating the internal data structures concurrently. */ @DeveloperApi -class JobProgressListener(conf: SparkConf) extends SparkListener { +class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { import JobProgressListener._ @@ -51,19 +52,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { var totalShuffleRead = 0L var totalShuffleWrite = 0L - // TODO: Should probably consolidate all following into a single hash map. - val stageIdToTime = HashMap[Int, Long]() - val stageIdToShuffleRead = HashMap[Int, Long]() - val stageIdToShuffleWrite = HashMap[Int, Long]() - val stageIdToMemoryBytesSpilled = HashMap[Int, Long]() - val stageIdToDiskBytesSpilled = HashMap[Int, Long]() - val stageIdToTasksActive = HashMap[Int, HashMap[Long, TaskInfo]]() - val stageIdToTasksComplete = HashMap[Int, Int]() - val stageIdToTasksFailed = HashMap[Int, Int]() - val stageIdToTaskData = HashMap[Int, HashMap[Long, TaskUIData]]() - val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]() - val stageIdToPool = HashMap[Int, String]() - val stageIdToDescription = HashMap[Int, String]() + val stageUIData = new HashMap[Int, StageUIData] + val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]() val executorIdToBlockManagerId = HashMap[String, BlockManagerId]() @@ -75,8 +65,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { val stage = stageCompleted.stageInfo val stageId = stage.stageId + val stageData = stageUIData.getOrElseUpdate(stage.stageId, { + logWarning("Stage completed for unknown stage " + stage.stageId) + new StageUIData + }) + // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage - poolToActiveStages(stageIdToPool(stageId)).remove(stageId) + poolToActiveStages(stageData.schedulingPool).remove(stageId) activeStages.remove(stageId) if (stage.failureReason.isEmpty) { completedStages += stage @@ -91,20 +86,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { val toRemove = math.max(retainedStages / 10, 1) - stages.take(toRemove).foreach { s => - stageIdToTime.remove(s.stageId) - stageIdToShuffleRead.remove(s.stageId) - stageIdToShuffleWrite.remove(s.stageId) - stageIdToMemoryBytesSpilled.remove(s.stageId) - stageIdToDiskBytesSpilled.remove(s.stageId) - stageIdToTasksActive.remove(s.stageId) - stageIdToTasksComplete.remove(s.stageId) - stageIdToTasksFailed.remove(s.stageId) - stageIdToTaskData.remove(s.stageId) - stageIdToExecutorSummaries.remove(s.stageId) - stageIdToPool.remove(s.stageId) - stageIdToDescription.remove(s.stageId) - } + stages.take(toRemove).foreach { s => stageUIData.remove(s.stageId) } stages.trimStart(toRemove) } } @@ -117,26 +99,27 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val poolName = Option(stageSubmitted.properties).map { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) - stageIdToPool(stage.stageId) = poolName - val description = Option(stageSubmitted.properties).flatMap { + val stageData = stageUIData.getOrElseUpdate(stage.stageId, new StageUIData) + stageData.schedulingPool = poolName + + stageData.description = Option(stageSubmitted.properties).flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) } - description.map(d => stageIdToDescription(stage.stageId) = d) val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]()) stages(stage.stageId) = stage } override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { - val sid = taskStart.stageId val taskInfo = taskStart.taskInfo if (taskInfo != null) { - val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]()) - tasksActive(taskInfo.taskId) = taskInfo - val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]()) - taskMap(taskInfo.taskId) = new TaskUIData(taskInfo) - stageIdToTaskData(sid) = taskMap + val stageData = stageUIData.getOrElseUpdate(taskStart.stageId, { + logWarning("Task start for unknown stage " + taskStart.stageId) + new StageUIData + }) + stageData.numActiveTasks += 1 + stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo)) } } @@ -146,86 +129,75 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { } override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { - val sid = taskEnd.stageId val info = taskEnd.taskInfo - if (info != null) { + val stageData = stageUIData.getOrElseUpdate(taskEnd.stageId, { + logWarning("Task end for unknown stage " + taskEnd.stageId) + new StageUIData + }) + // create executor summary map if necessary - val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid, - op = new HashMap[String, ExecutorSummary]()) + val executorSummaryMap = stageData.executorSummary executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary) - val executorSummary = executorSummaryMap.get(info.executorId) - executorSummary match { - case Some(y) => { - // first update failed-task, succeed-task - taskEnd.reason match { - case Success => - y.succeededTasks += 1 - case _ => - y.failedTasks += 1 - } - - // update duration - y.taskTime += info.duration - - val metrics = taskEnd.taskMetrics - if (metrics != null) { - metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } - metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } - y.memoryBytesSpilled += metrics.memoryBytesSpilled - y.diskBytesSpilled += metrics.diskBytesSpilled - } + executorSummaryMap.get(info.executorId).foreach { y => + // first update failed-task, succeed-task + taskEnd.reason match { + case Success => + y.succeededTasks += 1 + case _ => + y.failedTasks += 1 + } + + // update duration + y.taskTime += info.duration + + val metrics = taskEnd.taskMetrics + if (metrics != null) { + metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } + metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } + y.memoryBytesSpilled += metrics.memoryBytesSpilled + y.diskBytesSpilled += metrics.diskBytesSpilled } - case _ => {} } - val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]()) - // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage - tasksActive.remove(info.taskId) + stageData.numActiveTasks -= 1 val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) = taskEnd.reason match { case org.apache.spark.Success => - stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1 + stageData.numCompleteTasks += 1 (None, Option(taskEnd.taskMetrics)) case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics - stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1 + stageData.numFailedTasks += 1 (Some(e.toErrorString), e.metrics) case e: TaskFailedReason => // All other failure cases - stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1 + stageData.numFailedTasks += 1 (Some(e.toErrorString), None) } - stageIdToTime.getOrElseUpdate(sid, 0L) - val time = metrics.map(_.executorRunTime).getOrElse(0L) - stageIdToTime(sid) += time - totalTime += time + val taskRunTime = metrics.map(_.executorRunTime).getOrElse(0L) + stageData.executorRunTime += taskRunTime + totalTime += taskRunTime - stageIdToShuffleRead.getOrElseUpdate(sid, 0L) val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L) - stageIdToShuffleRead(sid) += shuffleRead + stageData.shuffleReadBytes += shuffleRead totalShuffleRead += shuffleRead - stageIdToShuffleWrite.getOrElseUpdate(sid, 0L) val shuffleWrite = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L) - stageIdToShuffleWrite(sid) += shuffleWrite + stageData.shuffleWriteBytes += shuffleWrite totalShuffleWrite += shuffleWrite - stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L) val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L) - stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled + stageData.memoryBytesSpilled += memoryBytesSpilled - stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L) val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L) - stageIdToDiskBytesSpilled(sid) += diskBytesSpilled + stageData.diskBytesSpilled += diskBytesSpilled - val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]()) - taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage) - stageIdToTaskData(sid) = taskMap + stageData.taskData(info.taskId) = new TaskUIData(info, metrics, errorMessage) } - } + } // end of onTaskEnd override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { synchronized { @@ -253,12 +225,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { } -@DeveloperApi -case class TaskUIData( - taskInfo: TaskInfo, - taskMetrics: Option[TaskMetrics] = None, - errorMessage: Option[String] = None) - private object JobProgressListener { val DEFAULT_POOL_NAME = "default" val DEFAULT_RETAINED_STAGES = 1000 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8e3d5d1cd4c6b..12bf108d33cf1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ @@ -34,8 +35,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val stageId = request.getParameter("id").toInt + val stageData = listener.stageUIData(stageId) - if (!listener.stageIdToTaskData.contains(stageId)) { + if (stageData.taskData.isEmpty) { val content =

Summary Metrics

No tasks have started yet @@ -45,21 +47,13 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { "Details for Stage %s".format(stageId), parent.headerTabs, parent) } - val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime) + val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime) val numCompleted = tasks.count(_.taskInfo.finished) - val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L) - val hasShuffleRead = shuffleReadBytes > 0 - val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L) - val hasShuffleWrite = shuffleWriteBytes > 0 - val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L) - val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L) - val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0 - var activeTime = 0L - val now = System.currentTimeMillis - val tasksActive = listener.stageIdToTasksActive(stageId).values - tasksActive.foreach(activeTime += _.timeRunning(now)) + val hasShuffleRead = stageData.shuffleReadBytes > 0 + val hasShuffleWrite = stageData.shuffleWriteBytes > 0 + val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0 // scalastyle:off val summary = @@ -67,28 +61,28 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 30971f769682f..b4ed2bee4a411 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -63,14 +63,14 @@ private[ui] class StageTableBase( } - private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = + private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] = { val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) val startWidth = "width: %s%%".format((started.toDouble/total)*100)
- {completed}/{total} {failed} + {completed}/{total} { if (failed > 0) s"($failed failed)" else "" }
@@ -99,13 +99,14 @@ private[ui] class StageTableBase( } - listener.stageIdToDescription.get(s.stageId) - .map(d =>
{d}
{nameLink} {killLink}
) - .getOrElse(
{killLink} {nameLink} {details}
) + listener.stageUIData(s.stageId).description match { + case Some(desc) =>
{desc}
{nameLink} {killLink}
+ case None =>
{killLink} {nameLink} {details}
+ } } protected def stageRow(s: StageInfo): Seq[Node] = { - val poolName = listener.stageIdToPool.get(s.stageId) + val stageData = listener.stageUIData(s.stageId) val submissionTime = s.submissionTime match { case Some(t) => UIUtils.formatDate(new Date(t)) case None => "Unknown" @@ -115,30 +116,17 @@ private[ui] class StageTableBase( if (finishTime > t) finishTime - t else System.currentTimeMillis - t } val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") - val startedTasks = - listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size - val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0) - val failedTasks = listener.stageIdToTasksFailed.getOrElse(s.stageId, 0) match { - case f if f > 0 => "(%s failed)".format(f) - case _ => "" - } - val totalTasks = s.numTasks - val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L) - val shuffleRead = shuffleReadSortable match { - case 0 => "" - case b => Utils.bytesToString(b) - } - val shuffleWriteSortable = listener.stageIdToShuffleWrite.getOrElse(s.stageId, 0L) - val shuffleWrite = shuffleWriteSortable match { - case 0 => "" - case b => Utils.bytesToString(b) - } + + val shuffleRead = stageData.shuffleReadBytes + val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else "" + val shuffleWrite = stageData.shuffleWriteBytes + val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else "" {s.stageId} ++ {if (isFairScheduler) { - {poolName.get} + .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}> + {stageData.schedulingPool} } else { @@ -148,10 +136,11 @@ private[ui] class StageTableBase( {submissionTime} {formattedDuration} - {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)} + {makeProgressBar(stageData.numActiveTasks, stageData.numCompleteTasks, + stageData.numFailedTasks, s.numTasks)} - {shuffleRead} - {shuffleWrite} + {shuffleReadWithUnit} + {shuffleWriteWithUnit} } /** Render an HTML row that represents a stage */ @@ -168,7 +157,7 @@ private[ui] class FailedStageTable( override protected def stageRow(s: StageInfo): Seq[Node] = { val basicColumns = super.stageRow(s) - val failureReason = {s.failureReason.getOrElse("")} + val failureReason =
{s.failureReason.getOrElse("")}
basicColumns ++ failureReason } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index deee272bcb6c6..0da87774839a5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -34,19 +34,20 @@ private[jobs] object UIData { var diskBytesSpilled : Long = 0 } - class StageData { + class StageUIData { + var numActiveTasks: Int = _ + var numCompleteTasks: Int = _ + var numFailedTasks: Int = _ + var executorRunTime: Long = _ + var shuffleReadBytes: Long = _ var shuffleWriteBytes: Long = _ var memoryBytesSpilled: Long = _ var diskBytesSpilled: Long = _ - var numActiveTasks: Int = _ - var numCompleteTasks: Int = _ - var numFailedTasks: Int = _ - - var schedulingPool: String = _ - var description: String = _ + var schedulingPool: String = "" + var description: Option[String] = None var taskData = new HashMap[Long, TaskUIData] var executorSummary = new HashMap[String, ExecutorSummary] diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index fa43b66c6cb5a..b6b8293c37312 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -47,11 +47,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc } listener.completedStages.size should be (5) - listener.completedStages.filter(_.stageId == 50).size should be (1) - listener.completedStages.filter(_.stageId == 49).size should be (1) - listener.completedStages.filter(_.stageId == 48).size should be (1) - listener.completedStages.filter(_.stageId == 47).size should be (1) - listener.completedStages.filter(_.stageId == 46).size should be (1) + listener.completedStages.count(_.stageId == 50) should be (1) + listener.completedStages.count(_.stageId == 49) should be (1) + listener.completedStages.count(_.stageId == 48) should be (1) + listener.completedStages.count(_.stageId == 47) should be (1) + listener.completedStages.count(_.stageId == 46) should be (1) } test("test executor id to summary") { @@ -59,9 +59,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc val listener = new JobProgressListener(conf) val taskMetrics = new TaskMetrics() val shuffleReadMetrics = new ShuffleReadMetrics() - - // nothing in it - assert(listener.stageIdToExecutorSummaries.size == 0) + assert(listener.stageUIData.size === 0) // finish this task, should get updated shuffleRead shuffleReadMetrics.remoteBytesRead = 1000 @@ -71,8 +69,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc var task = new ShuffleMapTask(0, null, null, 0, null) val taskType = Utils.getFormattedClassName(task) listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail()) - .shuffleRead == 1000) + assert(listener.stageUIData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail()) + .shuffleRead === 1000) // finish a task with unknown executor-id, nothing should happen taskInfo = @@ -80,7 +78,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskInfo.finishTime = 1 task = new ShuffleMapTask(0, null, null, 0, null) listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToExecutorSummaries.size == 1) + assert(listener.stageUIData.size === 1) // finish this task, should get updated duration shuffleReadMetrics.remoteBytesRead = 1000 @@ -89,8 +87,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskInfo.finishTime = 1 task = new ShuffleMapTask(0, null, null, 0, null) listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail()) - .shuffleRead == 2000) + assert(listener.stageUIData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail()) + .shuffleRead === 2000) // finish this task, should get updated duration shuffleReadMetrics.remoteBytesRead = 1000 @@ -99,8 +97,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskInfo.finishTime = 1 task = new ShuffleMapTask(0, null, null, 0, null) listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail()) - .shuffleRead == 1000) + assert(listener.stageUIData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail()) + .shuffleRead === 1000) } test("test task success vs failure counting for different task end reasons") { @@ -121,13 +119,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc TaskKilled, ExecutorLostFailure, UnknownReason) + var failCount = 0 for (reason <- taskFailedReasons) { listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics)) - assert(listener.stageIdToTasksComplete.get(task.stageId) === None) + failCount += 1 + assert(listener.stageUIData(task.stageId).numCompleteTasks === 0) + assert(listener.stageUIData(task.stageId).numFailedTasks === failCount) } // Make sure we count success as success. listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics)) - assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1)) + assert(listener.stageUIData(task.stageId).numCompleteTasks === 1) + assert(listener.stageUIData(task.stageId).numFailedTasks === failCount) } }