From 73af891141f453d607104e28d3a0e0a3438a409b Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Tue, 28 Jul 2015 22:01:32 +0200 Subject: [PATCH] [FLINK-2419] Add test for sinks after keyBy and groupBy Closes #947 --- .../flink/streaming/api/DataStreamTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 764c6f2d38f19..324143f04eb3b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -460,6 +460,48 @@ public String map2(Integer value) { fail(e.getMessage()); } } + + @Test + public void sinkKeyTest() { + StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); + StreamGraph streamGraph = env.getStreamGraph(); + + DataStream sink = env.generateSequence(1, 100).print(); + assertTrue(streamGraph.getStreamNode(sink.getId()).getStatePartitioner() == null); + assertTrue(streamGraph.getStreamNode(sink.getId()).getInEdges().get(0).getPartitioner() instanceof RebalancePartitioner); + + KeySelector key1 = new KeySelector() { + + private static final long serialVersionUID = 1L; + + @Override + public Long getKey(Long value) throws Exception { + return (long) 0; + } + }; + + DataStream sink2 = env.generateSequence(1, 100).keyBy(key1).print(); + + assertTrue(streamGraph.getStreamNode(sink2.getId()).getStatePartitioner() != null); + assertEquals(key1, streamGraph.getStreamNode(sink2.getId()).getStatePartitioner()); + assertTrue(streamGraph.getStreamNode(sink2.getId()).getInEdges().get(0).getPartitioner() instanceof FieldsPartitioner); + + KeySelector key2 = new KeySelector() { + + private static final long serialVersionUID = 1L; + + @Override + public Long getKey(Long value) throws Exception { + return (long) 0; + } + }; + + DataStream sink3 = env.generateSequence(1, 100).keyBy(key2).print(); + + assertTrue(streamGraph.getStreamNode(sink3.getId()).getStatePartitioner() != null); + assertEquals(key2, streamGraph.getStreamNode(sink3.getId()).getStatePartitioner()); + assertTrue(streamGraph.getStreamNode(sink3.getId()).getInEdges().get(0).getPartitioner() instanceof FieldsPartitioner); + } @Test public void testChannelSelectors() {