Skip to content

Commit

Permalink
[FLINK-11084][datastream] Forbid using two consecutive split transfor…
Browse files Browse the repository at this point in the history
…mations

This closes #7258
  • Loading branch information
Clarkkkkk authored and dawidwys committed Jan 11, 2019
1 parent 03dc53c commit 0d3125d
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 6 deletions.
Expand Up @@ -234,7 +234,9 @@ public final DataStream<T> union(DataStream<T>... streams) {
* {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
* for directing the tuples.
* @return The {@link SplitStream}
* @deprecated Please use side ouput instead.
*/
@Deprecated
public SplitStream<T> split(OutputSelector<T> outputSelector) {
return new SplitStream<>(this, clean(outputSelector));
}
Expand Down
Expand Up @@ -33,6 +33,7 @@
* @param <OUT> The type of the elements in the Stream
*/

@Deprecated
@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {

Expand Down
Expand Up @@ -258,6 +258,8 @@ private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
StreamTransformation<T> input = split.getInput();
Collection<Integer> resultIds = transform(input);

validateSplitTransformation(input);

// the recursive transform call might have transformed this already
if (alreadyTransformed.containsKey(split)) {
return alreadyTransformed.get(split);
Expand Down Expand Up @@ -643,4 +645,20 @@ private String determineSlotSharingGroup(String specifiedGroup, Collection<Integ
return inputGroup == null ? "default" : inputGroup;
}
}

private <T> void validateSplitTransformation(StreamTransformation<T> input) {
if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
} else if (input instanceof SideOutputTransformation) {
throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
} else if (input instanceof UnionTransformation) {
for (StreamTransformation<T> transformation : ((UnionTransformation<T>) input).getInputs()) {
validateSplitTransformation(transformation);
}
} else if (input instanceof PartitionTransformation) {
validateSplitTransformation(((PartitionTransformation) input).getInput());
} else {
return;
}
}
}
Expand Up @@ -79,6 +79,7 @@
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;

import org.hamcrest.core.StringStartsWith;
Expand Down Expand Up @@ -952,12 +953,7 @@ public boolean filter(Integer value) throws Exception {
fail(e.getMessage());
}

OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
return null;
}
};
OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();

SplitStream<Integer> split = unionFilter.split(outputSelector);
split.select("dummy").addSink(new DiscardingSink<Integer>());
Expand Down Expand Up @@ -1087,6 +1083,91 @@ public void testChannelSelectors() {
assertTrue(globalPartitioner instanceof GlobalPartitioner);
}

/////////////////////////////////////////////////////////////
// Split testing
/////////////////////////////////////////////////////////////

@Test
public void testConsecutiveSplitRejection() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Integer> src = env.fromElements(0, 0);

OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();

src.split(outputSelector).split(outputSelector).addSink(new DiscardingSink<>());

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");

env.getStreamGraph();
}

@Test
public void testSplitAfterSideOutputRejection() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Integer> src = env.fromElements(0, 0);

OutputTag<Integer> outputTag = new OutputTag<Integer>("dummy"){};
OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();

src.getSideOutput(outputTag).split(outputSelector).addSink(new DiscardingSink<>());

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");

env.getStreamGraph();
}

@Test
public void testSelectBetweenConsecutiveSplitRejection() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Integer> src = env.fromElements(0, 0);

OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();

src.split(outputSelector).select("dummy").split(outputSelector).addSink(new DiscardingSink<>());

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");

env.getStreamGraph();
}

@Test
public void testUnionBetweenConsecutiveSplitRejection() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Integer> src = env.fromElements(0, 0);

OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();

src.split(outputSelector).select("dummy").union(src.map(x -> x)).split(outputSelector).addSink(new DiscardingSink<>());

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");

env.getStreamGraph();
}

@Test
public void testKeybyBetweenConsecutiveSplitRejection() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Integer> src = env.fromElements(0, 0);

OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();

src.split(outputSelector).select("dummy").keyBy(x -> x).split(outputSelector).addSink(new DiscardingSink<>());

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");

env.getStreamGraph();
}

/////////////////////////////////////////////////////////////
// KeyBy testing
/////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1423,4 +1504,11 @@ public int getI() {
return i;
}
}

private class DummyOutputSelector<Integer> implements OutputSelector<Integer> {
@Override
public Iterable<String> select(Integer value) {
return null;
}
}
}
Expand Up @@ -903,13 +903,19 @@ class DataStream[T](stream: JavaStream[T]) {
* Operator used for directing tuples to specific named outputs using an
* OutputSelector. Calling this method on an operator creates a new
* [[SplitStream]].
*
* @deprecated Please use side output instead.
*/
@deprecated
def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))

/**
* Creates a new [[SplitStream]] that contains only the elements satisfying the
* given output selector predicate.
*
* @deprecated Please use side output instead.
*/
@deprecated
def split(fun: T => TraversableOnce[String]): SplitStream[T] = {
if (fun == null) {
throw new NullPointerException("OutputSelector must not be null.")
Expand Down

0 comments on commit 0d3125d

Please sign in to comment.