New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-36424][SQL] Support eliminate limits in AQE Optimizer #33651
Conversation
4299af5
to
c076d76
Compare
Kubernetes integration test starting |
Kubernetes integration test starting |
Kubernetes integration test status failure |
* if we can eliminate limits. And we check if [[LogicalQueryStage]] is materialized at stats, | ||
* if it is not materialized the maxRows is none. | ||
*/ | ||
object AQEEliminateLimits extends EliminateLimitsBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the difference between this and EliminateLimits
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are same. Just consider we don't need transformDownWithPruning in AQE Optimizer since it's batch only run once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's too minor to justify creating a new class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea, seems can reuse EliminateLimits
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I'm fine to reuse the EliminateLimits
Kubernetes integration test status success |
Test build #142082 has finished for PR 33651 at commit
|
Test build #142084 has finished for PR 33651 at commit
|
8aae958
to
4510816
Compare
case Limit(l, child) if canEliminate(l, child) => | ||
child | ||
case GlobalLimit(l, child) if canEliminate(l, child) => | ||
child | ||
|
||
case GlobalLimit(le, GlobalLimit(ne, grandChild)) => | ||
GlobalLimit(Least(Seq(ne, le)), grandChild) | ||
GlobalLimit(Literal(Least(Seq(ne, le)).eval().asInstanceOf[Int]), grandChild) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's flaky that this expression highly depend on ConstantFolding
.
Kubernetes integration test unable to build dist. exiting with code: 1 |
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #142101 has finished for PR 33651 at commit
|
Test build #142102 has finished for PR 33651 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
@@ -54,4 +54,6 @@ case class LogicalQueryStage( | |||
} | |||
physicalStats.getOrElse(logicalPlan.stats) | |||
} | |||
|
|||
override def maxRows: Option[Long] = stats.rowCount.map(_.min(Long.MaxValue).toLong) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is other physical nodes above QueryStageExec
the stats is not accurate. Seems the only node can exist here is aggregate, so the maxRows is still accurate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should trust the existing framework. If the maxRows
stats can be wrong, then EliminateLimits
is also wrong even without AQE.
I don't think we need to highlight LogicalQueryStage
in the doc of EliminateLimits
. We just need to follow the existing framework and make sure the maxRows
is not under-estimated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, cleaned the comment of EliminateLimits
Kubernetes integration test starting |
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Show resolved
Hide resolved
Test build #142130 has started for PR 33651 at commit |
Kubernetes integration test status success |
Test build #142117 has finished for PR 33651 at commit
|
Kubernetes integration test starting |
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Show resolved
Hide resolved
Kubernetes integration test status success |
retest this please |
Test build #142152 has finished for PR 33651 at commit
|
Test build #142119 has finished for PR 33651 at commit
|
Kubernetes integration test starting |
e8f17a4
to
6f28a37
Compare
Kubernetes integration test status success |
Test build #142162 has finished for PR 33651 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
retest this please |
1 similar comment
retest this please |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #142187 has finished for PR 33651 at commit
|
thanks, merging to master! |
What changes were proposed in this pull request?
LogicalQueryStage
EliminateLimits
inAQEOptimizer
Why are the changes needed?
In Ad-hoc scenario, we always add limit for the query if user have no special limit value, but not all limit is nesessary.
With the power of AQE, we can eliminate limits using running statistics.
Does this PR introduce any user-facing change?
no
How was this patch tested?
add test