[streaming] Fast calculation of medians of windows #684
Hi, thank you for contributing to Flink!
try {
    keyField.set(record, fieldValue);
} catch (IllegalAccessException e) {
    throw new RuntimeException("");
Add message + chain IllegalAccessException
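A minimal sketch of what this review comment asks for: a descriptive message plus the original exception passed as the cause. The `FieldSetter` class and `setField` method are hypothetical names used only for illustration; only `Field.set` and the `RuntimeException(String, Throwable)` constructor come from the JDK.

```java
import java.lang.reflect.Field;

// Hypothetical helper, not part of the PR: shows exception chaining only.
class FieldSetter {
    static void setField(Field keyField, Object record, Object fieldValue) {
        try {
            keyField.set(record, fieldValue);
        } catch (IllegalAccessException e) {
            // Say what failed and keep the original exception as the cause,
            // so the stack trace still points at the reflective access.
            throw new RuntimeException(
                    "Cannot set field '" + keyField.getName() + "' of "
                            + keyField.getDeclaringClass().getName(), e);
        }
    }
}
```

With the cause chained, `getCause()` on the wrapping exception returns the original `IllegalAccessException`.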
In general I think we should discuss whether we make Gabor's GSoC project part of the core streaming API or keep it separately in a streaming windowing statistics package or even a Maven module. For this specific change I am personally ambivalent about whether we should have this in core. @rmetzger: Good point about JIRA. There is no ticket yet for Gabor's work. I'll discuss the issues he is addressing later this week and we will break it up into a number of issues.
Thank you for your comments. @rmetzger: I fixed the catching of the IllegalAccessException. The assertions seem to be enabled for the unit tests. (Surefire enables them by default, and I don't see them disabled. I also checked by temporarily adding an 'assert false', and the test failed.) It also seems to me that an assert is more appropriate here, as it doesn't slow the code down when we are not unit testing. And if this assert fails, that indicates some internal problem (I mean that the user shouldn't be able to make this condition not hold here). @mbalassi:
I coded the grouped case and refactored SumAggregator and ComparableAggregator to use the new FieldAccessor class to access the user-specified field. The first commit does the refactoring and the second adds the median calculation. As part of the refactoring I also fixed a small bug: SimpleComparableAggregator.reduce was not handling the byAggregate case. I also moved the logic of DataStream.getClassAtPos and checkFieldRange to FieldAccessor. The refactoring also solves FLINK-2039 (by the first two lines of the second overload of FieldAccessor.create). There are three things left to do: deciding where the median will be placed, creating a Jira, and possibly adding the median to the Scala API. I will do these after speaking with @mbalassi tomorrow.
import java.lang.reflect.Array;
import java.util.List;

// These classes encapsulate the logic of accessing a field specified by the user as either an index
This should use proper JavaDoc style
Concerning Marton's comment: I think we should put the code into a streaming operator library. The streaming core is undergoing heavy changes, the windowing may be reworked in the near future, and there are plans to use managed memory for window buffer contents. In light of that, I would suggest keeping these changes in a library for now. It would be easier for both parties that way.
I did the Scala part, and added a Jira (FLINK-2145).
I am not sure how putting this into a separate library helps a future refactoring of windowing. Most of the median code is inside the new PreReducers, so it is probably already separated from the rest of the code as well as it can be.
Merged the first commit as that was already clarified. As for the second one the placement to either @ggevay, @gyfora what do you think? [1] mbalassi@9561457
} catch (IndexOutOfBoundsException e) {
    // Nothing to do here
}
}
Why is there no test for the POJO field accessor?
It is tested by AggregationFunctionTest.
ProductFieldAccessor is tested by TopSpeedWindowingExampleITCase (or it will be, after fixing FLINK-2255).
I will add this as a comment to FieldAccessorTest.java.
I merged Marton's changes (moving to contrib) into this PR.
Concerning my comment about where to put the code: There is so much code being added to Flink right now. The committers ultimately have to be the ones who maintain the code. That should make us think hard about what we really want to add. What is the motivation to add this to the core projects? Is it functionality that users frequently need? This code seems to be totally okay with running "on top of Flink", as a library function. Just because it can be added is not really a reason that we can follow any more at this point.
I see your point, no problem. However, I would like to put my next pull request (856) into core, because that code would be very awkward to put into contrib. But I will put later things into contrib.
Is this still a relevant PR with all the changes to the runtime in recent months? If not, can we close this?
This is dead. Everything this was based on has totally changed. The fundamental problem is that this is based on the assumption of the old 0.9 windowing code that events come ordered by time. The idea was to update the median incrementally when the current window changes, but the new windowing code builds several windows at the same time instead of windows gradually changing into each other, so I don't think that this idea could be adapted to the current version.
I started working on my Google Summer of Code project. I decided to start with the fast calculation of medians of windows. The MedianPreReducer keeps track of the median at every window change by keeping the lower and upper halves of the window in two multisets. Each update runs in time logarithmic in the current window size. (Adding an element to or removing an element from the window involves moving at most one of the other elements between the lower and upper halves.)
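The two-multiset idea described above can be sketched roughly as follows. This is an illustrative, standalone version and not the PR's MedianPreReducer: the class name `SlidingMedian`, its methods, the use of `TreeMap` value counts as a multiset, and the choice to average the two middle elements for even-sized windows are all assumptions made for the example.

```java
import java.util.TreeMap;

// Illustrative sketch only; names and structure are assumptions, not Flink code.
class SlidingMedian {
    // Multisets as value -> count maps. Every element of `lower` is <= every
    // element of `upper`, and lowerSize is upperSize or upperSize + 1.
    private final TreeMap<Double, Integer> lower = new TreeMap<>();
    private final TreeMap<Double, Integer> upper = new TreeMap<>();
    private int lowerSize = 0;
    private int upperSize = 0;

    void add(double x) {
        if (lowerSize == 0 || Double.compare(x, lower.lastKey()) <= 0) {
            insert(lower, x);
            lowerSize++;
        } else {
            insert(upper, x);
            upperSize++;
        }
        rebalance();
    }

    // Returns false if x is not currently in the window.
    boolean remove(double x) {
        if (lowerSize > 0 && Double.compare(x, lower.lastKey()) <= 0) {
            if (!delete(lower, x)) return false;
            lowerSize--;
        } else {
            if (!delete(upper, x)) return false;
            upperSize--;
        }
        rebalance();
        return true;
    }

    double median() {
        if (lowerSize == 0) throw new IllegalStateException("empty window");
        if (lowerSize > upperSize) return lower.lastKey();
        return (lower.lastKey() + upper.firstKey()) / 2.0;
    }

    // Moves at most one element between the halves to restore the size invariant.
    private void rebalance() {
        if (lowerSize > upperSize + 1) {
            double v = lower.lastKey();
            delete(lower, v);
            lowerSize--;
            insert(upper, v);
            upperSize++;
        } else if (upperSize > lowerSize) {
            double v = upper.firstKey();
            delete(upper, v);
            upperSize--;
            insert(lower, v);
            lowerSize++;
        }
    }

    private static void insert(TreeMap<Double, Integer> m, double x) {
        m.merge(x, 1, Integer::sum);
    }

    private static boolean delete(TreeMap<Double, Integer> m, double x) {
        Integer c = m.get(x);
        if (c == null) return false;
        if (c == 1) m.remove(x); else m.put(x, c - 1);
        return true;
    }
}
```

Each `add` or `remove` performs a constant number of `TreeMap` operations, which is where the logarithmic update cost comes from. A sliding window would call `add` for each arriving element and `remove` for each evicted one.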
The parameters of WindowedDataStream.median are the same as for the sum, min, max, etc. methods. (But here, the specified field can only be of Double type.) I implemented a few helper classes for accessing the field that is specified by these parameters. I think that the other methods with similar field selection (sum, min, max, etc.) could also be refactored to use these helper classes, so that the code duplication in SumAggregator and ComparableAggregator could be eliminated (the duplication of both the field access logic and the logic of the byAggregate comparisons). Should I do this refactoring?
Currently, the implementation only handles non-grouped data streams, but I plan to create a GroupedMedianPreReducer (hence the [WIP] flag).