Skip to content

Commit

Permalink
[SPARK-37379][SQL] Add tree pattern pruning to CTESubstitution rule
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR adds tree pattern pruning to the `CTESubstitution` analyzer rule. The rule will now exit early if the tree does not contain an `UnresolvedWith` node.

### Why are the changes needed?

Analysis is eagerly performed after every DataFrame transformation. If a user's program performs a long chain of _n_ transformations to construct a large query plan then this can lead to _O(n^2)_ performance costs from `CTESubstitution` because it is applied _n_ times and each application traverses the entire logical plan tree (which contains _O(n)_ nodes). In the case of chained `withColumn` calls (leading to stacked `Project` nodes) it's possible to see _O(n^3)_ slowdowns where _n_ is the number of projects: this is because there are _n_ separate analysis phases, each of which calls `CTESubstitution.traverseAndSubstituteCTE`, where each call visits each of the _n_ `Project` nodes and each of their _O(n)_ expressions.

Very large DataFrame plans typically do not use CTEs because there is not a DataFrame syntax for them (although they might appear in the plan if `sql(someQueryWithCTE)` is used). As a result, this PR's proposed optimization to skip `CTESubstitution` can greatly reduce the analysis cost for such plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I believe that optimizer correctness is covered by existing tests.

As a toy benchmark, I ran

```
import org.apache.spark.sql.DataFrame
org.apache.spark.sql.catalyst.rules.RuleExecutor.resetMetrics()
(1 to 600).foldLeft(spark.range(100).toDF)((df: DataFrame, i: Int) => df.withColumn(s"col$i", $"id" % i))
println(org.apache.spark.sql.catalyst.rules.RuleExecutor.dumpTimeSpent())
```

on my laptop before and after this PR's changes (simulating a _O(n^3)_ case). Skipping `CTESubstitution` cut the running time from ~28.4 seconds to ~15.5 seconds.

The bulk of the remaining time comes from `DeduplicateRelations`, for which I plan to submit a separate optimization PR.

Closes #34658 from JoshRosen/CTESubstitution-tree-pattern-pruning.

Authored-by: Josh Rosen <joshrosen@databricks.com>
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
  • Loading branch information
JoshRosen committed Nov 19, 2021
1 parent 53f9334 commit 3b4eb1f
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 0 deletions.
Expand Up @@ -48,6 +48,9 @@ import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega
*/
object CTESubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.containsPattern(UNRESOLVED_WITH)) {
return plan
}
val isCommand = plan.find {
case _: Command | _: ParsedStatement | _: InsertIntoDir => true
case _ => false
Expand Down
Expand Up @@ -626,6 +626,8 @@ object View {
case class UnresolvedWith(
child: LogicalPlan,
cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_WITH)

override def output: Seq[Attribute] = child.output

override def simpleString(maxFields: Int): String = {
Expand Down
Expand Up @@ -111,6 +111,7 @@ object TreePattern extends Enumeration {
val REPARTITION_OPERATION: Value = Value
val UNION: Value = Value
val UNRESOLVED_RELATION: Value = Value
val UNRESOLVED_WITH: Value = Value
val TYPED_FILTER: Value = Value
val WINDOW: Value = Value
val WITH_WINDOW_DEFINITION: Value = Value
Expand Down

0 comments on commit 3b4eb1f

Please sign in to comment.