From 97a772a7011dd00a0ede989172a3d3ac71aa9562 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 3 Mar 2017 10:53:28 -0800 Subject: [PATCH 1/4] Removes ParDo.Unbound and UnboundMulti --- .../apache/beam/examples/complete/TfIdf.java | 3 +- .../examples/cookbook/FilterExamples.java | 3 +- .../examples/complete/game/GameStats.java | 8 +- .../apex/translation/ParDoTranslatorTest.java | 11 +- .../StatefulParDoEvaluatorFactoryTest.java | 3 +- .../beam/runners/flink/examples/TFIDF.java | 3 +- .../BatchStatefulParDoOverridesTest.java | 2 +- .../DataflowPipelineTranslatorTest.java | 4 +- .../org/apache/beam/sdk/testing/PAssert.java | 2 +- .../apache/beam/sdk/transforms/Combine.java | 4 +- .../org/apache/beam/sdk/transforms/ParDo.java | 297 +----------------- .../apache/beam/sdk/transforms/Partition.java | 4 +- .../apache/beam/sdk/transforms/Sample.java | 7 +- .../apache/beam/sdk/metrics/MetricsTest.java | 5 +- .../beam/sdk/transforms/FlattenTest.java | 4 +- .../apache/beam/sdk/transforms/ParDoTest.java | 54 ++-- .../apache/beam/sdk/transforms/ViewTest.java | 152 ++++----- .../beam/sdk/values/TypedPValueTest.java | 4 +- 18 files changed, 154 insertions(+), 416 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java index 9de5617fe83f..f7904d3ea27a 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java @@ -323,7 +323,6 @@ public void processElement(ProcessContext c) { // presented to each invocation of the DoFn. PCollection> wordToDf = wordToDocCount .apply("ComputeDocFrequencies", ParDo - .withSideInputs(totalDocuments) .of(new DoFn, KV>() { @ProcessElement public void processElement(ProcessContext c) { @@ -335,7 +334,7 @@ public void processElement(ProcessContext c) { c.output(KV.of(word, documentFrequency)); } - })); + }).withSideInputs(totalDocuments)); // Join the term frequency and document frequency // collections, each keyed on the word. diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java index 815ac7b1f6d2..fed9db79d1b2 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java @@ -175,7 +175,6 @@ public PCollection expand(PCollection rows) { // We'll only output readings with temperatures below this mean. PCollection filteredRows = monthFilteredRows .apply("ParseAndFilter", ParDo - .withSideInputs(globalMeanTemp) .of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { @@ -185,7 +184,7 @@ public void processElement(ProcessContext c) { c.output(c.element()); } } - })); + }).withSideInputs(globalMeanTemp)); return filteredRows; } diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index c880061cdebd..93e8254787a1 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -125,7 +125,6 @@ public PCollection> expand(PCollection> PCollection> filtered = sumScores .apply("ProcessAndFilter", ParDo // use the derived mean total score as a side input - .withSideInputs(globalMeanScore) .of(new DoFn, KV>() { private final Aggregator numSpammerUsers = createAggregator("SpammerUsers", Sum.ofLongs()); @@ -140,7 +139,7 @@ public void processElement(ProcessContext c) { c.output(c.element()); } } - })); + }).withSideInputs(globalMeanScore)); return filtered; } } @@ -288,7 +287,6 @@ public static void main(String[] args) throws Exception { FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration())))) // Filter out the detected spammer users, using the side input derived above. .apply("FilterOutSpammers", ParDo - .withSideInputs(spammersView) .of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { @@ -297,8 +295,8 @@ public void processElement(ProcessContext c) { c.output(c.element()); } } - })) - // Extract and sum teamname/score pairs from the event data. + }).withSideInputs(spammersView)) + // Extract and sum teamname/score pairs from the event data. .apply("ExtractTeamScore", new ExtractAndSumScore("team")) // [END DocInclude_FilterAndCalc] // Write the result to BigQuery diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java index 83e68f7822d4..3bcba00e685d 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java @@ -281,13 +281,14 @@ public void testMultiOutputParDoWithSideInputs() throws Exception { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1) - .withSideInputs(sideInputUnread) - .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .apply(ParDo .of(new TestMultiOutputWithSideInputsFn( Arrays.asList(sideInput1, sideInput2), - Arrays.>asList()))); + Arrays.>asList())) + .withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); outputs.get(sideOutputTag).setCoder(VoidCoder.of()); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index 9bf6bc93573b..946cd6913cc8 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -243,7 +243,7 @@ public void testUnprocessedElements() throws Exception { mainInput .apply( new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( - ParDo.withSideInputs(sideInput) + ParDo .of( new DoFn, Integer>() { @StateId(stateId) @@ -253,6 +253,7 @@ public void testUnprocessedElements() throws Exception { @ProcessElement public void process(ProcessContext c) {} }) + .withSideInputs(sideInput) .withOutputTags(mainOutput, TupleTagList.empty()))) .get(mainOutput) .setCoder(VarIntCoder.of()); diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java index 89e261b52b30..8e1df08144f8 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -347,7 +347,6 @@ public void processElement(ProcessContext c) { // presented to each invocation of the DoFn. PCollection> wordToDf = wordToDocCount .apply("ComputeDocFrequencies", ParDo - .withSideInputs(totalDocuments) .of(new DoFn, KV>() { private static final long serialVersionUID = 0; @@ -361,7 +360,7 @@ public void processElement(ProcessContext c) { c.output(KV.of(word, documentFrequency)); } - })); + }).withSideInputs(totalDocuments)); // Join the term frequency and document frequency // collections, each keyed on the word. diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java index 899902a3d603..f995ff322171 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java @@ -87,7 +87,7 @@ public void testMultiOutputOverrideNonCrashing() throws Exception { DummyStatefulDoFn fn = new DummyStatefulDoFn(); pipeline .apply(Create.of(KV.of(1, 2))) - .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of(fn)); + .apply(ParDo.of(fn).withOutputTags(mainOutputTag, TupleTagList.empty())); DataflowRunner runner = DataflowRunner.fromOptions(options); runner.replaceTransforms(pipeline); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 7e2eb5ff3b45..3d98aa9e2d9b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -851,7 +851,7 @@ public void testBatchStatefulParDoTranslation() throws Exception { pipeline .apply(Create.of(KV.of(1, 1))) .apply( - ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of( + ParDo.of( new DoFn, Integer>() { @StateId("unused") final StateSpec> stateSpec = @@ -861,7 +861,7 @@ public void testBatchStatefulParDoTranslation() throws Exception { public void process(ProcessContext c) { // noop } - })); + }).withOutputTags(mainOutputTag, TupleTagList.empty())); runner.replaceTransforms(pipeline); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 412753c60a9f..56df449ce516 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -1110,7 +1110,7 @@ public PDone expand(PBegin input) { .apply("WindowToken", windowToken) .apply( "RunChecks", - ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual, site))); + ParDo.of(new SideInputCheckerDoFn<>(checkerFn, actual, site)).withSideInputs(actual)); return PDone.in(input.getPipeline()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 3215ffae4590..6c5c1f121b68 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -1484,7 +1484,7 @@ private PCollection insertDefaultValueIfEmpty(PCollection mayb final OutputT defaultValue = fn.defaultValue(); PCollection defaultIfEmpty = maybeEmpty.getPipeline() .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of())) - .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of( + .apply("ProduceDefault", ParDo.of( new DoFn() { @ProcessElement public void processElement(ProcessContext c) { @@ -1493,7 +1493,7 @@ public void processElement(ProcessContext c) { c.output(defaultValue); } } - })) + }).withSideInputs(maybeEmptyView)) .setCoder(maybeEmpty.getCoder()) .setWindowingStrategyInternal(maybeEmpty.getWindowingStrategy()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 76c06b6dfe19..854084a7fde5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -150,7 +151,7 @@ * {@link PCollectionView PCollectionViews} express styles of accessing * {@link PCollection PCollections} computed by earlier pipeline operations, * passed in to the {@link ParDo} transform using - * {@link #withSideInputs}, and their contents accessible to each of + * {@link ParDo.Bound#withSideInputs}, and their contents accessible to each of * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}. * For example: * @@ -180,7 +181,7 @@ * {@link PCollection PCollections}, each keyed by a distinct {@link TupleTag}, * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags} * to be used for the output {@link PCollectionTuple} are specified by - * invoking {@link #withOutputTags}. Unconsumed side outputs do not + * invoking {@link ParDo.Bound#withOutputTags}. Unconsumed side outputs do not * necessarily need to be explicitly specified, even if the {@link DoFn} * generates them. Within the {@link DoFn}, an element is added to the * main output {@link PCollection} as normal, using @@ -203,11 +204,6 @@ * PCollectionTuple results = * words.apply( * ParDo - * // Specify the main and consumed side output tags of the - * // PCollectionTuple result: - * .withOutputTags(wordsBelowCutOffTag, - * TupleTagList.of(wordLengthsAboveCutOffTag) - * .and(markedWordsTag)) * .of(new DoFn() { * // Create a tag for the unconsumed side output. * final TupleTag specialWordsTag = @@ -230,7 +226,12 @@ * // Emit this word to the unconsumed side output. * c.sideOutput(specialWordsTag, word); * } - * }})); + * }}) + * // Specify the main and consumed side output tags of the + * // PCollectionTuple result: + * .withOutputTags(wordsBelowCutOffTag, + * TupleTagList.of(wordLengthsAboveCutOffTag) + * .and(markedWordsTag))); * // Extract the PCollection results, by tag. * PCollection wordsBelowCutOff = * results.get(wordsBelowCutOffTag); @@ -240,35 +241,6 @@ * results.get(markedWordsTag); * } * - *

Properties May Be Specified In Any Order

- * - *

Several properties can be specified for a {@link ParDo} - * {@link PTransform}, including side inputs, side output tags, - * and {@link DoFn} to invoke. Only the {@link DoFn} is required; side inputs and side - * output tags are only specified when they're needed. These - * properties can be specified in any order, as long as they're - * specified before the {@link ParDo} {@link PTransform} is applied. - * - *

The approach used to allow these properties to be specified in - * any order, with some properties omitted, is to have each of the - * property "setter" methods defined as static factory methods on - * {@link ParDo} itself, which return an instance of either - * {@link ParDo.Unbound} or - * {@link ParDo.Bound} nested classes, each of which offer - * property setter instance methods to enable setting additional - * properties. {@link ParDo.Bound} is used for {@link ParDo} - * transforms whose {@link DoFn} is specified and whose input and - * output static types have been bound. {@link ParDo.Unbound ParDo.Unbound} is used - * for {@link ParDo} transforms that have not yet had their - * {@link DoFn} specified. Only {@link ParDo.Bound} instances can be - * applied. - * - *

Another benefit of this approach is that it reduces the number - * of type parameters that need to be specified manually. In - * particular, the input and output types of the {@link ParDo} - * {@link PTransform} are inferred automatically from the type - * parameters of the {@link DoFn} argument passed to {@link ParDo#of}. - * *

Output Coders

* *

By default, the {@link Coder Coder<OutputT>} for the @@ -442,92 +414,17 @@ */ public class ParDo { - /** - * Creates a {@link ParDo} {@link PTransform} with the given - * side inputs. - * - *

Side inputs are {@link PCollectionView PCollectionViews}, whose contents are - * computed during pipeline execution and then made accessible to - * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. Each - * invocation of the {@link DoFn} receives the same values for these - * side inputs. - * - *

See the discussion of Side Inputs above for more explanation. - * - *

The resulting {@link PTransform} is incomplete, and its - * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link DoFn} to - * invoke, which will also bind the input/output types of this - * {@link PTransform}. - */ - public static Unbound withSideInputs(PCollectionView... sideInputs) { - return new Unbound().withSideInputs(sideInputs); - } - - /** - * Creates a {@link ParDo} with the given side inputs. - * - *

Side inputs are {@link PCollectionView}s, whose contents are - * computed during pipeline execution and then made accessible to - * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. - * - *

See the discussion of Side Inputs above for more explanation. - * - *

The resulting {@link PTransform} is incomplete, and its - * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link DoFn} to - * invoke, which will also bind the input/output types of this - * {@link PTransform}. - */ - public static Unbound withSideInputs( - Iterable> sideInputs) { - return new Unbound().withSideInputs(sideInputs); - } - - /** - * Creates a multi-output {@link ParDo} {@link PTransform} whose - * output {@link PCollection}s will be referenced using the given main - * output and side output tags. - * - *

{@link TupleTag TupleTags} are used to name (with its static element - * type {@code T}) each main and side output {@code PCollection}. - * This {@link PTransform PTransform's} {@link DoFn} emits elements to the main - * output {@link PCollection} as normal, using - * {@link DoFn.Context#output}. The {@link DoFn} emits elements to - * a side output {@code PCollection} using - * {@link DoFn.Context#sideOutput}, passing that side output's tag - * as an argument. The result of invoking this {@link PTransform} - * will be a {@link PCollectionTuple}, and any of the the main and - * side output {@code PCollection}s can be retrieved from it via - * {@link PCollectionTuple#get}, passing the output's tag as an - * argument. - * - *

See the discussion of Side Outputs above for more explanation. - * - *

The resulting {@link PTransform} is incomplete, and its input - * type is not yet bound. Use {@link ParDo.UnboundMulti#of} - * to specify the {@link DoFn} to invoke, which will also bind the - * input type of this {@link PTransform}. - */ - public static UnboundMulti withOutputTags( - TupleTag mainOutputTag, - TupleTagList sideOutputTags) { - return new Unbound().withOutputTags(mainOutputTag, sideOutputTags); - } - /** * Creates a {@link ParDo} {@link PTransform} that will invoke the * given {@link DoFn} function. * - *

The resulting {@link PTransform PTransform's} types have been bound, with the - * input being a {@code PCollection} and the output a - * {@code PCollection}, inferred from the types of the argument - * {@code DoFn}. It is ready to be applied, or further + *

The resulting {@link PTransform PTransform} is ready to be applied, or further * properties can be set on it first. */ public static Bound of(DoFn fn) { validate(fn); - return new Unbound().of(fn, displayDataForFn(fn)); + return new Bound( + fn, Collections.>emptyList(), displayDataForFn(fn)); } private static DisplayData.ItemSpec> displayDataForFn(T fn) { @@ -587,85 +484,6 @@ private static void validate(DoFn fn) { } } - /** - * An incomplete {@link ParDo} transform, with unbound input/output types. - * - *

Before being applied, {@link ParDo.Unbound#of} must be - * invoked to specify the {@link DoFn} to invoke, which will also - * bind the input/output types of this {@link PTransform}. - */ - public static class Unbound { - private final List> sideInputs; - - Unbound() { - this(ImmutableList.>of()); - } - - Unbound(List> sideInputs) { - this.sideInputs = sideInputs; - } - - /** - * Returns a new {@link ParDo} transform that's like this - * transform but with the specified additional side inputs. - * Does not modify this transform. The resulting transform is - * still incomplete. - * - *

See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. - */ - public Unbound withSideInputs(PCollectionView... sideInputs) { - return withSideInputs(Arrays.asList(sideInputs)); - } - - /** - * Returns a new {@link ParDo} transform that is like this - * transform but with the specified additional side inputs. Does not modify - * this transform. The resulting transform is still incomplete. - * - *

See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. - */ - public Unbound withSideInputs( - Iterable> sideInputs) { - ImmutableList.Builder> builder = ImmutableList.builder(); - builder.addAll(this.sideInputs); - builder.addAll(sideInputs); - return new Unbound(builder.build()); - } - - /** - * Returns a new {@link ParDo} {@link PTransform} that's like this - * transform but which will invoke the given {@link DoFn} - * function, and which has its input and output types bound. Does - * not modify this transform. The resulting {@link PTransform} is - * sufficiently specified to be applied, but more properties can - * still be specified. - */ - public Bound of(DoFn fn) { - validate(fn); - return of(fn, displayDataForFn(fn)); - } - - /** - * Returns a new multi-output {@link ParDo} transform that's like this transform but with the - * specified main and side output tags. Does not modify this transform. The resulting transform - * is still incomplete. - * - *

See the discussion of Side Outputs above and on {@link ParDo#withOutputTags} for more - * explanation. - */ - public UnboundMulti withOutputTags( - TupleTag mainOutputTag, TupleTagList sideOutputTags) { - return new UnboundMulti<>(sideInputs, mainOutputTag, sideOutputTags); - } - - private Bound of( - DoFn doFn, DisplayData.ItemSpec> fnDisplayData) { - return new Bound<>(doFn, sideInputs, fnDisplayData); - } - } - /** * A {@link PTransform} that, when applied to a {@code PCollection}, * invokes a user-specified {@code DoFn} on all its elements, @@ -698,8 +516,7 @@ public static class Bound * {@link PTransform} but with the specified additional side inputs. Does not * modify this {@link PTransform}. * - *

See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. + *

See the discussion of Side Inputs above for more explanation. */ public Bound withSideInputs(PCollectionView... sideInputs) { return withSideInputs(Arrays.asList(sideInputs)); @@ -710,8 +527,7 @@ public Bound withSideInputs(PCollectionView... sideInputs) { * {@link PTransform} but with the specified additional side inputs. Does not * modify this {@link PTransform}. * - *

See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. + *

See the discussion of Side Inputs above for more explanation. */ public Bound withSideInputs( Iterable> sideInputs) { @@ -729,8 +545,7 @@ public Bound withSideInputs( * PTransform} but with the specified main and side output tags. Does not modify this {@link * PTransform}. * - *

See the discussion of Side Outputs above and on {@link ParDo#withOutputTags} for more - * explanation. + *

See the discussion of Side Outputs above for more explanation. */ public BoundMulti withOutputTags( TupleTag mainOutputTag, TupleTagList sideOutputTags) { @@ -780,82 +595,6 @@ public List> getSideInputs() { } } - /** - * An incomplete multi-output {@link ParDo} transform, with unbound - * input type. - * - *

Before being applied, {@link ParDo.UnboundMulti#of} must be - * invoked to specify the {@link DoFn} to invoke, which will also - * bind the input type of this {@link PTransform}. - * - * @param the type of the main output {@code PCollection} elements - */ - public static class UnboundMulti { - private final List> sideInputs; - private final TupleTag mainOutputTag; - private final TupleTagList sideOutputTags; - - UnboundMulti(List> sideInputs, - TupleTag mainOutputTag, - TupleTagList sideOutputTags) { - this.sideInputs = sideInputs; - this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; - } - - /** - * Returns a new multi-output {@link ParDo} transform that's like - * this transform but with the specified side inputs. Does not - * modify this transform. The resulting transform is still - * incomplete. - * - *

See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. - */ - public UnboundMulti withSideInputs( - PCollectionView... sideInputs) { - return withSideInputs(Arrays.asList(sideInputs)); - } - - /** - * Returns a new multi-output {@link ParDo} transform that's like - * this transform but with the specified additional side inputs. Does not - * modify this transform. The resulting transform is still - * incomplete. - * - *

See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. - */ - public UnboundMulti withSideInputs( - Iterable> sideInputs) { - return new UnboundMulti<>( - ImmutableList.>builder() - .addAll(this.sideInputs) - .addAll(sideInputs) - .build(), - mainOutputTag, - sideOutputTags); - } - - /** - * Returns a new multi-output {@link ParDo} {@link PTransform} - * that's like this transform but which will invoke the given - * {@link DoFn} function, and which has its input type bound. - * Does not modify this transform. The resulting - * {@link PTransform} is sufficiently specified to be applied, but - * more properties can still be specified. - */ - public BoundMulti of(DoFn fn) { - validate(fn); - return of(fn, displayDataForFn(fn)); - } - - private BoundMulti of( - DoFn fn, DisplayData.ItemSpec> fnDisplayData) { - return new BoundMulti<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData); - } - } - /** * A {@link PTransform} that, when applied to a * {@code PCollection}, invokes a user-specified @@ -893,8 +632,7 @@ public static class BoundMulti * that's like this {@link PTransform} but with the specified additional side * inputs. Does not modify this {@link PTransform}. * - *

See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. + *

See the discussion of Side Inputs above for more explanation. */ public BoundMulti withSideInputs( PCollectionView... sideInputs) { @@ -906,8 +644,7 @@ public BoundMulti withSideInputs( * PTransform} but with the specified additional side inputs. Does not modify this {@link * PTransform}. * - *

See the discussion of Side Inputs above and on {@link ParDo#withSideInputs} for more - * explanation. + *

See the discussion of Side Inputs above for more explanation. */ public BoundMulti withSideInputs( Iterable> sideInputs) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java index e0b2b61fd7e9..2031bc94a379 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java @@ -105,8 +105,8 @@ public PCollectionList expand(PCollection in) { PCollectionTuple outputs = in.apply( ParDo - .withOutputTags(new TupleTag(){}, outputTags) - .of(partitionDoFn)); + .of(partitionDoFn) + .withOutputTags(new TupleTag(){}, outputTags)); PCollectionList pcs = PCollectionList.empty(in.getPipeline()); Coder coder = in.getCoder(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java index 3734f7b0c0f5..3d35c808265d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java @@ -146,12 +146,9 @@ private Any(long limit) { @Override public PCollection expand(PCollection in) { PCollectionView> iterableView = in.apply(View.asIterable()); - return - in.getPipeline() + return in.getPipeline() .apply(Create.of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo - .withSideInputs(iterableView) - .of(new SampleAnyDoFn<>(limit, iterableView))) + .apply(ParDo.of(new SampleAnyDoFn<>(limit, iterableView)).withSideInputs(iterableView)) .setCoder(in.getCoder()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 265b51930930..361bf1baf181 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -207,7 +207,7 @@ public void finishBundle(Context c) { bundleDist.update(40L); } })) - .apply("MyStep2", ParDo.withOutputTags(output1, TupleTagList.of(output2)) + .apply("MyStep2", ParDo .of(new DoFn() { @SuppressWarnings("unused") @ProcessElement @@ -221,7 +221,8 @@ public void processElement(ProcessContext c) { c.output(element); c.sideOutput(output2, element); } - })); + }) + .withOutputTags(output1, TupleTagList.of(output2))); PipelineResult result = pipeline.run(); result.waitUntilFinish(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index 1753c49759da..a4f25452bb2c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -198,14 +198,14 @@ public void testEmptyFlattenAsSideInput() { PCollection output = p .apply(Create.of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo.withSideInputs(view).of(new DoFn() { + .apply(ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { for (String side : c.sideInput(view)) { c.output(side); } } - })); + }).withSideInputs(view)); PAssert.that(output).empty(); p.run(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 336f4c01cd08..9a4fd15882aa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -497,12 +497,13 @@ public void testParDoWithOnlySideOutputs() { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .apply(ParDo .of(new DoFn(){ @ProcessElement public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element()); - }})); + }}) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); PAssert.that(outputs.get(mainOutputTag)).empty(); PAssert.that(outputs.get(sideOutputTag)).containsInAnyOrder(inputs); @@ -586,10 +587,11 @@ public void testParDoWithSideInputs() { PCollection output = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1, sideInputUnread, sideInput2) + .apply(ParDo .of(new TestDoFn( Arrays.asList(sideInput1, sideInput2), - Arrays.>asList()))); + Arrays.>asList())) + .withSideInputs(sideInput1, sideInputUnread, sideInput2)); PAssert.that(output) .satisfies(ParDoTest.HasExpectedOutput @@ -617,12 +619,13 @@ public void testParDoWithSideInputsIsCumulative() { PCollection output = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1) - .withSideInputs(sideInputUnread) - .withSideInputs(sideInput2) + .apply(ParDo .of(new TestDoFn( Arrays.asList(sideInput1, sideInput2), - Arrays.>asList()))); + Arrays.>asList())) + .withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2)); PAssert.that(output) .satisfies(ParDoTest.HasExpectedOutput @@ -653,13 +656,14 @@ public void testMultiOutputParDoWithSideInputs() { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1) - .withSideInputs(sideInputUnread) - .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .apply(ParDo .of(new TestDoFn( Arrays.asList(sideInput1, sideInput2), - Arrays.>asList()))); + Arrays.>asList())) + .withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput @@ -690,13 +694,14 @@ public void testMultiOutputParDoWithSideInputsIsCumulative() { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1) - .withSideInputs(sideInputUnread) - .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .apply(ParDo .of(new TestDoFn( Arrays.asList(sideInput1, sideInput2), - Arrays.>asList()))); + Arrays.>asList())) + .withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput @@ -1201,7 +1206,6 @@ public void testMainOutputApplySideOutputNoCoder() { .apply(Create.of(new TestDummy()) .withCoder(TestDummyCoder.of())) .apply(ParDo - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) .of( new DoFn() { @ProcessElement @@ -1211,7 +1215,8 @@ public void processElement(ProcessContext context) { context.sideOutput(sideOutputTag, element); } }) - ); + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + ); // Before fix, tuple.get(mainOutputTag).apply(...) would indirectly trigger // tuple.get(sideOutputTag).finishSpecifyingOutput(), which would crash @@ -1263,14 +1268,15 @@ public void testParDoSideOutputWithTimestamp() { PCollection output = input - .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)).of( + .apply(ParDo.of( new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.sideOutputWithTimestamp( sideOutputTag, c.element(), new Instant(c.element().longValue())); } - })).get(sideOutputTag) + }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))) + .get(sideOutputTag) .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO))) .apply(ParDo.of(new TestFormatTimestampDoFn())); @@ -2297,8 +2303,8 @@ public void populateDisplayData(Builder builder) { }; ParDo.BoundMulti parDo = ParDo - .withOutputTags(new TupleTag(), TupleTagList.empty()) - .of(fn); + .of(fn) + .withOutputTags(new TupleTag(), TupleTagList.empty()); DisplayData displayData = DisplayData.from(parDo); assertThat(displayData, includesDisplayDataFor("fn", fn)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 740d808c790c..867fe0a6445f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -99,12 +99,12 @@ public void testSingletonSideInput() { PCollection output = pipeline.apply("Create123", Create.of(1, 2, 3)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(47, 47, 47); @@ -129,12 +129,12 @@ public void testWindowedSingletonSideInput() { TimestampedValue.of(3, new Instant(12)))) .apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(47, 47, 48); @@ -150,12 +150,12 @@ public void testEmptySingletonSideInput() throws Exception { .apply(View.asSingleton()); pipeline.apply("Create123", Create.of(1, 2, 3)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn() { + .apply("OutputSideInputs", ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } - })); + }).withSideInputs(view)); thrown.expect(PipelineExecutionException.class); thrown.expectCause(isA(NoSuchElementException.class)); @@ -174,12 +174,12 @@ public void testNonSingletonSideInput() throws Exception { final PCollectionView view = oneTwoThree.apply(View.asSingleton()); oneTwoThree.apply( - "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn() { + "OutputSideInputs", ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } - })); + }).withSideInputs(view)); thrown.expect(PipelineExecutionException.class); thrown.expectCause(isA(IllegalArgumentException.class)); @@ -200,7 +200,7 @@ public void testListSideInput() { PCollection output = pipeline.apply("CreateMainInput", Create.of(29, 31)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { checkArgument(c.sideInput(view).size() == 4); @@ -209,7 +209,7 @@ public void processElement(ProcessContext c) { c.output(i); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23); @@ -240,7 +240,7 @@ public void testWindowedListSideInput() { .apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10)))) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { checkArgument(c.sideInput(view).size() == 4); @@ -249,7 +249,7 @@ public void processElement(ProcessContext c) { c.output(i); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43); @@ -267,14 +267,14 @@ public void testEmptyListSideInput() throws Exception { PCollection results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); assertFalse(c.sideInput(view).iterator().hasNext()); c.output(1); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); @@ -292,7 +292,7 @@ public void testListSideInputIsImmutable() { PCollection output = pipeline.apply("CreateMainInput", Create.of(29)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { try { @@ -319,7 +319,7 @@ public void processElement(ProcessContext c) { c.output(i); } } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(11); @@ -338,14 +338,14 @@ public void testIterableSideInput() { PCollection output = pipeline.apply("CreateMainInput", Create.of(29, 31)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { for (Integer i : c.sideInput(view)) { c.output(i); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23); @@ -377,14 +377,14 @@ public void testWindowedIterableSideInput() { TimestampedValue.of(35, new Instant(11)))) .apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { for (Integer i : c.sideInput(view)) { c.output(i); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43); @@ -402,13 +402,13 @@ public void testEmptyIterableSideInput() throws Exception { PCollection results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { assertFalse(c.sideInput(view).iterator().hasNext()); c.output(1); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); @@ -426,7 +426,7 @@ public void testIterableSideInputIsImmutable() { PCollection output = pipeline.apply("CreateMainInput", Create.of(29)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { Iterator iterator = c.sideInput(view).iterator(); @@ -439,7 +439,7 @@ public void processElement(ProcessContext c) { c.output(iterator.next()); } } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(11); @@ -459,14 +459,14 @@ public void testMultimapSideInput() { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) { c.output(of(c.element(), v)); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -486,7 +486,7 @@ public void testMultimapAsEntrySetSideInput() { pipeline.apply("CreateMainInput", Create.of(2 /* size */)) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); @@ -497,7 +497,7 @@ public void processElement(ProcessContext c) { } } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)); @@ -539,14 +539,14 @@ public void testMultimapSideInputWithNonDeterministicKeyCoder() { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) { c.output(of(c.element(), v)); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -574,7 +574,7 @@ public void testWindowedMultimapSideInput() { TimestampedValue.of("banana", new Instant(13)), TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -584,7 +584,7 @@ public void processElement(ProcessContext c) { c.output(of(c.element(), v)); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -611,7 +611,7 @@ public void testWindowedMultimapAsEntrySetSideInput() { TimestampedValue.of(1 /* size */, new Instant(5)), TimestampedValue.of(1 /* size */, new Instant(16)))) .apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -626,7 +626,7 @@ public void processElement(ProcessContext c) { } } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)); @@ -655,7 +655,7 @@ public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() { TimestampedValue.of("banana", new Instant(13)), TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -665,7 +665,7 @@ public void processElement(ProcessContext c) { c.output(of(c.element(), v)); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -685,7 +685,7 @@ public void testEmptyMultimapSideInput() throws Exception { PCollection results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); @@ -693,7 +693,7 @@ public void processElement(ProcessContext c) { assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); c.output(c.element()); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); @@ -715,7 +715,7 @@ public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exce PCollection results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); @@ -723,7 +723,7 @@ public void processElement(ProcessContext c) { assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); c.output(c.element()); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); @@ -743,7 +743,7 @@ public void testMultimapSideInputIsImmutable() { pipeline.apply("CreateMainInput", Create.of("apple")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { try { @@ -770,7 +770,7 @@ public void processElement(ProcessContext c) { c.output(KV.of(c.element(), v)); } } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(KV.of("apple", 1)); @@ -790,13 +790,13 @@ public void testMapSideInput() { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { c.output( of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -816,7 +816,7 @@ public void testMapAsEntrySetSideInput() { pipeline.apply("CreateMainInput", Create.of(2 /* size */)) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); @@ -825,7 +825,7 @@ public void processElement(ProcessContext c) { c.output(KV.of(entry.getKey(), entry.getValue())); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("a", 1), KV.of("b", 3)); @@ -847,13 +847,13 @@ public void testMapSideInputWithNonDeterministicKeyCoder() { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { c.output( of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -881,7 +881,7 @@ public void testWindowedMapSideInput() { TimestampedValue.of("banana", new Instant(4)), TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -890,7 +890,7 @@ public void processElement(ProcessContext c) { c.sideInput(view).get( c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("banana", 2), KV.of("blackberry", 3)); @@ -917,7 +917,7 @@ public void testWindowedMapAsEntrySetSideInput() { TimestampedValue.of(2 /* size */, new Instant(5)), TimestampedValue.of(1 /* size */, new Instant(16)))) .apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -930,7 +930,7 @@ public void processElement(ProcessContext c) { c.output(KV.of(entry.getKey(), entry.getValue())); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("a", 1), KV.of("b", 2), KV.of("b", 3)); @@ -961,7 +961,7 @@ public void testWindowedMapSideInputWithNonDeterministicKeyCoder() { TimestampedValue.of("banana", new Instant(4)), TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { @@ -970,7 +970,7 @@ public void processElement(ProcessContext c) { c.sideInput(view).get( c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("banana", 2), KV.of("blackberry", 3)); @@ -991,7 +991,7 @@ public void testEmptyMapSideInput() throws Exception { PCollection results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); @@ -999,7 +999,7 @@ public void processElement(ProcessContext c) { assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); c.output(c.element()); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); @@ -1019,7 +1019,7 @@ public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception PCollection results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); @@ -1027,7 +1027,7 @@ public void processElement(ProcessContext c) { assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); c.output(c.element()); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); @@ -1052,13 +1052,13 @@ public void testMapSideInputWithNullValuesCatchesDuplicates() { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { c.output( KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -1082,7 +1082,7 @@ public void testMapSideInputIsImmutable() { pipeline.apply("CreateMainInput", Create.of("apple")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { try { @@ -1108,7 +1108,7 @@ public void processElement(ProcessContext c) { c.output( KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(KV.of("apple", 1)); @@ -1128,13 +1128,13 @@ public void testCombinedMapSideInput() { PCollection> output = pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply("Output", - ParDo.withSideInputs(view).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { c.output(KV .of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 21), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -1161,13 +1161,13 @@ public void testWindowedSideInputFixedToFixed() { TimestampedValue.of("B", new Instant(15)), TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputMainAndSideInputs", ParDo.of( new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder("A1", "B5", "C1"); @@ -1193,13 +1193,13 @@ public void testWindowedSideInputFixedToGlobal() { TimestampedValue.of("B", new Instant(15)), TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputMainAndSideInputs", ParDo.of( new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder("A6", "B6", "C6"); @@ -1223,13 +1223,13 @@ public void testWindowedSideInputFixedToFixedWithDefault() { TimestampedValue.of("B", new Instant(15)), TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputMainAndSideInputs", ParDo.of( new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder("A0", "B5", "C0"); @@ -1253,12 +1253,12 @@ public Void apply(Iterable input) { pipeline.apply("CreateMainInput", Create.of("")) .apply( "OutputMainAndSideInputs", - ParDo.withSideInputs(view).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder("null"); @@ -1282,18 +1282,18 @@ public void processElement(ProcessContext c) { pipeline.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of())) .apply( "OutputSideInput", - ParDo.withSideInputs(view1).of(new DoFn>() { + ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view1)); } - })) + }).withSideInputs(view1)) .apply("View2", View.>asIterable()); PCollection output = pipeline.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of())) .apply("ReadIterableSideInput", - ParDo.withSideInputs(view2).of(new DoFn() { + ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { for (Iterable input : c.sideInput(view2)) { @@ -1302,7 +1302,7 @@ public void processElement(ProcessContext c) { } } } - })); + }).withSideInputs(view2)); PAssert.that(output).containsInAnyOrder(17); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index 5e7cc7da7f07..18d550cd201d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -59,8 +59,8 @@ private PCollectionTuple buildPCollectionTupleWithTags( PCollection input = p.apply(Create.of(1, 2, 3)); PCollectionTuple tuple = input.apply( ParDo - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) - .of(new IdentityDoFn())); + .of(new IdentityDoFn()) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); return tuple; } From 08db2a62d573e612c3f17a7ce90da89cedffd37c Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 3 Mar 2017 10:53:59 -0800 Subject: [PATCH 2/4] Removes accidentally left over class in test --- .../apache/beam/sdk/transforms/ParDoTest.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 9a4fd15882aa..19265e163f87 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -2311,22 +2311,6 @@ public void populateDisplayData(Builder builder) { assertThat(displayData, hasDisplayItem("fn", fn.getClass())); } - private abstract static class SomeTracker implements RestrictionTracker {} - private static class TestSplittableDoFn extends DoFn { - @ProcessElement - public void processElement(ProcessContext context, SomeTracker tracker) {} - - @GetInitialRestriction - public Object getInitialRestriction(Integer element) { - return null; - } - - @NewTracker - public SomeTracker newTracker(Object restriction) { - return null; - } - } - @Test public void testRejectsWrongWindowType() { From 4045144e5467d2e42e0f3876621ab0955388bb01 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 3 Mar 2017 11:06:49 -0800 Subject: [PATCH 3/4] Renamed ParDo.Bound to ParDo.SingleOutput --- .../core/construction/PTransformMatchers.java | 24 ++++----- .../construction/PTransformMatchersTest.java | 6 +-- .../org/apache/beam/runners/core/OldDoFn.java | 10 ++-- .../beam/runners/direct/DirectRunner.java | 4 +- .../dataflow/BatchStatefulParDoOverrides.java | 18 ++++--- .../beam/runners/dataflow/DataflowRunner.java | 4 +- .../dataflow/PrimitiveParDoSingleFactory.java | 13 +++-- .../DataflowPipelineTranslatorTest.java | 4 +- .../PrimitiveParDoSingleFactoryTest.java | 8 +-- .../runners/spark/SparkPipelineStateTest.java | 2 +- .../beam/sdk/AggregatorPipelineExtractor.java | 4 +- .../org/apache/beam/sdk/transforms/DoFn.java | 8 +-- .../org/apache/beam/sdk/transforms/ParDo.java | 20 +++---- .../org/apache/beam/sdk/transforms/View.java | 2 +- .../sdk/AggregatorPipelineExtractorTest.java | 54 +++++++++---------- .../sdk/runners/TransformHierarchyTest.java | 8 +-- .../apache/beam/sdk/transforms/ParDoTest.java | 7 ++- .../beam/sdk/values/TypedPValueTest.java | 2 +- 18 files changed, 100 insertions(+), 98 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index 38cf76fb7c8e..f803a9fe1ef4 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -70,16 +70,16 @@ public boolean matches(AppliedPTransform application) { } /** - * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that - * is splittable, as signified by {@link ProcessElementMethod#isSplittable()}. + * A {@link PTransformMatcher} that matches a {@link ParDo.SingleOutput} containing a {@link DoFn} + * that is splittable, as signified by {@link ProcessElementMethod#isSplittable()}. */ public static PTransformMatcher splittableParDoSingle() { return new PTransformMatcher() { @Override public boolean matches(AppliedPTransform application) { PTransform transform = application.getTransform(); - if (transform instanceof ParDo.Bound) { - DoFn fn = ((ParDo.Bound) transform).getFn(); + if (transform instanceof ParDo.SingleOutput) { + DoFn fn = ((ParDo.SingleOutput) transform).getFn(); DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); return signature.processElement().isSplittable(); } @@ -89,17 +89,17 @@ public boolean matches(AppliedPTransform application) { } /** - * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that - * uses state or timers, as specified by {@link DoFnSignature#usesState()} and - * {@link DoFnSignature#usesTimers()}. + * A {@link PTransformMatcher} that matches a {@link ParDo.SingleOutput} containing a {@link DoFn} + * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and {@link + * DoFnSignature#usesTimers()}. */ public static PTransformMatcher stateOrTimerParDoSingle() { return new PTransformMatcher() { @Override public boolean matches(AppliedPTransform application) { PTransform transform = application.getTransform(); - if (transform instanceof ParDo.Bound) { - DoFn fn = ((ParDo.Bound) transform).getFn(); + if (transform instanceof ParDo.SingleOutput) { + DoFn fn = ((ParDo.SingleOutput) transform).getFn(); DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); return signature.usesState() || signature.usesTimers(); } @@ -148,7 +148,7 @@ public boolean matches(AppliedPTransform application) { } /** - * A {@link PTransformMatcher} which matches a {@link ParDo.Bound} or {@link ParDo.BoundMulti} + * A {@link PTransformMatcher} which matches a {@link ParDo.SingleOutput} or {@link ParDo.BoundMulti} * where the {@link DoFn} is of the provided type. */ public static PTransformMatcher parDoWithFnType(final Class fnType) { @@ -156,8 +156,8 @@ public static PTransformMatcher parDoWithFnType(final Class fnTy @Override public boolean matches(AppliedPTransform application) { DoFn fn; - if (application.getTransform() instanceof ParDo.Bound) { - fn = ((ParDo.Bound) application.getTransform()).getFn(); + if (application.getTransform() instanceof ParDo.SingleOutput) { + fn = ((ParDo.SingleOutput) application.getTransform()).getFn(); } else if (application.getTransform() instanceof ParDo.BoundMulti) { fn = ((ParDo.BoundMulti) application.getTransform()).getFn(); } else { diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 0fead17c7dcf..484aba46caa9 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -92,7 +92,7 @@ public class PTransformMatchersTest implements Serializable { @Test public void classEqualToMatchesSameClass() { - PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class); + PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.SingleOutput.class); AppliedPTransform application = getAppliedTransform( ParDo.of( @@ -127,7 +127,7 @@ public PCollection expand(PCollection> input) { @Test public void classEqualToDoesNotMatchUnrelatedClass() { - PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class); + PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.SingleOutput.class); AppliedPTransform application = getAppliedTransform(Window.>into(new GlobalWindows())); @@ -192,7 +192,7 @@ public void onTimer(OnTimerContext context) { }; /** - * Demonstrates that a {@link ParDo.Bound} does not match any ParDo matcher. + * Demonstrates that a {@link ParDo.SingleOutput} does not match any ParDo matcher. */ @Test public void parDoSingle() { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java index 4033260778e3..e9d4740c84a0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java @@ -141,8 +141,8 @@ public abstract class Context { *

