[SPARK-13249][SQL] Add Filter checking nullability of keys for inner join #11235

Closed · wants to merge 15 commits
Changes from 13 commits
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubquery
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins, Unions}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -57,6 +57,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", FixedPoint(100),
RemoveLiteralFromGroupExpressions) ::
Batch("Join", FixedPoint(100),
Contributor: We may have more InnerJoin from OuterJoinElimination; should we move this rule after that?

Contributor: If we can make this rule idempotent, we don't need to put it in a separate group.

AddFilterOfNullForInnerJoin) ::
Contributor: We should make this rule idempotent instead of hacking it to run only once. You are losing the benefits of emergent optimizations with this implementation.

I would directly construct the filter in the left/right child, but only when it's not already present in the constraints of the child. This is the whole reason we added the ability to reason about which constraints are already present on a subtree.
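A minimal sketch of the idempotency check the reviewer is describing, using a toy expression ADT rather than Catalyst's real `ExpressionSet`/`QueryPlan.constraints` API (all names here are illustrative, not Spark's):

```scala
// Toy model, not Spark code: only emit an IsNotNull predicate for a key when
// the child's known constraints do not already guarantee it. Running the rule
// a second time then adds nothing, i.e. the rule is idempotent.
object IdempotentNullFilterSketch {
  case class Attr(name: String)
  sealed trait Expr
  case class IsNotNull(a: Attr) extends Expr

  def missingNullFilters(keys: Seq[Attr], childConstraints: Set[Expr]): Seq[Expr] =
    keys.distinct.collect {
      case k if !childConstraints.contains(IsNotNull(k)) => IsNotNull(k)
    }
}
```

With this shape the rule could live in an ordinary fixed-point batch: once the filters exist, the child's constraints include the `IsNotNull` predicates and the rule no longer fires.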

Batch("Operator Optimizations", FixedPoint(100),
// Operator push down
SetOperationPushDown,
@@ -143,6 +145,56 @@ object EliminateSerialization extends Rule[LogicalPlan] {
}
}

/**
 * Adds a Filter to the left and right children of an inner Join to filter out rows
 * with null join keys. The join then no longer needs to check key nullability, and
 * filtering out null keys early also reduces the amount of data fed into the Join.
 */
object AddFilterOfNullForInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
Contributor: We could also do this for semi join and outer join; should we call it AddFilterOfNullForEquiJoin?

Contributor: Not commenting on the name at all (I haven't looked at the rest of the PR), but AddFilterOfNullForEquiJoin can be shortened slightly to AddNullFilterForEquiJoin.

Member (Author): Renamed. Will do the semi join and outer join parts in a separate PR once this one gets merged.

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val leftConditions = leftKeys.distinct.map { l =>
Contributor: We should only add a predicate if the key is nullable and there is no IsNotNull constraint on the key.
IsNotNull(l)
}.reduceLeft(And)

val rightConditions = rightKeys.distinct.map { r =>
IsNotNull(r)
}.reduceLeft(And)
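As an aside, the `map`-then-`reduceLeft(And)` shape used for these conditions can be pictured with a toy expression tree (illustrative types, not Catalyst's):

```scala
// Toy expression tree, not Spark code: build IsNotNull(k1) AND IsNotNull(k2) AND ...
// from the distinct join keys, the same left-leaning conjunction shape as above.
object ConjunctionSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class IsNotNull(child: Expr) extends Expr
  case class And(left: Expr, right: Expr) extends Expr

  def notNullCondition(keys: Seq[Attr]): Expr =
    keys.distinct.map(k => IsNotNull(k): Expr).reduceLeft(And(_, _))
}
```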

val keysConditions = ExpressionSet(leftKeys.zip(rightKeys).map { lr =>
EqualTo(lr._2, lr._1)
}).reduceLeft(And)

val finalConditions = if (condition.isDefined) {
Contributor: It's better to use the original condition.
And(keysConditions, condition.get)
} else {
keysConditions
}

val leftHasAllNotNullPredicate = left.constraints.nonEmpty &&
left.constraints.filter(_.isInstanceOf[IsNotNull])
.forall(expr => leftConditions.references.intersect(expr.references).nonEmpty)

val rightHasAllNotNullPredicate = right.constraints.nonEmpty &&
right.constraints.filter(_.isInstanceOf[IsNotNull])
.forall(expr => rightConditions.references.intersect(expr.references).nonEmpty)
Contributor: This seems complicated and definitely needs more comments. Why are we not just adding IsNotNull to the left/right when it doesn't already exist in the conditions?

Member (Author): Because we want to filter out nulls before the join operator runs. If we simply add IsNotNull predicates to the join condition, the null rows are still read while performing the join.


val newLeft = if (leftHasAllNotNullPredicate) {
left
} else {
Filter(leftConditions, left)
}

val newRight = if (rightHasAllNotNullPredicate) {
right
} else {
Filter(rightConditions, right)
}

Join(newLeft, newRight, Inner, Some(finalConditions))
Contributor: If no filter is added, we should return the original Join.

}
}
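To see why the rewrite is safe, note that an inner equi-join can never match a null key (SQL equality involving NULL is never true), so pre-filtering null keys preserves the result while shrinking the join's input. A standalone sketch of that invariant over toy collections (not Spark code):

```scala
// Toy model, not Spark code: an inner equi-join with SQL-like null semantics,
// where a null key never matches anything (not even another null).
object NullFilterEquivalenceSketch {
  def innerJoin[A, B](left: Seq[(String, A)], right: Seq[(String, B)]): Seq[(String, A, B)] =
    for {
      (lk, a) <- left
      (rk, b) <- right
      if lk != null && rk != null && lk == rk
    } yield (lk, a, b)

  // Pre-filtering null keys on both sides yields the same join result,
  // but each side feeds fewer rows into the join itself.
  def prefiltered[A, B](left: Seq[(String, A)], right: Seq[(String, B)]): Seq[(String, A, B)] =
    innerJoin(left.filter(_._1 != null), right.filter(_._1 != null))
}
```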

/**
* Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
*/
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.IntegerType

class AddFilterOfNullForInnerJoinSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Add Filter", FixedPoint(100),
AddFilterOfNullForInnerJoin) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

val testRelation1 = LocalRelation('d.int)

test("add filters to left and right of inner join") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery = {
x.join(y, condition = Some("d".attr === "b".attr && "d".attr === "c".attr && "a".attr > 10))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where(IsNotNull('b) && IsNotNull('c))
val right = testRelation1.where(IsNotNull('d))
val correctAnswer =
left.join(right,
condition = Some(("d".attr === "b".attr && "d".attr === "c".attr) && "a".attr > 10)).analyze

comparePlans(optimized, correctAnswer)
}

test("don't need to add filters to left and right of inner join if constraints are ready") {
// left and right of join node already have not null constraints
val x = testRelation.subquery('x).where(IsNotNull('b) && IsNotNull('c)).select('a, 'b, 'c)
val y = testRelation1.subquery('y).where(IsNotNull('d)).select('d)

val originalQuery = {
x.join(y, condition = Some("d".attr === "b".attr && "d".attr === "c".attr && "a".attr > 10))
}

val optimized = Optimize.execute(originalQuery.analyze)
assert(optimized.collect {
case j @ Join(left, right, _, _)
if !left.isInstanceOf[Filter] && !right.isInstanceOf[Filter] => 1
}.nonEmpty)
}
}