[SPARK-17337][SQL] Do not pushdown predicates through filters with predicate subqueries

## What changes were proposed in this pull request?
The `PushDownPredicate` rule can create a wrong result if we try to push a filter containing a predicate subquery through a project when the subquery and the project share attributes (have the same source).

The current PR fixes this by making sure that we do not push down when there is a predicate subquery that outputs the same attributes as the filter's new child plan.
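
The check described above can be sketched outside Spark as a small toy model. This is only an illustration under stated assumptions: `Attr`, `Plan`, `Ref`, `Not`, and `InSubquery` are hypothetical stand-ins invented here, not Catalyst classes, and a plan is reduced to the set of attributes it outputs. Only the name `canPushThroughCondition` and the intersect-the-output-sets idea come from the patch.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark's Catalyst classes.
object PushdownCheckSketch {
  // An attribute is identified by a unique id (an assumption of this sketch).
  final case class Attr(name: String, id: Long)

  // A plan is reduced to the set of attributes it outputs.
  final case class Plan(output: Set[Attr])

  sealed trait Expr { def children: Seq[Expr] }
  final case class Ref(attr: Attr) extends Expr { val children: Seq[Expr] = Nil }
  final case class Not(child: Expr) extends Expr { val children: Seq[Expr] = Seq(child) }
  // Stand-in for a predicate subquery such as `value IN (subquery)`.
  final case class InSubquery(value: Expr, subquery: Plan) extends Expr {
    val children: Seq[Expr] = Seq(value)
  }

  // Collect every predicate subquery nested anywhere in the condition.
  def subqueries(e: Expr): Seq[Plan] = {
    val own = e match {
      case InSubquery(_, p) => Seq(p)
      case _ => Nil
    }
    own ++ e.children.flatMap(subqueries)
  }

  // Pushing is treated as safe only if no subquery outputs an attribute
  // that the target plan also outputs.
  def canPushThroughCondition(plan: Plan, condition: Expr): Boolean =
    subqueries(condition).forall(p => (p.output intersect plan.output).isEmpty)

  def main(args: Array[String]): Unit = {
    val c1 = Attr("c1", 1L)
    val c2 = Attr("c2", 2L)
    val c3 = Attr("c3", 3L)
    val t2 = Plan(Set(c2))
    val joined = Plan(Set(c1, c2)) // models a join whose output shares c2 with t2
    val condition = Not(InSubquery(Ref(c3), t2))

    println(canPushThroughCondition(joined, condition))        // false: c2 is shared
    println(canPushThroughCondition(Plan(Set(c1)), condition)) // true: nothing shared
  }
}
```

In this model the filter stays above the projection whenever the subquery and the plan below share an attribute, which mirrors the conservative choice the PR makes: skip the optimization rather than risk resolving the subquery's columns against the wrong plan.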

## How was this patch tested?
Added a test to `SubquerySuite`. nsyca has done previous work on this. I have taken the test from his initial PR.
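
To see why the new test expects a single row with value 2, the query can be traced by hand with plain Scala collections. This is a sketch, not Spark code: `Option` stands in for SQL `NULL`, and the object and value names are made up for illustration. The data (`t1` holding 1 and 2, `t2` holding 1) is taken from the test itself.

```scala
// Hand evaluation of the SPARK-17337 test query using plain collections.
// None models SQL NULL; all names here are illustrative only.
object Spark17337Expected {
  val t1: Seq[Int] = Seq(1, 2) // column c1
  val t2: Seq[Int] = Seq(1)    // column c2

  // t1 LEFT JOIN t2 ON t1.c1 = t2.c2, keeping only t2.c2.
  // Unmatched t1 rows produce NULL for c2.
  val joinedC2: Seq[Option[Int]] = t1.flatMap { c1 =>
    val matches = t2.filter(_ == c1)
    if (matches.isEmpty) Seq(None) else matches.map(Some(_))
  }

  // c3 = t2.c2 + 1; NULL + 1 stays NULL.
  val c3: Seq[Option[Int]] = joinedC2.map(_.map(_ + 1))

  // WHERE c3 NOT IN (select c2 from t2): with a non-empty, NULL-free t2,
  // a NULL c3 evaluates to NULL and the row is filtered out.
  val result: Seq[Int] = c3.collect { case Some(v) if !t2.contains(v) => v }

  def main(args: Array[String]): Unit = println(result) // List(2)
}
```

The row with `c1 = 1` joins to `c2 = 1`, giving `c3 = 2`, which is not in `t2` and survives. The row with `c1 = 2` has no match, so `c3` is NULL and is dropped.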

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15761 from hvanhovell/SPARK-17337.
hvanhovell committed Nov 4, 2016
1 parent a42d738 commit 550cd56
Showing 2 changed files with 35 additions and 5 deletions.
@@ -689,7 +689,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // state and all the input rows processed before. In another word, the order of input rows
     // matters for non-deterministic expressions, while pushing down predicates changes the order.
     case filter @ Filter(condition, project @ Project(fields, grandChild))
-      if fields.forall(_.deterministic) =>
+      if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
 
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
@@ -830,6 +830,20 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       filter
     }
   }
+
+  /**
+   * Check if we can safely push a filter through a projection, by making sure that predicate
+   * subqueries in the condition do not contain the same attributes as the plan they are moved
+   * into. This can happen when the plan and predicate subquery have the same source.
+   */
+  private def canPushThroughCondition(plan: LogicalPlan, condition: Expression): Boolean = {
+    val attributes = plan.outputSet
+    val matched = condition.find {
+      case PredicateSubquery(p, _, _, _) => p.outputSet.intersect(attributes).nonEmpty
+      case _ => false
+    }
+    matched.isEmpty
+  }
 }
 
 /**
24 changes: 20 additions & 4 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -608,8 +608,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
             | where exists (select 1 from onerow t2 where t1.c1=t2.c1)
             | and exists (select 1 from onerow LIMIT 1)""".stripMargin),
         Row(1) :: Nil)
-}
-}
+    }
+  }
 
   test("SPARK-16804: Correlated subqueries containing LIMIT - 2") {
     withTempView("onerow") {
@@ -623,6 +623,22 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
             | from (select 1 from onerow t2 LIMIT 1)
             | where t1.c1=t2.c1)""".stripMargin),
         Row(1) :: Nil)
-}
-}
+    }
+  }
+
+  test("SPARK-17337: Incorrect column resolution leads to incorrect results") {
+    withTempView("t1", "t2") {
+      Seq(1, 2).toDF("c1").createOrReplaceTempView("t1")
+      Seq(1).toDF("c2").createOrReplaceTempView("t2")
+
+      checkAnswer(
+        sql(
+          """
+            | select *
+            | from (select t2.c2+1 as c3
+            | from t1 left join t2 on t1.c1=t2.c2) t3
+            | where c3 not in (select c2 from t2)""".stripMargin),
+        Row(2) :: Nil)
+    }
+  }
 }
