Permalink
Browse files

[FLINK-5237] Consolidate and harmonize Window Translation Tests

  • Loading branch information...
1 parent 8c8c028 commit aa220e487db88079c224e50190055cef41df3f9a @aljoscha aljoscha committed Nov 24, 2016
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +17,9 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
@@ -44,9 +46,8 @@
import java.util.concurrent.TimeUnit;
/**
- * These tests verify that the api calls on
- * {@link WindowedStream} instantiate
- * the correct window operator.
+ * These tests verify that the api calls on {@link WindowedStream} that use the "time" shortcut
+ * instantiate the correct window operator.
*/
public class TimeWindowTranslationTest {
@@ -56,8 +57,9 @@
*/
@Test
@Ignore
- public void testFastTimeWindows() throws Exception {
+ public void testReduceFastTimeWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -71,8 +73,21 @@ public void testFastTimeWindows() throws Exception {
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+ }
- DataStream<Tuple2<String, Integer>> window2 = source
+ /**
+ * These tests ensure that the fast aligned time windows operator is used if the
+ * conditions are right.
+ */
+ @Test
+ @Ignore
+ public void testApplyFastTimeWindows() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -87,25 +102,25 @@ public void apply(Tuple tuple,
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof AccumulatingProcessingTimeWindowOperator);
}
@Test
@SuppressWarnings("rawtypes")
- public void testEventTimeWindows() throws Exception {
+ public void testReduceEventTimeWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
- DummyReducer reducer = new DummyReducer();
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(
+ Tuple2.of("hello", 1),
+ Tuple2.of("hello", 2));
DataStream<Tuple2<String, Integer>> window1 = source
- .keyBy(0)
- .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
- .reduce(reducer);
+ .keyBy(0)
+ .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+ .reduce(new DummyReducer());
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -114,8 +129,43 @@ public void testEventTimeWindows() throws Exception {
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
+ }
- DataStream<Tuple2<String, Integer>> window2 = source
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldEventTimeWindows() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(
+ Tuple2.of("hello", 1),
+ Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(0)
+ .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+ .fold(new Tuple2<>("", 1), new DummyFolder());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof WindowOperator);
+ WindowOperator winOperator1 = (WindowOperator) operator1;
+ Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor);
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testApplyEventTimeWindows() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(
+ Tuple2.of("hello", 1),
+ Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -130,14 +180,13 @@ public void apply(Tuple tuple,
}
});
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof WindowOperator);
- WindowOperator winOperator2 = (WindowOperator) operator2;
- Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
- Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
-
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof WindowOperator);
+ WindowOperator winOperator1 = (WindowOperator) operator1;
+ Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
}
/**
@@ -187,12 +236,23 @@ public void apply(
// UDFs
// ------------------------------------------------------------------------
- public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
+ private static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return value1;
}
}
+
+ private static class DummyFolder
+ implements FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+ return value1;
+ }
+ }
+
}
Oops, something went wrong.

0 comments on commit aa220e4

Please sign in to comment.