Skip to content

Commit

Permalink
[FLINK-1643] [streaming] Auto detection of tumbling policies added + …
Browse files Browse the repository at this point in the history
…WindowedDataStream refactor
  • Loading branch information
gyfora committed Mar 10, 2015
1 parent 70abc16 commit aacd4f2
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 139 deletions.
Expand Up @@ -46,14 +46,11 @@
import org.apache.flink.streaming.api.windowing.WindowUtils; import org.apache.flink.streaming.api.windowing.WindowUtils;
import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation; import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper; import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
Expand Down Expand Up @@ -261,7 +258,7 @@ public DataStream<OUT> flatten() {
public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) { public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {


WindowTransformation transformation = WindowTransformation.REDUCEWINDOW WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
.with(reduceFunction); .with(clean(reduceFunction));


WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation); WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);


Expand All @@ -288,7 +285,7 @@ public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
* @return The transformed DataStream * @return The transformed DataStream
*/ */
public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) { public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction), return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction); new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction);
} }


Expand Down Expand Up @@ -370,42 +367,36 @@ private StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> getBufferInvokable(


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) { private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
TriggerPolicy<OUT> trigger = getTrigger();
EvictionPolicy<OUT> eviction = getEviction();


if (transformation == WindowTransformation.REDUCEWINDOW) { if (transformation == WindowTransformation.REDUCEWINDOW) {
if (getTrigger() instanceof TumblingEvictionPolicy) { if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
if (groupByKey == null) { if (groupByKey == null) {
return new TumblingPreReducer<OUT>( return new TumblingPreReducer<OUT>(
clean((ReduceFunction<OUT>) transformation.getUDF()), getType() (ReduceFunction<OUT>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig())); .createSerializer(getExecutionConfig()));
} else { } else {
return new TumblingGroupedPreReducer<OUT>( return new TumblingGroupedPreReducer<OUT>(
clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey, (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
getType().createSerializer(getExecutionConfig())); .createSerializer(getExecutionConfig()));
}
} else if (getTrigger() instanceof CountTriggerPolicy
&& getEviction() instanceof CountEvictionPolicy && groupByKey == null) {

int slide = ((CountTriggerPolicy<OUT>) getTrigger()).getSlideSize();
int window = ((CountEvictionPolicy<OUT>) getEviction()).getWindowSize();
int start = ((CountEvictionPolicy<OUT>) getEviction()).getStart();
if (slide < window) {
return new SlidingCountPreReducer<OUT>(
clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
.getType().createSerializer(getExecutionConfig()), window,
slide, start);
}
} else if (getTrigger() instanceof TimeTriggerPolicy
&& getEviction() instanceof TimeEvictionPolicy && groupByKey == null) {
int slide = (int) ((TimeTriggerPolicy<OUT>) getTrigger()).getSlideSize();
int window = (int) ((TimeEvictionPolicy<OUT>) getEviction()).getWindowSize();
TimestampWrapper<OUT> wrapper = ((TimeEvictionPolicy<OUT>) getEviction())
.getTimeStampWrapper();
if (slide < window) {
return new SlidingTimePreReducer<OUT>(
clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
.getType().createSerializer(getExecutionConfig()), window,
slide, wrapper);
} }
} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction) && groupByKey == null) {

return new SlidingCountPreReducer<OUT>(
clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream.getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
((CountTriggerPolicy<?>) trigger).getStart());

} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction) && groupByKey == null) {

return new SlidingTimePreReducer<OUT>(
(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
WindowUtils.getTimeStampWrapper(trigger));

} }
} }
return new BasicWindowBuffer<OUT>(); return new BasicWindowBuffer<OUT>();
Expand Down
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.invokable.operator.windowing;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.invokable.ChainableInvokable;
import org.apache.flink.streaming.api.windowing.StreamWindow;

/**
* This invokable applies either split or key partitioning depending on the
* transformation.
*/
public class ParallelWindowPartitioner<T> extends
ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {

private KeySelector<T, ?> keySelector;
private int numberOfSplits;
private int currentWindowID = 0;

public ParallelWindowPartitioner(KeySelector<T, ?> keySelector) {
super(null);
this.keySelector = keySelector;
}

public ParallelWindowPartitioner(int numberOfSplits) {
super(null);
this.numberOfSplits = numberOfSplits;
}

private static final long serialVersionUID = 1L;

@Override
public void invoke() throws Exception {
while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}

@Override
protected void callUserFunction() throws Exception {
StreamWindow<T> currentWindow = nextObject;
currentWindow.setID(++currentWindowID);

if (keySelector == null) {
if (numberOfSplits <= 1) {
collector.collect(currentWindow);
} else {
for (StreamWindow<T> window : currentWindow.split(numberOfSplits)) {
collector.collect(window);
}
}
} else {

for (StreamWindow<T> window : currentWindow.partitionBy(keySelector)) {
collector.collect(window);
}

}
}

@Override
public void collect(StreamWindow<T> record) {
if (isRunning) {
nextObject = record;
callUserFunctionAndLogException();
}
}

}
Expand Up @@ -52,13 +52,10 @@ protected void callUserFunction() throws Exception {
protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer) protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
throws Exception { throws Exception {
if (windowEvent.isElement()) { if (windowEvent.isElement()) {
System.out.println("element: " + windowEvent.getElement());
buffer.store(windowEvent.getElement()); buffer.store(windowEvent.getElement());
} else if (windowEvent.isEviction()) { } else if (windowEvent.isEviction()) {
System.out.println("eviction: " + windowEvent.getEviction());
buffer.evict(windowEvent.getEviction()); buffer.evict(windowEvent.getEviction());
} else if (windowEvent.isTrigger()) { } else if (windowEvent.isTrigger()) {
System.out.println("trigger");
buffer.emitWindow(collector); buffer.emitWindow(collector);
} }
} }
Expand Down
Expand Up @@ -172,6 +172,10 @@ public StreamWindow<T> setNumberOfParts(int n) {
return this; return this;
} }


