New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-5968] Add documentation for WindowedStream.aggregate() #4833
[FLINK-5968] Add documentation for WindowedStream.aggregate() #4833
Conversation
R: @twalthr you might be interested 😃 |
docs/dev/stream/operators/windows.md
Outdated
Context context, | ||
Iterable<Double> averages, | ||
Collector<Tuple2<String, Double>> out) { | ||
Double average = averags.iterator().next(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be averages, not averags
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! fixing
c4f44b4
to
02681ab
Compare
CC: @fhueske You wanted me to cc you for the Table API changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the heads up @aljoscha.
I had a look at the flink-table
modifications and think we can revert most changes. GeneratedAggregations
is just an internal interface and the public UDAGG interface that it calls does not support immutable accumulators (it suffer from the same problem as the DataStream AggregateFunction
). I don't want to touch the UDAGG interface before the release.
Thanks, Fabian
@@ -52,11 +52,12 @@ import org.apache.flink.table.dataview.ListViewTypeInfoFactory | |||
* return accum; | |||
* } | |||
* | |||
* public void accumulate(MyAccum accumulator, String id) { | |||
* public MyAccum accumulate(MyAccum accumulator, String id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert these changes.
The accumulate
method belongs to a Table API org.apache.flink.table.functions.AggregateFunction
and not GeneratedAggregations
.
@@ -52,7 +52,7 @@ import org.apache.flink.table.dataview.MapViewTypeInfoFactory | |||
* return accum; | |||
* } | |||
* | |||
* public void accumulate(MyAccum accumulator, String id) { | |||
* public MyAccum accumulate(MyAccum accumulator, String id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert these changes.
The accumulate
method belongs to a Table API org.apache.flink.table.functions.AggregateFunction
and not GeneratedAggregations
.
@@ -67,7 +67,7 @@ abstract class GeneratedAggregations extends Function { | |||
* aggregated results | |||
* @param input input values bundled in a row |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add @returns
documentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change retract()
accordingly to keep the interfaces consistent.
Alternatively, you can also revert the changes on GeneratedAggregations
which is only an internal interface. The Table API org.apache.flink.table.functions.AggregateFunction
does not support immutable accumulators, so this change has no effect until the public interface is changed. If you revert, we only need to touch AggregateAggFunction
and not the code generator.
02681ab
to
644ed15
Compare
3f16528
to
f764184
Compare
f764184
to
b4afc52
Compare
Same as with `ReduceFunction`, Flink will incrementally aggregate input elements of a window as they | ||
arrive. | ||
|
||
A `AggregateFunction` can be defined and used like this: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An
|
||
@Override | ||
public Tuple2<Long, Long> add( | ||
Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we keep the method signature in a single line? They can fit into one line, and I usually find it difficult to read method signature in two lines
|
||
@Override | ||
public Tuple2<Long, Long> merge( | ||
Tuple2<Long, Long> a, Tuple2<Long, Long> b) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
||
@Override | ||
public Tuple2<Long, Long> add( | ||
Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
||
@Override | ||
public Tuple2<Long, Long> merge( | ||
Tuple2<Long, Long> a, Tuple2<Long, Long> b) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
I just saw this PR, and sorry for submitting my feedbacks late... |
Thanks, @bowenli86! Your recommendations are good and I'll push a hot fix commit with the fixes. |
No description provided.