From c6be2074cc704ec1578eb05d1d8a696227cd2bbf Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 14 Aug 2020 13:29:48 +0000
Subject: [PATCH] [SPARK-32616][SQL] Window operators should be added determinedly

### What changes were proposed in this pull request?

Use a `LinkedHashMap` instead of an `immutable.Map` to hold the `Window` expressions in `ExtractWindowExpressions.addWindow`.

### Why are the changes needed?

This is a bug fix for https://github.com/apache/spark/pull/29270. In that PR, the generated plans (especially for the queries q47, q49, and q57) on Jenkins never matched the golden plans generated on my laptop.

This happens because `ExtractWindowExpressions.addWindow` now uses an `immutable.Map` to hold the `Window` expressions, keyed by `(spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))`, and converts the map to a `Seq` at the end. The `Seq` is then used to add Window operators on top of the child plan. However, for the same query, the order of the Window expressions inside the `Seq` can change when the expression ids change (which affects the keys). As a result, the same query can get different plans because of the nondeterministic order of the Window operators.

Therefore, we use a `LinkedHashMap`, which records the insertion order of its entries, to make the order in which the Window operators are added deterministic.
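To make the ordering issue concrete, here is a small, self-contained Scala sketch (the names `specOf`, `exprs`, and the String-based spec keys are made up for illustration; this is not the Analyzer code). It shows that grouping through `groupBy` yields an `immutable.Map` with no guaranteed iteration order, while a `mutable.LinkedHashMap` filled with `getOrElseUpdate` preserves the order in which each spec is first seen:

```scala
import scala.collection.mutable

object WindowGroupingOrderDemo {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the extracted named window expressions, in extraction order.
    val exprs = Seq("w1_over_specA", "w2_over_specB", "w3_over_specA", "w4_over_specC")

    // Hypothetical spec key; in the real rule the key is
    // (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr)).
    def specOf(expr: String): String = expr.split("_over_").last

    // Before the fix: groupBy returns an immutable.Map whose iteration order is
    // not guaranteed, so the Seq of specs (and thus the stacking order of the
    // Window operators built from it) can differ between runs.
    val byGroupBy: Seq[(String, Seq[String])] = exprs.groupBy(specOf).toSeq

    // After the fix: LinkedHashMap iterates in the order keys were first inserted,
    // so the specs always come out as specA, specB, specC.
    val grouped = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[String]]
    exprs.foreach { e =>
      grouped.getOrElseUpdate(specOf(e), mutable.ArrayBuffer.empty[String]) += e
    }
    val ordered: Seq[(String, Seq[String])] = grouped.toSeq.map { case (k, v) => (k, v.toSeq) }

    println(s"groupBy (no order guarantee): ${byGroupBy.map(_._1)}")
    println(s"LinkedHashMap (insertion):    ${ordered.map(_._1)}")
  }
}
```

Because `addWindow` stacks one `Window` operator per entry of this sequence, a stable iteration order translates directly into a stable operator order in the plan.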
### Does this PR introduce _any_ user-facing change?

Maybe yes: users now always see the same plan for the same query with multiple Window operators.

### How was this patch tested?

It is really hard to build a small reproduction, so I just tested manually with https://github.com/apache/spark/pull/29270 and it looks good.

Closes #29432 from Ngone51/fix-addWindow.

Authored-by: yi.wu
Signed-off-by: Wenchen Fan
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fdadcccea29ee..0e81f48fc7ebb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2552,6 +2552,8 @@ class Analyzer(
    * [[Window]] operator and inserts it into the plan tree.
    */
   object ExtractWindowExpressions extends Rule[LogicalPlan] {
+    type Spec = (Seq[Expression], Seq[SortOrder], WindowFunctionType)
+
     private def hasWindowFunction(exprs: Seq[Expression]): Boolean =
       exprs.exists(hasWindowFunction)
 
@@ -2696,8 +2698,11 @@ class Analyzer(
         }.asInstanceOf[NamedExpression]
       }
 
+      // SPARK-32616: Use a linked hash map to maintain the insertion order of the Window
+      // operators, so that a query with multiple Window operators has a deterministic plan.
+      val groupedWindowExpressions = mutable.LinkedHashMap.empty[Spec, ArrayBuffer[NamedExpression]]
       // Second, we group extractedWindowExprBuffer based on their Partition and Order Specs.
-      val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr =>
+      extractedWindowExprBuffer.foreach { expr =>
         val distinctWindowSpec = expr.collect {
           case window: WindowExpression => window.windowSpec
         }.distinct
@@ -2713,9 +2718,12 @@ class Analyzer(
             s"Please file a bug report with this error message, stack trace, and the query.")
         } else {
           val spec = distinctWindowSpec.head
-          (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
+          val specKey = (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
+          val windowExprs = groupedWindowExpressions
+            .getOrElseUpdate(specKey, new ArrayBuffer[NamedExpression])
+          windowExprs += expr
         }
-      }.toSeq
+      }
 
       // Third, we aggregate them by adding each Window operator for each Window Spec and then
       // setting this to the child of the next Window operator.