[SPARK-32360][SQL] Add MaxMinBy to support eliminate sorts #29142

Closed
ulysses-you wants to merge 4 commits

Conversation

ulysses-you
Contributor

@ulysses-you ulysses-you commented Jul 17, 2020

What changes were proposed in this pull request?

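Add the MaxMinBy aggregate functions (max_by and min_by) to the set of aggregates that EliminateSorts treats as order-irrelevant, so that a sort beneath such an aggregate can be eliminated.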

Why are the changes needed?

Make EliminateSorts match more cases.

Does this PR introduce any user-facing change?

Yes. For queries that match the new case, users will see a different execution plan.

create table t1(c1 int, c2 int) using parquet;
select max_by(c1, c2) from (
select * from t1 order by c1
);

before

Aggregate [max_by(c1#29, c2#30) AS max_by(c1, c2)#67]
+- Sort [c1#29 ASC NULLS FIRST], true
   +- Relation[c1#29,c2#30] parquet

after

Aggregate [max_by(c1#24, c2#25) AS max_by(c1, c2)#56]
+- Relation[c1#24,c2#25] parquet

How was this patch tested?

Manual test.

@SparkQA

SparkQA commented Jul 17, 2020

Test build #126034 has finished for PR 29142 at commit e544ca3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ulysses-you
Contributor Author

cc @maropu

@@ -1004,7 +1004,7 @@ object EliminateSorts extends Rule[LogicalPlan] {

   private def isOrderIrrelevantAggs(aggs: Seq[NamedExpression]): Boolean = {
     def isOrderIrrelevantAggFunction(func: AggregateFunction): Boolean = func match {
-      case _: Min | _: Max | _: Count => true
+      case _: Min | _: Max | _: Count | _: CountIf | _: MaxMinBy => true
Contributor Author

count_if is already rewritten into count and if, so it does not need to be added here.
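
The rewrite mentioned above can be illustrated with a small plain-Scala model. This is only a sketch of the idea, assuming that count skips NULLs and that None stands in for NULL; the object and method names are hypothetical, and the exact expression Spark generates may differ.

```scala
object CountIfSketch {
  // count(expr) in SQL counts only non-NULL values; None models NULL here.
  def count[A](values: Seq[Option[A]]): Long =
    values.count(_.isDefined).toLong

  // count_if(p) modeled as count(if (p) true else NULL).
  def countIf[A](rows: Seq[A])(p: A => Boolean): Long =
    count(rows.map(r => if (p(r)) Some(true) else None))

  def main(args: Array[String]): Unit = {
    // Two of the four values are even, so both forms count 2 rows.
    println(countIf(Seq(1, 2, 3, 4))(_ % 2 == 0))
  }
}
```

Because count_if reduces to count under this model, an order-irrelevance check that already accepts Count covers it without a separate case.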

@SparkQA

SparkQA commented Jul 18, 2020

Test build #126101 has finished for PR 29142 at commit bc454b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

Member

@dongjoon-hyun dongjoon-hyun left a comment

The Apache Spark community really appreciates your continuous contributions. Here is some advice for future contributions.

  • Your contribution is good. However, please be aware of the non-code Apache Spark community policies, such as the release cycle and versioning. For example, SPARK-29343 was already released in 3.0.0, and your improvement patch will land only in 3.1.0, so you need to use a new JIRA ID.
  • Please choose the most specific PR title. For example, this PR only adds MaxMinBy, so just say Add MaxMinBy instead of Add more aggregate function.
  • Please provide test coverage when you add a new code path. More specifically, we need a test case that fails on the master branch and succeeds with your PR. Test coverage is crucial to protect your contribution from accidental removal in future releases.

The above is a general guideline for you. We want to help you grow in the Apache Spark community and go further with us. These guidelines will also help you work in other Apache communities. Thank you always, @ulysses-you.

@SparkQA

SparkQA commented Jul 19, 2020

Test build #126135 has finished for PR 29142 at commit bc454b6.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ulysses-you
Contributor Author

Thank you, @dongjoon-hyun. Your advice is really helpful to me. I will follow the policies you described and keep contributing.

Thanks again!

@ulysses-you ulysses-you changed the title from "[SPARK-29343][SQL][FOLLOW-UP] Add more aggregate function to support eliminate sorts." to "[SPARK-32360][SQL] Add MaxMinBy to support eliminate sorts" Jul 20, 2020

@SparkQA

SparkQA commented Jul 20, 2020

Test build #126137 has finished for PR 29142 at commit 8aee8a5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Thank you for your update. Could you fix the UT failure? The newly added UT is failing now.

[info] - SPARK-32360: Add MaxMinBy to support eliminate sorts *** FAILED *** (7 milliseconds)
[info]   org.apache.spark.sql.AnalysisException: expression '`b`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;

@@ -1004,7 +1004,7 @@ object EliminateSorts extends Rule[LogicalPlan] {

   private def isOrderIrrelevantAggs(aggs: Seq[NamedExpression]): Boolean = {
     def isOrderIrrelevantAggFunction(func: AggregateFunction): Boolean = func match {
-      case _: Min | _: Max | _: Count => true
+      case _: Min | _: Max | _: Count | _: MaxMinBy => true
Member

Adding this itself looks fine to me.

@SparkQA

SparkQA commented Jul 20, 2020

Test build #126145 has finished for PR 29142 at commit cd93b70.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@dongjoon-hyun dongjoon-hyun left a comment

Hi, @ulysses-you and @maropu. Unfortunately, MaxBy and MinBy are order-sensitive: when several rows tie on the ordering value, the result depends on the input order. I'm -1 on this optimization.

scala> sql("SELECT max_by(x, y) FROM (SELECT * FROM VALUES (('a', 50)), (('b', 50)), (('c', 50)) AS tab(x, y) ORDER BY x)").show
+------------+
|max_by(x, y)|
+------------+
|           c|
+------------+


scala> sql("SELECT max_by(x, y) FROM (SELECT * FROM VALUES (('a', 50)), (('b', 50)), (('c', 50)) AS tab(x, y) ORDER BY x DESC)").show
+------------+
|max_by(x, y)|
+------------+
|           a|
+------------+
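
The tie-breaking behavior shown above can be reproduced with a simplified, single-partition model in plain Scala. This is a sketch, not Spark's actual implementation: the object and method names are hypothetical, and it assumes that on a tie the later row replaces the earlier one, which is what the output above suggests.

```scala
object MaxByTieSketch {
  // Fold over the rows in input order, keeping the x whose y is largest.
  // Assumption: on a tie (y >= bestY) the later row wins.
  def maxBy[A](rows: Seq[(A, Int)]): Option[A] =
    rows.foldLeft(Option.empty[(A, Int)]) {
      case (Some((_, bestY)), (x, y)) if y >= bestY => Some((x, y))
      case (None, row)                              => Some(row)
      case (best, _)                                => best
    }.map(_._1)

  def main(args: Array[String]): Unit = {
    val tied = Seq(("a", 50), ("b", 50), ("c", 50))
    println(maxBy(tied))          // rows in ascending x order: Some(c)
    println(maxBy(tied.reverse))  // rows in descending x order: Some(a)

    // Without ties the input order does not matter.
    val distinct = Seq(("a", 1), ("b", 2))
    println(maxBy(distinct) == maxBy(distinct.reverse)) // true
  }
}
```

Under this model the sort can be dropped safely only when the ordering values are distinct, which the optimizer cannot assume in general.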

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Jul 20, 2020

Could you close this PR, please, @ulysses-you? Or could you let me know what I missed in your proposal?

@ulysses-you
Copy link
Contributor Author

Thanks for the counterexample. It seems I missed something.

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Jul 21, 2020

Thank you for closing, @ulysses-you.

@dongjoon-hyun
Copy link
Member

cc @gatorsmile
