From 4e6230cc734ab3dba081e04d135a285b73008270 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Wed, 17 Aug 2016 14:38:36 -0700 Subject: [PATCH] Update DoFn javadocs to remove references to OldDoFn and Dataflow --- .../examples/common/PubsubFileInjector.java | 2 +- .../apache/beam/sdk/util/DoFnRunnerBase.java | 16 +- .../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 2 +- .../apache/beam/sdk/util/ReduceFnRunner.java | 5 +- .../beam/sdk/util/SimpleDoFnRunner.java | 4 +- .../ImmutabilityCheckingBundleFactory.java | 4 +- .../direct/TransformEvaluatorFactory.java | 3 +- .../beam/runners/dataflow/util/DoFnInfo.java | 7 +- .../translation/MultiOutputWordCountTest.java | 2 +- .../spark/translation/SerializationTest.java | 4 +- .../org/apache/beam/sdk/AggregatorValues.java | 4 +- .../beam/sdk/transforms/Aggregator.java | 14 +- .../beam/sdk/transforms/CombineFns.java | 18 +- .../org/apache/beam/sdk/transforms/DoFn.java | 23 +- .../beam/sdk/transforms/DoFnTester.java | 62 ++-- .../beam/sdk/transforms/GroupByKey.java | 7 +- .../beam/sdk/transforms/PTransform.java | 2 +- .../org/apache/beam/sdk/transforms/ParDo.java | 306 +++++++++--------- .../beam/sdk/transforms/SimpleFunction.java | 6 +- .../sdk/transforms/windowing/PaneInfo.java | 10 +- .../beam/sdk/util/BaseExecutionContext.java | 4 +- .../util/ReifyTimestampAndWindowsDoFn.java | 4 +- .../beam/sdk/util/SerializableUtils.java | 2 +- .../beam/sdk/util/SystemDoFnInternal.java | 7 +- .../beam/sdk/util/WindowingInternals.java | 3 +- .../DoFnDelegatingAggregatorTest.java | 2 +- .../beam/sdk/transforms/DoFnTesterTest.java | 3 +- .../beam/sdk/transforms/NoOpOldDoFn.java | 2 +- 28 files changed, 263 insertions(+), 265 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java index e6a1495e545d..4634159826d3 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java @@ -69,7 +69,7 @@ public Bound publish(String outputTopic) { } } - /** A OldDoFn that publishes non-empty lines to Google Cloud PubSub. */ + /** A {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. */ public static class Bound extends OldDoFn { private final String outputTopic; private final String timestampLabelKey; diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java index 8a0f6bf868d9..04a0978b6030 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java @@ -58,10 +58,10 @@ */ public abstract class DoFnRunnerBase implements DoFnRunner { - /** The OldDoFn being run. */ + /** The {@link OldDoFn} being run. */ public final OldDoFn fn; - /** The context used for running the OldDoFn. */ + /** The context used for running the {@link OldDoFn}. */ public final DoFnContext context; protected DoFnRunnerBase( @@ -164,8 +164,8 @@ public void finishBundle() { /** * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}. * - * @param the type of the OldDoFn's (main) input elements - * @param the type of the OldDoFn's (main) output elements + * @param the type of the {@link OldDoFn} (main) input elements + * @param the type of the {@link OldDoFn} (main) output elements */ private static class DoFnContext extends OldDoFn.Context { @@ -350,7 +350,7 @@ protected Aggregator createAggreg } /** - * Returns a new {@code OldDoFn.ProcessContext} for the given element. + * Returns a new {@link OldDoFn.ProcessContext} for the given element. */ protected OldDoFn.ProcessContext createProcessContext( WindowedValue elem) { @@ -366,11 +366,11 @@ private boolean isSystemDoFn() { } /** - * A concrete implementation of {@code OldDoFn.ProcessContext} used for + * A concrete implementation of {@link OldDoFn.ProcessContext} used for * running a {@link OldDoFn} over a single element. * - * @param the type of the OldDoFn's (main) input elements - * @param the type of the OldDoFn's (main) output elements + * @param the type of the {@link OldDoFn} (main) input elements + * @param the type of the {@link OldDoFn} (main) output elements */ static class DoFnProcessContext extends OldDoFn.ProcessContext { diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java index f82e5dfe32e7..f386dfba1220 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.values.KV; /** - * OldDoFn that merges windows and groups elements in those windows, optionally + * {@link OldDoFn} that merges windows and groups elements in those windows, optionally * combining values. * * @param key type diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java index 61e5b21ebfd9..7c3e4d749a1f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java @@ -33,7 +33,6 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -177,8 +176,8 @@ public class ReduceFnRunner { * Store the previously emitted pane (if any) for each window. * *
    - *
  • State: The previous {@link PaneInfo} passed to the user's {@link OldDoFn#processElement}, - * if any. + *
  • State: The previous {@link PaneInfo} passed to the user's {@code DoFn.ProcessElement} + * method, if any. *
  • Style style: DIRECT *
  • Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}. * Cleared when window is merged away. diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java index 6c1cf451d61a..1ebe5a874e74 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java @@ -28,8 +28,8 @@ /** * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in. * - * @param the type of the OldDoFn's (main) input elements - * @param the type of the OldDoFn's (main) output elements + * @param the type of the {@link OldDoFn} (main) input elements + * @param the type of the {@link OldDoFn} (main) output elements */ public class SimpleDoFnRunner extends DoFnRunnerBase{ diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java index d5c0f0c354e6..71bd8b4e5005 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java @@ -25,7 +25,7 @@ import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; @@ -40,7 +40,7 @@ * elements added to the bundle will be encoded by the {@link Coder} of the underlying * {@link PCollection}. * - *

    This catches errors during the execution of a {@link OldDoFn} caused by modifying an element + *

    This catches errors during the execution of a {@link DoFn} caused by modifying an element * after it is added to an output {@link PCollection}. */ class ImmutabilityCheckingBundleFactory implements BundleFactory { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java index e9fa06b107f7..ecf2da8f67a8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; /** @@ -37,7 +36,7 @@ public interface TransformEvaluatorFactory { * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}. * *

    Any work that must be done before input elements are processed (such as calling - * {@link OldDoFn#startBundle(OldDoFn.Context)}) must be done before the + * {@code DoFn.StartBundle}) must be done before the * {@link TransformEvaluator} is made available to the caller. * *

    May return null if the application cannot produce an evaluator (for example, it is a diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 139db9da0ab9..949c38148993 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -24,10 +24,10 @@ import org.apache.beam.sdk.values.PCollectionView; /** - * Wrapper class holding the necessary information to serialize a OldDoFn. + * Wrapper class holding the necessary information to serialize a {@link OldDoFn}. * - * @param the type of the (main) input elements of the OldDoFn - * @param the type of the (main) output elements of the OldDoFn + * @param the type of the (main) input elements of the {@link OldDoFn} + * @param the type of the (main) output elements of the {@link OldDoFn} */ public class DoFnInfo implements Serializable { private final OldDoFn doFn; @@ -66,3 +66,4 @@ public Coder getInputCoder() { return inputCoder; } } + diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 517596aab782..acfa3dfae2b8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -100,7 +100,7 @@ public void testRun() throws Exception { } /** - * A OldDoFn that tokenizes lines of text into individual words. + * A {@link DoFn} that tokenizes lines of text into individual words. */ static class ExtractWordsFn extends DoFn { diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index 0e9121ca87f9..22a40cd29e2c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -142,7 +142,7 @@ public void testRun() throws Exception { } /** - * A OldDoFn that tokenizes lines of text into individual words. + * A {@link DoFn} that tokenizes lines of text into individual words. */ static class ExtractWordsFn extends DoFn { private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+"); @@ -170,7 +170,7 @@ public void processElement(ProcessContext c) { } /** - * A OldDoFn that converts a Word and Count into a printable string. + * A {@link DoFn} that converts a Word and Count into a printable string. */ private static class FormatCountsFn extends DoFn, StringHolder> { @ProcessElement diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java index 6297085319e2..1fd034a02cc0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java @@ -21,11 +21,11 @@ import java.util.Map; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; /** * A collection of values associated with an {@link Aggregator}. Aggregators declared in a - * {@link OldDoFn} are emitted on a per-{@code OldDoFn}-application basis. + * {@link DoFn} are emitted on a per-{@link DoFn}-application basis. * * @param the output type of the aggregator */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java index db4ab33ff1b0..67d399fbc820 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java @@ -25,8 +25,8 @@ * to be combined across all bundles. * *

    Aggregators are created by calling - * {@link OldDoFn#createAggregator OldDoFn.createAggregatorForDoFn}, - * typically from the {@link OldDoFn} constructor. Elements can be added to the + * {@link DoFn#createAggregator DoFn.createAggregatorForDoFn}, + * typically from the {@link DoFn} constructor. Elements can be added to the * {@code Aggregator} by calling {@link Aggregator#addValue}. * *

    Aggregators are visible in the monitoring UI, when the pipeline is run @@ -37,14 +37,14 @@ * *

    Example: *

     {@code
    - * class MyDoFn extends OldDoFn {
    + * class MyDoFn extends DoFn {
      *   private Aggregator myAggregator;
      *
      *   public MyDoFn() {
      *     myAggregator = createAggregatorForDoFn("myAggregator", new Sum.SumIntegerFn());
      *   }
      *
    - *   @Override
    + *   @ProcessElement
      *   public void processElement(ProcessContext c) {
      *     myAggregator.addValue(1);
      *   }
    @@ -79,8 +79,8 @@ interface AggregatorFactory {
         /**
          * Create an aggregator with the given {@code name} and {@link CombineFn}.
          *
    -     *  

    This method is called to create an aggregator for a {@link OldDoFn}. It receives the - * class of the {@link OldDoFn} being executed and the context of the step it is being + *

    This method is called to create an aggregator for a {@link DoFn}. It receives the + * class of the {@link DoFn} being executed and the context of the step it is being * executed in. */ Aggregator createAggregatorForDoFn( @@ -90,7 +90,7 @@ Aggregator createAggregatorForDoFn( // TODO: Consider the following additional API conveniences: // - In addition to createAggregatorForDoFn(), consider adding getAggregator() to - // avoid the need to store the aggregator locally in a OldDoFn, i.e., create + // avoid the need to store the aggregator locally in a DoFn, i.e., create // if not already present. // - Add a shortcut for the most common aggregator: // c.createAggregatorForDoFn("name", new Sum.SumIntegerFn()). diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index 9fa8ded6b240..6f059932f5f4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -67,7 +67,7 @@ public class CombineFns { *

    The same {@link TupleTag} cannot be used in a composition multiple times. * *

    Example: - *

    { @code
    +   * 
    
        * PCollection> latencies = ...;
        *
        * TupleTag maxLatencyTag = new TupleTag();
    @@ -75,7 +75,7 @@ public class CombineFns {
        *
        * SimpleFunction identityFn =
        *     new SimpleFunction() {
    -   *       @Override
    +   *      {@literal @}Override
        *       public Integer apply(Integer input) {
        *           return input;
        *       }};
    @@ -87,8 +87,8 @@ public class CombineFns {
        *
        * PCollection finalResultCollection = maxAndMean
        *     .apply(ParDo.of(
    -   *         new OldDoFn, T>() {
    -   *           @Override
    +   *         new DoFn, T>() {
    +   *          {@literal @}ProcessElement
        *           public void processElement(ProcessContext c) throws Exception {
        *             KV e = c.element();
        *             Integer maxLatency = e.getValue().get(maxLatencyTag);
    @@ -97,7 +97,7 @@ public class CombineFns {
        *             c.output(...some T...);
        *           }
        *         }));
    -   * } 
    + *
    */ public static ComposeKeyedCombineFnBuilder composeKeyed() { return new ComposeKeyedCombineFnBuilder(); @@ -110,7 +110,7 @@ public static ComposeKeyedCombineFnBuilder composeKeyed() { *

    The same {@link TupleTag} cannot be used in a composition multiple times. * *

    Example: - *

    { @code
    +   * 
    
        * PCollection globalLatencies = ...;
        *
        * TupleTag maxLatencyTag = new TupleTag();
    @@ -130,8 +130,8 @@ public static ComposeKeyedCombineFnBuilder composeKeyed() {
        *
        * PCollection finalResultCollection = maxAndMean
        *     .apply(ParDo.of(
    -   *         new OldDoFn() {
    -   *           @Override
    +   *         new DoFn() {
    +   *          {@literal @}ProcessElement
        *           public void processElement(ProcessContext c) throws Exception {
        *             CoCombineResult e = c.element();
        *             Integer maxLatency = e.get(maxLatencyTag);
    @@ -140,7 +140,7 @@ public static ComposeKeyedCombineFnBuilder composeKeyed() {
        *             c.output(...some T...);
        *           }
        *         }));
    -   * } 
    + *
    */ public static ComposeCombineFnBuilder compose() { return new ComposeCombineFnBuilder(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 9f898261c7bb..59c832323686 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -63,8 +63,6 @@ * that satisfies the requirements described there. See the {@link ProcessElement} * for details. * - *

    This functionality is experimental and likely to change. - * *

    Example usage: * *

     {@code
    @@ -123,7 +121,7 @@ public abstract class Context {
          *
          * 

    If invoked from {@link ProcessElement}), the timestamp * must not be older than the input element's timestamp minus - * {@link OldDoFn#getAllowedTimestampSkew}. The output element will + * {@link DoFn#getAllowedTimestampSkew}. The output element will * be in the same windows as the input element. * *

    If invoked from {@link StartBundle} or {@link FinishBundle}, @@ -172,7 +170,7 @@ public abstract class Context { * *

    If invoked from {@link ProcessElement}), the timestamp * must not be older than the input element's timestamp minus - * {@link OldDoFn#getAllowedTimestampSkew}. The output element will + * {@link DoFn#getAllowedTimestampSkew}. The output element will * be in the same windows as the input element. * *

    If invoked from {@link StartBundle} or {@link FinishBundle}, @@ -190,7 +188,7 @@ public abstract void sideOutputWithTimestamp( } /** - * Information accessible when running {@link OldDoFn#processElement}. + * Information accessible when running a {@link DoFn.ProcessElement} method. */ public abstract class ProcessContext extends Context { @@ -359,9 +357,14 @@ public OutputReceiver outputReceiver() { * Annotation for the method to use to prepare an instance for processing a batch of elements. * The method annotated with this must satisfy the following constraints: *

      - *
    • It must have at least one argument. + *
    • It must have exactly one argument. *
    • Its first (and only) argument must be a {@link DoFn.Context}. *
    + * + *

    A simple method declaration would look like: + * + * public void setup(DoFn.Context c) { .. } + * */ @Documented @Retention(RetentionPolicy.RUNTIME) @@ -414,13 +417,13 @@ public OutputReceiver outputReceiver() { /** * Returns an {@link Aggregator} with aggregation logic specified by the * {@link CombineFn} argument. The name provided must be unique across - * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created + * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created * during pipeline construction. * * @param name the name of the aggregator * @param combiner the {@link CombineFn} to use in the aggregator * @return an aggregator for the provided name and combiner in the scope of - * this OldDoFn + * this {@link DoFn} * @throws NullPointerException if the name or combiner is null * @throws IllegalArgumentException if the given name collides with another * aggregator in this scope @@ -447,13 +450,13 @@ public OutputReceiver outputReceiver() { /** * Returns an {@link Aggregator} with the aggregation logic specified by the * {@link SerializableFunction} argument. The name provided must be unique - * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be + * across {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be * created during pipeline construction. * * @param name the name of the aggregator * @param combiner the {@link SerializableFunction} to use in the aggregator * @return an aggregator for the provided name and combiner in the scope of - * this OldDoFn + * this {@link DoFn} * @throws NullPointerException if the name or combiner is null * @throws IllegalArgumentException if the given name collides with another * aggregator in this scope diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 82c12938fa68..6801768c1096 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -50,12 +50,12 @@ import org.joda.time.Instant; /** - * A harness for unit-testing a {@link OldDoFn}. + * A harness for unit-testing a {@link DoFn}. * *

    For example: * *

     {@code
    - * OldDoFn fn = ...;
    + * DoFn fn = ...;
      *
      * DoFnTester fnTester = DoFnTester.of(fn);
      *
    @@ -72,17 +72,17 @@
      * Assert.assertThat(fnTester.processBundle(i1, i2, ...), Matchers.hasItems(...));
      * } 
    * - * @param the type of the {@code OldDoFn}'s (main) input elements - * @param the type of the {@code OldDoFn}'s (main) output elements + * @param the type of the {@link DoFn}'s (main) input elements + * @param the type of the {@link DoFn}'s (main) output elements */ public class DoFnTester { /** * Returns a {@code DoFnTester} supporting unit-testing of the given - * {@link OldDoFn}. + * {@link DoFn}. */ @SuppressWarnings("unchecked") - public static DoFnTester of(OldDoFn fn) { - return new DoFnTester(fn); + public static DoFnTester of(DoFn fn) { + return new DoFnTester<>(DoFnAdapters.toOldDoFn(fn)); } /** @@ -90,19 +90,19 @@ public static DoFnTester of(OldDoFn DoFnTester - of(DoFn fn) { - return new DoFnTester(DoFnAdapters.toOldDoFn(fn)); + public static DoFnTester + of(OldDoFn fn) { + return new DoFnTester<>(fn); } /** * Registers the tuple of values of the side input {@link PCollectionView}s to - * pass to the {@link OldDoFn} under test. + * pass to the {@link DoFn} under test. * *

    Resets the state of this {@link DoFnTester}. * *

    If this isn't called, {@code DoFnTester} assumes the - * {@link OldDoFn} takes no side inputs. + * {@link DoFn} takes no side inputs. */ public void setSideInputs(Map, Map> sideInputs) { this.sideInputs = sideInputs; @@ -110,7 +110,7 @@ public void setSideInputs(Map, Map> sideInp } /** - * Registers the values of a side input {@link PCollectionView} to pass to the {@link OldDoFn} + * Registers the values of a side input {@link PCollectionView} to pass to the {@link DoFn} * under test. * *

    The provided value is the final value of the side input in the specified window, not @@ -129,7 +129,7 @@ public void setSideInput(PCollectionView sideInput, BoundedWindow window, } /** - * Whether or not a {@link DoFnTester} should clone the {@link OldDoFn} under test. + * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test. */ public enum CloningBehavior { CLONE, @@ -137,14 +137,14 @@ public enum CloningBehavior { } /** - * Instruct this {@link DoFnTester} whether or not to clone the {@link OldDoFn} under test. + * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test. */ public void setCloningBehavior(CloningBehavior newValue) { this.cloningBehavior = newValue; } /** - * Indicates whether this {@link DoFnTester} will clone the {@link OldDoFn} under test. + * Indicates whether this {@link DoFnTester} will clone the {@link DoFn} under test. */ public CloningBehavior getCloningBehavior() { return cloningBehavior; @@ -166,7 +166,7 @@ public List processBundle(Iterable inputElements) th } /** - * A convenience method for testing {@link OldDoFn DoFns} with bundles of elements. + * A convenience method for testing {@link DoFn DoFns} with bundles of elements. * Logic proceeds as follows: * *

      @@ -182,9 +182,9 @@ public final List processBundle(InputT... inputElements) throws Excepti } /** - * Calls {@link OldDoFn#startBundle} on the {@code OldDoFn} under test. + * Calls the {@link DoFn.StartBundle} method on the {@link DoFn} under test. * - *

      If needed, first creates a fresh instance of the OldDoFn under test. + *

      If needed, first creates a fresh instance of the {@link DoFn} under test. */ public void startBundle() throws Exception { resetState(); @@ -210,14 +210,14 @@ private static void unwrapUserCodeException(UserCodeException e) throws Exceptio } /** - * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a - * context where {@link OldDoFn.ProcessContext#element} returns the + * Calls the {@link DoFn.ProcessElement} method on the {@link DoFn} under test, in a + * context where {@link DoFn.ProcessContext#element} returns the * given element. * *

      Will call {@link #startBundle} automatically, if it hasn't * already been called. * - * @throws IllegalStateException if the {@code OldDoFn} under test has already + * @throws IllegalStateException if the {@code DoFn} under test has already * been finished */ public void processElement(InputT element) throws Exception { @@ -235,12 +235,12 @@ public void processElement(InputT element) throws Exception { } /** - * Calls {@link OldDoFn#finishBundle} of the {@code OldDoFn} under test. + * Calls the {@link DoFn.FinishBundle} method of the {@link DoFn} under test. * *

      Will call {@link #startBundle} automatically, if it hasn't * already been called. * - * @throws IllegalStateException if the {@code OldDoFn} under test has already + * @throws IllegalStateException if the {@link DoFn} under test has already * been finished */ public void finishBundle() throws Exception { @@ -674,7 +674,7 @@ protected Aggregator createAggreg ///////////////////////////////////////////////////////////////////////////// - /** The possible states of processing a OldDoFn. */ + /** The possible states of processing a {@link DoFn}. */ enum State { UNSTARTED, STARTED, @@ -683,23 +683,23 @@ enum State { private final PipelineOptions options = PipelineOptionsFactory.create(); - /** The original OldDoFn under test. */ + /** The original {@link OldDoFn} under test. */ private final OldDoFn origFn; /** - * Whether to clone the original {@link OldDoFn} or just use it as-is. + * Whether to clone the original {@link DoFn} or just use it as-is. * - *

      Worker-side {@link OldDoFn DoFns} may not be serializable, and are not required to be. + *

      Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be. */ private CloningBehavior cloningBehavior = CloningBehavior.CLONE; - /** The side input values to provide to the OldDoFn under test. */ + /** The side input values to provide to the {@link DoFn} under test. */ private Map, Map> sideInputs = new HashMap<>(); private Map accumulators; - /** The output tags used by the OldDoFn under test. */ + /** The output tags used by the {@link DoFn} under test. */ private TupleTag mainOutputTag = new TupleTag<>(); /** The original OldDoFn under test, if started. */ @@ -708,7 +708,7 @@ enum State { /** The ListOutputManager to examine the outputs. */ private Map, List>> outputs; - /** The state of processing of the OldDoFn under test. */ + /** The state of processing of the {@link DoFn} under test. */ private State state; private DoFnTester(OldDoFn origFn) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index ed7f411aec11..3a3da65e077d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -63,18 +63,19 @@ * {@code Coder} of the values of the input. * *

      Example of use: - *

       {@code
      + * 
      
        * PCollection> urlDocPairs = ...;
        * PCollection>> urlToDocs =
        *     urlDocPairs.apply(GroupByKey.create());
        * PCollection results =
      - *     urlToDocs.apply(ParDo.of(new OldDoFn>, R>() {
      + *     urlToDocs.apply(ParDo.of(new DoFn>, R>() {
      + *      {@literal @}ProcessElement
        *       public void processElement(ProcessContext c) {
        *         String url = c.element().getKey();
        *         Iterable docsWithThatUrl = c.element().getValue();
        *         ... process all docs having that url ...
        *       }}));
      - * } 
      + *
      * *

      {@code GroupByKey} is a key primitive in data-parallel * processing, since it is the main way to efficiently bring diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java index 19abef90cea1..4a58141ab503 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java @@ -146,7 +146,7 @@ * implementing {@code Serializable}. * *

      {@code PTransform} is marked {@code Serializable} solely - * because it is common for an anonymous {@code OldDoFn}, + * because it is common for an anonymous {@link DoFn}, * instance to be created within an * {@code apply()} method of a composite {@code PTransform}. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 5efbe9f19024..f9cb557c1e5d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.PipelineRunner; @@ -50,13 +49,12 @@ *

      The {@link ParDo} processing style is similar to what happens inside * the "Mapper" or "Reducer" class of a MapReduce-style algorithm. * - *

      {@link OldDoFn DoFns}

      + *

      {@link DoFn DoFns}

      * *

      The function to use to process each element is specified by a - * {@link OldDoFn OldDoFn<InputT, OutputT>}, primarily via its - * {@link OldDoFn#processElement processElement} method. The {@link OldDoFn} may also - * override the default implementations of {@link OldDoFn#startBundle startBundle} - * and {@link OldDoFn#finishBundle finishBundle}. + * {@link DoFn DoFn<InputT, OutputT>}, primarily via its + * {@link DoFn.ProcessElement ProcessElement} method. The {@link DoFn} may also + * provide a {@link DoFn.StartBundle StartBundle} and {@link DoFn.FinishBundle finishBundle} method. * *

      Conceptually, when a {@link ParDo} transform is executed, the * elements of the input {@link PCollection} are first divided up @@ -66,39 +64,38 @@ * *

        *
      1. If required, a fresh instance of the argument {@link DoFn} is created - * on a worker, and {@link DoFn#setup()} is called on this instance. This may be through - * deserialization or other means. A {@link PipelineRunner} may reuse {@link DoFn} instances for - * multiple bundles. A {@link DoFn} that has terminated abnormally (by throwing an + * on a worker, and the {@link DoFn.Setup} method is called on this instance. This may be + * through deserialization or other means. A {@link PipelineRunner} may reuse {@link DoFn} + * instances for multiple bundles. A {@link DoFn} that has terminated abnormally (by throwing an * {@link Exception}) will never be reused.
      2. - *
      3. The {@link OldDoFn OldDoFn's} {@link OldDoFn#startBundle} method is called to - * initialize it. If this method is not overridden, the call may be optimized - * away.
      4. - *
      5. The {@link OldDoFn OldDoFn's} {@link OldDoFn#processElement} method + *
      6. The {@link DoFn DoFn's} {@link DoFn.StartBundle} method, if provided, is called to + * initialize it.
      7. + *
      8. The {@link DoFn DoFn's} {@link DoFn.ProcessElement} method * is called on each of the input elements in the bundle.
      9. - *
      10. The {@link OldDoFn OldDoFn's} {@link OldDoFn#finishBundle} method is called - * to complete its work. After {@link OldDoFn#finishBundle} is called, the - * framework will not again invoke {@link OldDoFn#processElement} or - * {@link OldDoFn#finishBundle} - * until a new call to {@link OldDoFn#startBundle} has occurred. - * If this method is not overridden, this call may be optimized away.
      11. - *
      12. If any of {@link DoFn#setup}, {@link DoFn#startBundle}, {@link DoFn#processElement} or - * {@link DoFn#finishBundle} throw an exception, {@link DoFn#teardown} will be called on the - * {@link DoFn} instance.
      13. - *
      14. If a runner will no longer use a {@link DoFn}, {@link DoFn#teardown()} will be called on - * the discarded instance.
      15. + *
      16. The {@link DoFn DoFn's} {@link DoFn.FinishBundle} method, if provided, is called + * to complete its work. After {@link DoFn.FinishBundle} is called, the + * framework will not again invoke {@link DoFn.ProcessElement} or + * {@link DoFn.FinishBundle} + * until a new call to {@link DoFn.StartBundle} has occurred.
      17. + *
      18. If any of {@link DoFn.Setup}, {@link DoFn.StartBundle}, {@link DoFn.ProcessElement} or + * {@link DoFn.FinishBundle} methods throw an exception, the {@link DoFn.Teardown} method, if + * provided, will be called on the {@link DoFn} instance.
      19. + *
      20. If a runner will no longer use a {@link DoFn}, the {@link DoFn.Teardown} method, if + * provided, will be called on the discarded instance.
      21. *
      * - * Each of the calls to any of the {@link OldDoFn OldDoFn's} processing + * Each of the calls to any of the {@link DoFn DoFn's} processing * methods can produce zero or more output elements. All of the - * of output elements from all of the {@link OldDoFn} instances + * of output elements from all of the {@link DoFn} instances * are included in the output {@link PCollection}. * *

      For example: * - *

       {@code
      + * 
      
        * PCollection lines = ...;
        * PCollection words =
      - *     lines.apply(ParDo.of(new OldDoFn() {
      + *     lines.apply(ParDo.of(new DoFn() {
      + *        {@literal @}ProcessElement
        *         public void processElement(ProcessContext c) {
        *           String line = c.element();
        *           for (String word : line.split("[^a-zA-Z']+")) {
      @@ -106,13 +103,14 @@
        *           }
        *         }}));
        * PCollection wordLengths =
      - *     words.apply(ParDo.of(new OldDoFn() {
      + *     words.apply(ParDo.of(new DoFn() {
      + *        {@literal @}ProcessElement
        *         public void processElement(ProcessContext c) {
        *           String word = c.element();
        *           Integer length = word.length();
        *           c.output(length);
        *         }}));
      - * } 
      + *
      * *

      Each output element has the same timestamp and is in the same windows * as its corresponding input element, and the output {@code PCollection} @@ -131,9 +129,9 @@ * *

       {@code
        * PCollection words =
      - *     lines.apply("ExtractWords", ParDo.of(new OldDoFn() { ... }));
      + *     lines.apply("ExtractWords", ParDo.of(new DoFn() { ... }));
        * PCollection wordLengths =
      - *     words.apply("ComputeWordLengths", ParDo.of(new OldDoFn() { ... }));
      + *     words.apply("ComputeWordLengths", ParDo.of(new DoFn() { ... }));
        * } 
      * *

      Side Inputs

      @@ -145,17 +143,18 @@ * {@link PCollection PCollections} computed by earlier pipeline operations, * passed in to the {@link ParDo} transform using * {@link #withSideInputs}, and their contents accessible to each of - * the {@link OldDoFn} operations via {@link OldDoFn.ProcessContext#sideInput sideInput}. + * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}. * For example: * - *
       {@code
      + * 
      
        * PCollection words = ...;
        * PCollection maxWordLengthCutOff = ...; // Singleton PCollection
        * final PCollectionView maxWordLengthCutOffView =
        *     maxWordLengthCutOff.apply(View.asSingleton());
        * PCollection wordsBelowCutOff =
        *     words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
      - *                      .of(new OldDoFn() {
      + *                      .of(new DoFn() {
      + *        {@literal @}ProcessElement
        *         public void processElement(ProcessContext c) {
        *           String word = c.element();
        *           int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
      @@ -163,7 +162,7 @@
        *             c.output(word);
        *           }
        *         }}));
      - * } 
      + *
      * *

      Side Outputs

      * @@ -174,13 +173,13 @@ * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags} * to be used for the output {@link PCollectionTuple} are specified by * invoking {@link #withOutputTags}. Unconsumed side outputs do not - * necessarily need to be explicitly specified, even if the {@link OldDoFn} - * generates them. Within the {@link OldDoFn}, an element is added to the + * necessarily need to be explicitly specified, even if the {@link DoFn} + * generates them. Within the {@link DoFn}, an element is added to the * main output {@link PCollection} as normal, using - * {@link OldDoFn.Context#output}, while an element is added to a side output - * {@link PCollection} using {@link OldDoFn.Context#sideOutput}. For example: + * {@link DoFn.Context#output}, while an element is added to a side output + * {@link PCollection} using {@link DoFn.Context#sideOutput}. For example: * - *
       {@code
      + * 
      
        * PCollection words = ...;
        * // Select words whose length is below a cut off,
        * // plus the lengths of words that are above the cut off.
      @@ -201,10 +200,11 @@
        *         .withOutputTags(wordsBelowCutOffTag,
        *                         TupleTagList.of(wordLengthsAboveCutOffTag)
        *                                     .and(markedWordsTag))
      - *         .of(new OldDoFn() {
      + *         .of(new DoFn() {
        *             // Create a tag for the unconsumed side output.
        *             final TupleTag specialWordsTag =
        *                 new TupleTag(){};
      + *            {@literal @}ProcessElement
        *             public void processElement(ProcessContext c) {
        *               String word = c.element();
        *               if (word.length() <= wordLengthCutOff) {
      @@ -230,14 +230,13 @@
        *     results.get(wordLengthsAboveCutOffTag);
        * PCollection markedWords =
        *     results.get(markedWordsTag);
      - * } 
      + *
      * *

      Properties May Be Specified In Any Order

      * *

      Several properties can be specified for a {@link ParDo} - * {@link PTransform}, including name, side inputs, side output tags, - * and {@link OldDoFn} to invoke. Only the {@link OldDoFn} is required; the - * name is encouraged but not required, and side inputs and side + * {@link PTransform}, including side inputs, side output tags, + * and {@link DoFn} to invoke. Only the {@link DoFn} is required; side inputs and side * output tags are only specified when they're needed. These * properties can be specified in any order, as long as they're * specified before the {@link ParDo} {@link PTransform} is applied. @@ -250,23 +249,23 @@ * {@link ParDo.Bound} nested classes, each of which offer * property setter instance methods to enable setting additional * properties. {@link ParDo.Bound} is used for {@link ParDo} - * transforms whose {@link OldDoFn} is specified and whose input and + * transforms whose {@link DoFn} is specified and whose input and * output static types have been bound. {@link ParDo.Unbound ParDo.Unbound} is used * for {@link ParDo} transforms that have not yet had their - * {@link OldDoFn} specified. Only {@link ParDo.Bound} instances can be + * {@link DoFn} specified. Only {@link ParDo.Bound} instances can be * applied. * *

      Another benefit of this approach is that it reduces the number * of type parameters that need to be specified manually. In * particular, the input and output types of the {@link ParDo} * {@link PTransform} are inferred automatically from the type - * parameters of the {@link OldDoFn} argument passed to {@link ParDo#of}. + * parameters of the {@link DoFn} argument passed to {@link ParDo#of}. * *

      Output Coders

      * *

      By default, the {@link Coder Coder<OutputT>} for the * elements of the main output {@link PCollection PCollection<OutputT>} is - * inferred from the concrete type of the {@link OldDoFn OldDoFn<InputT, OutputT>}. + * inferred from the concrete type of the {@link DoFn DoFn<InputT, OutputT>}. * *

      By default, the {@link Coder Coder<SideOutputT>} for the elements of * a side output {@link PCollection PCollection<SideOutputT>} is inferred @@ -286,74 +285,74 @@ * This style of {@code TupleTag} instantiation is used in the example of * multiple side outputs, above. * - *

      Serializability of {@link OldDoFn DoFns}

      + *

      Serializability of {@link DoFn DoFns}

      * - *

      A {@link OldDoFn} passed to a {@link ParDo} transform must be - * {@link Serializable}. This allows the {@link OldDoFn} instance + *

      A {@link DoFn} passed to a {@link ParDo} transform must be + * {@link Serializable}. This allows the {@link DoFn} instance * created in this "main program" to be sent (in serialized form) to * remote worker machines and reconstituted for bundles of elements - * of the input {@link PCollection} being processed. A {@link OldDoFn} + * of the input {@link PCollection} being processed. A {@link DoFn} * can have instance variable state, and non-transient instance * variable state will be serialized in the main program and then * deserialized on remote worker machines for some number of bundles * of elements to process. * - *

      {@link OldDoFn DoFns} expressed as anonymous inner classes can be + *

      {@link DoFn DoFns} expressed as anonymous inner classes can be * convenient, but due to a quirk in Java's rules for serializability, * non-static inner or nested classes (including anonymous inner * classes) automatically capture their enclosing class's instance in * their serialized state. This can lead to including much more than - * intended in the serialized state of a {@link OldDoFn}, or even things + * intended in the serialized state of a {@link DoFn}, or even things * that aren't {@link Serializable}. * *

      There are two ways to avoid unintended serialized state in a - * {@link OldDoFn}: + * {@link DoFn}: * *

        * - *
      • Define the {@link OldDoFn} as a named, static class. + *
      • Define the {@link DoFn} as a named, static class. * - *
      • Define the {@link OldDoFn} as an anonymous inner class inside of + *
      • Define the {@link DoFn} as an anonymous inner class inside of * a static method. * *
      * *

      Both of these approaches ensure that there is no implicit enclosing - * instance serialized along with the {@link OldDoFn} instance. + * instance serialized along with the {@link DoFn} instance. * *

      Prior to Java 8, any local variables of the enclosing * method referenced from within an anonymous inner class need to be - * marked as {@code final}. If defining the {@link OldDoFn} as a named + * marked as {@code final}. If defining the {@link DoFn} as a named * static class, such variables would be passed as explicit * constructor arguments and stored in explicit instance variables. * *

      There are three main ways to initialize the state of a - * {@link OldDoFn} instance processing a bundle: + * {@link DoFn} instance processing a bundle: * *

        * *
      • Define instance variable state (including implicit instance * variables holding final variables captured by an anonymous inner - * class), initialized by the {@link OldDoFn}'s constructor (which is + * class), initialized by the {@link DoFn}'s constructor (which is * implicit for an anonymous inner class). This state will be - * automatically serialized and then deserialized in the {@code OldDoFn} + * automatically serialized and then deserialized in the {@link DoFn} * instances created for bundles. This method is good for state - * known when the original {@code OldDoFn} is created in the main + * known when the original {@link DoFn} is created in the main * program, if it's not overly large. This is not suitable for any - * state which must only be used for a single bundle, as {@link OldDoFn OldDoFn's} + * state which must only be used for a single bundle, as {@link DoFn DoFn's} * may be used to process multiple bundles. * *
      • Compute the state as a singleton {@link PCollection} and pass it - * in as a side input to the {@link OldDoFn}. This is good if the state + * in as a side input to the {@link DoFn}. This is good if the state * needs to be computed by the pipeline, or if the state is very large * and so is best read from file(s) rather than sent as part of the - * {@code OldDoFn}'s serialized state. + * {@link DoFn DoFn's} serialized state. * - *
      • Initialize the state in each {@link OldDoFn} instance, in - * {@link OldDoFn#startBundle}. This is good if the initialization + *
      • Initialize the state in each {@link DoFn} instance, in a + * {@link DoFn.StartBundle} method. This is good if the initialization * doesn't depend on any information known only by the main program or * computed by earlier pipeline operations, but is the same for all - * instances of this {@link OldDoFn} for all program executions, say + * instances of this {@link DoFn} for all program executions, say * setting up empty caches or initializing constant data. * *
      @@ -363,16 +362,16 @@ *

      {@link ParDo} operations are intended to be able to run in * parallel across multiple worker machines. This precludes easy * sharing and updating mutable state across those machines. There is - * no support in the Google Cloud Dataflow system for communicating + * no support in the Beam model for communicating * and synchronizing updates to shared state across worker machines, * so programs should not access any mutable static variable state in - * their {@link OldDoFn}, without understanding that the Java processes + * their {@link DoFn}, without understanding that the Java processes * for the main program and workers will each have its own independent * copy of such state, and there won't be any automatic copying of * that state across Java processes. All information should be - * communicated to {@link OldDoFn} instances via main and side inputs and + * communicated to {@link DoFn} instances via main and side inputs and * serialized state, and all output should be communicated from a - * {@link OldDoFn} instance via main and side outputs, in the absence of + * {@link DoFn} instance via main and side outputs, in the absence of * external communication mechanisms written by user code. * *

      Fault Tolerance

      @@ -380,29 +379,28 @@ *

      In a distributed system, things can fail: machines can crash, * machines can be unable to communicate across the network, etc. * While individual failures are rare, the larger the job, the greater - * the chance that something, somewhere, will fail. The Google Cloud - * Dataflow service strives to mask such failures automatically, - * principally by retrying failed {@link OldDoFn} bundle. This means - * that a {@code OldDoFn} instance might process a bundle partially, then - * crash for some reason, then be rerun (often on a different worker - * machine) on that same bundle and on the same elements as before. - * Sometimes two or more {@link OldDoFn} instances will be running on the + * the chance that something, somewhere, will fail. Beam runners may strive + * to mask such failures by retrying failed {@link DoFn} bundle. This means + * that a {@link DoFn} instance might process a bundle partially, then + * crash for some reason, then be rerun (often in a new JVM) on that + * same bundle and on the same elements as before. + * Sometimes two or more {@link DoFn} instances will be running on the * same bundle simultaneously, with the system taking the results of * the first instance to complete successfully. Consequently, the - * code in a {@link OldDoFn} needs to be written such that these + * code in a {@link DoFn} needs to be written such that these * duplicate (sequential or concurrent) executions do not cause - * problems. If the outputs of a {@link OldDoFn} are a pure function of + * problems. If the outputs of a {@link DoFn} are a pure function of * its inputs, then this requirement is satisfied. However, if a - * {@link OldDoFn OldDoFn's} execution has external side-effects, such as performing - * updates to external HTTP services, then the {@link OldDoFn OldDoFn's} code + * {@link DoFn DoFn's} execution has external side-effects, such as performing + * updates to external HTTP services, then the {@link DoFn DoFn's} code * needs to take care to ensure that those updates are idempotent and * that concurrent updates are acceptable. This property can be * difficult to achieve, so it is advisable to strive to keep - * {@link OldDoFn DoFns} as pure functions as much as possible. + * {@link DoFn DoFns} as pure functions as much as possible. * *

      Optimization

      * - *

      The Google Cloud Dataflow service automatically optimizes a + *

      Beam runners may choose to apply optimizations to a * pipeline before it is executed. A key optimization, fusion, * relates to {@link ParDo} operations. If one {@link ParDo} operation produces a * {@link PCollection} that is then consumed as the main input of another @@ -419,18 +417,16 @@ * written to disk, saving all the I/O and space expense of * constructing it. * - *

      The Google Cloud Dataflow service applies fusion as much as - * possible, greatly reducing the cost of executing pipelines. As a - * result, it is essentially "free" to write {@link ParDo} operations in a + *

      When Beam runners apply fusion optimization, it is essentially "free" + * to write {@link ParDo} operations in a * very modular, composable style, each {@link ParDo} operation doing one * clear task, and stringing together sequences of {@link ParDo} operations to * get the desired overall effect. Such programs can be easier to * understand, easier to unit-test, easier to extend and evolve, and * easier to reuse in new programs. The predefined library of - * PTransforms that come with Google Cloud Dataflow makes heavy use of - * this modular, composable style, trusting to the Google Cloud - * Dataflow service's optimizer to "flatten out" all the compositions - * into highly optimized stages. + * PTransforms that come with Beam makes heavy use of + * this modular, composable style, trusting to the runner to + * "flatten out" all the compositions into highly optimized stages. * * @see the web * documentation for ParDo @@ -443,15 +439,15 @@ public class ParDo { * *

      Side inputs are {@link PCollectionView PCollectionViews}, whose contents are * computed during pipeline execution and then made accessible to - * {@link OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}. Each - * invocation of the {@link OldDoFn} receives the same values for these + * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. Each + * invocation of the {@link DoFn} receives the same values for these * side inputs. * *

      See the discussion of Side Inputs above for more explanation. * *

      The resulting {@link PTransform} is incomplete, and its * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to + * {@link ParDo.Unbound#of} to specify the {@link DoFn} to * invoke, which will also bind the input/output types of this * {@link PTransform}. */ @@ -464,13 +460,13 @@ public static Unbound withSideInputs(PCollectionView... sideInputs) { * *

      Side inputs are {@link PCollectionView}s, whose contents are * computed during pipeline execution and then made accessible to - * {@code OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}. + * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. * *

      See the discussion of Side Inputs above for more explanation. * *

      The resulting {@link PTransform} is incomplete, and its * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to + * {@link ParDo.Unbound#of} to specify the {@link DoFn} to * invoke, which will also bind the input/output types of this * {@link PTransform}. */ @@ -486,11 +482,11 @@ public static Unbound withSideInputs( * *

      {@link TupleTag TupleTags} are used to name (with its static element * type {@code T}) each main and side output {@code PCollection}. - * This {@link PTransform PTransform's} {@link OldDoFn} emits elements to the main + * This {@link PTransform PTransform's} {@link DoFn} emits elements to the main * output {@link PCollection} as normal, using - * {@link OldDoFn.Context#output}. The {@link OldDoFn} emits elements to + * {@link DoFn.Context#output}. The {@link DoFn} emits elements to * a side output {@code PCollection} using - * {@link OldDoFn.Context#sideOutput}, passing that side output's tag + * {@link DoFn.Context#sideOutput}, passing that side output's tag * as an argument. The result of invoking this {@link PTransform} * will be a {@link PCollectionTuple}, and any of the the main and * side output {@code PCollection}s can be retrieved from it via @@ -501,7 +497,7 @@ public static Unbound withSideInputs( * *

      The resulting {@link PTransform} is incomplete, and its input * type is not yet bound. Use {@link ParDo.UnboundMulti#of} - * to specify the {@link OldDoFn} to invoke, which will also bind the + * to specify the {@link DoFn} to invoke, which will also bind the * input type of this {@link PTransform}. */ public static UnboundMulti withOutputTags( @@ -510,6 +506,20 @@ public static UnboundMulti withOutputTags( return new Unbound().withOutputTags(mainOutputTag, sideOutputTags); } + /** + * Creates a {@link ParDo} {@link PTransform} that will invoke the + * given {@link DoFn} function. + * + *

      The resulting {@link PTransform PTransform's} types have been bound, with the + * input being a {@code PCollection} and the output a + * {@code PCollection}, inferred from the types of the argument + * {@code DoFn}. It is ready to be applied, or further + * properties can be set on it first. + */ + public static Bound of(DoFn fn) { + return of(adapt(fn), fn.getClass()); + } + /** * Creates a {@link ParDo} {@link PTransform} that will invoke the * given {@link OldDoFn} function. @@ -537,29 +547,11 @@ private static Bound of( return DoFnAdapters.toOldDoFn(fn); } - /** - * Creates a {@link ParDo} {@link PTransform} that will invoke the - * given {@link DoFn} function. - * - *

      The resulting {@link PTransform PTransform's} types have been bound, with the - * input being a {@code PCollection} and the output a - * {@code PCollection}, inferred from the types of the argument - * {@code OldDoFn}. It is ready to be applied, or further - * properties can be set on it first. - * - *

      {@link DoFn} is an experimental alternative to - * {@link OldDoFn} which simplifies accessing the window of the element. - */ - @Experimental - public static Bound of(DoFn fn) { - return of(adapt(fn), fn.getClass()); - } - /** * An incomplete {@link ParDo} transform, with unbound input/output types. * *

      Before being applied, {@link ParDo.Unbound#of} must be - * invoked to specify the {@link OldDoFn} to invoke, which will also + * invoked to specify the {@link DoFn} to invoke, which will also * bind the input/output types of this {@link PTransform}. */ public static class Unbound { @@ -619,6 +611,18 @@ public UnboundMulti withOutputTags(TupleTag mainOutp name, sideInputs, mainOutputTag, sideOutputTags); } + /** + * Returns a new {@link ParDo} {@link PTransform} that's like this + * transform but which will invoke the given {@link DoFn} + * function, and which has its input and output types bound. Does + * not modify this transform. The resulting {@link PTransform} is + * sufficiently specified to be applied, but more properties can + * still be specified. + */ + public Bound of(DoFn fn) { + return of(adapt(fn), fn.getClass()); + } + /** * Returns a new {@link ParDo} {@link PTransform} that's like this * transform but that will invoke the given {@link OldDoFn} @@ -638,24 +642,11 @@ private Bound of( OldDoFn fn, Class fnClass) { return new Bound<>(name, sideInputs, fn, fnClass); } - - - /** - * Returns a new {@link ParDo} {@link PTransform} that's like this - * transform but which will invoke the given {@link DoFn} - * function, and which has its input and output types bound. Does - * not modify this transform. The resulting {@link PTransform} is - * sufficiently specified to be applied, but more properties can - * still be specified. - */ - public Bound of(DoFn fn) { - return of(adapt(fn), fn.getClass()); - } } /** * A {@link PTransform} that, when applied to a {@code PCollection}, - * invokes a user-specified {@code OldDoFn} on all its elements, + * invokes a user-specified {@code DoFn} on all its elements, * with all its outputs collected into an output * {@code PCollection}. * @@ -756,9 +747,9 @@ protected String getKindString() { /** * {@inheritDoc} * - *

      {@link ParDo} registers its internal {@link OldDoFn} as a subcomponent for display data. - * {@link OldDoFn} implementations can register display data by overriding - * {@link OldDoFn#populateDisplayData}. + *

      {@link ParDo} registers its internal {@link DoFn} as a subcomponent for display data. + * {@link DoFn} implementations can register display data by overriding + * {@link DoFn#populateDisplayData}. */ @Override public void populateDisplayData(Builder builder) { @@ -780,7 +771,7 @@ public List> getSideInputs() { * input type. * *

      Before being applied, {@link ParDo.UnboundMulti#of} must be - * invoked to specify the {@link OldDoFn} to invoke, which will also + * invoked to specify the {@link DoFn} to invoke, which will also * bind the input type of this {@link PTransform}. * * @param the type of the main output {@code PCollection} elements @@ -836,38 +827,41 @@ public UnboundMulti withSideInputs( /** * Returns a new multi-output {@link ParDo} {@link PTransform} - * that's like this transform but that will invoke the given - * {@link OldDoFn} function, and that has its input type bound. + * that's like this transform but which will invoke the given + * {@link DoFn} function, and which has its input type bound. * Does not modify this transform. The resulting * {@link PTransform} is sufficiently specified to be applied, but * more properties can still be specified. */ - public BoundMulti of(OldDoFn fn) { - return of(fn, fn.getClass()); - } - - public BoundMulti of(OldDoFn fn, Class fnClass) { - return new BoundMulti<>( - name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); + public BoundMulti of(DoFn fn) { + return of(adapt(fn), fn.getClass()); } /** * Returns a new multi-output {@link ParDo} {@link PTransform} - * that's like this transform but which will invoke the given - * {@link DoFn} function, and which has its input type bound. + * that's like this transform but that will invoke the given + * {@link OldDoFn} function, and that has its input type bound. * Does not modify this transform. The resulting * {@link PTransform} is sufficiently specified to be applied, but * more properties can still be specified. + * + * @deprecated please port your {@link OldDoFn} to a {@link DoFn} */ - public BoundMulti of(DoFn fn) { - return of(adapt(fn), fn.getClass()); + @Deprecated + public BoundMulti of(OldDoFn fn) { + return of(fn, fn.getClass()); + } + + private BoundMulti of(OldDoFn fn, Class fnClass) { + return new BoundMulti<>( + name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); } } /** * A {@link PTransform} that, when applied to a * {@code PCollection}, invokes a user-specified - * {@code OldDoFn} on all its elements, which can emit elements + * {@code DoFn} on all its elements, which can emit elements * to any of the {@link PTransform}'s main and side output * {@code PCollection}s, which are bundled into a result * {@code PCollectionTuple}. @@ -939,7 +933,7 @@ public PCollectionTuple apply(PCollection input) { input.isBounded()); // The fn will likely be an instance of an anonymous subclass - // such as OldDoFn { }, thus will have a high-fidelity + // such as DoFn { }, thus will have a high-fidelity // TypeDescriptor for the output type. outputs.get(mainOutputTag).setTypeDescriptorInternal(fn.getOutputTypeDescriptor()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java index bf075f8dcbd2..86046598fb4c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java @@ -37,7 +37,7 @@ SimpleFunction fromSerializableFunctionWithOutputType( /** * Returns a {@link TypeDescriptor} capturing what is known statically - * about the input type of this {@code OldDoFn} instance's most-derived + * about the input type of this {@link SimpleFunction} instance's most-derived * class. * *

      See {@link #getOutputTypeDescriptor} for more discussion. @@ -48,10 +48,10 @@ public TypeDescriptor getInputTypeDescriptor() { /** * Returns a {@link TypeDescriptor} capturing what is known statically - * about the output type of this {@code OldDoFn} instance's + * about the output type of this {@link SimpleFunction} instance's * most-derived class. * - *

      In the normal case of a concrete {@code OldDoFn} subclass with + *

      In the normal case of a concrete {@link SimpleFunction} subclass with * no generic type parameters of its own (including anonymous inner * classes), this will be a complete non-generic type, which is good * for choosing a default output {@code Coder} for the output diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index 0c87e2271ec9..727a4925cb31 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -29,15 +29,15 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.VarInt; /** * Provides information about the pane an element belongs to. Every pane is implicitly associated * with a window. Panes are observable only via the - * {@link OldDoFn.ProcessContext#pane} method of the context - * passed to a {@link OldDoFn#processElement} overridden method. + * {@link DoFn.ProcessContext#pane} method of the context + * passed to a {@link DoFn.ProcessElement} method. * *

      Note: This does not uniquely identify a pane, and should not be used for comparisons. */ @@ -72,8 +72,8 @@ public final class PaneInfo { * definitions: *

        *
      1. We'll call a pipeline 'simple' if it does not use - * {@link OldDoFn.Context#outputWithTimestamp} in - * any {@code OldDoFn}, and it uses the same + * {@link DoFn.Context#outputWithTimestamp} in + * any {@link DoFn}, and it uses the same * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound#withAllowedLateness} * argument value on all windows (or uses the default of {@link org.joda.time.Duration#ZERO}). *
      2. We'll call an element 'locally late', from the point of view of a computation on a diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java index dead76eb3d00..9ee55ad3a4f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java @@ -106,7 +106,7 @@ public Collection getAllStepContexts() { /** * Hook for subclasses to implement that will be called whenever - * {@link OldDoFn.Context#output} + * {@code DoFn.Context#output} * is called. */ @Override @@ -114,7 +114,7 @@ public void noteOutput(WindowedValue output) {} /** * Hook for subclasses to implement that will be called whenever - * {@link OldDoFn.Context#sideOutput} + * {@code DoFn.Context#sideOutput} * is called. */ @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java index 2808ca90568e..8f3f540e66ae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java @@ -21,8 +21,8 @@ import org.apache.beam.sdk.values.KV; /** - * OldDoFn that makes timestamps and window assignments explicit in the value part of each key/value - * pair. + * {@link OldDoFn} that makes timestamps and window assignments explicit in the value part of each + * key/value pair. * * @param the type of the keys of the input and output {@code PCollection}s * @param the type of the values of the input {@code PCollection} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 354aa5d91182..6b3218ecf199 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -102,7 +102,7 @@ public static T clone(T value) { */ public static CloudObject ensureSerializable(Coder coder) { // Make sure that Coders are java serializable as well since - // they are regularly captured within OldDoFn's. + // they are regularly captured within DoFn's. Coder copy = (Coder) ensureSerializable((Serializable) coder); CloudObject cloudObject = copy.asCloudObject(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java index e9904b2d7660..004496b8319b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java @@ -22,15 +22,14 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import org.apache.beam.sdk.transforms.OldDoFn; /** - * Annotation to mark {@link OldDoFn DoFns} as an internal component of the Dataflow SDK. + * Annotation to mark {@code DoFns} as an internal component of the Beam SDK. * *

        Currently, the only effect of this is to mark any aggregators reported by an annotated - * {@code OldDoFn} as a system counter (as opposed to a user counter). + * {@code DoFn} as a system counter (as opposed to a user counter). * - *

        This is internal to the Dataflow SDK. + *

        This is internal to the Beam SDK. */ @Documented @Retention(RetentionPolicy.RUNTIME) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java index 54158d24e520..016276cb4bf3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collection; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.state.StateInternals; @@ -28,7 +29,7 @@ import org.joda.time.Instant; /** - * Interface that may be required by some (internal) {@code OldDoFn}s to implement windowing. It + * Interface that may be required by some (internal) {@link DoFn}s to implement windowing. It * should not be necessary for general user code to interact with this at all. * *

        This interface should be provided by runner implementors to support windowing on their runner. diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java index 25b909aabbaf..c072fd7c47db 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java @@ -35,7 +35,7 @@ import org.mockito.MockitoAnnotations; /** - * Tests for OldDoFn.DelegatingAggregator. + * Tests for {@link OldDoFn.DelegatingAggregator}. */ @RunWith(JUnit4.class) public class DoFnDelegatingAggregatorTest { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 2f1519cc4276..2649be5aadf3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -276,7 +276,8 @@ public void processElement(ProcessContext c) throws Exception { } /** - * A OldDoFn that adds values to an aggregator and converts input to String in processElement. + * An {@link OldDoFn} that adds values to an aggregator and converts input to String in + * {@link OldDoFn#processElement). */ private static class CounterDoFn extends OldDoFn { Aggregator agg = createAggregator("ctr", new Sum.SumLongFn()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java index c73251000650..302b66aa7c4e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java @@ -28,7 +28,7 @@ /** * A {@link OldDoFn} that does nothing with provided elements. Used for testing - * methods provided by the OldDoFn abstract class. + * methods provided by the {@link OldDoFn} abstract class. * * @param unused. * @param unused.