[SPARK-39867][SQL][3.1] Global limit should not inherit OrderPreservingUnaryNode

backport branch-3.1 for #37284

### What changes were proposed in this pull request?

Make GlobalLimit inherit UnaryNode rather than OrderPreservingUnaryNode

### Why are the changes needed?

Global limit cannot promise that its output ordering is the same as its child's; that actually depends on the specific physical plan.

For all physical plans with global limits:
- CollectLimitExec: it does not promise any output ordering
- GlobalLimitExec: it requires all tuples, so it can assume that the child is a shuffle or a single partition. It can then use the output ordering of the child
- TakeOrderedAndProjectExec: it does the sort inside its implementation

This bug has gotten worse since we pulled out the v1 write required ordering.
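
The ordering problem described above can be illustrated with a small, Spark-free sketch. It is only a toy model under stated assumptions: `ShuffleOrderSketch`, `isSorted`, `fetchToSinglePartition`, and the fixed fetch order are hypothetical names and values made up for illustration, not Spark APIs. It shows that concatenating locally sorted blocks in an arbitrary fetch order and then taking the first rows need not yield sorted output.

```scala
object ShuffleOrderSketch {
  // True when every element is <= its successor.
  def isSorted(xs: Seq[Int]): Boolean =
    xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  // Concatenate map-side blocks in the order a (toy) reader happens to fetch them.
  def fetchToSinglePartition(blocks: Seq[Seq[Int]], fetchOrder: Seq[Int]): Seq[Int] =
    fetchOrder.flatMap(i => blocks(i))

  // Three map-side partitions, each already sorted locally (assumed data).
  val blocks: Seq[Seq[Int]] = Seq(Seq(1, 4, 7), Seq(2, 5, 8), Seq(3, 6, 9))

  // One arbitrary fetch order, fixed here so the example is deterministic.
  val fetched: Seq[Int] = fetchToSinglePartition(blocks, Seq(2, 0, 1))

  // A global limit keeps the first rows it sees: 3, 6, 9, 1.
  val limited: Seq[Int] = fetched.take(4)

  def main(args: Array[String]): Unit = {
    println(s"every block sorted: ${blocks.forall(isSorted)}")
    println(s"rows after limit:   $limited")
    println(s"limit output sorted: ${isSorted(limited)}")
  }
}
```

In this model every input block is sorted, yet the limited output is not, which is why the logical node cannot claim to preserve the ordering of its child.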

### Does this PR introduce _any_ user-facing change?

yes, bug fix

### How was this patch tested?

fix test and add test

Closes #37398 from ulysses-you/SPARK-39867-3.1.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses-you authored and cloud-fan committed Aug 5, 2022
1 parent 08c650d commit a25394a
Showing 2 changed files with 17 additions and 6 deletions.
@@ -903,8 +903,13 @@ object Limit {
  * A global (coordinated) limit. This operator can emit at most `limitExpr` number in total.
  *
  * See [[Limit]] for more information.
+ *
+ * Note that, we can not make it inherit [[OrderPreservingUnaryNode]] due to the different strategy
+ * of physical plan. The output ordering of child will be broken if a shuffle exchange comes in
+ * between the child and global limit, due to the fact that shuffle reader fetches blocks in random
+ * order.
  */
-case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
+case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = {
     limitExpr match {
@@ -108,9 +108,10 @@ class EliminateSortsSuite extends AnalysisTest {

   test("SPARK-33183: remove redundant sort by") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
-    val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst)
+    val unnecessaryReordered = LocalLimit(2, orderedPlan).select('a)
+      .sortBy('a.asc, 'b.desc_nullsFirst)
     val optimized = Optimize.execute(unnecessaryReordered.analyze)
-    val correctAnswer = orderedPlan.limit(2).select('a).analyze
+    val correctAnswer = LocalLimit(2, orderedPlan).select('a).analyze
     comparePlans(optimized, correctAnswer)
   }

@@ -154,11 +155,11 @@ class EliminateSortsSuite extends AnalysisTest {
     comparePlans(optimized, correctAnswer)
   }

-  test("SPARK-33183: limits should not affect order for local sort") {
+  test("SPARK-33183: local limits should not affect order for local sort") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
-    val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy('a.asc, 'b.desc)
+    val filteredAndReordered = LocalLimit(10, orderedPlan).sortBy('a.asc, 'b.desc)
     val optimized = Optimize.execute(filteredAndReordered.analyze)
-    val correctAnswer = orderedPlan.limit(Literal(10)).analyze
+    val correctAnswer = LocalLimit(10, orderedPlan).analyze
     comparePlans(optimized, correctAnswer)
   }

@@ -435,4 +436,9 @@ class EliminateSortsSuite extends AnalysisTest {
       .sortBy($"c".asc).analyze
     comparePlans(Optimize.execute(plan3), expected3)
   }
+
+  test("SPARK-39867: Global limit should not inherit OrderPreservingUnaryNode") {
+    val plan = testRelation.sortBy($"a".asc).limit(2).sortBy($"a".asc).analyze
+    comparePlans(Optimize.execute(plan), plan)
+  }
 }
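
The effect that the new SPARK-39867 test checks can be sketched with a toy plan model. This is a simplified illustration and not Catalyst code: `EliminateSortsSketch`, its `Plan` case classes, `outputOrdering`, and `eliminateSorts` are hypothetical stand-ins, and the assumption that a sort is dropped when the child already reports a satisfying ordering is a simplification of the real rule. In the model, a sort above a local limit is removed because the local limit forwards the ordering of its child, while a sort above a global limit is kept because the global limit reports no ordering.

```scala
object EliminateSortsSketch {
  // Toy logical plan nodes (hypothetical, not the Catalyst classes).
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Sort(order: Seq[String], child: Plan) extends Plan
  final case class LocalLimit(n: Int, child: Plan) extends Plan
  final case class GlobalLimit(n: Int, child: Plan) extends Plan

  // Ordering each toy node reports to the optimizer; Nil means "no promise".
  def outputOrdering(plan: Plan): Seq[String] = plan match {
    case Sort(order, _)    => order
    case LocalLimit(_, c)  => outputOrdering(c) // order-preserving node
    case GlobalLimit(_, _) => Nil               // plain unary node after this change
    case Relation(_)       => Nil
  }

  // Drop a Sort whose (already optimized) child reports an ordering that starts with it.
  def eliminateSorts(plan: Plan): Plan = plan match {
    case Sort(order, child) =>
      val newChild = eliminateSorts(child)
      if (outputOrdering(newChild).startsWith(order)) newChild else Sort(order, newChild)
    case LocalLimit(n, c)  => LocalLimit(n, eliminateSorts(c))
    case GlobalLimit(n, c) => GlobalLimit(n, eliminateSorts(c))
    case r: Relation       => r
  }

  def main(args: Array[String]): Unit = {
    val t = Relation("t")
    // The outer sort is kept above a global limit.
    println(eliminateSorts(Sort(Seq("a"), GlobalLimit(2, Sort(Seq("a"), t)))))
    // The outer sort is removed above a local limit.
    println(eliminateSorts(Sort(Seq("a"), LocalLimit(2, Sort(Seq("a"), t)))))
  }
}
```

The two cases correspond to the updated tests: the SPARK-33183 tests now build `LocalLimit` directly and still expect the redundant sort to disappear, while the SPARK-39867 test expects the plan with `limit(2)` to be left unchanged.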