Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31440][SQL] Improve SQL Rest API #28208

Closed
wants to merge 14 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ object SparkPlanGraph {
* @param name the name of this SparkPlan node
* @param metrics metrics that this SparkPlan node will track
*/
private[ui] class SparkPlanGraphNode(
class SparkPlanGraphNode(
val id: Long,
val name: String,
val desc: String,
Expand Down Expand Up @@ -193,7 +193,7 @@ private[ui] class SparkPlanGraphNode(
/**
* Represent a tree of SparkPlan for WholeStageCodegen.
*/
private[ui] class SparkPlanGraphCluster(
class SparkPlanGraphCluster(
id: Long,
name: String,
desc: String,
Expand Down Expand Up @@ -229,7 +229,7 @@ private[ui] class SparkPlanGraphCluster(
* Represent an edge in the SparkPlan tree. `fromId` is the child node id, and `toId` is the parent
* node id.
*/
private[ui] case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
case class SparkPlanGraphEdge(fromId: Long, toId: Long) {

def makeDotEdge: String = s""" $fromId->$toId;\n"""
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,55 @@ import java.util.Date
import javax.ws.rs._
import javax.ws.rs.core.MediaType

import scala.util.{Failure, Success, Try}

import org.apache.spark.JobExecutionStatus
import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode, SQLAppStatusStore, SQLExecutionUIData}
import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class SqlResource extends BaseAppResource {

val WHOLE_STAGE_CODEGEN = "WholeStageCodegen"

@GET
def sqlList(
@DefaultValue("false") @QueryParam("details") details: Boolean,
@DefaultValue("true") @QueryParam("details") details: Boolean,
@DefaultValue("true") @QueryParam("planDescription") planDescription: Boolean,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erenavsarogullari one last comment: why do we have extra option "planDescription" here. It seems reasonable to be covered by the option "details"

Copy link
Member Author

@erenavsarogullari erenavsarogullari May 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gengliangwang Thanks for the review.

Please find my comments as follows:
1- planDescription exposes Physical Plan which covers dataset column-names. Column Names can be thought as customer sensitive data so with this option, end-users can disable in the light of their use-cases when they still access metrics.
2- For complex queries, planDescription can be big string and create network overhead. In this case, it can be disabled where it is not required and metrics are required(e.g: time-series monitoring - metrics need to be persisted & exposed but Physical Plan does not) (if makes sense)

@DefaultValue("0") @QueryParam("offset") offset: Int,
@DefaultValue("20") @QueryParam("length") length: Int): Seq[ExecutionData] = {
withUI { ui =>
val sqlStore = new SQLAppStatusStore(ui.store.store)
sqlStore.executionsList(offset, length).map(prepareExecutionData(_, details))
sqlStore.executionsList(offset, length).map { exec =>
val graph = sqlStore.planGraph(exec.executionId)
prepareExecutionData(exec, graph, details, planDescription)
}
}
}

@GET
@Path("{executionId:\\d+}")
def sql(
@PathParam("executionId") execId: Long,
@DefaultValue("false") @QueryParam("details") details: Boolean): ExecutionData = {
@DefaultValue("true") @QueryParam("details") details: Boolean,
@DefaultValue("true") @QueryParam("planDescription")
planDescription: Boolean): ExecutionData = {
withUI { ui =>
val sqlStore = new SQLAppStatusStore(ui.store.store)
val graph = sqlStore.planGraph(execId)
sqlStore
.execution(execId)
.map(prepareExecutionData(_, details))
.getOrElse(throw new NotFoundException("unknown id: " + execId))
.map(prepareExecutionData(_, graph, details, planDescription))
.getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
}
}

private def printableMetrics(
metrics: Seq[SQLPlanMetric],
metricValues: Map[Long, String]): Seq[Metrics] = {
metrics.map(metric =>
Metrics(metric.name, metricValues.get(metric.accumulatorId).getOrElse("")))
}
private def prepareExecutionData(
exec: SQLExecutionUIData,
graph: SparkPlanGraph,
details: Boolean,
planDescription: Boolean): ExecutionData = {

private def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = {
var running = Seq[Int]()
var completed = Seq[Int]()
var failed = Seq[Int]()
Expand All @@ -84,18 +93,65 @@ private[v1] class SqlResource extends BaseAppResource {
}

val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime
val planDetails = if (details) exec.physicalPlanDescription else ""
val metrics = if (details) printableMetrics(exec.metrics, exec.metricValues) else Seq.empty
val planDetails = if (planDescription) exec.physicalPlanDescription else ""
val nodes = if (details) printableMetrics(graph.allNodes, exec.metricValues) else Seq.empty
val edges = if (details) graph.edges else Seq.empty

new ExecutionData(
exec.executionId,
status,
exec.description,
planDetails,
metrics,
new Date(exec.submissionTime),
duration,
running,
completed,
failed)
failed,
nodes,
edges)
}

private def printableMetrics(allNodes: Seq[SparkPlanGraphNode],
metricValues: Map[Long, String]): Seq[Node] = {

def getMetric(metricValues: Map[Long, String], accumulatorId: Long,
metricName: String): Option[Metric] = {

metricValues.get(accumulatorId).map( mv => {
val metricValue = if (mv.startsWith("\n")) mv.substring(1, mv.length) else mv
Metric(metricName, metricValue)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually after #28037, part of the metrics name is in the value...
I can see the json output like:

{
"nodeId": 9,
"nodeName": "WholeStageCodegen (1)",
"metrics": [
{
"name": "duration",
"value": "total (min, med, max (stageId: taskId))\n4.3 s (269 ms, 270 ms, 272 ms (stage 3.0: task 16))"
}
]
}

But this is not strongly related to this PR. We can just fix it in another PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sounds good.

})
}

val nodeIdAndWSCGIdMap = getNodeIdAndWSCGIdMap(allNodes)
val nodes = allNodes.map { node =>
val wholeStageCodegenId = nodeIdAndWSCGIdMap.get(node.id).flatten
val metrics =
node.metrics.flatMap(m => getMetric(metricValues, m.accumulatorId, m.name.trim))
Node(nodeId = node.id, nodeName = node.name.trim, wholeStageCodegenId, metrics)
}

nodes.sortBy(_.nodeId).reverse
}

private def getNodeIdAndWSCGIdMap(allNodes: Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = {
val wscgNodes = allNodes.filter(_.name.trim.startsWith(WHOLE_STAGE_CODEGEN))
val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = wscgNodes.flatMap {
_ match {
case x: SparkPlanGraphCluster => x.nodes.map(_.id -> getWholeStageCodegenId(x.name.trim))
case _ => Seq.empty
}
}.toMap

nodeIdAndWSCGIdMap
}

private def getWholeStageCodegenId(wscgNodeName: String): Option[Long] = {
Try(wscgNodeName.substring(
s"$WHOLE_STAGE_CODEGEN (".length, wscgNodeName.length - 1).toLong) match {
case Success(wscgId) => Some(wscgId)
case Failure(t) => None
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@ package org.apache.spark.status.api.v1.sql

import java.util.Date

import org.apache.spark.sql.execution.ui.SparkPlanGraphEdge

class ExecutionData private[spark] (
val id: Long,
val status: String,
val description: String,
val planDescription: String,
val metrics: Seq[Metrics],
val submissionTime: Date,
val duration: Long,
val runningJobIds: Seq[Int],
val successJobIds: Seq[Int],
val failedJobIds: Seq[Int])
val failedJobIds: Seq[Int],
val nodes: Seq[Node],
val edges: Seq[SparkPlanGraphEdge])

case class Node private[spark](
nodeId: Long,
nodeName: String,
wholeStageCodegenId: Option[Long] = None,
metrics: Seq[Metric])

case class Metrics private[spark] (metricName: String, metricValue: String)
case class Metric private[spark] (name: String, value: String)
Loading