/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.window

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

/**
 * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
 * partition. The aggregates are calculated for each row in the group. Special processing
 * instructions, frames, are used to calculate these aggregates. Frames are processed in the order
 * specified in the window specification (the ORDER BY ... clause). There are five different frame
 * types:
 * - Entire partition: The frame is the entire partition, i.e.
 *   UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. For this case, the window function takes all
 *   rows as inputs and is evaluated once.
 * - Growing frame: We only add new rows into the frame. Examples are:
 *     1. UNBOUNDED PRECEDING AND 1 PRECEDING
 *     2. UNBOUNDED PRECEDING AND CURRENT ROW
 *     3. UNBOUNDED PRECEDING AND 1 FOLLOWING
 *   Every time we move to a new row to process, we add some rows to the frame. We do not remove
 *   rows from this frame.
 * - Shrinking frame: We only remove rows from the frame. Examples are:
 *     1. 1 PRECEDING AND UNBOUNDED FOLLOWING
 *     2. CURRENT ROW AND UNBOUNDED FOLLOWING
 *     3. 1 FOLLOWING AND UNBOUNDED FOLLOWING
 *   Every time we move to a new row to process, we remove some rows from the frame. We do not add
 *   rows to this frame.
 * - Moving frame: Every time we move to a new row to process, we remove some rows from the frame
 *   and we add some rows to the frame. Examples are:
 *     1. 2 PRECEDING AND 1 PRECEDING
 *     2. 1 PRECEDING AND CURRENT ROW
 *     3. CURRENT ROW AND 1 FOLLOWING
 *     4. 1 PRECEDING AND 1 FOLLOWING
 *     5. 1 FOLLOWING AND 2 FOLLOWING
 * - Offset frame: The frame consists of one row, which is an offset number of rows away from the
 *   current row. Only [[OffsetWindowFunction]]s can be processed in an offset frame. There are
 *   three implementations of offset frames: [[FrameLessOffsetWindowFunctionFrame]],
 *   [[UnboundedOffsetWindowFunctionFrame]] and [[UnboundedPrecedingOffsetWindowFunctionFrame]].
 *
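 * As a rough illustration of the frame types above that are defined by their bounds (offset
 * frames are left out), the classification can be sketched as follows. `frameKind` and its
 * `Option[Int]` bound encoding are hypothetical and exist only for this example; they are not
 * part of Spark:
 * {{{
 *   // Assumed encoding: None = UNBOUNDED, Some(n) = offset n relative to the current row
 *   // (negative = PRECEDING, 0 = CURRENT ROW, positive = FOLLOWING).
 *   def frameKind(lower: Option[Int], upper: Option[Int]): String =
 *     (lower, upper) match {
 *       case (None, None) => "entire partition"
 *       case (None, Some(_)) => "growing"
 *       case (Some(_), None) => "shrinking"
 *       case (Some(_), Some(_)) => "moving"
 *     }
 *
 *   frameKind(None, Some(0))     // "growing": UNBOUNDED PRECEDING AND CURRENT ROW
 *   frameKind(Some(1), None)     // "shrinking": 1 FOLLOWING AND UNBOUNDED FOLLOWING
 *   frameKind(Some(-1), Some(1)) // "moving": 1 PRECEDING AND 1 FOLLOWING
 * }}}
 *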
 * Different frame boundaries can be used in Growing, Shrinking and Moving frames. A frame
 * boundary can be either Row or Range based:
 * - Row based: A row based boundary is based on the position of the row within the partition.
 *   An offset indicates the number of rows above or below the current row at which the frame for
 *   the current row starts or ends. For instance, given a row based sliding frame with a lower
 *   bound offset of -1 and an upper bound offset of +2, the frame for the row with index 5 would
 *   range from index 4 to index 7.
 * - Range based: A range based boundary is based on the actual value of the ORDER BY
 *   expression(s). An offset is used to alter the value of the ORDER BY expression. For
 *   instance, if the current ORDER BY expression has a value of 10 and the lower bound offset
 *   is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
 *   number of constraints on the ORDER BY expressions: there can be only one expression and this
 *   expression must have a numerical data type. An exception can be made when the offset is 0,
 *   because no value modification is needed; in this case multiple and non-numeric ORDER BY
 *   expressions are allowed.
 *
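 * The two boundary kinds can be illustrated with a small standalone sketch that reuses the
 * numbers from the descriptions above. The helpers `rowFrame` and `rangeFrame` are hypothetical
 * and exist only for this example; they are not part of Spark and assume integer ORDER BY values:
 * {{{
 *   // Row based: offsets are added to the current row index and the result is
 *   // clamped to the valid positions 0 until size of the partition.
 *   def rowFrame(index: Int, lower: Int, upper: Int, size: Int): Range =
 *     math.max(0, index + lower) to math.min(size - 1, index + upper)
 *
 *   // Range based: offsets are added to the current ORDER BY value; the frame holds
 *   // every row whose value lies between value + lower and value + upper (inclusive).
 *   def rangeFrame(sorted: Seq[Int], value: Int, lower: Int, upper: Int): Seq[Int] =
 *     sorted.filter(v => v >= value + lower && v <= value + upper)
 *
 *   rowFrame(index = 5, lower = -1, upper = 2, size = 10)           // indices 4 to 7
 *   rangeFrame(Seq(1, 7, 8, 10, 12, 14), 10, lower = -3, upper = 0) // values 7, 8, 10
 * }}}
 *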
 * This is quite an expensive operator because every row for a single group must be in the same
 * partition and partitions must be sorted according to the grouping and sort order. The operator
 * requires the planner to take care of the partitioning and sorting.
 *
 * The operator is semi-blocking. The window functions and aggregates are calculated one group at
 * a time; the result only becomes available after the processing for the entire group has
 * finished. The operator is able to process different frame configurations at the same time. This
 * is done by delegating the actual frame processing (i.e. calculation of the window functions) to
 * specialized classes, see [[WindowFunctionFrame]], which take care of their own frame type:
 * Entire Partition, Sliding, Growing & Shrinking. Boundary evaluation is also delegated to a pair
 * of specialized classes: [[RowBoundOrdering]] & [[RangeBoundOrdering]].
 */
case class WindowExec(
    windowExpression: Seq[NamedExpression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: SparkPlan)
  extends WindowExecBase {

  override lazy val metrics: Map[String, SQLMetric] = Map(
    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")
  )

  protected override def doExecute(): RDD[InternalRow] = {
    val evaluatorFactory =
      new WindowEvaluatorFactory(
        windowExpression,
        partitionSpec,
        orderSpec,
        child.output,
        longMetric("spillSize"))

    // Start processing.
    if (conf.usePartitionEvaluator) {
      child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
    } else {
      child.execute().mapPartitionsWithIndex { (index, rowIterator) =>
        val evaluator = evaluatorFactory.createEvaluator()
        evaluator.eval(index, rowIterator)
      }
    }
  }

  override protected def withNewChildInternal(newChild: SparkPlan): WindowExec =
    copy(child = newChild)
}