-
Notifications
You must be signed in to change notification settings - Fork 13k
/
AllWindowedStream.java
485 lines (424 loc) · 20.4 KB
/
AllWindowedStream.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
/**
 * An {@code AllWindowedStream} represents a data stream where the stream of
 * elements is split into windows based on a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
 * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
 *
 * <p>
 * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
 * used to evict elements from the window after
 * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
 * When using an evictor, window performance will degrade significantly, since
 * pre-aggregation of window results cannot be used.
 *
 * <p>
 * Note that the {@code AllWindowedStream} is purely an API construct; during runtime
 * the {@code AllWindowedStream} will be collapsed together with the
 * operation over the window into one single operation.
 *
 * <p>
 * The stream is not keyed, so every window operation defined here is executed by a single
 * non-parallel operator (parallelism 1).
 *
 * @param <T> The type of elements in the stream.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
 */
public class AllWindowedStream<T, W extends Window> {

	/** The data stream that is windowed by this stream */
	private final DataStream<T> input;

	/** The window assigner */
	private final WindowAssigner<? super T, W> windowAssigner;

	/** The trigger that is used for window evaluation/emission. */
	private Trigger<? super T, ? super W> trigger;

	/** The evictor that is used for evicting elements before window evaluation. */
	private Evictor<? super T, ? super W> evictor;

	/**
	 * Creates a new {@code AllWindowedStream} over the given input. The trigger is initialized to
	 * the assigner's default trigger for the input's execution environment; no evictor is set.
	 *
	 * @param input The data stream that is windowed.
	 * @param windowAssigner The assigner that assigns elements to windows.
	 */
	public AllWindowedStream(DataStream<T> input,
			WindowAssigner<? super T, W> windowAssigner) {
		this.input = input;
		this.windowAssigner = windowAssigner;
		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
	}

	/**
	 * Sets the {@code Trigger} that should be used to trigger window emission.
	 * This replaces the assigner's default trigger.
	 *
	 * @param trigger The trigger to use.
	 * @return This {@code AllWindowedStream}, for call chaining.
	 */
	public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
		this.trigger = trigger;
		return this;
	}

	/**
	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
	 *
	 * <p>
	 * Note: When using an evictor, window performance will degrade significantly, since
	 * pre-aggregation of window results cannot be used.
	 *
	 * @param evictor The evictor to use.
	 * @return This {@code AllWindowedStream}, for call chaining.
	 */
	public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
		this.evictor = evictor;
		return this;
	}

	// ------------------------------------------------------------------------
	//  Operations on the windows
	// ------------------------------------------------------------------------

	/**
	 * Applies a reduce function to the window. The reduce function is called for each evaluation
	 * of the window. The output of the reduce function is interpreted as a regular non-windowed
	 * stream.
	 *
	 * <p>
	 * If no evictor is set, this window will pre-aggregate data as elements arrive, so that the
	 * window buffer does not need to hold every element. How much can be pre-aggregated depends
	 * on the window policies. If an evictor is set, all elements are buffered and reduced only
	 * when the window is evaluated.
	 *
	 * @param function The reduce function.
	 * @return The data stream that is the result of applying the reduce function to the window.
	 */
	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
		//clean the closure
		function = input.getExecutionEnvironment().clean(function);

		// must be called directly from this public method so that it names the caller of reduce()
		String callLocation = Utils.getCallLocationName();
		String udfName = "Reduce at " + callLocation;

		SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
		if (result != null) {
			return result;
		}

		String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

		OneInputStreamOperator<T, T> operator;

		boolean setProcessingTime = isProcessingTime();

		if (evictor != null) {
			// eviction needs the raw elements, so buffer everything and reduce on evaluation
			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
					new HeapWindowBuffer.Factory<T>(),
					new ReduceAllWindowFunction<W, T>(function),
					trigger,
					evictor).enableSetProcessingTime(setProcessingTime);

		} else {
			// The same reduce function is used both by the pre-aggregating buffer and by the
			// window function, so the buffer gets its own (serialization-cloned) instance.
			@SuppressWarnings("unchecked")
			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);

			operator = new NonKeyedWindowOperator<>(windowAssigner,
					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
					new ReduceAllWindowFunction<W, T>(function),
					trigger).enableSetProcessingTime(setProcessingTime);
		}

		return input.transform(opName, input.getType(), operator).setParallelism(1);
	}

	/**
	 * Applies the given fold function to each window. The fold function is called for each
	 * evaluation of the window. The output of the fold function is interpreted as a regular
	 * non-windowed stream. The result type is extracted from the fold function.
	 *
	 * @param initialValue The initial value of the fold.
	 * @param function The fold function.
	 * @param <R> The type of the fold result.
	 * @return The data stream that is the result of applying the fold function to the window.
	 */
	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
		//clean the closure
		function = input.getExecutionEnvironment().clean(function);

		// determined here (not in a delegate) so the operator is named after the caller of fold()
		String callLocation = Utils.getCallLocationName();

		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
				callLocation, true);

		return applyWithCallLocation(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType, callLocation);
	}

	/**
	 * Applies the given fold function to each window. The fold function is called for each
	 * evaluation of the window. The output of the fold function is interpreted as a regular
	 * non-windowed stream.
	 *
	 * @param initialValue The initial value of the fold.
	 * @param function The fold function.
	 * @param resultType Type information for the result type of the fold function.
	 * @param <R> The type of the fold result.
	 * @return The data stream that is the result of applying the fold function to the window.
	 */
	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
		//clean the closure
		function = input.getExecutionEnvironment().clean(function);

		// determined here (not in a delegate) so the operator is named after the caller of fold()
		String callLocation = Utils.getCallLocationName();

		return applyWithCallLocation(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType, callLocation);
	}

	/**
	 * Applies a window function to the window. The window function is called for each evaluation
	 * of the window. The output of the window function is interpreted as a regular non-windowed
	 * stream. The result type is extracted from the window function.
	 *
	 * <p>
	 * Note that this function requires that all data in the windows is buffered until the window
	 * is evaluated, as the function provides no means of pre-aggregation.
	 *
	 * @param function The window function.
	 * @param <R> The type of the elements emitted by the window function.
	 * @return The data stream that is the result of applying the window function to the window.
	 */
	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
		// determined here (not in a delegate) so the operator is named after the caller of apply()
		String callLocation = Utils.getCallLocationName();

		TypeInformation<T> inType = input.getType();
		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
				function, AllWindowFunction.class, true, true, inType, null, false);

		return applyWithCallLocation(function, resultType, callLocation);
	}

	/**
	 * Applies the given window function to each window. The window function is called for each
	 * evaluation of the window. The output of the window function is interpreted as a regular
	 * non-windowed stream.
	 *
	 * <p>
	 * Note that this function requires that all data in the windows is buffered until the window
	 * is evaluated, as the function provides no means of pre-aggregation.
	 *
	 * @param function The window function.
	 * @param resultType Type information for the result type of the window function.
	 * @param <R> The type of the elements emitted by the window function.
	 * @return The data stream that is the result of applying the window function to the window.
	 */
	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
		// evaluated in this frame so the call location refers to the caller of apply()
		return applyWithCallLocation(function, resultType, Utils.getCallLocationName());
	}

	/**
	 * Builds the (non-parallel) window operator for the given window function.
	 *
	 * @param function The window function; its closure is cleaned here.
	 * @param resultType Type information for the result type of the window function.
	 * @param callLocation The user call site. It is used to name the operator and must be computed
	 *                     by the public entry point that the user invoked.
	 * @param <R> The type of the elements emitted by the window function.
	 * @return The data stream that is the result of applying the window function to the window.
	 */
	private <R> SingleOutputStreamOperator<R, ?> applyWithCallLocation(
			AllWindowFunction<T, R, W> function,
			TypeInformation<R> resultType,
			String callLocation) {
		//clean the closure
		function = input.getExecutionEnvironment().clean(function);

		String udfName = "MapWindow at " + callLocation;

		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
		if (result != null) {
			return result;
		}

		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

		NonKeyedWindowOperator<T, R, W> operator;

		boolean setProcessingTime = isProcessingTime();

		if (evictor != null) {
			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
					new HeapWindowBuffer.Factory<T>(),
					function,
					trigger,
					evictor).enableSetProcessingTime(setProcessingTime);

		} else {
			operator = new NonKeyedWindowOperator<>(windowAssigner,
					new HeapWindowBuffer.Factory<T>(),
					function,
					trigger).enableSetProcessingTime(setProcessingTime);
		}

		return input.transform(opName, resultType, operator).setParallelism(1);
	}

	// ------------------------------------------------------------------------
	//  Aggregations on the windows
	// ------------------------------------------------------------------------

	/**
	 * Applies an aggregation that sums every window of the data stream at the
	 * given position.
	 *
	 * @param positionToSum The position in the tuple/array to sum
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
		return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
	}

	/**
	 * Applies an aggregation that sums every window of the pojo data stream at
	 * the given field for every window.
	 *
	 * <p>
	 * A field expression is either
	 * the name of a public field or a getter method with parentheses of the
	 * stream's underlying type. A dot can be used to drill down into objects,
	 * as in {@code "field1.getInnerField2()" }.
	 *
	 * @param field The field to sum
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> sum(String field) {
		return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
	}

	/**
	 * Applies an aggregation that gives the minimum value of every window
	 * of the data stream at the given position.
	 *
	 * @param positionToMin The position to minimize
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
		return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
	}

	/**
	 * Applies an aggregation that gives the minimum value of the pojo data
	 * stream at the given field expression for every window.
	 *
	 * <p>
	 * A field
	 * expression is either the name of a public field or a getter method with
	 * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
	 *
	 * @param field The field expression based on which the aggregation will be applied.
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> min(String field) {
		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
	}

	/**
	 * Applies an aggregation that gives the minimum element of every window of
	 * the data stream by the given position. If more elements have the same
	 * minimum value the operator returns the first element by default.
	 *
	 * @param positionToMinBy
	 *            The position to minimize by
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
		return this.minBy(positionToMinBy, true);
	}

	/**
	 * Applies an aggregation that gives the minimum element of every window of
	 * the data stream by the given field expression. If more elements have the same
	 * minimum value the operator returns the first element by default.
	 *
	 * @param positionToMinBy The field expression to minimize by
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
		return this.minBy(positionToMinBy, true);
	}

	/**
	 * Applies an aggregation that gives the minimum element of every window of
	 * the data stream by the given position. If more elements have the same
	 * minimum value the operator returns either the first or last one depending
	 * on the parameter setting.
	 *
	 * @param positionToMinBy The position to minimize
	 * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
		return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
	}

	/**
	 * Applies an aggregation that gives the minimum element of the pojo
	 * data stream by the given field expression for every window. A field
	 * expression is either the name of a public field or a getter method with
	 * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
	 *
	 * @param field The field expression based on which the aggregation will be applied.
	 * @param first If true then in case of field equality the first object will be returned
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
	}

	/**
	 * Applies an aggregation that gives the maximum value of every window of
	 * the data stream at the given position.
	 *
	 * @param positionToMax The position to maximize
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
		return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
	}

	/**
	 * Applies an aggregation that gives the maximum value of the pojo data
	 * stream at the given field expression for every window. A field expression
	 * is either the name of a public field or a getter method with parentheses
	 * of the {@link DataStream DataStream's} underlying type. A dot can be used to drill
	 * down into objects, as in {@code "field1.getInnerField2()" }.
	 *
	 * @param field The field expression based on which the aggregation will be applied.
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> max(String field) {
		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
	}

	/**
	 * Applies an aggregation that gives the maximum element of every window of
	 * the data stream by the given position. If more elements have the same
	 * maximum value the operator returns the first by default.
	 *
	 * @param positionToMaxBy
	 *            The position to maximize by
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
		return this.maxBy(positionToMaxBy, true);
	}

	/**
	 * Applies an aggregation that gives the maximum element of every window of
	 * the data stream by the given field expression. If more elements have the same
	 * maximum value the operator returns the first by default.
	 *
	 * @param positionToMaxBy
	 *            The field expression to maximize by
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
		return this.maxBy(positionToMaxBy, true);
	}

	/**
	 * Applies an aggregation that gives the maximum element of every window of
	 * the data stream by the given position. If more elements have the same
	 * maximum value the operator returns either the first or last one depending
	 * on the parameter setting.
	 *
	 * @param positionToMaxBy The position to maximize by
	 * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
		return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
	}

	/**
	 * Applies an aggregation that gives the maximum element of the pojo
	 * data stream by the given field expression for every window. A field
	 * expression is either the name of a public field or a getter method with
	 * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
	 *
	 * @param field The field expression based on which the aggregation will be applied.
	 * @param first If true then in case of field equality the first object will be returned
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
	}

	/**
	 * Runs the given aggregation as a window reduce (aggregation functions are reduce functions).
	 *
	 * @param aggregator The aggregation to apply.
	 * @return The transformed DataStream.
	 */
	private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
		return reduce(aggregator);
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	/**
	 * Hook for a specialized non-parallel aligned-time window operator. Currently always returns
	 * {@code null}, so callers always fall back to the general trigger-based window operator.
	 *
	 * @param function The user function of the window operation.
	 * @param resultType The result type of the window operation.
	 * @param functionName The name used to describe the user function.
	 * @param <R> The result element type.
	 * @return A specialized operator, or {@code null} if none applies.
	 */
	private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
			Function function,
			TypeInformation<R> resultType,
			String functionName) {

		// TODO: add once non-parallel fast aligned time windows operator is ready
		return null;
	}

	/**
	 * Returns whether the input's execution environment uses the processing-time
	 * {@link TimeCharacteristic}. This value is passed to the window operators.
	 */
	private boolean isProcessingTime() {
		return input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
	}

	/**
	 * Returns the execution environment of the windowed input stream.
	 */
	public StreamExecutionEnvironment getExecutionEnvironment() {
		return input.getExecutionEnvironment();
	}

	/**
	 * Returns the type information of the elements of the windowed input stream.
	 */
	public TypeInformation<T> getInputType() {
		return input.getType();
	}
}