Skip to content

Commit

Permalink
[FLINK-2550] Change Window API constructs to use Time instead of long
Browse files Browse the repository at this point in the history
This covers assigners/triggers/evictors.
  • Loading branch information
aljoscha committed Oct 5, 2015
1 parent 833b347 commit 8c2c769
Show file tree
Hide file tree
Showing 18 changed files with 144 additions and 115 deletions.
Expand Up @@ -741,9 +741,9 @@ public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());

if (actualSize instanceof EventTime) {
return windowAll(TumblingTimeWindows.of(actualSize.toMilliseconds()));
return windowAll(TumblingTimeWindows.of(actualSize));
} else {
return windowAll(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
return windowAll(TumblingProcessingTimeWindows.of(actualSize));
}
}

Expand All @@ -763,11 +763,9 @@ public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, Abstrac
AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());

if (actualSize instanceof EventTime) {
return windowAll(SlidingTimeWindows.of(actualSize.toMilliseconds(),
actualSlide.toMilliseconds()));
return windowAll(SlidingTimeWindows.of(size, slide));
} else {
return windowAll(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(),
actualSlide.toMilliseconds()));
return windowAll(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
}
}

Expand Down
Expand Up @@ -125,9 +125,9 @@ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());

if (actualSize instanceof EventTime) {
return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
return window(TumblingTimeWindows.of(actualSize));
} else {
return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
return window(TumblingProcessingTimeWindows.of(actualSize));
}
}

Expand All @@ -147,9 +147,9 @@ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, Abstract
AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());

if (actualSize instanceof EventTime) {
return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
return window(SlidingTimeWindows.of(actualSize, actualSlide));
} else {
return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
}
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.windowing.assigners;

