Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][SPARK-22573][SQL] Shouldn't inferFilters if it contains SubqueryExpression #19804

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -267,6 +267,17 @@ object ScalarSubquery {
case _ => false
}.isDefined
}

/**
 * Returns true when `e.find` locates a [[ScalarSubquery]] node for the given expression,
 * and false otherwise.
 */
def hasScalarSubquery(e: Expression): Boolean = {
  val firstScalarSubquery = e.find {
    case _: ScalarSubquery => true
    case _ => false
  }
  firstScalarSubquery.nonEmpty
}

/**
 * Returns true when at least one expression in `e` satisfies the single-expression
 * `hasScalarSubquery` check; an empty sequence yields false.
 */
def hasScalarSubquery(e: Seq[Expression]): Boolean = {
  e.exists(expr => hasScalarSubquery(expr))
}
}

/**
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -119,7 +119,11 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
* Returns whether the expression returns null or false when all inputs are nulls.
*/
private def canFilterOutNull(e: Expression): Boolean = {
if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
if (!e.deterministic ||
SubqueryExpression.hasCorrelatedSubquery(e) ||
SubExprUtils.containsOuter(e)) {
return false
}
val attributes = e.references.toSeq
val emptyRow = new GenericInternalRow(attributes.length)
val boundE = BindReferences.bindReference(e, attributes)
Expand All @@ -146,9 +150,44 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}

/**
 * Derives the join type to use for `lowerJoin`, based on the constraints exposed by
 * `upperJoin`.
 *
 * @param upperJoin the join whose `constraints` supply the candidate predicates
 * @param lowerJoin the join whose type may be narrowed
 * @param otherTableOutput output attributes of the "other" table taking part in `upperJoin`
 * @return `Inner`, `LeftOuter` or `RightOuter` when a side of `lowerJoin` has a predicate
 *         accepted by `canFilterOutNull`; otherwise the unchanged type of `lowerJoin`
 */
private def buildNewJoinType(
    upperJoin: Join,
    lowerJoin: Join,
    otherTableOutput: AttributeSet): JoinType = {
  val constraints = upperJoin.constraints
  // Predicates that only reference the other table; these are excluded from both sides below.
  val otherOnly = constraints.filter(_.references.subsetOf(otherTableOutput))

  // True when some predicate over `sideOutput` (alone, or together with the other table)
  // is accepted by canFilterOutNull.
  def sideRejectsNull(sideOutput: AttributeSet): Boolean = {
    val scope = sideOutput ++ otherTableOutput
    constraints.filter(_.references.subsetOf(scope)).diff(otherOnly).exists(canFilterOutNull)
  }

  val leftRejectsNull = sideRejectsNull(lowerJoin.left.outputSet)
  val rightRejectsNull = sideRejectsNull(lowerJoin.right.outputSet)

  (lowerJoin.joinType, leftRejectsNull, rightRejectsNull) match {
    case (RightOuter, true, _) => Inner
    case (LeftOuter, _, true) => Inner
    case (FullOuter, true, true) => Inner
    case (FullOuter, true, false) => LeftOuter
    case (FullOuter, false, true) => RightOuter
    case (unchanged, _, _) => unchanged
  }
}

/**
 * Rewrites `plan` via `transform`, replacing the type of an outer join with the type
 * computed by `buildNewJoinType` in two shapes:
 *  - a `Filter` directly over a right/left/full outer join;
 *  - a left semi or left anti join whose left child is a right/left/full outer join.
 * A node is returned unchanged when the computed join type equals the existing one.
 */
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
  case filter @ Filter(predicate, join @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
    val inferredType = buildNewJoinType(filter, join)
    if (inferredType != join.joinType) {
      Filter(predicate, join.copy(joinType = inferredType))
    } else {
      filter
    }
  case outer @ Join(inner @ Join(_, _, RightOuter | LeftOuter | FullOuter, _),
      subquery, LeftSemiOrAnti(existenceType), joinCond) =>
    val inferredType = buildNewJoinType(outer, inner, subquery.outputSet)
    if (inferredType != inner.joinType) {
      Join(inner.copy(joinType = inferredType), subquery, existenceType, joinCond)
    } else {
      outer
    }
}
}
Expand Up @@ -114,3 +114,10 @@ object LeftExistence {
case _ => None
}
}

/**
 * Extractor that matches only the `LeftSemi` and `LeftAnti` join types, yielding the
 * matched join type itself.
 */
object LeftSemiOrAnti {
  def unapply(joinType: JoinType): Option[JoinType] = {
    val isSemiOrAnti = LeftSemi == joinType || LeftAnti == joinType
    if (isSemiOrAnti) Some(joinType) else None
  }
}
Expand Up @@ -300,7 +300,7 @@ case class Join(
left.constraints
.union(right.constraints)
.union(splitConjunctivePredicates(condition.get).toSet)
case LeftSemi if condition.isDefined =>
case LeftSemi | LeftAnti if condition.isDefined =>
left.constraints
.union(splitConjunctivePredicates(condition.get).toSet)
case j: ExistenceJoin =>
Expand Down
Expand Up @@ -39,6 +39,7 @@ class FilterPushdownSuite extends PlanTest {
PushDownPredicate,
BooleanSimplification,
PushPredicateThroughJoin,
PushLeftSemiLeftAntiThroughJoin,
CollapseProject) :: Nil
}

Expand Down