[SPARK-35455][SQL] Unify empty relation optimization between normal and AQE optimizer #32602
@@ -59,19 +66,33 @@ object EliminateUnnecessaryJoin extends Rule[LogicalPlan] {
case Some(count) => hasRow == (count > 0)
case _ => false
}

case LocalRelation(_, data, isStreaming) if !isStreaming =>
If we want to handle `LocalRelation`, then it's not AQE-specific and we can do it in the normal optimizer?
Yes, but currently we have no chance to run the normal optimizer on the AQE side. Maybe in the future we can make some rules from `Optimizer` also available in `AQEOptimizer`?
Since AQE is on by default, it's not a big issue but more about code cleanliness. How about this:
- `EliminateUnnecessaryJoin` should only deal with `LocalRelation`, and appears in both the normal optimizer and the AQE optimizer
- The AQE optimizer adds a new rule to turn an empty query stage into an empty `LocalRelation`
- The AQE optimizer adds a new rule to deal with non-empty query stages for left semi/anti joins
SGTM, and I just found an existing rule, `PropagateEmptyRelation`.
How about this update?
- Add a rule `ConvertToLocalRelation` to turn an empty query stage into an empty `LocalRelation`
- Make `PropagateEmptyRelation` appear in the AQE optimizer
- Reduce `EliminateUnnecessaryJoin` to only handle NAAJ/semi/anti joins
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Test build #138730 has finished for PR 32602 at commit
Kubernetes integration test status success
*/
object ConvertToLocalRelation extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case l @ LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined &&
Sorry, I made the wrong decision. This may change the output partitioning and is not always safe or beneficial (we may add extra shuffles in the planning phase later).
Looking at the optimizations for empty local relations, some of them are likely beneficial and we can always do them: eliminate join, aggregate, limit, repartition, sort, generate.
Some may not be that beneficial and we shouldn't do them: simplify union, eliminate project/filter/sample.
My new idea:
- Create a new rule `PropagateEmptyRelationBasic`, which deals with local relations only, is not very beneficial (eliminates project, filter, etc.), and runs in the normal optimizer only
- Create a new rule `PropagateEmptyRelationAdvanced`, which deals with both local relations and query stages, is very beneficial (eliminates join, aggregate, etc.), and runs in both the normal and AQE optimizers
The old `EliminateUnnecessaryJoin` and `PropagateEmptyRelation` rules should be removed and merged into the new rules.
`PropagateEmptyRelationAdvanced` may not be able to access `QueryStageExec`, which is in sql/core. We can let this rule take a `checkRowCount` function as a parameter.
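To make the `checkRowCount`-as-parameter idea concrete, here is a minimal sketch on a toy plan algebra. None of these types are Spark's: `Plan`, `Local`, `Stage`, `InnerJoin`, `Sort`, and `PropagateEmptySketch` are hypothetical stand-ins, and the `(plan, hasRow)` shape of the function is only assumed from the `hasRow == (count > 0)` line in the diff above.

```scala
object CheckRowCountSketch {
  // Toy plan nodes; stand-ins for illustration only, not Spark classes.
  sealed trait Plan
  final case class Local(rows: Seq[Int]) extends Plan   // plays the role of LocalRelation
  final case class Stage(id: String) extends Plan       // plays the role of a query stage
  final case class InnerJoin(left: Plan, right: Plan) extends Plan
  final case class Sort(child: Plan) extends Plan

  // The rule learns about non-local plans only through the injected function,
  // so it needs no compile-time dependency on the stage implementation.
  class PropagateEmptySketch(checkRowCount: (Plan, Boolean) => Boolean) {
    private def isEmpty(p: Plan): Boolean = p match {
      case Local(rows) => rows.isEmpty
      case other       => checkRowCount(other, false) // "known to have no rows?"
    }

    def apply(plan: Plan): Plan = plan match {
      case InnerJoin(l, r) =>
        val (nl, nr) = (apply(l), apply(r))
        if (isEmpty(nl) || isEmpty(nr)) Local(Nil) else InnerJoin(nl, nr)
      case Sort(c) =>
        val nc = apply(c)
        if (isEmpty(nc)) Local(Nil) else Sort(nc)
      case leaf => leaf
    }
  }

  // Assumed runtime statistics for two finished stages (made-up numbers).
  val stageRowCounts: Map[String, Long] = Map("s1" -> 0L, "s2" -> 10L)

  // AQE-side check: consult stage statistics.
  val aqeCheck: (Plan, Boolean) => Boolean = (p, hasRow) => p match {
    case Stage(id) => stageRowCounts.get(id).exists(count => hasRow == (count > 0))
    case _         => false
  }

  // Normal-optimizer check: no runtime statistics, so never claim anything.
  val noStats: (Plan, Boolean) => Boolean = (_, _) => false

  val plan: Plan = InnerJoin(Stage("s1"), Stage("s2"))
  val aqeResult: Plan = new PropagateEmptySketch(aqeCheck).apply(plan)
  val normalResult: Plan = new PropagateEmptySketch(noStats).apply(plan)
}
```

With the stage-aware check the join collapses to an empty local relation; with the no-statistics check the same rule leaves the plan alone, which is the behavior one would want from a rule shared by both optimizers.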
I see the issue. If we worry about the `LocalRelation` output partitioning, we can just mark the `LocalRelationExec` output partitioning as `SinglePartition` to avoid an extra shuffle. But that doesn't work with optimizations like non-empty left semi/anti elimination and the multi-join case.
> we can always do: eliminate join, aggregate
Not sure we can always do this if we don't want to introduce an extra shuffle. And the issue already exists in the current `EliminateUnnecessaryJoin`, as in this plan:
Aggregate (same key with join)
  Join Inner
    LocalRelation
    xxx
Another idea: if we plan to support extra shuffles later and don't expect to introduce shuffles on the AQE optimizer side, is it better to check the physical plan's `requiredChildDistribution`? We can allow only one node with a valid `requiredChildDistribution` (not `UnspecifiedDistribution`) in one query stage, and skip the optimization if a query stage has two or more such nodes. Then we can run `PropagateEmptyRelation` safely.
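The `requiredChildDistribution` check proposed here could look roughly like the following sketch. It uses a toy physical-plan model rather than Spark's `SparkPlan`; `Node`, `Clustered`, `Unspecified`, `countConstrained`, and `safeToPropagate` are all hypothetical names chosen for illustration.

```scala
object DistributionCheckSketch {
  // Toy distribution requirements; stand-ins, not Spark's Distribution classes.
  sealed trait Distribution
  case object Unspecified extends Distribution
  final case class Clustered(keys: Seq[String]) extends Distribution

  // A toy physical node: one required distribution per child.
  final case class Node(name: String, required: Seq[Distribution], children: Seq[Node] = Nil)

  // Count the nodes that impose at least one non-trivial requirement on a child.
  def countConstrained(n: Node): Int = {
    val self = if (n.required.exists(_ != Unspecified)) 1 else 0
    self + n.children.map(countConstrained).sum
  }

  // The proposed guard: optimize only when at most one such node exists in the stage.
  def safeToPropagate(stageRoot: Node): Boolean = countConstrained(stageRoot) <= 1

  // The plan from the comment: an aggregate on the same key as the join below it.
  val join: Node = Node(
    "Join Inner",
    Seq(Clustered(Seq("k")), Clustered(Seq("k"))),
    Seq(Node("LocalRelation", Nil), Node("xxx", Nil)))
  val aggregate: Node = Node("Aggregate", Seq(Clustered(Seq("k"))), Seq(join))
}
```

In this model the aggregate-over-join plan has two constrained nodes, so the guard would skip the optimization, while the join on its own would pass.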
If we turn a broadcast stage into a local relation without changing other parts of the plan, it seems we will broadcast the local relation again. `SinglePartition` can't help here.
If it's hard to avoid introducing shuffles in the AQE optimizer, how about adding an extra shuffle check between the AQE optimizer and stage preparation? Then it won't affect the extra shuffles in stage preparation.
We can do that, but it doesn't seem worth the complexity. It's not very helpful to turn a query stage into a local relation if we can't use it to eliminate expensive operators like join, agg, sort, etc.
I think my proposal is simpler and effective enough. Today's `EliminateUnnecessaryJoin` does not consider extra shuffles either, and blindly eliminates joins (and with them the query stages) if possible.
* - Aggregate with all empty children and at least one grouping expression.
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
*
* @param checkRowCount At AQE side, we use the query stage stats to check the check.
At AQE side, we use this function to check if a plan has output rows or not
plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }

// We can not use transformUpWithPruning here since this rule is used by both normal Optimizer
// and AQE Optimizer. And this may only effective at AQE side.
Ah, good point. I think there is a way to overcome it:
- Create an abstract class `PropagateEmptyRelationBase` that contains util functions and optimizes expensive operators such as join, aggregate, etc.
- Create a rule `PropagateEmptyRelation extends PropagateEmptyRelationBase` that additionally optimizes project, filter, etc.
- Create a rule `AQEPropagateEmptyRelation extends PropagateEmptyRelationBase` that overrides some util functions like `isEmptyPlan`.
Then these two rules can define their transformation pruning separately.
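The class layout proposed in this comment can be sketched on a toy plan algebra as follows. This is an illustration under stated assumptions, not the code merged in the PR: `Plan`, `Local`, `Stage`, `Join`, `Filter`, `PropagateEmptyBase`, `NormalRule`, and `AqeRule` are hypothetical stand-ins, and only the name `isEmptyPlan` is taken from the comment.

```scala
object PropagateEmptyHierarchySketch {
  // Toy plan nodes; stand-ins for illustration only, not Spark classes.
  sealed trait Plan
  final case class Local(rows: Seq[Int]) extends Plan
  final case class Stage(rowCount: Option[Long]) extends Plan // row count known once the stage has run
  final case class Join(left: Plan, right: Plan) extends Plan
  final case class Filter(child: Plan) extends Plan

  val Empty: Plan = Local(Nil)

  abstract class PropagateEmptyBase {
    // Util function that subclasses may override.
    protected def isEmptyPlan(p: Plan): Boolean = p match {
      case Local(rows) => rows.isEmpty
      case _           => false
    }

    // Shared handling of the expensive operators.
    protected def common: PartialFunction[Plan, Plan] = {
      case Join(l, r) if isEmptyPlan(l) || isEmptyPlan(r) => Empty
    }

    // Each concrete rule decides which rewrites it applies.
    protected def rules: PartialFunction[Plan, Plan]

    // Bottom-up traversal: rewrite children first, then the node itself.
    def apply(plan: Plan): Plan = {
      val withNewChildren = plan match {
        case Join(l, r) => Join(apply(l), apply(r))
        case Filter(c)  => Filter(apply(c))
        case leaf       => leaf
      }
      if (rules.isDefinedAt(withNewChildren)) rules(withNewChildren) else withNewChildren
    }
  }

  // Normal optimizer: shared rewrites plus the cheap ones (filter here).
  object NormalRule extends PropagateEmptyBase {
    private def extra: PartialFunction[Plan, Plan] = {
      case Filter(c) if isEmptyPlan(c) => Empty
    }
    protected def rules: PartialFunction[Plan, Plan] = common.orElse(extra)
  }

  // AQE optimizer: shared rewrites only, but a finished stage with zero rows counts as empty.
  object AqeRule extends PropagateEmptyBase {
    override protected def isEmptyPlan(p: Plan): Boolean = p match {
      case Stage(Some(0L)) => true
      case other           => super.isEmptyPlan(other)
    }
    protected def rules: PartialFunction[Plan, Plan] = common
  }
}
```

Because each concrete rule owns its `apply` entry point and its `rules`, each could also choose its own traversal or pruning strategy without affecting the other.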
}

// TODO we need use transformUpWithPruning instead of transformUp
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
@cloud-fan, if we want to use `transformUpWithPruning` on the AQE optimizer side, we need to do some more work, like adding a pattern to `LogicalQueryStage`. So this PR does not make that change and just uses `transformUp`. Do you think that's OK?
yea it's OK as it's not a regression. cc @sigmod
@cloud-fan it seems we don't need to use `transformUpWithPruning` here, since the AQE optimizer always runs once rather than to a fixed point?
@@ -27,7 +28,9 @@ import org.apache.spark.util.Utils
*/
class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] {
private val defaultBatches = Seq(
Batch("Eliminate Unnecessary Join", Once, EliminateUnnecessaryJoin),
Batch("Propagate Empty LocalRelation", Once,
`LocalRelation` -> `Relations`?
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Seq(
// left semi join and empty left side
We can't optimize this before this PR?
Yeah, we can't. Before, we only checked the right side for `LeftSemi/LeftAnti`.
Also, the test should use different columns for the filter and the join, in case `InferFiltersFromConstraints` makes the right side empty. Updated it.
@@ -1307,6 +1308,69 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-35455: Enhance EliminateUnnecessaryJoin - single join") {
let's update the test name and PR title: Unify empty relation optimization between normal and AQE optimizer
Updated it and also updated the PR title.
}
}

test("SPARK-35455: Enhance EliminateUnnecessaryJoin - multi join") {
ditto
LGTM
Thanks, merging to master!
@cloud-fan thank you for the discussion and merging! @c21 thank you for the review!
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
* - Project/Filter/Sample with all empty children.
*
* The reason why we don't apply this rule at AQE optimizer side is: the benefit is not big enough
* and it may introduce extra exchanges.
After more thought, I think it is a big performance issue if we can't propagate empty relations through project/filter, which are quite common. The risk of introducing new shuffles is relatively small compared to this.
@ulysses-you can we move all the logic to `PropagateEmptyRelationBase`? `PropagateEmptyRelation` should not have any extra logic.
…nd AQE optimizer

### What changes were proposed in this pull request?
* remove `EliminateUnnecessaryJoin`, using `AQEPropagateEmptyRelation` instead.
* eliminate join, aggregate, limit, repartition, sort, generate which is beneficial.

### Why are the changes needed?
Make `EliminateUnnecessaryJoin` available with more case.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Add test.

Closes apache#32602 from ulysses-you/SPARK-35455.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>