[SPARK-39172][SQL] Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique #36530

Closed
@@ -462,6 +462,7 @@ package object dsl {
Window(windowExpressions, partitionSpec, orderSpec, logicalPlan)

def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan)

Contributor:
I don't know why we need to accept a Symbol here. We can probably do a cleanup later and remove this method. The same applies to def as(alias: Symbol): NamedExpression = Alias(expr, alias.name)() in this file.

Contributor Author:
I looked up the history: it was added at the beginning of SQL ..

Contributor Author:
Sure, will do a cleanup later.

def subquery(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan)

def except(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
Except(logicalPlan, otherPlan, isAll)
@@ -22,7 +22,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
- import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+ import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -139,6 +139,17 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
* SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1
* }}}
*
* 3. Remove outer join if:
* - For a left outer join with only left-side columns being selected and the right side join
* keys are unique.
* - For a right outer join with only right-side columns being selected and the left side join
* keys are unique.
*
* {{{
* SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t) t2 ON t1.c1 = t2.c1 ==>
* SELECT t1.* FROM t1
* }}}
*
* This rule should be executed before pushing down the Filter
*/
object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
@@ -211,6 +222,15 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
        if projectList.forall(_.deterministic) && p.references.subsetOf(right.outputSet) &&
          allDuplicateAgnostic(aggExprs) =>
      a.copy(child = p.copy(child = right))

    case p @ Project(_, ExtractEquiJoinKeys(LeftOuter, _, rightKeys, _, _, left, right, _))
        if right.distinctKeys.exists(_.subsetOf(ExpressionSet(rightKeys))) &&
          p.references.subsetOf(left.outputSet) =>
      p.copy(child = left)

Contributor:
For a left outer join where only left-side columns are selected, the join can change the result only if a left row matches more than one row on the right side. If the right-side join keys are unique, there can be at most one match per left row, so this optimization LGTM.
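
To make the reasoning above concrete, here is a minimal, self-contained sketch using plain Scala collections rather than Catalyst. The names `OuterJoinSketch`, `LeftRow`, `RightRow`, and `leftOuterJoinKeepLeft` are hypothetical and exist only for this illustration; the join is modeled as an equi-join on `a = d` followed by a projection onto the left-side columns.

```scala
object OuterJoinSketch {
  // Hypothetical row types, used only for this illustration.
  final case class LeftRow(a: Int, b: String)
  final case class RightRow(d: Int)

  /** Left outer equi-join on l.a == r.d, projected onto the left columns only. */
  def leftOuterJoinKeepLeft(left: Seq[LeftRow], right: Seq[RightRow]): Seq[LeftRow] =
    left.flatMap { l =>
      val matches = right.filter(_.d == l.a)
      // An unmatched left row is still emitted once (null-extended on the right);
      // a matched left row is emitted once per matching right row.
      if (matches.isEmpty) Seq(l) else matches.map(_ => l)
    }

  val left: Seq[LeftRow] = Seq(LeftRow(1, "x"), LeftRow(2, "y"), LeftRow(3, "z"))

  // Right side with unique join keys, e.g. the output of SELECT DISTINCT d.
  val uniqueRight: Seq[RightRow] = Seq(RightRow(1), RightRow(2))

  // Right side where the key 1 appears twice.
  val duplicatedRight: Seq[RightRow] = Seq(RightRow(1), RightRow(1), RightRow(2))
}
```

With `uniqueRight`, the result equals `left` row for row, so the join can be dropped. With `duplicatedRight`, the row with `a = 1` is emitted twice, which is exactly the case the uniqueness check rules out.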

    case p @ Project(_, ExtractEquiJoinKeys(RightOuter, leftKeys, _, _, _, left, right, _))
        if left.distinctKeys.exists(_.subsetOf(ExpressionSet(leftKeys))) &&
          p.references.subsetOf(right.outputSet) =>
      p.copy(child = right)
  }
}

@@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest {

comparePlans(optimized, originalQuery.analyze)
}

  test("SPARK-39172: Remove left outer join if only left-side columns being selected and " +

Contributor @cloud-fan, May 16, 2022:
Remove left outer join if only left-side columns are selected and the join keys on the other side are unique.

Contributor:
The PR title can be: Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique

      "the right side join keys are unique") {
    val x = testRelation.subquery("x")
    val y = testRelation1.subquery("y")
    comparePlans(Optimize.execute(
      x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" === $"d"))
        .select($"a", $"b", $"c").analyze),
      x.select($"a", $"b", $"c").analyze
    )

    comparePlans(Optimize.execute(
      x.join(y.groupBy($"d")($"d", count($"d").as("x")), LeftOuter,
        Some($"a" === $"d" && $"b" === $"x"))
        .select($"a", $"b", $"c").analyze),
      x.select($"a", $"b", $"c").analyze
    )
  }

  test("SPARK-39172: Remove right outer join if only right-side columns being selected and " +
      "the left side join keys are unique") {
    val x = testRelation.subquery("x")
    val y = testRelation1.subquery("y")
    comparePlans(Optimize.execute(
      x.groupBy($"a")($"a").join(y, RightOuter, Some($"a" === $"d"))
        .select($"d", $"e", $"f").analyze),
      y.select($"d", $"e", $"f").analyze
    )

    comparePlans(Optimize.execute(
      x.groupBy($"a")($"a", count($"a").as("x")).join(y, RightOuter,
        Some($"a" === $"d" && $"x" === $"e"))
        .select($"d", $"e", $"f").analyze),
      y.select($"d", $"e", $"f").analyze
    )
  }

  test("SPARK-39172: Negative case, do not remove outer join") {
    val x = testRelation.subquery("x")
    val y = testRelation1.subquery("y")
    // not an equi-join
    val p1 = x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" > $"d"))
      .select($"a").analyze
    comparePlans(Optimize.execute(p1), p1)

    // the join keys do not cover a unique key of the right side
    val p2 = x.join(y.groupBy($"d", $"e")($"d", $"e"), LeftOuter, Some($"a" === $"d"))
      .select($"a").analyze
    comparePlans(Optimize.execute(p2), p2)

    // output comes from the right side of a left outer join
    val p3 = x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" === $"d"))
      .select($"a", $"d").analyze
    comparePlans(Optimize.execute(p3), p3)
  }
}
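
The three negative cases above come down to the guards in the new rule: the join must be an equi-join, some distinct-key set of the side being removed must be covered by the join keys, and the projection may reference only the preserved side. A rough sketch of those guards over plain string sets follows. `EliminationGuardSketch` and `canDropOuterJoin` are hypothetical names, columns are modeled as strings rather than Catalyst expressions, and a non-equi-join is modeled here as an empty set of join keys (in the actual rule, as far as I can tell, `ExtractEquiJoinKeys` simply does not match in that case). The distinct-key sets assigned to the `groupBy` plans are also an assumption of this sketch.

```scala
object EliminationGuardSketch {
  /**
   * @param distinctKeys  key sets assumed to be unique on the side that would be removed
   * @param joinKeys      equi-join key columns on that same side (empty for a non-equi-join)
   * @param projected     columns referenced by the enclosing projection
   * @param preservedCols output columns of the side that is kept
   */
  def canDropOuterJoin(
      distinctKeys: Set[Set[String]],
      joinKeys: Set[String],
      projected: Set[String],
      preservedCols: Set[String]): Boolean =
    joinKeys.nonEmpty &&
      distinctKeys.exists(_.subsetOf(joinKeys)) &&
      projected.subsetOf(preservedCols)

  // Output columns of the preserved (left) side in the tests above.
  val leftCols: Set[String] = Set("a", "b", "c")

  // Assumption: grouping by d makes {d} a distinct key set,
  // and grouping by (d, e) makes {d, e} one.
  val byD: Set[Set[String]] = Set(Set("d"))
  val byDE: Set[Set[String]] = Set(Set("d", "e"))
}
```

Under these assumptions, `p1` fails because there are no equi-join keys, `p2` fails because `{d, e}` is not a subset of `{d}`, and `p3` fails because the projection references `d`, which is not a left-side column.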