Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
GuoPhilipse committed Jun 8, 2020
2 parents 3fd6d02 + fd677c9 commit 108dfea
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 271 deletions.
3 changes: 1 addition & 2 deletions core/src/main/resources/org/apache/spark/ui/static/webui.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
Expand Down
5 changes: 1 addition & 4 deletions python/pyspark/sql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ class UnknownException(CapturedException):
def convert_exception(e):
s = e.toString()
c = e.getCause()
stacktrace = SparkContext._jvm.org.apache.spark.util.Utils.exceptionString(e)

jvm = SparkContext._jvm
jwriter = jvm.java.io.StringWriter()
e.printStackTrace(jvm.java.io.PrintWriter(jwriter))
stacktrace = jwriter.toString()
if s.startswith('org.apache.spark.sql.AnalysisException: '):
return AnalysisException(s.split(': ', 1)[1], stacktrace, c)
if s.startswith('org.apache.spark.sql.catalyst.analysis'):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,30 @@

package org.apache.spark.streaming.ui

import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}

private[ui] class StreamingPagedTable(
request: HttpServletRequest,
tableTag: String,
batches: Seq[BatchUIData],
basePath: String,
subPath: String,
batchInterval: Long) extends PagedTable[BatchUIData] {

private val(sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

private val firstFailureReason: Option[String] =
if (!tableTag.equals("waitingBatches")) {
getFirstFailureReason(batches)
} else {
None
}
import org.apache.spark.ui.{UIUtils => SparkUIUtils}

private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {

protected def columns: Seq[Node] = {
<th>Batch Time</th>
<th>Records</th>
<th>Scheduling Delay
{SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
</th>
<th>Processing Time
{SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
}

/**
* Return the first failure reason if finding in the batches.
*/
private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
}

private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
firstFailureReason.map { failureReason =>
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
Expand All @@ -60,154 +49,147 @@ private[ui] class StreamingPagedTable(
}.getOrElse(<td>-</td>)
}

private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
val numRecords = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
val batchTimeId = s"batch-$batchTime"

<td id={batchTimeId} sorttable_customkey={batchTime.toString}
isFailed={batch.isFailed.toString}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
</td>
<td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
<td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
{formattedSchedulingDelay}
</td>
<td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
{formattedProcessingTime}
</td>
}

private def batchTable: Seq[Node] = {
<table id={tableId} class="table table-bordered table-striped table-sm sortable">
<thead>
{columns}
</thead>
<tbody>
{renderRows}
</tbody>
</table>
}

def toNodeSeq: Seq[Node] = {
batchTable
}

protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
<td class="progress-cell">
{
SparkUIUtils.makeProgressBar(
started = batch.numActiveOutputOp,
completed = batch.numCompletedOutputOp,
failed = batch.numFailedOutputOp,
skipped = 0,
reasonToNumKilled = Map.empty,
total = batch.outputOperations.size)
SparkUIUtils.makeProgressBar(
started = batch.numActiveOutputOp,
completed = batch.numCompletedOutputOp,
failed = batch.numFailedOutputOp,
skipped = 0,
reasonToNumKilled = Map.empty,
total = batch.outputOperations.size)
}
</td>
}

override def tableId: String = s"$tableTag-table"
/**
* Return HTML for all rows of this table.
*/
protected def renderRows: Seq[Node]
}

override def tableCssClass: String =
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
private[ui] class ActiveBatchTable(
runningBatches: Seq[BatchUIData],
waitingBatches: Seq[BatchUIData],
batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {

override def pageSizeFormField: String = s"$tableTag.pageSize"
private val firstFailureReason = getFirstFailureReason(runningBatches)

override def pageNumberFormField: String = s"$tableTag.page"
override protected def columns: Seq[Node] = super.columns ++ {
<th>Output Ops: Succeeded/Total</th>
<th>Status</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
}
}
}

override def pageLink(page: Int): String = {
parameterPath +
s"&$tableTag.sort=$encodedSortColumn" +
s"&$tableTag.desc=$desc" +
s"&$pageNumberFormField=$page" +
s"&$pageSizeFormField=$pageSize" +
s"#$tableTag"
override protected def renderRows: Seq[Node] = {
// The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
// waiting batches before running batches
waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
}

override def goButtonFormPath: String =
s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag"

override def dataSource: PagedDataSource[BatchUIData] =
new StreamingDataSource(batches, pageSize, sortColumn, desc)

override def headers: Seq[Node] = {
// headers, sortable and tooltips
val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
Seq(
("Batch Time", true, None),
("Records", true, None),
("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
"of a batch")),
("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
if (tableTag.equals("completedBatches")) {
Seq(
("Total Delay", true, Some("Total time taken to handle a batch")),
("Output Ops: Succeeded/Total", false, None))
} else {
Seq(
("Output Ops: Succeeded/Total", false, None),
("Status", false, None))
}
} ++ {
if (firstFailureReason.nonEmpty) {
Seq(("Error", false, None))
} else {
Nil
}
private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
// check if sort column is a valid sortable column
isSortColumnValid(headersAndCssClasses, sortColumn)

headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
}

override def row(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
val numRecords = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
val batchTimeId = s"batch-$batchTime"
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")

<tr>
<td id={batchTimeId} isFailed={batch.isFailed.toString}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
</td>
<td> {numRecords.toString} records </td>
<td> {formattedSchedulingDelay} </td>
<td> {formattedProcessingTime} </td>
{
if (tableTag.equals("completedBatches")) {
<td> {formattedTotalDelay} </td> ++
createOutputOperationProgressBar(batch) ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
} else if (tableTag.equals("runningBatches")) {
createOutputOperationProgressBar(batch) ++
<td> processing </td> ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
} else {
createOutputOperationProgressBar(batch) ++
<td> queued </td> ++ {
if (firstFailureReason.nonEmpty) {
// Waiting batches have not run yet, so must have no failure reasons.
<td>-</td>
} else {
Nil
}
}
}
private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ {
if (firstFailureReason.nonEmpty) {
// Waiting batches have not run yet, so must have no failure reasons.
<td>-</td>
} else {
Nil
}
</tr>
}
}
}

private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String,
desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {

private val data = info.sorted(ordering(sortColumn, desc))
private val firstFailureReason = getFirstFailureReason(batches)

override protected def dataSize: Int = data.size
override protected def columns: Seq[Node] = super.columns ++ {
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
<th>Output Ops: Succeeded/Total</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
}
}
}

override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to)
override protected def renderRows: Seq[Node] = {
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
}

private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = {
val ordering: Ordering[BatchUIData] = column match {
case "Batch Time" => Ordering.by(_.batchTime.milliseconds)
case "Records" => Ordering.by(_.numRecords)
case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue))
case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue))
case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
}
if (desc) {
ordering.reverse
} else {
ordering
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")

baseRow(batch) ++ {
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
</td>
} ++ createOutputOperationProgressBar(batch)++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
}
}

0 comments on commit 108dfea

Please sign in to comment.