Skip to content

Commit

Permalink
[SPARK-31440][SQL] Improve SQL Rest API
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
The SQL REST API exposes query execution metrics as a public API. This PR applies the following improvements to the SQL REST API to align it with the Spark UI.

**Proposed Improvements:**
1- Support physical operations and group metrics per physical operation, in line with the Spark UI.
2- Support `wholeStageCodegenId` for Physical Operations
3- `nodeId` can be useful for grouping metrics and sorting physical operations (according to execution order) to differentiate same operators (if used multiple times during the same query execution) and their metrics.
4- Filter out `empty` metrics to align with the Spark UI SQL tab, which currently does not show empty metrics.
5- Remove line breaks (`\n`) from `metricValue`.
6- `planDescription` can be an `optional` HTTP parameter to avoid network cost, especially for complex jobs that produce large plans.
7- The `metrics` attribute needs to be exposed at the bottom of the response, as `nodes`. This is especially useful for the user when the `nodes` array is large.
8- The `edges` attribute is exposed to show the relationships between `nodes`.
9- Reverse order on `metricDetails` aims to match with Spark UI by supporting Physical Operators' execution order.

### Why are the changes needed?
The proposed improvements provide more useful (e.g. correlating and grouping physical operations and their metrics) and clearer (e.g. filtering blank metrics, removing line breaks) results for the end user.

### Does this PR introduce any user-facing change?
Yes. The current and improved versions of the results for the following SQL REST endpoint are attached:
```
curl -X GET http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true
```
**Current version:**
https://issues.apache.org/jira/secure/attachment/12999821/current_version.json

**Improved version:**
https://issues.apache.org/jira/secure/attachment/13000621/improved_version.json

### Backward Compatibility
The SQL REST API will first be exposed in `Spark 3.0`, and `3.0.0-preview2` (released on 12/23/19) does not include this API, so if this PR makes the 3.0 release, it will not cause any backward compatibility issue.

### How was this patch tested?
1. New Unit tests are added.
2. The patch has also been tested manually through both the **Spark Core** and **History Server** REST APIs.

Closes #28208 from erenavsarogullari/SPARK-31440.

Authored-by: Eren Avsarogullari <eren.avsarogullari@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
(cherry picked from commit ab4cf49)
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
  • Loading branch information
Eren Avsarogullari authored and gengliangwang committed May 19, 2020
1 parent 18925c3 commit d668d67
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ object SparkPlanGraph {
* @param name the name of this SparkPlan node
* @param metrics metrics that this SparkPlan node will track
*/
private[ui] class SparkPlanGraphNode(
class SparkPlanGraphNode(
val id: Long,
val name: String,
val desc: String,
Expand Down Expand Up @@ -193,7 +193,7 @@ private[ui] class SparkPlanGraphNode(
/**
* Represent a tree of SparkPlan for WholeStageCodegen.
*/
private[ui] class SparkPlanGraphCluster(
class SparkPlanGraphCluster(
id: Long,
name: String,
desc: String,
Expand Down Expand Up @@ -229,7 +229,7 @@ private[ui] class SparkPlanGraphCluster(
* Represent an edge in the SparkPlan tree. `fromId` is the child node id, and `toId` is the parent
* node id.
*/
private[ui] case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
case class SparkPlanGraphEdge(fromId: Long, toId: Long) {

def makeDotEdge: String = s""" $fromId->$toId;\n"""
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,55 @@ import java.util.Date
import javax.ws.rs._
import javax.ws.rs.core.MediaType

import scala.util.{Failure, Success, Try}

import org.apache.spark.JobExecutionStatus
import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode, SQLAppStatusStore, SQLExecutionUIData}
import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class SqlResource extends BaseAppResource {

val WHOLE_STAGE_CODEGEN = "WholeStageCodegen"

@GET
def sqlList(
@DefaultValue("false") @QueryParam("details") details: Boolean,
@DefaultValue("true") @QueryParam("details") details: Boolean,
@DefaultValue("true") @QueryParam("planDescription") planDescription: Boolean,
@DefaultValue("0") @QueryParam("offset") offset: Int,
@DefaultValue("20") @QueryParam("length") length: Int): Seq[ExecutionData] = {
withUI { ui =>
val sqlStore = new SQLAppStatusStore(ui.store.store)
sqlStore.executionsList(offset, length).map(prepareExecutionData(_, details))
sqlStore.executionsList(offset, length).map { exec =>
val graph = sqlStore.planGraph(exec.executionId)
prepareExecutionData(exec, graph, details, planDescription)
}
}
}

@GET
@Path("{executionId:\\d+}")
def sql(
@PathParam("executionId") execId: Long,
@DefaultValue("false") @QueryParam("details") details: Boolean): ExecutionData = {
@DefaultValue("true") @QueryParam("details") details: Boolean,
@DefaultValue("true") @QueryParam("planDescription")
planDescription: Boolean): ExecutionData = {
withUI { ui =>
val sqlStore = new SQLAppStatusStore(ui.store.store)
val graph = sqlStore.planGraph(execId)
sqlStore
.execution(execId)
.map(prepareExecutionData(_, details))
.getOrElse(throw new NotFoundException("unknown id: " + execId))
.map(prepareExecutionData(_, graph, details, planDescription))
.getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
}
}

/**
 * Pairs every plan metric with the value currently recorded for it.
 *
 * @param metrics the plan metrics to render
 * @param metricValues metric values, looked up by each metric's accumulator id
 * @return one `Metrics` entry per input metric, in input order; a metric that has no entry
 *         in `metricValues` is given an empty string as its value
 */
private def printableMetrics(
    metrics: Seq[SQLPlanMetric],
    metricValues: Map[Long, String]): Seq[Metrics] = {
  metrics.map { planMetric =>
    val shownValue = metricValues.getOrElse(planMetric.accumulatorId, "")
    Metrics(planMetric.name, shownValue)
  }
}
private def prepareExecutionData(
exec: SQLExecutionUIData,
graph: SparkPlanGraph,
details: Boolean,
planDescription: Boolean): ExecutionData = {

private def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = {
var running = Seq[Int]()
var completed = Seq[Int]()
var failed = Seq[Int]()
Expand All @@ -84,18 +93,65 @@ private[v1] class SqlResource extends BaseAppResource {
}

val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime
val planDetails = if (details) exec.physicalPlanDescription else ""
val metrics = if (details) printableMetrics(exec.metrics, exec.metricValues) else Seq.empty
val planDetails = if (planDescription) exec.physicalPlanDescription else ""
val nodes = if (details) printableMetrics(graph.allNodes, exec.metricValues) else Seq.empty
val edges = if (details) graph.edges else Seq.empty

new ExecutionData(
exec.executionId,
status,
exec.description,
planDetails,
metrics,
new Date(exec.submissionTime),
duration,
running,
completed,
failed)
failed,
nodes,
edges)
}

/**
 * Converts plan-graph nodes into `Node`s, attaching to each node only those of its metrics
 * that have an entry in `metricValues`.
 *
 * @param allNodes the plan-graph nodes to convert
 * @param metricValues metric values, looked up by each metric's accumulator id
 * @return the converted nodes, ordered by descending `nodeId`
 */
private def printableMetrics(allNodes: Seq[SparkPlanGraphNode],
    metricValues: Map[Long, String]): Seq[Node] = {

  val codegenIdByNodeId = getNodeIdAndWSCGIdMap(allNodes)

  val converted = allNodes.map { graphNode =>
    val codegenId = codegenIdByNodeId.get(graphNode.id).flatten

    val nodeMetrics = graphNode.metrics.flatMap { planMetric =>
      val metricName = planMetric.name.trim
      // A metric without a recorded value is dropped; a single leading line break is removed
      // from the values that remain.
      metricValues
        .get(planMetric.accumulatorId)
        .map(rawValue => Metric(metricName, rawValue.stripPrefix("\n")))
    }

    Node(
      nodeId = graphNode.id,
      nodeName = graphNode.name.trim,
      wholeStageCodegenId = codegenId,
      metrics = nodeMetrics)
  }

  converted.sortBy(_.nodeId).reverse
}

/**
 * Builds a lookup from the id of each node nested in a WholeStageCodegen cluster to the
 * codegen id parsed from that cluster's name.
 *
 * Only `SparkPlanGraphCluster` nodes whose trimmed name starts with `WHOLE_STAGE_CODEGEN`
 * contribute entries; every other node is ignored.
 *
 * @param allNodes the plan-graph nodes to scan
 * @return nested node id -> codegen id (`None` when the id cannot be parsed from the name)
 */
private def getNodeIdAndWSCGIdMap(
    allNodes: Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = {
  val codegenClusters = allNodes.collect {
    case cluster: SparkPlanGraphCluster
        if cluster.name.trim.startsWith(WHOLE_STAGE_CODEGEN) => cluster
  }

  codegenClusters.flatMap { cluster =>
    cluster.nodes.map(child => child.id -> getWholeStageCodegenId(cluster.name.trim))
  }.toMap
}

/**
 * Extracts the numeric id from a WholeStageCodegen node name.
 *
 * The id is read from the characters that follow the `WHOLE_STAGE_CODEGEN` prefix plus
 * " (", up to but excluding the final character of the name.
 *
 * @param wscgNodeName the node name to parse
 * @return the parsed id, or `None` when extracting or parsing it fails
 */
private def getWholeStageCodegenId(wscgNodeName: String): Option[Long] = {
  // Everything stays inside the Try so that any failure while slicing or parsing the name
  // results in None rather than an exception.
  Try {
    val idStart = s"$WHOLE_STAGE_CODEGEN (".length
    val idEnd = wscgNodeName.length - 1
    wscgNodeName.substring(idStart, idEnd).toLong
  }.toOption
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@ package org.apache.spark.status.api.v1.sql

import java.util.Date

import org.apache.spark.sql.execution.ui.SparkPlanGraphEdge

class ExecutionData private[spark] (
val id: Long,
val status: String,
val description: String,
val planDescription: String,
val metrics: Seq[Metrics],
val submissionTime: Date,
val duration: Long,
val runningJobIds: Seq[Int],
val successJobIds: Seq[Int],
val failedJobIds: Seq[Int])
val failedJobIds: Seq[Int],
val nodes: Seq[Node],
val edges: Seq[SparkPlanGraphEdge])

/**
 * One plan node as exposed through the `nodes` field of `ExecutionData`.
 *
 * @param nodeId id of the node
 * @param nodeName name of the node
 * @param wholeStageCodegenId id of the WholeStageCodegen the node is associated with, if any;
 *                            defaults to `None`
 * @param metrics metrics attached to the node
 */
case class Node private[spark](
nodeId: Long,
nodeName: String,
wholeStageCodegenId: Option[Long] = None,
metrics: Seq[Metric])

/** A metric name paired with its value, both as strings. */
case class Metrics private[spark] (metricName: String, metricValue: String)
/** A metric name paired with its value, both as strings. */
case class Metric private[spark] (name: String, value: String)
Loading

0 comments on commit d668d67

Please sign in to comment.