-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-30119][WebUI]Support Pagination for Batch Tables in Streaming Tab #26756
Conversation
Please review @srowen |
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
Outdated
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
Outdated
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
Outdated
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
Outdated
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
Outdated
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
Outdated
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
Outdated
Show resolved
Hide resolved
streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
Outdated
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
Outdated
Show resolved
Hide resolved
streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
Outdated
Show resolved
Hide resolved
cc @shahidki31 |
ok to test |
@iRakson above comments hasn't resolved? |
I will push with all the changes in few minutes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made one pass
val completedBatchTableHeaders = Seq("Batch Time", "Records", "Scheduling Delay", | ||
"Processing Delay", "Total Delay", "Output Ops: Succeeded/Total") | ||
|
||
val tooltips = Seq(None, None, Some("Time taken by Streaming scheduler to" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the tooltip newly added in this PR or was that already there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tooltips were already there.
{formattedBatchTime} | ||
</a> | ||
</td> | ||
<td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to add sortable_customkey
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. Not required. Removed this.
@@ -156,40 +160,232 @@ private[ui] class ActiveBatchTable( | |||
} | |||
} | |||
|
|||
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long) | |||
extends BatchTableBase("completed-batches-table", batchInterval) { | |||
private[ui] class CompletedBatchTableRow( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think CompletedBatchTableRow
isn't required. You can get the data for sorting from BatchUIData
itself
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, i missed this. I followed the format form other pages. We can use BatchUIData
directly.
private def ordering(sortColumn: String, desc: Boolean): Ordering[CompletedBatchTableRow] = { | ||
val ordering: Ordering[CompletedBatchTableRow] = sortColumn match { | ||
case "Batch Time" => Ordering.by(_.batchTime) | ||
case "Records" => Ordering.by (_.numRecords) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: space after by
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
} | ||
|
||
def prependBaseUri( | ||
request: HttpServletRequest, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to add these method here?
@@ -186,4 +191,21 @@ private[streaming] object UIUtils { | |||
</td> | |||
} | |||
} | |||
|
|||
def decodeURLParameter(urlParam: String): String = { | |||
var param = urlParam |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to add these method here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mistakenly added these functions instead of using imports. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shahidki31 all review comments fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we not doing pagination support for Active Batches table?
} | ||
} | ||
private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the space between the line can be removed
Nil | ||
<td class="progress-cell"> | ||
{SparkUIUtils.makeProgressBar(started = batch.numActiveOutputOp, | ||
completed = batch.numCompletedOutputOp, failed = batch.numFailedOutputOp, skipped = 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the same code is there in the method createOutputOperationProgressBar
? Can't we use that?
} | ||
|
||
protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { | ||
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it newly added? getFirstFailureReason
method already exist right? can't we reuse it?
} | ||
|
||
protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { | ||
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. Avoid duplication of the methods
Jenkins, test this please |
@shahidki31 all the methods that you mentioned are part of abstract class |
If we create a generic class for both |
At this moment i have not added pagination support for active batches. Now i will add that in this PR only. This will help us preventing code duplication as well. |
796293f
to
84a5e77
Compare
I have added pagination support for completed batch table as well as active batch table in streaming tab. |
s"&$pageNumberFormField=$page" + | ||
s"&$streamingBatchTag.sort=$encodedSortColumn" + | ||
s"&$streamingBatchTag.desc=$desc" + | ||
s"&$pageSizeFormField=$pageSize" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to add #tableHeaderId
here?
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
Line 259 in 67b644c
s"#$tableHeaderId" |
{SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th> | ||
override def goButtonFormPath: String = { | ||
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) | ||
s"$parameterPath&$streamingBatchTag.sort=$encodedSortColumn&$streamingBatchTag.desc=$desc" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here also it seems we need to add #tableHeaderId
?
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
Line 268 in 67b644c
s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add the tableHeaderId
in the link.
failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = false) | ||
}.getOrElse(<td>-</td>) | ||
override def headers: Seq[Node] = { | ||
val completedBatchTableHeaders = Seq("Batch Time", "Records", "Scheduling Delay", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the headers only for completedBatchTables
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both the tables are identical i.e. the schema is same for both. So headers will remain same for both. But yeah, completedBatchTableHeaders
is misleading. So, i will update the variable name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the attached screenshot, I can see Total Delay
isn't there in the ActiveBatches
table and Status
field isn't there in the CompletedBatches
table?
val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse | ||
val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse | ||
val completedBatches = listener.retainedCompletedBatches. | ||
sortBy(_.batchTime.milliseconds).reverse | ||
val activeBatchData = waitingBatches ++ runningBatches |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we simply append the two table data? Could you please check the output of the pages before and after the PR, which contains both runningBatches and waitingBatches?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, running batches and waiting batches were shown in the same table (Active Batches Table) too.
To ensure that property only i appended the data of both running batches and waiting batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure we can simply append the table. Please refer earlier code. To check, if there is no change, could you please attach screenshot for that (before and after PR)
spark/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
Lines 130 to 135 in 1fc353d
override protected def renderRows: Seq[Node] = { | |
// The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display | |
// waiting batches before running batches | |
waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++ | |
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>) | |
} |
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
Adding support for pagination in streaming tab for completed batch table using existing framework for pagination. Refer PR #26215
Why are the changes needed?
If our streaming job is running for long time and number of batches are huge then out of memory error may come while loading the streaming page. Introducing pagination will solve this problem and also improve the loading time of page. Besides jobs,stages,sql and thrift-server page contains pagination. So it also brings consistency.
Does this PR introduce any user-facing change?
Yes.
How was this patch tested?
Manually Tested.
Before
After