[SPARK-29343][SQL] Eliminate sorts without limit in the subquery of Join/Aggregation #26011

Closed
wants to merge 11 commits into from

@WangGuangxin WangGuangxin commented Oct 3, 2019

What changes were proposed in this pull request?

This PR complements #21853.
A Sort without a Limit in a Join subquery has no effect on the result. The same holds under a GroupBy when the aggregate function is order-irrelevant, such as count or sum.
This PR removes this kind of Sort operator in the SQL optimizer.

Why are the changes needed?

For example, `select count(1) from (select a from test1 order by a)` is equivalent to `select count(1) from (select a from test1)`.
Likewise, `select * from (select a from test1 order by a) t1 join (select b from test2) t2 on t1.a = t2.b` is equivalent to `select * from (select a from test1) t1 join (select b from test2) t2 on t1.a = t2.b`.

Removing these unnecessary Sort operators can improve performance.
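
As a rough illustration of the equivalence above, here is a minimal Scala sketch on a toy plan model. The `Scan`, `Sort`, `Limit`, and `CountAll` classes and the `ToySortElimination` object are hypothetical stand-ins, not Spark's Catalyst operators, and the rewrite is much simpler than the rule in this PR.

```scala
// Toy logical plan nodes (hypothetical, NOT Spark's Catalyst classes).
sealed trait Plan
case class Scan(rows: Seq[Int]) extends Plan
case class Sort(child: Plan) extends Plan
case class Limit(n: Int, child: Plan) extends Plan
case class CountAll(child: Plan) extends Plan

object ToySortElimination {
  // Evaluate a plan to the rows it produces.
  def rows(plan: Plan): Seq[Int] = plan match {
    case Scan(rs)    => rs
    case Sort(c)     => rows(c).sorted
    case Limit(n, c) => rows(c).take(n)
    case CountAll(c) => Seq(rows(c).size)
  }

  // Drop Sort nodes at the top of a subplan; stop at anything else.
  // In particular a Limit is returned untouched, so a Sort below it survives.
  def stripSorts(plan: Plan): Plan = plan match {
    case Sort(c) => stripSorts(c)
    case other   => other
  }

  // count(*) does not depend on input order, so sorts feeding it can go.
  def optimize(plan: Plan): Plan = plan match {
    case CountAll(c) => CountAll(stripSorts(c))
    case other       => other
  }
}
```

In this sketch a Sort below a Limit is left alone, because the limit makes the row order observable in the result.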

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a new unit test suite, RemoveSortInSubquerySuite.scala.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-29343]Eliminate sorts without limit in the subquery of Join/Aggregation [SPARK-29343][SQL] Eliminate sorts without limit in the subquery of Join/Aggregation Oct 3, 2019
@dongjoon-hyun
Member

ok to test

SparkQA commented Oct 3, 2019

Test build #111734 has finished for PR 26011 at commit 75b43f5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 3, 2019

Test build #111741 has finished for PR 26011 at commit e29b323.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Average(child: Expression) extends DeclarativeAggregate with ImplicitCastInputTypes
  • case class Count(children: Seq[Expression]) extends DeclarativeAggregate with OrderIrrelevantAggs
  • case class Max(child: Expression) extends DeclarativeAggregate with OrderIrrelevantAggs
  • case class Min(child: Expression) extends DeclarativeAggregate with OrderIrrelevantAggs
  • trait OrderIrrelevantAggs extends AggregateFunction
  • case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCastInputTypes
  • case class AnyAgg(arg: Expression) extends UnevaluableBooleanAggBase(arg)

SparkQA commented Oct 4, 2019

Test build #111766 has finished for PR 26011 at commit 25a3bb8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WangGuangxin
Contributor Author

retest this please

@HyukjinKwon
Member

retest this please

SparkQA commented Oct 4, 2019

Test build #111770 has finished for PR 26011 at commit 25a3bb8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 7, 2019

Test build #111828 has finished for PR 26011 at commit d21c683.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AnyAgg(arg: Expression) extends UnevaluableBooleanAggBase(arg)

SparkQA commented Oct 13, 2019

Test build #111994 has finished for PR 26011 at commit d2328a0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 13, 2019

Test build #111997 has finished for PR 26011 at commit a9e9be9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WangGuangxin
Contributor Author

@dongjoon-hyun @dilipbiswal @maropu @gatorsmile Could you please take a look at this PR when you have time?

*
* This rule try to remove this kind of [[Sort]] operator.
*/
object RemoveSortInSubquery extends Rule[LogicalPlan] with PredicateHelper {
Contributor

Should the existing RemoveRedundantSorts rule handle this as well? I ask because I don't see anything subquery-specific in the new rule.

Contributor

@cloud-fan cloud-fan Oct 14, 2019

+1 for merging the 2 rules. We can call the merged rule EliminateSorts, as not only redundant sorts are removed.

@dilipbiswal
Contributor

dilipbiswal commented Oct 14, 2019

The idea looks reasonable to me. cc @cloud-fan

@maropu
Member

maropu commented Oct 23, 2019

@WangGuangxin Are you there? Can you update the pr based on the reviews above?

* has nothing to do with the order of input data.
* For example, [[Sum]] is [[OrderIrrelevantAggs]] while [[First]] is not.
*/
trait OrderIrrelevantAggs extends AggregateFunction {
Member

I think this trait approach doesn't work for ScalaUDAF. Is this expected?

Contributor Author

Yes, it's not suitable for ScalaUDAF. I use OrderIrrelevantAggs as a mixin trait and apply it only to min, max, and several other aggregate functions, not to ScalaUDAF. I hope I understood you correctly.

Contributor Author

Maybe OrderIrrelevantAggs doesn't need to extend AggregateFunction.

Member

If there are not too many aggregate functions for this optimization, can we list them inside the rule like the others? E.g., https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala#L162

Contributor Author

OK, I think your suggestion is better.

SparkQA commented Oct 24, 2019

Test build #112579 has finished for PR 26011 at commit 425f76d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -967,12 +967,18 @@ object EliminateSorts extends Rule[LogicalPlan] {
* Removes redundant Sort operation. This can happen:
* 1) if the child is already sorted
* 2) if there is another Sort operator separated by 0...n Project/Filter operators
* 3) if the Sort operator is within Join and without Limit
* 4) if the Sort operator is within GroupBy and the aggregate function is order irrelevant
*/
object RemoveRedundantSorts extends Rule[LogicalPlan] {
Contributor

Can we rename it to EliminateSorts? Some sorts are not redundant, but we remove them as well according to SQL semantics.

Contributor Author

There is another rule named EliminateSorts. Can we merge the two?

Contributor

yea let's merge them.

*/
object RemoveRedundantSorts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
child
case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
case j @ Join(originLeft, originRight, _, _, _) =>
Contributor

shall we make sure the join condition is deterministic?

Contributor Author

ok

SparkQA commented Oct 26, 2019

Test build #112712 has finished for PR 26011 at commit 8082324.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -953,26 +952,25 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
}

/**
* Removes no-op SortOrder from Sort
* Removes Sort operation. This can happen:
* 1) if the sort is noop
Member

I think this statement is a little ambiguous, so could you make it more precise?

Contributor Author

updated

* 1) if the sort is noop
* 2) if the child is already sorted
* 3) if there is another Sort operator separated by 0...n Project/Filter operators
* 4) if the Sort operator is within Join and without Limit
Member

without limit? It seems the rule below does not check that condition?

Member

Also, within Join -> within Join having deterministic conditions only?

Contributor Author

The limit case is restricted by canEliminateSort.

Member

That statement is still ambiguous. I think condition 4) is similar to 3)? I mean it might be: a Join separated by 0...n Project/Filter operators only.

@@ -987,6 +985,24 @@ object RemoveRedundantSorts extends Rule[LogicalPlan] {
case f: Filter => f.condition.deterministic
case _ => false
}

def isOrderIrrelevantAggs(aggs: Seq[NamedExpression]): Boolean = {
Member

nit: private?

case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) =>
j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
g.copy(child = recursiveRemoveSort(originChild))
}

def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
Member

(This is not related to this pr though...) nit: private?

case _ => false
}

aggs.flatMap { e =>
Member

@maropu maropu Oct 26, 2019

If aggs only has a single PythonUDF, it seems this method returns true.

Contributor Author

yes, I'll try to fix it

Contributor Author

Thanks for pointing this out. I've updated the PR and added a unit test for UDFs.
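
The pitfall raised in this thread can be reproduced outside Spark. The following is a minimal sketch, assuming a toy expression model: `Expr`, `Col`, `Agg`, `Udf`, and `VacuousForall` are hypothetical names, not Catalyst classes, and `strict` is only one possible fix, not necessarily the one applied in this PR. When the list contains no aggregate calls at all, collecting them and calling `forall` yields `true` vacuously.

```scala
// Toy expression model (hypothetical, NOT Spark's Catalyst expressions).
sealed trait Expr
case class Col(name: String) extends Expr
case class Agg(fn: String, arg: Expr) extends Expr
case class Udf(name: String, args: Seq[Expr]) extends Expr

object VacuousForall {
  // Assumed whitelist of order-irrelevant aggregate names, for illustration only.
  val orderIrrelevant: Set[String] = Set("min", "max", "count")

  // Collects only the Agg nodes, then checks them. A list holding nothing
  // but a UDF collects to an empty Seq, and forall on an empty Seq is true.
  def naive(exprs: Seq[Expr]): Boolean =
    exprs.collect { case a: Agg => a }.forall(a => orderIrrelevant(a.fn))

  // Checks every expression and rejects anything it cannot vouch for.
  def strict(exprs: Seq[Expr]): Boolean = exprs.forall {
    case Agg(fn, _) => orderIrrelevant(fn)
    case _: Col     => true  // plain column reference, e.g. a grouping column
    case _: Udf     => false // unknown semantics: assume order matters
  }
}
```

With `Seq(Udf("pyAgg", Seq(Col("a"))))` as input, `naive` returns `true` while `strict` returns `false`.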

SparkQA commented Nov 3, 2019

Test build #113160 has finished for PR 26011 at commit f2d9ec1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case ae: AggregateExpression => ae.aggregateFunction
}
}.forall(isOrderIrrelevantAggFunction)
def checkValidAggregateExpression(expr: Expression): Boolean = expr match {
Member

nit: private?

Contributor Author

We cannot make it private because this is a nested function within private isOrderIrrelevantAggs

Member

Ur, this is an inner func. I see.

@maropu
Member

maropu commented Nov 4, 2019

@dilipbiswal @cloud-fan Looks good overall. Could you check this?

SparkQA commented Nov 4, 2019

Test build #113186 has finished for PR 26011 at commit 4eccd2a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 83c39d1 Nov 4, 2019
@maropu
Member

maropu commented Nov 4, 2019

Thanks, @WangGuangxin and @cloud-fan !

case _: Min => true
case _: Max => true
case _: Count => true
case _: Average => true
Member

We could still see a precision difference after eliminating the sort for floating-point data types. I am afraid some end users might prefer to add a sort in these cases to ensure the results are consistent.

Contributor

Ah, that's a good point. AVG over floating-point values is order sensitive. Not sure if this can really affect queries in practice, but it's better to be conservative here. @WangGuangxin can you fix it in a followup?

Contributor Author

Sure, I'll fix it in a followup

Member

@WangGuangxin do you have time for the follow-up now? If not, could I take it over?

Member

Same will be true of all central moments, BTW

Contributor Author

I revisited this and have a small question. Avg is in fact transformed into Sum and Count, so I think there should be no precision problem?

Member

Sum is the issue itself, actually; see the followup PR
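
A small, Spark-independent Scala sketch of why a floating-point sum depends on input order. `FloatSumOrder` and `sumInOrder` are hypothetical names used only for illustration; the fold simply adds values left to right and does not model how Spark accumulates partial sums across partitions.

```scala
object FloatSumOrder {
  // Left-to-right summation of doubles, starting from 0.0.
  def sumInOrder(xs: Seq[Double]): Double = xs.foldLeft(0.0)(_ + _)
}
```

`FloatSumOrder.sumInOrder(Seq(1e20, 1.0, -1e20))` evaluates to `0.0`, while `FloatSumOrder.sumInOrder(Seq(1e20, -1e20, 1.0))` evaluates to `1.0`. Adding `1.0` to a double near `1e20` is lost to rounding, so the same three values sum differently depending on the order in which they arrive.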

dongjoon-hyun pushed a commit that referenced this pull request Nov 16, 2019
…alMomentAgg from order-insensitive aggregates

### What changes were proposed in this pull request?

This PR removes floating-point `Sum/Average/CentralMomentAgg` from the order-insensitive aggregates in `EliminateSorts`.

This PR comes from gatorsmile's suggestion: #26011 (comment)

### Why are the changes needed?

Bug fix.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests in `SubquerySuite`.

Closes #26534 from maropu/SPARK-29343-FOLLOWUP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
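
To make the idea in the follow-up concrete, here is a minimal sketch of a type-aware check, assuming a toy model. `ToyType`, `ToyInt`, `ToyDouble`, `AggCall`, and `OrderSensitivity` are hypothetical names, the function names are plain strings rather than Catalyst classes, and the exact set of functions and types handled by the real rule may differ.

```scala
// Toy input types and aggregate calls (hypothetical, NOT Spark's classes).
sealed trait ToyType
case object ToyInt extends ToyType
case object ToyDouble extends ToyType
case class AggCall(fn: String, inputType: ToyType)

object OrderSensitivity {
  // An aggregate counts as order-irrelevant only if reordering its input
  // cannot change the result. Floating-point sum and avg are excluded
  // because floating-point addition is not associative.
  def isOrderIrrelevant(agg: AggCall): Boolean = agg match {
    case AggCall("min" | "max" | "count", _) => true
    case AggCall("sum" | "avg", ToyDouble)   => false
    case AggCall("sum" | "avg", _)           => true
    case _                                   => false // unknown: be conservative
  }
}
```

Unknown functions fall through to `false`, which matches the conservative stance taken in the review discussion.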