[FLINK-9344] [table] Support INTERSECT and INTERSECT ALL for streaming #5998
b93360a
to
d007ddb
Compare
Thanks @Xpray for the contribution. It looks pretty good! I just left a few comments and questions.
The JIRA ticket description confuses me, as it doesn't specify whether you are supporting unbounded intersect, windowed intersect, or both.
A brief description would be very helpful here, for other reviewers as well.
--
Rong
}

override def toString: String = {
  s"Intersect$intersectType(intersect$intersectType: ($intersectSelectionToString))"
s"Intersect$intersectType(intersect: ($intersectSelectionToString))"
I don't think you need to repeat the type twice.
with DataStreamRel {

private lazy val intersectType = if (all) {
  "All"
" All" might be better formatting, since you only attach this in the explainTerm and toString methods.
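To illustrate the two naming suggestions above, here is a minimal sketch of what the combined result could look like. `IntersectNaming`, `describe`, and the `selection` parameter are hypothetical stand-ins (the latter for `intersectSelectionToString`); this is not the code in the PR.

```scala
// Hypothetical helper, not part of the PR: shows the suggested formatting only.
object IntersectNaming {
  def describe(all: Boolean, selection: String): String = {
    // Leading space so the suffix reads as "Intersect All" rather than "IntersectAll".
    val intersectType = if (all) " All" else ""
    // The type appears once, in the node name, and is not repeated inside the parentheses.
    s"Intersect$intersectType(intersect: ($selection))"
  }
}
```

With these assumptions, `IntersectNaming.describe(true, "a, b")` yields `Intersect All(intersect: (a, b))`, and the non-ALL case yields `Intersect(intersect: (a, b))`.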
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class StreamIntersectCoProcessFunction(
Missing JavaDoc
resultType: TypeInformation[Row],
queryConfig: StreamQueryConfig,
all: Boolean)
extends CoProcessFunction[CRow, CRow, CRow]
I guess I am confused here: there's CoGroupedStream with a customized CoGroupFunction, which is already supported in the DataStream API. It seems that if we operate on a windowed stream, we can apply the intersect as a CoGroupFunction. Is this function solely targeting the non-windowed intersect case? If so, can we rename the function? (This also adds to my point: please add JavaDoc.)
I think it makes sense to have two implementations of this operator.
1. For tables with a time attribute. This implementation works without retraction and can automatically clean up the state.
2. For tables without time attributes. This implementation needs to clean up state based on retention time and produces retractions.
This PR seems to address both cases, which is fine for now. We can improve for 1. later on. Both cases should be implemented as CoProcessFunction. We should try to be independent of the DataStream window operators, IMO.
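As a rough illustration of the second case (no time attributes, results updated via retractions), the following is a minimal sketch in plain Scala without any Flink dependency. `Change`, `IntersectSketch`, `onLeft`, and `onRight` are hypothetical names, and keeping per-row counts in two maps is an assumption about how such an operator could track state, not a description of the PR's implementation. State cleanup based on retention time is left out.

```scala
import scala.collection.mutable

// Hypothetical message type: accumulate = false marks a retraction.
final case class Change[T](accumulate: Boolean, row: T)

// Hypothetical stand-in for the keyed state of a two-input operator.
final class IntersectSketch[T](all: Boolean) {
  // Assumption: state holds per-row occurrence counts for each input.
  private val left = mutable.Map.empty[T, Long].withDefaultValue(0L)
  private val right = mutable.Map.empty[T, Long].withDefaultValue(0L)

  // Number of copies of `row` currently present in the result.
  private def emitted(row: T): Long = {
    val n = math.min(left(row), right(row))
    if (all) n else math.min(n, 1L)
  }

  private def update(counts: mutable.Map[T, Long], c: Change[T]): Seq[Change[T]] = {
    val before = emitted(c.row)
    val next = counts(c.row) + (if (c.accumulate) 1L else -1L)
    if (next > 0) counts(c.row) = next else counts.remove(c.row)
    val after = emitted(c.row)
    // One input change moves the result count by at most one copy.
    if (after > before) Seq.fill((after - before).toInt)(Change(true, c.row))
    else Seq.fill((before - after).toInt)(Change(false, c.row))
  }

  def onLeft(c: Change[T]): Seq[Change[T]] = update(left, c)
  def onRight(c: Change[T]): Seq[Change[T]] = update(right, c)
}
```

In this sketch, INTERSECT emits a row once when it is present on both sides and retracts it when either side drops to zero, while INTERSECT ALL keeps min(left count, right count) copies in the result.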
Thanks for the review, @walterddr and @fhueske. This PR intends to support non-windowed intersect, just like the non-windowed inner join.
}
}

private def expireOutTimeRow(
I don't believe any of your tests trigger this code path.
Perhaps consider overriding the queryConfig to trigger it.

validateEqualsHashCode("intersect", resultType)

// state to hold left stream element
The description is misleading: if I understand correctly, you are not actually holding the "row" of the stream element.
bb72574
to
492f7b6
Compare
Thanks for the review, @walterddr and @fhueske. I've updated the PR.
@fhueske, I would like to support minus/minus All after this issue. Could you give some suggestions on that?
[FLINK-9344] [TableAPI & SQL] Support INTERSECT and INTERSECT ALL for streaming
What is the purpose of the change
Support Intersect and Intersect All for Streaming SQL and TableAPI
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Cases of intersect operations in both org.apache.flink.table.runtime.stream.sql.SetOperatorsITCase and org.apache.flink.table.runtime.stream.table.SetOperatorsITCase
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation