New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-35349][SQL] Add code-gen for left/right outer sort merge join #32476
Conversation
cc @cloud-fan and @maropu could you help take a look when you have time? Thanks. |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #138282 has finished for PR 32476 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #138298 has finished for PR 32476 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #138301 has finished for PR 32476 at commit
|
throw new IllegalArgumentException( | ||
s"SortMergeJoin.streamedPlan/bufferedPlan should not take $x as the JoinType") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about this?
private lazy val ((streamedPlan, streamedKyes), (bufferedPlan, bufferedKeys)) = joinType match {
case _: InnerLike | LeftOuter => ((left, leftKeys), (right, rightKeys))
case RightOuter => ((right, rightKeys), (left, leftKeys))
case x =>
throw new IllegalArgumentException(
s"SortMergeJoin.streamedPlan/bufferedPlan should not take $x as the JoinType")
}
private lazy val streamOutput = streamedPlan.output
private lazy val bufferedOutput = bufferedPlan.output
I think we don't need to repeat the joinType
check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me, addressed in #32495 .
@@ -353,12 +353,37 @@ case class SortMergeJoinExec( | |||
} | |||
} | |||
|
|||
override def supportCodegen: Boolean = { | |||
joinType.isInstanceOf[InnerLike] | |||
private lazy val (streamedPlan, bufferedPlan) = joinType match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need lazy
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu - yes, this is used for code-gen only. Note here we only pattern match inner/left outer/right outer join, so it will throw exception with val
for other join types.
|if (!$matches.isEmpty()) { | ||
| $matches.clear(); | ||
|} | ||
|return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Eagerly return streamed row.
s"""
|$matches.clear();
|return false;
""".stripMargin
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wanted to avoid clear()
if isEmpty()
is true. ExternalAppendOnlyUnsafeRowArray.isEmpty()
is very cheap but clear()
sets multiple variables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Could you leave some comments about it there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu - added comment.
|
||
lazy val outerJoin = { | ||
val foundMatch = ctx.freshName("foundMatch") | ||
val foundJoinRows = ctx.freshName("foundJoinRows") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
foundJoinRows
not used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, forget to remove it during code iterations. Will remove.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu - removed.
| scala.collection.Iterator leftIter, | ||
| scala.collection.Iterator rightIter) { | ||
| $leftRow = null; | ||
|private boolean findNextJoinRows( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the outer case, a return value is not used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks reusing the inner-case code makes the outer-case code inefficient. For example, if there are too many matched duplicate rows in the buffered side, it seems we don't need to put all the rows in matches
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the outer case, a return value is not used?
Yes. Otherwise it's very hard to re-use code in findNextJoinRows
. I can further make more change to not return anything for findNextJoinRows
in case it's an outer join. Do we want to do that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example, if there are too many matched duplicate rows in the buffered side, it seems we don't need to put all the rows in matches, right?
Why we don't need to put all the rows? We anyway need to evaluate all the rows on buffered side for join, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we don't need to put all the rows? We anyway need to evaluate all the rows on buffered side for join, right?
Oh, my bad. ya, you're right. I misunderstood it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the outer case, a return value is not used?
Yes. Otherwise it's very hard to re-use code in findNextJoinRows. I can further make more change to not return anything for findNextJoinRows in case it's an outer join. Do we want to do that?
okay, the current one looks fine. Let's just wait for a @cloud-fan comment here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, in the current generated code, it seems conditionCheck
is evaluated outside findNextJoinRows
. We cannot evaluate it inside findNextJoinRows
to avoid putting unmached rows in matches
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu - No I think we need buffer anyway. The buffered rows has same join keys with current streamed row. But there can be multiple followed streamed rows having same join keys, as the buffered rows. Even though buffered rows cannot match condition with current streamed row, they may match condition with followed streamed rows. I think this is how current sort merge join (code-gen & iterator) is designed.
| } | ||
| } while ($leftRow != null); | ||
| } while ($streamedRow != null); | ||
| } | ||
| return false; // unreachable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(This is not related to this PR though) In this case, could we throw an illegal state exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu - sounds good to me. Will update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu - Interesting, when I tried to add a throw new IllegalStateException
before return false
, janino compiler is clever enough to figure out the statement is unreachable and throws exception when trying to compile - https://gist.github.com/c21/196166411d5d0406d9a76b37be889194 . So I think we'd better keep this as it is for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I missed this comment. it looks interesting. sgtm.
Could you update the |
@maropu - |
Ah, okay. sgtm. |
can we open a PR to do the renaming first? left, right to streamed, buffered, to make this PR easier to review. |
@cloud-fan - sounds good, #32495 is for the renaming part only. Thanks. |
…oin type ### What changes were proposed in this pull request? This is a pre-requisite of #32476, in discussion of #32476 (comment) . This is to refactor sort merge join code-gen to depend on streamed/buffered terminology, which makes the code-gen agnostic to different join types and can be extended to support other join types than inner join. ### Why are the changes needed? Pre-requisite of #32476. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit test in `InnerJoinSuite.scala` for inner join code-gen. Closes #32495 from c21/smj-refactor. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
Co-authored-by: Chen Li <meloli87@gmail.com>
@@ -501,7 +538,7 @@ case class SortMergeJoinExec( | |||
ctx: CodegenContext, | |||
streamedRow: String): (Seq[ExprCode], Seq[String]) = { | |||
ctx.INPUT_ROW = streamedRow | |||
left.output.zipWithIndex.map { case (a, i) => | |||
streamedPlan.output.zipWithIndex.map { case (a, i) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu, @cloud-fan - This PR is rebased on top of #32495, and ready for review now, thanks. |
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
Show resolved
Hide resolved
Kubernetes integration test starting |
Kubernetes integration test status failure |
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
Show resolved
Hide resolved
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #138360 has finished for PR 32476 at commit
|
// all matched rows into `matches`. Return true when getting all matched rows. | ||
// For `streamedRow` without `matches` (`handleStreamedWithoutMatch`): | ||
// 1. Inner join: skip the row. | ||
// 2. Left/Right Outer join: keep the row and return false. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep the row and return false (with matches being empty)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan - updated.
// 2. Left/Right Outer join: clear the previous `matches` if needed, keep the row, | ||
// and return false. | ||
// - Step 2: Find the `matches` from buffered side having same join keys with `streamedRow`. | ||
// If previous `matches` is not empty, check the join keys and clear the `matches` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can simply say Clear matches if we hit a new streamedRow, as we need to find new matches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan - updated.
Test build #138374 has finished for PR 32476 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #138407 has finished for PR 32476 at commit
|
// The function has the following step: | ||
// - Step 1: Find the next `streamedRow` with non-null join keys. | ||
// For `streamedRow` with null join keys (`handleStreamedAnyNull`): | ||
// 1. Inner join: skip the row. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null join keys is also kind of a new streamRow
, so ideally we should clear matches
for inner join as well. It's ok because the matches
will be cleared when hitting the next streamedRow
without null join keys.
How about 1. Inner join: skip the row. matches will be cleared later when hitting the next streamedRow with non-null join keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan - sure, updated the comment.
| ${streamedVarDecl.mkString("\n")} | ||
| ${beforeLoop.trim} | ||
| scala.collection.Iterator<UnsafeRow> $iterator = $matches.generateIterator(); | ||
| boolean $foundMatch = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name is a bit confusing, as we will set it to true even if there is no match. How about boolean firstIteration = true;
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline for the naming. The variable is to indicate whether this streamed row has output row or not. So renamed to hasOutputRow
.
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #138436 has finished for PR 32476 at commit
|
thanks, merging to master! |
Thank you @cloud-fan and @maropu for review! |
NOTE: This fix improved TPCDS(sf=20) q78: 160883ms => 143171ms. Nice. |
@maropu - thanks for heads up, this is great to know! |
What changes were proposed in this pull request?
This PR is to add code-gen support for LEFT OUTER / RIGHT OUTER sort merge join. Currently sort merge join only supports inner join type (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L374 ). There's no fundamental reason why we cannot support code-gen for other join types. Here we add code-gen for LEFT OUTER / RIGHT OUTER join. Will submit followup PRs to add LEFT SEMI, LEFT ANTI and FULL OUTER code-gen separately.
The change is to extend current sort merge join logic to work with LEFT OUTER and RIGHT OUTER (should work with LEFT SEMI/ANTI as well, but FULL OUTER join needs some other more code change). Replace left/right with streamed/buffered to make code extendable to other join types besides inner join.
Example query:
Example generated code:
Why are the changes needed?
Improve query CPU performance. Example micro benchmark below showed 10% run-time improvement.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit test in
WholeStageCodegenSuite.scala
andWholeStageCodegenSuite.scala
.