Skip to content

Commit

Permalink
Try merge SPARK-19712
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyum committed Dec 23, 2017
1 parent edd0434 commit c0925d9
Show file tree
Hide file tree
Showing 7 changed files with 1,117 additions and 27 deletions.
Expand Up @@ -267,6 +267,17 @@ object ScalarSubquery {
case _ => false
}.isDefined
}

def hasScalarSubquery(e: Expression): Boolean = {
e.find {
case s: ScalarSubquery => true
case _ => false
}.isDefined
}

def hasScalarSubquery(e: Seq[Expression]): Boolean = {
e.find(hasScalarSubquery(_)).isDefined
}
}

/**
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -119,7 +119,11 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
* Returns whether the expression returns null or false when all inputs are nulls.
*/
private def canFilterOutNull(e: Expression): Boolean = {
if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
if (!e.deterministic ||
SubqueryExpression.hasCorrelatedSubquery(e) ||
SubExprUtils.containsOuter(e)) {
return false
}
val attributes = e.references.toSeq
val emptyRow = new GenericInternalRow(attributes.length)
val boundE = BindReferences.bindReference(e, attributes)
Expand All @@ -146,9 +150,44 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}

private def buildNewJoinType(
upperJoin: Join,
lowerJoin: Join,
otherTableOutput: AttributeSet): JoinType = {
val conditions = upperJoin.constraints
// Find the predicates reference only on the other table.
val localConditions = conditions.filter(_.references.subsetOf(otherTableOutput))
// Find the predicates reference either the left table or the join predicates
// between the left table and the other table.
val leftConditions = conditions.filter(_.references.
subsetOf(lowerJoin.left.outputSet ++ otherTableOutput)).diff(localConditions)
// Find the predicates reference either the right table or the join predicates
// between the right table and the other table.
val rightConditions = conditions.filter(_.references.
subsetOf(lowerJoin.right.outputSet ++ otherTableOutput)).diff(localConditions)

val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)

lowerJoin.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner
case LeftOuter if rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate => LeftOuter
case FullOuter if rightHasNonNullPredicate => RightOuter
case o => o
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
val newJoinType = buildNewJoinType(f, j)
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
case j @ Join(child @ Join(_, _, RightOuter | LeftOuter | FullOuter, _),
subquery, LeftSemiOrAnti(joinType), joinCond) =>
val newJoinType = buildNewJoinType(j, child, subquery.outputSet)
if (newJoinType == child.joinType) j else {
Join(child.copy(joinType = newJoinType), subquery, joinType, joinCond)
}
}
}
Expand Up @@ -114,3 +114,10 @@ object LeftExistence {
case _ => None
}
}

object LeftSemiOrAnti {
def unapply(joinType: JoinType): Option[JoinType] = joinType match {
case LeftSemi | LeftAnti => Some(joinType)
case _ => None
}
}
Expand Up @@ -300,7 +300,7 @@ case class Join(
left.constraints
.union(right.constraints)
.union(splitConjunctivePredicates(condition.get).toSet)
case LeftSemi if condition.isDefined =>
case LeftSemi | LeftAnti if condition.isDefined =>
left.constraints
.union(splitConjunctivePredicates(condition.get).toSet)
case j: ExistenceJoin =>
Expand Down
Expand Up @@ -39,6 +39,7 @@ class FilterPushdownSuite extends PlanTest {
PushDownPredicate,
BooleanSimplification,
PushPredicateThroughJoin,
PushLeftSemiLeftAntiThroughJoin,
CollapseProject) :: Nil
}

Expand Down

0 comments on commit c0925d9

Please sign in to comment.