diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 274a5a414ffa2..a798fe02700e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -153,7 +153,7 @@ object SparkPlanGraph {
  * @param name the name of this SparkPlan node
  * @param metrics metrics that this SparkPlan node will track
  */
-private[ui] class SparkPlanGraphNode(
+class SparkPlanGraphNode(
     val id: Long,
     val name: String,
     val desc: String,
@@ -193,7 +193,7 @@ private[ui] class SparkPlanGraphNode(
 /**
  * Represent a tree of SparkPlan for WholeStageCodegen.
  */
-private[ui] class SparkPlanGraphCluster(
+class SparkPlanGraphCluster(
     id: Long,
     name: String,
     desc: String,
@@ -229,7 +229,7 @@ private[ui] class SparkPlanGraphCluster(
  * Represent an edge in the SparkPlan tree. `fromId` is the child node id, and `toId` is the parent
  * node id.
  */
-private[ui] case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
+case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
 
   def makeDotEdge: String = s"""  $fromId->$toId;\n"""
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
index 346e07f2bef15..c7599f864dd97 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
@@ -21,21 +21,29 @@ import java.util.Date
 import javax.ws.rs._
 import javax.ws.rs.core.MediaType
 
+import scala.util.{Failure, Success, Try}
+
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
+import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode, SQLAppStatusStore, SQLExecutionUIData}
 import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class SqlResource extends BaseAppResource {
 
+  val WHOLE_STAGE_CODEGEN = "WholeStageCodegen"
+
   @GET
   def sqlList(
-      @DefaultValue("false") @QueryParam("details") details: Boolean,
+      @DefaultValue("true") @QueryParam("details") details: Boolean,
+      @DefaultValue("true") @QueryParam("planDescription") planDescription: Boolean,
       @DefaultValue("0") @QueryParam("offset") offset: Int,
       @DefaultValue("20") @QueryParam("length") length: Int): Seq[ExecutionData] = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
-      sqlStore.executionsList(offset, length).map(prepareExecutionData(_, details))
+      sqlStore.executionsList(offset, length).map { exec =>
+        val graph = sqlStore.planGraph(exec.executionId)
+        prepareExecutionData(exec, graph, details, planDescription)
+      }
     }
   }
 
@@ -43,24 +51,25 @@ private[v1] class SqlResource extends BaseAppResource {
   @Path("{executionId:\\d+}")
   def sql(
       @PathParam("executionId") execId: Long,
-      @DefaultValue("false") @QueryParam("details") details: Boolean): ExecutionData = {
+      @DefaultValue("true") @QueryParam("details") details: Boolean,
+      @DefaultValue("true") @QueryParam("planDescription")
+      planDescription: Boolean): ExecutionData = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
+      val graph = sqlStore.planGraph(execId)
       sqlStore
         .execution(execId)
-        .map(prepareExecutionData(_, details))
-        .getOrElse(throw new NotFoundException("unknown id: " + execId))
+        .map(prepareExecutionData(_, graph, details, planDescription))
+        .getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
     }
   }
 
-  private def printableMetrics(
-      metrics: Seq[SQLPlanMetric],
-      metricValues: Map[Long, String]): Seq[Metrics] = {
-    metrics.map(metric =>
-      Metrics(metric.name, metricValues.get(metric.accumulatorId).getOrElse("")))
-  }
+  private def prepareExecutionData(
+      exec: SQLExecutionUIData,
+      graph: SparkPlanGraph,
+      details: Boolean,
+      planDescription: Boolean): ExecutionData = {
 
-  private def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = {
     var running = Seq[Int]()
     var completed = Seq[Int]()
     var failed = Seq[Int]()
@@ -84,18 +93,65 @@ private[v1] class SqlResource extends BaseAppResource {
     }
 
     val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime
-    val planDetails = if (details) exec.physicalPlanDescription else ""
-    val metrics = if (details) printableMetrics(exec.metrics, exec.metricValues) else Seq.empty
+    val planDetails = if (planDescription) exec.physicalPlanDescription else ""
+    val nodes = if (details) printableMetrics(graph.allNodes, exec.metricValues) else Seq.empty
+    val edges = if (details) graph.edges else Seq.empty
+
     new ExecutionData(
       exec.executionId,
       status,
       exec.description,
       planDetails,
-      metrics,
       new Date(exec.submissionTime),
       duration,
       running,
       completed,
-      failed)
+      failed,
+      nodes,
+      edges)
   }
+
+  private def printableMetrics(
+      allNodes: Seq[SparkPlanGraphNode],
+      metricValues: Map[Long, String]): Seq[Node] = {
+
+    // Metric values rendered for the UI may carry a leading newline; strip it for the payload.
+    def getMetric(accumulatorId: Long, metricName: String): Option[Metric] = {
+      metricValues.get(accumulatorId).map { mv =>
+        val metricValue = if (mv.startsWith("\n")) mv.substring(1) else mv
+        Metric(metricName, metricValue)
+      }
+    }
+
+    val nodeIdAndWSCGIdMap = getNodeIdAndWSCGIdMap(allNodes)
+    val nodes = allNodes.map { node =>
+      val wholeStageCodegenId = nodeIdAndWSCGIdMap.get(node.id).flatten
+      val metrics =
+        node.metrics.flatMap(m => getMetric(m.accumulatorId, m.name.trim))
+      Node(nodeId = node.id, nodeName = node.name.trim, wholeStageCodegenId, metrics)
+    }
+
+    // Reverse the id order so nodes are listed in execution order, matching the Spark UI.
+    nodes.sortBy(_.nodeId).reverse
+  }
+
+  private def getNodeIdAndWSCGIdMap(allNodes: Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = {
+    val wscgNodes = allNodes.filter(_.name.trim.startsWith(WHOLE_STAGE_CODEGEN))
+    wscgNodes.flatMap {
+      case x: SparkPlanGraphCluster => x.nodes.map(_.id -> getWholeStageCodegenId(x.name.trim))
+      case _ => Seq.empty
+    }.toMap
+  }
+
+  private def getWholeStageCodegenId(wscgNodeName: String): Option[Long] = {
+    // Extract the id from a node name like "WholeStageCodegen (1)"; None if parsing fails.
+    Try(wscgNodeName.substring(
+      s"$WHOLE_STAGE_CODEGEN (".length, wscgNodeName.length - 1).toLong) match {
+      case Success(wscgId) => Some(wscgId)
+      case Failure(_) => None
+    }
+  }
+
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
index 7ace66ffb06e1..0ddf66718bce7 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
@@ -18,16 +18,25 @@ package org.apache.spark.status.api.v1.sql
 
 import java.util.Date
 
+import org.apache.spark.sql.execution.ui.SparkPlanGraphEdge
+
 class ExecutionData private[spark] (
     val id: Long,
     val status: String,
     val description: String,
     val planDescription: String,
-    val metrics: Seq[Metrics],
     val submissionTime: Date,
     val duration: Long,
     val runningJobIds: Seq[Int],
     val successJobIds: Seq[Int],
-    val failedJobIds: Seq[Int])
+    val failedJobIds: Seq[Int],
+    val nodes: Seq[Node],
+    val edges: Seq[SparkPlanGraphEdge])
+
+case class Node private[spark] (
+    nodeId: Long,
+    nodeName: String,
+    wholeStageCodegenId: Option[Long] = None,
+    metrics: Seq[Metric])
 
-case class Metrics private[spark] (metricName: String, metricValue: String)
+case class Metric private[spark] (name: String, value: String)
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
new file mode 100644
index 0000000000000..43cca246cc47c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.sql
+
+import java.util.Date
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.PrivateMethodTester
+
+import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
+import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphEdge, SparkPlanGraphNode, SQLExecutionUIData, SQLPlanMetric}
+
+object SqlResourceSuite {
+
+  val SCAN_TEXT = "Scan text"
+  val FILTER = "Filter"
+  val WHOLE_STAGE_CODEGEN_1 = "WholeStageCodegen (1)"
+  val DURATION = "duration"
+  val NUMBER_OF_OUTPUT_ROWS = "number of output rows"
+  val METADATA_TIME = "metadata time"
+  val NUMBER_OF_FILES_READ = "number of files read"
+  val SIZE_OF_FILES_READ = "size of files read"
+  val PLAN_DESCRIPTION = "== Physical Plan ==\nCollectLimit (3)\n+- * Filter (2)\n +- Scan text..."
+  val DESCRIPTION = "csv at MyDataFrames.scala:57"
+
+  val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map(1L -> Some(1L))
+
+  val filterNode = new SparkPlanGraphNode(1, FILTER, "",
+    metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "")))
+  val nodes: Seq[SparkPlanGraphNode] = Seq(
+    new SparkPlanGraphCluster(0, WHOLE_STAGE_CODEGEN_1, "",
+      nodes = ArrayBuffer(filterNode),
+      metrics = Seq(SQLPlanMetric(DURATION, 0, ""))),
+    new SparkPlanGraphNode(2, SCAN_TEXT, "",
+      metrics = Seq(
+        SQLPlanMetric(METADATA_TIME, 2, ""),
+        SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""),
+        SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
+        SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))))
+
+  val edges: Seq[SparkPlanGraphEdge] = Seq(SparkPlanGraphEdge(3, 2))
+
+  // Declared after `edges`, which it references: object vals initialize in declaration order.
+  val nodesWhenCodegenIsOff: Seq[SparkPlanGraphNode] =
+    SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1)
+
+  val metrics: Seq[SQLPlanMetric] = {
+    Seq(SQLPlanMetric(DURATION, 0, ""),
+      SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""),
+      SQLPlanMetric(METADATA_TIME, 2, ""),
+      SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""),
+      SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
+      SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))
+  }
+
+  val sqlExecutionUIData: SQLExecutionUIData = {
+    def getMetricValues() = {
+      Map[Long, String](
+        0L -> "0 ms",
+        1L -> "1",
+        2L -> "2 ms",
+        3L -> "1",
+        4L -> "1",
+        5L -> "330.0 B"
+      )
+    }
+
+    new SQLExecutionUIData(
+      executionId = 0,
+      description = DESCRIPTION,
+      details = "",
+      physicalPlanDescription = PLAN_DESCRIPTION,
+      metrics = metrics,
+      submissionTime = 1586768888233L,
+      completionTime = Some(new Date(1586768888999L)),
+      jobs = Map[Int, JobExecutionStatus](
+        0 -> JobExecutionStatus.SUCCEEDED,
+        1 -> JobExecutionStatus.SUCCEEDED),
+      stages = Set[Int](),
+      metricValues = getMetricValues()
+    )
+  }
+
+  private def getNodes(): Seq[Node] = {
+    val node = Node(0, WHOLE_STAGE_CODEGEN_1,
+      wholeStageCodegenId = None, metrics = Seq(Metric(DURATION, "0 ms")))
+    val node2 = Node(1, FILTER,
+      wholeStageCodegenId = Some(1), metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
+    val node3 = Node(2, SCAN_TEXT, wholeStageCodegenId = None,
+      metrics = Seq(Metric(METADATA_TIME, "2 ms"),
+        Metric(NUMBER_OF_FILES_READ, "1"),
+        Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
+        Metric(SIZE_OF_FILES_READ, "330.0 B")))
+
+    // Reverse nodeId order, matching the execution order shown in the Spark UI.
+    Seq(node3, node2, node)
+  }
+
+  private def getExpectedNodesWhenWholeStageCodegenIsOff(): Seq[Node] = {
+    val node = Node(1, FILTER, metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
+    val node2 = Node(2, SCAN_TEXT,
+      metrics = Seq(Metric(METADATA_TIME, "2 ms"),
+        Metric(NUMBER_OF_FILES_READ, "1"),
+        Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
+        Metric(SIZE_OF_FILES_READ, "330.0 B")))
+
+    // Reverse nodeId order, matching the execution order shown in the Spark UI.
+    Seq(node2, node)
+  }
+
+  private def verifyExpectedExecutionData(executionData: ExecutionData,
+      nodes: Seq[Node],
+      edges: Seq[SparkPlanGraphEdge],
+      planDescription: String): Unit = {
+
+    assert(executionData.id == 0)
+    assert(executionData.status == "COMPLETED")
+    assert(executionData.description == DESCRIPTION)
+    assert(executionData.planDescription == planDescription)
+    assert(executionData.submissionTime == new Date(1586768888233L))
+    assert(executionData.duration == 766L)
+    assert(executionData.successJobIds == Seq[Int](0, 1))
+    assert(executionData.runningJobIds == Seq[Int]())
+    assert(executionData.failedJobIds == Seq.empty)
+    assert(executionData.nodes == nodes)
+    assert(executionData.edges == edges)
+  }
+
+}
+
+/**
+ * Unit tests for the public SQL REST API (SqlResource).
+ */
+class SqlResourceSuite extends SparkFunSuite with PrivateMethodTester {
+
+  import SqlResourceSuite._
+
+  val sqlResource = new SqlResource()
+  val prepareExecutionData = PrivateMethod[ExecutionData]('prepareExecutionData)
+
+  test("Prepare ExecutionData when details = false and planDescription = false") {
+    val executionData =
+      sqlResource invokePrivate prepareExecutionData(
+        sqlExecutionUIData, SparkPlanGraph(Seq.empty, Seq.empty), false, false)
+    verifyExpectedExecutionData(executionData, nodes = Seq.empty,
+      edges = Seq.empty, planDescription = "")
+  }
+
+  test("Prepare ExecutionData when details = true and planDescription = false") {
+    val executionData =
+      sqlResource invokePrivate prepareExecutionData(
+        sqlExecutionUIData, SparkPlanGraph(nodes, edges), true, false)
+    verifyExpectedExecutionData(
+      executionData,
+      nodes = getNodes(),
+      edges = edges,
+      planDescription = "")
+  }
+
+  test("Prepare ExecutionData when details = true and planDescription = true") {
+    val executionData =
+      sqlResource invokePrivate prepareExecutionData(
+        sqlExecutionUIData, SparkPlanGraph(nodes, edges), true, true)
+    verifyExpectedExecutionData(
+      executionData,
+      nodes = getNodes(),
+      edges = edges,
+      planDescription = PLAN_DESCRIPTION)
+  }
+
+  test("Prepare ExecutionData when details = true and planDescription = false and WSCG = off") {
+    val executionData =
+      sqlResource invokePrivate prepareExecutionData(
+        sqlExecutionUIData, SparkPlanGraph(nodesWhenCodegenIsOff, edges), true, false)
+    verifyExpectedExecutionData(
+      executionData,
+      nodes = getExpectedNodesWhenWholeStageCodegenIsOff(),
+      edges = edges,
+      planDescription = "")
+  }
+
+  test("Parse wholeStageCodegenId from nodeName") {
+    val getWholeStageCodegenId = PrivateMethod[Option[Long]]('getWholeStageCodegenId)
+    val wholeStageCodegenId =
+      sqlResource invokePrivate getWholeStageCodegenId(WHOLE_STAGE_CODEGEN_1)
+    assert(wholeStageCodegenId == Some(1))
+  }
+
+}
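
Illustration: with the query parameters declared in SqlResource above (`details`,
`planDescription`), a request such as
GET /api/v1/applications/[app-id]/sql/[execution-id]?details=true&planDescription=true
now returns the plan graph (`nodes` and `edges`) in place of the old flat `metrics` array.
The payload below is a rough sketch only, assembled from the SqlResourceSuite fixture
values above; the exact date format and the rendering of an absent `wholeStageCodegenId`
depend on the Jackson serializer configuration and are assumptions here:

  {
    "id" : 0,
    "status" : "COMPLETED",
    "description" : "csv at MyDataFrames.scala:57",
    "planDescription" : "== Physical Plan ==\nCollectLimit (3)\n+- * Filter (2)\n +- Scan text...",
    "submissionTime" : "2020-04-13T09:08:08.233GMT",
    "duration" : 766,
    "runningJobIds" : [ ],
    "successJobIds" : [ 0, 1 ],
    "failedJobIds" : [ ],
    "nodes" : [ {
      "nodeId" : 2,
      "nodeName" : "Scan text",
      "metrics" : [
        { "name" : "metadata time", "value" : "2 ms" },
        { "name" : "number of files read", "value" : "1" },
        { "name" : "number of output rows", "value" : "1" },
        { "name" : "size of files read", "value" : "330.0 B" } ]
    }, {
      "nodeId" : 1,
      "nodeName" : "Filter",
      "wholeStageCodegenId" : 1,
      "metrics" : [ { "name" : "number of output rows", "value" : "1" } ]
    }, {
      "nodeId" : 0,
      "nodeName" : "WholeStageCodegen (1)",
      "metrics" : [ { "name" : "duration", "value" : "0 ms" } ]
    } ],
    "edges" : [ { "fromId" : 3, "toId" : 2 } ]
  }

Note that `nodes` appears in reverse nodeId order (see printableMetrics), matching the
execution order shown in the Spark UI, and that the Filter node carries the
wholeStageCodegenId parsed from its enclosing "WholeStageCodegen (1)" cluster.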