diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fdadcccea29ee..0e81f48fc7ebb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2552,6 +2552,8 @@ class Analyzer(
    * [[Window]] operator and inserts it into the plan tree.
    */
   object ExtractWindowExpressions extends Rule[LogicalPlan] {
+    type Spec = (Seq[Expression], Seq[SortOrder], WindowFunctionType)
+
     private def hasWindowFunction(exprs: Seq[Expression]): Boolean =
       exprs.exists(hasWindowFunction)
 
@@ -2696,8 +2698,11 @@ class Analyzer(
         }.asInstanceOf[NamedExpression]
       }
 
+      // SPARK-32616: Use a linked hash map to maintain the insertion order of the Window
+      // operators, so that a query with multiple Window operators has a deterministic plan.
+      val groupedWindowExpressions = mutable.LinkedHashMap.empty[Spec, ArrayBuffer[NamedExpression]]
       // Second, we group extractedWindowExprBuffer based on their Partition and Order Specs.
-      val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr =>
+      extractedWindowExprBuffer.foreach { expr =>
         val distinctWindowSpec = expr.collect {
           case window: WindowExpression => window.windowSpec
         }.distinct
@@ -2713,9 +2718,12 @@ class Analyzer(
             s"Please file a bug report with this error message, stack trace, and the query.")
         } else {
           val spec = distinctWindowSpec.head
-          (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
+          val specKey = (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
+          val windowExprs = groupedWindowExpressions
+            .getOrElseUpdate(specKey, new ArrayBuffer[NamedExpression])
+          windowExprs += expr
         }
-      }.toSeq
+      }
 
       // Third, we aggregate them by adding each Window operator for each Window Spec and then
      // setting this to the child of the next Window operator.
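
The core of the change is replacing `groupBy`, which builds a hash-based `Map` whose key iteration order is unspecified, with a `mutable.LinkedHashMap` populated via `getOrElseUpdate`, which iterates keys in first-insertion order. Below is a minimal, self-contained sketch of that difference; the keys and values are illustrative stand-ins, not the analyzer's actual `Spec` tuples or `NamedExpression`s.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object GroupOrderDemo {
  def main(args: Array[String]): Unit = {
    // Stand-ins for (window spec key, window expression) pairs, in extraction order.
    val exprs = Seq("w1" -> "sum(a)", "w2" -> "rank()", "w1" -> "avg(b)", "w3" -> "min(c)")

    // groupBy produces a hash-based Map: key order depends on hashing,
    // not on the order in which the keys were first encountered.
    val hashed = exprs.groupBy(_._1)
    println(hashed.keys.mkString(", "))   // order is unspecified

    // A LinkedHashMap filled with getOrElseUpdate preserves first-insertion
    // order, so the resulting sequence of groups is deterministic.
    val ordered = mutable.LinkedHashMap.empty[String, ArrayBuffer[String]]
    exprs.foreach { case (key, e) =>
      ordered.getOrElseUpdate(key, new ArrayBuffer[String]) += e
    }
    println(ordered.keys.mkString(", "))  // always: w1, w2, w3
  }
}

Because each group later becomes one `Window` operator stacked on the previous one, a stable group order is what makes the final plan deterministic across runs.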