Add WholeStageCodegen Grouping Support
Eren Avsarogullari committed Apr 20, 2020
1 parent efa10a6 · commit f41435e
Showing 4 changed files with 72 additions and 9 deletions.
@@ -195,7 +195,7 @@ private[ui] class SparkPlanGraphNode(
/**
* Represent a tree of SparkPlan for WholeStageCodegen.
*/
private[ui] class SparkPlanGraphCluster(
class SparkPlanGraphCluster(
id: Long,
name: String,
desc: String,
@@ -21,13 +21,17 @@ import java.util.Date
import javax.ws.rs._
import javax.ws.rs.core.MediaType

import scala.util.{Failure, Success, Try}

import org.apache.spark.JobExecutionStatus
import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class SqlResource extends BaseAppResource {

val WHOLE_STAGE_CODEGEN = "WholeStageCodegen"

@GET
def sqlList(
@DefaultValue("false") @QueryParam("details") details: Boolean,
@@ -50,16 +54,19 @@ private[v1] class SqlResource extends BaseAppResource {
planDescription: Boolean): ExecutionData = {
withUI { ui =>
val sqlStore = new SQLAppStatusStore(ui.store.store)
val graph = sqlStore.planGraph(execId)
val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = getNodeIdAndWSCGIdMap(graph)
sqlStore
.execution(execId)
.map(prepareExecutionData(_, details, planDescription))
.map(prepareExecutionData(_, nodeIdAndWSCGIdMap, details, planDescription))
.getOrElse(throw new NotFoundException("unknown execution id: " + execId))
}
}

private def printableMetrics(
sqlPlanMetrics: Seq[SQLPlanMetric],
metricValues: Map[Long, String]): Seq[MetricDetails] = {
metricValues: Map[Long, String],
nodeIdAndWSCGIdMap: Map[Long, Option[Long]]): Seq[MetricDetails] = {

def getMetric(metricValues: Map[Long, String], accumulatorId: Long,
metricName: String): Option[Metric] = {
@@ -78,12 +85,15 @@ private[v1] class SqlResource extends BaseAppResource {

val metricDetails = metrics.map {
case ((nodeId: Long, nodeName: String), metrics: Seq[Metric]) =>
MetricDetails(nodeId = nodeId, nodeName = nodeName.trim, metrics = metrics) }.toSeq
val wholeStageCodegenId = nodeIdAndWSCGIdMap.get(nodeId).flatten
MetricDetails(nodeId = nodeId, nodeName = nodeName.trim, wholeStageCodegenId, metrics)
}.toSeq

metricDetails.sortBy(_.nodeId).reverse
}

private def prepareExecutionData(exec: SQLExecutionUIData,
nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map.empty,
details: Boolean,
planDescription: Boolean): ExecutionData = {
var running = Seq[Int]()
@@ -111,7 +121,7 @@ private[v1] class SqlResource extends BaseAppResource {
val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime
val planDetails = if (details && planDescription) exec.physicalPlanDescription else ""
val metrics =
if (details) printableMetrics(exec.metrics, exec.metricValues)
if (details) printableMetrics(exec.metrics, exec.metricValues, nodeIdAndWSCGIdMap)
else Seq.empty
new ExecutionData(
exec.executionId,
Expand All @@ -125,4 +135,24 @@ private[v1] class SqlResource extends BaseAppResource {
failed,
metrics)
}

private def getNodeIdAndWSCGIdMap(graph: SparkPlanGraph): Map[Long, Option[Long]] = {
val wscgNodes = graph.allNodes.filter(_.name.trim.startsWith(WHOLE_STAGE_CODEGEN))
val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = wscgNodes.flatMap { _ match {
case x: SparkPlanGraphCluster => x.nodes.map(_.id -> getWholeStageCodegenId(x.name.trim))
case _ => Seq.empty
}
}.toMap

nodeIdAndWSCGIdMap
}

private def getWholeStageCodegenId(wscgNodeName: String): Option[Long] = {
Try(wscgNodeName.substring(
s"$WHOLE_STAGE_CODEGEN (".length, wscgNodeName.length - 1).toLong) match {
case Success(wscgId) => Some(wscgId)
case Failure(t) => None
}
}

}
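
For reference, a minimal standalone sketch of the id-extraction step added above. The object name is hypothetical and Try(...).toOption replaces the explicit Success/Failure match, but the behavior is the same: node names are expected in the "WholeStageCodegen (<id>)" form rendered by the SQL UI, and anything that does not parse yields None, so a malformed name cannot fail the request.

import scala.util.Try

object WholeStageCodegenIdParser {
  private val WHOLE_STAGE_CODEGEN = "WholeStageCodegen"

  // "WholeStageCodegen (1)" -> Some(1); any name that does not parse -> None
  def parse(nodeName: String): Option[Long] =
    Try(nodeName.substring(s"$WHOLE_STAGE_CODEGEN (".length, nodeName.length - 1).toLong).toOption
}

// WholeStageCodegenIdParser.parse("WholeStageCodegen (1)")    // Some(1)
// WholeStageCodegenIdParser.parse("WholeStageCodegen (oops)") // None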
@@ -32,6 +32,7 @@ class ExecutionData private[spark] (

case class MetricDetails private[spark] (nodeId: Long,
nodeName: String,
wholeStageCodegenId: Option[Long] = None,
metrics: Seq[Metric])

case class Metric private[spark] (name: String, value: String)
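
Because the new wholeStageCodegenId field defaults to None, existing constructions of MetricDetails stay source-compatible. An illustrative sketch using the case classes above (the metric names are placeholders, and since the constructor is private[spark] this mirrors how the test suite builds expected values rather than external usage):

// Node attributed to its enclosing codegen stage (WholeStageCodegen is on).
val filterDetails = MetricDetails(
  nodeId = 2,
  nodeName = "Filter",
  wholeStageCodegenId = Some(1),
  metrics = Seq(Metric("number of output rows", "1")))

// Field omitted: it defaults to None, so call sites written before this change keep compiling.
val scanDetails = MetricDetails(
  nodeId = 3,
  nodeName = "Scan text",
  metrics = Seq(Metric("metadata time", "2 ms")))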
@@ -75,6 +75,21 @@ object SqlResourceSuite {
}

private def getExpectedMetricDetails(): Seq[MetricDetails] = {
val metricDetails =
MetricDetails(1, WHOLE_STAGE_CODEGEN_1, wholeStageCodegenId = None, metrics = Seq(Metric(DURATION, "0 ms")))
val metricDetails2 = MetricDetails(2, FILTER,
wholeStageCodegenId = Some(1), metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
val metricDetails3 = MetricDetails(3, SCAN_TEXT, wholeStageCodegenId = Some(1),
metrics = Seq(Metric(METADATA_TIME, "2 ms"),
Metric(NUMBER_OF_FILES_READ, "1"),
Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
Metric(SIZE_OF_FILES_READ, "330.0 B")))

// reverse order to match the execution order shown in the Spark UI
Seq(metricDetails3, metricDetails2, metricDetails)
}

private def getExpectedMetricDetailsWhenWholeStageCodegenIsOff(): Seq[MetricDetails] = {
val metricDetails =
MetricDetails(1, WHOLE_STAGE_CODEGEN_1, metrics = Seq(Metric(DURATION, "0 ms")))
val metricDetails2 = MetricDetails(2, FILTER, metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
@@ -114,16 +129,17 @@ class SqlResourceSuite extends SparkFunSuite with PrivateMethodTester {
val sqlResource = new SqlResource()
val sqlExecutionUIData = getSQLExecutionUIData()
val prepareExecutionData = PrivateMethod[ExecutionData]('prepareExecutionData)
val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map(2L -> Some(1L), 3L -> Some(1L))

test("Prepare ExecutionData when details = false and planDescription = false") {
val executionData =
sqlResource invokePrivate prepareExecutionData(sqlExecutionUIData, false, false)
sqlResource invokePrivate prepareExecutionData(sqlExecutionUIData, nodeIdAndWSCGIdMap, false, false)
verifyExpectedExecutionData(executionData, metricDetails = Seq.empty, planDescription = "")
}

test("Prepare ExecutionData when details = true and planDescription = false") {
val executionData =
sqlResource invokePrivate prepareExecutionData(sqlExecutionUIData, true, false)
sqlResource invokePrivate prepareExecutionData(sqlExecutionUIData, nodeIdAndWSCGIdMap, true, false)
verifyExpectedExecutionData(
executionData,
metricDetails = getExpectedMetricDetails(),
Expand All @@ -132,11 +148,27 @@ class SqlResourceSuite extends SparkFunSuite with PrivateMethodTester {

test("Prepare ExecutionData when details = true and planDescription = true") {
val executionData =
sqlResource invokePrivate prepareExecutionData(sqlExecutionUIData, true, true)
sqlResource invokePrivate prepareExecutionData(sqlExecutionUIData, nodeIdAndWSCGIdMap, true, true)
verifyExpectedExecutionData(
executionData,
metricDetails = getExpectedMetricDetails(),
planDescription = PLAN_DESCRIPTION)
}

test("Prepare ExecutionData when details = true and planDescription = false and WholeStageCodegen = off") {
val executionData =
sqlResource invokePrivate prepareExecutionData(sqlExecutionUIData, Map.empty, true, false)
verifyExpectedExecutionData(
executionData,
metricDetails = getExpectedMetricDetailsWhenWholeStageCodegenIsOff(),
planDescription = "")
}

test("Parse wholeStageCodegenId from nodeName") {
val getWholeStageCodegenId = PrivateMethod[Option[Long]]('getWholeStageCodegenId)
val wholeStageCodegenId =
sqlResource invokePrivate getWholeStageCodegenId(WHOLE_STAGE_CODEGEN_1)
assert(wholeStageCodegenId == Some(1))
}

}
