[SPARK-27123][SQL] Improve CollapseProject to handle projects cross limit/repartition/sample #24049
Conversation
@maropu . This is the one you requested before. Could you review this?
Thanks, @dongjoon-hyun ! If this is resolved, we can remove the pattern match (https://github.com/apache/spark/pull/23964/files#diff-43334bab9616cc53e8797b9afa9fc7aaR46) in #23964 ?
Yes, @maropu ! I'll rebase that PR after this is merged.
nice! I'll review later.
Yep. Thanks, @maropu .
Test build #103298 has finished for PR 24049 at commit
retest this please
Test build #103306 has finished for PR 24049 at commit
Test build #103334 has finished for PR 24049 at commit
Test build #103347 has finished for PR 24049 at commit
Ur, it's weird because it passed locally. I'll rebase this to the master.
Test build #103354 has finished for PR 24049 at commit
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
Test build #103364 has finished for PR 24049 at commit
LGTM. Merged into master. Thanks.
…imit/repartition/sample

## What changes were proposed in this pull request?

`CollapseProject` optimizer rule simplifies some plans by merging the adjacent projects and performing alias substitutions.

```scala
scala> sql("SELECT b c FROM (SELECT a b FROM t)").explain
== Physical Plan ==
*(1) Project [a#5 AS c#1]
+- Scan hive default.t [a#5], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#5]
```

We can do that in more complex cases like the following. This PR aims to handle adjacent projects across limit/repartition/sample. Here, repartition means `Repartition`, not `RepartitionByExpression`.

**BEFORE**
```scala
scala> sql("SELECT b c FROM (SELECT /*+ REPARTITION(1) */ a b FROM t)").explain
== Physical Plan ==
*(2) Project [b#0 AS c#1]
+- Exchange RoundRobinPartitioning(1)
   +- *(1) Project [a#5 AS b#0]
      +- Scan hive default.t [a#5], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#5]
```

**AFTER**
```scala
scala> sql("SELECT b c FROM (SELECT /*+ REPARTITION(1) */ a b FROM t)").explain
== Physical Plan ==
Exchange RoundRobinPartitioning(1)
+- *(1) Project [a#11 AS c#7]
   +- Scan hive default.t [a#11], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#11]
```

## How was this patch tested?

Pass the Jenkins with the newly added and updated test cases.

Closes #24049 from dongjoon-hyun/SPARK-27123.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
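The core idea — substituting inner aliases into the outer project list, and doing so across an intervening limit — can be sketched with a toy plan algebra. This is plain Scala, not Spark's Catalyst classes; the node and method names here are illustrative only, and projections are simplified to name-to-column renamings.

```scala
// Toy model of the CollapseProject idea (not Spark's actual Catalyst API).
sealed trait Plan
case class Scan(columns: Seq[String]) extends Plan
case class Limit(n: Int, child: Plan) extends Plan
// A project maps each output name to the input column it is computed from.
case class Project(projectList: Seq[(String, String)], child: Plan) extends Plan

object Collapse {
  // Merge two adjacent project lists by substituting inner aliases into the
  // outer list, e.g. (c -> b) over (b -> a) becomes (c -> a).
  private def merge(outer: Seq[(String, String)],
                    inner: Seq[(String, String)]): Seq[(String, String)] = {
    val aliases = inner.toMap
    outer.map { case (name, ref) => (name, aliases.getOrElse(ref, ref)) }
  }

  def apply(plan: Plan): Plan = plan match {
    // Adjacent projects: substitute and collapse.
    case Project(p1, Project(p2, child)) =>
      apply(Project(merge(p1, p2), child))
    // Projects separated by a limit: collapse across it, as this PR does.
    case Project(p1, Limit(n, Project(p2, child))) =>
      Limit(n, apply(Project(merge(p1, p2), child)))
    case other => other
  }
}
```

For example, `Collapse(Project(Seq("c" -> "b"), Limit(1, Project(Seq("b" -> "a"), Scan(Seq("a"))))))` collapses the two projects into a single `Project(Seq("c" -> "a"), ...)` under the limit, mirroring the BEFORE/AFTER plans above.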
Sorry to be late; could you update the description of
Probably, it would be better to describe something about the new target this PR added. It seems
Yep. Sure!
This will cause a perf regression, right?
```diff
@@ -699,6 +699,24 @@ object CollapseProject extends Rule[LogicalPlan] {
         agg.copy(aggregateExpressions = buildCleanedProjectList(
           p.projectList, agg.aggregateExpressions))
       }
+      case p1 @ Project(_, g @ GlobalLimit(_, l @ LocalLimit(_, p2: Project))) =>
```
Sorry to be late for the review. I have 2 concerns about this optimization:

- if `p2` outputs one column, and `p1` outputs 1000 columns, then pushing down `p1` through the limit operator would increase the data size to be shuffled.
- if `p1` has an expensive expression like a UDF, pushing `p1` through the limit operator means the expensive expression will be executed a lot more times.
Do we have a general rule to justify the benefit of pushing down the project operator?
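The second concern can be made concrete with a small counting experiment in plain Scala (not Spark): whether a projection is evaluated above or below a limit determines how many rows an expensive expression touches. The helper name and numbers here are illustrative only.

```scala
// Count how many times an "expensive UDF" runs when a projection sits
// above vs. below a limit. `pushedDown = true` models the project being
// pushed below the limit, so the UDF sees every input row.
def countEvals(pushedDown: Boolean, rows: List[Int], limit: Int): Int = {
  var evals = 0
  def expensiveUdf(x: Int): Int = { evals += 1; x * 2 }
  if (pushedDown)
    rows.map(expensiveUdf).take(limit)   // project below the limit: all rows
  else
    rows.take(limit).map(expensiveUdf)   // project above the limit: `limit` rows
  evals
}
```

With 1000 input rows and a limit of 10, the project-above placement evaluates the UDF 10 times, while pushing it below the limit evaluates it 1000 times — which is why collapsing across a limit needs a guard.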
Thank you for the review, @gatorsmile and @cloud-fan . I got it. I'll narrow this down with `if isRenaming(l1, l2) =>` for these cases, too.
I believe case 1 can be handled with `isRenaming`, but I'm not sure how case 2 can be handled.
Case 2 actually means that we can't push down a project through operators that reduce the numRows, e.g. limit, sample, etc.
@cloud-fan . `isRenaming` uses `semanticEquals`. Case 2 will be prevented. I'll make a PR soon.
Hi, @maropu , @gatorsmile , @cloud-fan . I'll make a followup very soon. Is there any other concern for this PR?