[FLINK-1450] Added fold operator for the Streaming API #481

Closed
wants to merge 13 commits into
base: master
from

6 participants
@akshaydixi
Contributor

akshaydixi commented Mar 12, 2015

Tried to follow the steps described in the JIRA Issue comments:

  • Created a FoldFunction and a RichFoldFunction
  • Created a StreamFoldInvokable
  • Integrated it to the DataStream for both Java and Scala
  • Implemented StreamFoldTest based on StreamReduceTest
@gyfora

It does not really make sense to copy the next received value, since that comes directly from the input (but you are right about copying at the collect method). You should copy the accumulator value instead, to guarantee that you don't mutate it afterwards.

@gyfora

We should also add a fold method to the Scala API with the same signature as the Scala foldLeft method on collections, so that it can take lambda functions too.

@gyfora

gyfora commented on 9848f23 Mar 12, 2015

Hey,

Good job, I like it very much. 👍 I added some minor comments.

There are a couple of things that would be nice if you could add as a next commit :)

- It would be cool to have a grouped version of this operator that works on GroupedDataStreams, like the GroupedReduceInvokable. This should be a trivial extension.

- Also, please do me the favor of integrating the fold methods into the WindowedDataStream. This should also take about 10 minutes.

I think that, similarly to reduce, the most used versions in stream processing would be the grouped fold and windowed fold operations.

Thanks again for the contribution, once you have added these I would like to merge them right away :)

hsaputra and others added some commits Mar 12, 2015

Merge branch 'FLINK-1450' of github.com:akshaydixi/flink into FLINK-1450
Conflicts:
	flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@akshaydixi

Contributor Author

akshaydixi commented Mar 13, 2015

Thanks @gyfora. I've added fold to GroupedDataStream and WindowedDataStream in subsequent commits.

@gyfora

The accumulator should be copied. To do that, you will have to pass the type serializer for the out type to the invokable (because the existing copy method works for the input type). You can see how it is passed to the ProjectInvokable. Once you have the type serializer, you can call its copy method.

Owner Author

akshaydixi replied Mar 14, 2015

So @gyfora, could you explain a bit why we are using copy here? I remember you mentioning using it to guarantee immutability, could you give a bit more detail? Thanks.

gyfora replied Mar 14, 2015

Imagine a fold function that always stores the previously accumulated values inside. If you don't copy and the function mutates the aggregated values afterwards, that can give weird results.
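To make the aliasing problem concrete, here is a minimal sketch in plain Java. It does not use Flink's classes: `FoldCopySketch`, `foldAndEmit`, and the `copier` parameter are hypothetical names, with `copier` standing in for the copy method of the output type serializer mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a streaming fold operator; not Flink's StreamFoldInvokable.
final class FoldCopySketch {

    // Folds the inputs in order and emits the accumulator after every element.
    // `copier` plays the role of the output type serializer's copy method (assumption).
    static <IN, OUT> List<OUT> foldAndEmit(List<IN> inputs, OUT initial,
            BiFunction<OUT, IN, OUT> folder, UnaryOperator<OUT> copier) {
        List<OUT> emitted = new ArrayList<>();
        OUT acc = initial;
        for (IN value : inputs) {
            acc = folder.apply(acc, value);
            emitted.add(copier.apply(acc)); // downstream sees whatever the copier returns
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> inputs = Arrays.asList(1, 2, 3);
        // A fold function that mutates its accumulator and returns the same instance.
        BiFunction<List<Integer>, Integer, List<Integer>> appendInPlace = (acc, v) -> {
            acc.add(v);
            return acc;
        };
        // Identity "copier": every emitted value aliases the one mutable list.
        List<List<Integer>> shared =
                foldAndEmit(inputs, new ArrayList<Integer>(), appendInPlace, acc -> acc);
        // Real copy: every emitted value is a snapshot of the accumulator.
        List<List<Integer>> copied =
                foldAndEmit(inputs, new ArrayList<Integer>(), appendInPlace, acc -> new ArrayList<>(acc));
        System.out.println("without copy: " + shared); // [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
        System.out.println("with copy:    " + copied); // [[1], [1, 2], [1, 2, 3]]
    }
}
```

Without the copy, earlier results change retroactively because they are all the same object, which is the kind of weird result described above.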

@gyfora

I don't think this actually works. The Java windowed stream will fail when trying to extract the type information from this function. This is why I added a second mapWindow to the Java WindowedDataStream that also takes the out type.

Owner Author

akshaydixi replied Mar 14, 2015

Hey @gyfora, I'm not sure why it won't work. If, as you say, the WindowedStream will fail extracting TypeInformation, wouldn't implicitly adding TypeInformation work? Also, could you explain why it would fail while extracting the TypeInformation?
I'm assuming the error will occur here?

gyfora replied Mar 14, 2015

I believe it fails because the java method that will try to extract the typeinformation from the fold function will fail on the scala types.

Owner Author

akshaydixi replied Mar 14, 2015

So shouldn't passing outTypeInformation from the Scala foldWindow calls work instead (as I did in the previous commit for GroupedFoldInvokable and StreamFoldInvokable)?

gyfora replied Mar 14, 2015

I don't quite get what you mean. But you should try it out and see what works :)

Owner Author

akshaydixi replied Mar 14, 2015

So, this commit should solve the problem. I agree: while trying out foldWindow as shown in this gist, it ran into errors. But it runs successfully with the new commit's changes.

@gyfora

This shouldn't be necessary if the fold is implemented as a mapWindow.

@gyfora

gyfora commented on b8270b8 Mar 14, 2015

I see you took a little more complicated approach than what I thought.

I would suggest modifying this so that foldWindow is implemented using the mapWindow method. At some point we will probably take a separate approach to optimize the performance, but for now I believe that is better.

Owner Author

akshaydixi replied Mar 14, 2015

Sure, I don't mind changing foldWindow, but I'm a bit confused on how to implement it using mapWindow. mapWindow accepts a WindowMapFunction, so would I need to create an instance of WindowMapFunction that overrides mapWindow by having it collect the "fold" of all the values passed to it and the initialValue?
Am I on the right track here?

gyfora replied Mar 14, 2015

Your WindowMapFunction would encapsulate the fold functionality. It would fold the whole window (from the iterable) and only collect the final aggregate.
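A rough sketch of that idea in plain Java follows. The interfaces here are hypothetical stand-ins shaped after the names used in this thread, not Flink's actual `FoldFunction` or `WindowMapFunction` (whose signatures and type-parameter order may differ), and `java.util.function.Consumer` stands in for a collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class WindowFoldSketch {

    // Hypothetical interfaces; assumptions for illustration only.
    interface FoldFunction<T, R> {
        R fold(R accumulator, T value);
    }

    interface WindowMapFunction<T, R> {
        void mapWindow(Iterable<T> values, Consumer<R> out);
    }

    // Wraps a fold function so that it can be run through a mapWindow-style call:
    // the whole window is folded, and only the final aggregate is collected.
    static <T, R> WindowMapFunction<T, R> asWindowMap(FoldFunction<T, R> folder, R initialValue) {
        return (values, out) -> {
            R acc = initialValue;
            for (T value : values) {
                acc = folder.fold(acc, value);
            }
            out.accept(acc); // one output per window
        };
    }

    public static void main(String[] args) {
        WindowMapFunction<Integer, String> concat = asWindowMap((acc, v) -> acc + v, "");
        List<String> collected = new ArrayList<>();
        concat.mapWindow(Arrays.asList(1, 2, 3), collected::add);
        System.out.println(collected); // [123]
    }
}
```

The sketch reuses `initialValue` for every window, which is only safe for immutable values; with a mutable initial value it would need to be copied per window, for the same reason the accumulator is copied elsewhere in this PR.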

Owner Author

akshaydixi replied Mar 14, 2015

Instead of changing foldWindow, I tried passing the typeinformation from scala onwards so it doesn't run into errors trying to extract it afterwards (in this commit). This probably should solve this issue.

gyfora replied Mar 14, 2015

but now if the user wants to use foldWindow in the java api he needs to pass the typeinfo, which could be extracted

gyfora replied Mar 14, 2015

just look at how the mapWindow is implemented with 2 different signatures

Owner Author

akshaydixi replied Mar 14, 2015

Right, I forgot about that. Fixed it in the next commit.

@gyfora

I don't think we use this function anywhere.

Owner Author

akshaydixi replied Mar 14, 2015

Yeah @gyfora, sorry. I added it before writing GroupedFoldInvokable. I'll remove this file in the next commit.

@gyfora

The accumulator should be copied here as well to be consistent with the other operators. See my other comment below

@gyfora

gyfora commented on b9bfebd Mar 14, 2015

We always rebase to keep linear git history. But no worries I will fix this once I merge it.

Owner Author

akshaydixi replied Mar 14, 2015

Yeah sorry about that. I was trying to follow the steps in How To Contribute and didn't understand what went wrong.

@gyfora

Contributor

gyfora commented Mar 14, 2015

I added some other minor comments :)

Also if you are interested in taking this one step further there are some possibilities (after I merged this):

  • add combiners/aggregators (that basically make it possible to apply folds in parallel by using ReduceFunctions), this is mostly applicable for windowing
  • add window PreAggregators for fold operators as well -> even better if we have combiners

I will add some JIRA-s for these, but we should definitely sync on skype or hangout if you decide to go further.
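As a rough illustration of the combiner idea (not a proposal for the actual Flink design), the sketch below folds each partition independently and then merges the partial results with a reduce-style function. `FoldCombineSketch`, `foldWithCombiner`, and its parameters are hypothetical names. The result only matches a sequential fold when the combiner is associative and consistent with the fold function, and the initial value acts as an identity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

final class FoldCombineSketch {

    // Folds every partition on its own (possibly in parallel), then merges the
    // partial accumulators pairwise with the combiner.
    static <T, R> R foldWithCombiner(List<List<T>> partitions, Supplier<R> initial,
            BiFunction<R, T, R> folder, BinaryOperator<R> combiner) {
        return partitions.parallelStream()
                .map(part -> {
                    R acc = initial.get(); // fresh accumulator per partition
                    for (T value : part) {
                        acc = folder.apply(acc, value);
                    }
                    return acc;
                })
                .reduce(combiner)
                .orElseGet(initial); // no partitions: fall back to the initial value
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5, 6));
        Long total = FoldCombineSketch.<Integer, Long>foldWithCombiner(
                partitions, () -> 0L, (acc, v) -> acc + v, Long::sum);
        System.out.println(total); // 21
    }
}
```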

[FLINK-1450] Fixed StreamFoldInvokable and GroupedStreamFoldInvokable by implementing accumulator copying to prevent mutations and removed GroupFoldFunction.
@akshaydixi

Contributor Author

akshaydixi commented Mar 14, 2015

Ok @gyfora, thanks for reviewing!
I picked this issue since it seemed a good way to get a handle on flink-streaming, as I'm interested in pursuing FLINK-1534 and FLINK-1617 for my GSoC proposal. But I'll be glad to work more on this and try to implement more of the features you've described. They do seem challenging, so I'll count on your help in the tougher bits :)

akshaydixi added some commits Mar 14, 2015

[FLINK-1450] Fixed foldWindow on WindowedDataStreams so it doesn't fail on extracting type information from scala types
[FLINK-1450] Fix foldWindow so now user doesn't have to supply output TypeInformation while using it from Java API
[FLINK-1450] Fixed typo in WindowedDataStream.java and style-nits in DataStream.scala and WindowedDataStream.scala

@asfgit asfgit closed this in aaa231b Mar 15, 2015

@gyfora

Contributor

gyfora commented Mar 15, 2015

Hey,

I squashed some of your small fix commits into your last one, and also fixed several minor issues:

- I swapped the type parameter order to make the type extraction work properly
- I made the parameter order consistent across the Scala and Java APIs
- The GroupedFoldInvokable completely disregarded the first element in the stream (the test logic for this class was also broken)
- And some minor windowing-related stuff

I didn't include your last commit since I had already added those fixes to yours.
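For reference, here is a small plain-Java sketch of the intended grouped fold semantics, in which the first element seen for each key is folded with the initial value rather than skipped. It is not the merged Flink code, and the exact defect in the original invokable is not shown in this thread; `GroupedFoldSketch`, `groupedFold`, and `keyOf` are hypothetical names. The accumulators are immutable here, so no copying is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

final class GroupedFoldSketch {

    // Keeps one accumulator per key and emits the updated accumulator for every input.
    static <K, T, R> List<R> groupedFold(List<T> inputs, Function<T, K> keyOf, R initial,
            BiFunction<R, T, R> folder) {
        Map<K, R> accByKey = new HashMap<>();
        List<R> emitted = new ArrayList<>();
        for (T value : inputs) {
            K key = keyOf.apply(value);
            // The first element of a group starts from the initial value.
            R previous = accByKey.containsKey(key) ? accByKey.get(key) : initial;
            R next = folder.apply(previous, value);
            accByKey.put(key, next);
            emitted.add(next);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Running sums grouped by parity: odd and even values accumulate separately.
        List<Integer> sums = GroupedFoldSketch.<Integer, Integer, Integer>groupedFold(
                Arrays.asList(1, 2, 3, 4, 5), v -> v % 2, 0, (acc, v) -> acc + v);
        System.out.println(sums); // [1, 2, 4, 6, 9]
    }
}
```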

@akshaydixi

Contributor Author

akshaydixi commented Mar 15, 2015

Thanks @gyfora. I'll try to not mess up commit history in future contributions!

@akshaydixi akshaydixi deleted the akshaydixi:FLINK-1450 branch Mar 15, 2015

mafernandez-stratio added a commit to mafernandez-stratio/flink that referenced this pull request Mar 18, 2015

bhatsachin added a commit to bhatsachin/flink that referenced this pull request May 5, 2015

marthavk added a commit to marthavk/flink that referenced this pull request Jun 9, 2015
