[SPARK-24497][SQL] Support recursive SQL #40744

Open
wants to merge 2 commits into
base: master
41 changes: 41 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -2157,6 +2157,35 @@
],
"sqlState" : "42602"
},
"INVALID_RECURSIVE_CTE" : {
"message" : [
"Invalid recursive definition found. Recursive queries must contain an UNION or an UNION ALL statement with 2 children. The first child needs to be the anchor term without any recursive references."
],
"sqlState" : "42836"
},
"INVALID_RECURSIVE_REFERENCE" : {
"message" : [
"Invalid recursive reference found."
],
"subClass" : {
"DATA_TYPE" : {
"message" : [
"The data type of recursive references cannot change during resolution. Originally it was <fromDataType> but after resolution is <toDataType>."
]
},
"NUMBER" : {
"message" : [
"Recursive references cannot be used multiple times."
]
},
"PLACE" : {
"message" : [
"Recursive references cannot be used on the right side of left outer/semi/anti joins, on the left side of right outer joins, in full outer joins and in aggregates."
]
}
},
"sqlState" : "42836"
},
"INVALID_SCHEMA" : {
"message" : [
"The input schema <inputSchema> is not a valid schema string."
@@ -2898,6 +2927,18 @@
],
"sqlState" : "38000"
},
"RECURSIVE_CTE_IN_LEGACY_MODE" : {
"message" : [
"Recursive definitions cannot be used in legacy CTE precedence mode (spark.sql.legacy.ctePrecedencePolicy=LEGACY)."
],
"sqlState" : "42836"
},
"RECURSIVE_CTE_WHEN_INLINING_IS_FORCED" : {
"message" : [
"Recursive definitions cannot be used when CTE inlining is forced."
],
"sqlState" : "42836"
},
"RECURSIVE_PROTOBUF_SCHEMA" : {
"message" : [
"Found recursive reference in Protobuf schema, which can not be processed by Spark by default: <fieldDescriptor>. try setting the option `recursive.fields.max.depth` 0 to 10. Going beyond 10 levels of recursion is not allowed."
@@ -0,0 +1,40 @@
---
layout: global
title: INVALID_RECURSIVE_REFERENCE error class
displayTitle: INVALID_RECURSIVE_REFERENCE error class
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

[SQLSTATE: 42836](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Invalid recursive reference found.

This error class has the following derived error classes:

## DATA_TYPE

The data type of recursive references cannot change during resolution. Originally it was `<fromDataType>` but after resolution is `<toDataType>`.

## NUMBER

Recursive references cannot be used multiple times.

## PLACE

Recursive references cannot be used on the right side of left outer/semi/anti joins, on the left side of right outer joins, in full outer joins and in aggregates.
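The NUMBER and PLACE rules above can be illustrated with a small checker. This is only a sketch under stated assumptions: `Plan`, `RecursiveRef`, `Table`, `Aggregate`, `Join` and the join type strings are hypothetical stand-ins, not Spark's `LogicalPlan` classes, and "in aggregates" is read here as "anywhere below an aggregate".

```scala
object RecursiveReferencePlacementSketch {
  // Hypothetical mini plan algebra; these are NOT Spark's LogicalPlan classes.
  sealed trait Plan
  case object RecursiveRef extends Plan
  final case class Table(name: String) extends Plan
  final case class Aggregate(child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan, joinType: String) extends Plan

  // Counts recursive references; the NUMBER rule allows at most one.
  def countRefs(plan: Plan): Int = plan match {
    case RecursiveRef => 1
    case Table(_) => 0
    case Aggregate(child) => countRefs(child)
    case Join(l, r, _) => countRefs(l) + countRefs(r)
  }

  // True when every recursive reference sits in a position the PLACE rule allows.
  def placementOk(plan: Plan): Boolean = plan match {
    case RecursiveRef | Table(_) => true
    // Assumption: a reference anywhere below an aggregate is rejected.
    case Aggregate(child) => countRefs(child) == 0
    case Join(l, r, joinType) =>
      val sideOk = joinType match {
        case "left_outer" | "left_semi" | "left_anti" => countRefs(r) == 0
        case "right_outer" => countRefs(l) == 0
        case "full_outer" => countRefs(l) + countRefs(r) == 0
        case _ => true // inner and cross joins are unrestricted in this sketch
      }
      sideOk && placementOk(l) && placementOk(r)
  }
}
```

In this model a reference on the left side of a left outer join passes, while the same reference on the right side fails, which matches the wording of the PLACE message.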


26 changes: 26 additions & 0 deletions docs/sql-error-conditions.md
@@ -1249,6 +1249,20 @@ For more details see [INVALID_PARTITION_OPERATION](sql-error-conditions-invalid-

`<value>` is an invalid property value, please use quotes, e.g. SET `<key>`=`<value>`

### INVALID_RECURSIVE_CTE

[SQLSTATE: 42836](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Invalid recursive definition found. Recursive queries must contain an UNION or an UNION ALL statement with 2 children. The first child needs to be the anchor term without any recursive references.
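To make the anchor/recursive-term shape concrete, the following toy model evaluates a recursive definition by repeated application. It is a sketch of the general technique only and does not reflect how this PR executes recursion in Spark; `evaluate`, `step` and `maxIterations` are names invented for illustration.

```scala
object RecursiveCteSketch {
  // `anchor` plays the role of the first UNION child (no recursive reference);
  // `step` plays the role of the second child and only sees the rows produced
  // by the previous iteration. `maxIterations` is a safety cap for this sketch.
  def evaluate[A](anchor: Seq[A], maxIterations: Int = 100)(step: Seq[A] => Seq[A]): Seq[A] = {
    var result = anchor
    var lastBatch = anchor
    var iterations = 0
    while (lastBatch.nonEmpty && iterations < maxIterations) {
      lastBatch = step(lastBatch)
      result = result ++ lastBatch // UNION ALL semantics: duplicates are kept
      iterations += 1
    }
    result
  }

  // Models:
  //   WITH RECURSIVE t(level) AS (
  //     SELECT 1 UNION ALL SELECT level + 1 FROM t WHERE level < 10
  //   ) SELECT * FROM t
  val levels: Seq[Int] = evaluate(Seq(1))(_.filter(_ < 10).map(_ + 1))
}
```

The loop stops as soon as the recursive term yields no rows, which is why the anchor term must not itself contain a recursive reference.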

### [INVALID_RECURSIVE_REFERENCE](sql-error-conditions-invalid-recursive-reference-error-class.html)

[SQLSTATE: 42836](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Invalid recursive reference found.

For more details see [INVALID_RECURSIVE_REFERENCE](sql-error-conditions-invalid-recursive-reference-error-class.html)

### [INVALID_SCHEMA](sql-error-conditions-invalid-schema-error-class.html)

[SQLSTATE: 42K07](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -1762,6 +1776,18 @@ Protobuf type not yet supported: `<protobufType>`.

Failed to `<action>` Python data source `<type>` in Python: `<msg>`

### RECURSIVE_CTE_IN_LEGACY_MODE

[SQLSTATE: 42836](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Recursive definitions cannot be used in legacy CTE precedence mode (spark.sql.legacy.ctePrecedencePolicy=LEGACY).

### RECURSIVE_CTE_WHEN_INLINING_IS_FORCED

[SQLSTATE: 42836](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Recursive definitions cannot be used when CTE inlining is forced.
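The two `RECURSIVE_CTE_*` conditions can be summarized as a pair of guards. The sketch below folds them into one hypothetical `check` function for readability; in the PR the checks live in two separate code paths of `CTESubstitution`, and the parameter names are borrowed from that code.

```scala
object RecursiveCteGuardsSketch {
  // Returns the error class that would be raised, or None when the CTE is accepted.
  // Simplification: the real checks are made in separate legacy and non-legacy paths.
  def check(allowRecursion: Boolean, isLegacy: Boolean, forceInline: Boolean): Option[String] =
    if (allowRecursion && isLegacy) Some("RECURSIVE_CTE_IN_LEGACY_MODE")
    else if (allowRecursion && forceInline) Some("RECURSIVE_CTE_WHEN_INLINING_IS_FORCED")
    else None
}
```

Both guards only fire when `RECURSIVE` was written in the query, so existing non-recursive CTEs are unaffected in this model.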

### RECURSIVE_PROTOBUF_SCHEMA

[SQLSTATE: 42K0G](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -593,6 +593,7 @@ Below is a list of all the keywords in Spark SQL.
|RECORDREADER|non-reserved|non-reserved|non-reserved|
|RECORDWRITER|non-reserved|non-reserved|non-reserved|
|RECOVER|non-reserved|non-reserved|non-reserved|
|RECURSIVE|reserved|non-reserved|reserved|
|REDUCE|non-reserved|non-reserved|non-reserved|
|REFERENCES|reserved|non-reserved|reserved|
|REFRESH|non-reserved|non-reserved|non-reserved|
@@ -312,6 +312,7 @@ REAL: 'REAL';
RECORDREADER: 'RECORDREADER';
RECORDWRITER: 'RECORDWRITER';
RECOVER: 'RECOVER';
RECURSIVE: 'RECURSIVE';
REDUCE: 'REDUCE';
REFERENCES: 'REFERENCES';
REFRESH: 'REFRESH';
@@ -372,7 +372,7 @@ describeColName
;

ctes
: WITH namedQuery (COMMA namedQuery)*
: WITH RECURSIVE? namedQuery (COMMA namedQuery)*
;

namedQuery
@@ -1806,6 +1806,7 @@ nonReserved
| RECORDREADER
| RECORDWRITER
| RECOVER
| RECURSIVE
| REDUCE
| REFERENCES
| REFRESH
@@ -350,7 +350,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
Batch("HandleSpecialCommand", Once,
HandleSpecialCommand),
Batch("Remove watermark for batch query", Once,
EliminateEventTimeWatermark)
EliminateEventTimeWatermark),
Batch("Insert Loops", Once,
InsertLoops)
)

/**
@@ -123,7 +123,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
startOfQuery: Boolean = true): Unit = {
val resolver = conf.resolver
plan match {
case UnresolvedWith(child, relations) =>
case UnresolvedWith(child, relations, _) =>
val newNames = ArrayBuffer.empty[String]
newNames ++= outerCTERelationNames
relations.foreach {
@@ -149,10 +149,15 @@ object CTESubstitution extends Rule[LogicalPlan] {
plan: LogicalPlan,
cteDefs: ArrayBuffer[CTERelationDef]): LogicalPlan = {
plan.resolveOperatorsUp {
case cte @ UnresolvedWith(child, relations) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
val substituted = substituteCTE(child, alwaysInline = true, resolvedCTERelations)
case cte @ UnresolvedWith(child, relations, allowRecursion) =>
if (allowRecursion) {
cte.failAnalysis(
errorClass = "RECURSIVE_CTE_IN_LEGACY_MODE",
messageParameters = Map.empty)
}
val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true,
forceInline = false, Seq.empty, cteDefs, allowRecursion)
val substituted = substituteCTE(child, alwaysInline = true, resolvedCTERelations, None)._1
substituted.copyTagsFrom(cte)
substituted
}
@@ -204,14 +209,20 @@ object CTESubstitution extends Rule[LogicalPlan] {
var firstSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsDownWithPruning(
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
case cte @ UnresolvedWith(child: LogicalPlan, relations) =>
case cte @ UnresolvedWith(child, relations, allowRecursion) =>
if (allowRecursion && forceInline) {
cte.failAnalysis(
errorClass = "RECURSIVE_CTE_WHEN_INLINING_IS_FORCED",
messageParameters = Map.empty)
}
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
outerCTEDefs
resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs,
allowRecursion) ++ outerCTEDefs
val substituted = substituteCTE(
traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
forceInline,
resolvedCTERelations)
resolvedCTERelations,
None)._1
if (firstSubstituted.isEmpty) {
firstSubstituted = Some(substituted)
}
@@ -231,7 +242,8 @@ object CTESubstitution extends Rule[LogicalPlan] {
isLegacy: Boolean,
forceInline: Boolean,
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
cteDefs: ArrayBuffer[CTERelationDef],
allowRecursion: Boolean): Seq[(String, CTERelationDef)] = {
val alwaysInline = isLegacy || forceInline
var resolvedCTERelations = if (alwaysInline) {
Seq.empty
@@ -250,53 +262,127 @@ object CTESubstitution extends Rule[LogicalPlan] {
// NOTE: we must call `traverseAndSubstituteCTE` before `substituteCTE`, as the relations
// in the inner CTE have higher priority over the relations in the outer CTE when resolving
// inner CTE relations. For example:
// WITH t1 AS (SELECT 1)
// t2 AS (
// WITH t1 AS (SELECT 2)
// WITH t3 AS (SELECT * FROM t1)
// )
// WITH
// t1 AS (SELECT 1),
// t2 AS (
// WITH
// t1 AS (SELECT 2),
// t3 AS (SELECT * FROM t1)
// SELECT * FROM t1
// )
// SELECT * FROM t2
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1
//
// When recursion is allowed:
// - don't add current definition to outer definitions of `traverseAndSubstituteCTE()` to
// prevent recursion inside inner CTEs.
// E.g. the following query will not resolve `t1` within `t2`:
// WITH RECURSIVE
// t1 AS (
// SELECT 1 AS level
// UNION (
// WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10)
// SELECT * FROM t2
// )
// )
// SELECT * FROM t1
// - remove definitions that conflict with current relation `name` from outer definitions of
// `traverseAndSubstituteCTE()` to prevent unintended resolutions.
// E.g. we don't want to resolve `t1` within `t3` to `SELECT 1`:
// WITH
// t1 AS (SELECT 1),
// t2 AS (
// WITH RECURSIVE
// t1 AS (
// SELECT 1 AS level
// UNION (
// WITH t3 AS (SELECT level + 1 FROM t1 WHERE level < 10)
// SELECT * FROM t3
// )
// )
// SELECT * FROM t1
// )
// SELECT * FROM t2
val nonConflictingCTERelations = if (allowRecursion) {
resolvedCTERelations.filterNot {
case (cteName, cteDef) => cteDef.conf.resolver(cteName, name)
}
} else {
resolvedCTERelations
}
traverseAndSubstituteCTE(relation, forceInline, nonConflictingCTERelations, cteDefs)._1
}

// If recursion is allowed then it has higher priority than outer or previous relations so
// construct a not yet substituted but recursive `CTERelationDef`, that we will prepend to
// `resolvedCTERelations`.
val recursiveCTERelation = if (allowRecursion) {
Some(name -> CTERelationDef(relation, recursive = true))
} else {
None
}
// CTE definition can reference a previous one
val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations)
val cteRelation = CTERelationDef(substituted)

// CTE definition can reference a previous one or itself if recursion is allowed.
val (substituted, recursionFound) = substituteCTE(innerCTEResolved, alwaysInline,
resolvedCTERelations, recursiveCTERelation)
val cteRelation = recursiveCTERelation
.map(_._2.copy(child = substituted, recursive = recursionFound))
.getOrElse(CTERelationDef(substituted))
if (!alwaysInline) {
cteDefs += cteRelation
}

// From this point any reference to the definition is non-recursive.
val nonRecursiveCTERelation = if (cteRelation.recursive) {
cteRelation.copy(recursive = false)
} else {
cteRelation
}

// Prepending new CTEs makes sure that those have higher priority over outer ones.
resolvedCTERelations +:= (name -> cteRelation)
resolvedCTERelations +:= (name -> nonRecursiveCTERelation)
}
resolvedCTERelations
}

private def substituteCTE(
plan: LogicalPlan,
alwaysInline: Boolean,
cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = {
plan.resolveOperatorsUpWithPruning(
cteRelations: Seq[(String, CTERelationDef)],
recursiveCTERelation: Option[(String, CTERelationDef)]): (LogicalPlan, Boolean) = {
var recursionFound = false
val substituted = plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _)
if cteRelations.exists(r => plan.conf.resolver(r._1, table)) =>
throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(table))

case u @ UnresolvedRelation(Seq(table), _, _) =>
cteRelations.find(r => plan.conf.resolver(r._1, table)).map { case (_, d) =>
if (alwaysInline) {
d.child
} else {
// Add a `SubqueryAlias` for hint-resolving rules to match relation names.
SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, d.isStreaming))
}
(recursiveCTERelation ++ cteRelations)
.find(r => plan.conf.resolver(r._1, table))
.map { case (_, d) =>
if (alwaysInline) {
d.child
} else {
if (d.recursive) {
recursionFound = true
}
// Add a `SubqueryAlias` for hint-resolving rules to match relation names.
SubqueryAlias(table,
CTERelationRef(d.id, d.resolved, d.output, d.isStreaming, recursive = d.recursive))
}
}.getOrElse(u)

case other =>
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case e: SubqueryExpression =>
e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations)))
e.withNewPlan(
apply(substituteCTE(e.plan, alwaysInline, cteRelations, None)._1))
}
}

(substituted, recursionFound)
}
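The name lookup order used by `substituteCTE` and the `filterNot` step in `resolveCTERelations` can be modeled in isolation. This is a minimal sketch, assuming a hypothetical `Def` case class in place of `CTERelationDef` and a case-insensitive resolver; it only mirrors the lookup logic, not plan rewriting.

```scala
object CteNameResolutionSketch {
  // Hypothetical stand-in for CTERelationDef: only a label and the recursive flag.
  final case class Def(label: String, recursive: Boolean = false)

  // Assumption: a case-insensitive resolver; the real one comes from the SQL conf.
  val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)

  // Mirrors `(recursiveCTERelation ++ cteRelations).find(...)`: the self reference,
  // when present, is checked before any previously resolved definition.
  def lookup(
      table: String,
      cteRelations: Seq[(String, Def)],
      recursiveCTERelation: Option[(String, Def)]): Option[Def] =
    (recursiveCTERelation ++ cteRelations).find(r => resolver(r._1, table)).map(_._2)

  // Mirrors the `filterNot` step: drop outer definitions that share the current name.
  def nonConflicting(name: String, resolved: Seq[(String, Def)]): Seq[(String, Def)] =
    resolved.filterNot { case (cteName, _) => resolver(cteName, name) }
}
```

Because the optional recursive definition is placed first, a self reference shadows an outer CTE of the same name, and passing `None` (as the subquery and legacy paths do) falls back to the ordinary lookup.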

/**