[SPARK-14785][SQL] Support correlated scalar subqueries #12822
hvanhovell wants to merge 7 commits into apache:master
Test build #57478 has finished for PR 12822 at commit
Test build #57485 has finished for PR 12822 at commit
    query match {
      case a: Aggregate => checkAggregate(a)
      case Project(_, a: Aggregate) => checkAggregate(a)
      case fail => failAnalysis(s"Correlated scalar subqueries must be Aggregated: $fail")
Can it have a Filter on top of the Aggregate (a HAVING clause)?
Sure, I'll add it.
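For illustration, the shape check discussed above, extended to accept a HAVING-style `Filter` over the `Aggregate`, might look roughly like the sketch below. It uses toy plan classes defined inline, not the real Catalyst nodes, and `isAggregated` is a hypothetical name; the actual change in the PR may differ.

```scala
object ScalarSubqueryShapeSketch {
  // Toy stand-ins for logical plan nodes; these are NOT the Spark classes.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Aggregate(child: Plan) extends Plan
  final case class Project(child: Plan) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan

  // Accept an Aggregate, optionally wrapped in a Filter (HAVING) and/or a Project.
  def isAggregated(plan: Plan): Boolean = plan match {
    case _: Aggregate                     => true
    case Filter(_, _: Aggregate)          => true
    case Project(_: Aggregate)            => true
    case Project(Filter(_, _: Aggregate)) => true
    case _                                => false
  }
}
```

Under these assumptions, a plan such as `Project(Filter("max(value) > 0", Aggregate(Relation("tbl2"))))` passes the check, while a bare `Relation` or a `Filter` directly over a `Relation` is rejected.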
LGTM
Test build #57532 has finished for PR 12822 at commit
@hvanhovell When I tested this patch with TPCDS Q32 and Q92, the optimizer became unstable: it reached 100 iterations and the logical plan became huge. Could you fix it before merging?
@davies something is up with the optimizer. Working on it.
# Conflicts: # sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@davies the TPCDS queries should work now. Could you take another look?
@hvanhovell They work well now. Could you also update Filter so that it does not create constraints from predicates that contain a correlated subquery?
    override protected def validConstraints: Set[Expression] = {
      val predicates = splitConjunctivePredicates(condition)
        .filterNot(PredicateSubquery.hasPredicateSubquery)
        .filterNot(SubqueryExpression.hasCorrelatedSubquery)
@davies I changed the filter to prevent any correlated subquery from being propagated.
Oh I see, I missed this one in the latest changes.
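As a rough, self-contained sketch of the idea in this exchange (splitting a filter condition into conjuncts and dropping any conjunct that contains a correlated subquery before treating the rest as constraints), consider the toy model below. The expression classes and the `correlated` flag are simplifications invented for the example; only the method names echo the snippet above.

```scala
object ConstraintSketch {
  // Toy expression tree; NOT the Catalyst Expression hierarchy.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  // `correlated` marks a subquery that references attributes of the outer query.
  final case class ScalarSubquery(sql: String, correlated: Boolean) extends Expr

  // Flatten nested ANDs into a list of conjuncts.
  def splitConjunctivePredicates(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
    case other     => Seq(other)
  }

  // True if any node in the expression is a correlated subquery.
  def hasCorrelatedSubquery(e: Expr): Boolean = e match {
    case ScalarSubquery(_, correlated) => correlated
    case GreaterThan(l, r) => hasCorrelatedSubquery(l) || hasCorrelatedSubquery(r)
    case And(l, r)         => hasCorrelatedSubquery(l) || hasCorrelatedSubquery(r)
    case _                 => false
  }

  // Only conjuncts without a correlated subquery are kept as constraints.
  def validConstraints(condition: Expr): Set[Expr] =
    splitConjunctivePredicates(condition).filterNot(hasCorrelatedSubquery).toSet
}
```

A condition such as `a > 1 AND value > (correlated subquery)` would then yield only `a > 1` as a constraint, so the subquery-bearing predicate is never propagated to other operators.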
LGTM. Will merge this one once it passes the tests.
Test build #57561 has finished for PR 12822 at commit
## What changes were proposed in this pull request?

In this PR we add support for correlated scalar subqueries. An example of such a query is:

```SQL
select * from tbl1 a where a.value > (select max(value) from tbl2 b where b.key = a.key)
```

The implementation adds the `RewriteCorrelatedScalarSubquery` rule to the Optimizer. This rule plans these subqueries using `LEFT OUTER` joins. It currently supports rewrites for `Project`, `Aggregate` & `Filter` logical plans.

I could not find well-defined semantics for the use of scalar subqueries in an `Aggregate`. The current implementation evaluates the scalar subquery *before* aggregation. This means that you either have to make the scalar subquery part of the grouping expression, or that you have to aggregate it further on. I am open to suggestions on this.

The implementation currently forces the uniqueness of a scalar subquery by enforcing that it is aggregated and that the resulting column is wrapped in an `AggregateExpression`.

## How was this patch tested?

Added tests to `SubquerySuite`.

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12822 from hvanhovell/SPARK-14785.
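To make the `LEFT OUTER` join idea concrete, here is a small sketch in plain Scala collections (no Spark involved) that evaluates the example query two ways: once by running the subquery per outer row, and once by aggregating `tbl2` per key and joining the result back. The data and all names are made up for illustration, and this only models the semantics, not how `RewriteCorrelatedScalarSubquery` is actually implemented.

```scala
object CorrelatedScalarSubquerySketch {
  // Toy rows standing in for tbl1 and tbl2 from the example query (hypothetical data).
  final case class Row(key: Int, value: Int)

  val tbl1: Seq[Row] = Seq(Row(1, 10), Row(1, 3), Row(2, 7), Row(3, 5))
  val tbl2: Seq[Row] = Seq(Row(1, 4), Row(1, 6), Row(2, 9))

  // Direct evaluation: run the subquery once per outer row.
  def nested: Seq[Row] = tbl1.filter { a =>
    val matching = tbl2.filter(_.key == a.key).map(_.value)
    // max over no rows is NULL in SQL, and `value > NULL` is not true.
    matching.nonEmpty && a.value > matching.max
  }

  // Join-based evaluation: aggregate tbl2 once per key, then left outer join.
  def joined: Seq[Row] = {
    val maxByKey: Map[Int, Int] =
      tbl2.groupBy(_.key).map { case (k, rows) => k -> rows.map(_.value).max }
    tbl1
      .map(a => (a, maxByKey.get(a.key))) // None plays the role of NULL padding
      .collect { case (a, Some(m)) if a.value > m => a }
  }
}
```

Both functions return the same rows for this data. The point of the join form is that the aggregate is computed once per key rather than once per outer row, and requiring the subquery to be aggregated is what guarantees at most one joined row per key.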