Skip to content

Commit

Permalink
Nong's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sameeragarwal committed Mar 4, 2016
1 parent 013f97a commit 31b1700
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -588,30 +588,28 @@ object NullPropagation extends Rule[LogicalPlan] {

/**
* Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
* by inserting isNotNull filters is the query plan. These filters are currently inserted beneath
* by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
* existing Filters and Join operators and are inferred based on their data constraints.
*
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child: LogicalPlan) =>
case filter @ Filter(condition, child) =>
// We generate a list of additional isNotNull filters from the operator's existing constraints
// but remove those that are either already part of the filter condition or are part of the
// operator's child constraints.
val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
(child.constraints ++ splitConjunctivePredicates(condition))
val newCondition = if (newIsNotNullConstraints.nonEmpty) {
And(newIsNotNullConstraints.reduce(And), condition)
if (newIsNotNullConstraints.nonEmpty) {
Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
} else {
condition
filter
}
Filter(newCondition, child)

case join @ Join(left: LogicalPlan, right: LogicalPlan, joinType: JoinType,
condition: Option[Expression]) =>
val leftIsNotNullConstraints = join.constraints
case join @ Join(left, right, joinType, condition) =>
val leftIsNotNullConstraints = join.constraints
.filter(_.isInstanceOf[IsNotNull])
.filter(_.references.subsetOf(left.outputSet)) -- left.constraints
val rightIsNotNullConstraints =
Expand All @@ -628,7 +626,11 @@ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
} else {
right
}
Join(newLeftChild, newRightChild, joinType, condition)
if (newLeftChild != left || newRightChild != right) {
Join(newLeftChild, newRightChild, joinType, condition)
} else {
join
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,68 @@ class NullFilteringSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("NullFiltering", Once, NullFiltering) ::
Batch("CombineFilters", Once, CombineFilters) :: Nil
Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

test("filter: filter out nulls in condition") {
val originalQuery = testRelation.where('a === 1)
val originalQuery = testRelation.where('a === 1).analyze
val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
val optimized = Optimize.execute(originalQuery.analyze)
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("join: filter out nulls on either side") {
test("single inner join: filter out nulls on either side on equi-join keys") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.join(y,
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5))
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
val left = x.where(IsNotNull('a) && IsNotNull('b))
val right = y.where(IsNotNull('a) && IsNotNull('c))
val correctAnswer = left.join(right,
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
val optimized = Optimize.execute(originalQuery.analyze)
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("join with pre-existing filters: filter out nulls on either side") {
test("single inner join with pre-existing filters: filter out nulls on either side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.where('b > 5).join(y.where('c === 10),
condition = Some("x.a".attr === "y.a".attr))
condition = Some("x.a".attr === "y.a".attr)).analyze
val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
val correctAnswer = left.join(right,
condition = Some("x.a".attr === "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery.analyze)
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("single outer join: no null filters are generated") {
  // Two aliases of the same three-column relation, joined with FullOuter on column a.
  val lhs = testRelation.subquery('x)
  val rhs = testRelation.subquery('y)
  val joinCondition = "x.a".attr === "y.a".attr
  val analyzedPlan = lhs.join(rhs, FullOuter, condition = Some(joinCondition)).analyze

  // The expected plan is the analyzed input itself: the optimizer batches under test
  // must leave this FullOuter join without any added IsNotNull filters.
  comparePlans(Optimize.execute(analyzedPlan), analyzedPlan)
}

test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
  // Four aliases of the same relation, chained left-to-right by equi-joins on column b.
  val t1 = testRelation.subquery('t1)
  val t2 = testRelation.subquery('t2)
  val t3 = testRelation.subquery('t3)
  val t4 = testRelation.subquery('t4)

  // Join predicates, named once so the input and expected plans use the same conditions.
  val t1EqT2 = "t1.b".attr === "t2.b".attr
  val t2EqT3 = "t2.b".attr === "t3.b".attr
  val t3EqT4 = "t3.b".attr === "t4.b".attr

  val inputPlan = t1
    .join(t2, condition = Some(t1EqT2))
    .join(t3, condition = Some(t2EqT3))
    .join(t4, condition = Some(t3EqT4))
    .analyze

  // Expected shape: each of the four join inputs sits under an IsNotNull('b) filter,
  // while the join conditions themselves are unchanged.
  val expectedPlan = t1.where(IsNotNull('b))
    .join(t2.where(IsNotNull('b)), condition = Some(t1EqT2))
    .join(t3.where(IsNotNull('b)), condition = Some(t2EqT3))
    .join(t4.where(IsNotNull('b)), condition = Some(t3EqT4))
    .analyze

  comparePlans(Optimize.execute(inputPlan), expectedPlan)
}
}

0 comments on commit 31b1700

Please sign in to comment.