Once passed to {@code sideOutput} the element should not be modified * in any way. * - *

The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to - * specify the tags of side outputs that it consumes. Non-consumed side + *

The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags withOutputTags} + * to specify the tags of side outputs that it consumes. Non-consumed side * outputs, e.g., outputs for monitoring purposes only, don't necessarily * need to be specified. * @@ -157,7 +157,7 @@ public abstract class Context { * to access any information about the input element. The output element * will have a timestamp of negative infinity. * - * @see ParDo#withOutputTags + * @see ParDo.SingleOutput#withOutputTags */ public abstract void sideOutput(TupleTag tag, T output); @@ -181,7 +181,7 @@ public abstract class Context { * to access any information about the input element except for the * timestamp. * - * @see ParDo#withOutputTags + * @see ParDo.SingleOutput#withOutputTags */ public abstract void sideOutputWithTimestamp( TupleTag tag, T output, Instant timestamp); @@ -251,7 +251,7 @@ public abstract class ProcessContext extends Context { * for how this corresponding window is determined. * * @throws IllegalArgumentException if this is not a side input - * @see ParDo#withSideInputs + * @see ParDo.SingleOutput#withSideInputs */ public abstract T sideInput(PCollectionView view); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 62df6c803d1a..4ee364f85fc9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -168,7 +168,7 @@ interface PCollectionViewWriter { /** The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements. */ private static final Set> CONTAINS_UDF = ImmutableSet.of( - Read.Bounded.class, Read.Unbounded.class, ParDo.Bound.class, ParDo.BoundMulti.class); + Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, ParDo.BoundMulti.class); enum Enforcement { ENCODABILITY { @@ -221,7 +221,7 @@ public static BundleFactory bundleFactoryFor(Set enforcements, Dire enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create()); } Collection parDoEnforcements = enabledParDoEnforcements.build(); - enforcements.put(ParDo.Bound.class, parDoEnforcements); + enforcements.put(ParDo.SingleOutput.class, parDoEnforcements); enforcements.put(ParDo.BoundMulti.class, parDoEnforcements); return enforcements.build(); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 91f84abdfa95..82629db0164c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -58,12 +58,13 @@ public class BatchStatefulParDoOverrides { /** - * Returns a {@link PTransformOverrideFactory} that replaces a single-output - * {@link ParDo} with a composite transform specialized for the {@link DataflowRunner}. + * Returns a {@link PTransformOverrideFactory} that replaces a single-output {@link ParDo} with a + * composite transform specialized for the {@link DataflowRunner}. */ public static PTransformOverrideFactory< - PCollection>, PCollection, ParDo.Bound, OutputT>> + PCollection>, PCollection, + ParDo.SingleOutput, OutputT>> singleOutputOverrideFactory() { return new SingleOutputOverrideFactory<>(); } @@ -82,12 +83,13 @@ public class BatchStatefulParDoOverrides { private static class SingleOutputOverrideFactory implements PTransformOverrideFactory< - PCollection>, PCollection, ParDo.Bound, OutputT>> { + PCollection>, PCollection, + ParDo.SingleOutput, OutputT>> { @Override @SuppressWarnings("unchecked") public PTransform>, PCollection> getReplacementTransform( - ParDo.Bound, OutputT> originalParDo) { + ParDo.SingleOutput, OutputT> originalParDo) { return new StatefulSingleOutputParDo<>(originalParDo); } @@ -129,13 +131,13 @@ public Map mapOutputs( static class StatefulSingleOutputParDo extends PTransform>, PCollection> { - private final ParDo.Bound, OutputT> originalParDo; + private final ParDo.SingleOutput, OutputT> originalParDo; - StatefulSingleOutputParDo(ParDo.Bound, OutputT> originalParDo) { + StatefulSingleOutputParDo(ParDo.SingleOutput, OutputT> originalParDo) { this.originalParDo = originalParDo; } - ParDo.Bound, OutputT> getOriginalParDo() { + ParDo.SingleOutput, OutputT> getOriginalParDo() { return originalParDo; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index c612a208782a..e48ed779991e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -371,7 +371,9 @@ private Map getOverrides(boolean s .put( PTransformMatchers.classEqualTo(Combine.GroupedValues.class), new PrimitiveCombineGroupedValuesOverrideFactory()) - .put(PTransformMatchers.classEqualTo(ParDo.Bound.class), new PrimitiveParDoSingleFactory()); + .put( + PTransformMatchers.classEqualTo(ParDo.SingleOutput.class), + new PrimitiveParDoSingleFactory()); return ptoverrides.build(); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java index a749730d33bd..db50cc276679 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java @@ -26,21 +26,20 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** - * A {@link PTransformOverrideFactory} that produces {@link ParDoSingle} instances from - * {@link ParDo.Bound} instances. {@link ParDoSingle} is a primitive {@link PTransform}, to ensure + * A {@link PTransformOverrideFactory} that produces {@link ParDoSingle} instances from {@link + * ParDo.SingleOutput} instances. {@link ParDoSingle} is a primitive {@link PTransform}, to ensure * that {@link DisplayData} appears on all {@link ParDo ParDos} in the {@link DataflowRunner}. */ public class PrimitiveParDoSingleFactory extends SingleInputOutputOverrideFactory< - PCollection, PCollection, ParDo.Bound> { + PCollection, PCollection, ParDo.SingleOutput> { @Override public PTransform, PCollection> getReplacementTransform( - ParDo.Bound transform) { + ParDo.SingleOutput transform) { return new ParDoSingle<>(transform); } @@ -49,9 +48,9 @@ public PTransform, PCollection> getReplac */ public static class ParDoSingle extends ForwardingPTransform, PCollection> { - private final ParDo.Bound original; + private final ParDo.SingleOutput original; - private ParDoSingle(Bound original) { + private ParDoSingle(ParDo.SingleOutput original) { this.original = original; } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 3d98aa9e2d9b..0f74e572bdb3 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -1001,8 +1001,8 @@ public void populateDisplayData(DisplayData.Builder builder) { } }; - ParDo.Bound parDo1 = ParDo.of(fn1); - ParDo.Bound parDo2 = ParDo.of(fn2); + ParDo.SingleOutput parDo1 = ParDo.of(fn1); + ParDo.SingleOutput parDo2 = ParDo.of(fn2); pipeline .apply(Create.of(1, 2, 3)) .apply(parDo1) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java index cb1e34ee7505..bff46ea57fae 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java @@ -58,11 +58,11 @@ public class PrimitiveParDoSingleFactoryTest implements Serializable { /** * A test that demonstrates that the replacement transform has the Display Data of the - * {@link ParDo.Bound} it replaces. + * {@link ParDo.SingleOutput} it replaces. */ @Test public void getReplacementTransformPopulateDisplayData() { - ParDo.Bound originalTransform = ParDo.of(new ToLongFn()); + ParDo.SingleOutput originalTransform = ParDo.of(new ToLongFn()); DisplayData originalDisplayData = DisplayData.from(originalTransform); PTransform, PCollection> replacement = @@ -88,7 +88,7 @@ public void getReplacementTransformGetSideInputs() { pipeline .apply("StringSideInputVals", Create.of("foo", "bar", "baz")) .apply("SideStringsView", View.asList()); - ParDo.Bound originalTransform = + ParDo.SingleOutput originalTransform = ParDo.of(new ToLongFn()).withSideInputs(sideLong, sideStrings); PTransform, PCollection> replacementTransform = @@ -100,7 +100,7 @@ public void getReplacementTransformGetSideInputs() { @Test public void getReplacementTransformGetFn() { DoFn originalFn = new ToLongFn(); - ParDo.Bound originalTransform = ParDo.of(originalFn); + ParDo.SingleOutput originalTransform = ParDo.of(originalFn); PTransform, PCollection> replacementTransform = factory.getReplacementTransform(originalTransform); ParDoSingle parDoSingle = (ParDoSingle) replacementTransform; diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java index 3a68d6f75fcc..cfbad01952e7 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java @@ -62,7 +62,7 @@ private static class MyCustomException extends RuntimeException { private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally"; - private ParDo.Bound printParDo(final String prefix) { + private ParDo.SingleOutput printParDo(final String prefix) { return ParDo.of(new DoFn() { @ProcessElement diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java index c79f779d81bf..7a422b8268d0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java @@ -69,8 +69,8 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { private Collection> getAggregators(PTransform transform) { if (transform != null) { - if (transform instanceof ParDo.Bound) { - return AggregatorRetriever.getAggregators(((ParDo.Bound) transform).getFn()); + if (transform instanceof ParDo.SingleOutput) { + return AggregatorRetriever.getAggregators(((ParDo.SingleOutput) transform).getFn()); } else if (transform instanceof ParDo.BoundMulti) { return AggregatorRetriever.getAggregators( ((ParDo.BoundMulti) transform).getFn()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 20c66c0f4082..a7730f00b9fb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -160,7 +160,7 @@ public abstract class Context { *