import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
Expand Down Expand Up @@ -86,7 +87,7 @@ public String toString() {
* @param slide The slide interval of the generated windows.
* @return The time policy.
*/
public static SlidingProcessingTimeWindows of(long size, long slide) {
return new SlidingProcessingTimeWindows(size, slide);
public static SlidingProcessingTimeWindows of(AbstractTime size, AbstractTime slide) {
return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
}
}
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
Expand Down Expand Up @@ -75,7 +76,7 @@ public String toString() {
* @param slide The slide interval of the generated windows.
* @return The time policy.
*/
public static SlidingTimeWindows of(long size, long slide) {
return new SlidingTimeWindows(size, slide);
public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) {
return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
}
}
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
Expand Down Expand Up @@ -61,7 +62,7 @@ public String toString() {
* @param size The size of the generated windows.
* @return The time policy.
*/
public static TumblingProcessingTimeWindows of(long size) {
return new TumblingProcessingTimeWindows(size);
public static TumblingProcessingTimeWindows of(AbstractTime size) {
return new TumblingProcessingTimeWindows(size.toMilliseconds());
}
}
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
Expand Down Expand Up @@ -60,8 +61,8 @@ public String toString() {
* @param size The size of the generated windows.
* @return The time policy.
*/
public static TumblingTimeWindows of(long size) {
return new TumblingTimeWindows(size);
public static TumblingTimeWindows of(AbstractTime size) {
return new TumblingTimeWindows(size.toMilliseconds());
}

}
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.windowing.evictors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

Expand Down Expand Up @@ -54,7 +55,7 @@ public long getWindowSize() {
return windowSize;
}

public static <W extends Window> TimeEvictor<W> of(long windowSize) {
return new TimeEvictor<>(windowSize);
public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) {
return new TimeEvictor<>(windowSize.toMilliseconds());
}
}
Expand Up @@ -24,6 +24,9 @@

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Base class for {@link Time} implementations.
*/
public abstract class AbstractTime {

/** The time unit for this policy's time interval */
Expand Down
Expand Up @@ -18,32 +18,33 @@
package org.apache.flink.streaming.api.windowing.triggers;

import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;

private long granularity;
private long interval;

private long nextFireTimestamp = 0;

private ContinuousProcessingTimeTrigger(long granularity) {
this.granularity = granularity;
private ContinuousProcessingTimeTrigger(long interval) {
this.interval = interval;
}

@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
long currentTime = System.currentTimeMillis();
if (nextFireTimestamp == 0) {
long start = currentTime - (currentTime % granularity);
nextFireTimestamp = start + granularity;
long start = currentTime - (currentTime % interval);
nextFireTimestamp = start + interval;

ctx.registerProcessingTimeTimer(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
if (currentTime > nextFireTimestamp) {
long start = currentTime - (currentTime % granularity);
nextFireTimestamp = start + granularity;
long start = currentTime - (currentTime % interval);
nextFireTimestamp = start + interval;

ctx.registerProcessingTimeTimer(nextFireTimestamp);

Expand All @@ -57,29 +58,29 @@ public TriggerResult onTime(long time, TriggerContext ctx) {
// only fire if an element didn't already fire
long currentTime = System.currentTimeMillis();
if (currentTime > nextFireTimestamp) {
long start = currentTime - (currentTime % granularity);
nextFireTimestamp = start + granularity;
long start = currentTime - (currentTime % interval);
nextFireTimestamp = start + interval;
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}

@Override
public Trigger<Object, W> duplicate() {
return new ContinuousProcessingTimeTrigger<>(granularity);
return new ContinuousProcessingTimeTrigger<>(interval);
}

@VisibleForTesting
public long getGranularity() {
return granularity;
public long getInterval() {
return interval;
}

@Override
public String toString() {
return "ContinuousProcessingTimeTrigger(" + granularity + ")";
return "ContinuousProcessingTimeTrigger(" + interval + ")";
}

public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(long granularity) {
return new ContinuousProcessingTimeTrigger<>(granularity);
public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) {
return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
}
}
Expand Up @@ -18,24 +18,25 @@
package org.apache.flink.streaming.api.windowing.triggers;

import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;

private long granularity;
private long interval;

private boolean first = true;

private ContinuousWatermarkTrigger(long granularity) {
this.granularity = granularity;
private ContinuousWatermarkTrigger(long interval) {
this.interval = interval;
}

@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
if (first) {
long start = timestamp - (timestamp % granularity);
long nextFireTimestamp = start + granularity;
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;

ctx.registerWatermarkTimer(nextFireTimestamp);
first = false;
Expand All @@ -46,26 +47,26 @@ public TriggerResult onElement(Object element, long timestamp, W window, Trigger

@Override
public TriggerResult onTime(long time, TriggerContext ctx) {
ctx.registerWatermarkTimer(time + granularity);
ctx.registerWatermarkTimer(time + interval);
return TriggerResult.FIRE;
}

@Override
public Trigger<Object, W> duplicate() {
return new ContinuousWatermarkTrigger<>(granularity);
return new ContinuousWatermarkTrigger<>(interval);
}

@Override
public String toString() {
return "ContinuousProcessingTimeTrigger(" + granularity + ")";
return "ContinuousProcessingTimeTrigger(" + interval + ")";
}

@VisibleForTesting
public long getGranularity() {
return granularity;
public long getInterval() {
return interval;
}

public static <W extends Window> ContinuousWatermarkTrigger<W> of(long granularity) {
return new ContinuousWatermarkTrigger<>(granularity);
public static <W extends Window> ContinuousWatermarkTrigger<W> of(AbstractTime interval) {
return new ContinuousWatermarkTrigger<>(interval.toMilliseconds());
}
}
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
Expand All @@ -39,6 +40,8 @@
import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
* These tests verify that the api calls on
* {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
Expand All @@ -62,15 +65,15 @@ public void testFastTimeWindows() throws Exception {
DummyReducer reducer = new DummyReducer();

DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduceWindow(reducer);

OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);

DataStream<Tuple2<String, Integer>> window2 = source
.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -98,7 +101,7 @@ public void testNonEvicting() throws Exception {
DummyReducer reducer = new DummyReducer();

DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
.reduceWindow(reducer);

Expand All @@ -111,7 +114,7 @@ public void testNonEvicting() throws Exception {
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);

DataStream<Tuple2<String, Integer>> window2 = source
.windowAll(TumblingProcessingTimeWindows.of(1000))
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -144,7 +147,7 @@ public void testEvicting() throws Exception {
DummyReducer reducer = new DummyReducer();

DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
.reduceWindow(reducer);

Expand All @@ -158,9 +161,9 @@ public void testEvicting() throws Exception {
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);

DataStream<Tuple2<String, Integer>> window2 = source
.windowAll(TumblingProcessingTimeWindows.of(1000))
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(TimeEvictor.of(100))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;

Expand Down

0 comments on commit 8c2c769

Please sign in to comment.