Skip to content

Commit

Permalink
[SPARK-34974][SQL] Improve subquery decorrelation framework
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR implements the decorrelation technique in the paper "Unnesting Arbitrary Queries" by T. Neumann; A. Kemper
(http://www.btw-2015.de/res/proceedings/Hauptband/Wiss/Neumann-Unnesting_Arbitrary_Querie.pdf). It currently supports Filter, Project, Aggregate, Join, and UnaryNode that passes CheckAnalysis.

This feature can be controlled by the config `spark.sql.optimizer.decorrelateInnerQuery.enabled` (default: true).

A few notes:
1. This PR does not relax any constraints in CheckAnalysis for correlated subqueries, even though some cases can be supported by this new framework, such as aggregate with correlated non-equality predicates. This PR focuses on adding the new framework and making sure all existing cases can be supported. Constraints can be relaxed gradually in the future via separate PRs.
2. The new framework is only enabled for correlated scalar subqueries, as the first step. EXISTS/IN subqueries can be supported in the future.

### Why are the changes needed?
Currently, Spark has limited support for correlated subqueries. It only allows `Filter` to reference outer query columns and does not support non-equality predicates when the subquery is aggregated. This new framework will allow more operators to host outer column references and support correlated non-equality predicates and more types of operators in correlated subqueries.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit and SQL query tests and new optimizer plan tests.

Closes #32072 from allisonwang-db/spark-34974-decorrelation.

Authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
allisonwang-db authored and cloud-fan committed Apr 20, 2021
1 parent aa0d00d commit b6bb24c
Show file tree
Hide file tree
Showing 6 changed files with 819 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,11 @@ object SubExprUtils extends PredicateHelper {
* Given a logical plan, returns TRUE if it has an outer reference and false otherwise.
*/
def hasOuterReferences(plan: LogicalPlan): Boolean = {
plan.find {
case f: Filter => containsOuter(f.condition)
case other => false
}.isDefined
plan.find(_.expressions.exists(containsOuter)).isDefined
}

/**
* Given a list of expressions, returns the expressions which have outer references. Aggregate
* Given an expression, returns the expressions which have outer references. Aggregate
* expressions are treated in a special way. If the children of aggregate expression contains an
* outer reference, then the entire aggregate expression is marked as an outer reference.
* Example (SQL):
Expand Down Expand Up @@ -183,18 +180,18 @@ object SubExprUtils extends PredicateHelper {
* }}}
* The code below needs to change when we support the above cases.
*/
def getOuterReferences(conditions: Seq[Expression]): Seq[Expression] = {
def getOuterReferences(expr: Expression): Seq[Expression] = {
val outerExpressions = ArrayBuffer.empty[Expression]
conditions foreach { expr =>
expr transformDown {
case a: AggregateExpression if a.collectLeaves.forall(_.isInstanceOf[OuterReference]) =>
val newExpr = stripOuterReference(a)
outerExpressions += newExpr
newExpr
case OuterReference(e) =>
outerExpressions += e
e
}
expr transformDown {
case a: AggregateExpression if a.collectLeaves.forall(_.isInstanceOf[OuterReference]) =>
// Collect and update the sub-tree so that outer references inside this aggregate
// expression will not be collected. For example: min(outer(a)) -> min(a).
val newExpr = stripOuterReference(a)
outerExpressions += newExpr
newExpr
case OuterReference(e) =>
outerExpressions += e
e
}
outerExpressions.toSeq
}
Expand All @@ -204,8 +201,7 @@ object SubExprUtils extends PredicateHelper {
* Filter operator can host outer references.
*/
def getOuterReferences(plan: LogicalPlan): Seq[Expression] = {
val conditions = plan.collect { case Filter(cond, _) => cond }
getOuterReferences(conditions)
plan.flatMap(_.expressions.flatMap(getOuterReferences))
}

/**
Expand Down
Loading

0 comments on commit b6bb24c

Please sign in to comment.