New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-39172][SQL] Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique #36530
Conversation
…side keys exist unique key
@@ -139,6 +139,14 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { | |||
* SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 | |||
* }}} | |||
* | |||
* 3. Remove outer join if all output comes from streamed side and the join keys from buffered side | |||
* exist unique key. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks a bit weird to talk about stream side and buffer side in the logical plan phase. Can we explain this optimization in a different way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed it to
* 3. Remove outer join if:
* - For a left outer join with only left-side columns being selected and the right side join
* keys are unique.
* - For a right outer join with only right-side columns being selected and the left side join
* keys are unique.
case p @ Project(_, ExtractEquiJoinKeys(LeftOuter, _, rightKeys, _, _, left, right, _)) | ||
if right.distinctKeys.exists(_.subsetOf(ExpressionSet(rightKeys))) && | ||
p.references.subsetOf(left.outputSet) => | ||
p.copy(child = left) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a left outer join with only left-side columns being selected, the join can only change the result if we can find more than one matched row on the right side. If the right side join keys are unique, apparently we can't find more than one match. So this optimization LGTM.
* keys are unique. | ||
* | ||
* {{{ | ||
* SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t)t2 ON t1.c1 = t2.c1 ==> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t)t2 ON t1.c1 = t2.c1 ==> | |
* SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t) t2 ON t1.c1 = t2.c1 ==> |
@@ -268,4 +268,54 @@ class OuterJoinEliminationSuite extends PlanTest { | |||
|
|||
comparePlans(optimized, originalQuery.analyze) | |||
} | |||
|
|||
test("SPARK-39172: Remove outer join if all output come from streamed side and buffered side " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please update the test name as well
|
||
test("SPARK-39172: Remove outer join if all output come from streamed side and buffered side " + | ||
"keys exist unique key") { | ||
val x = testRelation.subquery(Symbol("x")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val x = testRelation.subquery(Symbol("x")) | |
val x = testRelation.subquery("x") |
.select($"a").analyze | ||
comparePlans(Optimize.execute(p2), p2) | ||
|
||
// output comes from buffered side |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// output comes from buffered side | |
// output comes from the right side of a left outer join |
@@ -462,6 +462,7 @@ package object dsl { | |||
Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) | |||
|
|||
def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know why do we need to accept a Symbol
here. We can probably do a cleanup later and remove this method. The same to def as(alias: Symbol): NamedExpression = Alias(expr, alias.name)()
in this file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked up the history, it is added at the beginning of SQL ..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, will do a cleanup later
@@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest { | |||
|
|||
comparePlans(optimized, originalQuery.analyze) | |||
} | |||
|
|||
test("SPARK-39172: Remove left outer join if only left-side columns being selected and " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove left outer join if only left-side columns are selected and the join keys on the other side are unique.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR title can be Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique
thanks, merging to master! |
thank you @cloud-fan @sigmod for review |
What changes were proposed in this pull request?
Improve
EliminateOuterJoin
that support Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique.Why are the changes needed?
Improve the optimzation case using the distinct keys framework.
For example:
Does this PR introduce any user-facing change?
no, improve performance
How was this patch tested?
add test