/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.rules.RuleId
import org.apache.spark.sql.catalyst.rules.UnknownRuleId
import org.apache.spark.sql.catalyst.trees.{AlwaysProcess, CurrentOrigin, TreePatternBits}
import org.apache.spark.util.Utils
/**
* [[AnalysisHelper]] defines some infrastructure for the query analyzer. In particular, in query
* analysis we don't want to repeatedly re-analyze sub-plans that have previously been analyzed.
*
* This trait defines a flag `analyzed` that can be set to true once analysis is done on the tree.
* This also provides a set of resolve methods that do not recurse down to sub-plans that have the
* analyzed flag set to true.
*
* The analyzer rules should use the various resolve methods, in lieu of the various transform
* methods defined in [[org.apache.spark.sql.catalyst.trees.TreeNode]] and [[QueryPlan]].
*
* To prevent accidental use of the transform methods, this trait also overrides the transform
* methods to throw exceptions in test mode, if they are used in the analyzer.
*/
trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>

  // Set exactly once (by setAnalyzed) after analysis succeeds; guards the
  // resolve* methods below from re-visiting already-analyzed subtrees.
  private var _analyzed: Boolean = false

  /**
   * Recursively marks all nodes in this plan tree as analyzed.
   * This should only be called by
   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]].
   */
  private[sql] def setAnalyzed(): Unit = {
    if (!_analyzed) {
      _analyzed = true
      children.foreach(_.setAnalyzed())
    }
  }

  /**
   * Returns true if this node and its children have already been gone through analysis and
   * verification. Note that this is only an optimization used to avoid analyzing trees that
   * have already been analyzed, and can be reset by transformations.
   */
  def analyzed: Boolean = _analyzed

  /**
   * Returns a copy of this node where `rule` has been recursively applied to the tree. When
   * `rule` does not apply to a given node, it is left unchanged. This function is similar to
   * `transform`, but skips sub-trees that have already been marked as analyzed.
   * Users should not expect a specific directionality. If a specific directionality is needed,
   * [[resolveOperatorsUp]] or [[resolveOperatorsDown]] should be used.
   *
   * @param rule the function used to transform this node's children.
   */
  def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    resolveOperatorsWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
  }

  /**
   * Returns a copy of this node where `rule` has been recursively applied to the tree. When
   * `rule` does not apply to a given node, it is left unchanged. This function is similar to
   * `transform`, but skips sub-trees that have already been marked as analyzed.
   * Users should not expect a specific directionality. If a specific directionality is needed,
   * [[resolveOperatorsUpWithPruning]] or [[resolveOperatorsDownWithPruning]] should be used.
   *
   * @param rule   the function used to transform this node's children.
   * @param cond   a Lambda expression to prune tree traversals. If `cond.apply` returns false
   *               on an operator T, skips processing T and its subtree; otherwise, processes
   *               T and its subtree recursively.
   * @param ruleId is a unique Id for `rule` to prune unnecessary tree traversals. When it is
   *               UnknownRuleId, no pruning happens. Otherwise, if `rule` (with id `ruleId`)
   *               has been marked as ineffective on an operator T, skips processing T and its
   *               subtree. Do not pass it if the rule is not purely functional and reads a
   *               varying initial state for different invocations.
   */
  def resolveOperatorsWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[LogicalPlan, LogicalPlan])
  : LogicalPlan = {
    resolveOperatorsDownWithPruning(cond, ruleId)(rule)
  }

  /**
   * Returns a copy of this node where `rule` has been recursively applied first to all of its
   * children and then itself (post-order, bottom-up). When `rule` does not apply to a given node,
   * it is left unchanged. This function is similar to `transformUp`, but skips sub-trees that
   * have already been marked as analyzed.
   *
   * @param rule the function used to transform this node's children.
   */
  def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    resolveOperatorsUpWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
  }

  /**
   * Returns a copy of this node where `rule` has been recursively applied first to all of its
   * children and then itself (post-order, bottom-up). When `rule` does not apply to a given node,
   * it is left unchanged. This function is similar to `transformUp`, but skips sub-trees that
   * have already been marked as analyzed.
   *
   * @param rule   the function used to transform this node's children.
   * @param cond   a Lambda expression to prune tree traversals. If `cond.apply` returns false
   *               on an operator T, skips processing T and its subtree; otherwise, processes
   *               T and its subtree recursively.
   * @param ruleId is a unique Id for `rule` to prune unnecessary tree traversals. When it is
   *               UnknownRuleId, no pruning happens. Otherwise, if `rule` (with id `ruleId`)
   *               has been marked as ineffective on an operator T, skips processing T and its
   *               subtree. Do not pass it if the rule is not purely functional and reads a
   *               varying initial state for different invocations.
   */
  def resolveOperatorsUpWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[LogicalPlan, LogicalPlan])
  : LogicalPlan = {
    if (!analyzed && cond.apply(self) && !isRuleIneffective(ruleId)) {
      AnalysisHelper.allowInvokingTransformsInAnalyzer {
        // Bottom-up: rewrite children first, then apply the rule to this node.
        val afterRuleOnChildren = mapChildren(_.resolveOperatorsUpWithPruning(cond, ruleId)(rule))
        val afterRule = if (self fastEquals afterRuleOnChildren) {
          CurrentOrigin.withOrigin(origin) {
            rule.applyOrElse(self, identity[LogicalPlan])
          }
        } else {
          CurrentOrigin.withOrigin(origin) {
            rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
          }
        }
        // Reference equality means the rule changed nothing anywhere in this
        // subtree, so record it as ineffective to prune future traversals.
        if (self eq afterRule) {
          self.markRuleAsIneffective(ruleId)
          self
        } else {
          afterRule
        }
      }
    } else {
      self
    }
  }

  /** Similar to [[resolveOperatorsUp]], but does it top-down. */
  def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    resolveOperatorsDownWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
  }

  /** Similar to [[resolveOperatorsUpWithPruning]], but does it top-down. */
  def resolveOperatorsDownWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[LogicalPlan, LogicalPlan])
  : LogicalPlan = {
    if (!analyzed && cond.apply(self) && !isRuleIneffective(ruleId)) {
      AnalysisHelper.allowInvokingTransformsInAnalyzer {
        // Top-down: apply the rule to this node first, then recurse into children.
        val afterRule = CurrentOrigin.withOrigin(origin) {
          rule.applyOrElse(self, identity[LogicalPlan])
        }
        // Check if unchanged and then possibly return old copy to avoid gc churn.
        if (self fastEquals afterRule) {
          val rewrittenPlan = mapChildren(_.resolveOperatorsDownWithPruning(cond, ruleId)(rule))
          if (self eq rewrittenPlan) {
            // Rule changed nothing in the whole subtree: remember that so future
            // traversals with the same ruleId can skip it.
            self.markRuleAsIneffective(ruleId)
            self
          } else {
            rewrittenPlan
          }
        } else {
          afterRule.mapChildren(_.resolveOperatorsDownWithPruning(cond, ruleId)(rule))
        }
      }
    } else {
      self
    }
  }

  /**
   * A variant of `transformUpWithNewOutput`, which skips touching already analyzed plan.
   */
  def resolveOperatorsUpWithNewOutput(
      rule: PartialFunction[LogicalPlan, (LogicalPlan, Seq[(Attribute, Attribute)])])
  : LogicalPlan = {
    if (!analyzed) {
      transformUpWithNewOutput(rule, skipCond = _.analyzed, canGetOutput = _.resolved)
    } else {
      self
    }
  }

  override def transformUpWithNewOutput(
      rule: PartialFunction[LogicalPlan, (LogicalPlan, Seq[(Attribute, Attribute)])],
      skipCond: LogicalPlan => Boolean,
      canGetOutput: LogicalPlan => Boolean): LogicalPlan = {
    AnalysisHelper.allowInvokingTransformsInAnalyzer {
      super.transformUpWithNewOutput(rule, skipCond, canGetOutput)
    }
  }

  override def updateOuterReferencesInSubquery(plan: LogicalPlan, attrMap: AttributeMap[Attribute])
  : LogicalPlan = {
    AnalysisHelper.allowInvokingTransformsInAnalyzer {
      super.updateOuterReferencesInSubquery(plan, attrMap)
    }
  }

  /**
   * Recursively transforms the expressions of a tree, skipping nodes that have already
   * been analyzed.
   */
  def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan = {
    resolveExpressionsWithPruning(AlwaysProcess.fn, UnknownRuleId)(r)
  }

  /**
   * Recursively transforms the expressions of a tree, skipping nodes that have already
   * been analyzed.
   *
   * @param rule   the function used to transform this node's children.
   * @param cond   a Lambda expression to prune tree traversals. If `cond.apply` returns false
   *               on a TreeNode T, skips processing T and its subtree; otherwise, processes
   *               T and its subtree recursively.
   * @param ruleId is a unique Id for `rule` to prune unnecessary tree traversals. When it is
   *               UnknownRuleId, no pruning happens. Otherwise, if `rule` (with id `ruleId`)
   *               has been marked as ineffective on a TreeNode T, skips processing T and its
   *               subtree. Do not pass it if the rule is not purely functional and reads a
   *               varying initial state for different invocations.
   */
  def resolveExpressionsWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[Expression, Expression]): LogicalPlan = {
    resolveOperatorsWithPruning(cond, ruleId) {
      case p => p.transformExpressionsWithPruning(cond, ruleId)(rule)
    }
  }

  /**
   * Throws (in test mode only) when a raw transform* method is invoked from inside an
   * analyzer rule without going through one of the resolve* wrappers above.
   */
  protected def assertNotAnalysisRule(): Unit = {
    if (Utils.isTesting &&
      AnalysisHelper.inAnalyzer.get > 0 &&
      AnalysisHelper.resolveOperatorDepth.get == 0) {
      throw new RuntimeException("This method should not be called in the analyzer")
    }
  }

  /**
   * In analyzer, use [[resolveOperatorsDown()]] instead. If this is used in the analyzer,
   * an exception will be thrown in test mode. It is however OK to call this function within
   * the scope of a [[resolveOperatorsDown()]] call.
   * @see [[org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning()]].
   */
  override def transformDownWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[LogicalPlan, LogicalPlan])
  : LogicalPlan = {
    assertNotAnalysisRule()
    super.transformDownWithPruning(cond, ruleId)(rule)
  }

  /**
   * Use [[resolveOperators()]] in the analyzer.
   *
   * @see [[org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning()]]
   */
  override def transformUpWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[LogicalPlan, LogicalPlan])
  : LogicalPlan = {
    assertNotAnalysisRule()
    super.transformUpWithPruning(cond, ruleId)(rule)
  }

  /**
   * Use [[resolveExpressions()]] in the analyzer.
   * @see [[QueryPlan.transformAllExpressionsWithPruning()]]
   */
  override def transformAllExpressionsWithPruning(
    cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[Expression, Expression])
  : this.type = {
    assertNotAnalysisRule()
    super.transformAllExpressionsWithPruning(cond, ruleId)(rule)
  }

  override def clone(): LogicalPlan = {
    val cloned = super.clone()
    // Preserve the analyzed flag across clones so a copy of an analyzed plan
    // is not needlessly re-analyzed.
    if (analyzed) cloned.setAnalyzed()
    cloned
  }
}
object AnalysisHelper {

