Skip to content

Commit

Permalink
[FLINK-3200] Use Partitioned State in WindowOperator
Browse files Browse the repository at this point in the history
This changes window operator to use the new partitioned state
abstraction for keeping window contents instead of custom internal
state and the checkpointed interface.

For now, timers are still kept as custom checkpointed state, however.

WindowOperator now expects a StateIdentifier for MergingState, this can
either be for ReducingState or ListState but WindowOperator is agnostic
to the type of State. Also the signature of WindowFunction is changed to
include the type of intermediate input. For example, if a ReducingState
is used the input of the WindowFunction is T (where T is the input
type). If using a ListState the input of the WindowFunction would be of
type Iterable[T].
  • Loading branch information
aljoscha committed Feb 1, 2016
1 parent c95e91e commit c949a19
Show file tree
Hide file tree
Showing 49 changed files with 1,392 additions and 956 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public long getCurrentWatermark() {
/**
* Builds up-to-date partial models on new training data.
*/
public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> {
public static class PartialModelBuilder implements AllWindowFunction<Iterable<Integer>, Double[], TimeWindow> {
private static final long serialVersionUID = 1L;

protected Double[] buildPartialModel(Iterable<Integer> values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Key getKey(Type value) {
}
}

public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
public static class SummingWindowFunction implements WindowFunction<Iterable<Tuple2<Long, Long>>, Tuple2<Long, Long>, Long, Window> {

@Override
public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.flink.streaming.examples.windowing;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -100,6 +103,10 @@ private static class SessionTrigger implements Trigger<Tuple3<String, Long, Inte

private final Long sessionTimeout;

private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", 1L,
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));


public SessionTrigger(Long sessionTimeout) {
this.sessionTimeout = sessionTimeout;

Expand All @@ -108,7 +115,7 @@ public SessionTrigger(Long sessionTimeout) {
@Override
public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

ValueState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
Long lastSeen = lastSeenState.value();

Long timeSinceLastEvent = timestamp - lastSeen;
Expand All @@ -127,7 +134,7 @@ public TriggerResult onElement(Tuple3<String, Long, Integer> element, long times

@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
ValueState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
Long lastSeen = lastSeenState.value();

if (time - lastSeen >= sessionTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public double getDelta(
Tuple4<Integer, Integer, Double, Long> newDataPoint) {
return newDataPoint.f2 - oldDataPoint.f2;
}
}))
}, carData.getType().createSerializer(env.getConfig())))
.maxBy(1);

if (fileOutput) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ public class TopSpeedWindowingExampleData {
"(1,95,1973.6111111111115,1424952007664)\n" +
"(0,100,1709.7222222222229,1424952006663)\n" +
"(0,100,1737.5000000000007,1424952007664)\n" +
"(1,95,1973.6111111111115,1424952007664)\n" +
"(0,100,1791.6666666666674,1424952009664)\n" +
"(1,95,2211.1111111111118,1424952017668)\n";
"(1,95,1973.6111111111115,1424952007664)\n";

public static final String TOP_CASE_CLASS_SPEEDS =
"CarEvent(0,55,15.277777777777777,1424951918630)\n" +
Expand Down Expand Up @@ -267,9 +265,7 @@ public class TopSpeedWindowingExampleData {
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
"CarEvent(0,100,1709.7222222222229,1424952006663)\n" +
"CarEvent(0,100,1737.5000000000007,1424952007664)\n" +
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
"CarEvent(0,100,1791.6666666666674,1424952009664)\n" +
"CarEvent(1,95,2211.1111111111118,1424952017668)\n";
"CarEvent(1,95,1973.6111111111115,1424952007664)\n";

private TopSpeedWindowingExampleData() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ object TopSpeedWindowing {
.evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
.trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
}))
}, cars.getType().createSerializer(env.getConfig)))
// .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
// .every(Delta.of[CarEvent](triggerMeters,
// (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
Expand Down Expand Up @@ -126,6 +129,11 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
* @return The data stream that is the result of applying the reduce function to the window.
*/
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
"Please use apply(ReduceFunction, WindowFunction) instead.");
}

//clean the closure
function = input.getExecutionEnvironment().clean(function);

Expand All @@ -147,15 +155,15 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new ReduceAllWindowFunction<W, T>(function),
new ReduceIterableAllWindowFunction<W, T>(function),
trigger,
evictor).enableSetProcessingTime(setProcessingTime);

} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new PreAggregatingHeapWindowBuffer.Factory<>(function),
new ReduceAllWindowFunction<W, T>(function),
new ReduceIterableAllWindowFunction<W, T>(function),
trigger).enableSetProcessingTime(setProcessingTime);
}

Expand Down Expand Up @@ -205,10 +213,11 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
TypeInformation<T> inType = input.getType();
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function) {
@SuppressWarnings("unchecked, rawtypes")
TypeInformation<Iterable<T>> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class);
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, AllWindowFunction.class, true, true, inType, null, false);
function, AllWindowFunction.class, true, true, iterTypeInfo, null, false);

return apply(function, resultType);
}
Expand All @@ -224,7 +233,7 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function, TypeInformation<R> resultType) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);

Expand Down Expand Up @@ -297,6 +306,10 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
if (preAggregator instanceof RichFunction) {
throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction.");
}

//clean the closures
function = input.getExecutionEnvironment().clean(function);
preAggregator = input.getExecutionEnvironment().clean(preAggregator);
Expand All @@ -314,16 +327,16 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
function,
new ReduceApplyAllWindowFunction<>(preAggregator, function),
trigger,
evictor).enableSetProcessingTime(setProcessingTime);

} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
function,
trigger).enableSetProcessingTime(setProcessingTime);
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
new ReduceApplyAllWindowFunction<>(preAggregator, function),
trigger).enableSetProcessingTime(setProcessingTime);
}

return input.transform(opName, resultType, operator).setParallelism(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{

private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
implements WindowFunction<Iterable<TaggedUnion<T1, T2>>, T, KEY, W> {

private static final long serialVersionUID = 1L;

Expand Down
Loading

0 comments on commit c949a19

Please sign in to comment.