From fb721561dc4b1aff36ffad083accab691dd5e627 Mon Sep 17 00:00:00 2001 From: Mark Mims Date: Thu, 30 Oct 2014 15:55:30 -0600 Subject: [PATCH 1/4] mimic hasInput. The basics work here, but wanna clean this up with maybeAccumulators for column content --- .../org/apache/spark/ui/jobs/StagePage.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 2414e4c65237e..12d54fa1daf0c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -52,6 +52,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val numCompleted = tasks.count(_.taskInfo.finished) val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables + val hasAccumulators = accumulables.size > 0 val hasInput = stageData.inputBytes > 0 val hasShuffleRead = stageData.shuffleReadBytes > 0 val hasShuffleWrite = stageData.shuffleWriteBytes > 0 @@ -104,7 +105,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val taskHeaders: Seq[String] = Seq( "Index", "ID", "Attempt", "Status", "Locality Level", "Executor ID / Host", - "Launch Time", "Duration", "GC Time", "Accumulators") ++ + "Launch Time", "Duration", "GC Time") ++ + {if (hasAccumulators) Seq("Accumulators") else Nil} ++ {if (hasInput) Seq("Input") else Nil} ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ @@ -112,7 +114,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { Seq("Errors") val taskTable = UIUtils.listingTable( - taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) + taskHeaders, taskRow(hasAccumulators, hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined) @@ -232,6 +234,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } def taskRow( + hasAccumulators: Boolean, hasInput: Boolean, hasShuffleRead: Boolean, hasShuffleWrite: Boolean, @@ -290,17 +293,19 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""} - - {Unparsed( - info.accumulables.map{acc => s"${acc.name}: ${acc.update.get}"}.mkString("
") - )} - + {if (hasAccumulators) { + + {Unparsed( + info.accumulables.map{acc => s"${acc.name}: ${acc.update.get}"}.mkString("
") + )} + + }} {if (hasInput) { {inputReadable} From c28c449d94f2af67e68ef32d330a9057d795df94 Mon Sep 17 00:00:00 2001 From: Mark Mims Date: Thu, 30 Oct 2014 18:23:47 -0600 Subject: [PATCH 2/4] looking much better now... minimal explicit formatting. Now, see if any sort keys make sense --- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 12d54fa1daf0c..cc32be5d171ea 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -247,6 +247,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) + val maybeAccumulators = info.accumulables + //val accumulatorsSortable = maybeAccumulators.map(_.name) + val accumulatorsReadable = maybeAccumulators + .map{acc => s"${acc.name}: ${acc.update.get}"} + val maybeInput = metrics.flatMap(_.inputMetrics) val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("") val inputReadable = maybeInput @@ -301,9 +306,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { --> {if (hasAccumulators) { - {Unparsed( - info.accumulables.map{acc => s"${acc.name}: ${acc.update.get}"}.mkString("
") - )} + {Unparsed(accumulatorsReadable.mkString("
"))} }} {if (hasInput) { From 390893b016ff63d492263703f334e2645cdca0a1 Mon Sep 17 00:00:00 2001 From: Mark Mims Date: Thu, 30 Oct 2014 18:31:47 -0600 Subject: [PATCH 3/4] cleanup --- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index cc32be5d171ea..e09ba75901638 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -248,9 +248,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) val maybeAccumulators = info.accumulables - //val accumulatorsSortable = maybeAccumulators.map(_.name) - val accumulatorsReadable = maybeAccumulators - .map{acc => s"${acc.name}: ${acc.update.get}"} + val accumulatorsReadable = maybeAccumulators.map{acc => s"${acc.name}: ${acc.update.get}"} val maybeInput = metrics.flatMap(_.inputMetrics) val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("") From 6141cb38be6e9d3d8b9796eab0015891885ee0fb Mon Sep 17 00:00:00 2001 From: Mark Mims Date: Fri, 31 Oct 2014 06:19:57 -0600 Subject: [PATCH 4/4] reformat to satisfy scalastyle linelength. build failed from jenkins https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22604/ --- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index e09ba75901638..9dcf980341a34 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -114,7 +114,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { Seq("Errors") val taskTable = UIUtils.listingTable( - taskHeaders, taskRow(hasAccumulators, hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) + taskHeaders, + taskRow(hasAccumulators, hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), + tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)