From 5d461d8217c1ee305f59da240ccb0fccebe2420d Mon Sep 17 00:00:00 2001 From: kl0u Date: Tue, 20 Dec 2016 13:51:45 +0100 Subject: [PATCH 1/2] [FLINK-5296] Expose the old AlignedWindowOperators through special assigners The user can use the deprecated AccumulatingProcessingTimeWindowOperator and AggregatingProcessingTimeWindowOperator by using the TumblingAlignedProcessingTimeWindows and the SlidingAlignedProcessingTimeWindows introduced by this commit. These operators are neither backwards compatibility nor rescalable. --- .../api/datastream/WindowedStream.java | 106 ++++++++++++++++++ .../assigners/BaseAlignedWindowAssigner.java | 71 ++++++++++++ .../SlidingAlignedProcessingTimeWindows.java | 61 ++++++++++ .../TumblingAlignedProcessingTimeWindows.java | 53 +++++++++ .../TumblingProcessingTimeWindows.java | 1 + .../operators/windowing/WindowOperator.java | 6 + .../windowing/TimeWindowTranslationTest.java | 45 +++++++- .../api/scala/TimeWindowTranslationTest.scala | 9 +- .../api/scala/WindowTranslationTest.scala | 40 +++++++ 9 files changed, 383 insertions(+), 9 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/BaseAlignedWindowAssigner.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index e2f930a524137..704875b197001 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -41,15 +41,21 @@ import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; +import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingAlignedProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; @@ -117,6 +123,10 @@ public WindowedStream trigger(Trigger trigger) { throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging."); } + if (windowAssigner instanceof BaseAlignedWindowAssigner) { + throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with a custom trigger."); + } + this.trigger = trigger; return this; } @@ -153,6 +163,10 @@ public WindowedStream evictor(Evictor evictor) { if (windowAssigner instanceof MergingWindowAssigner) { throw new UnsupportedOperationException("Cannot use a merging WindowAssigner with an Evictor."); } + + if (windowAssigner instanceof BaseAlignedWindowAssigner) { + throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with an Evictor."); + } this.evictor = evictor; return this; } @@ -187,6 +201,15 @@ public SingleOutputStreamOperator reduce(ReduceFunction function) { //clean the closure function = input.getExecutionEnvironment().clean(function); + + String callLocation = Utils.getCallLocationName(); + String udfName = "WindowedStream." + callLocation; + + SingleOutputStreamOperator result = createFastTimeOperatorIfValid(function, input.getType(), udfName); + if (result != null) { + return result; + } + LegacyWindowOperatorType legacyOpType = getLegacyWindowType(function); return reduce(function, new PassThroughWindowFunction(), legacyOpType); } @@ -421,6 +444,11 @@ public SingleOutputStreamOperator fold(ACC initialValue, throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner."); } + if (windowAssigner instanceof BaseAlignedWindowAssigner) { + throw new UnsupportedOperationException("Fold cannot be used with a " + + windowAssigner.getClass().getSimpleName() + " assigner."); + } + //clean the closures function = input.getExecutionEnvironment().clean(function); foldFunction = input.getExecutionEnvironment().clean(foldFunction); @@ -512,6 +540,11 @@ public SingleOutputStreamOperator apply(WindowFunction functi String callLocation = Utils.getCallLocationName(); String udfName = "WindowedStream." + callLocation; + SingleOutputStreamOperator result = createFastTimeOperatorIfValid(function, resultType, udfName); + if (result != null) { + return result; + } + LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function); String opName; KeySelector keySel = input.getKeySelector(); @@ -977,6 +1010,79 @@ private LegacyWindowOperatorType getLegacyWindowType(Function function) { return LegacyWindowOperatorType.NONE; } + private SingleOutputStreamOperator createFastTimeOperatorIfValid( + Function function, + TypeInformation resultType, + String functionName) { + + if (windowAssigner instanceof SlidingAlignedProcessingTimeWindows && trigger == null && evictor == null) { + SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner; + final long windowLength = timeWindows.getSize(); + final long windowSlide = timeWindows.getSlide(); + + String opName = "Fast " + timeWindows + " of " + functionName; + + if (function instanceof ReduceFunction) { + @SuppressWarnings("unchecked") + ReduceFunction reducer = (ReduceFunction) function; + + @SuppressWarnings("unchecked") + OneInputStreamOperator op = (OneInputStreamOperator) + new AggregatingProcessingTimeWindowOperator<>( + reducer, input.getKeySelector(), + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + input.getType().createSerializer(getExecutionEnvironment().getConfig()), + windowLength, windowSlide); + return input.transform(opName, resultType, op); + } + else if (function instanceof WindowFunction) { + @SuppressWarnings("unchecked") + WindowFunction wf = (WindowFunction) function; + + OneInputStreamOperator op = new AccumulatingProcessingTimeWindowOperator<>( + wf, input.getKeySelector(), + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + input.getType().createSerializer(getExecutionEnvironment().getConfig()), + windowLength, windowSlide); + return input.transform(opName, resultType, op); + } + } else if (windowAssigner instanceof TumblingAlignedProcessingTimeWindows && trigger == null && evictor == null) { + TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner; + final long windowLength = timeWindows.getSize(); + final long windowSlide = timeWindows.getSize(); + + String opName = "Fast " + timeWindows + " of " + functionName; + + if (function instanceof ReduceFunction) { + @SuppressWarnings("unchecked") + ReduceFunction reducer = (ReduceFunction) function; + + @SuppressWarnings("unchecked") + OneInputStreamOperator op = (OneInputStreamOperator) + new AggregatingProcessingTimeWindowOperator<>( + reducer, + input.getKeySelector(), + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + input.getType().createSerializer(getExecutionEnvironment().getConfig()), + windowLength, windowSlide); + return input.transform(opName, resultType, op); + } + else if (function instanceof WindowFunction) { + @SuppressWarnings("unchecked") + WindowFunction wf = (WindowFunction) function; + + OneInputStreamOperator op = new AccumulatingProcessingTimeWindowOperator<>( + wf, input.getKeySelector(), + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + input.getType().createSerializer(getExecutionEnvironment().getConfig()), + windowLength, windowSlide); + return input.transform(opName, resultType, op); + } + } + + return null; + } + public StreamExecutionEnvironment getExecutionEnvironment() { return input.getExecutionEnvironment(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/BaseAlignedWindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/BaseAlignedWindowAssigner.java new file mode 100644 index 0000000000000..d419f65d7a3d3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/BaseAlignedWindowAssigner.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.windowing.assigners; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + +import java.util.Collection; + +/** + * A base {@link WindowAssigner} used to instantiate one of the deprecated + * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator + * AccumulatingProcessingTimeWindowOperator} and + * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator + * AggregatingProcessingTimeWindowOperator}. + *

+ * For assigner that extend this one, the user can check the {@link TumblingAlignedProcessingTimeWindows} + * and the {@link SlidingAlignedProcessingTimeWindows}. + * */ +public class BaseAlignedWindowAssigner extends WindowAssigner { + + private static final long serialVersionUID = -6214980179706960234L; + + private final long size; + + protected BaseAlignedWindowAssigner(long size) { + this.size = size; + } + + public long getSize() { + return size; + } + + @Override + public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { + throw new UnsupportedOperationException("This assigner should not be used with the WindowOperator."); + } + + @Override + public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { + return null; + } + + @Override + public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) { + throw new UnsupportedOperationException("This assigner should not be used with the WindowOperator."); + } + + @Override + public boolean isEventTime() { + throw new UnsupportedOperationException("This assigner should not be used with the WindowOperator."); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java new file mode 100644 index 0000000000000..344bc64ef4f7d --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.assigners; + +import org.apache.flink.streaming.api.windowing.time.Time; + +/** + * A processing time sliding {@link WindowAssigner window assigner} used to perform windowing using the + * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator + * AccumulatingProcessingTimeWindowOperator} and the + * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator + * AggregatingProcessingTimeWindowOperator}. + *

+ * With this assigner, the {@code trigger} used is a + * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger + * ProcessingTimeTrigger} and no {@code evictor} can be specified. + *

+ * WARNING: Bear in mind that no rescaling and no backwards compatibility is supported. + * */ +public class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner { + + private static final long serialVersionUID = 3695562702662473688L; + + private final long slide; + + public SlidingAlignedProcessingTimeWindows(long size, long slide) { + super(size); + this.slide = slide; + } + + public long getSlide() { + return slide; + } + + /** + * Creates a new {@code SlidingAlignedProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to sliding time windows based on the element timestamp. + * + * @param size The size of the generated windows. + * @param slide The slide interval of the generated windows. + */ + public static SlidingAlignedProcessingTimeWindows of(Time size, Time slide) { + return new SlidingAlignedProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds()); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java new file mode 100644 index 0000000000000..ea2035f000197 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.assigners; + +import org.apache.flink.streaming.api.windowing.time.Time; + +/** + * A processing time tumbling {@link WindowAssigner window assigner} used to perform windowing using the + * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator + * AccumulatingProcessingTimeWindowOperator} and the + * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator + * AggregatingProcessingTimeWindowOperator}. + *

+ * With this assigner, the {@code trigger} used is a + * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger + * ProcessingTimeTrigger} and no {@code evictor} can be specified. + *

+ * WARNING: Bear in mind that no rescaling and no backwards compatibility is supported. + * */ +public class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner { + + private static final long serialVersionUID = -6217477609512299842L; + + protected TumblingAlignedProcessingTimeWindows(long size) { + super(size); + } + + /** + * Creates a new {@code TumblingAlignedProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp. + * + * @param size The size of the generated windows. + */ + public static TumblingAlignedProcessingTimeWindows of(Time size) { + return new TumblingAlignedProcessingTimeWindows(size.toMilliseconds()); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java index f1e9e111bfff9..f4fb620a5f3f3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java @@ -108,6 +108,7 @@ public static TumblingProcessingTimeWindows of(Time size) { public static TumblingProcessingTimeWindows of(Time size, Time offset) { return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds()); } + @Override public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 990162e92a8c4..628d66319ccde 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -50,6 +50,7 @@ import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; @@ -214,6 +215,11 @@ public WindowOperator( super(windowFunction); + checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner), + "The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. " + + "This assigner is only used with the AccumulatingProcessingTimeWindowOperator and " + + "the AggregatingProcessingTimeWindowOperator"); + checkArgument(allowedLateness >= 0); checkArgument(windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index 8e37021d7e5d7..eb5b9f3dd2473 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -32,7 +32,9 @@ import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingAlignedProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; @@ -51,12 +53,48 @@ */ public class TimeWindowTranslationTest { + @Test + public void testTimeWindows() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream> window1 = source + .keyBy(0) + .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS)) + .reduce(reducer); + + OneInputTransformation, Tuple2> transform1 = (OneInputTransformation, Tuple2>) window1.getTransformation(); + OneInputStreamOperator, Tuple2> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof WindowOperator); + + DataStream> window2 = source + .keyBy(0) + .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS)) + .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void apply(Tuple tuple, + TimeWindow window, + Iterable> values, + Collector> out) throws Exception { + + } + }); + + OneInputTransformation, Tuple2> transform2 = (OneInputTransformation, Tuple2>) window2.getTransformation(); + OneInputStreamOperator, Tuple2> operator2 = transform2.getOperator(); + Assert.assertTrue(operator2 instanceof WindowOperator); + } + /** * These tests ensure that the fast aligned time windows operator is used if the * conditions are right. */ @Test - @Ignore public void testReduceFastTimeWindows() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); @@ -67,7 +105,7 @@ public void testReduceFastTimeWindows() throws Exception { DataStream> window1 = source .keyBy(0) - .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS)) + .window(SlidingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .reduce(reducer); OneInputTransformation, Tuple2> transform1 = (OneInputTransformation, Tuple2>) window1.getTransformation(); @@ -80,7 +118,6 @@ public void testReduceFastTimeWindows() throws Exception { * conditions are right. */ @Test - @Ignore public void testApplyFastTimeWindows() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); @@ -89,7 +126,7 @@ public void testApplyFastTimeWindows() throws Exception { DataStream> window1 = source .keyBy(0) - .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS)) + .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS))) .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala index ff976563760a0..92bd4e6caae59 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala @@ -25,7 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows +import org.apache.flink.streaming.api.windowing.assigners.{SlidingAlignedProcessingTimeWindows, SlidingEventTimeWindows, TumblingAlignedProcessingTimeWindows} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger import org.apache.flink.streaming.api.windowing.windows.TimeWindow @@ -46,7 +46,6 @@ class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase { * conditions are right. */ @Test - @Ignore def testReduceFastTimeWindows(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -54,7 +53,8 @@ class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window1 = source .keyBy(0) - .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)) + .window(SlidingAlignedProcessingTimeWindows.of( + Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .reduce(new DummyReducer()) val transform1 = window1.javaStream.getTransformation @@ -70,7 +70,6 @@ class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase { * conditions are right. */ @Test - @Ignore def testApplyFastTimeWindows(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) @@ -79,7 +78,7 @@ class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window1 = source .keyBy(0) - .timeWindow(Time.minutes(1)) + .window(TumblingAlignedProcessingTimeWindows.of(Time.minutes(1))) .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( key: Tuple, diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 7235b22ce59c7..7ec882b831e45 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction, Rich import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.streaming.api.scala.function.WindowFunction @@ -48,6 +49,45 @@ import org.junit.Test */ class WindowTranslationTest { + @Test + def testTimeWindows(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + + val window1 = source + .keyBy(0) + .timeWindow(Time.seconds(1), Time.milliseconds(100)) + .reduce(reducer) + + val transform1 = window1.javaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator1 = transform1.getOperator + + assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) + + val window2 = source + .keyBy(0) + .timeWindow(Time.minutes(1)) + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + def apply( + key: Tuple, + window: TimeWindow, + values: Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.javaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]]) + } + /** * .reduce() does not support [[RichReduceFunction]], since the reduce function is used * internally in a [[org.apache.flink.api.common.state.ReducingState]]. From fcda005a8919e3da75914fdfdd1719c717360c4b Mon Sep 17 00:00:00 2001 From: kl0u Date: Wed, 11 Jan 2017 18:21:30 +0100 Subject: [PATCH 2/2] integrated comments --- .../windowing/TimeWindowTranslationTest.java | 7 ++- .../api/scala/TimeWindowTranslationTest.scala | 44 +++++++++++++++++++ .../api/scala/WindowTranslationTest.scala | 39 ---------------- 3 files changed, 50 insertions(+), 40 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index eb5b9f3dd2473..e0bc0d541a761 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -53,9 +53,14 @@ */ public class TimeWindowTranslationTest { + /** + * Verifies that calls to timeWindow() in Flink 1.2 instantiate a regular + * windowOperator instead of an aligned one. + */ @Test - public void testTimeWindows() throws Exception { + public void testAlignedWindowDeprecation() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala index 92bd4e6caae59..b94b7e7fbd30b 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala @@ -41,6 +41,50 @@ import org.junit.{Ignore, Test} */ class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase { + /** + * Verifies that calls to timeWindow() in Flink 1.2 instantiate a regular + * windowOperator instead of an aligned one. + */ + @Test + def testAlignedWindowDeprecation(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) + + val source = env.fromElements(("hello", 1), ("hello", 2)) + + val reducer = new DummyReducer + + val window1 = source + .keyBy(0) + .timeWindow(Time.seconds(1), Time.milliseconds(100)) + .reduce(reducer) + + val transform1 = window1.javaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator1 = transform1.getOperator + + assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) + + val window2 = source + .keyBy(0) + .timeWindow(Time.minutes(1)) + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + def apply( + key: Tuple, + window: TimeWindow, + values: Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) + + val transform2 = window2.javaStream.getTransformation + .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] + + val operator2 = transform2.getOperator + + assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]]) + } + /** * These tests ensure that the fast aligned time windows operator is used if the * conditions are right. diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 7ec882b831e45..4b44fa8dc3a8f 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -49,45 +49,6 @@ import org.junit.Test */ class WindowTranslationTest { - @Test - def testTimeWindows(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - - val source = env.fromElements(("hello", 1), ("hello", 2)) - - val reducer = new DummyReducer - - val window1 = source - .keyBy(0) - .timeWindow(Time.seconds(1), Time.milliseconds(100)) - .reduce(reducer) - - val transform1 = window1.javaStream.getTransformation - .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - - val operator1 = transform1.getOperator - - assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) - - val window2 = source - .keyBy(0) - .timeWindow(Time.minutes(1)) - .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { - def apply( - key: Tuple, - window: TimeWindow, - values: Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) - - val transform2 = window2.javaStream.getTransformation - .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] - - val operator2 = transform2.getOperator - - assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]]) - } - /** * .reduce() does not support [[RichReduceFunction]], since the reduce function is used * internally in a [[org.apache.flink.api.common.state.ReducingState]].