Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-7386] Adding EventTimeEquiJoin #15275

Closed
wants to merge 2 commits into from

Conversation

laraschmidt
Copy link
Contributor

@laraschmidt laraschmidt commented Aug 3, 2021

Adding EventTimeEquiJoin which is a PTransform that joins two streams where the comparison is time-stamp bounded.

@reuvenlax @kennknowles

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

ValidatesRunner compliance status (on master branch)

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- Build Status Build Status Build Status Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Python --- Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status ---
XLang Build Status Build Status Build Status Build Status Build Status ---

Examples testing status on various runners

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- --- --- --- --- --- ---
Java --- Build Status
Build Status
Build Status
--- --- --- --- ---
Python --- --- --- --- --- --- ---
XLang --- --- --- --- --- --- ---

Post-Commit SDK/Transform Integration Tests Status (on master branch)

Go Java Python
Build Status Build Status Build Status
Build Status
Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status Build Status --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@reuvenlax reuvenlax self-requested a review August 4, 2021 21:44
* </pre>
*/
@AutoValue
public abstract class EventTimeEquiJoin<K, V1, V2>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to think on the name a bit more, as EventTimeEquiJoin seems a bit awkward to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder about whether this PTransform should be in terms of KVs or not. However I'm starting to think that it should be and then we have the option to wrap it in a higher-level Join transform

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked up what Flink and Spark call them. Spark doesn't really have a name. Flink's joins seem to be different (processing on a table at a point of time instead of things being equivalent). Tyson's suggestion in the doc was EventTimeBoundedEquiJoin. Not sure I have any other suggestions. Maybe we could shorten it and move some to the function: E.g. EventTimeBoundedJoin.innerEquiOf() or something like that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some other options that were suggested previously:

  • TimestampBoundedEquijoin
  • EventTimeLimitedDurationEquiJoin
  • EventTimeScopedDurationInnerJoin

Instant nextBucketStart =
Instant.ofEpochMilli(
cleanupTime.getMillis() / TIMER_BUCKET * TIMER_BUCKET + TIMER_BUCKET);
cleanupTimers.get(Long.toString(nextBucketStart.getMillis())).set(nextBucketStart);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we should call withOutputTimestamp to hold the watermark back to the earliest buffered element in that timer's range. Doing this probably requires keeping a histogram in a ValueState tracking the min timestamp per minute bucket.

@reuvenlax
Copy link
Contributor

Let me know when you're ready for another review round!

@laraschmidt
Copy link
Contributor Author

@reuvenlax @kennknowles I implemented changes as if we were to remove the checks we discussed offline (one I avoided with the skew and the other by removing it). Please take a look, thanks!

@laraschmidt laraschmidt force-pushed the equijoin branch 6 times, most recently from ec252da to 7a8ba6d Compare August 24, 2021 23:35
@kennknowles kennknowles self-requested a review September 2, 2021 20:32
@laraschmidt
Copy link
Contributor Author

Ping. :) @reuvenlax @kennknowles

@aaltay
Copy link
Member

aaltay commented Sep 16, 2021

@kennknowles @reuvenlax - Could you please take a look at this?

@aaltay
Copy link
Member

aaltay commented Sep 30, 2021

@kennknowles @reuvenlax - Could you please take a look at this?

@laraschmidt - Could you look at the failing precommit test or re-run it?

@laraschmidt
Copy link
Contributor Author

Run Java PreCommit

@laraschmidt
Copy link
Contributor Author

Run Java_Examples_Dataflow PreCommit

1 similar comment
@laraschmidt
Copy link
Contributor Author

Run Java_Examples_Dataflow PreCommit

@laraschmidt
Copy link
Contributor Author

Run Java PreCommit

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

@Override
public void encode(Pair<V1, V2> value, OutputStream outStream)
throws CoderException, IOException {
firstCoder.encode(value.getFirst(), outStream);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we deprecated the coder "context" idea? Or do you just not want to apply it here? I would expect PairCoder to be identical to KvCoder, anyhow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not familiar with the deprecated context thing but I copied it over from KVCoder. What exactly is it used for? I assume that in my case I want context on both (whereas the KV coder only put it on the value).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm, having both failed so I swapped it to just be on the second element.

Instant ts,
OrderedListState<ThisT> thisCollection,
OrderedListState<OtherT> otherCollection,
ValueState<Instant> oldestTimestampState,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see how having a specific timestamp queue state would make it easier to have all the methods we need in a single "state type". Interestingly enough, the original design for state (before all the annotations) made it pretty easy to define composite types of state, whereas now we really cannot.

* @param <V2> the type of the value in the second {@code PCollection}
* </pre>
*/
@AutoValue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Experimental

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@laraschmidt laraschmidt force-pushed the equijoin branch 2 times, most recently from be5bf5c to 08fc756 Compare October 12, 2021 19:02
@laraschmidt
Copy link
Contributor Author

Addressed comments, PTAL. :) Thanks! @kennknowles

@aaltay
Copy link
Member

aaltay commented Oct 22, 2021

@kennknowles - Could you please take a look?

@aaltay
Copy link
Member

aaltay commented Dec 29, 2021

Is this still relevant? Should we close it?

Ignore that. I saw the recent changes.

@reuvenlax
Copy link
Contributor

reuvenlax commented Dec 29, 2021 via email

@aaltay
Copy link
Member

aaltay commented Jan 7, 2022

@laraschmidt - how is your other PR doing? :)

@laraschmidt
Copy link
Contributor Author

The other one is in but it does not work for portable runner because making it work for protable runner breaks dataflow on portable runner. :)

But the changes can actually be treated as pretty independent. That one was making the function we use here not as deprecated but we can still use it in this PR even if deprecated. So this is still ready for review.

@reuvenlax
Copy link
Contributor

reuvenlax commented Jan 7, 2022 via email

@laraschmidt
Copy link
Contributor Author

laraschmidt commented Jan 7, 2022

That was for an earlier iteration. The solution we ended up going with only outputs from processElement and only uses timers to hold the watermark. So we didn't end up actually needing to allow older elements from timers because we accepted that we will allow older elements from processElement function.

@reuvenlax
Copy link
Contributor

reuvenlax commented Jan 7, 2022 via email

@laraschmidt
Copy link
Contributor Author

I'm trying to page this back in. But if I remember correctly, the first element holds the watermark back until all not-late second elements would have appeared. So we don't actually need to hold it backwards. It's just the processElement that puts out older elements.

Here's the relevant code for the timer which are not negative:
if (elementAffectsWatermarkHolds) {
addTimer(watermarkHolds, ts.plus(thisCollectionValidFor)).withOutputTimestamp(ts);

@reuvenlax
Copy link
Contributor

reuvenlax commented Jan 7, 2022 via email

@laraschmidt
Copy link
Contributor Author

We use the same watermark hold for a bucket of elements (E.g. arriving between t and t+e). And we just hold it back til t+e. So it shouldn't matter that they come out of order from what I can tell.

@aaltay
Copy link
Member

aaltay commented Jan 21, 2022

What is the next step on this PR? - (I will stop pinging. Let me know if I can do anything to help.)

@github-actions
Copy link
Contributor

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Mar 22, 2022
@github-actions
Copy link
Contributor

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Mar 30, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants