From 0a8ad92b9dc5bc90572702592e1b203f4dd909f7 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sun, 4 Jun 2017 21:56:10 +0800 Subject: [PATCH 1/2] [BEAM-1498] Use Flink-native side outputs --- .../FlinkStreamingTransformTranslators.java | 145 ++++++------------ .../wrappers/streaming/DoFnOperator.java | 40 +++-- .../streaming/WindowDoFnOperator.java | 4 +- .../runners/flink/PipelineOptionsTest.java | 5 +- .../flink/streaming/DoFnOperatorTest.java | 65 ++++---- 5 files changed, 112 insertions(+), 147 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 00e9934aacbf1..d8c30499b6e9f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -18,9 +18,6 @@ package org.apache.beam.runners.flink; -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -29,7 +26,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.SystemReduceFn; @@ -84,16 +80,15 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; /** * This class contains all the mappings between Beam and Flink @@ -337,7 +332,7 @@ public RawUnionValue map(T o) throws Exception { static class ParDoTranslationHelper { interface DoFnOperatorFactory { - DoFnOperator createDoFnOperator( + DoFnOperator createDoFnOperator( DoFn doFn, String stepName, List> sideInputs, @@ -345,7 +340,7 @@ DoFnOperator createDoFnOperator( List> additionalOutputTags, FlinkStreamingTranslationContext context, WindowingStrategy windowingStrategy, - Map, Integer> tagsToLabels, + Map, OutputTag>> tagsToLabels, Coder> inputCoder, Coder keyCoder, Map> transformedSideInputs); @@ -354,7 +349,6 @@ DoFnOperator createDoFnOperator( static void translateParDo( String transformName, DoFn doFn, - String stepName, PCollection input, List> sideInputs, Map, PValue> outputs, @@ -366,10 +360,15 @@ static void translateParDo( // we assume that the transformation does not change the windowing strategy. WindowingStrategy windowingStrategy = input.getWindowingStrategy(); - Map, Integer> tagsToLabels = - transformTupleTagsToLabels(mainOutputTag, outputs); + Map, OutputTag>> tagsToOutputTags = Maps.newHashMap(); + for (Map.Entry, PValue> entry : outputs.entrySet()) { + if (!tagsToOutputTags.containsKey(entry.getKey())) { + tagsToOutputTags.put(entry.getKey(), new OutputTag<>(entry.getKey().getId(), + (TypeInformation) context.getTypeInfo((PCollection) entry.getValue()))); + } + } - SingleOutputStreamOperator unionOutputStream; + SingleOutputStreamOperator> outputStream; Coder> inputCoder = context.getCoder(input); @@ -391,8 +390,12 @@ static void translateParDo( stateful = true; } + CoderTypeInformation> outputTypeInformation = + new CoderTypeInformation<>( + context.getCoder((PCollection) outputs.get(mainOutputTag))); + if (sideInputs.isEmpty()) { - DoFnOperator doFnOperator = + DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -401,24 +404,19 @@ static void translateParDo( additionalOutputTags, context, windowingStrategy, - tagsToLabels, + tagsToOutputTags, inputCoder, keyCoder, new HashMap>() /* side-input mapping */); - UnionCoder outputUnionCoder = createUnionCoder(outputs); - - CoderTypeInformation outputUnionTypeInformation = - new CoderTypeInformation<>(outputUnionCoder); - - unionOutputStream = inputDataStream - .transform(transformName, outputUnionTypeInformation, doFnOperator); + outputStream = inputDataStream + .transform(transformName, outputTypeInformation, doFnOperator); } else { Tuple2>, DataStream> transformedSideInputs = transformSideInputs(sideInputs, context); - DoFnOperator doFnOperator = + DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -427,16 +425,11 @@ static void translateParDo( additionalOutputTags, context, windowingStrategy, - tagsToLabels, + tagsToOutputTags, inputCoder, keyCoder, transformedSideInputs.f0); - UnionCoder outputUnionCoder = createUnionCoder(outputs); - - CoderTypeInformation outputUnionTypeInformation = - new CoderTypeInformation<>(outputUnionCoder); - if (stateful) { // we have to manually contruct the two-input transform because we're not // allowed to have only one input keyed, normally. @@ -448,83 +441,35 @@ static void translateParDo( keyedStream.getTransformation(), transformedSideInputs.f1.broadcast().getTransformation(), transformName, - (TwoInputStreamOperator) doFnOperator, - outputUnionTypeInformation, + doFnOperator, + outputTypeInformation, keyedStream.getParallelism()); rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); - unionOutputStream = new SingleOutputStreamOperator( - keyedStream.getExecutionEnvironment(), - rawFlinkTransform) {}; // we have to cheat around the ctor being protected + outputStream = new SingleOutputStreamOperator( + keyedStream.getExecutionEnvironment(), + rawFlinkTransform) { + }; // we have to cheat around the ctor being protected keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); } else { - unionOutputStream = inputDataStream + outputStream = inputDataStream .connect(transformedSideInputs.f1.broadcast()) - .transform(transformName, outputUnionTypeInformation, doFnOperator); + .transform(transformName, outputTypeInformation, doFnOperator); } } - SplitStream splitStream = unionOutputStream - .split(new OutputSelector() { - @Override - public Iterable select(RawUnionValue value) { - return Collections.singletonList(Integer.toString(value.getUnionTag())); - } - }); - - for (Entry, PValue> output : outputs.entrySet()) { - final int outputTag = tagsToLabels.get(output.getKey()); - - TypeInformation outputTypeInfo = context.getTypeInfo((PCollection) output.getValue()); - - @SuppressWarnings("unchecked") - DataStream unwrapped = splitStream.select(String.valueOf(outputTag)) - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(RawUnionValue value, Collector out) throws Exception { - out.collect(value.getValue()); - } - }).returns(outputTypeInfo); - - context.setOutputDataStream(output.getValue(), unwrapped); - } - } - - private static Map, Integer> transformTupleTagsToLabels( - TupleTag mainTag, - Map, PValue> allTaggedValues) { + context.setOutputDataStream(outputs.get(mainOutputTag), outputStream); - Map, Integer> tagToLabelMap = Maps.newHashMap(); - int count = 0; - tagToLabelMap.put(mainTag, count++); - for (TupleTag key : allTaggedValues.keySet()) { - if (!tagToLabelMap.containsKey(key)) { - tagToLabelMap.put(key, count++); + for (Map.Entry, PValue> entry : outputs.entrySet()) { + if (!entry.getKey().equals(mainOutputTag)) { + context.setOutputDataStream(entry.getValue(), + outputStream.getSideOutput(tagsToOutputTags.get(entry.getKey()))); } } - return tagToLabelMap; - } - - private static UnionCoder createUnionCoder(Map, PValue> taggedCollections) { - List> outputCoders = Lists.newArrayList(); - for (PValue taggedColl : taggedCollections.values()) { - checkArgument( - taggedColl instanceof PCollection, - "A Union Coder can only be created for a Collection of Tagged %s. Got %s", - PCollection.class.getSimpleName(), - taggedColl.getClass().getSimpleName()); - PCollection coll = (PCollection) taggedColl; - WindowedValue.FullWindowedValueCoder windowedValueCoder = - WindowedValue.getFullCoder( - coll.getCoder(), - coll.getWindowingStrategy().getWindowFn().windowCoder()); - outputCoders.add(windowedValueCoder); - } - return UnionCoder.of(outputCoders); } } @@ -540,7 +485,6 @@ public void translateNode( ParDoTranslationHelper.translateParDo( transform.getName(), transform.getFn(), - context.getCurrentTransform().getFullName(), (PCollection) context.getInput(transform), transform.getSideInputs(), context.getOutputs(transform), @@ -549,7 +493,7 @@ public void translateNode( context, new ParDoTranslationHelper.DoFnOperatorFactory() { @Override - public DoFnOperator createDoFnOperator( + public DoFnOperator createDoFnOperator( DoFn doFn, String stepName, List> sideInputs, @@ -557,7 +501,7 @@ public DoFnOperator createDoFnOperator( List> additionalOutputTags, FlinkStreamingTranslationContext context, WindowingStrategy windowingStrategy, - Map, Integer> tagsToLabels, + Map, OutputTag>> tagsToOutputTags, Coder> inputCoder, Coder keyCoder, Map> transformedSideInputs) { @@ -567,7 +511,7 @@ public DoFnOperator createDoFnOperator( inputCoder, mainOutputTag, additionalOutputTags, - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags), windowingStrategy, transformedSideInputs, sideInputs, @@ -592,7 +536,6 @@ public void translateNode( ParDoTranslationHelper.translateParDo( transform.getName(), transform.newProcessFn(transform.getFn()), - context.getCurrentTransform().getFullName(), context.getInput(transform), transform.getSideInputs(), context.getOutputs(transform), @@ -604,8 +547,7 @@ public void translateNode( @Override public DoFnOperator< KeyedWorkItem>, - OutputT, - RawUnionValue> createDoFnOperator( + OutputT, OutputT> createDoFnOperator( DoFn< KeyedWorkItem>, OutputT> doFn, @@ -615,7 +557,7 @@ RawUnionValue> createDoFnOperator( List> additionalOutputTags, FlinkStreamingTranslationContext context, WindowingStrategy windowingStrategy, - Map, Integer> tagsToLabels, + Map, OutputTag>> tagsToOutputTags, Coder< WindowedValue< KeyedWorkItem< @@ -629,7 +571,7 @@ RawUnionValue> createDoFnOperator( inputCoder, mainOutputTag, additionalOutputTags, - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags), windowingStrategy, transformedSideInputs, sideInputs, @@ -756,8 +698,7 @@ public void translateNode( TypeInformation>>> outputTypeInfo = context.getTypeInfo(context.getOutput(transform)); - DoFnOperator.DefaultOutputManagerFactory< - WindowedValue>>> outputManagerFactory = + DoFnOperator.DefaultOutputManagerFactory>> outputManagerFactory = new DoFnOperator.DefaultOutputManagerFactory<>(); WindowDoFnOperator> doFnOperator = @@ -868,7 +809,7 @@ public void translateNode( (Coder) windowedWorkItemCoder, new TupleTag>("main output"), Collections.>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory>>(), + new DoFnOperator.DefaultOutputManagerFactory>(), windowingStrategy, new HashMap>(), /* side-input mapping */ Collections.>emptyList(), /* side inputs */ @@ -894,7 +835,7 @@ public void translateNode( (Coder) windowedWorkItemCoder, new TupleTag>("main output"), Collections.>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory>>(), + new DoFnOperator.DefaultOutputManagerFactory>(), windowingStrategy, transformSideInputs.f0, sideInputs, diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 594fe0e7e55a2..8c27ed97df8c7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -87,6 +87,7 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; import org.joda.time.Instant; /** @@ -98,9 +99,9 @@ * type when we have additional tagged outputs */ public class DoFnOperator - extends AbstractStreamOperator - implements OneInputStreamOperator, OutputT>, - TwoInputStreamOperator, RawUnionValue, OutputT>, + extends AbstractStreamOperator> + implements OneInputStreamOperator, WindowedValue>, + TwoInputStreamOperator, RawUnionValue, WindowedValue>, KeyGroupCheckpointedOperator, Triggerable { protected DoFn doFn; @@ -662,7 +663,7 @@ private void setCurrentOutputWatermark(long currentOutputWatermark) { * a Flink {@link Output}. */ interface OutputManagerFactory extends Serializable { - DoFnRunners.OutputManager create(Output> output); + DoFnRunners.OutputManager create(Output>> output); } /** @@ -673,14 +674,15 @@ interface OutputManagerFactory extends Serializable { public static class DefaultOutputManagerFactory implements OutputManagerFactory { @Override - public DoFnRunners.OutputManager create(final Output> output) { + public DoFnRunners.OutputManager create( + final Output>> output) { return new DoFnRunners.OutputManager() { @Override public void output(TupleTag tag, WindowedValue value) { // with tagged outputs we can't get around this because we don't // know our own output type... @SuppressWarnings("unchecked") - OutputT castValue = (OutputT) value; + WindowedValue castValue = (WindowedValue) value; output.collect(new StreamRecord<>(castValue)); } }; @@ -692,22 +694,34 @@ public void output(TupleTag tag, WindowedValue value) { * {@link DoFnRunners.OutputManager} that can write to multiple logical * outputs by unioning them in a {@link RawUnionValue}. */ - public static class MultiOutputOutputManagerFactory - implements OutputManagerFactory { + public static class MultiOutputOutputManagerFactory + implements OutputManagerFactory { - Map, Integer> mapping; + private TupleTag mainTag; + Map, OutputTag>> mapping; - public MultiOutputOutputManagerFactory(Map, Integer> mapping) { + public MultiOutputOutputManagerFactory( + TupleTag mainTag, + Map, OutputTag>> mapping) { + this.mainTag = mainTag; this.mapping = mapping; } @Override - public DoFnRunners.OutputManager create(final Output> output) { + public DoFnRunners.OutputManager create( + final Output>> output) { return new DoFnRunners.OutputManager() { @Override public void output(TupleTag tag, WindowedValue value) { - int intTag = mapping.get(tag); - output.collect(new StreamRecord<>(new RawUnionValue(intTag, value))); + if (tag.equals(mainTag)) { + @SuppressWarnings("unchecked") + WindowedValue outputValue = (WindowedValue) value; + output.collect(new StreamRecord<>(outputValue)); + } else { + @SuppressWarnings("unchecked") + OutputTag> outputTag = (OutputTag) mapping.get(tag); + output.>collect(outputTag, new StreamRecord<>(value)); + } } }; } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index bf64edefbaea2..ea578b98bcd0f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -46,7 +46,7 @@ * Flink operator for executing window {@link DoFn DoFns}. */ public class WindowDoFnOperator - extends DoFnOperator, KV, WindowedValue>> { + extends DoFnOperator, KV, KV> { private final SystemReduceFn systemReduceFn; @@ -56,7 +56,7 @@ public WindowDoFnOperator( Coder>> inputCoder, TupleTag> mainOutputTag, List> additionalOutputTags, - OutputManagerFactory>> outputManagerFactory, + OutputManagerFactory> outputManagerFactory, WindowingStrategy windowingStrategy, Map> sideInputTagMapping, Collection> sideInputs, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 8382a2d34a3bd..bc0b1c2d62ed5 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -173,13 +173,12 @@ public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { final byte[] serialized = SerializationUtils.serialize(doFnOperator); @SuppressWarnings("unchecked") - DoFnOperator deserialized = - (DoFnOperator) SerializationUtils.deserialize(serialized); + DoFnOperator deserialized = SerializationUtils.deserialize(serialized); TypeInformation> typeInformation = TypeInformation.of( new TypeHint>() {}); - OneInputStreamOperatorTestHarness, Object> testHarness = + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = new OneInputStreamOperatorTestHarness<>(deserialized, typeInformation.createSerializer(new ExecutionConfig())); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 79bc0e0b1671b..132242efd2500 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -65,6 +65,7 @@ import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; +import org.apache.flink.util.OutputTag; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; @@ -123,7 +124,7 @@ public void testSingleOutput() throws Exception { PipelineOptionsFactory.as(FlinkPipelineOptions.class), null); - OneInputStreamOperatorTestHarness, String> testHarness = + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator); testHarness.open(); @@ -147,26 +148,27 @@ public void testMultiOutputOutput() throws Exception { TupleTag mainOutput = new TupleTag<>("main-output"); TupleTag additionalOutput1 = new TupleTag<>("output-1"); TupleTag additionalOutput2 = new TupleTag<>("output-2"); - ImmutableMap, Integer> outputMapping = ImmutableMap., Integer>builder() - .put(mainOutput, 1) - .put(additionalOutput1, 2) - .put(additionalOutput2, 3) - .build(); + ImmutableMap, OutputTag> outputMapping = + ImmutableMap., OutputTag>builder() + .put(mainOutput, new OutputTag(mainOutput.getId()){}) + .put(additionalOutput1, new OutputTag(additionalOutput1.getId()){}) + .put(additionalOutput2, new OutputTag(additionalOutput2.getId()){}) + .build(); - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new MultiOutputDoFn(additionalOutput1, additionalOutput2), "stepName", windowedValueCoder, mainOutput, ImmutableList.>of(additionalOutput1, additionalOutput2), - new DoFnOperator.MultiOutputOutputManagerFactory(outputMapping), + new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputMapping), WindowingStrategy.globalDefault(), new HashMap>(), /* side-input mapping */ Collections.>emptyList(), /* side inputs */ PipelineOptionsFactory.as(FlinkPipelineOptions.class), null); - OneInputStreamOperatorTestHarness, RawUnionValue> testHarness = + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator); testHarness.open(); @@ -176,17 +178,26 @@ public void testMultiOutputOutput() throws Exception { testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("hello"))); assertThat( - this.stripStreamRecordFromRawUnion(testHarness.getOutput()), + this.stripStreamRecord(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("got: hello"))); + + assertThat( + this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput1))), contains( - new RawUnionValue(2, WindowedValue.valueInGlobalWindow("extra: one")), - new RawUnionValue(3, WindowedValue.valueInGlobalWindow("extra: two")), - new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")), - new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")), - new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello")))); + WindowedValue.valueInGlobalWindow("extra: one"), + WindowedValue.valueInGlobalWindow("got: hello"))); + + assertThat( + this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput2))), + contains( + WindowedValue.valueInGlobalWindow("extra: two"), + WindowedValue.valueInGlobalWindow("got: hello"))); testHarness.close(); } + @Test public void testLateDroppingForStatefulFn() throws Exception { @@ -212,13 +223,13 @@ public void processElement(ProcessContext context) { TupleTag outputTag = new TupleTag<>("main-output"); - DoFnOperator> doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( fn, "stepName", windowedValueCoder, outputTag, Collections.>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory>(), + new DoFnOperator.DefaultOutputManagerFactory(), windowingStrategy, new HashMap>(), /* side-input mapping */ Collections.>emptyList(), /* side inputs */ @@ -325,14 +336,14 @@ public void onTimer(OnTimerContext context, @StateId(stateId) ValueState TupleTag> outputTag = new TupleTag<>("main-output"); DoFnOperator< - KV, KV, WindowedValue>> doFnOperator = + KV, KV, KV> doFnOperator = new DoFnOperator<>( fn, "stepName", windowedValueCoder, outputTag, Collections.>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory>>(), + new DoFnOperator.DefaultOutputManagerFactory>(), windowingStrategy, new HashMap>(), /* side-input mapping */ Collections.>emptyList(), /* side inputs */ @@ -435,8 +446,8 @@ public void testSideInputs(boolean keyed) throws Exception { PipelineOptionsFactory.as(FlinkPipelineOptions.class), keyCoder); - TwoInputStreamOperatorTestHarness, RawUnionValue, String> testHarness = - new TwoInputStreamOperatorTestHarness<>(doFnOperator); + TwoInputStreamOperatorTestHarness, RawUnionValue, WindowedValue> + testHarness = new TwoInputStreamOperatorTestHarness<>(doFnOperator); if (keyed) { // we use a dummy key for the second input since it is considered to be broadcast @@ -527,19 +538,19 @@ public WindowedValue apply(@Nullable Object o) { }); } - private Iterable stripStreamRecordFromRawUnion(Iterable input) { + private Iterable> stripStreamRecord(Iterable input) { return FluentIterable.from(input).filter(new Predicate() { @Override public boolean apply(@Nullable Object o) { - return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue; + return o instanceof StreamRecord; } - }).transform(new Function() { + }).transform(new Function>() { @Nullable @Override @SuppressWarnings({"unchecked", "rawtypes"}) - public RawUnionValue apply(@Nullable Object o) { - if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue) { - return (RawUnionValue) ((StreamRecord) o).getValue(); + public WindowedValue apply(@Nullable Object o) { + if (o instanceof StreamRecord) { + return (WindowedValue) ((StreamRecord) o).getValue(); } throw new RuntimeException("unreachable"); } From 6966280dca498618405a93f8cd664473746e69cb Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 6 Jun 2017 17:31:09 +0800 Subject: [PATCH 2/2] Remove the FnOutputT parameter from DoFnOperator --- .../FlinkStreamingTransformTranslators.java | 10 +++++----- .../wrappers/streaming/DoFnOperator.java | 20 +++++++++---------- .../streaming/SplittableDoFnOperator.java | 12 +++++------ .../streaming/WindowDoFnOperator.java | 2 +- .../runners/flink/PipelineOptionsTest.java | 6 +++--- .../flink/streaming/DoFnOperatorTest.java | 11 +++++----- 6 files changed, 30 insertions(+), 31 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index d8c30499b6e9f..2a7c5d66c953d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -332,7 +332,7 @@ public RawUnionValue map(T o) throws Exception { static class ParDoTranslationHelper { interface DoFnOperatorFactory { - DoFnOperator createDoFnOperator( + DoFnOperator createDoFnOperator( DoFn doFn, String stepName, List> sideInputs, @@ -395,7 +395,7 @@ static void translateParDo( context.getCoder((PCollection) outputs.get(mainOutputTag))); if (sideInputs.isEmpty()) { - DoFnOperator doFnOperator = + DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -416,7 +416,7 @@ static void translateParDo( Tuple2>, DataStream> transformedSideInputs = transformSideInputs(sideInputs, context); - DoFnOperator doFnOperator = + DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -493,7 +493,7 @@ public void translateNode( context, new ParDoTranslationHelper.DoFnOperatorFactory() { @Override - public DoFnOperator createDoFnOperator( + public DoFnOperator createDoFnOperator( DoFn doFn, String stepName, List> sideInputs, @@ -547,7 +547,7 @@ public void translateNode( @Override public DoFnOperator< KeyedWorkItem>, - OutputT, OutputT> createDoFnOperator( + OutputT> createDoFnOperator( DoFn< KeyedWorkItem>, OutputT> doFn, diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8c27ed97df8c7..350f32367a84c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -94,21 +94,21 @@ * Flink operator for executing {@link DoFn DoFns}. * * @param the input type of the {@link DoFn} - * @param the output type of the {@link DoFn} + * @param the output type of the {@link DoFn} * @param the output type of the operator, this can be different from the fn output * type when we have additional tagged outputs */ -public class DoFnOperator +public class DoFnOperator extends AbstractStreamOperator> implements OneInputStreamOperator, WindowedValue>, TwoInputStreamOperator, RawUnionValue, WindowedValue>, KeyGroupCheckpointedOperator, Triggerable { - protected DoFn doFn; + protected DoFn doFn; protected final SerializedPipelineOptions serializedOptions; - protected final TupleTag mainOutputTag; + protected final TupleTag mainOutputTag; protected final List> additionalOutputTags; protected final Collection> sideInputs; @@ -118,8 +118,8 @@ public class DoFnOperator protected final OutputManagerFactory outputManagerFactory; - protected transient DoFnRunner doFnRunner; - protected transient PushbackSideInputDoFnRunner pushbackDoFnRunner; + protected transient DoFnRunner doFnRunner; + protected transient PushbackSideInputDoFnRunner pushbackDoFnRunner; protected transient SideInputHandler sideInputHandler; @@ -127,7 +127,7 @@ public class DoFnOperator protected transient DoFnRunners.OutputManager outputManager; - private transient DoFnInvoker doFnInvoker; + private transient DoFnInvoker doFnInvoker; protected transient long currentInputWatermark; @@ -156,10 +156,10 @@ public class DoFnOperator private transient Optional pushedBackWatermark; public DoFnOperator( - DoFn doFn, + DoFn doFn, String stepName, Coder> inputCoder, - TupleTag mainOutputTag, + TupleTag mainOutputTag, List> additionalOutputTags, OutputManagerFactory outputManagerFactory, WindowingStrategy windowingStrategy, @@ -192,7 +192,7 @@ private org.apache.beam.runners.core.StepContext createStepContext() { // allow overriding this in WindowDoFnOperator because this one dynamically creates // the DoFn - protected DoFn getDoFn() { + protected DoFn getDoFn() { return doFn; } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 4ac2ff5221383..5d08eba96a28e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -55,19 +55,19 @@ * the {@code @ProcessElement} method of a splittable {@link DoFn}. */ public class SplittableDoFnOperator< - InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> extends DoFnOperator< - KeyedWorkItem>, FnOutputT, OutputT> { + KeyedWorkItem>, OutputT> { private transient ScheduledExecutorService executorService; public SplittableDoFnOperator( - DoFn>, FnOutputT> doFn, + DoFn>, OutputT> doFn, String stepName, Coder< WindowedValue< KeyedWorkItem>>> inputCoder, - TupleTag mainOutputTag, + TupleTag mainOutputTag, List> additionalOutputTags, OutputManagerFactory outputManagerFactory, WindowingStrategy windowingStrategy, @@ -120,10 +120,10 @@ public TimerInternals timerInternalsForKey(String key) { new OutputAndTimeBoundedSplittableProcessElementInvoker<>( doFn, serializedOptions.getPipelineOptions(), - new OutputWindowedValue() { + new OutputWindowedValue() { @Override public void outputWindowedValue( - FnOutputT output, + OutputT output, Instant timestamp, Collection windows, PaneInfo pane) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index ea578b98bcd0f..78d585e3e7869 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -46,7 +46,7 @@ * Flink operator for executing window {@link DoFn DoFns}. */ public class WindowDoFnOperator - extends DoFnOperator, KV, KV> { + extends DoFnOperator, KV> { private final SystemReduceFn systemReduceFn; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index bc0b1c2d62ed5..d0281eccbf666 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -136,7 +136,7 @@ public void testNonNull() { @Test(expected = Exception.class) public void parDoBaseClassPipelineOptionsNullTest() { - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new TestDoFn(), "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), @@ -157,7 +157,7 @@ public void parDoBaseClassPipelineOptionsNullTest() { @Test public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new TestDoFn(), "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), @@ -173,7 +173,7 @@ public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { final byte[] serialized = SerializationUtils.serialize(doFnOperator); @SuppressWarnings("unchecked") - DoFnOperator deserialized = SerializationUtils.deserialize(serialized); + DoFnOperator deserialized = SerializationUtils.deserialize(serialized); TypeInformation> typeInformation = TypeInformation.of( new TypeHint>() {}); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 132242efd2500..ad9d236c53484 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -111,7 +111,7 @@ public void testSingleOutput() throws Exception { TupleTag outputTag = new TupleTag<>("main-output"); - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new IdentityDoFn(), "stepName", windowedValueCoder, @@ -155,7 +155,7 @@ public void testMultiOutputOutput() throws Exception { .put(additionalOutput2, new OutputTag(additionalOutput2.getId()){}) .build(); - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new MultiOutputDoFn(additionalOutput1, additionalOutput2), "stepName", windowedValueCoder, @@ -223,7 +223,7 @@ public void processElement(ProcessContext context) { TupleTag outputTag = new TupleTag<>("main-output"); - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( fn, "stepName", windowedValueCoder, @@ -335,8 +335,7 @@ public void onTimer(OnTimerContext context, @StateId(stateId) ValueState TupleTag> outputTag = new TupleTag<>("main-output"); - DoFnOperator< - KV, KV, KV> doFnOperator = + DoFnOperator, KV> doFnOperator = new DoFnOperator<>( fn, "stepName", @@ -433,7 +432,7 @@ public void testSideInputs(boolean keyed) throws Exception { keyCoder = StringUtf8Coder.of(); } - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new IdentityDoFn(), "stepName", windowedValueCoder,