Skip to content

[SPARK-40764][SQL] Extract partitioning from children's output expressions#38222

Closed
wangyum wants to merge 3 commits intoapache:masterfrom
wangyum:SPARK-40764
Closed

[SPARK-40764][SQL] Extract partitioning from children's output expressions#38222
wangyum wants to merge 3 commits intoapache:masterfrom
wangyum:SPARK-40764

Conversation

@wangyum
Copy link
Member

@wangyum wangyum commented Oct 12, 2022

What changes were proposed in this pull request?

This PR extract partitionings from children's output expressions:

  1. It first transform the current partitioning with non attribute aliases.
  2. Then transform the transformed partitioning with attribute aliases.
  3. Then normalize it.

For example:

CREATE TABLE t1(value string) using parquet;
CREATE TABLE t2(value string) using parquet;
set spark.sql.autoBroadcastJoinThreshold=-1;

SELECT upper(tmp.value),                  
       max(tmp.value)                     
FROM   (SELECT value,                     
               upper(value) AS upper_value
        FROM   t1) tmp                    
       JOIN t2                            
         ON tmp.upper_value = t2.value    
GROUP  BY upper(tmp.value);

Before this PR:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[_groupingexpression#238], functions=[max(value#229)], output=[upper(value)#233, max(value)#234])
   +- Sort [_groupingexpression#238 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_groupingexpression#238, 5), ENSURE_REQUIREMENTS, [plan_id=127]
         +- SortAggregate(key=[_groupingexpression#238], functions=[partial_max(value#229)], output=[_groupingexpression#238, max#240])
            +- Sort [_groupingexpression#238 ASC NULLS FIRST], false, 0
               +- Project [value#229, upper(value#229) AS _groupingexpression#238]
                  +- SortMergeJoin [upper_value#228], [value#230], Inner
                     :- Sort [upper_value#228 ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(upper_value#228, 5), ENSURE_REQUIREMENTS, [plan_id=117]
                     :     +- Project [value#229, upper(value#229) AS upper_value#228]
                     :        +- Filter isnotnull(upper(value#229))
                     :           +- FileScan parquet spark_catalog.default.t1[value#229]
                     +- Sort [value#230 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(value#230, 5), ENSURE_REQUIREMENTS, [plan_id=118]
                           +- Filter isnotnull(value#230)
                              +- FileScan parquet spark_catalog.default.t2[value#230]

After this PR:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[_groupingexpression#238], functions=[max(value#229)], output=[upper(value)#233, max(value)#234])
   +- SortAggregate(key=[_groupingexpression#238], functions=[partial_max(value#229)], output=[_groupingexpression#238, max#240])
      +- Project [value#229, upper(value#229) AS _groupingexpression#238]
         +- SortMergeJoin [upper_value#228], [value#230], Inner
            :- Sort [upper_value#228 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(upper_value#228, 5), ENSURE_REQUIREMENTS, [plan_id=117]
            :     +- Project [value#229, upper(value#229) AS upper_value#228]
            :        +- Filter isnotnull(upper(value#229))
            :           +- FileScan parquet spark_catalog.default.t1[value#229]
            +- Sort [value#230 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(value#230, 5), ENSURE_REQUIREMENTS, [plan_id=118]
                  +- Filter isnotnull(value#230)
                     +- FileScan parquet spark_catalog.default.t2[value#230]

Why are the changes needed?

Extract partitionings to reduce shuffle exchange.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and benchmark test.

SQL Before this PR(Seconds) After this PR(Seconds)
TPC-DS q51a 120  114

@github-actions github-actions bot added the SQL label Oct 12, 2022
@wangyum
Copy link
Member Author

wangyum commented Oct 12, 2022

cc @cloud-fan

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 21, 2023
@github-actions github-actions bot closed this Jan 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant