-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32862][SS] Left semi stream-stream join #30076
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite { | |
_.join(_, joinType = LeftSemi), | ||
streamStreamSupported = false, | ||
batchStreamSupported = false, | ||
expectedMsg = "left semi/anti joins") | ||
expectedMsg = "LeftSemi join") | ||
|
||
// Left semi joins: update and complete mode not allowed | ||
c21 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assertNotSupportedInStreamingPlan( | ||
"left semi join with stream-stream relations and update mode", | ||
streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attribute === attribute)), | ||
OutputMode.Update(), | ||
Seq("is not supported in Update output mode")) | ||
assertNotSupportedInStreamingPlan( | ||
"left semi join with stream-stream relations and complete mode", | ||
Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attribute === attribute))), | ||
OutputMode.Complete(), | ||
Seq("is not supported in Complete output mode")) | ||
|
||
// Left semi joins: stream-stream allowed with join on watermark attribute | ||
// Note that the attribute need not be watermarked on both sides. | ||
assertSupportedInStreamingPlan( | ||
"left semi join with stream-stream relations and join on attribute with left watermark", | ||
streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attributeWithWatermark === attribute)), | ||
OutputMode.Append()) | ||
assertSupportedInStreamingPlan( | ||
"left semi join with stream-stream relations and join on attribute with right watermark", | ||
streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attribute === attributeWithWatermark)), | ||
OutputMode.Append()) | ||
assertNotSupportedInStreamingPlan( | ||
"left semi join with stream-stream relations and join on non-watermarked attribute", | ||
streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attribute === attribute)), | ||
OutputMode.Append(), | ||
Seq("watermark in the join keys")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Personally, I prefer to change the match string to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @xuanyuanking - yeah I agree adding "without" would be better. I updated for the left semi join here. A refactoring for all joins (inner, outer, semi, anti, etc) is anyway needed as a followup JIRA (https://issues.apache.org/jira/browse/SPARK-33209), so I want to clean up other places in a separate PR, e.g. "appropriate range condition" has similar problem. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sense. |
||
|
||
// Left semi joins: stream-stream allowed with range condition yielding state value watermark | ||
assertSupportedInStreamingPlan( | ||
"left semi join with stream-stream relations and state value watermark", { | ||
val leftRelation = streamRelation | ||
val rightTimeWithWatermark = | ||
AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata) | ||
val rightRelation = new TestStreamingRelation(rightTimeWithWatermark) | ||
leftRelation.join( | ||
rightRelation, | ||
joinType = LeftSemi, | ||
condition = Some(attribute > rightTimeWithWatermark + 10)) | ||
}, | ||
OutputMode.Append()) | ||
|
||
// Left semi joins: stream-stream not allowed with insufficient range condition | ||
assertNotSupportedInStreamingPlan( | ||
"left semi join with stream-stream relations and state value watermark", { | ||
val leftRelation = streamRelation | ||
val rightTimeWithWatermark = | ||
AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata) | ||
val rightRelation = new TestStreamingRelation(rightTimeWithWatermark) | ||
leftRelation.join( | ||
rightRelation, | ||
joinType = LeftSemi, | ||
condition = Some(attribute < rightTimeWithWatermark + 10)) | ||
}, | ||
OutputMode.Append(), | ||
Seq("appropriate range condition")) | ||
|
||
// Left anti joins: stream-* not allowed | ||
testBinaryOperationInStreamingPlan( | ||
"left anti join", | ||
_.join(_, joinType = LeftAnti), | ||
streamStreamSupported = false, | ||
batchStreamSupported = false, | ||
expectedMsg = "left semi/anti joins") | ||
expectedMsg = "Left anti join") | ||
|
||
// Right outer joins: stream-* not allowed | ||
testBinaryOperationInStreamingPlan( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager( | |
/** | ||
* Get all the matched values for given join condition, with marking matched. | ||
* This method is designed to mark joined rows properly without exposing internal index of row. | ||
* | ||
* @param excludeRowsAlreadyMatched Do not join with rows already matched previously. | ||
* This is used for right side of left semi join in | ||
* [[StreamingSymmetricHashJoinExec]] only. | ||
*/ | ||
def getJoinedRows( | ||
key: UnsafeRow, | ||
generateJoinedRow: InternalRow => JoinedRow, | ||
predicate: JoinedRow => Boolean): Iterator[JoinedRow] = { | ||
predicate: JoinedRow => Boolean, | ||
excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = { | ||
val numValues = keyToNumValues.get(key) | ||
keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue => | ||
keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It makes more sense to add this filter logic in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought this first and not proposed because current predicate cannot check the condition. We can still do this via adjusting the type of predicate a bit, but I guess the followup PR would try to separate left semi case of performance which lets us to can revert the change here. For the reason I prefer the small change for now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, after taking a further look, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI I created https://issues.apache.org/jira/browse/SPARK-33211 for this followup. |
||
excludeRowsAlreadyMatched && keyIdxToValue.matched | ||
}.map { keyIdxToValue => | ||
val joinedRow = generateJoinedRow(keyIdxToValue.value) | ||
if (predicate(joinedRow)) { | ||
if (!keyIdxToValue.matched) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Also change this comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xuanyuanking - updated.