-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32862][SS] Left semi stream-stream join #30076
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite { | |
_.join(_, joinType = LeftSemi), | ||
streamStreamSupported = false, | ||
batchStreamSupported = false, | ||
expectedMsg = "left semi/anti joins") | ||
expectedMsg = "LeftSemi join") | ||
|
||
// Left semi joins: update and complete mode not allowed | ||
c21 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assertNotSupportedInStreamingPlan( | ||
s"left semi join with stream-stream relations and update mode", | ||
c21 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attribute === attribute)), | ||
OutputMode.Update(), | ||
Seq("is not supported in Update output mode")) | ||
assertNotSupportedInStreamingPlan( | ||
s"left semi join with stream-stream relations and complete mode", | ||
c21 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attribute === attribute))), | ||
OutputMode.Complete(), | ||
Seq("is not supported in Complete output mode")) | ||
|
||
// Left semi joins: stream-stream allowed with join on watermark attribute | |
c21 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Note that the attribute need not be watermarked on both sides. | ||
assertSupportedInStreamingPlan( | ||
s"left semi join with stream-stream relations and join on attribute with left watermark", | ||
streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attributeWithWatermark === attribute)), | ||
OutputMode.Append()) | ||
assertSupportedInStreamingPlan( | ||
s"left semi join with stream-stream relations and join on attribute with right watermark", | ||
streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attribute === attributeWithWatermark)), | ||
OutputMode.Append()) | ||
assertNotSupportedInStreamingPlan( | ||
s"left semi join with stream-stream relations and join on non-watermarked attribute", | ||
streamRelation.join(streamRelation, joinType = LeftSemi, | ||
condition = Some(attribute === attribute)), | ||
OutputMode.Append(), | ||
Seq("watermark in the join keys")) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Personally, I prefer to change the match string to […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @xuanyuanking - yeah I agree adding "without" would be better. I updated for the left semi join here. A refactoring for all joins (inner, outer, semi, anti, etc) is anyway needed as a followup JIRA (https://issues.apache.org/jira/browse/SPARK-33209), so I want to clean up other places in a separate PR, e.g. "appropriate range condition" has a similar problem. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Makes sense. |
||
|
||
// Left semi joins: stream-stream allowed with range condition yielding state value watermark | ||
assertSupportedInStreamingPlan( | ||
s"left semi join with stream-stream relations and state value watermark", { | ||
val leftRelation = streamRelation | ||
val rightTimeWithWatermark = | ||
AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata) | ||
val rightRelation = new TestStreamingRelation(rightTimeWithWatermark) | ||
leftRelation.join( | ||
rightRelation, | ||
joinType = LeftSemi, | ||
condition = Some(attribute > rightTimeWithWatermark + 10)) | ||
}, | ||
OutputMode.Append()) | ||
|
||
// Left semi joins: stream-stream not allowed with insufficient range condition | ||
assertNotSupportedInStreamingPlan( | ||
s"left semi join with stream-stream relations and state value watermark", { | ||
val leftRelation = streamRelation | ||
val rightTimeWithWatermark = | ||
AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata) | ||
val rightRelation = new TestStreamingRelation(rightTimeWithWatermark) | ||
leftRelation.join( | ||
rightRelation, | ||
joinType = LeftSemi, | ||
condition = Some(attribute < rightTimeWithWatermark + 10)) | ||
}, | ||
OutputMode.Append(), | ||
Seq("appropriate range condition")) | ||
|
||
// Left anti joins: stream-* not allowed | ||
testBinaryOperationInStreamingPlan( | ||
"left anti join", | ||
_.join(_, joinType = LeftAnti), | ||
streamStreamSupported = false, | ||
batchStreamSupported = false, | ||
expectedMsg = "left semi/anti joins") | ||
expectedMsg = "Left anti join") | ||
|
||
// Right outer joins: stream-* not allowed | ||
testBinaryOperationInStreamingPlan( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Also change this comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xuanyuanking - updated.