Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-3109]Join two streams with two different buffer time -- Java i… #1527

Closed
wants to merge 8 commits into from
Closed

Conversation

wangyangjun
Copy link

Java implementation of jira FLINK-3109

* @throws Exception
*/
private void processWatermark(long watermark) throws Exception{
System.out.println("Watermark:" + String.valueOf(watermark));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.out is not good here. Maybe using logger if you want to log something.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestion.

@aljoscha
Copy link
Contributor

Hi @wangyangjun,
sorry for the long wait but I think we can get this PR in after some modifications. I'd like to change the API a bit to separate it from the other Join implementation since that class is already quite crowded. What I would propose is to add a method timeJoin() on DataStream and a new class TimeJoinedStreams that is similar to JoinedStreams but specific to the two-buffer time join.

Could you also please add support for the Scala API, we try to keep the two APIs in sync. If you need help with that, please let me know.

@wangyangjun
Copy link
Author

Hello, @aljoscha ,
Thanks for your comment and suggestion. I will update the API. During my benchmark tests of timeJoin, the feature of this implementation is quite good, but the performance is bad. I will reimplement it with Guava.

Yes, I could add support for the Scala API.

Thanks

@wangyangjun
Copy link
Author

Hello @aljoscha ,
As I mentioned in last comment, I will reimplement it with Guava CacheBuilder. One question is about Flink self-memory management, does CacheBuilder get memory from Flink or JVM directly? As I understand, HeapWindowBuffer allocates memory from Flink. Is there any data structure like cachedMap in Flink?

@StephanEwen
Copy link
Contributor

Concerning the data management: @aljoscha and me are currently heavily reworking that.
All window operations need to go onto the "state" interfaces. Before we merge this one, we should also do that, so please do not spend much time on optimizing how the buffers for the two inputs are implemented.

The interfaces for that will go into the code in a few days (they are in this pull request: #1562)

For now, I would focus on the API and we look into the buffers in a few days.

BTW: how exactly the buffered data is held (managed memory, external databases, etc) depends on the "state backend" of the job. Memory behavior can be changed that way and the operators need not worry about that.

@StephanEwen
Copy link
Contributor

@wangyangjun We actually merged all the changes concerning the state abstraction.

To make this window join work seamlessly on Flink's state backends (memory, or key/value stores, managed memory, ...) you would need to implement it against the key/value state. That means that whenever you store data in the operator, the data should go into the partitioned state that you can obtain from the AbstractStreamOperator or the RuntimeContext.

I think that for this window operator, the ListState is a good choice, where you can add values to a key and retrieve the list as a whole once the windows are evaluated.

Please write back if you need some more pointers on the state abstraction.

@StephanEwen
Copy link
Contributor

I just saw that you updated this pull request (actually a few weeks ago already)

A lot of it looks very good, some things we need to check a bit deeper (like how triggers actually behave on the two separate windows, how windows are matched).

Can you give a high level summary of how this should behave?
Especially given that you allow for custom triggers and window assigners here, how are windows matched against each other (to determine that their elements should be joined/co-grouped).
For tumbling time windows, the behavior is well defined and like discussed in the JIRA issue, but for generic windows and triggers, how is it defined?

@wangyangjun
Copy link
Author

Hi StephanEwen, sorry for replying this late.

This will join two window streams as long as slide size of these two windows are equal. It can be two SlidingTimeWindows, one SlidingTimeWindow and one TumblingTimeWindow. If slide size of windows are not equal, an exception will be threw.

It doesn't support generic windows and triggers yet. Only TimeWindow is allowed in the API.

@aljoscha
Copy link
Contributor

I'm closing this as "Abandoned", since there is no more activity and the code base has moved on quite a bit.

@aljoscha aljoscha closed this Oct 14, 2019
@aljoscha aljoscha self-assigned this Oct 20, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants