Skip to content

Commit

Permalink
[FLINK-11084]Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Clarkkkkk committed Dec 10, 2018
1 parent ec04af6 commit 6438c8d
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 7 deletions.
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.api.collector.selector;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
Expand Down Expand Up @@ -59,4 +60,9 @@ public OutputSelectorWrapper(OutputSelector<OUT> currentOutputSelector) {
public void addPair(OutputSelector<OUT> outputSelector, List<String> selectNames) {
parentOutputSelectorsAndSelectNames.addFirst(Tuple2.of(outputSelector, selectNames));
}

@VisibleForTesting
public OutputSelector<OUT> getCurrentOutputSelector() {
return currentOutputSelector;
}
}
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
Expand Down Expand Up @@ -963,7 +964,7 @@ public Iterable<String> select(Integer value) {
split.select("dummy").addSink(new DiscardingSink<Integer>());
List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors();
assertEquals(1, outputSelectors.size());
assertEquals(outputSelector, outputSelectors.get(0));
assertEquals(outputSelector, ((OutputSelectorWrapper) outputSelectors.get(0)).getCurrentOutputSelector());

DataStream<Integer> select = split.select("a");
DataStreamSink<Integer> sink = select.print();
Expand Down
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

import org.apache.commons.collections.IteratorUtils;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
* Test the {@link OutputSelector} for consecutive split and select.
*/
public class ConsecutiveSplitSelectTest {

@Test
public void testConsecutiveSplitSelect() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromElements("a1", "a2");

SplitStream<String> splitStream1 = dataStream.split(new OutputSelector<String>() {
@Override public Iterable<String> select(String value) {
List<String> output = new ArrayList<>(1);
if (value.equals("a1")) {
output.add("name1");
} else {
output.add("name2");
}
return output;
}
});
SplitStream<String> splitStream2 = splitStream1.select("name1").split(new OutputSelector<String>() {
@Override public Iterable<String> select(String value) {
List<String> output = new ArrayList<>(1);
if (value.equals("a1")) {
output.add("name4");
} else {
output.add("name5");
}
return output;
}
});
splitStream1.addSink(new DiscardingSink<>());
splitStream2.addSink(new DiscardingSink<>());

List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(1).getOutputSelectors();
OutputSelector<String> outputSelector1 = (OutputSelector<String>) outputSelectors.get(0);
OutputSelector<String> outputSelector2 = (OutputSelector<String>) outputSelectors.get(1);

List<String> actualResult = IteratorUtils.toList(outputSelector1.select("a1").iterator());
List<String> expectedResult = new ArrayList<>();
expectedResult.add("name1");
validateAndCleanUpTwoList(actualResult, expectedResult);

actualResult = IteratorUtils.toList(outputSelector2.select("a1").iterator());
expectedResult.add("name4");
validateAndCleanUpTwoList(actualResult, expectedResult);

actualResult = IteratorUtils.toList(outputSelector1.select("a2").iterator());
expectedResult.add("name2");
validateAndCleanUpTwoList(actualResult, expectedResult);

actualResult = IteratorUtils.toList(outputSelector2.select("a2").iterator());
validateAndCleanUpTwoList(actualResult, expectedResult);
}

private void validateAndCleanUpTwoList(List<String> actualResult, List<String> expectedResult) {
Assert.assertArrayEquals(actualResult.toArray(), expectedResult.toArray());
actualResult.clear();
expectedResult.clear();
}
}
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
Expand Down Expand Up @@ -48,6 +50,9 @@

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -173,15 +178,15 @@ public void testVirtualTransformations() throws Exception {
// verify that partitioning in unions is preserved and that it works across split/select
assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
assertTrue(graph.getStreamNode(map1Operator.getId()).getOutputSelectors().contains(selector1));
assertTrue(extractCurrentOutputSelectorFromWrapper(graph.getStreamNode(map1Operator.getId()).getOutputSelectors()).contains(selector1));

assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof GlobalPartitioner);
assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("odd"));
assertTrue(graph.getStreamNode(map2Operator.getId()).getOutputSelectors().contains(selector2));
assertTrue(extractCurrentOutputSelectorFromWrapper(graph.getStreamNode(map2Operator.getId()).getOutputSelectors()).contains(selector2));

assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof ShufflePartitioner);
assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
assertTrue(graph.getStreamNode(map3Operator.getId()).getOutputSelectors().contains(selector3));
assertTrue(extractCurrentOutputSelectorFromWrapper(graph.getStreamNode(map3Operator.getId()).getOutputSelectors()).contains(selector3));
}

/**
Expand Down Expand Up @@ -222,15 +227,15 @@ public void testVirtualTransformations2() throws Exception {
// verify that the properties are correctly set on all input operators
assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
assertTrue(graph.getStreamNode(map1.getId()).getOutputSelectors().contains(selector));
assertTrue(extractCurrentOutputSelectorFromWrapper(graph.getStreamNode(map1.getId()).getOutputSelectors()).contains(selector));

assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
assertTrue(graph.getStreamNode(map2.getId()).getOutputSelectors().contains(selector));
assertTrue(extractCurrentOutputSelectorFromWrapper(graph.getStreamNode(map2.getId()).getOutputSelectors()).contains(selector));

assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
assertTrue(graph.getStreamNode(map3.getId()).getOutputSelectors().contains(selector));
assertTrue(extractCurrentOutputSelectorFromWrapper(graph.getStreamNode(map3.getId()).getOutputSelectors()).contains(selector));

}

Expand Down Expand Up @@ -420,6 +425,15 @@ public void invoke(Integer value, Context ctx) throws Exception {}
env.getStreamGraph().getStreamingPlanAsJSON();
}

private List<OutputSelector> extractCurrentOutputSelectorFromWrapper(List<OutputSelector<?>> outputSelectorWrappers) {
List<OutputSelector> result = new ArrayList<>();
for (OutputSelector outputSelectorWrapper : outputSelectorWrappers) {
assertTrue(outputSelectorWrapper instanceof OutputSelectorWrapper);
result.add(((OutputSelectorWrapper) outputSelectorWrapper).getCurrentOutputSelector());
}
return result;
}

private static class OutputTypeConfigurableOperationWithTwoInputs
extends AbstractStreamOperator<Integer>
implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
Expand Down

0 comments on commit 6438c8d

Please sign in to comment.