Skip to content

Commit

Permalink
[SPARK-40297][SQL] CTE outer reference nested in CTE main body cannot…
Browse files Browse the repository at this point in the history
… be resolved

This PR fixes a bug where a reference to an outer CTE cannot be resolved when the reference occurs in an inner CTE definition that is nested in the FROM clause of the outer CTE's main body. For example:
```
WITH cte_outer AS (
  SELECT 1
)
SELECT * FROM (
  WITH cte_inner AS (
    SELECT * FROM cte_outer
  )
  SELECT * FROM cte_inner
)
```

The fix changes the traversal order in `CTESubstitution` from bottom-up (`resolveOperatorsUpWithPruning`) to top-down (`resolveOperatorsDownWithPruning`), and recursively calls `traverseAndSubstituteCTE` on the CTE main body so that the outer CTE definitions are passed down to any `WITH` clause nested inside it.

Bug fix. Without the fix, an `AnalysisException` is thrown for CTE queries like the one shown above.

User-facing change: No.

Testing: added unit tests (new queries in `cte-nested.sql` and the corresponding golden result files).

Closes #37751 from maryannxue/spark-40297.

Authored-by: Maryann Xue <maryann.xue@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
maryannxue authored and cloud-fan committed Sep 6, 2022
1 parent 9473840 commit 1324f7d
Show file tree
Hide file tree
Showing 6 changed files with 476 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
case _ => false
}
val cteDefs = ArrayBuffer.empty[CTERelationDef]
val (substituted, lastSubstituted) =
val (substituted, firstSubstituted) =
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan)
Expand All @@ -68,12 +68,17 @@ object CTESubstitution extends Rule[LogicalPlan] {
}
if (cteDefs.isEmpty) {
substituted
} else if (substituted eq lastSubstituted.get) {
} else if (substituted eq firstSubstituted.get) {
WithCTE(substituted, cteDefs.toSeq)
} else {
var done = false
substituted.resolveOperatorsWithPruning(_ => !done) {
case p if p eq lastSubstituted.get =>
case p if p eq firstSubstituted.get =>
// `firstSubstituted` is the parent of all other CTEs (if any).
done = true
WithCTE(p, cteDefs.toSeq)
case p if p.children.count(_.containsPattern(CTE)) > 1 =>
// This is the first common parent of all CTEs.
done = true
WithCTE(p, cteDefs.toSeq)
}
Expand Down Expand Up @@ -181,21 +186,28 @@ object CTESubstitution extends Rule[LogicalPlan] {
isCommand: Boolean,
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
var lastSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsUpWithPruning(
var firstSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsDownWithPruning(
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
case UnresolvedWith(child: LogicalPlan, relations) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs)
lastSubstituted = Some(substituteCTE(child, isCommand, resolvedCTERelations))
lastSubstituted.get
resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++
outerCTEDefs
val substituted = substituteCTE(
traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1,
isCommand,
resolvedCTERelations)
if (firstSubstituted.isEmpty) {
firstSubstituted = Some(substituted)
}
substituted

case other =>
other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case e: SubqueryExpression => e.withNewPlan(apply(e.plan))
}
}
(newPlan, lastSubstituted)
(newPlan, firstSubstituted)
}

private def resolveCTERelations(
Expand Down
59 changes: 58 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,61 @@ WITH
)
SELECT * FROM t3
)
SELECT * FROM t2;
SELECT * FROM t2;

-- CTE nested in CTE main body FROM clause references outer CTE def
WITH cte_outer AS (
SELECT 1
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_outer
)
SELECT * FROM cte_inner
);

-- CTE double nested in CTE main body FROM clause references outer CTE def
WITH cte_outer AS (
SELECT 1
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM (
WITH cte_inner_inner AS (
SELECT * FROM cte_outer
)
SELECT * FROM cte_inner_inner
)
)
SELECT * FROM cte_inner
);

