Skip to content

Commit

Permalink
This closes #1520
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Dec 8, 2016
2 parents c53e0b1 + f1a5704 commit 6807480
Showing 1 changed file with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
Expand Down Expand Up @@ -554,24 +556,30 @@ public void translateNode(
.transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
}

SplitStream<RawUnionValue> splitStream = unionOutputStream
.split(new OutputSelector<RawUnionValue>() {
@Override
public Iterable<String> select(RawUnionValue value) {
return Collections.singletonList(Integer.toString(value.getUnionTag()));
}
});

for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
final int outputTag = tagsToLabels.get(output.getKey());

TypeInformation outputTypeInfo =
context.getTypeInfo(output.getValue());

@SuppressWarnings("unchecked")
DataStream filtered =
unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
@Override
public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
if (value.getUnionTag() == outputTag) {
out.collect(value.getValue());
}
}
}).returns(outputTypeInfo);
DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
@Override
public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
out.collect(value.getValue());
}
}).returns(outputTypeInfo);

context.setOutputDataStream(output.getValue(), filtered);
context.setOutputDataStream(output.getValue(), unwrapped);
}
}

Expand Down

0 comments on commit 6807480

Please sign in to comment.