Skip to content

Commit

Permalink
[BEAM-1096] flink streaming side output optimization using SplitStream
Browse files Browse the repository at this point in the history
fix checkstyle
  • Loading branch information
xhumanoid committed Dec 6, 2016
1 parent 568d73f commit 3ab8063
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
Expand Down Expand Up @@ -551,12 +556,13 @@ public void translateNode(
.transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
}

SplitStream<RawUnionValue> splitStream = unionOutputStream.split(new OutputSelector<RawUnionValue>() {
@Override
public Iterable<String> select(RawUnionValue value) {
return Collections.singletonList(Integer.toString(value.getUnionTag()));
}
});
SplitStream<RawUnionValue> splitStream = unionOutputStream
.split(new OutputSelector<RawUnionValue>() {
@Override
public Iterable<String> select(RawUnionValue value) {
return Collections.singletonList(Integer.toString(value.getUnionTag()));
}
});

for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
final int outputTag = tagsToLabels.get(output.getKey());
Expand Down

0 comments on commit 3ab8063

Please sign in to comment.