From 8c2c76947a9a26e25d4539068b5253b265c71c23 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Sat, 3 Oct 2015 16:39:13 +0200 Subject: [PATCH] [FLINK-2550] Change Window API constructs to use Time instead of long This covers assigners/triggers/evictors. --- .../streaming/api/datastream/DataStream.java | 10 +++--- .../streaming/api/datastream/KeyedStream.java | 8 ++--- .../SlidingProcessingTimeWindows.java | 5 +-- .../assigners/SlidingTimeWindows.java | 5 +-- .../TumblingProcessingTimeWindows.java | 5 +-- .../assigners/TumblingTimeWindows.java | 5 +-- .../api/windowing/evictors/TimeEvictor.java | 5 +-- .../api/windowing/time/AbstractTime.java | 3 ++ .../ContinuousProcessingTimeTrigger.java | 31 ++++++++--------- .../triggers/ContinuousWatermarkTrigger.java | 25 +++++++------- .../windowing/AllWindowTranslationTest.java | 17 ++++++---- .../windowing/NonKeyedWindowOperatorTest.java | 25 +++++--------- .../windowing/WindowOperatorTest.java | 16 +++++---- .../windowing/WindowTranslationTest.java | 18 ++++++---- .../streaming/api/scala/DataStream.scala | 12 +++---- .../streaming/api/scala/KeyedStream.scala | 12 +++---- .../api/scala/AllWindowTranslationTest.scala | 33 ++++++++++++------- .../api/scala/WindowTranslationTest.scala | 24 ++++++++++---- 18 files changed, 144 insertions(+), 115 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 6d88416ab9052..32d9012dbc06c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -741,9 +741,9 @@ public AllWindowedStream timeWindowAll(AbstractTime size) { AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); if (actualSize instanceof EventTime) { - return windowAll(TumblingTimeWindows.of(actualSize.toMilliseconds())); + return windowAll(TumblingTimeWindows.of(actualSize)); } else { - return windowAll(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds())); + return windowAll(TumblingProcessingTimeWindows.of(actualSize)); } } @@ -763,11 +763,9 @@ public AllWindowedStream timeWindowAll(AbstractTime size, Abstrac AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); if (actualSize instanceof EventTime) { - return windowAll(SlidingTimeWindows.of(actualSize.toMilliseconds(), - actualSlide.toMilliseconds())); + return windowAll(SlidingTimeWindows.of(size, slide)); } else { - return windowAll(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), - actualSlide.toMilliseconds())); + return windowAll(SlidingProcessingTimeWindows.of(actualSize, actualSlide)); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 265886b06ac48..f7c5b53238e12 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -125,9 +125,9 @@ public WindowedStream timeWindow(AbstractTime size) { AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); if (actualSize instanceof EventTime) { - return window(TumblingTimeWindows.of(actualSize.toMilliseconds())); + return window(TumblingTimeWindows.of(actualSize)); } else { - return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds())); + return window(TumblingProcessingTimeWindows.of(actualSize)); } } @@ -147,9 +147,9 @@ public WindowedStream timeWindow(AbstractTime size, Abstract AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); if (actualSize instanceof EventTime) { - return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds())); + return window(SlidingTimeWindows.of(actualSize, actualSlide)); } else { - return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds())); + return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide)); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java index a2d95c2d25c5e..6fc79b05df17b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.windowing.assigners; import com.google.common.collect.Lists; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -86,7 +87,7 @@ public String toString() { * @param slide The slide interval of the generated windows. * @return The time policy. */ - public static SlidingProcessingTimeWindows of(long size, long slide) { - return new SlidingProcessingTimeWindows(size, slide); + public static SlidingProcessingTimeWindows of(AbstractTime size, AbstractTime slide) { + return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java index cb5a7a1eed2ff..49bff057fe63d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.api.windowing.assigners; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -75,7 +76,7 @@ public String toString() { * @param slide The slide interval of the generated windows. * @return The time policy. */ - public static SlidingTimeWindows of(long size, long slide) { - return new SlidingTimeWindows(size, slide); + public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) { + return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java index b1ef857c900c9..1f2eebf0d2b88 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.api.windowing.assigners; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -61,7 +62,7 @@ public String toString() { * @param size The size of the generated windows. * @return The time policy. */ - public static TumblingProcessingTimeWindows of(long size) { - return new TumblingProcessingTimeWindows(size); + public static TumblingProcessingTimeWindows of(AbstractTime size) { + return new TumblingProcessingTimeWindows(size.toMilliseconds()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java index d19c97d7a7913..019f45b71895f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.api.windowing.assigners; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -60,8 +61,8 @@ public String toString() { * @param size The size of the generated windows. * @return The time policy. */ - public static TumblingTimeWindows of(long size) { - return new TumblingTimeWindows(size); + public static TumblingTimeWindows of(AbstractTime size) { + return new TumblingTimeWindows(size.toMilliseconds()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java index c38100ce897d3..2965214fea1b7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.windowing.evictors; import com.google.common.annotations.VisibleForTesting; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -54,7 +55,7 @@ public long getWindowSize() { return windowSize; } - public static TimeEvictor of(long windowSize) { - return new TimeEvictor<>(windowSize); + public static TimeEvictor of(AbstractTime windowSize) { + return new TimeEvictor<>(windowSize.toMilliseconds()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java index 1264c2a5ddc5a..3f8fb606dc407 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java @@ -24,6 +24,9 @@ import static com.google.common.base.Preconditions.checkNotNull; +/** + * Base class for {@link Time} implementations. + */ public abstract class AbstractTime { /** The time unit for this policy's time interval */ diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index da198be7f2996..24e8ce332300a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -18,32 +18,33 @@ package org.apache.flink.streaming.api.windowing.triggers; import com.google.common.annotations.VisibleForTesting; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; import org.apache.flink.streaming.api.windowing.windows.Window; public class ContinuousProcessingTimeTrigger implements Trigger { private static final long serialVersionUID = 1L; - private long granularity; + private long interval; private long nextFireTimestamp = 0; - private ContinuousProcessingTimeTrigger(long granularity) { - this.granularity = granularity; + private ContinuousProcessingTimeTrigger(long interval) { + this.interval = interval; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) { long currentTime = System.currentTimeMillis(); if (nextFireTimestamp == 0) { - long start = currentTime - (currentTime % granularity); - nextFireTimestamp = start + granularity; + long start = currentTime - (currentTime % interval); + nextFireTimestamp = start + interval; ctx.registerProcessingTimeTimer(nextFireTimestamp); return TriggerResult.CONTINUE; } if (currentTime > nextFireTimestamp) { - long start = currentTime - (currentTime % granularity); - nextFireTimestamp = start + granularity; + long start = currentTime - (currentTime % interval); + nextFireTimestamp = start + interval; ctx.registerProcessingTimeTimer(nextFireTimestamp); @@ -57,8 +58,8 @@ public TriggerResult onTime(long time, TriggerContext ctx) { // only fire if an element didn't already fire long currentTime = System.currentTimeMillis(); if (currentTime > nextFireTimestamp) { - long start = currentTime - (currentTime % granularity); - nextFireTimestamp = start + granularity; + long start = currentTime - (currentTime % interval); + nextFireTimestamp = start + interval; return TriggerResult.FIRE; } return TriggerResult.CONTINUE; @@ -66,20 +67,20 @@ public TriggerResult onTime(long time, TriggerContext ctx) { @Override public Trigger duplicate() { - return new ContinuousProcessingTimeTrigger<>(granularity); + return new ContinuousProcessingTimeTrigger<>(interval); } @VisibleForTesting - public long getGranularity() { - return granularity; + public long getInterval() { + return interval; } @Override public String toString() { - return "ContinuousProcessingTimeTrigger(" + granularity + ")"; + return "ContinuousProcessingTimeTrigger(" + interval + ")"; } - public static ContinuousProcessingTimeTrigger of(long granularity) { - return new ContinuousProcessingTimeTrigger<>(granularity); + public static ContinuousProcessingTimeTrigger of(AbstractTime interval) { + return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java index 3b6dc6d9644fe..e11cebae9969d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java @@ -18,24 +18,25 @@ package org.apache.flink.streaming.api.windowing.triggers; import com.google.common.annotations.VisibleForTesting; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; import org.apache.flink.streaming.api.windowing.windows.Window; public class ContinuousWatermarkTrigger implements Trigger { private static final long serialVersionUID = 1L; - private long granularity; + private long interval; private boolean first = true; - private ContinuousWatermarkTrigger(long granularity) { - this.granularity = granularity; + private ContinuousWatermarkTrigger(long interval) { + this.interval = interval; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) { if (first) { - long start = timestamp - (timestamp % granularity); - long nextFireTimestamp = start + granularity; + long start = timestamp - (timestamp % interval); + long nextFireTimestamp = start + interval; ctx.registerWatermarkTimer(nextFireTimestamp); first = false; @@ -46,26 +47,26 @@ public TriggerResult onElement(Object element, long timestamp, W window, Trigger @Override public TriggerResult onTime(long time, TriggerContext ctx) { - ctx.registerWatermarkTimer(time + granularity); + ctx.registerWatermarkTimer(time + interval); return TriggerResult.FIRE; } @Override public Trigger duplicate() { - return new ContinuousWatermarkTrigger<>(granularity); + return new ContinuousWatermarkTrigger<>(interval); } @Override public String toString() { - return "ContinuousProcessingTimeTrigger(" + granularity + ")"; + return "ContinuousProcessingTimeTrigger(" + interval + ")"; } @VisibleForTesting - public long getGranularity() { - return granularity; + public long getInterval() { + return interval; } - public static ContinuousWatermarkTrigger of(long granularity) { - return new ContinuousWatermarkTrigger<>(granularity); + public static ContinuousWatermarkTrigger of(AbstractTime interval) { + return new ContinuousWatermarkTrigger<>(interval.toMilliseconds()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index ee8c6d6e1909f..767b40c33c76f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -39,6 +40,8 @@ import org.junit.Ignore; import org.junit.Test; +import java.util.concurrent.TimeUnit; + /** * These tests verify that the api calls on * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate @@ -62,7 +65,7 @@ public void testFastTimeWindows() throws Exception { DummyReducer reducer = new DummyReducer(); DataStream> window1 = source - .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) + .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .reduceWindow(reducer); OneInputTransformation, Tuple2> transform1 = (OneInputTransformation, Tuple2>) window1.getTransformation(); @@ -70,7 +73,7 @@ public void testFastTimeWindows() throws Exception { Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator); DataStream> window2 = source - .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) + .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .apply(new AllWindowFunction, Tuple2, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -98,7 +101,7 @@ public void testNonEvicting() throws Exception { DummyReducer reducer = new DummyReducer(); DataStream> window1 = source - .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) + .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) .reduceWindow(reducer); @@ -111,7 +114,7 @@ public void testNonEvicting() throws Exception { Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); DataStream> window2 = source - .windowAll(TumblingProcessingTimeWindows.of(1000)) + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .apply(new AllWindowFunction, Tuple2, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -144,7 +147,7 @@ public void testEvicting() throws Exception { DummyReducer reducer = new DummyReducer(); DataStream> window1 = source - .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) + .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .reduceWindow(reducer); @@ -158,9 +161,9 @@ public void testEvicting() throws Exception { Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); DataStream> window2 = source - .windowAll(TumblingProcessingTimeWindows.of(1000)) + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .evictor(TimeEvictor.of(100)) + .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) .apply(new AllWindowFunction, Tuple2, TimeWindow>() { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java index 9b0bcc4fd57d3..6cc8931959593 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; @@ -49,6 +50,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @RunWith(Parameterized.class) @@ -69,11 +71,11 @@ public NonKeyedWindowOperatorTest(WindowBufferFactory windowBufferFactory) public void testSlidingEventTimeWindows() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3000; - final int WINDOW_SLIDE = 1000; + final int WINDOW_SIZE = 3; + final int WINDOW_SLIDE = 1; NonKeyedWindowOperator, Tuple2, TimeWindow> operator = new NonKeyedWindowOperator<>( - SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE), + SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), windowBufferFactory, new ReduceAllWindowFunction>(new SumReducer()), WatermarkTrigger.create()); @@ -150,10 +152,10 @@ public void testSlidingEventTimeWindows() throws Exception { public void testTumblingEventTimeWindows() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3000; + final int WINDOW_SIZE = 3; NonKeyedWindowOperator, Tuple2, TimeWindow> operator = new NonKeyedWindowOperator<>( - TumblingTimeWindows.of(WINDOW_SIZE), + TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), windowBufferFactory, new ReduceAllWindowFunction>(new SumReducer()), WatermarkTrigger.create()); @@ -228,13 +230,13 @@ public void testTumblingEventTimeWindows() throws Exception { public void testContinuousWatermarkTrigger() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3000; + final int WINDOW_SIZE = 3; NonKeyedWindowOperator, Tuple2, GlobalWindow> operator = new NonKeyedWindowOperator<>( GlobalWindows.create(), windowBufferFactory, new ReduceAllWindowFunction>(new SumReducer()), - ContinuousWatermarkTrigger.of(WINDOW_SIZE)); + ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); operator.setInputType(TypeInfoParser.>parse("Tuple2"), new ExecutionConfig()); @@ -421,13 +423,4 @@ public int compare(Object o1, Object o2) { } } } - - private static class TupleKeySelector implements KeySelector, String> { - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple2 value) throws Exception { - return value.f0; - } - } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 1bfd1d5878e08..d387df00a666e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; @@ -49,6 +50,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @RunWith(Parameterized.class) @@ -69,11 +71,11 @@ public WindowOperatorTest(WindowBufferFactory windowBufferFactory) { public void testSlidingEventTimeWindows() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3000; - final int WINDOW_SLIDE = 1000; + final int WINDOW_SIZE = 3; + final int WINDOW_SLIDE = 1; WindowOperator, Tuple2, TimeWindow> operator = new WindowOperator<>( - SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE), + SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), new TupleKeySelector(), windowBufferFactory, new ReduceWindowFunction>(new SumReducer()), @@ -157,10 +159,10 @@ public void testSlidingEventTimeWindows() throws Exception { public void testTumblingEventTimeWindows() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3000; + final int WINDOW_SIZE = 3; WindowOperator, Tuple2, TimeWindow> operator = new WindowOperator<>( - TumblingTimeWindows.of(WINDOW_SIZE), + TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), new TupleKeySelector(), windowBufferFactory, new ReduceWindowFunction>(new SumReducer()), @@ -240,14 +242,14 @@ public void testTumblingEventTimeWindows() throws Exception { public void testContinuousWatermarkTrigger() throws Exception { closeCalled.set(0); - final int WINDOW_SIZE = 3000; + final int WINDOW_SIZE = 3; WindowOperator, Tuple2, GlobalWindow> operator = new WindowOperator<>( GlobalWindows.create(), new TupleKeySelector(), windowBufferFactory, new ReduceWindowFunction>(new SumReducer()), - ContinuousWatermarkTrigger.of(WINDOW_SIZE)); + ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); operator.setInputType(TypeInfoParser.>parse("Tuple2"), new ExecutionConfig()); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index a3e60854b37c7..9dc66879d689c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -40,6 +41,8 @@ import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.TimeUnit; + /** * These tests verify that the api calls on * {@link WindowedStream} instantiate @@ -61,7 +64,8 @@ public void testFastTimeWindows() throws Exception { DataStream> window1 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(1000, 100)) + .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) .reduceWindow(reducer); OneInputTransformation, Tuple2> transform1 = (OneInputTransformation, Tuple2>) window1.getTransformation(); @@ -70,7 +74,7 @@ public void testFastTimeWindows() throws Exception { DataStream> window2 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(1000, 100)) + .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -99,7 +103,7 @@ public void testNonEvicting() throws Exception { DataStream> window1 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(1000, 100)) + .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) .reduceWindow(reducer); @@ -113,7 +117,7 @@ public void testNonEvicting() throws Exception { DataStream> window2 = source .keyBy(0) - .window(TumblingProcessingTimeWindows.of(1000)) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @@ -147,7 +151,7 @@ public void testEvicting() throws Exception { DataStream> window1 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(1000, 100)) + .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .reduceWindow(reducer); @@ -162,9 +166,9 @@ public void testEvicting() throws Exception { DataStream> window2 = source .keyBy(0) - .window(TumblingProcessingTimeWindows.of(1000)) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .evictor(TimeEvictor.of(100)) + .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 7dfaeef90f960..6ad7629dcacd2 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -629,11 +629,11 @@ class DataStream[T](javaStream: JavaStream[T]) { actualSize match { case t: EventTime => - val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds) + val assigner = TumblingTimeWindows.of(actualSize) .asInstanceOf[WindowAssigner[T, TimeWindow]] windowAll(assigner) case t: ProcessingTime => - val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds) + val assigner = TumblingProcessingTimeWindows.of(actualSize) .asInstanceOf[WindowAssigner[T, TimeWindow]] windowAll(assigner) case _ => throw new RuntimeException("Invalid time: " + actualSize) @@ -658,13 +658,13 @@ class DataStream[T](javaStream: JavaStream[T]) { actualSize match { case t: EventTime => val assigner = SlidingTimeWindows.of( - actualSize.toMilliseconds, - actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]] + actualSize, + actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]] windowAll(assigner) case t: ProcessingTime => val assigner = SlidingProcessingTimeWindows.of( - actualSize.toMilliseconds, - actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]] + actualSize, + actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]] windowAll(assigner) case _ => throw new RuntimeException("Invalid time: " + actualSize) } diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 232e4bb860681..18b71bed63ce4 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -55,11 +55,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] actualSize match { case t: EventTime => - val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds) + val assigner = TumblingTimeWindows.of(actualSize) .asInstanceOf[WindowAssigner[T, TimeWindow]] window(assigner) case t: ProcessingTime => - val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds) + val assigner = TumblingProcessingTimeWindows.of(actualSize) .asInstanceOf[WindowAssigner[T, TimeWindow]] window(assigner) case _ => throw new RuntimeException("Invalid time: " + actualSize) @@ -85,13 +85,13 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] actualSize match { case t: EventTime => val assigner = SlidingTimeWindows.of( - actualSize.toMilliseconds, - actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]] + actualSize, + actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]] window(assigner) case t: ProcessingTime => val assigner = SlidingProcessingTimeWindows.of( - actualSize.toMilliseconds, - actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]] + actualSize, + actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]] window(assigner) case _ => throw new RuntimeException("Invalid time: " + actualSize) } diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index 35c7fcc12139b..247256f019028 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -19,13 +19,15 @@ package org.apache.flink.streaming.api.scala +import java.util.concurrent.TimeUnit + import org.apache.flink.api.common.functions.RichReduceFunction -import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} -import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, WatermarkTrigger, CountTrigger} +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger} import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer} import org.apache.flink.streaming.runtime.operators.windowing._ @@ -53,7 +55,9 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val reducer = new DummyReducer val window1 = source - .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) + .windowAll(SlidingProcessingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) .reduceWindow(reducer) val transform1 = window1.getJavaStream.getTransformation @@ -65,10 +69,11 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(1000, 100)) - .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + .windowAll(SlidingProcessingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) + .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( - tuple: Tuple, window: TimeWindow, values: java.lang.Iterable[(String, Int)], out: Collector[(String, Int)]) { } @@ -91,7 +96,9 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val reducer = new DummyReducer val window1 = source - .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) + .windowAll(SlidingProcessingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) .reduceWindow(reducer) @@ -109,7 +116,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source - .windowAll(TumblingProcessingTimeWindows.of(1000)) + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( @@ -139,8 +146,10 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val reducer = new DummyReducer val window1 = source - .windowAll(SlidingProcessingTimeWindows.of(1000, 100)) - .evictor(TimeEvictor.of(1000)) + .windowAll(SlidingProcessingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) .reduceWindow(reducer) val transform1 = window1.getJavaStream.getTransformation @@ -157,7 +166,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source - .windowAll(TumblingProcessingTimeWindows.of(1000)) + .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 49d0a1a5e8f92..f1b05c6267718 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -18,12 +18,14 @@ package org.apache.flink.streaming.api.scala +import java.util.concurrent.TimeUnit import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.functions.windowing.WindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} +import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger} import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer} @@ -50,7 +52,9 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val window1 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(1000, 100)) + .window(SlidingProcessingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) .reduceWindow(reducer) val transform1 = window1.getJavaStream.getTransformation @@ -62,7 +66,9 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(1000, 100)) + .window(SlidingProcessingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( key: Tuple, @@ -89,7 +95,9 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val window1 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(1000, 100)) + .window(SlidingProcessingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) .reduceWindow(reducer) @@ -108,7 +116,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .keyBy(0) - .window(TumblingProcessingTimeWindows.of(1000)) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( @@ -140,8 +148,10 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val window1 = source .keyBy(0) - .window(SlidingProcessingTimeWindows.of(1000, 100)) - .evictor(TimeEvictor.of(1000)) + .window(SlidingProcessingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), + Time.of(100, TimeUnit.MILLISECONDS))) + .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) .reduceWindow(reducer) val transform1 = window1.getJavaStream.getTransformation @@ -159,7 +169,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .keyBy(0) - .window(TumblingProcessingTimeWindows.of(1000)) + .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {