-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-7386] Introduce EventTimeBoundedEquijoin. #12915
Conversation
R: @reuvenlax |
/cc @kennknowles |
037894b
to
8a163ad
Compare
Out of curiosity, why are you adding this here instead of the schema join library (which SQL uses)? |
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
I wasn't aware of the other join library. I saw the join extension library implementations, plus the previously closed PR in BEAM-7386, and thought that this new one should be placed near and made the assumption that SQL would reuse the implementation. Looking at it now though it seems like the SQL joins don't use the join extension library. Should I keep this one around or refactor into the SQL schema join library? |
8a163ad
to
23b525e
Compare
7c0fcab
to
5a5195d
Compare
After looking at the SQL schema join library I think it would be useful to keep this join in join-extension so it can be used with non-schema'd PCollections and support more than equijoins. The schema join should be able to reuse this implementation in the future by refactoring the |
Oops - didn't realize that 'Close with Comment' was for the whole PR. I thought it was just for the comment thread. |
Sorry for the delay. AFAIK both this and the schema library are limited today to equijoins. The schema API is designed so that we can extend it later with non equijoins, however doing arbitrary join conditions in a distributed manner can be a hard problem. |
This implementation allows for simple comparisons between records for the join beyond an equijoin by allowing the user to provide a |
I am a bit confused about the usage of compareFn here. State is per key, so I believe that your DoFn will only join items that have the same key - the compareFn will never even get to compare items with different keys. Is the idea to allow the user to generate a subset of an equijoin? |
Yes, it will be a subset of an equijoin. Sorry for the confusion. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the long delay!
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
To be clear, this join is a temporal join. You have to have a condition relating the timestamps of the two elements that can translate into garbage collection threshold. |
(the garbage collection can be looser than the actual comparison) |
IMO it's clearer if we reserve TemporalJoin for cases where the join
condition itself is _primarily_ temporal - e.g. AS OF joins, such as the
quotes/trade examples. Yes this join has a temporal component that bounds
our lookback (and also affects GC), but the primary join condition is not
that.
…On Mon, Nov 2, 2020 at 11:15 AM Kenn Knowles ***@***.***> wrote:
(the garbage collection can be looser than the actual comparison)
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#12915 (comment)>, or
unsubscribe
<https://github.com/notifications/unsubscribe-auth/AFAYJVJNX3GAEEPNRPBV653SN4AOHANCNFSM4RXFYIIQ>
.
|
I think there is a hidden innovation here when you refer to AS OF joins. In standard SQL this refers to the past state of a table as it is mutated in processing time. It is primarily useful for inspecting the evolution of a table. To yield correct results for a quotes/trades pipeline it must refer to event time. If all events from quotes and trades are ordered (and jointly ordered) then AS OF can answer the query. Otherwise it cannot. This is true for standard SQL databases: if a quote is inserted that contradicts a prior result of the quote/trade match, then the ordering used by AS OF cannot be used to determine the price for a trade. I think the re-interpretation of AS OF to refer to event time is a good change. I know that Flink has done this same thing. I think having a transform that can correctly address the quote/trade problem is also good. In documentation and API just be very careful to make sure it is clear. We already have a lot of users / StackOverflow questions talking about "before" and "after" and "as of" in terms of processing time, mixing it up with event time. |
We could call this one "timestamp-bounded equijoin" or some such. |
Note that I used as-of joins as.an example reference point, but this code
is not an actual SQL implementation, so I did feel bound to be standards
compliant.
As you mentioned, Flink has reinterpreted AS OF joins to refer to even time
in their SQL dialect. Note though that in SQL you would write this as
SELECT * FROM table INNJER_JOIN temporal_table FOR SYSTEM_TIME AS OF <time>
A simple extension might be to replace SYSTEM_TIME with EVENT_TIME. This
might be a useful extension to our SQL implementation.
…On Mon, Nov 2, 2020 at 2:20 PM Kenn Knowles ***@***.***> wrote:
I think there is a hidden innovation here when you refer to AS OF joins.
In standard SQL this refers to the past state of a table as it is mutated
in *processing time*. It is primarily useful for inspecting the evolution
of a table.
To yield correct results for a quotes/trades pipeline it must refer to *event
time*. If all events from quotes and trades are ordered (and jointly
ordered) then AS OF can answer the query. Otherwise it cannot. This is true
for standard SQL databases: if a quote is inserted that contradicts a prior
result of the quote/trade match, then the ordering used by AS OF cannot be
used to determine the price for a trade.
I think the re-interpretation of AS OF to refer to event time is a good
change. I know that Flink has done this same thing. I think having a
transform that can correctly address the quote/trade problem is also good.
In documentation and API just be very careful to make sure it is clear. We
already have a lot of users / StackOverflow questions talking about
"before" and "after" and "as of" in terms of processing time, mixing it up
with event time.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#12915 (comment)>, or
unsubscribe
<https://github.com/notifications/unsubscribe-auth/AFAYJVILSPDE7FXON6FP4Q3SN4WDLANCNFSM4RXFYIIQ>
.
|
Now that i'm thinking about this further, the compareFn may be unnecessarily complicating the API for this join. I imagined it would be helpful for a user who wants to add logic before emitting a matched result, like a filter, but it would be more idiomatic for the user to apply a filter transform to the join result instead. |
+1 IMO it would be useful if it somehow happened prior to the CGBK, but
since it's after a subsequent ParDo or Filter will likely be fused anyway.
…On Tue, Nov 3, 2020 at 11:23 AM Tyson Hamilton ***@***.***> wrote:
I am a bit confused about the usage of compareFn here. State is per key,
so I believe that your DoFn will only join items that have the same key -
the compareFn will never even get to compare items with different keys. Is
the idea to allow the user to generate a subset of an equijoin?
Yes, it will be a subset of an equijoin. Sorry for the confusion.
I am a bit confused about the usage of compareFn here. State is per key,
so I believe that your DoFn will only join items that have the same key -
the compareFn will never even get to compare items with different keys. Is
the idea to allow the user to generate a subset of an equijoin?
Yes, it will be a subset of an equijoin. Sorry for the confusion.
Now that i'm thinking about this further, the compareFn may be
unnecessarily complicating the API for this join. I imagined it would be
helpful for a user who wants to add logic before emitting a matched result,
like a filter, but it would be more idiomatic for the user to apply a
filter transform to the join result instead.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#12915 (comment)>, or
unsubscribe
<https://github.com/notifications/unsubscribe-auth/AFAYJVLK76ST2YWOMQGMURLSOBKEJANCNFSM4RXFYIIQ>
.
|
Ya this is a tough one to name, more input is welcome. I floated the following: EventTimeLimitedDurationInnerJoin, EventTimeScopedDurationInnerJoin |
FYI the actual class name can be a bit longer, as long there is a good builder method. e.g. You could do something like: Join.boundedInnerJoin(pc1, pc2); This would be easier to deal with if this contrib Join library used PTransforms instead of functions. |
Similar to other inner joins except it includes a temporal predicate, allowing users to join unbounded PCollection<KV>s in the GlobalWindow.
8c0e05b
to
b074975
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review! I had some questions, resolved some of your comments, PTAL.
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
...a/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Outdated
Show resolved
Hide resolved
V1 left = e.getValue().getKey(); | ||
V2 right = e.getValue().getValue(); | ||
if (left != null) { | ||
leftState.add(TimestampedValue.of(left, timestamp)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate a bit please? I don't understand why we want the hold, or what it accomplishes. The docs are a bit tricky to follow regarding this.
if (left != null) { | ||
leftState.add(TimestampedValue.of(left, timestamp)); | ||
rightState | ||
.readRange(timestamp.minus(temporalBound), timestamp.plus(temporalBound)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will the timer family reduce the worst case? The O(n^2) comes from searching through the state on each input, won't that still be required for finding the 'joined elements' to output in the timer?
d3ba9ec
to
b5198b2
Compare
Refactor name of classes and methods, remove the compareFn, fix eviction bug, convert boolean to state, and other smaller changes.
b5198b2
to
c63cc35
Compare
Obsolete. |
Similar to other inner joins except it includes a temporal predicate,
allowing users to join unbounded PCollections in the GlobalWindow.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.