[SPARK-13495][SQL] Add Null Filters in the query plan for Filters/Joins based on their data constraints #11372

Closed · wants to merge 7 commits
@@ -76,6 +76,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
NullFiltering,
NullPropagation,
OptimizeIn,
ConstantFolding,
@@ -585,6 +586,54 @@ object NullPropagation extends Rule[LogicalPlan] {
}
}

/**
 * Attempts to avoid reading unnecessary NULL values, when they are not required for correctness,
 * by inserting IsNotNull filters into the query plan. These filters are currently inserted
 * beneath existing Filter and Join operators and are inferred from their data constraints.
 *
 * Note: While this optimization is applicable to all types of joins, it primarily benefits
 * Inner and LeftSemi joins.
 */
object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
// We generate a list of additional isNotNull filters from the operator's existing constraints
// but remove those that are either already part of the filter condition or are part of the
// operator's child constraints.
val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
(child.constraints ++ splitConjunctivePredicates(condition))
if (newIsNotNullConstraints.nonEmpty) {
Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
} else {
filter
}

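    // Similarly, we generate IsNotNull predicates from the join's constraint set, split them
    // by which side's output they reference, and push each subset beneath the corresponding
    // child as a new Filter.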
case join @ Join(left, right, joinType, condition) =>
val leftIsNotNullConstraints = join.constraints
.filter(_.isInstanceOf[IsNotNull])
.filter(_.references.subsetOf(left.outputSet)) -- left.constraints
val rightIsNotNullConstraints =
join.constraints
.filter(_.isInstanceOf[IsNotNull])
.filter(_.references.subsetOf(right.outputSet)) -- right.constraints
val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
Filter(leftIsNotNullConstraints.reduce(And), left)
} else {
left
}
val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
Filter(rightIsNotNullConstraints.reduce(And), right)
} else {
right
}
if (newLeftChild != left || newRightChild != right) {
Join(newLeftChild, newRightChild, joinType, condition)
} else {
join
}
}
}

/**
* Replaces [[Expression Expressions]] that can be statically evaluated with
* equivalent [[Literal]] values.
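To make the new rule concrete, here is a minimal sketch of applying NullFiltering by hand, assuming the Catalyst test DSL (org.apache.spark.sql.catalyst.dsl) is in scope; the expected shapes mirror the new NullFilteringSuite below:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val r = LocalRelation('a.int, 'b.int)
    // 'a === 1 is null-intolerant, so the analyzed plan's constraints
    // already contain IsNotNull(a).
    val before = r.where('a === 1).analyze
    // The rule prepends the inferred predicate:
    //   Filter(IsNotNull(a) && (a = 1), r)
    val after = NullFiltering(before)
    // The same happens beneath joins: for an inner equi-join on x.a = y.a,
    // both children gain an IsNotNull(a) Filter (see the join tests below).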
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

class NullFilteringSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("NullFiltering", Once, NullFiltering) ::
Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

test("filter: filter out nulls in condition") {
val originalQuery = testRelation.where('a === 1).analyze
val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
Contributor: You haven't done anything with a === 1, right? There's still no logic that infers a === 1 implies a is not nullable.

Contributor: We can do that in a follow-up PR.

val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("single inner join: filter out nulls on either side on equi-join keys") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.join(y,
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
val left = x.where(IsNotNull('a) && IsNotNull('b))
val right = y.where(IsNotNull('a) && IsNotNull('c))
val correctAnswer = left.join(right,
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("single inner join with pre-existing filters: filter out nulls on either side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.where('b > 5).join(y.where('c === 10),
condition = Some("x.a".attr === "y.a".attr)).analyze
val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
val correctAnswer = left.join(right,
condition = Some("x.a".attr === "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("single outer join: no null filters are generated") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.join(y, FullOuter,
condition = Some("x.a".attr === "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, originalQuery)
}

test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
val t1 = testRelation.subquery('t1)
val t2 = testRelation.subquery('t2)
val t3 = testRelation.subquery('t3)
val t4 = testRelation.subquery('t4)

val originalQuery = t1
.join(t2, condition = Some("t1.b".attr === "t2.b".attr))
.join(t3, condition = Some("t2.b".attr === "t3.b".attr))
.join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
val correctAnswer = t1.where(IsNotNull('b))
.join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr))
.join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr))
.join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
Contributor: I had a few more test cases when I tried this. Can you see if any of them should be added? nongli@ea0edd4

We should also have outer join tests to make sure they don't add the IsNotNull filter.

Member (Author): Added a modified version of the multi-join test based on your patch, and a new test for outer join.

As discussed offline, there is an additional test for non-equal keys (t1.a !== t2.b) that requires generating isNotNull constraints for those expressions; I'll make that change in a small follow-up PR and add new tests.

}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.util._
/**
* Provides helper methods for comparing plans.
*/
abstract class PlanTest extends SparkFunSuite {
abstract class PlanTest extends SparkFunSuite with PredicateHelper {
/**
* Since attribute references are given globally unique ids during analysis,
* we must normalize them to check if two different queries are identical.
@@ -39,10 +39,22 @@ abstract class PlanTest extends SparkFunSuite {
}
}

/**
 * Normalizes the filter conditions that appear in the plan. For instance,
 * ((expr 1 && expr 2) && expr 3), (expr 1 && expr 2 && expr 3), and (expr 3 && (expr 1 && expr 2))
 * will all now be equivalent.
 */
private def normalizeFilters(plan: LogicalPlan) = {
plan transform {
case filter @ Filter(condition: Expression, child: LogicalPlan) =>
Filter(splitConjunctivePredicates(condition).sortBy(_.hashCode()).reduce(And), child)
}
}

/** Fails the test if the two plans do not match */
protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
val normalized1 = normalizeExprIds(plan1)
val normalized2 = normalizeExprIds(plan2)
val normalized1 = normalizeFilters(normalizeExprIds(plan1))
val normalized2 = normalizeFilters(normalizeExprIds(plan2))
if (normalized1 != normalized2) {
fail(
s"""
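As a quick illustration of the new normalization, a sketch (same Catalyst test DSL assumptions as above): two plans whose conjuncts are nested differently now compare equal:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val r = LocalRelation('a.int, 'b.int, 'c.int)
    val p1 = r.where(('a === 1 && 'b === 2) && 'c === 3).analyze
    val p2 = r.where('a === 1 && ('b === 2 && 'c === 3)).analyze
    // normalizeFilters splits both conditions into the same sorted list of
    // conjuncts before reassembling them, so comparePlans(p1, p2) passes.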
@@ -159,7 +159,7 @@ class PlannerSuite extends SharedSQLContext {

withTempTable("testPushed") {
val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan
assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
assert(exp.toString.contains("PushedFilters: [IsNotNull(key), EqualTo(key,15)]"))
}
}
}
@@ -74,10 +74,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
selectedFilters.foreach { pred =>
val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
maybeFilter.foreach { f =>
// Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
assert(f.getClass === filterClass)
}
// Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
maybeFilter.exists(_.getClass === filterClass)
}
checker(stripSparkFilter(query), expected)
}
@@ -61,8 +61,8 @@ class OrcFilterSuite extends QueryTest with OrcTest {
(predicate: Predicate, filterOperator: PredicateLeaf.Operator)
(implicit df: DataFrame): Unit = {
def checkComparisonOperator(filter: SearchArgument) = {
val operator = filter.getLeaves.asScala.head.getOperator
assert(operator === filterOperator)
val operator = filter.getLeaves.asScala
assert(operator.map(_.getOperator).contains(filterOperator))
}
checkFilterPredicate(df, predicate, checkComparisonOperator)
}
@@ -216,8 +216,9 @@ class OrcFilterSuite extends QueryTest with OrcTest {
)
checkFilterPredicate(
!('_1 < 4),
"""leaf-0 = (LESS_THAN _1 4)
|expr = (not leaf-0)""".stripMargin.trim
"""leaf-0 = (IS_NULL _1)
|leaf-1 = (LESS_THAN _1 4)
|expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
)
checkFilterPredicate(
'_1 < 2 || '_1 > 3,
@@ -227,9 +228,10 @@
)
checkFilterPredicate(
'_1 < 2 && '_1 > 3,
"""leaf-0 = (LESS_THAN _1 2)
|leaf-1 = (LESS_THAN_EQUALS _1 3)
|expr = (and leaf-0 (not leaf-1))""".stripMargin.trim
"""leaf-0 = (IS_NULL _1)
|leaf-1 = (LESS_THAN _1 2)
|leaf-2 = (LESS_THAN_EQUALS _1 3)
|expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim
)
}
}
@@ -192,14 +192,14 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper {
}

markup("Checking pushed filters")
assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet)
assert(pushedFilters.toSet.subsetOf(SimpleTextRelation.pushedFilters))

val expectedInconvertibleFilters = inconvertibleFilters.map(_.expr).toSet
val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet
val expectedPartitioningFilters = partitioningFilters.map(_.expr).toSet

markup("Checking unhandled and inconvertible filters")
assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === nonPushedFilters)
assert((expectedInconvertibleFilters ++ expectedUnhandledFilters).subsetOf(nonPushedFilters))

markup("Checking partitioning filters")
val actualPartitioningFilters = splitConjunctivePredicates(filter.expr).filter {
@@ -140,12 +140,17 @@ class SimpleTextRelation(
// Constructs a filter predicate to simulate filter push-down
val predicate = {
val filterCondition: Expression = filters.collect {
// According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` filter
// According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` and
// `IsNotNull` filters
case sources.GreaterThan(column, value) =>
val dataType = dataSchema(column).dataType
val literal = Literal.create(value, dataType)
val attribute = inputAttributes.find(_.name == column).get
expressions.GreaterThan(attribute, literal)
case sources.IsNotNull(column) =>
val dataType = dataSchema(column).dataType
val attribute = inputAttributes.find(_.name == column).get
expressions.IsNotNull(attribute)
}.reduceOption(expressions.And).getOrElse(Literal(true))
InterpretedPredicate.create(filterCondition, inputAttributes)
}
@@ -183,11 +188,12 @@ class SimpleTextRelation(
}
}

// `SimpleTextRelation` only handles `GreaterThan` filter. This is used to test filter push-down
// and `BaseRelation.unhandledFilters()`.
// `SimpleTextRelation` only handles `GreaterThan` and `IsNotNull` filters. This is used to test
// filter push-down and `BaseRelation.unhandledFilters()`.
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
filters.filter {
case _: GreaterThan => false
case _: IsNotNull => false
case _ => true
}
}
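For illustration, a sketch of the updated contract (hypothetical filter values; `relation` stands in for any SimpleTextRelation instance):

    import org.apache.spark.sql.sources._

    val pushed: Array[Filter] = Array(
      GreaterThan("a", 5), IsNotNull("a"), LessThan("b", 1))
    // GreaterThan and IsNotNull are now handled by the relation itself, so
    // only LessThan("b", 1) is reported back as unhandled.
    val unhandled = relation.unhandledFilters(pushed)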