From 612c18c845f1a1840af316af92d43056cc85c502 Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Mon, 27 Jul 2015 16:16:30 +0800 Subject: [PATCH 1/2] RDD partition table pagination for the RDD Page --- .../org/apache/spark/ui/PagedTable.scala | 16 +- .../org/apache/spark/ui/jobs/StagePage.scala | 4 +- .../org/apache/spark/ui/storage/RDDPage.scala | 231 +++++++++++++++--- 3 files changed, 210 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala index 17d7b39c2d951..6e2375477a688 100644 --- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala @@ -159,9 +159,9 @@ private[ui] trait PagedTable[T] { // "goButtonJsFuncName" val formJs = s"""$$(function(){ - | $$( "#form-task-page" ).submit(function(event) { - | var page = $$("#form-task-page-no").val() - | var pageSize = $$("#form-task-page-size").val() + | $$( "#form-$tableId-page" ).submit(function(event) { + | var page = $$("#form-$tableId-page-no").val() + | var pageSize = $$("#form-$tableId-page-size").val() | pageSize = pageSize ? pageSize: 100; | if (page != "") { | ${goButtonJsFuncName}(page, pageSize); @@ -173,12 +173,14 @@ private[ui] trait PagedTable[T] {
-
+ - + - - + +
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index cf04b5e59239b..d3d74dbf176cf 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -1145,7 +1145,7 @@ private[ui] class TaskPagedTable( sortColumn: String, desc: Boolean) extends PagedTable[TaskTableRowData]{ - override def tableId: String = "" + override def tableId: String = "task-table" override def tableCssClass: String = "table table-bordered table-condensed table-striped" @@ -1218,7 +1218,7 @@ private[ui] class TaskPagedTable( Seq(("Errors", "")) if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) { - new IllegalArgumentException(s"Unknown column: $sortColumn") + throw new IllegalArgumentException(s"Unknown column: $sortColumn") } val headerRow: Seq[Node] = { diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 36943978ff594..e77a130e151f4 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -17,12 +17,13 @@ package org.apache.spark.ui.storage +import java.net.URLEncoder import javax.servlet.http.HttpServletRequest -import scala.xml.Node +import scala.xml.{Unparsed, Node} import org.apache.spark.status.api.v1.{AllRDDResource, RDDDataDistribution, RDDPartitionInfo} -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.{PagedTable, PagedDataSource, UIUtils, WebUIPage} import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ @@ -32,6 +33,18 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { def render(request: HttpServletRequest): Seq[Node] = { val parameterId = request.getParameter("id") require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") + + val parameterBlockPage = request.getParameter("block.page") + val parameterBlockSortColumn = request.getParameter("block.sort") + val parameterBlockSortDesc = request.getParameter("block.desc") + val parameterBlockPageSize = request.getParameter("block.pageSize") + + val blockPage = Option(parameterBlockPage).map(_.toInt).getOrElse(1) + val blockSortColumn = Option(parameterBlockSortColumn).getOrElse("Block Name") + val blockSortDesc = Option(parameterBlockSortDesc).map(_.toBoolean).getOrElse(false) + val blockPageSize = Option(parameterBlockPageSize).map(_.toInt).getOrElse(100) + + val rddId = parameterId.toInt val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true) .getOrElse { @@ -44,8 +57,34 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { rddStorageInfo.dataDistribution.get, id = Some("rdd-storage-by-worker-table")) // Block table - val blockTable = UIUtils.listingTable(blockHeader, blockRow, rddStorageInfo.partitions.get, - id = Some("rdd-storage-by-block-table")) + val (blockTable, blockTableHTML) = try { + val _blockTable = new BlockPagedTable( + UIUtils.prependBaseUri(parent.basePath) + s"/storage/rdd/?id=${rddId}", + rddStorageInfo.partitions.get, + blockPageSize, + blockSortColumn, + blockSortDesc + ) + (_blockTable, _blockTable.table(blockPage)) + } catch { + case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => + (null,
{e.getMessage}
) + } + + val jsForScrollingDownToBlockTable = + val content =
@@ -85,11 +124,11 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
-
-
-

{rddStorageInfo.partitions.map(_.size).getOrElse(0)} Partitions

- {blockTable} -
+
+

+ {rddStorageInfo.partitions.map(_.size).getOrElse(0)} Partitions +

+ {blockTableHTML ++ jsForScrollingDownToBlockTable}
; UIUtils.headerSparkPage("RDD Storage Info for " + rddStorageInfo.name, content, parent) @@ -101,14 +140,6 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { "Memory Usage", "Disk Usage") - /** Header fields for the block table */ - private def blockHeader = Seq( - "Block Name", - "Storage Level", - "Size in Memory", - "Size on Disk", - "Executors") - /** Render an HTML row representing a worker */ private def workerRow(worker: RDDDataDistribution): Seq[Node] = { @@ -120,23 +151,159 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { {Utils.bytesToString(worker.diskUsed)} } +} + +private[ui] case class BlockTableRowData( + blockName: String, + storageLevel: String, + memoryUsed: Long, + diskUsed: Long, + executors: String) + +private[ui] class BlockDataSource( + rddPartitions: Seq[RDDPartitionInfo], + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedDataSource[BlockTableRowData](pageSize) { + + private val data = rddPartitions.map(blockRow).sorted(ordering(sortColumn, desc)) + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[BlockTableRowData] = { + data.slice(from, to) + } + + private def blockRow(rddPartition: RDDPartitionInfo): BlockTableRowData = { + BlockTableRowData( + rddPartition.blockName, + rddPartition.storageLevel, + rddPartition.memoryUsed, + rddPartition.diskUsed, + rddPartition.executors.mkString(" ") + ) + } + + /** + * Return Ordering according to sortColumn and desc + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[BlockTableRowData] = { + val ordering = sortColumn match { + case "Block Name" => new Ordering[BlockTableRowData] { + override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = + Ordering.String.compare(x.blockName, y.blockName) + } + case "Storage Level" => new Ordering[BlockTableRowData] { + override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = + Ordering.String.compare(x.storageLevel, y.storageLevel) + } + case "Size in Memory" => new Ordering[BlockTableRowData] { + override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = + Ordering.Long.compare(x.memoryUsed, y.memoryUsed) + } + case "Size on Disk" => new Ordering[BlockTableRowData] { + override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = + Ordering.Long.compare(x.diskUsed, y.diskUsed) + } + case "Executors" => new Ordering[BlockTableRowData] { + override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = + Ordering.String.compare(x.executors, y.executors) + } + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } +} + +private[ui] class BlockPagedTable( + basePath: String, + rddPartitions: Seq[RDDPartitionInfo], + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedTable[BlockTableRowData] { + + override def tableId: String = "rdd-storage-by-block-table" + + override def tableCssClass: String = "table table-bordered table-condensed table-striped" + + override val dataSource: BlockDataSource = new BlockDataSource( + rddPartitions, + pageSize, + sortColumn, + desc + ) + + override def pageLink(page: Int): String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + s"${basePath}&block.page=$page&block.sort=${encodedSortColumn}&block.desc=${desc}" + + s"&block.pageSize=${pageSize}" + } + + override def goButtonJavascriptFunction: (String, String) = { + val jsFuncName = "goToBlockPage" + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + val jsFunc = s""" + |currentBlockPageSize = ${pageSize} + |function goToBlockPage(page, pageSize) { + | // Set page to 1 if the page size changes + | page = pageSize == currentBlockPageSize ? page : 1; + | var url = "${basePath}&block.sort=${encodedSortColumn}&block.desc=${desc}" + + | "&block.page=" + page + "&block.pageSize=" + pageSize; + | window.location.href = url; + |} + """.stripMargin + (jsFuncName, jsFunc) + } - /** Render an HTML row representing a block */ - private def blockRow(row: RDDPartitionInfo): Seq[Node] = { + override def headers: Seq[Node] = { + val blockHeaders = Seq( + "Block Name", + "Storage Level", + "Size in Memory", + "Size on Disk", + "Executors") + + if (!blockHeaders.contains(sortColumn)) { + throw new IllegalArgumentException(s"Unknown column: $sortColumn") + } + + val headerRow: Seq[Node] = { + blockHeaders.map { header => + if (header == sortColumn) { + val headerLink = + s"$basePath&block.sort=${URLEncoder.encode(header, "UTF-8")}&block.desc=${!desc}" + + s"&block.pageSize=${pageSize}" + val js = Unparsed(s"window.location.href='${headerLink}'") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + {header} +  {Unparsed(arrow)} + + } else { + val headerLink = + s"$basePath&block.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&block.pageSize=${pageSize}" + val js = Unparsed(s"window.location.href='${headerLink}'") + + {header} + + } + } + } + {headerRow} + } + + override def row(block: BlockTableRowData): Seq[Node] = { - {row.blockName} - - {row.storageLevel} - - - {Utils.bytesToString(row.memoryUsed)} - - - {Utils.bytesToString(row.diskUsed)} - - - {row.executors.map(l => {l}
)} - + {block.blockName} + {block.storageLevel} + {Utils.bytesToString(block.memoryUsed)} + {Utils.bytesToString(block.diskUsed)} + {block.executors} } } From 03c71686863d5aa98568b24395366d8e98a8d104 Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Tue, 4 Aug 2015 09:13:39 +0800 Subject: [PATCH 2/2] Fix style issues --- .../org/apache/spark/ui/jobs/StagePage.scala | 6 +-- .../org/apache/spark/ui/storage/RDDPage.scala | 43 +++++++++---------- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index d3d74dbf176cf..9228ccd98bab7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -945,8 +945,7 @@ private[ui] class TaskDataSource( shuffleRead, shuffleWrite, bytesSpilled, - errorMessage.getOrElse("") - ) + errorMessage.getOrElse("")) } /** @@ -1160,8 +1159,7 @@ private[ui] class TaskPagedTable( currentTime, pageSize, sortColumn, - desc - ) + desc) override def pageLink(page: Int): String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index e77a130e151f4..fd6cc3ed759b3 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -20,10 +20,10 @@ package org.apache.spark.ui.storage import java.net.URLEncoder import javax.servlet.http.HttpServletRequest -import scala.xml.{Unparsed, Node} +import scala.xml.{Node, Unparsed} import org.apache.spark.status.api.v1.{AllRDDResource, RDDDataDistribution, RDDPartitionInfo} -import org.apache.spark.ui.{PagedTable, PagedDataSource, UIUtils, WebUIPage} +import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage} import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ @@ -44,7 +44,6 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { val blockSortDesc = Option(parameterBlockSortDesc).map(_.toBoolean).getOrElse(false) val blockPageSize = Option(parameterBlockPageSize).map(_.toInt).getOrElse(100) - val rddId = parameterId.toInt val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true) .getOrElse { @@ -63,8 +62,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { rddStorageInfo.partitions.get, blockPageSize, blockSortColumn, - blockSortDesc - ) + blockSortDesc) (_blockTable, _blockTable.table(blockPage)) } catch { case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => @@ -73,16 +71,17 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { val jsForScrollingDownToBlockTable = @@ -176,12 +175,11 @@ private[ui] class BlockDataSource( private def blockRow(rddPartition: RDDPartitionInfo): BlockTableRowData = { BlockTableRowData( - rddPartition.blockName, - rddPartition.storageLevel, - rddPartition.memoryUsed, - rddPartition.diskUsed, - rddPartition.executors.mkString(" ") - ) + rddPartition.blockName, + rddPartition.storageLevel, + rddPartition.memoryUsed, + rddPartition.diskUsed, + rddPartition.executors.mkString(" ")) } /** @@ -234,8 +232,7 @@ private[ui] class BlockPagedTable( rddPartitions, pageSize, sortColumn, - desc - ) + desc) override def pageLink(page: Int): String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")