[streaming] WindowBuffer interface added for preaggregator logic + simple tumbling prereducer
gyfora authored and mbalassi committed Feb 16, 2015
1 parent 8708688 commit ef7b7cd
Showing 16 changed files with 360 additions and 133 deletions.
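For orientation: the pre-aggregation path added by this commit is taken by a plain reduceWindow over a non-grouped tumbling window. The usage sketch below is not part of the commit; it assumes the windowing entry points of this API generation (DataStream.window, Time.of, reduceWindow and flatten as they appear in the hunks further down) and that a window without an every-clause evicts in tumbling fashion.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.helper.Time;

public class TumblingPreReduceExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Long> numbers = env.fromElements(1L, 2L, 3L, 4L, 5L);

		// A non-grouped tumbling reduce: this is the case that the new
		// getWindowBuffer logic (see the WindowedDataStream hunks below)
		// routes to the pre-reducing window buffer instead of collecting
		// the whole window before reducing it.
		numbers.window(Time.of(1, TimeUnit.SECONDS))
				.reduceWindow(new ReduceFunction<Long>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Long reduce(Long a, Long b) {
						return a + b;
					}
				})
				.flatten()
				.print();

		env.execute("Tumbling pre-reduce sketch");
	}
}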
@@ -28,11 +28,11 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;

@@ -48,13 +48,15 @@
*/
public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {

protected SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream;
private SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream;
private WindowTransformation transformation;

protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream,
KeySelector<OUT, ?> groupByKey) {
KeySelector<OUT, ?> groupByKey, WindowTransformation tranformation) {
super();
this.groupByKey = groupByKey;
this.discretizedStream = discretizedStream;
this.transformation = tranformation;
}

/**
@@ -70,12 +72,13 @@ protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> dis
@Override
public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {

DiscretizedStream<OUT> out = partition(false).transform("Window Reduce", getType(),
DiscretizedStream<OUT> out = partition(transformation).transform(
WindowTransformation.REDUCEWINDOW, "Window Reduce", getType(),
new WindowReducer<OUT>(reduceFunction)).merge();

if (!isGrouped()) {
return out.transform("Window Reduce", out.getType(), new WindowReducer<OUT>(
reduceFunction));
return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", out.getType(),
new WindowReducer<OUT>(reduceFunction));
} else {
return out;
}
@@ -106,7 +109,8 @@ public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunc
@Override
public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction,
TypeInformation<R> returnType) {
DiscretizedStream<R> out = partition(true).transform("Window Reduce", returnType,
DiscretizedStream<R> out = partition(transformation).transform(
WindowTransformation.REDUCEWINDOW, "Window Reduce", returnType,
new WindowMapper<OUT, R>(reduceFunction));

if (isGrouped()) {
@@ -116,30 +120,31 @@ public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunc
}
}

private <R> DiscretizedStream<R> transform(String operatorName, TypeInformation<R> retType,
private <R> DiscretizedStream<R> transform(WindowTransformation transformation,
String operatorName, TypeInformation<R> retType,
StreamInvokable<StreamWindow<OUT>, StreamWindow<R>> invokable) {

return wrap(discretizedStream.transform(operatorName, new StreamWindowTypeInfo<R>(retType),
invokable));
invokable), transformation);
}

private DiscretizedStream<OUT> partition(boolean isMap) {
private DiscretizedStream<OUT> partition(WindowTransformation transformation) {

int parallelism = discretizedStream.getParallelism();

if (isGrouped()) {
DiscretizedStream<OUT> out = transform("Window partitioner", getType(),
DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
new WindowPartitioner<OUT>(groupByKey)).setParallelism(parallelism);

out.groupByKey = null;

return out;
} else if (!isMap) {
} else if (transformation == WindowTransformation.MAPWINDOW) {
return transform(
transformation,
"Window partitioner",
getType(),
new WindowPartitioner<OUT>(discretizedStream.environment
.getDegreeOfParallelism())).setParallelism(parallelism);
new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
} else {
return this;
}
@@ -160,9 +165,15 @@ public DataStream<OUT> flatten() {
return discretizedStream.transform("Window Flatten", getType(), new WindowFlattener<OUT>());
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@SuppressWarnings("rawtypes")
private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream) {
return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey);
return wrap(stream, transformation);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream,
WindowTransformation transformation) {
return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey, transformation);
}

public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
@@ -218,7 +229,7 @@ public TypeInformation<OUT> getType() {
}

protected DiscretizedStream<OUT> copy() {
return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey);
return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey, transformation);
}

}
@@ -17,6 +17,7 @@

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -31,11 +32,14 @@
import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.windowing.BasicWindowBuffer;
import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer;
import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedTimeDiscretizer;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
import org.apache.flink.streaming.api.invokable.operator.windowing.TumblingPreReducer;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowBuffer;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -210,35 +214,75 @@ public WindowedDataStream<OUT> local() {
return out;
}

private DiscretizedStream<OUT> discretize(boolean isMap) {
private DiscretizedStream<OUT> discretize(WindowTransformation transformation) {

StreamInvokable<OUT, StreamWindow<OUT>> discretizer = getDiscretizer();
StreamInvokable<OUT, StreamWindow<OUT>> discretizer = getDiscretizer(transformation,
getTrigger(), getEviction(), discretizerKey);

int parallelism = isLocal || (discretizerKey != null) ? dataStream.environment
.getDegreeOfParallelism() : 1;
int parallelism = getDiscretizerParallelism();

return new DiscretizedStream<OUT>(dataStream.transform("Stream Discretizer",
new StreamWindowTypeInfo<OUT>(getType()), discretizer).setParallelism(parallelism),
groupByKey);
groupByKey, transformation);

}

protected enum WindowTransformation {

REDUCEWINDOW, MAPWINDOW, NONE;

private Function UDF;

public WindowTransformation with(Function UDF) {
this.UDF = UDF;
return this;
}
}

private int getDiscretizerParallelism() {
return isLocal || (discretizerKey != null) ? dataStream.environment
.getDegreeOfParallelism() : 1;
}

private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer() {
private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer(
WindowTransformation transformation, TriggerPolicy<OUT> trigger,
EvictionPolicy<OUT> eviction, KeySelector<OUT, ?> discretizerKey) {

WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation, trigger, eviction,
discretizerKey);

if (discretizerKey == null) {
return new StreamDiscretizer<OUT>(getTrigger(), getEvicter());
} else if (getTrigger() instanceof TimeTriggerPolicy
&& ((TimeTriggerPolicy<OUT>) getTrigger()).timestampWrapper.isDefaultTimestamp()) {
return new StreamDiscretizer<OUT>(trigger, eviction, windowBuffer);
} else if (trigger instanceof TimeTriggerPolicy
&& ((TimeTriggerPolicy<OUT>) trigger).timestampWrapper.isDefaultTimestamp()) {
return new GroupedTimeDiscretizer<OUT>(discretizerKey,
(TimeTriggerPolicy<OUT>) getTrigger(),
(CloneableEvictionPolicy<OUT>) getEvicter());
(TimeTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction,
windowBuffer);
} else {
return new GroupedStreamDiscretizer<OUT>(discretizerKey,
(CloneableTriggerPolicy<OUT>) getTrigger(),
(CloneableEvictionPolicy<OUT>) getEvicter());
(CloneableTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction,
windowBuffer);
}

}

@SuppressWarnings("unchecked")
private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation,
TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction,
KeySelector<OUT, ?> discretizerKey) {

if (transformation == WindowTransformation.REDUCEWINDOW
&& eviction instanceof TumblingEvictionPolicy) {
if (discretizerKey == null) {
return new TumblingPreReducer<OUT>((ReduceFunction<OUT>) transformation.UDF,
getType().createSerializer());
} else {
return new BasicWindowBuffer<OUT>();
}
}
return new BasicWindowBuffer<OUT>();
}

/**
* Applies a reduce transformation on the windowed data stream by reducing
* the current window at every trigger.The user can also extend the
@@ -250,7 +294,8 @@ private StreamInvokable<OUT, StreamWindow<OUT>> getDiscretizer() {
* @return The transformed DataStream
*/
public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
return discretize(false).reduceWindow(reduceFunction);
return discretize(WindowTransformation.REDUCEWINDOW.with(reduceFunction)).reduceWindow(
reduceFunction);
}

/**
@@ -267,7 +312,8 @@ public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
* @return The transformed DataStream
*/
public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction) {
return discretize(true).mapWindow(reduceFunction);
return discretize(WindowTransformation.MAPWINDOW.with(reduceFunction)).mapWindow(
reduceFunction);
}

/**
@@ -289,7 +335,8 @@ public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFun
public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction,
TypeInformation<R> outType) {

return discretize(true).mapWindow(reduceFunction, outType);
return discretize(WindowTransformation.MAPWINDOW.with(reduceFunction)).mapWindow(
reduceFunction, outType);
}

public DataStream<OUT> flatten() {
@@ -533,7 +580,7 @@ protected TriggerPolicy<OUT> getTrigger() {

}

protected EvictionPolicy<OUT> getEvicter() {
protected EvictionPolicy<OUT> getEviction() {

if (evictionHelper != null) {
return evictionHelper.toEvict();
@@ -571,7 +618,7 @@ protected boolean isGrouped() {
}

public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
return discretize(true).getDiscretizedStream();
return discretize(WindowTransformation.NONE).getDiscretizedStream();
}

protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
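The WindowBuffer interface referenced above is among the 16 changed files, but its own hunk is not reproduced on this page. Judging from how getWindowBuffer hands buffers to the discretizers and from the methods BasicWindowBuffer implements further below, a minimal reconstruction could look like the following sketch; the Serializable supertype, the comments and the exact signatures are assumptions, only the method names are taken from this diff.

package org.apache.flink.streaming.api.invokable.operator.windowing;

import java.io.Serializable;

import org.apache.flink.util.Collector;

public interface WindowBuffer<T> extends Serializable {

	// Emits the current window content, returning false if the buffer is empty.
	boolean emitWindow(Collector<StreamWindow<T>> collector);

	// Adds one element; pre-aggregating buffers may combine it immediately.
	void store(T element) throws Exception;

	// Removes the n oldest elements from the buffer.
	void evict(int n);

	// Number of elements currently held.
	int size();

	// Fresh, empty buffer of the same kind, presumably used by the grouped
	// discretizers to keep one buffer per key.
	WindowBuffer<T> clone();
}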
@@ -51,7 +51,7 @@ protected void callUserFunction() throws Exception {
nextValue = nextObject;

if (currentValue != null) {
currentValue = reducer.reduce(currentValue, nextValue);
currentValue = reducer.reduce(copy(currentValue), nextValue);
} else {
currentValue = nextValue;

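One plausible reason for the copy(currentValue) guard added in the hunk above is protecting the running aggregate from reduce functions that modify their arguments in place. A hedged illustration of such a function follows; the MutatingSum class is invented purely for the example and is not part of this commit.

import org.apache.flink.api.common.functions.ReduceFunction;

// Illustration only: this reducer mutates and returns its first argument.
// Without copying currentValue before the call, the object kept as the
// running aggregate (and possibly already handed downstream) would be
// updated in place by later invocations.
public class MutatingSum implements ReduceFunction<long[]> {

	private static final long serialVersionUID = 1L;

	@Override
	public long[] reduce(long[] acc, long[] next) {
		acc[0] += next[0]; // in-place update of the first argument
		return acc;
	}
}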
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.invokable.operator.windowing;

import java.util.LinkedList;
import java.util.NoSuchElementException;

import org.apache.flink.util.Collector;

public class BasicWindowBuffer<T> implements WindowBuffer<T> {

private static final long serialVersionUID = 1L;
protected LinkedList<T> buffer;

public BasicWindowBuffer() {
this.buffer = new LinkedList<T>();
}

public boolean emitWindow(Collector<StreamWindow<T>> collector) {
if (!buffer.isEmpty()) {
StreamWindow<T> currentWindow = new StreamWindow<T>();
currentWindow.addAll(buffer);
collector.collect(currentWindow);
return true;
} else {
return false;
}
}

public void store(T element) throws Exception {
buffer.add(element);
}

public void evict(int n) {
for (int i = 0; i < n; i++) {
try {
buffer.removeFirst();
} catch (NoSuchElementException e) {
// In case no more elements are in the buffer:
// Prevent failure and stop deleting.
break;
}
}
}

public int size() {
return buffer.size();
}

@Override
public BasicWindowBuffer<T> clone() {
return new BasicWindowBuffer<T>();
}

@Override
public String toString() {
return buffer.toString();
}
}
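TumblingPreReducer, the simple tumbling pre-reducer named in the commit message, is also among the changed files but its hunk is not shown on this page. As a rough idea of how a pre-aggregating buffer differs from the BasicWindowBuffer above, here is a hypothetical sketch; only the constructor arguments (a ReduceFunction and a TypeSerializer, as passed in getWindowBuffer) and the buffer methods come from this diff, while the class body, field names and eviction behaviour are assumptions.

package org.apache.flink.streaming.api.invokable.operator.windowing;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.Collector;

// Hypothetical sketch only; the committed TumblingPreReducer may differ.
public class TumblingPreReducerSketch<T> implements WindowBuffer<T> {

	private static final long serialVersionUID = 1L;

	private final ReduceFunction<T> reducer;
	private final TypeSerializer<T> serializer;

	private T reduced;
	private int numElements;

	public TumblingPreReducerSketch(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
		this.reducer = reducer;
		this.serializer = serializer;
	}

	public boolean emitWindow(Collector<StreamWindow<T>> collector) {
		if (reduced != null) {
			// Emit a window containing only the pre-aggregated value, then reset.
			StreamWindow<T> window = new StreamWindow<T>();
			window.add(reduced);
			collector.collect(window);
			reduced = null;
			numElements = 0;
			return true;
		}
		return false;
	}

	public void store(T element) throws Exception {
		if (reduced == null) {
			reduced = element;
		} else {
			// Copy the current aggregate before reducing, mirroring the
			// copy(currentValue) change in the reduce invokable above.
			reduced = reducer.reduce(serializer.copy(reduced), element);
		}
		numElements++;
	}

	public void evict(int n) {
		// Tumbling windows are dropped wholesale when they fire, which
		// emitWindow already handles, so eviction is assumed to be a no-op.
	}

	public int size() {
		return numElements;
	}

	@Override
	public TumblingPreReducerSketch<T> clone() {
		return new TumblingPreReducerSketch<T>(reducer, serializer);
	}
}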
