New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-15832][SQL] Embedded IN/EXISTS predicate subquery throws TreeNodeException #13570
Conversation
@@ -1715,31 +1715,68 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { | |||
// Filter the plan by applying left semi and left anti joins. | |||
withSubquery.foldLeft(newFilter) { | |||
case (p, PredicateSubquery(sub, conditions, _, _)) => | |||
Join(p, sub, LeftSemi, conditions.reduceOption(And)) | |||
if (!conditions.exists(PredicateSubquery.hasPredicateSubquery)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the code for these rewrites is (almost) the same for all three cases. Lets move this into a helper method or move it into a separate a loop before this one (I prefer the latter).
@ioana-delaney great catch! The overall PR seems pretty solid. I left one smallish code organization related comment. |
Test build #3070 has finished for PR 13570 at commit
|
@hvanhovell Thank you for reviewing the changes and I apologize for the delay in replying. |
* are blocked in the Analyzer. | ||
*/ | ||
private def rewriteExistentialExpr( | ||
expr: Option[Expression], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets just pass a sequence of expressions. Predicate subqueries are guaranteed to have one or more conditions.
This also eliminates the need for a pattern match. Just map over the expressions (rewrite the subqueries) and reduce conditions at the end.
@ioana-delaney no worries. I think the approach you have taken is the correct one. I have left one smallish comment. |
@hvanhovell The EXISTS/NOT EXISTS predicates will have an empty condition. e.g. select c1 from t1 where EXISTS (select c2 from t2) == Optimized Logical Plan == But the other subquery predicates are quaranteed to have at least one condition. Regarding the rewriteExistentialExpr interface, I think that I need to pass an expression instead of a sequence of conditions since the last case in the main rewrite rule does not have conditions. It's just an expression. e.g. where (case when c2 IN (select 1 as one) then 1 else 2) = c1 Please let me know. Thanks. |
LGTM - merging to master/2.0 thanks! |
…odeException ## What changes were proposed in this pull request? Queries with embedded existential sub-query predicates throws exception when building the physical plan. Example failing query: ```SQL scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1") scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2") scala> sql("select c1 from t1 where (case when c2 in (select c2 from t2) then 2 else 3 end) IN (select c2 from t1)").show() Binding attribute, tree: c2#239 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88) ... at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87) at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66) at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.execution.joins.HashJoin$class.org$apache$spark$sql$execution$joins$HashJoin$$x$8(HashJoin.scala:66) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8$lzycompute(BroadcastHashJoinExec.scala:38) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8(BroadcastHashJoinExec.scala:38) at org.apache.spark.sql.execution.joins.HashJoin$class.buildKeys(HashJoin.scala:63) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys$lzycompute(BroadcastHashJoinExec.scala:38) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys(BroadcastHashJoinExec.scala:38) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.requiredChildDistribution(BroadcastHashJoinExec.scala:52) ``` **Problem description:** When the left hand side expression of an existential sub-query predicate contains another embedded sub-query predicate, the RewritePredicateSubquery optimizer rule does not resolve the embedded sub-query expressions into existential joins.For example, the above query has the following optimized plan, which fails during physical plan build. ```SQL == Optimized Logical Plan == Project [_1#224 AS c1#227] +- Join LeftSemi, (CASE WHEN predicate-subquery#255 [(_2#225 = c2#239)] THEN 2 ELSE 3 END = c2#228#262) : +- SubqueryAlias predicate-subquery#255 [(_2#225 = c2#239)] : +- LocalRelation [c2#239] :- LocalRelation [_1#224, _2#225] +- LocalRelation [c2#228#262] == Physical Plan == org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239 ``` **Solution:** In RewritePredicateSubquery, before rewriting the outermost predicate sub-query, resolve any embedded existential sub-queries. The Optimized plan for the above query after the changes looks like below. ```SQL == Optimized Logical Plan == Project [_1#224 AS c1#227] +- Join LeftSemi, (CASE WHEN exists#285 THEN 2 ELSE 3 END = c2#228#284) :- Join ExistenceJoin(exists#285), (_2#225 = c2#239) : :- LocalRelation [_1#224, _2#225] : +- LocalRelation [c2#239] +- LocalRelation [c2#228#284] == Physical Plan == *Project [_1#224 AS c1#227] +- *BroadcastHashJoin [CASE WHEN exists#285 THEN 2 ELSE 3 END], [c2#228#284], LeftSemi, BuildRight :- *BroadcastHashJoin [_2#225], [c2#239], ExistenceJoin(exists#285), BuildRight : :- LocalTableScan [_1#224, _2#225] : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) : +- LocalTableScan [c2#239] +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) +- LocalTableScan [c2#228#284] +- LocalTableScan [c222#36], [[111],[222]] ``` ## How was this patch tested? Added new test cases in SubquerySuite.scala Author: Ioana Delaney <ioanamdelaney@gmail.com> Closes #13570 from ioana-delaney/fixEmbedSubPredV1. (cherry picked from commit 0ff8a68) Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
Did this break the build? https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-sbt-scala-2.10/1707/console It looks like the last commit may not have been tested. |
@hvanhovell @jkbradley Could you add @ioana-delaney to the whitelist? Thanks! |
What changes were proposed in this pull request?
Queries with embedded existential sub-query predicates throws exception when building the physical plan.
Example failing query:
Problem description:
When the left hand side expression of an existential sub-query predicate contains another embedded sub-query predicate, the RewritePredicateSubquery optimizer rule does not resolve the embedded sub-query expressions into existential joins.For example, the above query has the following optimized plan, which fails during physical plan build.
Solution:
In RewritePredicateSubquery, before rewriting the outermost predicate sub-query, resolve any embedded existential sub-queries. The Optimized plan for the above query after the changes looks like below.
How was this patch tested?
Added new test cases in SubquerySuite.scala