-- Invalid reference to an invisible CTE def nested in another CTE def
WITH cte_outer AS (
WITH cte_invisible_inner AS (
SELECT 1
)
SELECT * FROM cte_invisible_inner
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_invisible_inner
)
SELECT * FROM cte_inner
);

-- Invalid reference to an invisible CTE def nested in another CTE def (in FROM)
WITH cte_outer AS (
SELECT * FROM (
WITH cte_invisible_inner AS (
SELECT 1
)
SELECT * FROM cte_invisible_inner
)
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_invisible_inner
)
SELECT * FROM cte_inner
);
80 changes: 80 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,83 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: t1; line 5 pos 20


-- !query
WITH cte_outer AS (
SELECT 1
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_outer
)
SELECT * FROM cte_inner
)
-- !query schema
struct<1:int>
-- !query output
1


-- !query
WITH cte_outer AS (
SELECT 1
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM (
WITH cte_inner_inner AS (
SELECT * FROM cte_outer
)
SELECT * FROM cte_inner_inner
)
)
SELECT * FROM cte_inner
)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: cte_outer; line 8 pos 22


-- !query
WITH cte_outer AS (
WITH cte_invisible_inner AS (
SELECT 1
)
SELECT * FROM cte_invisible_inner
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_invisible_inner
)
SELECT * FROM cte_inner
)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: cte_invisible_inner; line 9 pos 18


-- !query
WITH cte_outer AS (
SELECT * FROM (
WITH cte_invisible_inner AS (
SELECT 1
)
SELECT * FROM cte_invisible_inner
)
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_invisible_inner
)
SELECT * FROM cte_inner
)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: cte_invisible_inner; line 11 pos 18
79 changes: 79 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,82 @@ SELECT * FROM t2
struct<1:int>
-- !query output
1


-- !query
WITH cte_outer AS (
SELECT 1
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_outer
)
SELECT * FROM cte_inner
)
-- !query schema
struct<1:int>
-- !query output
1


-- !query
WITH cte_outer AS (
SELECT 1
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM (
WITH cte_inner_inner AS (
SELECT * FROM cte_outer
)
SELECT * FROM cte_inner_inner
)
)
SELECT * FROM cte_inner
)
-- !query schema
struct<1:int>
-- !query output
1


-- !query
WITH cte_outer AS (
WITH cte_invisible_inner AS (
SELECT 1
)
SELECT * FROM cte_invisible_inner
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_invisible_inner
)
SELECT * FROM cte_inner
)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: cte_invisible_inner; line 9 pos 18


-- !query
WITH cte_outer AS (
SELECT * FROM (
WITH cte_invisible_inner AS (
SELECT 1
)
SELECT * FROM cte_invisible_inner
)
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_invisible_inner
)
SELECT * FROM cte_inner
)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: cte_invisible_inner; line 11 pos 18
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,82 @@ SELECT * FROM t2
struct<1:int>
-- !query output
1


-- !query
WITH cte_outer AS (
SELECT 1
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_outer
)
SELECT * FROM cte_inner
)
-- !query schema
struct<1:int>
-- !query output
1


-- !query
WITH cte_outer AS (
SELECT 1
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM (
WITH cte_inner_inner AS (
SELECT * FROM cte_outer
)
SELECT * FROM cte_inner_inner
)
)
SELECT * FROM cte_inner
)
-- !query schema
struct<1:int>
-- !query output
1


-- !query
WITH cte_outer AS (
WITH cte_invisible_inner AS (
SELECT 1
)
SELECT * FROM cte_invisible_inner
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_invisible_inner
)
SELECT * FROM cte_inner
)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: cte_invisible_inner; line 9 pos 18


-- !query
WITH cte_outer AS (
SELECT * FROM (
WITH cte_invisible_inner AS (
SELECT 1
)
SELECT * FROM cte_invisible_inner
)
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_invisible_inner
)
SELECT * FROM cte_inner
)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: cte_invisible_inner; line 11 pos 18
Loading

0 comments on commit 1324f7d

Please sign in to comment.