public void setID(int id) {
this.windowID = id;
}

/** /**
* Checks whether this window can be merged with the given one. * Checks whether this window can be merged with the given one.
* *
Expand Down
Expand Up @@ -19,6 +19,7 @@


import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
Expand Down Expand Up @@ -48,10 +49,102 @@ public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<
|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy); || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
} }


public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
if (isTimeOnly(trigger, eviction)) {
long slide = getSlideSize(trigger);
long window = getWindowSize(eviction);

return slide < window
&& getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
} else {
return false;
}
}

public static boolean isSlidingCountPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
if (isCountOnly(trigger, eviction)) {
long slide = getSlideSize(trigger);
long window = getWindowSize(eviction);

return slide < window
&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
.getStart()
&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
} else {
return false;
}
}

public static <X> TimestampWrapper<X> getTimeStampWrapper(TriggerPolicy<X> trigger) {
if (trigger instanceof TimeTriggerPolicy) {
return ((TimeTriggerPolicy<X>) trigger).getTimeStampWrapper();
} else {
throw new IllegalArgumentException(
"Timestamp wrapper can only be accessed for time policies");
}
}

public static <X> TimestampWrapper<X> getTimeStampWrapper(EvictionPolicy<X> eviction) {
if (eviction instanceof EvictionPolicy) {
return ((TimeEvictionPolicy<X>) eviction).getTimeStampWrapper();
} else {
throw new IllegalArgumentException(
"Timestamp wrapper can only be accessed for time policies");
}
}

public static long getSlideSize(TriggerPolicy<?> trigger) {
if (trigger instanceof TimeTriggerPolicy) {
return ((TimeTriggerPolicy<?>) trigger).getSlideSize();
} else if (trigger instanceof CountTriggerPolicy) {
return ((CountTriggerPolicy<?>) trigger).getSlideSize();
} else {
throw new IllegalArgumentException(
"Slide size can only be accessed for time or count policies");
}
}

public static long getWindowSize(EvictionPolicy<?> eviction) {
if (eviction instanceof TimeEvictionPolicy) {
return ((TimeEvictionPolicy<?>) eviction).getWindowSize();
} else if (eviction instanceof CountEvictionPolicy) {
return ((CountEvictionPolicy<?>) eviction).getWindowSize();
} else {
throw new IllegalArgumentException(
"Window size can only be accessed for time or count policies");
}
}

public static boolean isTumblingPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
if (eviction instanceof TumblingEvictionPolicy) {
return true;
} else if (isTimeOnly(trigger, eviction)) {
long slide = getSlideSize(trigger);
long window = getWindowSize(eviction);

return slide == window
&& getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
} else if (isCountOnly(trigger, eviction)) {
long slide = getSlideSize(trigger);
long window = getWindowSize(eviction);

return slide == window
&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
.getStart()
&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
} else {
return false;
}
}

public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) { public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy; return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy;
} }


public static boolean isCountOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
return trigger instanceof CountTriggerPolicy && eviction instanceof CountEvictionPolicy;
}

public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) { public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) {
return trigger instanceof TimeTriggerPolicy return trigger instanceof TimeTriggerPolicy
&& ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp(); && ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp();
Expand Down
Expand Up @@ -80,7 +80,6 @@ private Time(long length, TimeUnit timeUnit, TimestampWrapper<DATA> timestampWra
this.length = length; this.length = length;
this.granularity = timeUnit; this.granularity = timeUnit;
this.timestampWrapper = timestampWrapper; this.timestampWrapper = timestampWrapper;
this.delay = 0;
} }


@Override @Override
Expand All @@ -90,7 +89,7 @@ public EvictionPolicy<DATA> toEvict() {


@Override @Override
public TriggerPolicy<DATA> toTrigger() { public TriggerPolicy<DATA> toTrigger() {
return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper, delay); return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper);
} }


/** /**
Expand Down Expand Up @@ -147,19 +146,7 @@ public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp, long
public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) { public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) {
return of(length, timestamp, 0); return of(length, timestamp, 0);
} }


/**
* Sets the delay for the first processed window.
*
* @param delay
* The number of time units before the first processed window.
* @return Helper representing the time based trigger and eviction policy
*/
public Time<DATA> withDelay(long delay) {
this.delay = delay;
return this;
}

protected long granularityInMillis() { protected long granularityInMillis() {
return granularity == null ? length : granularity.toMillis(length); return granularity == null ? length : granularity.toMillis(length);
} }
Expand Down
Expand Up @@ -143,6 +143,10 @@ public int getWindowSize() {
public int getStart() { public int getStart() {
return startValue; return startValue;
} }

public int getDeleteOnEviction(){
return deleteOnEviction;
}


@Override @Override
public String toString() { public String toString() {
Expand Down
Expand Up @@ -105,6 +105,10 @@ public boolean equals(Object other) {
public int getSlideSize() { public int getSlideSize() {
return max; return max;
} }

public int getStart() {
return startValue;
}


@Override @Override
public String toString() { public String toString() {
Expand Down

0 comments on commit aacd4f2

Please sign in to comment.