From a25394ad775e132f96754f2cb84be08c8b7014a6 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 5 Aug 2022 18:26:57 +0800 Subject: [PATCH] [SPARK-39867][SQL][3.1] Global limit should not inherit OrderPreservingUnaryNode backport branch-3.1 for https://github.com/apache/spark/pull/37284 ### What changes were proposed in this pull request? Make GlobalLimit inherit UnaryNode rather than OrderPreservingUnaryNode ### Why are the changes needed? Global limit can not promise the output ordering is same with child, it actually depend on the certain physical plan. For all physical plan with gobal limits: - CollectLimitExec: it does not promise output ordering - GlobalLimitExec: it required all tuples so it can assume the child is shuffle or child is single partition. Then it can use output ordering of child - TakeOrderedAndProjectExec: it do sort inside it's implementation This bug get worse since we pull out v1 write require ordering. ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? fix test and add test Closes #37398 from ulysses-you/SPARK-39867-3.1. Authored-by: ulysses-you Signed-off-by: Wenchen Fan --- .../plans/logical/basicLogicalOperators.scala | 7 ++++++- .../catalyst/optimizer/EliminateSortsSuite.scala | 16 +++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 1d8d4e1952b73..58dcbfdad5d03 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -903,8 +903,13 @@ object Limit { * A global (coordinated) limit. This operator can emit at most `limitExpr` number in total. * * See [[Limit]] for more information. + * + * Note that, we can not make it inherit [[OrderPreservingUnaryNode]] due to the different strategy + * of physical plan. The output ordering of child will be broken if a shuffle exchange comes in + * between the child and global limit, due to the fact that shuffle reader fetches blocks in random + * order. */ -case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { +case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output override def maxRows: Option[Long] = { limitExpr match { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index ca7d386480977..d8a258e42088d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -108,9 +108,10 @@ class EliminateSortsSuite extends AnalysisTest { test("SPARK-33183: remove redundant sort by") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) - val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst) + val unnecessaryReordered = LocalLimit(2, orderedPlan).select('a) + .sortBy('a.asc, 'b.desc_nullsFirst) val optimized = Optimize.execute(unnecessaryReordered.analyze) - val correctAnswer = orderedPlan.limit(2).select('a).analyze + val correctAnswer = LocalLimit(2, orderedPlan).select('a).analyze comparePlans(optimized, correctAnswer) } @@ -154,11 +155,11 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("SPARK-33183: limits should not affect order for local sort") { + test("SPARK-33183: local limits should not affect order for local sort") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) - val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy('a.asc, 'b.desc) + val filteredAndReordered = LocalLimit(10, orderedPlan).sortBy('a.asc, 'b.desc) val optimized = Optimize.execute(filteredAndReordered.analyze) - val correctAnswer = orderedPlan.limit(Literal(10)).analyze + val correctAnswer = LocalLimit(10, orderedPlan).analyze comparePlans(optimized, correctAnswer) } @@ -435,4 +436,9 @@ class EliminateSortsSuite extends AnalysisTest { .sortBy($"c".asc).analyze comparePlans(Optimize.execute(plan3), expected3) } + + test("SPARK-39867: Global limit should not inherit OrderPreservingUnaryNode") { + val plan = testRelation.sortBy($"a".asc).limit(2).sortBy($"a".asc).analyze + comparePlans(Optimize.execute(plan), plan) + } }