Skip to content

Commit

Permalink
[SPARK-31642][FOLLOWUP] Fix Sorting for duration column and make Stat…
Browse files Browse the repository at this point in the history
…us column sortable

### What changes were proposed in this pull request?
In #28485, pagination support was added for the tables of the Structured Streaming tab.
It missed 2 things:
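* For sorting the duration column, the formatted `String` was used, which sometimes gives wrong results (consider `"3 ms"` and `"12 ms"`: compared as strings, `"12 ms"` sorts before `"3 ms"`). Now the duration is kept as a number for sorting and is converted to a readable `String` only when the row is rendered.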
* The Status column was not made sortable.

### Why are the changes needed?
To fix the wrong sorting result for the duration column and to make the Status column sortable.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
After changes:
<img width="1677" alt="Screenshot 2020-06-08 at 2 18 48 PM" src="https://user-images.githubusercontent.com/15366835/84010992-153fa280-a993-11ea-9846-bf176f2ec5d7.png">

Closes #28752 from iRakson/ssTests.

Authored-by: iRakson <raksonrakesh@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
iRakson authored and srowen committed Jun 14, 2020
1 parent 54e702c commit f5f6eee
Showing 1 changed file with 7 additions and 6 deletions.
Expand Up @@ -152,7 +152,7 @@ class StreamingQueryPagedTable(
val headerAndCss: Seq[(String, Boolean, Option[String])] = {
Seq(
("Name", true, None),
("Status", false, None),
("Status", true, None),
("ID", true, None),
("Run ID", true, None),
("Start Time", true, None),
Expand Down Expand Up @@ -197,7 +197,7 @@ class StreamingQueryPagedTable(
<td>{streamingQuery.id}</td>
<td><a href={statisticsLink}>{streamingQuery.runId}</a></td>
<td>{SparkUIUtils.formatDate(streamingQuery.startTimestamp)}</td>
<td>{query.duration}</td>
<td>{SparkUIUtils.formatDurationVerbose(query.duration)}</td>
<td>{withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")}</td>
Expand All @@ -207,7 +207,7 @@ class StreamingQueryPagedTable(
}

case class StructuredStreamingRow(
duration: String,
duration: Long,
avgInput: Double,
avgProcess: Double,
streamingUIData: StreamingQueryUIData)
Expand All @@ -224,12 +224,12 @@ class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: St

private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = {
val duration = if (isActive) {
SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
System.currentTimeMillis() - query.startTimestamp
} else {
withNoProgress(query, {
val endTimeMs = query.lastProgress.timestamp
SparkUIUtils.formatDurationVerbose(parseProgressTimestamp(endTimeMs) - query.startTimestamp)
}, "-")
parseProgressTimestamp(endTimeMs) - query.startTimestamp
}, 0)
}

val avgInput = (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
Expand All @@ -244,6 +244,7 @@ class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: St
private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = {
val ordering: Ordering[StructuredStreamingRow] = sortColumn match {
case "Name" => Ordering.by(q => UIUtils.getQueryName(q.streamingUIData))
case "Status" => Ordering.by(q => UIUtils.getQueryStatus(q.streamingUIData))
case "ID" => Ordering.by(_.streamingUIData.id)
case "Run ID" => Ordering.by(_.streamingUIData.runId)
case "Start Time" => Ordering.by(_.streamingUIData.startTimestamp)
Expand Down

0 comments on commit f5f6eee

Please sign in to comment.