-
Notifications
You must be signed in to change notification settings - Fork 28.1k
/
CTESubstitution.scala
315 lines (299 loc) · 13.9 KB
/
CTESubstitution.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.internal.SQLConf.LEGACY_CTE_PRECEDENCE_POLICY
/**
 * Analyze WITH nodes and substitute child plan with CTE references or CTE definitions depending
 * on the conditions below:
 * 1. If in legacy mode, or if the CTE relations must be force-inlined because of the commands in
 *    the query (see `shouldForceInline`), replace with CTE definitions, i.e., inline CTEs.
 * 2. Otherwise, replace with CTE references `CTERelationRef`s. The decision to inline or not
 *    inline will be made later by the rule `InlineCTE` after query analysis.
 *
 * All the CTE definitions that are not inlined after this substitution will be grouped together
 * under one `WithCTE` node for each of the main query and the subqueries. Any of the main query
 * or the subqueries that do not contain CTEs or have had all CTEs inlined will obviously not have
 * any `WithCTE` nodes. If any though, the `WithCTE` node will be placed where the first
 * substituted (outermost) `With` node once was, or on the first common parent of the CTEs when
 * they are spread over several children.
 *
 * The CTE definitions in a `WithCTE` node are kept in the order of how they have been resolved.
 * That means the CTE definitions are guaranteed to be in topological order based on their
 * dependency for any valid CTE query (i.e., given CTE definitions A and B with B referencing A,
 * A is guaranteed to appear before B). Otherwise, it must be an invalid user query, and an
 * analysis exception will be thrown later by relation resolving rules.
 *
 * If the query is a SQL command or DML statement (extends `CTEInChildren`),
 * place `WithCTE` into their children.
 */
object CTESubstitution extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = {
    if (!plan.containsPattern(UNRESOLVED_WITH)) {
      // No `UnresolvedWith` anywhere in this plan: nothing to substitute.
      plan
    } else {
      val forceInline = shouldForceInline(plan)
      val cteDefs = ArrayBuffer.empty[CTERelationDef]
      val (substituted, firstSubstituted) =
        LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
          case LegacyBehaviorPolicy.EXCEPTION =>
            assertNoNameConflictsInCTE(plan)
            traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
          case LegacyBehaviorPolicy.LEGACY =>
            (legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
          case LegacyBehaviorPolicy.CORRECTED =>
            traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
        }
      if (cteDefs.isEmpty) {
        substituted
      } else {
        // `cteDefs` is only populated by `traverseAndSubstituteCTE`, and only while it
        // substitutes an `UnresolvedWith`, so `firstSubstituted` is always defined here.
        placeCTEDefs(substituted, firstSubstituted.get, cteDefs.toSeq)
      }
    }
  }

  /**
   * Returns whether all CTE relations in `plan` must be inlined, based on the commands
   * (`Command`, `ParsedStatement` or `InsertIntoDir`) found in `plan`:
   * - exactly one command: inline if `SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS` is enabled, or if
   *   the command is not a `CTEInChildren`;
   * - more than one command (e.g. multi-insert): always inline;
   * - no command: do not force inlining.
   */
  private def shouldForceInline(plan: LogicalPlan): Boolean = {
    val commands = plan.collect {
      case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c
    }
    if (commands.length == 1) {
      if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) {
        // The legacy behavior always inlines the CTE relations for queries in commands.
        true
      } else {
        // If there is only one command and it's `CTEInChildren`, we can resolve
        // CTE normally and don't need to force inline.
        !commands.head.isInstanceOf[CTEInChildren]
      }
    } else if (commands.length > 1) {
      // This can happen with the multi-insert statement. We should fall back to
      // the legacy behavior.
      true
    } else {
      false
    }
  }

  /**
   * Attaches all non-inlined CTE definitions under a single `WithCTE` (or into the children of
   * a `CTEInChildren` command). The target is `firstSubstituted` itself when it is the root, and
   * otherwise the first node, in a top-down traversal, that either is `firstSubstituted` or has
   * more than one child containing CTEs.
   *
   * @param substituted      the plan after CTE substitution
   * @param firstSubstituted the first `With` substitution result recorded by the traversal
   * @param cteDefs          all CTE definitions to attach, in resolution order
   */
  private def placeCTEDefs(
      substituted: LogicalPlan,
      firstSubstituted: LogicalPlan,
      cteDefs: Seq[CTERelationDef]): LogicalPlan = {
    if (substituted eq firstSubstituted) {
      withCTEDefs(substituted, cteDefs)
    } else {
      // Stop rewriting as soon as the definitions have been placed once.
      var done = false
      substituted.resolveOperatorsWithPruning(_ => !done) {
        case p if p eq firstSubstituted =>
          // `firstSubstituted` is the parent of all other CTEs (if any).
          done = true
          withCTEDefs(p, cteDefs)
        case p if p.children.count(_.containsPattern(CTE)) > 1 =>
          // This is the first common parent of all CTEs.
          done = true
          withCTEDefs(p, cteDefs)
      }
    }
  }

  /**
   * Spark 3.0 changes the CTE relations resolution, and inner relations take precedence. This is
   * correct but we need to warn users about this behavior change under EXCEPTION mode, when we see
   * CTE relations with conflicting names.
   *
   * Note that, before Spark 3.0 the parser didn't support CTE in the FROM clause. For example,
   * `WITH ... SELECT * FROM (WITH ... SELECT ...)` was not supported. We should not fail for this
   * case, as Spark versions before 3.0 can't run it anyway. The parameter `startOfQuery` is used
   * to indicate where we can define CTE relations before Spark 3.0, and we should only check
   * name conflicts when `startOfQuery` is true.
   *
   * @param plan                  the plan to check
   * @param outerCTERelationNames names of the CTE relations defined by enclosing `With` nodes
   * @param startOfQuery          whether `plan` is at a position where CTEs could be defined
   *                              before Spark 3.0
   */
  private def assertNoNameConflictsInCTE(
      plan: LogicalPlan,
      outerCTERelationNames: Seq[String] = Nil,
      startOfQuery: Boolean = true): Unit = {
    val resolver = conf.resolver
    plan match {
      case UnresolvedWith(child, relations) =>
        val newNames = ArrayBuffer.empty[String]
        newNames ++= outerCTERelationNames
        relations.foreach {
          case (name, relation) =>
            if (startOfQuery && outerCTERelationNames.exists(resolver(_, name))) {
              throw QueryCompilationErrors.ambiguousRelationAliasNameInNestedCTEError(name)
            }
            // CTE relation is defined as `SubqueryAlias`. Here we skip it and check the child
            // directly, so that `startOfQuery` is set correctly.
            assertNoNameConflictsInCTE(relation.child, newNames.toSeq)
            newNames += name
        }
        assertNoNameConflictsInCTE(child, newNames.toSeq, startOfQuery = false)

      case other =>
        other.subqueries.foreach(assertNoNameConflictsInCTE(_, outerCTERelationNames))
        other.children.foreach(
          assertNoNameConflictsInCTE(_, outerCTERelationNames, startOfQuery = false))
    }
  }

  /**
   * Legacy (`LegacyBehaviorPolicy.LEGACY`) substitution. Bottom-up, every `UnresolvedWith` is
   * replaced by its child with the CTE relations inlined. Nothing is added to `cteDefs`, because
   * `resolveCTERelations` only records definitions that are not inlined.
   */
  private def legacyTraverseAndSubstituteCTE(
      plan: LogicalPlan,
      cteDefs: ArrayBuffer[CTERelationDef]): LogicalPlan = {
    plan.resolveOperatorsUp {
      case UnresolvedWith(child, relations) =>
        val resolvedCTERelations =
          resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
        substituteCTE(child, alwaysInline = true, resolvedCTERelations)
    }
  }

  /**
   * Traverse the plan and expression nodes as a tree and replace matching references with CTE
   * references if `forceInline` is false, otherwise with the query plans of the corresponding
   * CTE definitions.
   * - If the rule encounters a WITH node then it resolves the CTE definitions of the node in
   *   order, so a definition can reference a previous one, and substitutes them into the child
   *   of the node. A later definition shadows an earlier one with the same name.
   *   For example the following query is valid:
   *   WITH
   *     t AS (SELECT 1),
   *     t2 AS (SELECT * FROM t)
   *   SELECT * FROM t2
   * - If a CTE definition contains an inner WITH node then substitution of inner should take
   *   precedence because it can shadow an outer CTE definition.
   *   For example the following query should return 2:
   *   WITH
   *     t AS (SELECT 1),
   *     t2 AS (
   *       WITH t AS (SELECT 2)
   *       SELECT * FROM t
   *     )
   *   SELECT * FROM t2
   * - If a CTE definition contains a subquery expression that contains an inner WITH node then
   *   substitution of inner should take precedence because it can shadow an outer CTE
   *   definition.
   *   For example the following query should return 2:
   *   WITH t AS (SELECT 1)
   *   SELECT (
   *     WITH t AS (SELECT 2)
   *     SELECT * FROM t
   *   )
   * @param plan the plan to be traversed
   * @param forceInline always inline the CTE relations if this is true
   * @param outerCTEDefs already resolved outer CTE definitions with names
   * @param cteDefs all accumulated CTE definitions
   * @return the plan where CTE substitution is applied, and optionally the result of the first
   *         `With` substitution, reached top-down, where CTE definitions will be gathered to
   */
  private def traverseAndSubstituteCTE(
      plan: LogicalPlan,
      forceInline: Boolean,
      outerCTEDefs: Seq[(String, CTERelationDef)],
      cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
    var firstSubstituted: Option[LogicalPlan] = None
    val newPlan = plan.resolveOperatorsDownWithPruning(
        _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
      case UnresolvedWith(child: LogicalPlan, relations) =>
        // This node's definitions (newest first) followed by `outerCTEDefs`, so inner
        // definitions shadow outer ones with the same name.
        val resolvedCTERelations =
          resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs)
        // Nested `With` nodes in the child are handled first because their definitions take
        // precedence over the ones of this node.
        val substituted = substituteCTE(
          traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
          forceInline,
          resolvedCTERelations)
        if (firstSubstituted.isEmpty) {
          firstSubstituted = Some(substituted)
        }
        substituted

      case other =>
        // Subquery expressions get their own, independent CTE substitution via `apply`.
        other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
          case e: SubqueryExpression => e.withNewPlan(apply(e.plan))
        }
    }
    (newPlan, firstSubstituted)
  }

  /**
   * Resolves the CTE definitions of one `With` node, in declaration order.
   *
   * Each definition may reference the previous ones. Definitions that are not inlined, i.e.
   * neither `isLegacy` nor `forceInline`, are appended to `cteDefs`.
   *
   * @return this node's definitions, newest first, followed by `outerCTEDefs`. Lookups take the
   *         first name match, so inner and later definitions shadow outer and earlier ones.
   */
  private def resolveCTERelations(
      relations: Seq[(String, SubqueryAlias)],
      isLegacy: Boolean,
      forceInline: Boolean,
      outerCTEDefs: Seq[(String, CTERelationDef)],
      cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
    val alwaysInline = isLegacy || forceInline
    // The relations visible while resolving each definition. When inlining, the outer
    // definitions are not seeded here, so this call does not substitute references from these
    // definitions to outer CTEs. The outer definitions are appended to the result instead.
    var resolvedCTERelations = if (alwaysInline) {
      Seq.empty[(String, CTERelationDef)]
    } else {
      outerCTEDefs
    }
    for ((name, relation) <- relations) {
      val innerCTEResolved = if (isLegacy) {
        // In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner
        // `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations.
        // Analyzer will run this rule multiple times until all `With` nodes are resolved.
        relation
      } else {
        // A CTE definition might contain an inner CTE that has a higher priority, so traverse and
        // substitute CTE defined in `relation` first.
        // NOTE: we must call `traverseAndSubstituteCTE` before `substituteCTE`, as the relations
        // in the inner CTE have higher priority over the relations in the outer CTE when resolving
        // inner CTE relations. For example:
        // WITH
        //   t1 AS (SELECT 1),
        //   t2 AS (
        //     WITH
        //       t1 AS (SELECT 2),
        //       t3 AS (SELECT * FROM t1)
        //     SELECT * FROM t3
        //   )
        // SELECT * FROM t2
        // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
        traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1
      }
      // CTE definition can reference a previous one
      val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations)
      val cteRelation = CTERelationDef(substituted)
      if (!alwaysInline) {
        cteDefs += cteRelation
      }
      // Prepending new CTEs makes sure that those have higher priority over outer ones.
      resolvedCTERelations +:= (name -> cteRelation)
    }
    if (alwaysInline) {
      resolvedCTERelations ++ outerCTEDefs
    } else {
      // `outerCTEDefs` was the seed, so it already forms the tail of the list. Appending it
      // again would only add unreachable duplicates, and the list would then double at every
      // nesting level.
      resolvedCTERelations
    }
  }

  /**
   * Replaces single-part `UnresolvedRelation`s in `plan` that match a name in `cteRelations`.
   * The first match in `cteRelations` wins.
   *
   * A matching relation becomes the definition's plan when `alwaysInline` is true. Otherwise it
   * becomes a `CTERelationRef` wrapped in a `SubqueryAlias` carrying the relation name. Subquery
   * expressions are processed recursively. A time-travel relation that refers to a CTE name
   * raises `QueryCompilationErrors.timeTravelUnsupportedError`.
   */
  private def substituteCTE(
      plan: LogicalPlan,
      alwaysInline: Boolean,
      cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = {
    val resolver = conf.resolver
    plan.resolveOperatorsUpWithPruning(
        _.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
      case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _)
          if cteRelations.exists(r => resolver(r._1, table)) =>
        throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(table))

      case u @ UnresolvedRelation(Seq(table), _, _) =>
        cteRelations.find(r => resolver(r._1, table)).map { case (_, d) =>
          if (alwaysInline) {
            d.child
          } else {
            // Add a `SubqueryAlias` for hint-resolving rules to match relation names.
            SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, d.isStreaming))
          }
        }.getOrElse(u)

      case other =>
        // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
        other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
          case e: SubqueryExpression =>
            e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations)))
        }
    }
  }

  /**
   * For commands which extend `CTEInChildren`, we should place the `WithCTE` node on its
   * children. There are two reasons:
   *  1. Some rules will pattern match the root command nodes, and we should keep command
   *     as the root node to not break them.
   *  2. `Dataset` eagerly executes the commands inside a query plan. For example,
   *     sql("WITH v ... CREATE TABLE t AS SELECT * FROM v") will create the table instead of just
   *     analyzing the command. However, the CTE references inside commands will be invalid if we
   *     execute the command alone, as the CTE definitions are outside of the command.
   * Any other plan is simply wrapped in a `WithCTE` node.
   */
  private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = {
    p match {
      case c: CTEInChildren => c.withCTEDefs(cteDefs)
      case _ => WithCTE(p, cteDefs)
    }
  }
}