[SPARK-47289][SQL] Allow extensions to log extended information in explain plan

### What changes were proposed in this pull request?

This addresses SPARK-47289 and adds a new section to the explain plan where Spark extensions can surface additional information for end users. The section is included in the output only if the relevant configuration is enabled and the extension actually produces some information.
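
As a rough sketch of how an extension might plug in (the `com.example.MyExplainGenerator` class and the per-node message are hypothetical; the `ExtendedExplainGenerator` trait and the `spark.sql.extendedExplainProviders` config are the ones introduced by this PR):

```scala
package com.example

import org.apache.spark.sql.ExtendedExplainGenerator
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical provider: reports every physical node it sees.
class MyExplainGenerator extends ExtendedExplainGenerator {
  override def title: String = "MyExtension"

  override def generateExtendedInfo(plan: SparkPlan): String =
    plan.collect { case node => s"handled: ${node.nodeName}" }.mkString("\n")
}
```

Setting `spark.sql.extendedExplainProviders=com.example.MyExplainGenerator` (with the class on the driver classpath) should then make the extra section appear in `df.explain(true)` output and in the SQL UI plan.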

### Why are the changes needed?
Extensions to Spark can add their own planning rules and sometimes need to convey additional information about how the plan was generated. This is useful for end users in determining whether the extension's rules are working as intended.

### Does this PR introduce _any_ user-facing change?
This PR adds more information to the query plan displayed in the UI. The attached screenshot shows output from an extension that provides some of its own operations but does not support others.

<img width="703" alt="Screenshot 2024-03-11 at 10 23 36 AM" src="https://github.com/apache/spark/assets/6529136/88594772-f85e-4fd4-8eac-33017ef0c1c6">
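
In text form, the new section is appended after the physical plan, roughly like the following (illustrative only; the provider title and messages are made up, though the `== Extended Information (...) ==` header matches the code in this PR):

```
== Physical Plan ==
*(2) Project [...]
...

== Extended Information (MyExtension) ==
handled: Project
handled: LocalTableScan
```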

### How was this patch tested?
Unit tests and manual testing.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45488 from parthchandra/explain_extensions.

Authored-by: Parth Chandra <parthc@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parthchandra authored and cloud-fan committed Apr 5, 2024
1 parent 0107435 commit d5620cb
Showing 5 changed files with 134 additions and 2 deletions.
@@ -175,6 +175,8 @@ object CheckConnectJvmClientCompatibility {
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SparkSessionExtensions"),
      ProblemFilters.exclude[MissingClassProblem](
        "org.apache.spark.sql.SparkSessionExtensionsProvider"),
      ProblemFilters.exclude[MissingClassProblem](
        "org.apache.spark.sql.ExtendedExplainGenerator"),
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UDTFRegistration"),
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UDFRegistration$"),
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataSourceRegistration"),
@@ -333,6 +333,15 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val EXTENDED_EXPLAIN_PROVIDERS = buildConf("spark.sql.extendedExplainProviders")
    .doc("A comma-separated list of classes that implement the" +
      " org.apache.spark.sql.ExtendedExplainGenerator trait. If provided, Spark will print" +
      " extended plan information from the providers in explain plan and in the UI")
    .version("4.0.0")
    .stringConf
    .toSequence
    .createOptional

  val DYNAMIC_PARTITION_PRUNING_ENABLED =
    buildConf("spark.sql.optimizer.dynamicPartitionPruning.enabled")
      .doc("When true, we will generate predicate for partition column when it's used as join key")
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.sql.execution.SparkPlan

/**
 * A trait for a session extension to implement that provides additional explain plan
 * information.
 */
@DeveloperApi
@Since("4.0.0")
trait ExtendedExplainGenerator {
  def title: String
  def generateExtendedInfo(plan: SparkPlan): String
}
@@ -21,12 +21,14 @@ import java.io.{BufferedWriter, OutputStreamWriter}
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
@@ -258,6 +260,7 @@ class QueryExecution(
      QueryPlan.append(executedPlan,
        append, verbose = false, addSuffix = false, maxFields = maxFields)
    }
    extendedExplainInfo(append, executedPlan)
    append("\n")
  }

@@ -317,6 +320,7 @@ class QueryExecution(
      QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)
      append("\n== Physical Plan ==\n")
      QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)
      extendedExplainInfo(append, executedPlan)
    } catch {
      case e: AnalysisException => append(e.toString)
    }
@@ -369,6 +373,23 @@ class QueryExecution(
    Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)
  }

  def extendedExplainInfo(append: String => Unit, plan: SparkPlan): Unit = {
    val generators = sparkSession.sessionState.conf.getConf(SQLConf.EXTENDED_EXPLAIN_PROVIDERS)
      .getOrElse(Seq.empty)
    val extensions = Utils.loadExtensions(classOf[ExtendedExplainGenerator],
      generators,
      sparkSession.sparkContext.conf)
    if (extensions.nonEmpty) {
      extensions.foreach(extension =>
        try {
          append(s"\n== Extended Information (${extension.title}) ==\n")
          append(extension.generateExtendedInfo(plan))
        } catch {
          case NonFatal(e) => logWarning(s"Cannot use $extension to get extended information.", e)
        })
    }
  }

  /** A special namespace for commands that can be used to debug query execution. */
  // scalastyle:off
  object debug {
@@ -16,17 +16,21 @@
*/
package org.apache.spark.sql.execution

import scala.collection.mutable
import scala.io.Source

import org.apache.spark.sql.{AnalysisException, Dataset, FastOperator}
import org.apache.spark.sql.{AnalysisException, Dataset, ExtendedExplainGenerator, FastOperator}
import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback}
import org.apache.spark.sql.catalyst.analysis.CurrentNamespace
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan, OneRowRelation, Project, ShowTables, SubqueryAlias}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.datasources.v2.ShowTablesExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils
@@ -336,6 +340,28 @@ class QueryExecutionSuite extends SharedSparkSession {
}
}

test("SPARK-47289: extended explain info") {
val concat = new PlanStringConcat()
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key -> "org.apache.spark.sql.execution.ExtendedInfo") {
val left = Seq((1, 10L), (2, 100L), (3, 1000L)).toDF("l1", "l2")
val right = Seq((1, 2), (2, 3), (3, 5)).toDF("r1", "r2")
val data = left.join(right, $"l1" === $"r1")
val df = data.selectExpr("l2 + r2")
// execute the plan so that the final adaptive plan is available
df.collect()
val qe = df.queryExecution
qe.extendedExplainInfo(concat.append, qe.executedPlan)
val info = concat.toString
val expected = "\n== Extended Information (Test) ==\n" +
"Scan Info: LocalTableScan\n" +
"Project Info: Project\n" +
"SMJ Info: SortMergeJoin"
assert(info == expected)
}
}

case class MockCallbackEagerCommand(
var trackerAnalyzed: QueryPlanningTracker = null,
var trackerReadyForExecution: QueryPlanningTracker = null)
@@ -413,3 +439,45 @@ class QueryExecutionSuite extends SharedSparkSession {
}
}
}

class ExtendedInfo extends ExtendedExplainGenerator {

  override def title: String = "Test"

  override def generateExtendedInfo(plan: SparkPlan): String = {
    val info = mutable.LinkedHashSet[String]() // don't allow duplicates
    extensionInfo(plan, info)
    info.mkString("\n")
  }

  def getActualPlan(plan: SparkPlan): SparkPlan = {
    plan match {
      case p: AdaptiveSparkPlanExec => p.executedPlan
      case p: QueryStageExec => p.plan
      case p: WholeStageCodegenExec => p.child
      case p => p
    }
  }

  def extensionInfo(p: SparkPlan, info: mutable.Set[String]): Unit = {
    // val info = mutable.Set[String]() // don't allow duplicates
    val actualPlan = getActualPlan(p)
    if (actualPlan.innerChildren.nonEmpty) {
      actualPlan.innerChildren.foreach(c => {
        extensionInfo(c.asInstanceOf[SparkPlan], info)
      })
    }
    if (actualPlan.children.nonEmpty) {
      actualPlan.children.foreach(c => {
        extensionInfo(c, info)
      })
    }
    actualPlan match {
      case p: LocalTableScanExec => info += s"Scan Info: ${p.nodeName}"
      case p: SortMergeJoinExec => info += s"SMJ Info: ${p.nodeName}"
      case p: ProjectExec => info += s"Project Info: ${p.nodeName}"
      case _ =>
    }
    ()
  }
}
