New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-36112] [SQL] Support correlated EXISTS and IN subqueries using DecorrelateInnerQuery framework #43111
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good for the most part, some inline comments
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good - this will be a super helpful change!
@allisonwang-db should also review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excited to see this! Please also add IN subquery in the PR title :)
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-count-bug.sql
Show resolved
Hide resolved
// However, SUM(x) IS NULL is another case that returns 0, and in general any IS/NOT IS and CASE | ||
// expressions are suspect (and the combination of those). | ||
// For now we conservatively accept only those expressions that are guaranteed to be safe. | ||
val exprsRejectEmptyInput = aggregateExpressions.map { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this new code? how do we detect count bug before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For exists and IN we did not detect the count bug before, hence the incorrect results.
For scalar subqueries, there is some quite convoluted way of detecting a count bug as a post-processing of scalar subquery. I will refactor it to use this function in the future, as it seems easier and more straightforward.
case Alias(a: AggregateExpression, _) if a.aggregateFunction.defaultResult == None => true | ||
case _ => false | ||
} | ||
exprsRejectEmptyInput.forall(x => x == true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
aggregateExpressions.forall ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
neat, thank you!
@@ -283,6 +305,15 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper | |||
} else { | |||
a | |||
} | |||
|
|||
case l @ Limit(_, _) if predicateMap.nonEmpty => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we fail earlier in CheckAnalysis
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, we don't.
In fact, CheckAnalysis now allows LIMIT in the correlated subqueries as we support them in lateral/scalar/ EXISTs and IN (the latter is done in this PR).
This check just makes sure that the legacy path (aka PullupCorrelatedPredicates) does not allow LIMITs.
@@ -461,6 +462,22 @@ object DecorrelateInnerQuery extends PredicateHelper { | |||
p.mapChildren(rewriteDomainJoins(outerPlan, _, conditions)) | |||
} | |||
|
|||
private def isCountBugFree(aggregateExpressions: Seq[NamedExpression]): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the existing way to detect the count bug is better. It evaluates the Aggregate
operator with empty input and see if the result is null or not. It's more accurate than a static analysis.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is only used in the new code path, it's fine to improve it when we consolidate the code later.
thanks, merging to master! |
…DecorrelateInnerQuery framework ### What changes were proposed in this pull request? Currently, `DecorrelateInnerQuery` is only enabled for scalar and lateral subqueries. This PR enables `DecorrelateInnerQuery` for IN/EXISTS subqueries ### Why are the changes needed? See above. ### Does this PR introduce _any_ user-facing change? Users can run more EXISTS/IN subqueries, including correlated subqueries with aggregates and limits etc. ### How was this patch tested? Existing unit tests and new query tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#43111 from agubichev/correlated_exists. Authored-by: Andrey Gubichev <andrey.gubichev@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@@ -5272,6 +5281,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { | |||
|
|||
def decorrelateInnerQueryEnabled: Boolean = getConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED) | |||
|
|||
def decorrelateInnerQueryEnabledForExistsIn: Boolean = | |||
!getConf(SQLConf.DECORRELATE_EXISTS_IN_SUBQUERY_LEGACY_INCORRECT_COUNT_HANDLING_ENABLED) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we check whether decorrelateInnerQueryEnabled
is true here, too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the caller checks it:
https://github.com/search?q=repo%3Aapache%2Fspark%20decorrelateInnerQueryEnabledForExistsIn&type=code
(first check of the decorrelate
function, explicit check in CheckAnalysis)
What changes were proposed in this pull request?
Currently,
DecorrelateInnerQuery
is only enabled for scalar and lateral subqueries. This PR enablesDecorrelateInnerQuery
for IN/EXISTS subqueriesWhy are the changes needed?
See above.
Does this PR introduce any user-facing change?
Users can run more EXISTS/IN subqueries, including correlated subqueries with aggregates and limits etc.
How was this patch tested?
Existing unit tests and new query tests.
Was this patch authored or co-authored using generative AI tooling?
No