Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23405] Generate additional constraints for Join's children #20670

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
case join @ Join(left, right, joinType, conditionOpt) =>
// Only consider constraints that can be pushed down completely to either the left or the
// right child
val constraints = join.constraints.filter { c =>
val constraints = join.allConstraints.filter { c =>
c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
}
// Remove those constraints that are already enforced by either the left or the right child
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,30 @@ import org.apache.spark.sql.catalyst.expressions._

trait QueryPlanConstraints { self: LogicalPlan =>

/**
* An [[ExpressionSet]] that contains an additional set of constraints, such as equality
* constraints and `isNotNull` constraints, etc.
*/
lazy val allConstraints: ExpressionSet = {
if (conf.constraintPropagationEnabled) {
ExpressionSet(validConstraints
.union(inferAdditionalConstraints(validConstraints))
.union(constructIsNotNullConstraints(validConstraints)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: indents

} else {
ExpressionSet(Set.empty)
}
}

/**
* An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
* example, if this set contains the expression `a = 2` then that expression is guaranteed to
* evaluate to `true` for all rows produced.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment belongs to constraints not allConstraints

*/
lazy val constraints: ExpressionSet = {
if (conf.constraintPropagationEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now we don't need this if.

ExpressionSet(
validConstraints
.union(inferAdditionalConstraints(validConstraints))
.union(constructIsNotNullConstraints(validConstraints))
.filter { c =>
c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
}
)
ExpressionSet(allConstraints.filter { c =>
c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
})
} else {
ExpressionSet(Set.empty)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,17 @@ class InferFiltersFromConstraintsSuite extends PlanTest {

comparePlans(Optimize.execute(original.analyze), correct.analyze)
}

test("SPARK-23405: left-semi equal-join should filter out null join keys on both sides") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val condition = Some("x.a".attr === "y.a".attr)
val originalQuery = x.join(y, LeftSemi, condition).analyze
val left = x.where(IsNotNull('a))
val right = y.where(IsNotNull('a))
val correctAnswer = left.join(right, LeftSemi, condition)
.analyze
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't need to be in a new line

val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
}