
[SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias #35823

Closed
wants to merge 13 commits

Conversation

Contributor

@beliefer beliefer commented Mar 12, 2022

What changes were proposed in this pull request?

Currently, Spark DS V2 aggregate push-down doesn't support a project with aliases.

See the current guard:

if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>

This PR makes it work with aliases as well.

The first example:
The original plan is shown below:

Aggregate [DEPT#0], [DEPT#0, sum(mySalary#8) AS total#14]
+- Project [DEPT#0, SALARY#2 AS mySalary#8]
   +- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3], RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession@77978658,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions@5f8da82)

If we can completely push down the aggregate, then the plan will be:

Project [DEPT#0, SUM(SALARY)#18 AS sum(SALARY#2)#13 AS total#14]
+- RelationV2[DEPT#0, SUM(SALARY)#18] test.employee

If we can only partially push down the aggregate, then the plan will be:

Aggregate [DEPT#0], [DEPT#0, sum(cast(SUM(SALARY)#18 as decimal(20,2))) AS total#14]
+- RelationV2[DEPT#0, SUM(SALARY)#18] test.employee
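For reference, the first example's plans above come from a query of roughly this shape (a minimal sketch, assuming the h2.test.employee table and the SparkSession `spark` from the JDBCV2Suite setup):

import org.apache.spark.sql.functions.sum
import spark.implicits._

// SALARY is renamed to mySalary before the aggregation, so the project
// contains an Alias and the old guard rejected the push-down.
val df = spark.table("h2.test.employee")
  .select($"DEPT", $"SALARY".as("mySalary"))
  .groupBy($"DEPT")
  .agg(sum($"mySalary").as("total"))
df.explain(true) // check whether the aggregate reached the JDBC scan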

The second example:
The original plan is shown below:

Aggregate [myDept#33], [myDept#33, sum(mySalary#34) AS total#40]
+- Project [DEPT#25 AS myDept#33, SALARY#27 AS mySalary#34]
   +- ScanBuilderHolder [DEPT#25, NAME#26, SALARY#27, BONUS#28], RelationV2[DEPT#25, NAME#26, SALARY#27, BONUS#28] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession@25c4f621,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions@345d641e)

If we can completely push down the aggregate, then the plan will be:

Project [DEPT#25 AS myDept#33, SUM(SALARY)#44 AS sum(SALARY#27)#39 AS total#40]
+- RelationV2[DEPT#25, SUM(SALARY)#44] test.employee

If we can only partially push down the aggregate, then the plan will be:

Aggregate [DEPT#25], [DEPT#25 AS myDept#33, sum(cast(SUM(SALARY)#56 as decimal(20,2))) AS total#52]
+- RelationV2[DEPT#25, SUM(SALARY)#56] test.employee

Why are the changes needed?

Aliases are common in projections, so supporting them makes aggregate push-down applicable to many more queries.

Does this PR introduce any user-facing change?

Yes. DS V2 aggregate push-down now supports a project with aliases.

How was this patch tested?

New tests.

@github-actions github-actions bot added the SQL label Mar 12, 2022
@codecov-commenter

codecov-commenter commented Mar 12, 2022

Codecov Report

Merging #35823 (c91c2e9) into master (c483e29) will increase coverage by 0.00%.
The diff coverage is 90.62%.

❗ Current head c91c2e9 differs from pull request most recent head 787e0dd. Consider uploading reports for the commit 787e0dd to get more accurate results

Impacted file tree graph

@@           Coverage Diff           @@
##           master   #35823   +/-   ##
=======================================
  Coverage   91.19%   91.19%           
=======================================
  Files         297      297           
  Lines       64696    64724   +28     
  Branches     9919     9921    +2     
=======================================
+ Hits        58999    59025   +26     
- Misses       4330     4332    +2     
  Partials     1367     1367           
Flag Coverage Δ
unittests 91.17% <90.62%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
python/pyspark/sql/tests/test_udf.py 95.56% <ø> (ø)
python/pyspark/pandas/indexes/category.py 93.33% <66.66%> (-0.92%) ⬇️
python/pyspark/pandas/indexes/datetimes.py 95.23% <66.66%> (-0.52%) ⬇️
python/pyspark/pandas/indexes/timedelta.py 75.40% <66.66%> (-0.46%) ⬇️
python/pyspark/pandas/base.py 94.14% <100.00%> (+0.03%) ⬆️
python/pyspark/pandas/frame.py 97.06% <100.00%> (+<0.01%) ⬆️
python/pyspark/pandas/tests/test_dataframe.py 97.34% <100.00%> (+0.01%) ⬆️
python/pyspark/pandas/tests/test_series.py 96.24% <100.00%> (+<0.01%) ⬆️
...n/pyspark/mllib/tests/test_streaming_algorithms.py 76.34% <0.00%> (-0.36%) ⬇️
python/pyspark/streaming/tests/test_context.py 98.42% <0.00%> (ø)
... and 1 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 216b972...787e0dd. Read the comment docs.

@@ -234,7 +257,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// Aggregate [c2#10], [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18]
// +- RelationV2[c2#10, min(c1)#21, max(c1)#22] ...
// scalastyle:on
plan.transformExpressions {
val agg = plan.transformExpressions {
Contributor

nit: unnecessary change?

@huaxingao
Contributor

I actually have an alias over aggregate test in FileSource too. Could you please change that one as well?
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala#L187

@beliefer
Contributor Author

I actually have an alias over aggregate test in FileSource too. Could you please change that one as well?
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala#L187

Thank you for the reminder.

@@ -779,15 +779,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
}

test("scan with aggregate push-down: aggregate over alias NOT push down") {
test("scan with aggregate push-down: aggregate over alias push down") {
Contributor

Hi. Would it be better to specify SPARK-38533?

Contributor Author

It doesn't matter.

@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
checkAnswer(df, Seq.empty[Row])
}

test("scan with aggregate push-down: complete push-down aggregate with alias") {
Contributor

ditto

checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
}

test("scan with aggregate push-down: partial push-down aggregate with alias") {
Contributor

ditto

checkAggregateRemoved(df)
df.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
val expected_plan_fragment =
Contributor

checkAggregateRemoved(df, false)
df.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
val expected_plan_fragment =
Contributor

ditto

checkAggregateRemoved(df2)
df2.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
val expected_plan_fragment =
Contributor

ditto

checkAggregateRemoved(df2, false)
df2.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
val expected_plan_fragment =
Contributor

ditto

val newGroupingExpressions = groupingExpressions.map { expr =>
expr.transform {
case r: AttributeReference if aliasAttrToOriginAttr.contains(r.canonicalized) =>
aliasAttrToOriginAttr(r.canonicalized)
Contributor

These two lambda expressions behave the same; consider extracting a shared function.
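A minimal sketch of that refactoring (the helper name is illustrative, not from the patch):

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}

// Hypothetical shared helper: swap alias-introduced attributes for the
// original attributes they refer to.
def replaceAliasAttr(
    expr: Expression,
    aliasAttrToOriginAttr: Map[Expression, Attribute]): Expression = {
  expr.transform {
    case r: AttributeReference if aliasAttrToOriginAttr.contains(r.canonicalized) =>
      aliasAttrToOriginAttr(r.canonicalized)
  }
}

// Both call sites then reduce to, e.g.:
//   groupingExpressions.map(replaceAliasAttr(_, aliasAttrToOriginAttr))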

@beliefer
Contributor Author

ping @huaxingao cc @cloud-fan

case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>
Contributor

Please follow the predicate pushdown optimizer rule and leverage AliasHelper to do it.
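For context, a minimal sketch of what Catalyst's AliasHelper offers; the enclosing object is hypothetical, while getAliasMap and replaceAliasButKeepName are the same utilities the predicate push-down rule relies on:

import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, AttributeMap, NamedExpression}

object AliasRewriteSketch extends AliasHelper {
  // Build Attribute -> Alias bindings from the project list, then
  // substitute them while keeping the original output names.
  def dealias(
      project: Seq[NamedExpression],
      exprs: Seq[NamedExpression]): Seq[NamedExpression] = {
    val aliasMap: AttributeMap[Alias] = getAliasMap(project)
    exprs.map(replaceAliasButKeepName(_, aliasMap))
  }
}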

Contributor Author

Thank you for the reminder.

Contributor

If we follow it, then this should also be project.forall(_.deterministic).
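An illustrative case this check guards against (assuming the same test table): the aliased expression below is non-deterministic, so collapsing it into the aggregate and re-evaluating it in a pushed-down query could change the result.

import org.apache.spark.sql.functions.{rand, sum}
import spark.implicits._

// rand() must be evaluated once per input row; the rule has to skip
// the push-down rather than re-evaluate it in the data source.
val df = spark.table("h2.test.employee")
  .select($"DEPT", (rand() * $"SALARY").as("mySalary"))
  .groupBy($"DEPT")
  .agg(sum($"mySalary"))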

expr: NamedExpression,
aliasMap: AttributeMap[Alias]): NamedExpression = {
replaceAliasButKeepName(expr, aliasMap).transform {
case Alias(attr: Attribute, _) => attr
Contributor

@cloud-fan cloud-fan Mar 15, 2022

why is this line needed?

Contributor Author

An attribute in groupingExpressions may be an alias. Before pushing the SQL down to JDBC, I want to replace the aliased attribute with the original attribute, not keep the Alias. For example, replaceAliasButKeepName turns myDept#33 into DEPT#25 AS myDept#33, and this extra case strips it down to DEPT#25.

sHolder.builder match {
case r: SupportsPushDownAggregates =>
val aliasMap = getAliasMap(project)
val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
val newGroupingExpressions = groupingExpressions.asInstanceOf[Seq[NamedExpression]]
Contributor

groupingExpressions may not be NamedExpressions.

@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
Contributor

@cloud-fan cloud-fan Mar 15, 2022

We need to clearly describe the final plan. This is more complicated now as the project may contain arbitrary expressions.

For example

Aggregate(sum(a + b) + max(a - c) + x, group by x,
  Project(x, x + 1 as a, x * 2 as b, x + y as c,
    Table(x, y, z)
  )
)

What does the final plan look like if the aggregate can be fully pushed, partially pushed, or not pushed at all?

Contributor Author

@beliefer beliefer Mar 16, 2022

Aggregate [myDept#0], [((cast(sum(CheckOverflow((promote_precision(cast(mySalary#1 as decimal(23,2))) + promote_precision(cast(yourSalary#2 as decimal(23,2)))), DecimalType(23,2))) as double) + max((cast(mySalary#1 as double) - bonus#6))) + cast(myDept#0 as double)) AS ((sum((mySalary + yourSalary)) + max((mySalary - bonus))) + myDept)#9]
+- Project [dept#3 AS myDept#0, CheckOverflow((promote_precision(cast(salary#5 as decimal(21,2))) + 1.00), DecimalType(21,2)) AS mySalary#1, CheckOverflow((promote_precision(salary#5) * 2.00), DecimalType(22,2)) AS yourSalary#2, bonus#6]
   +- ScanBuilderHolder [DEPT#3, NAME#4, SALARY#5, BONUS#6], RelationV2[DEPT#3, NAME#4, SALARY#5, BONUS#6] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession@463a1f47,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions@47224d5d)

cast, CheckOverflow, and promote_precision are not supported in aggregate push-down, so this aggregate cannot be pushed down.
I updated the PR description and added the plans.

val translatedAggregates = DataSourceStrategy.translateAggregation(
normalizedAggregates, normalizedGroupingExpressions)
val (finalResultExpressions, finalAggregates, finalTranslatedAggregates) = {
val (selectedResultExpressions, selectedAggregates, selectedTranslatedAggregates) = {
Contributor

why do we rename these?

Contributor Author

Because at this point they are not yet confirmed as the final ones.

val newGroupingExpressions = groupingExpressions.map {
case e: NamedExpression => replaceAliasWithAttr(e, aliasMap)
case other => other
}
Contributor

Can we make the code more explicit? We need to clearly show the steps (a rough sketch follows the list):

  1. collapse the aggregate and project
  2. remove the aliases from the aggregate functions and group-by expressions (this logic should be put here instead of in AliasHelper, as it is not common logic)
  3. push down the aggregate
  4. add back the aliases for the group-by expressions only
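A rough sketch of how steps 1, 2, and 4 could look (all names here are hypothetical, not the final patch; step 3 hands the de-aliased expressions to the existing DataSourceStrategy.translateAggregation path):

import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, Attribute, Expression, NamedExpression}

object CollapseSketch extends AliasHelper {
  // Steps 1-2: fold the project into the aggregate by substituting
  // aliases, leaving only original columns for translation (step 3).
  def collapse(
      project: Seq[NamedExpression],
      groupingExpressions: Seq[Expression],
      resultExpressions: Seq[NamedExpression])
      : (Seq[Expression], Seq[NamedExpression]) = {
    val aliasMap = getAliasMap(project)
    (groupingExpressions.map(replaceAlias(_, aliasMap)),
      resultExpressions.map(replaceAliasButKeepName(_, aliasMap)))
  }

  // Step 4: restore the user-visible names, but only on the group-by
  // output columns.
  def realias(
      groupByOutput: Seq[Attribute],
      originalNames: Map[Attribute, String]): Seq[NamedExpression] =
    groupByOutput.map { attr =>
      originalNames.get(attr).map(name => Alias(attr, name)()).getOrElse(attr)
    }
}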

Contributor Author

Thank you for the good idea.

@beliefer
Contributor Author

#35932 replaces this PR.

@beliefer beliefer closed this Mar 22, 2022