[SPARK-32854][SS] Minor code and doc improvement for stream-stream join#29724
[SPARK-32854][SS] Minor code and doc improvement for stream-stream join#29724c21 wants to merge 3 commits intoapache:masterfrom
Conversation
|
cc @cloud-fan and @sameeragarwal if you have time to take a look, thanks. |
|
Test build #128552 has finished for PR 29724 at commit
|
|
retest this please |
| * If a timestamp column with event time watermark is present in the join keys or in the input | ||
| * data, then the it uses the watermark figure out which rows in the buffer will not join with | ||
| * and the new data, and therefore can be discarded. Depending on the provided query conditions, we | ||
| * data, then it uses the watermark figure out which rows in the buffer will not join with |
There was a problem hiding this comment.
uses the watermark to figure out
|
|
||
| // Join one side input using the other side's buffered/state rows. Here is how it is done. | ||
| // | ||
| // - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching new left input with |
There was a problem hiding this comment.
I'm not familiar with this part, cc @zsxwing @HeartSaVioR @xuanyuanking
There was a problem hiding this comment.
The comment seems to be just modified for replacing leftJoiner.joinWith(rightJoiner) with leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner) and vice versa for right side. Other parts aren't modified.
There was a problem hiding this comment.
@cloud-fan , @HeartSaVioR - yes, this is just updating the comment, because there's no leftJoiner/rightJoiner/joinWith in the file, and the original author (#19271) should mean to refer to leftSideJoiner/rightSideJoiner/storeAndJoinWithOtherSide. I think it would make sense to be consistent between code and comment here. This is anyway a minor change for comment only.
There was a problem hiding this comment.
I think the original PR just wants to use pseudocode to explain, either way is ok to me.
|
Test build #128556 has finished for PR 29724 at commit
|
c21
left a comment
There was a problem hiding this comment.
Thanks @cloud-fan and @HeartSaVioR , the PR is updated and ready for review again, thanks.
| * If a timestamp column with event time watermark is present in the join keys or in the input | ||
| * data, then the it uses the watermark figure out which rows in the buffer will not join with | ||
| * and the new data, and therefore can be discarded. Depending on the provided query conditions, we | ||
| * data, then it uses the watermark figure out which rows in the buffer will not join with |
|
|
||
| // Join one side input using the other side's buffered/state rows. Here is how it is done. | ||
| // | ||
| // - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching new left input with |
There was a problem hiding this comment.
@cloud-fan , @HeartSaVioR - yes, this is just updating the comment, because there's no leftJoiner/rightJoiner/joinWith in the file, and the original author (#19271) should mean to refer to leftSideJoiner/rightSideJoiner/storeAndJoinWithOtherSide. I think it would make sense to be consistent between code and comment here. This is anyway a minor change for comment only.
|
|
||
| // Join one side input using the other side's buffered/state rows. Here is how it is done. | ||
| // | ||
| // - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching new left input with |
There was a problem hiding this comment.
I think the original PR just wants to use pseudocode to explain, either way is ok to me.
| s"${getClass.getSimpleName} should not take $x as the JoinType") | ||
| case LeftOuter => left.outputPartitioning | ||
| case RightOuter => right.outputPartitioning | ||
| case _ => throwBadJoinTypeException() |
There was a problem hiding this comment.
Nich catch,
nit: let's remove the duplicate error string in https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR168 and https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR162
There was a problem hiding this comment.
@xuanyuanking - sorry that I don't get how to change, are you suggesting to have a string val for error message to be used in throwBadJoinTypeException and require(...): val errorMessageForJoinType = s"${getClass.getSimpleName} should not take $joinType as the JoinType"), or something else?
There was a problem hiding this comment.
Yes, have a string val for the same error message.
|
Test build #128578 has finished for PR 29724 at commit
|
|
Test build #128580 has finished for PR 29724 at commit
|
|
Addressed all comments and tests are passed. Let me know if anything needs to be changed, thanks, @cloud-fan . |
|
thanks, merging to master! |
|
Thank you @cloud-fan , @HeartSaVioR and @xuanyuanking for review! |
What changes were proposed in this pull request?
Several minor code and documentation improvement for stream-stream join. Specifically:
SparkPlan, as extending fromBinaryExecNodeis enough.left/right.outputPartitioningforLeft/RightOuterinoutputPartitioning, as thePartitioningCollectionwrapper is unnecessary (similar to batch joinsShuffledHashJoinExec,SortMergeJoinExec).generateFilteredJoinedRowinstoreAndJoinWithOtherSide). Similar optimization (i.e. create auxiliary method/variable per different join type before the iterator of input rows) has been done in batch join world (SortMergeJoinExec,ShuffledHashJoinExec).Why are the changes needed?
Minor optimization to avoid per-row unnecessary work (this probably can be optimized away by compiler, but we can do a better join to avoid it at the first place). And other comment/indentation fix to have better code readability for future developers.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests in
StreamingJoinSuite.scalaas no new logic is introduced.