Once passed to {@code sideOutput} the element should not be modified * in any way. * - *

The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to + *

The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags} to * specify the tags of side outputs that it consumes. Non-consumed side * outputs, e.g., outputs for monitoring purposes only, don't necessarily * need to be specified. @@ -179,7 +179,7 @@ public abstract class Context { *

Note: A splittable {@link DoFn} is not allowed to output from * {@link StartBundle} or {@link FinishBundle} methods. * - * @see ParDo#withOutputTags + * @see ParDo.SingleOutput#withOutputTags */ public abstract void sideOutput(TupleTag tag, T output); @@ -206,7 +206,7 @@ public abstract class Context { *

Note: A splittable {@link DoFn} is not allowed to output from * {@link StartBundle} or {@link FinishBundle} methods. * - * @see ParDo#withOutputTags + * @see ParDo.SingleOutput#withOutputTags */ public abstract void sideOutputWithTimestamp( TupleTag tag, T output, Instant timestamp); @@ -272,7 +272,7 @@ public abstract class ProcessContext extends Context { * Returns the value of the side input. * * @throws IllegalArgumentException if this is not a side input - * @see ParDo#withSideInputs + * @see ParDo.SingleOutput#withSideInputs */ public abstract T sideInput(PCollectionView view); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 854084a7fde5..2c5f664e2b7c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -151,7 +151,7 @@ * {@link PCollectionView PCollectionViews} express styles of accessing * {@link PCollection PCollections} computed by earlier pipeline operations, * passed in to the {@link ParDo} transform using - * {@link ParDo.Bound#withSideInputs}, and their contents accessible to each of + * {@link SingleOutput#withSideInputs}, and their contents accessible to each of * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}. * For example: * @@ -181,7 +181,7 @@ * {@link PCollection PCollections}, each keyed by a distinct {@link TupleTag}, * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags} * to be used for the output {@link PCollectionTuple} are specified by - * invoking {@link ParDo.Bound#withOutputTags}. Unconsumed side outputs do not + * invoking {@link SingleOutput#withOutputTags}. Unconsumed side outputs do not * necessarily need to be explicitly specified, even if the {@link DoFn} * generates them. Within the {@link DoFn}, an element is added to the * main output {@link PCollection} as normal, using @@ -421,9 +421,9 @@ public class ParDo { *

The resulting {@link PTransform PTransform} is ready to be applied, or further * properties can be set on it first. */ - public static Bound of(DoFn fn) { + public static SingleOutput of(DoFn fn) { validate(fn); - return new Bound( + return new SingleOutput( fn, Collections.>emptyList(), displayDataForFn(fn)); } @@ -491,18 +491,18 @@ private static void validate(DoFn fn) { * {@code PCollection}. * *

A multi-output form of this transform can be created with - * {@link ParDo.Bound#withOutputTags}. + * {@link SingleOutput#withOutputTags}. * * @param the type of the (main) input {@link PCollection} elements * @param the type of the (main) output {@link PCollection} elements */ - public static class Bound + public static class SingleOutput extends PTransform, PCollection> { private final List> sideInputs; private final DoFn fn; private final DisplayData.ItemSpec> fnDisplayData; - Bound( + SingleOutput( DoFn fn, List> sideInputs, DisplayData.ItemSpec> fnDisplayData) { @@ -518,7 +518,7 @@ public static class Bound * *

See the discussion of Side Inputs above for more explanation. */ - public Bound withSideInputs(PCollectionView... sideInputs) { + public SingleOutput withSideInputs(PCollectionView... sideInputs) { return withSideInputs(Arrays.asList(sideInputs)); } @@ -529,9 +529,9 @@ public Bound withSideInputs(PCollectionView... sideInputs) { * *

See the discussion of Side Inputs above for more explanation. */ - public Bound withSideInputs( + public SingleOutput withSideInputs( Iterable> sideInputs) { - return new Bound<>( + return new SingleOutput<>( fn, ImmutableList.>builder() .addAll(this.sideInputs) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 1986ac51c43b..226e163fcb1b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -129,7 +129,7 @@ * } * * - *

See {@link ParDo#withSideInputs} for details on how to access + *

See {@link ParDo.SingleOutput#withSideInputs} for details on how to access * this variable inside a {@link ParDo} over another {@link PCollection}. */ public class View { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java index 22efd85132e0..52bcc93a21ab 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java @@ -64,17 +64,17 @@ public void setup() { @SuppressWarnings("unchecked") @Test - public void testGetAggregatorStepsWithParDoBoundExtractsSteps() { + public void testGetAggregatorStepsWithParDoSingleOutputExtractsSteps() { @SuppressWarnings("rawtypes") - ParDo.Bound bound = mock(ParDo.Bound.class, "Bound"); + ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo"); AggregatorProvidingDoFn fn = new AggregatorProvidingDoFn<>(); - when(bound.getFn()).thenReturn(fn); + when(parDo.getFn()).thenReturn(fn); Aggregator aggregatorOne = fn.addAggregator(Sum.ofLongs()); Aggregator aggregatorTwo = fn.addAggregator(Min.ofIntegers()); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); - when(transformNode.getTransform()).thenReturn(bound); + when(transformNode.getTransform()).thenReturn(parDo); doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode))) .when(p) @@ -85,8 +85,8 @@ public void testGetAggregatorStepsWithParDoBoundExtractsSteps() { Map, Collection>> aggregatorSteps = extractor.getAggregatorSteps(); - assertEquals(ImmutableSet.>of(bound), aggregatorSteps.get(aggregatorOne)); - assertEquals(ImmutableSet.>of(bound), aggregatorSteps.get(aggregatorTwo)); + assertEquals(ImmutableSet.>of(parDo), aggregatorSteps.get(aggregatorOne)); + assertEquals(ImmutableSet.>of(parDo), aggregatorSteps.get(aggregatorTwo)); assertEquals(aggregatorSteps.size(), 2); } @@ -94,15 +94,15 @@ public void testGetAggregatorStepsWithParDoBoundExtractsSteps() { @Test public void testGetAggregatorStepsWithParDoBoundMultiExtractsSteps() { @SuppressWarnings("rawtypes") - ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti"); + ParDo.BoundMulti parDo = mock(ParDo.BoundMulti.class, "parDo"); AggregatorProvidingDoFn fn = new AggregatorProvidingDoFn<>(); - when(bound.getFn()).thenReturn(fn); + when(parDo.getFn()).thenReturn(fn); Aggregator aggregatorOne = fn.addAggregator(Max.ofLongs()); Aggregator aggregatorTwo = fn.addAggregator(Min.ofDoubles()); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); - when(transformNode.getTransform()).thenReturn(bound); + when(transformNode.getTransform()).thenReturn(parDo); doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode))) .when(p) @@ -113,8 +113,8 @@ public void testGetAggregatorStepsWithParDoBoundMultiExtractsSteps() { Map, Collection>> aggregatorSteps = extractor.getAggregatorSteps(); - assertEquals(ImmutableSet.>of(bound), aggregatorSteps.get(aggregatorOne)); - assertEquals(ImmutableSet.>of(bound), aggregatorSteps.get(aggregatorTwo)); + assertEquals(ImmutableSet.>of(parDo), aggregatorSteps.get(aggregatorOne)); + assertEquals(ImmutableSet.>of(parDo), aggregatorSteps.get(aggregatorTwo)); assertEquals(2, aggregatorSteps.size()); } @@ -122,20 +122,20 @@ public void testGetAggregatorStepsWithParDoBoundMultiExtractsSteps() { @Test public void testGetAggregatorStepsWithOneAggregatorInMultipleStepsAddsSteps() { @SuppressWarnings("rawtypes") - ParDo.Bound bound = mock(ParDo.Bound.class, "Bound"); + ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo"); @SuppressWarnings("rawtypes") - ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound"); + ParDo.BoundMulti otherParDo = mock(ParDo.BoundMulti.class, "otherParDo"); AggregatorProvidingDoFn fn = new AggregatorProvidingDoFn<>(); - when(bound.getFn()).thenReturn(fn); - when(otherBound.getFn()).thenReturn(fn); + when(parDo.getFn()).thenReturn(fn); + when(otherParDo.getFn()).thenReturn(fn); Aggregator aggregatorOne = fn.addAggregator(Sum.ofLongs()); Aggregator aggregatorTwo = fn.addAggregator(Min.ofDoubles()); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); - when(transformNode.getTransform()).thenReturn(bound); + when(transformNode.getTransform()).thenReturn(parDo); TransformHierarchy.Node otherTransformNode = mock(TransformHierarchy.Node.class); - when(otherTransformNode.getTransform()).thenReturn(otherBound); + when(otherTransformNode.getTransform()).thenReturn(otherParDo); doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode))) .when(p) @@ -147,9 +147,9 @@ public void testGetAggregatorStepsWithOneAggregatorInMultipleStepsAddsSteps() { extractor.getAggregatorSteps(); assertEquals( - ImmutableSet.>of(bound, otherBound), aggregatorSteps.get(aggregatorOne)); + ImmutableSet.>of(parDo, otherParDo), aggregatorSteps.get(aggregatorOne)); assertEquals( - ImmutableSet.>of(bound, otherBound), aggregatorSteps.get(aggregatorTwo)); + ImmutableSet.>of(parDo, otherParDo), aggregatorSteps.get(aggregatorTwo)); assertEquals(2, aggregatorSteps.size()); } @@ -157,25 +157,25 @@ public void testGetAggregatorStepsWithOneAggregatorInMultipleStepsAddsSteps() { @Test public void testGetAggregatorStepsWithDifferentStepsAddsSteps() { @SuppressWarnings("rawtypes") - ParDo.Bound bound = mock(ParDo.Bound.class, "Bound"); + ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo"); AggregatorProvidingDoFn fn = new AggregatorProvidingDoFn<>(); Aggregator aggregatorOne = fn.addAggregator(Sum.ofLongs()); - when(bound.getFn()).thenReturn(fn); + when(parDo.getFn()).thenReturn(fn); @SuppressWarnings("rawtypes") - ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound"); + ParDo.BoundMulti otherParDo = mock(ParDo.BoundMulti.class, "otherParDo"); AggregatorProvidingDoFn otherFn = new AggregatorProvidingDoFn<>(); Aggregator aggregatorTwo = otherFn.addAggregator(Sum.ofDoubles()); - when(otherBound.getFn()).thenReturn(otherFn); + when(otherParDo.getFn()).thenReturn(otherFn); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); - when(transformNode.getTransform()).thenReturn(bound); + when(transformNode.getTransform()).thenReturn(parDo); TransformHierarchy.Node otherTransformNode = mock(TransformHierarchy.Node.class); - when(otherTransformNode.getTransform()).thenReturn(otherBound); + when(otherTransformNode.getTransform()).thenReturn(otherParDo); doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode))) .when(p) @@ -186,8 +186,8 @@ public void testGetAggregatorStepsWithDifferentStepsAddsSteps() { Map, Collection>> aggregatorSteps = extractor.getAggregatorSteps(); - assertEquals(ImmutableSet.>of(bound), aggregatorSteps.get(aggregatorOne)); - assertEquals(ImmutableSet.>of(otherBound), aggregatorSteps.get(aggregatorTwo)); + assertEquals(ImmutableSet.>of(parDo), aggregatorSteps.get(aggregatorOne)); + assertEquals(ImmutableSet.>of(otherParDo), aggregatorSteps.get(aggregatorTwo)); assertEquals(2, aggregatorSteps.size()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 426fd8bd42e4..56dc743c55b2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -46,7 +46,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.Bound; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; @@ -245,7 +245,7 @@ public POutput expand(PInput input) { @Test public void replaceWithCompositeSucceeds() { - final ParDo.Bound originalParDo = + final SingleOutput originalParDo = ParDo.of( new DoFn() { @ProcessElement @@ -324,7 +324,7 @@ public void visitVisitsAllPushed() { PCollection.createPrimitiveOutputInternal( pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); - ParDo.Bound pardo = + SingleOutput pardo = ParDo.of( new DoFn() { @ProcessElement @@ -408,7 +408,7 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) { @Test public void visitAfterReplace() { Node root = hierarchy.getCurrent(); - final Bound originalParDo = + final SingleOutput originalParDo = ParDo.of( new DoFn() { @ProcessElement diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 19265e163f87..9f621f86630b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -72,11 +72,10 @@ import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.DoFn.OnTimer; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; -import org.apache.beam.sdk.transforms.ParDo.Bound; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.DisplayDataMatchers; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -1511,7 +1510,7 @@ public void populateDisplayData(Builder builder) { } }; - Bound parDo = ParDo.of(fn); + SingleOutput parDo = ParDo.of(fn); DisplayData displayData = DisplayData.from(parDo); assertThat(displayData, hasDisplayItem(allOf( @@ -1534,7 +1533,7 @@ public void populateDisplayData(Builder builder) { } }; - Bound parDo = ParDo.of(fn); + SingleOutput parDo = ParDo.of(fn); DisplayData displayData = DisplayData.from(parDo); assertThat(displayData, includesDisplayDataFor("fn", fn)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index 18d550cd201d..211dfd999cb4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -161,7 +161,7 @@ public void testParDoWithNoSideOutputsErrorDoesNotMentionTupleTag() { public void testFinishSpecifyingShouldFailIfNoCoderInferrable() { p.enableAbandonedNodeEnforcement(false); PCollection created = p.apply(Create.of(1, 2, 3)); - ParDo.Bound uninferrableParDo = ParDo.of(new EmptyClassDoFn()); + ParDo.SingleOutput uninferrableParDo = ParDo.of(new EmptyClassDoFn()); PCollection unencodable = created.apply(uninferrableParDo); From 7947734c6157a16298d2e1f84937685cfb3afd71 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 3 Mar 2017 11:13:10 -0800 Subject: [PATCH 4/4] Renamed ParDo.BoundMulti to ParDo.MultiOutput --- .../translation/ApexPipelineTranslator.java | 2 +- .../apex/translation/ParDoTranslator.java | 8 ++++---- .../core/construction/PTransformMatchers.java | 20 +++++++++---------- .../beam/runners/core/SplittableParDo.java | 8 ++++---- .../runners/core/SplittableParDoTest.java | 2 +- .../beam/runners/direct/DirectRunner.java | 5 +++-- .../direct/KeyedPValueTrackingVisitor.java | 4 ++-- .../runners/direct/ParDoEvaluatorFactory.java | 8 ++++---- .../direct/ParDoMultiOverrideFactory.java | 20 +++++++++---------- .../direct/TransformEvaluatorRegistry.java | 2 +- .../flink/FlinkBatchTransformTranslators.java | 6 +++--- .../FlinkStreamingTransformTranslators.java | 6 +++--- .../dataflow/BatchStatefulParDoOverrides.java | 13 ++++++------ .../dataflow/DataflowPipelineTranslator.java | 8 ++++---- .../translation/TransformTranslator.java | 9 +++++---- .../StreamingTransformTranslator.java | 8 ++++---- .../streaming/TrackStreamingSourcesTest.java | 4 ++-- .../beam/sdk/AggregatorPipelineExtractor.java | 4 ++-- .../org/apache/beam/sdk/transforms/ParDo.java | 14 ++++++------- .../transforms/windowing/WindowMappingFn.java | 4 ++-- .../apache/beam/sdk/values/TypedPValue.java | 2 +- .../sdk/AggregatorPipelineExtractorTest.java | 8 ++++---- .../sdk/runners/TransformHierarchyTest.java | 6 +++--- .../apache/beam/sdk/transforms/ParDoTest.java | 6 +++--- 24 files changed, 89 insertions(+), 88 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java index 7eb955123cbe..42ff14433ce8 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -59,7 +59,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { static { // register TransformTranslators - registerTransformTranslator(ParDo.BoundMulti.class, new ParDoTranslator<>()); + registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>()); registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index 5ffc3c389a68..75722c793ee3 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -44,15 +44,15 @@ import org.slf4j.LoggerFactory; /** - * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. + * {@link ParDo.MultiOutput} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. */ class ParDoTranslator - implements TransformTranslator> { + implements TransformTranslator> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(ParDoTranslator.class); @Override - public void translate(ParDo.BoundMulti transform, TranslationContext context) { + public void translate(ParDo.MultiOutput transform, TranslationContext context) { DoFn doFn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); @@ -105,7 +105,7 @@ public void translate(ParDo.BoundMulti transform, TranslationCo checkArgument( output.getValue() instanceof PCollection, "%s %s outputs non-PCollection %s of type %s", - ParDo.BoundMulti.class.getSimpleName(), + ParDo.MultiOutput.class.getSimpleName(), context.getFullName(), output.getValue(), output.getValue().getClass().getSimpleName()); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index f803a9fe1ef4..d5a91a79ce16 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -109,7 +109,7 @@ public boolean matches(AppliedPTransform application) { } /** - * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn} + * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} containing a {@link DoFn} * that is splittable, as signified by {@link ProcessElementMethod#isSplittable()}. */ public static PTransformMatcher splittableParDoMulti() { @@ -117,8 +117,8 @@ public static PTransformMatcher splittableParDoMulti() { @Override public boolean matches(AppliedPTransform application) { PTransform transform = application.getTransform(); - if (transform instanceof ParDo.BoundMulti) { - DoFn fn = ((ParDo.BoundMulti) transform).getFn(); + if (transform instanceof ParDo.MultiOutput) { + DoFn fn = ((ParDo.MultiOutput) transform).getFn(); DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); return signature.processElement().isSplittable(); } @@ -128,7 +128,7 @@ public boolean matches(AppliedPTransform application) { } /** - * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn} + * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} containing a {@link DoFn} * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and * {@link DoFnSignature#usesTimers()}. */ @@ -137,8 +137,8 @@ public static PTransformMatcher stateOrTimerParDoMulti() { @Override public boolean matches(AppliedPTransform application) { PTransform transform = application.getTransform(); - if (transform instanceof ParDo.BoundMulti) { - DoFn fn = ((ParDo.BoundMulti) transform).getFn(); + if (transform instanceof ParDo.MultiOutput) { + DoFn fn = ((ParDo.MultiOutput) transform).getFn(); DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); return signature.usesState() || signature.usesTimers(); } @@ -148,8 +148,8 @@ public boolean matches(AppliedPTransform application) { } /** - * A {@link PTransformMatcher} which matches a {@link ParDo.SingleOutput} or {@link ParDo.BoundMulti} - * where the {@link DoFn} is of the provided type. + * A {@link PTransformMatcher} which matches a {@link ParDo.SingleOutput} or {@link + * ParDo.MultiOutput} where the {@link DoFn} is of the provided type. */ public static PTransformMatcher parDoWithFnType(final Class fnType) { return new PTransformMatcher() { @@ -158,8 +158,8 @@ public boolean matches(AppliedPTransform application) { DoFn fn; if (application.getTransform() instanceof ParDo.SingleOutput) { fn = ((ParDo.SingleOutput) application.getTransform()).getFn(); - } else if (application.getTransform() instanceof ParDo.BoundMulti) { - fn = ((ParDo.BoundMulti) application.getTransform()).getFn(); + } else if (application.getTransform() instanceof ParDo.MultiOutput) { + fn = ((ParDo.MultiOutput) application.getTransform()).getFn(); } else { return false; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 544bfa012e05..0b311c775110 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -82,14 +82,14 @@ @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) public class SplittableParDo extends PTransform, PCollectionTuple> { - private final ParDo.BoundMulti parDo; + private final ParDo.MultiOutput parDo; /** * Creates the transform for the given original multi-output {@link ParDo}. * * @param parDo The splittable {@link ParDo} transform. */ - public SplittableParDo(ParDo.BoundMulti parDo) { + public SplittableParDo(ParDo.MultiOutput parDo) { checkNotNull(parDo, "parDo must not be null"); this.parDo = parDo; checkArgument( @@ -248,7 +248,7 @@ public PCollectionTuple expand( windowingStrategy, input.isBounded().and(signature.isBoundedPerElement())); - // Set output type descriptor similarly to how ParDo.BoundMulti does it. + // Set output type descriptor similarly to how ParDo.MultiOutput does it. outputs.get(mainOutputTag).setTypeDescriptor(fn.getOutputTypeDescriptor()); return outputs; @@ -260,7 +260,7 @@ public Coder getDefaultOutputCoder( input, TypedPValue output) throws CannotProvideCoderException { - // Similar logic to ParDo.BoundMulti.getDefaultOutputCoder. + // Similar logic to ParDo.MultiOutput.getDefaultOutputCoder. @SuppressWarnings("unchecked") KeyedWorkItemCoder> kwiCoder = (KeyedWorkItemCoder) input.getCoder(); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index af547c226911..ee94ee07e611 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -135,7 +135,7 @@ private static PCollection makeBoundedCollection(Pipeline pipeline) { private static final TupleTag MAIN_OUTPUT_TAG = new TupleTag() {}; - private ParDo.BoundMulti makeParDo(DoFn fn) { + private ParDo.MultiOutput makeParDo(DoFn fn) { return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty()); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 4ee364f85fc9..11fe3f50270b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -168,7 +169,7 @@ interface PCollectionViewWriter { /** The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements. */ private static final Set> CONTAINS_UDF = ImmutableSet.of( - Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, ParDo.BoundMulti.class); + Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class); enum Enforcement { ENCODABILITY { @@ -222,7 +223,7 @@ public static BundleFactory bundleFactoryFor(Set enforcements, Dire } Collection parDoEnforcements = enabledParDoEnforcements.build(); enforcements.put(ParDo.SingleOutput.class, parDoEnforcements); - enforcements.put(ParDo.BoundMulti.class, parDoEnforcements); + enforcements.put(MultiOutput.class, parDoEnforcements); return enforcements.build(); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 32eb692ccbab..02b1bed280c3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -116,8 +116,8 @@ private static boolean isKeyPreserving(PTransform transform) { // The most obvious alternative would be a package-private marker interface, but // better to make this obviously hacky so it is less likely to proliferate. Meanwhile // we intend to allow explicit expression of key-preserving DoFn in the model. - if (transform instanceof ParDo.BoundMulti) { - ParDo.BoundMulti parDo = (ParDo.BoundMulti) transform; + if (transform instanceof ParDo.MultiOutput) { + ParDo.MultiOutput parDo = (ParDo.MultiOutput) transform; return parDo.getFn() instanceof ParDoMultiOverrideFactory.ToKeyedWorkItem; } else { return false; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 7d6a8ea0ce7f..b0e97fbfc405 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -37,7 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** A {@link TransformEvaluatorFactory} for {@link ParDo.BoundMulti}. */ +/** A {@link TransformEvaluatorFactory} for {@link ParDo.MultiOutput}. */ final class ParDoEvaluatorFactory implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class); @@ -62,13 +62,13 @@ public TransformEvaluator forApplication( AppliedPTransform application, CommittedBundle inputBundle) throws Exception { @SuppressWarnings("unchecked") - AppliedPTransform, PCollectionTuple, ParDo.BoundMulti> + AppliedPTransform, PCollectionTuple, ParDo.MultiOutput> parDoApplication = (AppliedPTransform< - PCollection, PCollectionTuple, ParDo.BoundMulti>) + PCollection, PCollectionTuple, ParDo.MultiOutput>) application; - ParDo.BoundMulti transform = parDoApplication.getTransform(); + ParDo.MultiOutput transform = parDoApplication.getTransform(); final DoFn doFn = transform.getFn(); @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index c9990936c6a2..4604fccecbbe 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -36,7 +36,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -61,11 +61,11 @@ */ class ParDoMultiOverrideFactory implements PTransformOverrideFactory< - PCollection, PCollectionTuple, BoundMulti> { + PCollection, PCollectionTuple, MultiOutput> { @Override @SuppressWarnings("unchecked") public PTransform, PCollectionTuple> getReplacementTransform( - BoundMulti transform) { + MultiOutput transform) { DoFn fn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); @@ -75,8 +75,8 @@ public PTransform, PCollectionTuple> getReplacemen || signature.timerDeclarations().size() > 0) { // Based on the fact that the signature is stateful, DoFnSignatures ensures // that it is also keyed - ParDo.BoundMulti, OutputT> keyedTransform = - (ParDo.BoundMulti, OutputT>) transform; + MultiOutput, OutputT> keyedTransform = + (MultiOutput, OutputT>) transform; return new GbkThenStatefulParDo(keyedTransform); } else { @@ -98,9 +98,9 @@ public Map mapOutputs( static class GbkThenStatefulParDo extends PTransform>, PCollectionTuple> { - private final ParDo.BoundMulti, OutputT> underlyingParDo; + private final MultiOutput, OutputT> underlyingParDo; - public GbkThenStatefulParDo(ParDo.BoundMulti, OutputT> underlyingParDo) { + public GbkThenStatefulParDo(MultiOutput, OutputT> underlyingParDo) { this.underlyingParDo = underlyingParDo; } @@ -165,17 +165,17 @@ public PCollectionTuple expand(PCollection> input) { static class StatefulParDo extends PTransform>>, PCollectionTuple> { - private final transient ParDo.BoundMulti, OutputT> underlyingParDo; + private final transient MultiOutput, OutputT> underlyingParDo; private final transient PCollection> originalInput; public StatefulParDo( - ParDo.BoundMulti, OutputT> underlyingParDo, + MultiOutput, OutputT> underlyingParDo, PCollection> originalInput) { this.underlyingParDo = underlyingParDo; this.originalInput = originalInput; } - public ParDo.BoundMulti, OutputT> getUnderlyingParDo() { + public MultiOutput, OutputT> getUnderlyingParDo() { return underlyingParDo; } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 62fee5309a38..5ad8709f6b9c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -51,7 +51,7 @@ public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) ImmutableMap., TransformEvaluatorFactory>builder() .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) - .put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt)) + .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt)) .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt)) .put(PCollections.class, new FlattenEvaluatorFactory(ctxt)) .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt)) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 31a6bdace118..1d6728b8f525 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -112,7 +112,7 @@ class FlinkBatchTransformTranslators { TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch()); - TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoTranslatorBatch()); + TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch()); TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch()); } @@ -499,12 +499,12 @@ private static void rejectSplittable(DoFn doFn) { private static class ParDoTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator< - ParDo.BoundMulti> { + ParDo.MultiOutput> { @Override @SuppressWarnings("unchecked") public void translateNode( - ParDo.BoundMulti transform, + ParDo.MultiOutput transform, FlinkBatchTranslationContext context) { DoFn doFn = transform.getFn(); rejectSplittable(doFn); diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 7227dceddbc1..00b0412223db 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -121,7 +121,7 @@ class FlinkStreamingTransformTranslators { TRANSLATORS.put(Write.class, new WriteSinkStreamingTranslator()); TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); - TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoStreamingTranslator()); + TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator()); TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator()); TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator()); @@ -398,11 +398,11 @@ public RawUnionValue map(T o) throws Exception { private static class ParDoStreamingTranslator extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - ParDo.BoundMulti> { + ParDo.MultiOutput> { @Override public void translateNode( - ParDo.BoundMulti transform, + ParDo.MultiOutput transform, FlinkStreamingTranslationContext context) { DoFn doFn = transform.getFn(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 82629db0164c..1d19d64907f9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -76,7 +75,7 @@ public class BatchStatefulParDoOverrides { public static PTransformOverrideFactory< PCollection>, PCollectionTuple, - ParDo.BoundMulti, OutputT>> + ParDo.MultiOutput, OutputT>> multiOutputOverrideFactory() { return new MultiOutputOverrideFactory<>(); } @@ -107,12 +106,12 @@ public Map mapOutputs( private static class MultiOutputOverrideFactory implements PTransformOverrideFactory< - PCollection>, PCollectionTuple, ParDo.BoundMulti, OutputT>> { + PCollection>, PCollectionTuple, ParDo.MultiOutput, OutputT>> { @Override @SuppressWarnings("unchecked") public PTransform>, PCollectionTuple> getReplacementTransform( - BoundMulti, OutputT> originalParDo) { + ParDo.MultiOutput, OutputT> originalParDo) { return new StatefulMultiOutputParDo<>(originalParDo); } @@ -159,9 +158,9 @@ public PCollection expand(PCollection> input) { static class StatefulMultiOutputParDo extends PTransform>, PCollectionTuple> { - private final BoundMulti, OutputT> originalParDo; + private final ParDo.MultiOutput, OutputT> originalParDo; - StatefulMultiOutputParDo(ParDo.BoundMulti, OutputT> originalParDo) { + StatefulMultiOutputParDo(ParDo.MultiOutput, OutputT> originalParDo) { this.originalParDo = originalParDo; } @@ -182,7 +181,7 @@ public PCollectionTuple expand(PCollection> input) { return input.apply(new GbkBeforeStatefulParDo()).apply(statefulParDo); } - public BoundMulti, OutputT> getOriginalParDo() { + public ParDo.MultiOutput, OutputT> getOriginalParDo() { return originalParDo; } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index db965945580c..6d231b901264 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -821,15 +821,15 @@ private void groupByKeyHelper( }); registerTransformTranslator( - ParDo.BoundMulti.class, - new TransformTranslator() { + ParDo.MultiOutput.class, + new TransformTranslator() { @Override - public void translate(ParDo.BoundMulti transform, TranslationContext context) { + public void translate(ParDo.MultiOutput transform, TranslationContext context) { translateMultiHelper(transform, context); } private void translateMultiHelper( - ParDo.BoundMulti transform, TranslationContext context) { + ParDo.MultiOutput transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "ParallelDo"); translateInputs( diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index ffb207a31e72..1b846b4cb99c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -355,11 +355,12 @@ public String toNativeString() { }; } - private static TransformEvaluator> + private static TransformEvaluator> parDo() { - return new TransformEvaluator>() { + return new TransformEvaluator>() { @Override - public void evaluate(ParDo.BoundMulti transform, EvaluationContext context) { + public void evaluate( + ParDo.MultiOutput transform, EvaluationContext context) { String stepName = context.getCurrentTransform().getFullName(); DoFn doFn = transform.getFn(); rejectSplittable(doFn); @@ -847,7 +848,7 @@ public String toNativeString() { EVALUATORS.put(Read.Bounded.class, readBounded()); EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop()); EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop()); - EVALUATORS.put(ParDo.BoundMulti.class, parDo()); + EVALUATORS.put(ParDo.MultiOutput.class, parDo()); EVALUATORS.put(GroupByKey.class, groupByKey()); EVALUATORS.put(Combine.GroupedValues.class, combineGrouped()); EVALUATORS.put(Combine.Globally.class, combineGlobally()); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 25fecf6dd85b..3bcff25b3bb3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -366,11 +366,11 @@ public String toNativeString() { }; } - private static TransformEvaluator> + private static TransformEvaluator> multiDo() { - return new TransformEvaluator>() { + return new TransformEvaluator>() { public void evaluate( - final ParDo.BoundMulti transform, final EvaluationContext context) { + final ParDo.MultiOutput transform, final EvaluationContext context) { final DoFn doFn = transform.getFn(); rejectSplittable(doFn); rejectStateAndTimers(doFn); @@ -523,7 +523,7 @@ public JavaRDD>> call( EVALUATORS.put(Read.Unbounded.class, readUnbounded()); EVALUATORS.put(GroupByKey.class, groupByKey()); EVALUATORS.put(Combine.GroupedValues.class, combineGrouped()); - EVALUATORS.put(ParDo.BoundMulti.class, multiDo()); + EVALUATORS.put(ParDo.MultiOutput.class, multiDo()); EVALUATORS.put(ConsoleIO.Write.Unbound.class, print()); EVALUATORS.put(CreateStream.class, createFromQueue()); EVALUATORS.put(Window.Assign.class, window()); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java index d66633b4c49b..41ccd0837dcb 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java @@ -83,7 +83,7 @@ public void testTrackSingle() { p.apply(emptyStream).apply(ParDo.of(new PassthroughFn<>())); - p.traverseTopologically(new StreamingSourceTracker(jssc, p, ParDo.BoundMulti.class, 0)); + p.traverseTopologically(new StreamingSourceTracker(jssc, p, ParDo.MultiOutput.class, 0)); assertThat(StreamingSourceTracker.numAssertions, equalTo(1)); } @@ -111,7 +111,7 @@ public void testTrackFlattened() { PCollectionList.of(pcol1).and(pcol2).apply(Flatten.pCollections()); flattened.apply(ParDo.of(new PassthroughFn<>())); - p.traverseTopologically(new StreamingSourceTracker(jssc, p, ParDo.BoundMulti.class, 0, 1)); + p.traverseTopologically(new StreamingSourceTracker(jssc, p, ParDo.MultiOutput.class, 0, 1)); assertThat(StreamingSourceTracker.numAssertions, equalTo(1)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java index 7a422b8268d0..8804f55d34bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java @@ -71,9 +71,9 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { if (transform != null) { if (transform instanceof ParDo.SingleOutput) { return AggregatorRetriever.getAggregators(((ParDo.SingleOutput) transform).getFn()); - } else if (transform instanceof ParDo.BoundMulti) { + } else if (transform instanceof ParDo.MultiOutput) { return AggregatorRetriever.getAggregators( - ((ParDo.BoundMulti) transform).getFn()); + ((ParDo.MultiOutput) transform).getFn()); } } return Collections.emptyList(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 2c5f664e2b7c..39bf3c5d41c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -547,9 +547,9 @@ public SingleOutput withSideInputs( * *

See the discussion of Side Outputs above for more explanation. */ - public BoundMulti withOutputTags( + public MultiOutput withOutputTags( TupleTag mainOutputTag, TupleTagList sideOutputTags) { - return new BoundMulti<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData); + return new MultiOutput<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData); } @Override @@ -606,7 +606,7 @@ public List> getSideInputs() { * @param the type of the (main) input {@code PCollection} elements * @param the type of the main output {@code PCollection} elements */ - public static class BoundMulti + public static class MultiOutput extends PTransform, PCollectionTuple> { private final List> sideInputs; private final TupleTag mainOutputTag; @@ -614,7 +614,7 @@ public static class BoundMulti private final DisplayData.ItemSpec> fnDisplayData; private final DoFn fn; - BoundMulti( + MultiOutput( DoFn fn, List> sideInputs, TupleTag mainOutputTag, @@ -634,7 +634,7 @@ public static class BoundMulti * *

See the discussion of Side Inputs above for more explanation. */ - public BoundMulti withSideInputs( + public MultiOutput withSideInputs( PCollectionView... sideInputs) { return withSideInputs(Arrays.asList(sideInputs)); } @@ -646,9 +646,9 @@ public BoundMulti withSideInputs( * *

See the discussion of Side Inputs above for more explanation. */ - public BoundMulti withSideInputs( + public MultiOutput withSideInputs( Iterable> sideInputs) { - return new BoundMulti<>( + return new MultiOutput<>( fn, ImmutableList.>builder() .addAll(this.sideInputs) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java index 910ed98aefdb..c144aba3a381 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java @@ -19,14 +19,14 @@ package org.apache.beam.sdk.transforms.windowing; import java.io.Serializable; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; /** * A function that takes the windows of elements in a main input and maps them to the appropriate * window in a {@link PCollectionView} consumed as a - * {@link BoundMulti#withSideInputs(PCollectionView[]) side input}. + * {@link MultiOutput#withSideInputs(PCollectionView[]) side input}. */ public abstract class WindowMappingFn implements Serializable { private final Duration maximumLookback; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java index de1b99cdc492..d35383554e69 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java @@ -152,7 +152,7 @@ private CoderOrFailure inferCoderOrFail( // and provide a better error message if so. Unfortunately, this information is not // directly available from the TypeDescriptor, so infer based on the type of the PTransform // and the error message itself. - if (transform instanceof ParDo.BoundMulti + if (transform instanceof ParDo.MultiOutput && exc.getReason() == ReasonCode.TYPE_ERASURE) { inferFromTokenException = new CannotProvideCoderException(exc.getMessage() + " If this error occurs for a side output of the producing ParDo, verify that the " diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java index 52bcc93a21ab..0d188403d3bf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java @@ -92,9 +92,9 @@ public void testGetAggregatorStepsWithParDoSingleOutputExtractsSteps() { @SuppressWarnings("unchecked") @Test - public void testGetAggregatorStepsWithParDoBoundMultiExtractsSteps() { + public void testGetAggregatorStepsWithParDoMultiOutputExtractsSteps() { @SuppressWarnings("rawtypes") - ParDo.BoundMulti parDo = mock(ParDo.BoundMulti.class, "parDo"); + ParDo.MultiOutput parDo = mock(ParDo.MultiOutput.class, "parDo"); AggregatorProvidingDoFn fn = new AggregatorProvidingDoFn<>(); when(parDo.getFn()).thenReturn(fn); @@ -124,7 +124,7 @@ public void testGetAggregatorStepsWithOneAggregatorInMultipleStepsAddsSteps() { @SuppressWarnings("rawtypes") ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo"); @SuppressWarnings("rawtypes") - ParDo.BoundMulti otherParDo = mock(ParDo.BoundMulti.class, "otherParDo"); + ParDo.MultiOutput otherParDo = mock(ParDo.MultiOutput.class, "otherParDo"); AggregatorProvidingDoFn fn = new AggregatorProvidingDoFn<>(); when(parDo.getFn()).thenReturn(fn); when(otherParDo.getFn()).thenReturn(fn); @@ -165,7 +165,7 @@ public void testGetAggregatorStepsWithDifferentStepsAddsSteps() { when(parDo.getFn()).thenReturn(fn); @SuppressWarnings("rawtypes") - ParDo.BoundMulti otherParDo = mock(ParDo.BoundMulti.class, "otherParDo"); + ParDo.MultiOutput otherParDo = mock(ParDo.MultiOutput.class, "otherParDo"); AggregatorProvidingDoFn otherFn = new AggregatorProvidingDoFn<>(); Aggregator aggregatorTwo = otherFn.addAggregator(Sum.ofDoubles()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 56dc743c55b2..f62b32038b72 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -46,8 +46,8 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.ParDo.SingleOutput; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -268,7 +268,7 @@ public void processElement(ProcessContext ctxt) { hierarchy.popNode(); final TupleTag longs = new TupleTag<>(); - final ParDo.BoundMulti replacementParDo = + final MultiOutput replacementParDo = ParDo.of( new DoFn() { @ProcessElement @@ -431,7 +431,7 @@ public void processElement(ProcessContext ctxt) { hierarchy.popNode(); final TupleTag longs = new TupleTag<>(); - final BoundMulti replacementParDo = + final MultiOutput replacementParDo = ParDo.of( new DoFn() { @ProcessElement diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 9f621f86630b..cbbbe5f7a3ab 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -898,7 +898,7 @@ public void processElement(ProcessContext cxt) { } }; - ParDo.BoundMulti parDo = + ParDo.MultiOutput parDo = ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(sideOutOne).and(sideOutTwo)); PCollectionTuple firstApplication = longs.apply("first", parDo); PCollectionTuple secondApplication = longs.apply("second", parDo); @@ -1161,7 +1161,7 @@ public void testSideOutputUnregisteredExplicitCoder() throws Exception { final TupleTag mainOutputTag = new TupleTag("main"); final TupleTag sideOutputTag = new TupleTag("unregisteredSide"); - ParDo.BoundMulti pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag)) + ParDo.MultiOutput pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag)) .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)); PCollectionTuple outputTuple = input.apply(pardo); @@ -2301,7 +2301,7 @@ public void populateDisplayData(Builder builder) { } }; - ParDo.BoundMulti parDo = ParDo + ParDo.MultiOutput parDo = ParDo .of(fn) .withOutputTags(new TupleTag(), TupleTagList.empty());