-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32862][SS] Left semi stream-stream join #30076
Conversation
cc @cloud-fan and @sameeragarwal if you guys have time to take a look, thanks. |
cc @HeartSaVioR |
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #129931 has finished for PR 30076 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #129934 has finished for PR 30076 at commit
|
I've just picked this up and from high level perspective I see at least 2 things in the PR:
I suggest to split them up by creating a jira for the test code deduplication. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just went through the code except test suite. Seems OK to me and I'll look into test suite in a couple of days.
Btw, I have a feeling that left semi join seems to be different enough compared to the other joins, which might be worth to take a different path instead of adding exceptions for left semi.
e.g. I guess you'd like to simply remove the left side of row instead of marking it to be matched
in state whenever it got matched with right side of row. (This seems to be what you say as "follow-up".)
I'm OK to do it later assuming it doesn't end up with touching public API.
...alyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
Outdated
Show resolved
Hide resolved
...alyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...ain/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @HeartSaVioR for review, addressed all comments for now.
...alyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
Show resolved
Hide resolved
...alyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...ain/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
Outdated
Show resolved
Hide resolved
Kubernetes integration test starting |
Kubernetes integration test status success |
...alyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
Outdated
Show resolved
Hide resolved
...alyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...ain/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
Outdated
Show resolved
Hide resolved
keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue => | ||
joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched | ||
}.map { keyIdxToValue => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel it is easier to read if:
val keyIdxToValues = if (joinOnlyFirstTimeMatchedRow) {
keyWithIndexToValue.getAll(key, numValues).filter { keyIdxToValue =>
!keyIdxToValue.matched
}
} else {
keyWithIndexToValue.getAll(key, numValues)
}
keyIdxToValues.map { keyIdxToValue =>
...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think current code is more concise and doesn't make distraction. If someone feels filterNot is something possibly making confusion, we can simply use filter and reverse the condition.
Test build #130010 has finished for PR 30076 at commit
|
} | ||
|
||
after { | ||
StateStore.stop() | ||
} | ||
|
||
protected def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be pretty much helpful to provide guide comments on tracking refactors.
e.g. this is equivalent to StreamingOuterJoinSuite.setupStream
with changing signature private
to protected
to co-use.
val windowed1 = df1.select('key, window('leftTime, "10 second"), 'leftValue) | ||
val windowed2 = df2.select('key, window('rightTime, "10 second"), 'rightValue) | ||
val joined = windowed1.join(windowed2, Seq("key", "window"), joinType) | ||
val select = if (joinType == "left_semi") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For reviewers: this is equivalent to StreamingOuterJoinSuite.setupWindowedJoin
with changing
- signature
private
toprotected
- conditional select on left_semi vs others, as in left_semi only left side of columns are available
(input1, input2, select) | ||
} | ||
|
||
protected def setupWindowedJoinWithLeftCondition(joinType: String) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For reviewers: this is extracted from test("left outer early state exclusion on left")
/ test("right outer early state exclusion on left")
, with adding select per join type.
(leftInput, rightInput, select) | ||
} | ||
|
||
protected def setupWindowedJoinWithRightCondition(joinType: String) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For reviewers: this is extracted from test("left outer early state exclusion on right")
/ test("right outer early state exclusion on right")
, with adding select per join type.
(leftInput, rightInput, select) | ||
} | ||
|
||
protected def setupWindowedJoinWithRangeCondition(joinType: String) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For reviewers: this is extracted from test(s"${joinType.replaceAllLiterally("_", " ")} with watermark range condition")
, with conditional select on left_semi vs others, as in left_semi only left side of columns are available.
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed all comments and the PR is ready for review again, cc @HeartSaVioR , thanks.
...alyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
Outdated
Show resolved
Hide resolved
...alyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
Outdated
Show resolved
Hide resolved
...ain/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
Outdated
Show resolved
Hide resolved
Kubernetes integration test starting |
Test build #130072 has finished for PR 30076 at commit
|
Kubernetes integration test status success |
retest this please |
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
Show resolved
Hide resolved
Kubernetes integration test starting |
Kubernetes integration test status success |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed all comments.
if (right.isStreaming) { | ||
throwError("Left semi/anti joins with a streaming DataFrame/Dataset " + | ||
throwError("Left anti joins with a streaming DataFrame/Dataset " + | ||
"on the right are not supported") | ||
} | ||
|
||
// We support streaming left outer joins with static on the right always, and with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xuanyuanking - updated.
streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attribute === attribute)), | ||
OutputMode.Append(), | ||
Seq("watermark in the join keys")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xuanyuanking - yeah I agree adding "without" would be better. I updated for the left semi join here. A refactoring for all joins (inner, outer, semi, anti, etc) is anyway needed as a followup JIRA (https://issues.apache.org/jira/browse/SPARK-33209), so I want to clean up other places in a separate PR, e.g. "appropriate range condition" has similar problem.
// right: (1, 10), (2, 5) | ||
assertNumStateRows(total = 4, updated = 2), | ||
AddData(rightInput, (1, 11)), | ||
// No match as left time is too low and left row is already matched. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HeartSaVioR - sounds good, updated.
// states | ||
// left: (2, 2L), (4, 4L) | ||
// (left rows with value % 2 != 0 is filtered per [[PushDownLeftSemiAntiJoin]]) | ||
// right: (2, 2L), (4, 4L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HeartSaVioR - updated, I also figured the optimization rule should be PushPredicateThroughJoin
, instead of PushDownLeftSemiAntiJoin
, updated comment as well.
val numValues = keyToNumValues.get(key) | ||
keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue => | ||
keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI I created https://issues.apache.org/jira/browse/SPARK-33211 for this followup.
Kubernetes integration test starting |
Kubernetes integration test status success |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, only a nit just commented.
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #130106 has finished for PR 30076 at commit
|
Test build #130110 has finished for PR 30076 at commit
|
I'll let the PR around 2 days to see whether others have additional comments. If no further comment is provided I'll merge this probably in this weekend. cc. @viirya @xuanyuanking |
@HeartSaVioR Agree, post my LGTM. |
Will go through this again today. |
@viirya - wondering any more comments? thanks. |
@viirya - gentle ping again, any more comments before merging? Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't look at tests in details, but the left-semi streaming join part looks okay,
retest this, please |
Kubernetes integration test starting |
Kubernetes integration test status failure |
retest this please |
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #130246 has finished for PR 30076 at commit
|
Thanks, merging to master. |
Thanks for your contribution! Merged into master. |
Test build #130248 has finished for PR 30076 at commit
|
Thank you @HeartSaVioR , @xuanyuanking and @viirya for review! |
…rtedOperationsSuite ### What changes were proposed in this pull request? This PR is a followup from #30076 to refactor unit test of stream-stream join in `UnsupportedOperationsSuite`, where we had a lot of duplicated code for stream-stream join unit test, for each join type. ### Why are the changes needed? Help reduce duplicated code and make it easier for developers to read and add code in the future. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit test in `UnsupportedOperationsSuite.scala` (pure refactoring). Closes #30347 from c21/stream-test. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
This is to support left semi join in stream-stream join. The implementation of left semi join is (mostly in
StreamingSymmetricHashJoinExec
andSymmetricHashJoinStateManager
):Note a followup optimization can be to evict matched left side rows from state store earlier, even when the rows are still above watermark. However this needs more change in
SymmetricHashJoinStateManager
, so will leave this as a followup.Why are the changes needed?
Current stream-stream join supports inner, left outer and right outer join (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L166 ). We do see internally a lot of users are using left semi stream-stream join (not spark structured streaming), e.g. I want to get the ad impression (join left side) which has click (joint right side), but I don't care how many clicks per ad (left semi semantics).
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit tests in
UnsupportedOperationChecker.scala
andStreamingJoinSuite.scala
.