Skip to content

Commit

Permalink
[SPARK-23274][SQL] Fix ReplaceExceptWithFilter when the right's Filter contains the references that are not in the left output
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?
This PR fixes the `ReplaceExceptWithFilter` rule for the case where the right side's Filter contains references that are not in the left side's output.

Before this PR, we got an error like:
```
java.util.NoSuchElementException: key not found: a
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:59)
```

After this PR, `ReplaceExceptWithFilter` will not take effect in this case.

## How was this patch tested?
Added tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20444 from gatorsmile/fixReplaceExceptWithFilter.

(cherry picked from commit ca04c3f)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
  • Loading branch information
gatorsmile committed Jan 31, 2018
1 parent 6ed0d57 commit b877832
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 4 deletions.
Expand Up @@ -46,18 +46,27 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
}

plan.transform {
case Except(left, right) if isEligible(left, right) =>
Distinct(Filter(Not(transformCondition(left, skipProject(right))), left))
case e @ Except(left, right) if isEligible(left, right) =>
val newCondition = transformCondition(left, skipProject(right))
newCondition.map { c =>
Distinct(Filter(Not(c), left))
}.getOrElse {
e
}
}
}

private def transformCondition(left: LogicalPlan, right: LogicalPlan): Expression = {
private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
val filterCondition =
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition

val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap

filterCondition.transform { case a : AttributeReference => attributeNameMap(a.name) }
if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
} else {
None
}
}

// TODO: This can be further extended in the future.
Expand Down
Expand Up @@ -168,6 +168,21 @@ class ReplaceOperatorSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

// Covers an Except whose right-side filter references a column ('b) that neither side
// exposes in its output (both project only 'a). The expected optimized plan is an
// Aggregate over a LeftAnti join instead of a Filter over `left`.
test("replace Except with Filter when only right filter can be applied to the left") {
val table = LocalRelation(Seq('a.int, 'b.int))
// Both sides filter on 'b but select only 'a, so 'b is absent from the outputs.
val left = table.where('b < 1).select('a).as("left")
val right = table.where('b < 3).select('a).as("right")

val query = Except(left, right)
val optimized = Optimize.execute(query.analyze)

// Expected shape: left-anti join on `left.a <=> right.a`, wrapped in an Aggregate.
val correctAnswer =
Aggregate(left.output, right.output,
Join(left, right, LeftAnti, Option($"left.a" <=> $"right.a"))).analyze

comparePlans(optimized, correctAnswer)
}

test("replace Distinct with Aggregate") {
val input = LocalRelation('a.int, 'b.int)

Expand Down
Expand Up @@ -589,6 +589,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
Nil)
}

// Regression test for SPARK-23274: `except` between two projections whose underlying
// filters use column "a", which is not part of either projected output.
test("SPARK-23274: except between two projects without references used in filter") {
val df = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
// df1 keeps the rows with a == 1: (1, 2, 4), (1, 3, 5).
val df1 = df.filter($"a" === 1)
// df2 keeps the rows with a == 2: (2, 2, 3), (2, 4, 5).
val df2 = df.filter($"a" === 2)
// b of df1 is {2, 3}; b of df2 is {2, 4}; only 3 remains.
checkAnswer(df1.select("b").except(df2.select("b")), Row(3) :: Nil)
// b of df1 is {2, 3}; c of df2 is {3, 5}; only 2 remains.
checkAnswer(df1.select("b").except(df2.select("c")), Row(2) :: Nil)
}

test("except distinct - SQL compliance") {
val df_left = Seq(1, 2, 2, 3, 3, 4).toDF("id")
val df_right = Seq(1, 3).toDF("id")
Expand Down

0 comments on commit b877832

Please sign in to comment.