[SPARK-32616][SQL] Window operators should be added determinedly
### What changes were proposed in this pull request?

Use a `LinkedHashMap` instead of an `immutable.Map` to hold the `Window` expressions in `ExtractWindowExpressions.addWindow`.

### Why are the changes needed?

This is a bug fix for apache#29270. In that PR, the plans generated on Jenkins (especially for queries q47, q49, and q57) never match the golden plans generated on my laptop.

This happens because `ExtractWindowExpressions.addWindow` currently uses an `immutable.Map` to hold the `Window` expressions, keyed by `(spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))`, and converts the map to a `Seq` at the end. The `Seq` is then used to add Window operators on top of the child plan. However, for the same query, the order of the Window expressions inside the `Seq` is not deterministic: it can change whenever the expression IDs change, because they affect the key. As a result, the same query can produce different plans, depending on the order in which the Window operators happen to be added.

Therefore, we use a `LinkedHashMap`, which preserves the insertion order of its entries, so that the Window operators are always added in a deterministic order.
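
As a rough illustration of the idea (not the Spark code itself), the sketch below groups items by key with a `mutable.LinkedHashMap` so that groups come out in the order their keys were first seen. `WindowExpr`, its string `spec` field, and `groupInInsertionOrder` are hypothetical stand-ins invented for this example; the real key is the `(partitionSpec, orderSpec, functionType)` tuple described above.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object InsertionOrderGrouping {
  // Hypothetical stand-in for a window expression; `spec` plays the role of the grouping key.
  final case class WindowExpr(name: String, spec: String)

  // Groups expressions by spec. Unlike `groupBy`, whose result order depends on the
  // hashing of the keys, the groups here are returned in the order in which each
  // spec was first encountered.
  def groupInInsertionOrder(exprs: Seq[WindowExpr]): Seq[(String, Seq[WindowExpr])] = {
    val grouped = mutable.LinkedHashMap.empty[String, ArrayBuffer[WindowExpr]]
    exprs.foreach { e =>
      grouped.getOrElseUpdate(e.spec, new ArrayBuffer[WindowExpr]) += e
    }
    grouped.iterator.map { case (spec, es) => (spec, es.toSeq) }.toList
  }

  def main(args: Array[String]): Unit = {
    val exprs = Seq(
      WindowExpr("rank", "partition by b"),
      WindowExpr("sum", "partition by a"),
      WindowExpr("avg", "partition by b"))
    // prints List(partition by b, partition by a)
    println(groupInInsertionOrder(exprs).map(_._1))
  }
}
```

Because the iteration order no longer depends on the hash of the key, changing something that feeds into the key (such as an expression ID) cannot reorder the groups, as long as the input order stays the same.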

### Does this PR introduce _any_ user-facing change?

Maybe yes: users now always see the same plan for the same query with multiple Window operators.

### How was this patch tested?

It's really hard to build a minimal reproduction. I only tested manually with apache#29270, and it looks good.

Closes apache#29432 from Ngone51/fix-addWindow.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Ngone51 authored and cloud-fan committed Aug 14, 2020
1 parent 10edeaf commit c6be207
Showing 1 changed file with 11 additions and 3 deletions.
@@ -2552,6 +2552,8 @@ class Analyzer(
    * [[Window]] operator and inserts it into the plan tree.
    */
   object ExtractWindowExpressions extends Rule[LogicalPlan] {
+    type Spec = (Seq[Expression], Seq[SortOrder], WindowFunctionType)
+
     private def hasWindowFunction(exprs: Seq[Expression]): Boolean =
       exprs.exists(hasWindowFunction)
 
@@ -2696,8 +2698,11 @@ class Analyzer(
         }.asInstanceOf[NamedExpression]
       }
 
+      // SPARK-32616: Use a linked hash map to maintains the insertion order of the Window
+      // operators, so the query with multiple Window operators can have the determined plan.
+      val groupedWindowExpressions = mutable.LinkedHashMap.empty[Spec, ArrayBuffer[NamedExpression]]
       // Second, we group extractedWindowExprBuffer based on their Partition and Order Specs.
-      val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr =>
+      extractedWindowExprBuffer.foreach { expr =>
         val distinctWindowSpec = expr.collect {
           case window: WindowExpression => window.windowSpec
         }.distinct
@@ -2713,9 +2718,12 @@ class Analyzer(
             s"Please file a bug report with this error message, stack trace, and the query.")
         } else {
           val spec = distinctWindowSpec.head
-          (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
+          val specKey = (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
+          val windowExprs = groupedWindowExpressions
+            .getOrElseUpdate(specKey, new ArrayBuffer[NamedExpression])
+          windowExprs += expr
         }
-      }.toSeq
+      }
 
       // Third, we aggregate them by adding each Window operator for each Window Spec and then
       // setting this to the child of the next Window operator.