Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT #27428

Closed

Conversation

beliefer
Copy link
Contributor

@beliefer beliefer commented Feb 1, 2020

What changes were proposed in this pull request?

This PR is related to #26656.
#26656 only support use FILTER clause on aggregate expression without DISTINCT.
This PR will enhance this feature when one or more DISTINCT aggregate expressions which allows the use of the FILTER clause.
Such as:

select sum(distinct id) filter (where sex = 'man') from student;
select class_id, sum(distinct id) filter (where sex = 'man') from student group by class_id;
select count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student;
select class_id, count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student group by class_id;
select sum(distinct id), sum(distinct id) filter (where sex = 'man') from student;
select class_id, sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id;
select class_id, count(id), count(id) filter (where class_id = 1), sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id;

Note:
In #26656, we use AggregationIterator to treat the filter conditions of aggregate expr. This is good because we can evaluate filter in first aggregate locally.
If we use AggregationIterator too, the filter conditions of DISTINCT aggregate expr will be treated in second or thrid aggregate.
In order to reduce cost, we treat the filter conditions of DISTINCT aggregate expr in first aggregate or local is better.
So, this PR uses Expand to ensure the evaluation at local.

Why are the changes needed?

Spark SQL only support use FILTER clause on aggregate expression without DISTINCT.
This PR support Filter expression allows simultaneous use of DISTINCT

Does this PR introduce any user-facing change?

No

How was this patch tested?

Exists and new UT

@beliefer
Copy link
Contributor Author

beliefer commented Feb 1, 2020

@maropu @cloud-fan I'm sorry for open a new PR. Because I made some mistake operator.

@SparkQA
Copy link

SparkQA commented Feb 1, 2020

Test build #117718 has finished for PR 27428 at commit 6a32d83.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 2, 2020

Test build #117724 has finished for PR 27428 at commit 5c38bbe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 2, 2020

Test build #117737 has finished for PR 27428 at commit a6498f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Copy link
Contributor Author

beliefer commented Feb 4, 2020

cc @cloud-fan

@SparkQA
Copy link

SparkQA commented Feb 7, 2020

Test build #118021 has finished for PR 27428 at commit c6caf73.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AliasIdentifier(name: String, qualifier: Seq[String])
  • class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter
  • class LegacyTimestampFormatter(
  • case class PlanAdaptiveSubqueries(subqueryMap: Map[Long, SubqueryExec]) extends Rule[SparkPlan]

@beliefer
Copy link
Contributor Author

beliefer commented Feb 7, 2020

retest this please

@SparkQA
Copy link

SparkQA commented Feb 7, 2020

Test build #118027 has finished for PR 27428 at commit c6caf73.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AliasIdentifier(name: String, qualifier: Seq[String])
  • class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter
  • class LegacyTimestampFormatter(
  • case class PlanAdaptiveSubqueries(subqueryMap: Map[Long, SubqueryExec]) extends Rule[SparkPlan]

@SparkQA
Copy link

SparkQA commented Feb 7, 2020

Test build #118033 has finished for PR 27428 at commit cd00f91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

cc @hvanhovell

@beliefer
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Apr 13, 2020

Test build #121200 has finished for PR 27428 at commit cd00f91.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Apr 13, 2020

Test build #121216 has finished for PR 27428 at commit cd00f91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Copy link
Contributor Author

cc @cloud-fan

* Wraps this [[AggregateFunction]] in an [[AggregateExpression]] and sets `isDistinct`
* flag of the [[AggregateExpression]] to the given value because
* Wraps this [[AggregateFunction]] in an [[AggregateExpression]] with `isDistinct`
* flag and `filter` option of the [[AggregateExpression]] to the given value because
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filter option looks weird, how about and an optional 'filter'?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

* [[AggregateExpression]] is the container of an [[AggregateFunction]], aggregation mode,
* and the flag indicating if this aggregation is distinct aggregation or not.
* the flag indicating if this aggregation is distinct aggregation or not and filter option.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, and the optional 'filter'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

* ('key, '_gen_distinct_1, null, 1, null),
* ('key, null, '_gen_distinct_2, 2, null)]
* output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
* Expand(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to be an Expand: you just have one project list, and we can just use Project.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we can merge the Project with the above Expand

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Good idea. I learned how to use Alias. Thanks.

@SparkQA
Copy link

SparkQA commented Jul 9, 2020

Test build #125433 has finished for PR 27428 at commit 16d8c1d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Copy link
Contributor Author

beliefer commented Jul 9, 2020

retest this please

@SparkQA
Copy link

SparkQA commented Jul 9, 2020

Test build #125439 has finished for PR 27428 at commit 16d8c1d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 9, 2020

Test build #125462 has started for PR 27428 at commit 762e839.

@SparkQA
Copy link

SparkQA commented Jul 9, 2020

Test build #125450 has finished for PR 27428 at commit 3c49156.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 10, 2020

Test build #125539 has finished for PR 27428 at commit 762e839.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 10, 2020

Test build #125564 has finished for PR 27428 at commit 762e839.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 10, 2020

Test build #125576 has started for PR 27428 at commit 762e839.

@beliefer
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 10, 2020

Test build #125626 has finished for PR 27428 at commit 762e839.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* LocalTableScan [...]
* }}}
*
* The rule does the following things here:
* Four example: single distinct aggregate function with filter clauses (in sql):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

single -> more than one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. How about at least two distinct aggregate function and one of them contains filter clauses (in sql) ?

* 1. Project the data. There are three aggregation groups in this query:
* i. the non-distinct group;
* ii. the distinct 'cat1 group;
* iii. the distinct 'cat2 group with filter clause.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't match the group. Maybe just make it general the distinct group without filter clause and the distinct group with filter clause

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

* in this query:
* i. the non-distinct 'cat1 group;
* ii. the distinct 'cat1 group;
* iii. the distinct 'cat1 group with filter clause.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to repeat these 3 groups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are different

* functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
* sum('value)]
* output = ['key, 'cat1_cnt, 'total])
* LocalTableScan [...]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to rewrite this query? The planner can handle single distinct agg func AFAIK.

Copy link
Contributor Author

@beliefer beliefer Jul 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can keep the previous behavior. AggregationIterator already done this.

@SparkQA
Copy link

SparkQA commented Jul 13, 2020

Test build #125756 has finished for PR 27428 at commit d531864.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 13, 2020

Test build #125761 has finished for PR 27428 at commit 5bbbfd7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 14, 2020

Test build #125800 has finished for PR 27428 at commit 20ad143.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* sum('_gen_attr_2)]
* output = ['key, 'cat1_cnt, 'total])
* Project(
* projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? The query can work fine even if we don't add this Project in this rule, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rule should be skipped if there is only one distinct. Having a filter or not shouldn't change it.

Copy link
Contributor Author

@beliefer beliefer Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not apply this rule, can't support the case that have only one distinct with filter clause.
For unification, the rules are used uniformly here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean to unify the implementations of the filter clause that are handled by this rule. This case is not handled by this rule before your PR. Sorry if I didn't make myself clear enough.

@beliefer beliefer closed this Jul 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
5 participants