From d31cc8125b30212b6ac21996a48d703eb11354e9 Mon Sep 17 00:00:00 2001 From: renkai Date: Thu, 11 Aug 2016 18:48:50 +0800 Subject: [PATCH 1/4] [FLINK-4282]Add Offset Parameter to WindowAssigners --- .../assigners/SlidingEventTimeWindows.java | 34 +++- .../SlidingProcessingTimeWindows.java | 34 +++- .../assigners/SlidingTimeWindows.java | 2 +- .../assigners/TumblingEventTimeWindows.java | 35 +++- .../TumblingProcessingTimeWindows.java | 34 +++- .../assigners/TumblingTimeWindows.java | 2 +- .../api/windowing/windows/TimeWindow.java | 24 +++ .../windowing/WindowOperatorTest.java | 54 ++++-- .../windowing/WindowingTestHarnessTest.java | 174 ++++++++++++++++++ 9 files changed, 365 insertions(+), 28 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java index 8fd0d2532414d..5394686a8a8ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java @@ -52,16 +52,19 @@ public class SlidingEventTimeWindows extends WindowAssigner private final long slide; - protected SlidingEventTimeWindows(long size, long slide) { + private final long offset; + + protected SlidingEventTimeWindows(long size, long slide,long offset) { this.size = size; this.slide = slide; + this.offset = offset; } @Override public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List windows = new ArrayList<>((int) (size / slide)); - long lastStart = timestamp - timestamp % slide; + long lastStart = TimeWindow.getWindowStartWithOffset(timestamp,offset,slide); for (long start = lastStart; start > timestamp - size; start -= slide) { @@ -102,7 +105,32 @@ public String toString() { * @return The time policy. */ public static SlidingEventTimeWindows of(Time size, Time slide) { - return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds()); + return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),0); + } + + /** + * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + *

+ * For example, if you want window a stream by hour,but window begins at the 15th minutes + * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get + * time windows start at 0:15:00,1:15:00,2:15:00,etc. + *

+ * + *

+ * Rather than that,if you are living in somewhere which is not using UTC±00:00 time, + * such as China which is using UTC+08:00,and you want a time window with size of one day, + * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. + * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. + *

+ * @param size The size of the generated windows. + * @param slide The slide interval of the generated windows. + * @param offset The offset which window start would be shifted by. + * @return The time policy. + */ + public static SlidingEventTimeWindows of(Time size, Time slide,Time offset) { + return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), + offset.toMilliseconds() % slide.toMilliseconds()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java index 6a0364007e3bd..ec47bf4cf8c1c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java @@ -47,18 +47,21 @@ public class SlidingProcessingTimeWindows extends WindowAssigner assignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List windows = new ArrayList<>((int) (size / slide)); - long lastStart = timestamp - timestamp % slide; + long lastStart = TimeWindow.getWindowStartWithOffset(timestamp,offset,slide); for (long start = lastStart; start > timestamp - size; start -= slide) { @@ -94,7 +97,32 @@ public String toString() { * @return The time policy. */ public static SlidingProcessingTimeWindows of(Time size, Time slide) { - return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds()); + return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),0); + } + + /** + * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + *

+ * For example, if you want window a stream by hour,but window begins at the 15th minutes + * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get + * time windows start at 0:15:00,1:15:00,2:15:00,etc. + *

+ * + *

+ * Rather than that,if you are living in somewhere which is not using UTC±00:00 time, + * such as China which is using UTC+08:00,and you want a time window with size of one day, + * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. + * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. + *

+ * @param size The size of the generated windows. + * @param slide The slide interval of the generated windows. + * @param offset The offset which window start would be shifted by. + * @return The time policy. + */ + public static SlidingProcessingTimeWindows of(Time size, Time slide,Time offset) { + return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), + offset.toMilliseconds() % slide.toMilliseconds()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java index 581bbe195dd42..1bf2654d31965 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java @@ -33,7 +33,7 @@ public class SlidingTimeWindows extends SlidingEventTimeWindows { private static final long serialVersionUID = 1L; private SlidingTimeWindows(long size, long slide) { - super(size, slide); + super(size, slide,0); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java index 44464f04816ab..1fd122617caff 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java @@ -47,17 +47,19 @@ public class TumblingEventTimeWindows extends WindowAssigner { private static final long serialVersionUID = 1L; - private long size; + private final long size; - protected TumblingEventTimeWindows(long size) { + private final long offset; + + protected TumblingEventTimeWindows(long size,long offset){ this.size = size; + this.offset = offset; } - @Override public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { // Long.MIN_VALUE is currently assigned when no timestamp is present - long start = timestamp - (timestamp % size); + long start = TimeWindow.getWindowStartWithOffset(timestamp,offset,size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + @@ -88,7 +90,30 @@ public String toString() { * @return The time policy. */ public static TumblingEventTimeWindows of(Time size) { - return new TumblingEventTimeWindows(size.toMilliseconds()); + return new TumblingEventTimeWindows(size.toMilliseconds(),0); + } + + /** + * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + *

+ * For example, if you want window a stream by hour,but window begins at the 15th minutes + * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get + * time windows start at 0:15:00,1:15:00,2:15:00,etc. + *

+ * + *

+ * Rather than that,if you are living in somewhere which is not using UTC±00:00 time, + * such as China which is using UTC+08:00,and you want a time window with size of one day, + * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. + * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. + *

+ * @param size The size of the generated windows. + * @param offset The offset which window start would be shifted by. + * @return The time policy. + */ + public static TumblingEventTimeWindows of(Time size, Time offset) { + return new TumblingEventTimeWindows(size.toMilliseconds(),offset.toMilliseconds() % size.toMilliseconds()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java index ce361441a1203..58e40ce6b8d5b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java @@ -44,16 +44,20 @@ public class TumblingProcessingTimeWindows extends WindowAssigner { private static final long serialVersionUID = 1L; - private long size; + private final long size; - private TumblingProcessingTimeWindows(long size) { + private final long offset; + + + private TumblingProcessingTimeWindows(long size,long offset) { this.size = size; + this.offset = offset; } @Override public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { final long now = context.getCurrentProcessingTime(); - long start = now - (now % size); + long start = TimeWindow.getWindowStartWithOffset(now,offset,size); return Collections.singletonList(new TimeWindow(start, start + size)); } @@ -79,9 +83,31 @@ public String toString() { * @return The time policy. */ public static TumblingProcessingTimeWindows of(Time size) { - return new TumblingProcessingTimeWindows(size.toMilliseconds()); + return new TumblingProcessingTimeWindows(size.toMilliseconds(),0); } + /** + * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + *

+ * For example, if you want window a stream by hour,but window begins at the 15th minutes + * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get + * time windows start at 0:15:00,1:15:00,2:15:00,etc. + *

+ * + *

+ * Rather than that,if you are living in somewhere which is not using UTC±00:00 time, + * such as China which is using UTC+08:00,and you want a time window with size of one day, + * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. + * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. + *

+ * @param size The size of the generated windows. + * @param offset The offset which window start would be shifted by. + * @return The time policy. + */ + public static TumblingProcessingTimeWindows of(Time size,Time offset) { + return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds()); + } @Override public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java index 156b1e96b0023..a27efcd4dd690 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java @@ -33,7 +33,7 @@ public class TumblingTimeWindows extends TumblingEventTimeWindows { private static final long serialVersionUID = 1L; private TumblingTimeWindows(long size) { - super(size); + super(size,0); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java index 5dfd60b492b60..8f9494e3ac4ab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java @@ -227,4 +227,28 @@ public int compare(TimeWindow o1, TimeWindow o2) { } } } + + /** + * Method to get the window start for a timestamp. + * @param timestamp epoch millisecond to get the window start. + * @param offset The offset which window start would be shifted by. + * @param windowSize The size of the generated windows. + * @return window start + */ + public static long getWindowStartWithOffset(long timestamp,long offset,long windowSize){ + if(Math.abs(offset) >= windowSize) { + throw new RuntimeException("Offset for TimeWindow should not be larger than or equal to windowSize" + + String.format(" offset: %s window size: %s ",offset,windowSize)); + } + long start = (timestamp - (timestamp %windowSize))+offset; + if(offset!=0){ + if(start + windowSize <= timestamp) { + start += windowSize; + } + if(start>timestamp) { + start -= windowSize; + } + } + return start; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 62266c4327425..dfb37fc364dad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -553,7 +553,7 @@ public void testSessionWindowsWithCountTrigger() throws Exception { new OneInputStreamOperatorTestHarness<>(operator); testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.open(); @@ -660,7 +660,7 @@ public void testPointSessions() throws Exception { new OneInputStreamOperatorTestHarness<>(operator); testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.open(); @@ -722,7 +722,7 @@ public void testContinuousWatermarkTrigger() throws Exception { new OneInputStreamOperatorTestHarness<>(operator); testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.open(); @@ -815,7 +815,7 @@ public void testCountTrigger() throws Exception { new OneInputStreamOperatorTestHarness<>(operator); testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.open(); @@ -1167,7 +1167,7 @@ public void testLateness() throws Exception { operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 500)); @@ -1362,7 +1362,7 @@ public void testDropDueToLatenessSliding() throws Exception { operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -1441,7 +1441,7 @@ public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exce testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -1535,7 +1535,7 @@ public void testDropDueToLatenessSessionZeroLateness() throws Exception { testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -1625,7 +1625,7 @@ public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exce testHarness.open(); ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); - + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); testHarness.processWatermark(new Watermark(1999)); @@ -1711,7 +1711,7 @@ public void testDropDueToLatenessSessionWithLateness() throws Exception { testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -1808,7 +1808,7 @@ public void testDropDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -2550,4 +2550,36 @@ public String toString() { return "EventTimeTrigger()"; } } + + @Test + public void testGetWindowStartWithOffset() { + //[0,7),[7,14),[14,21)... + long offset = 0; + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),0); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6,offset,7),0); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),7); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8,offset,7),7); + + //[-4,3),[3,10),[10,17)... + offset = 3; + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-4); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2,offset,7),-4); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),3); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9,offset,7),3); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10,offset,7),10); + + //[-2,5),[5,12),[12,19)... + offset = -2; + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2,offset,7),-2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),-2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4,offset,7),-2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),5); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12,offset,7),12); + + // for GMT+8:00 + offset = - TimeUnit.HOURS.toMillis(8); + long size = TimeUnit.DAYS.toMillis(1); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450l,offset,size),1470844800000l); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java index 7242e1ca30663..b9ea09f2075c8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java @@ -24,6 +24,8 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; @@ -83,6 +85,84 @@ public void testEventTimeTumblingWindows() throws Exception { testHarness.close(); } + @Test + public void testEventTimeTumblingWindowsWithOffset() throws Exception { + final int WINDOW_SIZE = 2000; + final int OFFSET = 100; + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET)); + + WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( + new ExecutionConfig(), + windowAssigner, + BasicTypeInfo.STRING_TYPE_INFO, + inputType, + new TupleKeySelector(), + EventTimeTrigger.create(), + 0); + + // normal element + testHarness.processElement(new Tuple2<>("key2", 1), 1000); + testHarness.processWatermark(1985); + + testHarness.addExpectedWatermark(1985); + + // this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark + testHarness.processElement(new Tuple2<>("key2", 2), 1980); + + // dropped as late + testHarness.processElement(new Tuple2<>("key2", 3), 1998); + + testHarness.processElement(new Tuple2<>("key2", 4), 2001); + testHarness.processWatermark(2999); + + testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2", 2), 1999 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2", 3), 1999 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2", 4), 1999 + OFFSET); + + testHarness.addExpectedWatermark(2999); + + testHarness.processWatermark(3999); + testHarness.addExpectedWatermark(3999); + + testHarness.compareActualToExpectedOutput("Output is not correct"); + + testHarness.close(); + } + + @Test + public void testEventTimeSlidingWindowsWithOffset() throws Exception { + final int WINDOW_SIZE = 2000; + final int SLIDE = 500; + final int OFFSET = 10; + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + SlidingEventTimeWindows windowAssigner = SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET)); + + WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( + new ExecutionConfig(), + windowAssigner, + BasicTypeInfo.STRING_TYPE_INFO, + inputType, + new TupleKeySelector(), + EventTimeTrigger.create(), + 0); + + testHarness.processElement(new Tuple2<>("key2", 1), 333); + testHarness.processWatermark(6666); + + testHarness.addExpectedElement(new Tuple2<>("key2",1),499 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2",1),999 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2",1),1499 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2",1),1999 + OFFSET); + testHarness.addExpectedWatermark(6666); + testHarness.compareActualToExpectedOutput("Output is not correct"); + + testHarness.close(); + } + @Test public void testProcessingTimeTumblingWindows() throws Exception { final int WINDOW_SIZE = 3000; @@ -130,6 +210,100 @@ public void testProcessingTimeTumblingWindows() throws Exception { testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999); testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999); + testHarness.compareActualToExpectedOutput("Output was not correct."); + testHarness.close(); + } + + @Test + public void testProcessingTimeTumblingWindowsWithOffset() throws Exception { + final int WINDOW_SIZE = 3000; + final int OFFSET = 1000; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE), + Time.milliseconds(OFFSET)); + + WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( + new ExecutionConfig(), + windowAssigner, + BasicTypeInfo.STRING_TYPE_INFO, + inputType, + new TupleKeySelector(), + ProcessingTimeTrigger.create(), + 0); + + testHarness.setProcessingTime(3); + + // timestamp is ignored in processing time + testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE); + testHarness.processElement(new Tuple2<>("key2", 1), 7000); + testHarness.processElement(new Tuple2<>("key2", 1), 7000); + + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + + testHarness.setProcessingTime(5000); + + testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); + testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); + testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999); + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999); + + testHarness.compareActualToExpectedOutput("Output was not correct."); + + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + + testHarness.setProcessingTime(7000); + + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); + + testHarness.compareActualToExpectedOutput("Output was not correct."); + + testHarness.close(); + } + + @Test + public void testProcessingTimeSlidingWindowsWithOffset() throws Exception { + final int WINDOW_SIZE = 3000; + final int SLIDING = 1000; + final int OFFSET = 10; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + SlidingProcessingTimeWindows windowAssigner = SlidingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE), + Time.milliseconds(SLIDING),Time.milliseconds(OFFSET)); + + WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( + new ExecutionConfig(), + windowAssigner, + BasicTypeInfo.STRING_TYPE_INFO, + inputType, + new TupleKeySelector(), + ProcessingTimeTrigger.create(), + 0); + + testHarness.setProcessingTime(3); + + // timestamp is ignored in processing time + testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE); + + testHarness.setProcessingTime(1111); + + testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET - 1); + testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 999); + + testHarness.processElement(new Tuple2<>("key2", 2),Long.MIN_VALUE); + testHarness.setProcessingTime(2222); + + testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 1999); + testHarness.addExpectedElement(new Tuple2<>("key2", 2), OFFSET + 1999); + testHarness.compareActualToExpectedOutput("Output was not correct."); testHarness.close(); From a0f2e593c53cac8b44599c8c16a727d7ff0fcbf1 Mon Sep 17 00:00:00 2001 From: renkai Date: Sat, 13 Aug 2016 09:24:59 +0800 Subject: [PATCH 2/4] [FLINK-4282]Add Offset Parameter to WindowAssigners Simplify the method getWindowStartWithOffset --- .../api/windowing/windows/TimeWindow.java | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java index 8f9494e3ac4ab..a6ac3b1a5d584 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java @@ -235,20 +235,7 @@ public int compare(TimeWindow o1, TimeWindow o2) { * @param windowSize The size of the generated windows. * @return window start */ - public static long getWindowStartWithOffset(long timestamp,long offset,long windowSize){ - if(Math.abs(offset) >= windowSize) { - throw new RuntimeException("Offset for TimeWindow should not be larger than or equal to windowSize" + - String.format(" offset: %s window size: %s ",offset,windowSize)); - } - long start = (timestamp - (timestamp %windowSize))+offset; - if(offset!=0){ - if(start + windowSize <= timestamp) { - start += windowSize; - } - if(start>timestamp) { - start -= windowSize; - } - } - return start; + public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { + return timestamp - ((timestamp - offset) % windowSize + windowSize) % windowSize; } } From d9cdf10ce5e62de7de2bf74376c45737ba223747 Mon Sep 17 00:00:00 2001 From: renkai Date: Sun, 14 Aug 2016 08:18:07 +0800 Subject: [PATCH 3/4] [FLINK-4282]Add Offset Parameter to WindowAssigners Simplify the method getWindowStartWithOffset,mod only once. --- .../flink/streaming/api/windowing/windows/TimeWindow.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java index a6ac3b1a5d584..797d3ff9e7165 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java @@ -236,6 +236,6 @@ public int compare(TimeWindow o1, TimeWindow o2) { * @return window start */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { - return timestamp - ((timestamp - offset) % windowSize + windowSize) % windowSize; + return timestamp - (timestamp - offset + windowSize) % windowSize; } } From 6702553f8c9fbb8e5a392f62a9783ed39c630aa6 Mon Sep 17 00:00:00 2001 From: renkai Date: Fri, 19 Aug 2016 00:57:29 +0800 Subject: [PATCH 4/4] [FLINK-4282]Add Offset Parameter to WindowAssigners Reformat code, move tests to suitable place and revert unrelated code change --- .../assigners/SlidingEventTimeWindows.java | 8 +- .../SlidingProcessingTimeWindows.java | 8 +- .../assigners/SlidingTimeWindows.java | 2 +- .../assigners/TumblingEventTimeWindows.java | 8 +- .../TumblingProcessingTimeWindows.java | 6 +- .../assigners/TumblingTimeWindows.java | 2 +- .../operators/windowing/TimeWindowTest.java | 59 +++++ .../windowing/WindowOperatorTest.java | 222 ++++++++++++++---- .../windowing/WindowingTestHarnessTest.java | 174 -------------- 9 files changed, 258 insertions(+), 231 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java index 5394686a8a8ed..16171a064cf79 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java @@ -54,7 +54,7 @@ public class SlidingEventTimeWindows extends WindowAssigner private final long offset; - protected SlidingEventTimeWindows(long size, long slide,long offset) { + protected SlidingEventTimeWindows(long size, long slide, long offset) { this.size = size; this.slide = slide; this.offset = offset; @@ -64,7 +64,7 @@ protected SlidingEventTimeWindows(long size, long slide,long offset) { public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List windows = new ArrayList<>((int) (size / slide)); - long lastStart = TimeWindow.getWindowStartWithOffset(timestamp,offset,slide); + long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { @@ -105,7 +105,7 @@ public String toString() { * @return The time policy. */ public static SlidingEventTimeWindows of(Time size, Time slide) { - return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),0); + return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); } /** @@ -128,7 +128,7 @@ public static SlidingEventTimeWindows of(Time size, Time slide) { * @param offset The offset which window start would be shifted by. * @return The time policy. */ - public static SlidingEventTimeWindows of(Time size, Time slide,Time offset) { + public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) { return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds() % slide.toMilliseconds()); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java index ec47bf4cf8c1c..e03467f661e32 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java @@ -51,7 +51,7 @@ public class SlidingProcessingTimeWindows extends WindowAssigner assignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List windows = new ArrayList<>((int) (size / slide)); - long lastStart = TimeWindow.getWindowStartWithOffset(timestamp,offset,slide); + long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { @@ -97,7 +97,7 @@ public String toString() { * @return The time policy. */ public static SlidingProcessingTimeWindows of(Time size, Time slide) { - return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),0); + return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); } /** @@ -120,7 +120,7 @@ public static SlidingProcessingTimeWindows of(Time size, Time slide) { * @param offset The offset which window start would be shifted by. * @return The time policy. */ - public static SlidingProcessingTimeWindows of(Time size, Time slide,Time offset) { + public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) { return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds() % slide.toMilliseconds()); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java index 1bf2654d31965..41a5d534c26b9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java @@ -33,7 +33,7 @@ public class SlidingTimeWindows extends SlidingEventTimeWindows { private static final long serialVersionUID = 1L; private SlidingTimeWindows(long size, long slide) { - super(size, slide,0); + super(size, slide, 0); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java index 1fd122617caff..b7fa34391b94b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java @@ -51,7 +51,7 @@ public class TumblingEventTimeWindows extends WindowAssigner private final long offset; - protected TumblingEventTimeWindows(long size,long offset){ + protected TumblingEventTimeWindows(long size, long offset){ this.size = size; this.offset = offset; } @@ -59,7 +59,7 @@ protected TumblingEventTimeWindows(long size,long offset){ public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { // Long.MIN_VALUE is currently assigned when no timestamp is present - long start = TimeWindow.getWindowStartWithOffset(timestamp,offset,size); + long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + @@ -90,7 +90,7 @@ public String toString() { * @return The time policy. */ public static TumblingEventTimeWindows of(Time size) { - return new TumblingEventTimeWindows(size.toMilliseconds(),0); + return new TumblingEventTimeWindows(size.toMilliseconds(), 0); } /** @@ -113,7 +113,7 @@ public static TumblingEventTimeWindows of(Time size) { * @return The time policy. */ public static TumblingEventTimeWindows of(Time size, Time offset) { - return new TumblingEventTimeWindows(size.toMilliseconds(),offset.toMilliseconds() % size.toMilliseconds()); + return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java index 58e40ce6b8d5b..f1e9e111bfff9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java @@ -57,7 +57,7 @@ private TumblingProcessingTimeWindows(long size,long offset) { @Override public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { final long now = context.getCurrentProcessingTime(); - long start = TimeWindow.getWindowStartWithOffset(now,offset,size); + long start = TimeWindow.getWindowStartWithOffset(now, offset, size); return Collections.singletonList(new TimeWindow(start, start + size)); } @@ -83,7 +83,7 @@ public String toString() { * @return The time policy. */ public static TumblingProcessingTimeWindows of(Time size) { - return new TumblingProcessingTimeWindows(size.toMilliseconds(),0); + return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0); } /** @@ -105,7 +105,7 @@ public static TumblingProcessingTimeWindows of(Time size) { * @param offset The offset which window start would be shifted by. * @return The time policy. */ - public static TumblingProcessingTimeWindows of(Time size,Time offset) { + public static TumblingProcessingTimeWindows of(Time size, Time offset) { return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java index a27efcd4dd690..589bce33ad2ff 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java @@ -33,7 +33,7 @@ public class TumblingTimeWindows extends TumblingEventTimeWindows { private static final long serialVersionUID = 1L; private TumblingTimeWindows(long size) { - super(size,0); + super(size, 0); } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java new file mode 100644 index 0000000000000..9633671376b04 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +public class TimeWindowTest { + @Test + public void testGetWindowStartWithOffset() { + //[0,7),[7,14),[14,21)... + long offset = 0; + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),0); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6,offset,7),0); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),7); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8,offset,7),7); + + //[-4,3),[3,10),[10,17)... + offset = 3; + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-4); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2,offset,7),-4); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),3); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9,offset,7),3); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10,offset,7),10); + + //[-2,5),[5,12),[12,19)... + offset = -2; + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2,offset,7),-2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),-2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4,offset,7),-2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),5); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12,offset,7),12); + + // for GMT+8:00 + offset = - TimeUnit.HOURS.toMillis(8); + long size = TimeUnit.DAYS.toMillis(1); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450l,offset,size),1470844800000l); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index dfb37fc364dad..90e45860f8421 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -66,6 +66,7 @@ import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.WindowingTestHarness; import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; @@ -553,7 +554,7 @@ public void testSessionWindowsWithCountTrigger() throws Exception { new OneInputStreamOperatorTestHarness<>(operator); testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.open(); @@ -660,7 +661,7 @@ public void testPointSessions() throws Exception { new OneInputStreamOperatorTestHarness<>(operator); testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.open(); @@ -722,7 +723,7 @@ public void testContinuousWatermarkTrigger() throws Exception { new OneInputStreamOperatorTestHarness<>(operator); testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.open(); @@ -815,7 +816,7 @@ public void testCountTrigger() throws Exception { new OneInputStreamOperatorTestHarness<>(operator); testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.open(); @@ -1167,7 +1168,7 @@ public void testLateness() throws Exception { operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 500)); @@ -1362,7 +1363,7 @@ public void testDropDueToLatenessSliding() throws Exception { operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -1441,7 +1442,7 @@ public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exce testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -1535,7 +1536,7 @@ public void testDropDueToLatenessSessionZeroLateness() throws Exception { testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -1625,7 +1626,7 @@ public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exce testHarness.open(); ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); - + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); testHarness.processWatermark(new Watermark(1999)); @@ -1711,7 +1712,7 @@ public void testDropDueToLatenessSessionWithLateness() throws Exception { testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -1808,7 +1809,7 @@ public void testDropDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); - + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); @@ -2552,34 +2553,175 @@ public String toString() { } @Test - public void testGetWindowStartWithOffset() { - //[0,7),[7,14),[14,21)... - long offset = 0; - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),0); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6,offset,7),0); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),7); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8,offset,7),7); - - //[-4,3),[3,10),[10,17)... - offset = 3; - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-4); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2,offset,7),-4); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),3); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9,offset,7),3); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10,offset,7),10); - - //[-2,5),[5,12),[12,19)... - offset = -2; - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-2); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2,offset,7),-2); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),-2); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4,offset,7),-2); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),5); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12,offset,7),12); - - // for GMT+8:00 - offset = - TimeUnit.HOURS.toMillis(8); - long size = TimeUnit.DAYS.toMillis(1); - Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450l,offset,size),1470844800000l); + public void testEventTimeTumblingWindowsWithOffset() throws Exception { + final int WINDOW_SIZE = 2000; + final int OFFSET = 100; + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET)); + + WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( + new ExecutionConfig(), + windowAssigner, + BasicTypeInfo.STRING_TYPE_INFO, + inputType, + new TupleKeySelector(), + EventTimeTrigger.create(), + 0); + + // normal element + testHarness.processElement(new Tuple2<>("key2", 1), 1000); + testHarness.processWatermark(1985); + + testHarness.addExpectedWatermark(1985); + + // this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark + testHarness.processElement(new Tuple2<>("key2", 2), 1980); + + // dropped as late + testHarness.processElement(new Tuple2<>("key2", 3), 1998); + + testHarness.processElement(new Tuple2<>("key2", 4), 2001); + testHarness.processWatermark(2999); + + testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2", 2), 1999 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2", 3), 1999 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2", 4), 1999 + OFFSET); + + testHarness.addExpectedWatermark(2999); + + testHarness.processWatermark(3999); + testHarness.addExpectedWatermark(3999); + + testHarness.compareActualToExpectedOutput("Output is not correct"); + + testHarness.close(); + } + + @Test + public void testEventTimeSlidingWindowsWithOffset() throws Exception { + final int WINDOW_SIZE = 2000; + final int SLIDE = 500; + final int OFFSET = 10; + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + SlidingEventTimeWindows windowAssigner = SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET)); + + WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( + new ExecutionConfig(), + windowAssigner, + BasicTypeInfo.STRING_TYPE_INFO, + inputType, + new TupleKeySelector(), + EventTimeTrigger.create(), + 0); + + testHarness.processElement(new Tuple2<>("key2", 1), 333); + testHarness.processWatermark(6666); + + testHarness.addExpectedElement(new Tuple2<>("key2",1),499 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2",1),999 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2",1),1499 + OFFSET); + testHarness.addExpectedElement(new Tuple2<>("key2",1),1999 + OFFSET); + testHarness.addExpectedWatermark(6666); + testHarness.compareActualToExpectedOutput("Output is not correct"); + + testHarness.close(); + } + + @Test + public void testProcessingTimeTumblingWindowsWithOffset() throws Exception { + final int WINDOW_SIZE = 3000; + final int OFFSET = 1000; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE), + Time.milliseconds(OFFSET)); + + WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( + new ExecutionConfig(), + windowAssigner, + BasicTypeInfo.STRING_TYPE_INFO, + inputType, + new TupleKeySelector(), + ProcessingTimeTrigger.create(), + 0); + + testHarness.setProcessingTime(3); + + // timestamp is ignored in processing time + testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE); + testHarness.processElement(new Tuple2<>("key2", 1), 7000); + testHarness.processElement(new Tuple2<>("key2", 1), 7000); + + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + + testHarness.setProcessingTime(5000); + + testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); + testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); + testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999); + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999); + + testHarness.compareActualToExpectedOutput("Output was not correct."); + + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + testHarness.processElement(new Tuple2<>("key1", 1), 7000); + + testHarness.setProcessingTime(7000); + + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); + testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); + + testHarness.compareActualToExpectedOutput("Output was not correct."); + + testHarness.close(); + } + + @Test + public void testProcessingTimeSlidingWindowsWithOffset() throws Exception { + final int WINDOW_SIZE = 3000; + final int SLIDING = 1000; + final int OFFSET = 10; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + SlidingProcessingTimeWindows windowAssigner = SlidingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE), + Time.milliseconds(SLIDING),Time.milliseconds(OFFSET)); + + WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( + new ExecutionConfig(), + windowAssigner, + BasicTypeInfo.STRING_TYPE_INFO, + inputType, + new TupleKeySelector(), + ProcessingTimeTrigger.create(), + 0); + + testHarness.setProcessingTime(3); + + // timestamp is ignored in processing time + testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE); + + testHarness.setProcessingTime(1111); + + testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET - 1); + testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 999); + + testHarness.processElement(new Tuple2<>("key2", 2),Long.MIN_VALUE); + testHarness.setProcessingTime(2222); + + testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 1999); + testHarness.addExpectedElement(new Tuple2<>("key2", 2), OFFSET + 1999); + + testHarness.compareActualToExpectedOutput("Output was not correct."); + + testHarness.close(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java index b9ea09f2075c8..7242e1ca30663 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java @@ -24,8 +24,6 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; @@ -85,84 +83,6 @@ public void testEventTimeTumblingWindows() throws Exception { testHarness.close(); } - @Test - public void testEventTimeTumblingWindowsWithOffset() throws Exception { - final int WINDOW_SIZE = 2000; - final int OFFSET = 100; - TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); - - TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET)); - - WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - EventTimeTrigger.create(), - 0); - - // normal element - testHarness.processElement(new Tuple2<>("key2", 1), 1000); - testHarness.processWatermark(1985); - - testHarness.addExpectedWatermark(1985); - - // this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark - testHarness.processElement(new Tuple2<>("key2", 2), 1980); - - // dropped as late - testHarness.processElement(new Tuple2<>("key2", 3), 1998); - - testHarness.processElement(new Tuple2<>("key2", 4), 2001); - testHarness.processWatermark(2999); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2", 2), 1999 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2", 3), 1999 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2", 4), 1999 + OFFSET); - - testHarness.addExpectedWatermark(2999); - - testHarness.processWatermark(3999); - testHarness.addExpectedWatermark(3999); - - testHarness.compareActualToExpectedOutput("Output is not correct"); - - testHarness.close(); - } - - @Test - public void testEventTimeSlidingWindowsWithOffset() throws Exception { - final int WINDOW_SIZE = 2000; - final int SLIDE = 500; - final int OFFSET = 10; - TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); - - SlidingEventTimeWindows windowAssigner = SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET)); - - WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - EventTimeTrigger.create(), - 0); - - testHarness.processElement(new Tuple2<>("key2", 1), 333); - testHarness.processWatermark(6666); - - testHarness.addExpectedElement(new Tuple2<>("key2",1),499 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2",1),999 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2",1),1499 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2",1),1999 + OFFSET); - testHarness.addExpectedWatermark(6666); - testHarness.compareActualToExpectedOutput("Output is not correct"); - - testHarness.close(); - } - @Test public void testProcessingTimeTumblingWindows() throws Exception { final int WINDOW_SIZE = 3000; @@ -210,100 +130,6 @@ public void testProcessingTimeTumblingWindows() throws Exception { testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999); testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999); - testHarness.compareActualToExpectedOutput("Output was not correct."); - testHarness.close(); - } - - @Test - public void testProcessingTimeTumblingWindowsWithOffset() throws Exception { - final int WINDOW_SIZE = 3000; - final int OFFSET = 1000; - - TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); - - TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE), - Time.milliseconds(OFFSET)); - - WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - ProcessingTimeTrigger.create(), - 0); - - testHarness.setProcessingTime(3); - - // timestamp is ignored in processing time - testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE); - testHarness.processElement(new Tuple2<>("key2", 1), 7000); - testHarness.processElement(new Tuple2<>("key2", 1), 7000); - - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - - testHarness.setProcessingTime(5000); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - - testHarness.setProcessingTime(7000); - - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.close(); - } - - @Test - public void testProcessingTimeSlidingWindowsWithOffset() throws Exception { - final int WINDOW_SIZE = 3000; - final int SLIDING = 1000; - final int OFFSET = 10; - - TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); - - SlidingProcessingTimeWindows windowAssigner = SlidingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE), - Time.milliseconds(SLIDING),Time.milliseconds(OFFSET)); - - WindowingTestHarness, TimeWindow> testHarness = new WindowingTestHarness<>( - new ExecutionConfig(), - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - ProcessingTimeTrigger.create(), - 0); - - testHarness.setProcessingTime(3); - - // timestamp is ignored in processing time - testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE); - - testHarness.setProcessingTime(1111); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET - 1); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 999); - - testHarness.processElement(new Tuple2<>("key2", 2),Long.MIN_VALUE); - testHarness.setProcessingTime(2222); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 1999); - testHarness.addExpectedElement(new Tuple2<>("key2", 2), OFFSET + 1999); - testHarness.compareActualToExpectedOutput("Output was not correct."); testHarness.close();