  /**
   * A thread local to track whether we are in a resolveOperator call (for the purpose of
   * analysis). This is an int because resolve* calls might be nested (e.g. a rule might
   * trigger another query compilation within the rule itself), so we are tracking the
   * depth here.
   */
  private val resolveOperatorDepth: ThreadLocal[Int] = new ThreadLocal[Int] {
    override def initialValue(): Int = 0
  }

  /**
   * A thread local to track whether we are in the analysis phase of query compilation. This is
   * an int rather than a boolean in case our analyzer recursively calls itself.
   */
  private val inAnalyzer: ThreadLocal[Int] = new ThreadLocal[Int] {
    override def initialValue(): Int = 0
  }

  /** Evaluates `body` with `depth` bumped by one, restoring it even on exceptions. */
  private def bracketed[T](depth: ThreadLocal[Int])(body: => T): T = {
    depth.set(depth.get + 1)
    try {
      body
    } finally {
      depth.set(depth.get - 1)
    }
  }

  /** Runs `f` with the resolveOperator depth incremented for the current thread. */
  def allowInvokingTransformsInAnalyzer[T](f: => T): T =
    bracketed(resolveOperatorDepth)(f)

  /** Runs `f` with the analyzer depth incremented for the current thread. */
  def markInAnalyzer[T](f: => T): T =
    bracketed(inAnalyzer)(f)
}