From 0b1865295cb89d88878d0a021df103ed45240924 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 4 Aug 2016 14:54:56 -0700 Subject: [PATCH 1/8] Correctly determine if DoFn has an anonymous class in ParDo --- .../src/main/java/org/apache/beam/sdk/transforms/ParDo.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index bb1af9c11a69d..91f6203c3ed33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -958,7 +958,7 @@ public Coder getDefaultOutputCoder( @Override protected String getKindString() { Class clazz = DoFnReflector.getDoFnClass(fn); - if (fn.getClass().isAnonymousClass()) { + if (clazz.isAnonymousClass()) { return "AnonymousParMultiDo"; } else { return String.format("ParMultiDo(%s)", StringUtils.approximateSimpleName(clazz)); From a1c06d71876384722982ec24da1607e41af653d9 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 4 Aug 2016 14:56:19 -0700 Subject: [PATCH 2/8] Propagate getAllowedTimestampSkew from DoFn to its adapter --- .../java/org/apache/beam/sdk/transforms/DoFnReflector.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java index 9bdfde801d58e..c6168b35a2997 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java @@ -72,6 +72,7 @@ import net.bytebuddy.jar.asm.MethodVisitor; import net.bytebuddy.jar.asm.Opcodes; import net.bytebuddy.matcher.ElementMatchers; +import org.joda.time.Duration; import org.joda.time.Instant; import java.io.IOException; @@ -730,6 +731,11 @@ protected TypeDescriptor getOutputTypeDescriptor() { return fn.getOutputTypeDescriptor(); } + @Override + public Duration getAllowedTimestampSkew() { + return fn.getAllowedTimestampSkew(); + } + @Override public void populateDisplayData(DisplayData.Builder builder) { builder.include(fn); From 620bd9949a6176ddd1903687fe9b8ba8c5822367 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 3 Aug 2016 19:55:21 -0700 Subject: [PATCH 3/8] Port join library to new DoFn --- .../beam/sdk/extensions/joinlibrary/Join.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java index 88836f9d84cb1..f4e6ccbe6dfea 100644 --- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java +++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java @@ -20,7 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -59,8 +59,8 @@ public static PCollection>> innerJoin( .apply(CoGroupByKey.create()); return coGbkResultCollection.apply(ParDo.of( - new OldDoFn, KV>>() { - @Override + new DoFn, KV>>() { + @ProcessElement public void processElement(ProcessContext c) { KV e = c.element(); @@ -108,8 +108,8 @@ public static PCollection>> leftOuterJoin( .apply(CoGroupByKey.create()); return coGbkResultCollection.apply(ParDo.of( - new OldDoFn, KV>>() { - @Override + new DoFn, KV>>() { + @ProcessElement public void processElement(ProcessContext c) { KV e = c.element(); @@ -161,8 +161,8 @@ public static PCollection>> rightOuterJoin( .apply(CoGroupByKey.create()); return coGbkResultCollection.apply(ParDo.of( - new OldDoFn, KV>>() { - @Override + new DoFn, KV>>() { + @ProcessElement public void processElement(ProcessContext c) { KV e = c.element(); From f5011e5c62cb00fb4d8a91bd7d55d5083789a307 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 3 Aug 2016 19:56:33 -0700 Subject: [PATCH 4/8] Port mentions of OldDoFn in PipelineOptions --- .../org/apache/beam/sdk/options/PipelineOptions.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 365f668f042e4..4595fc871d225 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -22,8 +22,8 @@ import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.Context; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.display.HasDisplayData; import com.google.auto.service.AutoService; @@ -35,7 +35,6 @@ import java.lang.reflect.Proxy; import java.util.ServiceLoader; - import javax.annotation.concurrent.ThreadSafe; /** @@ -52,7 +51,7 @@ * and {@link PipelineOptionsFactory#as(Class)}. They can be created * from command-line arguments with {@link PipelineOptionsFactory#fromArgs(String[])}. * They can be converted to another type by invoking {@link PipelineOptions#as(Class)} and - * can be accessed from within a {@link OldDoFn} by invoking + * can be accessed from within a {@link DoFn} by invoking * {@link Context#getPipelineOptions()}. * *

For example: @@ -151,7 +150,7 @@ * {@link PipelineOptionsFactory#withValidation()} is invoked. * *

{@link JsonIgnore @JsonIgnore} is used to prevent a property from being serialized and - * available during execution of {@link OldDoFn}. See the Serialization section below for more + * available during execution of {@link DoFn}. See the Serialization section below for more * details. * *

Registration Of PipelineOptions

From 1959ddbedb2ad61824bf28e1e9139cc677a2aaf5 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 3 Aug 2016 20:15:12 -0700 Subject: [PATCH 5/8] Port easy Java SDK tests to new DoFn --- .../org/apache/beam/sdk/PipelineTest.java | 8 ++--- .../apache/beam/sdk/coders/AvroCoderTest.java | 6 ++-- .../beam/sdk/coders/CoderRegistryTest.java | 10 +++--- .../sdk/coders/SerializableCoderTest.java | 10 +++--- .../apache/beam/sdk/io/CountingInputTest.java | 6 ++-- .../beam/sdk/io/CountingSourceTest.java | 6 ++-- .../beam/sdk/io/PubsubUnboundedSinkTest.java | 6 ++-- .../sdk/transforms/ApproximateUniqueTest.java | 6 ++-- .../beam/sdk/transforms/CombineFnsTest.java | 4 +-- .../beam/sdk/transforms/CombineTest.java | 18 +++++----- .../beam/sdk/transforms/CreateTest.java | 4 +-- .../beam/sdk/transforms/FlattenTest.java | 8 ++--- .../beam/sdk/transforms/GroupByKeyTest.java | 8 ++--- .../sdk/transforms/WithTimestampsTest.java | 12 +++---- .../display/DisplayDataEvaluatorTest.java | 10 +++--- .../transforms/display/DisplayDataTest.java | 6 ++-- .../sdk/transforms/join/CoGroupByKeyTest.java | 34 +++++++++---------- .../sdk/transforms/windowing/WindowTest.java | 10 +++--- .../transforms/windowing/WindowingTest.java | 23 +++++++------ .../beam/sdk/values/PCollectionTupleTest.java | 6 ++-- .../beam/sdk/values/TypedPValueTest.java | 10 +++--- 21 files changed, 106 insertions(+), 105 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index 5137031e4502e..8b8649994a069 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -36,8 +36,8 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.UserCodeException; @@ -146,9 +146,9 @@ public void testMultipleApply() { private static PTransform, PCollection> addSuffix( final String suffix) { - return ParDo.of(new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) { + return ParDo.of(new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) { c.output(c.element() + suffix); } }); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 41d0932edcbbc..3b13e351300a8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.SerializableUtils; @@ -134,8 +134,8 @@ public String toString() { } } - private static class GetTextFn extends OldDoFn { - @Override + private static class GetTextFn extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().text); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index 35ec6c62258b8..da1540585a840 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CloudObject; @@ -366,8 +366,8 @@ public void testDefaultCoderAnnotationGeneric() throws Exception { private static class PTransformOutputingMySerializableGeneric extends PTransform, PCollection>>> { - private class OutputDoFn extends OldDoFn>> { - @Override + private class OutputDoFn extends DoFn>> { + @ProcessElement public void processElement(ProcessContext c) { } } @@ -430,8 +430,8 @@ private static class GenericOutputMySerializedGeneric PCollection, PCollection>>> { - private class OutputDoFn extends OldDoFn>> { - @Override + private class OutputDoFn extends DoFn>> { + @ProcessElement public void processElement(ProcessContext c) { } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java index 3e7fd50fbe98e..b5465fae0e999 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.CoderUtils; @@ -82,15 +82,15 @@ public int hashCode() { } } - static class StringToRecord extends OldDoFn { - @Override + static class StringToRecord extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { c.output(new MyRecord(c.element())); } } - static class RecordToString extends OldDoFn { - @Override + static class RecordToString extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().value); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java index 95f745461ede6..4ec2c9ae02da7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java @@ -29,9 +29,9 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; @@ -120,8 +120,8 @@ public void testUnboundedInputRate() { assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true)); } - private static class ElementValueDiff extends OldDoFn { - @Override + private static class ElementValueDiff extends DoFn { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element() - c.timestamp().getMillis()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java index 45f636f17267c..0bd91c14846f5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java @@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -159,8 +159,8 @@ public void testUnboundedSource() { p.run(); } - private static class ElementValueDiff extends OldDoFn { - @Override + private static class ElementValueDiff extends DoFn { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element() - c.timestamp().getMillis()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java index f8592c976256c..db03a5cce7a0b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.PubsubClient; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; @@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest { private static final String ID_LABEL = "id"; private static final int NUM_SHARDS = 10; - private static class Stamp extends OldDoFn { - @Override + private static class Stamp extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP)); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java index 5c8732f8072e3..7b6d671aa9351 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java @@ -54,7 +54,7 @@ */ @RunWith(JUnit4.class) public class ApproximateUniqueTest implements Serializable { - // implements Serializable just to make it easy to use anonymous inner OldDoFn subclasses + // implements Serializable just to make it easy to use anonymous inner DoFn subclasses @Test public void testEstimationErrorToSampleSize() { @@ -223,8 +223,8 @@ private static void runApproximateUniquePipeline(int sampleSize) { .apply(View.asSingleton()); PCollection> approximateAndExact = approximate - .apply(ParDo.of(new OldDoFn>() { - @Override + .apply(ParDo.of(new DoFn>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(c.element(), c.sideInput(exact))); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index d6bf8269358f4..95ba1aa0d8d14 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -461,7 +461,7 @@ public UserString extractOutput( } private static class ExtractResultDoFn - extends OldDoFn, KV>> { + extends DoFn, KV>> { private final TupleTag maxIntTag; private final TupleTag concatStringTag; @@ -471,7 +471,7 @@ private static class ExtractResultDoFn this.concatStringTag = concatStringTag; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { UserString userString = c.element().getValue().get(concatStringTag); KV value = KV.of( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index cb9928e2d4944..6421b3b274ecf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -117,7 +117,7 @@ public class CombineTest implements Serializable { 1, 1, 2, 3, 5, 8, 13, 21, 34, 55 }; - @Mock private OldDoFn.ProcessContext processContext; + @Mock private DoFn.ProcessContext processContext; PCollection> createInput(Pipeline p, KV[] table) { @@ -372,8 +372,8 @@ public void testSlidingWindowsCombineWithContext() { pipeline.run(); } - private static class FormatPaneInfo extends OldDoFn { - @Override + private static class FormatPaneInfo extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + ": " + c.pane().isLast()); } @@ -560,8 +560,8 @@ public void testHotKeyCombining() { pipeline.run(); } - private static class GetLast extends OldDoFn { - @Override + private static class GetLast extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { if (c.pane().isLast()) { c.output(c.element()); @@ -653,8 +653,8 @@ public void testCombineGloballyAsSingletonView() { PCollection output = pipeline .apply("CreateVoidMainInput", Create.of((Void) null)) - .apply("OutputSideInput", ParDo.of(new OldDoFn() { - @Override + .apply("OutputSideInput", ParDo.of(new DoFn() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } @@ -1176,8 +1176,8 @@ public Coder getAccumulatorCoder( } private static PCollection copy(PCollection pc, final int n) { - return pc.apply(ParDo.of(new OldDoFn() { - @Override + return pc.apply(ParDo.of(new DoFn() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { for (int i = 0; i < n; i++) { c.output(c.element()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index cf65423a199e7..9db01368cc66b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -229,8 +229,8 @@ public void testCreateWithUnserializableElements() throws Exception { p.run(); } - private static class PrintTimestamps extends OldDoFn { - @Override + private static class PrintTimestamps extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + ":" + c.timestamp().getMillis()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index b81eedbb243ee..604536bca80e1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -130,8 +130,8 @@ public void testEmptyFlattenAsSideInput() { PCollection output = p .apply(Create.of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo.withSideInputs(view).of(new OldDoFn() { - @Override + .apply(ParDo.withSideInputs(view).of(new DoFn() { + @ProcessElement public void processElement(ProcessContext c) { for (String side : c.sideInput(view)) { c.output(side); @@ -339,8 +339,8 @@ public void testFlattenGetName() { ///////////////////////////////////////////////////////////////////////////// - private static class IdentityFn extends OldDoFn { - @Override + private static class IdentityFn extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 15c3ba8855234..afe460fcdc7ce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -371,14 +371,14 @@ public void testOutputTimeFnLatest() { pipeline.run(); } - private static class AssertTimestamp extends OldDoFn, Void> { + private static class AssertTimestamp extends DoFn, Void> { private final Instant timestamp; public AssertTimestamp(Instant timestamp) { this.timestamp = timestamp; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { assertThat(c.timestamp(), equalTo(timestamp)); } @@ -506,9 +506,9 @@ public BadEqualityKey decode(InputStream inStream, Context context) * Creates a KV that wraps the original KV together with a random key. */ static class AssignRandomKey - extends OldDoFn, KV>> { + extends DoFn, KV>> { - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element())); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java index d2ba45287b57e..e3814708788ee 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java @@ -65,9 +65,9 @@ public Instant apply(String input) { .apply(WithTimestamps.of(timestampFn)); PCollection> timestampedVals = - timestamped.apply(ParDo.of(new OldDoFn>() { - @Override - public void processElement(OldDoFn>.ProcessContext c) + timestamped.apply(ParDo.of(new DoFn>() { + @ProcessElement + public void processElement(DoFn>.ProcessContext c) throws Exception { c.output(KV.of(c.element(), c.timestamp())); } @@ -150,9 +150,9 @@ public Instant apply(String input) { WithTimestamps.of(backInTimeFn).withAllowedTimestampSkew(skew.plus(100L))); PCollection> timestampedVals = - timestampedWithSkew.apply(ParDo.of(new OldDoFn>() { - @Override - public void processElement(OldDoFn>.ProcessContext c) + timestampedWithSkew.apply(ParDo.of(new DoFn>() { + @ProcessElement + public void processElement(DoFn>.ProcessContext c) throws Exception { c.output(KV.of(c.element(), c.timestamp())); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java index c1848c67c3846..e2331146ea8df 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertThat; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; @@ -50,8 +50,8 @@ public void testCompositeTransform() { new PTransform, POutput> () { @Override public PCollection apply(PCollection input) { - return input.apply(ParDo.of(new OldDoFn() { - @Override + return input.apply(ParDo.of(new DoFn() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } @@ -79,8 +79,8 @@ public void populateDisplayData(DisplayData.Builder builder) { @Test public void testPrimitiveTransform() { PTransform, ? super PCollection> myTransform = ParDo.of( - new OldDoFn() { - @Override + new DoFn() { + @ProcessElement public void processElement(ProcessContext c) throws Exception {} @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index 517f9683bb7a2..e2f38b44190a7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -41,7 +41,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -1053,8 +1053,8 @@ public void populateDisplayData(Builder builder) { private static class IdentityTransform extends PTransform, PCollection> { @Override public PCollection apply(PCollection input) { - return input.apply(ParDo.of(new OldDoFn() { - @Override + return input.apply(ParDo.of(new DoFn() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index 97667a3e6891c..c6f82ecb0ca41 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -29,9 +29,8 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -84,10 +83,11 @@ private PCollection> createInput(String name, input = p.apply("Create" + name, Create.timestamped(list, timestamps) .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); } - return input - .apply("Identity" + name, ParDo.of(new OldDoFn, - KV>() { - @Override + return input.apply( + "Identity" + name, + ParDo.of( + new DoFn, KV>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element()); } @@ -313,11 +313,11 @@ public Void apply(Map results) { } /** - * A OldDoFn used in testCoGroupByKeyWithWindowing(), to test processing the - * results of a CoGroupByKey. + * A DoFn used in testCoGroupByKeyWithWindowing(), to test processing the results of a + * CoGroupByKey. */ - private static class ClickOfPurchaseFn extends - OldDoFn, KV> implements RequiresWindowAccess { + private static class ClickOfPurchaseFn + extends DoFn, KV> { private final TupleTag clicksTag; private final TupleTag purchasesTag; @@ -329,9 +329,9 @@ private ClickOfPurchaseFn( this.purchasesTag = purchasesTag; } - @Override - public void processElement(ProcessContext c) { - BoundedWindow w = c.window(); + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + BoundedWindow w = window; KV e = c.element(); CoGbkResult row = e.getValue(); Iterable clicks = row.getAll(clicksTag); @@ -347,11 +347,11 @@ public void processElement(ProcessContext c) { /** - * A OldDoFn used in testCoGroupByKeyHandleResults(), to test processing the + * A DoFn used in testCoGroupByKeyHandleResults(), to test processing the * results of a CoGroupByKey. */ private static class CorrelatePurchaseCountForAddressesWithoutNamesFn extends - OldDoFn, KV> { + DoFn, KV> { private final TupleTag purchasesTag; private final TupleTag addressesTag; @@ -367,7 +367,7 @@ private CorrelatePurchaseCountForAddressesWithoutNamesFn( this.namesTag = namesTag; } - @Override + @ProcessElement public void processElement(ProcessContext c) { KV e = c.element(); CoGbkResult row = e.getValue(); @@ -401,7 +401,7 @@ public void processElement(ProcessContext c) { } /** - * Tests that the consuming OldDoFn + * Tests that the consuming DoFn * (CorrelatePurchaseCountForAddressesWithoutNamesFn) performs as expected. */ @SuppressWarnings("unchecked") diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 27d25391c44f7..c583860d2834f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -36,8 +36,8 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowingStrategy; @@ -199,8 +199,8 @@ public void testOutputTimeFnDefault() { .apply(GroupByKey.create()) .apply( ParDo.of( - new OldDoFn>, Void>() { - @Override + new DoFn>, Void>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { assertThat( c.timestamp(), @@ -231,8 +231,8 @@ public void testOutputTimeFnEndOfWindow() { .apply(Window.>into(FixedWindows.of(Duration.standardMinutes(10))) .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) .apply(GroupByKey.create()) - .apply(ParDo.of(new OldDoFn>, Void>() { - @Override + .apply(ParDo.of(new DoFn>, Void>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { assertThat(c.timestamp(), equalTo(new Instant(10 * 60 * 1000 - 1))); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index 622a2777ada54..159e7004f80fa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -26,9 +26,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -58,12 +57,14 @@ public class WindowingTest implements Serializable { private static class WindowedCount extends PTransform, PCollection> { - private final class FormatCountsDoFn - extends OldDoFn, String> implements RequiresWindowAccess { - @Override - public void processElement(ProcessContext c) { - c.output(c.element().getKey() + ":" + c.element().getValue() - + ":" + c.timestamp().getMillis() + ":" + c.window()); + private final class FormatCountsDoFn extends DoFn, String> { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + c.output( + c.element().getKey() + + ":" + c.element().getValue() + + ":" + c.timestamp().getMillis() + + ":" + window); } } private WindowFn windowFn; @@ -234,9 +235,9 @@ public void testTextIoInput() throws Exception { p.run(); } - /** A OldDoFn that tokenizes lines of text into individual words. */ - static class ExtractWordsWithTimestampsFn extends OldDoFn { - @Override + /** A DoFn that tokenizes lines of text into individual words. */ + static class ExtractWordsWithTimestampsFn extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { String[] words = c.element().split("[^a-zA-Z0-9']+"); if (words.length == 2) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java index 547c778c27bd7..13218b2979c71 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -75,8 +75,8 @@ public void testComposePCollectionTuple() { .apply(Create.of(inputs)); PCollectionTuple outputs = mainInput.apply(ParDo - .of(new OldDoFn() { - @Override + .of(new DoFn() { + @ProcessElement public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element()); }}) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index c525cf1e28a48..287223f44b15c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.junit.Rule; @@ -44,9 +44,9 @@ public class TypedPValueTest { @Rule public ExpectedException thrown = ExpectedException.none(); - private static class IdentityDoFn extends OldDoFn { + private static class IdentityDoFn extends DoFn { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } @@ -129,9 +129,9 @@ public void testUntypedMainOutputTagTypedSideOutputTupleTag() { static class EmptyClass { } - private static class EmptyClassDoFn extends OldDoFn { + private static class EmptyClassDoFn extends DoFn { private static final long serialVersionUID = 0; - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(new EmptyClass()); } From ef5e31f8b79dcedf8feb4bba0e313bfcf330ab1e Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 3 Aug 2016 20:15:58 -0700 Subject: [PATCH 6/8] Port PAssert to new DoFn --- .../org/apache/beam/sdk/testing/PAssert.java | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 80340c22c2f57..e07ee3dad4f4b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -33,11 +33,10 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -762,8 +761,8 @@ public PCollectionView apply(PBegin input) { .apply("RewindowActuals", rewindowActuals.windowActuals()) .apply( ParDo.of( - new OldDoFn() { - @Override + new DoFn() { + @ProcessElement public void processElement(ProcessContext context) throws CoderException { context.output(CoderUtils.clone(coder, context.element())); } @@ -884,8 +883,8 @@ public PCollection>> apply(PCollection input) { } } - private static final class ConcatFn extends OldDoFn>, Iterable> { - @Override + private static final class ConcatFn extends DoFn>, Iterable> { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(Iterables.concat(c.element())); } @@ -995,13 +994,13 @@ public PDone apply(PBegin input) { } /** - * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of a + * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. * *

The input is ignored, but is {@link Integer} to be usable on runners that do not support * null values. */ - private static class SideInputCheckerDoFn extends OldDoFn { + private static class SideInputCheckerDoFn extends DoFn { private final SerializableFunction checkerFn; private final Aggregator success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -1015,7 +1014,7 @@ private SideInputCheckerDoFn( this.actual = actual; } - @Override + @ProcessElement public void processElement(ProcessContext c) { try { ActualT actualContents = c.sideInput(actual); @@ -1030,13 +1029,13 @@ public void processElement(ProcessContext c) { } /** - * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of + * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of * the single iterable element of the input {@link PCollection} and adjusts counters and * thrown exceptions for use in testing. * *

The singleton property is presumed, not enforced. */ - private static class GroupedValuesCheckerDoFn extends OldDoFn { + private static class GroupedValuesCheckerDoFn extends DoFn { private final SerializableFunction checkerFn; private final Aggregator success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -1047,7 +1046,7 @@ private GroupedValuesCheckerDoFn(SerializableFunction checkerFn) this.checkerFn = checkerFn; } - @Override + @ProcessElement public void processElement(ProcessContext c) { try { doChecks(c.element(), checkerFn, success, failure); @@ -1061,14 +1060,14 @@ public void processElement(ProcessContext c) { } /** - * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of + * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of * the single item contained within the single iterable on input and * adjusts counters and thrown exceptions for use in testing. * *

The singleton property of the input {@link PCollection} is presumed, not enforced. However, * each input element must be a singleton iterable, or this will fail. */ - private static class SingletonCheckerDoFn extends OldDoFn, Void> { + private static class SingletonCheckerDoFn extends DoFn, Void> { private final SerializableFunction checkerFn; private final Aggregator success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -1079,7 +1078,7 @@ private SingletonCheckerDoFn(SerializableFunction checkerFn) { this.checkerFn = checkerFn; } - @Override + @ProcessElement public void processElement(ProcessContext c) { try { ActualT actualContents = Iterables.getOnlyElement(c.element()); @@ -1310,7 +1309,7 @@ public PTransform, PCollection> windowActuals() { } /** - * A OldDoFn that filters elements based on their presence in a static collection of windows. + * A DoFn that filters elements based on their presence in a static collection of windows. */ private static final class FilterWindows extends PTransform, PCollection> { private final StaticWindows windows; @@ -1324,10 +1323,10 @@ public PCollection apply(PCollection input) { return input.apply("FilterWindows", ParDo.of(new Fn())); } - private class Fn extends OldDoFn implements RequiresWindowAccess { - @Override - public void processElement(ProcessContext c) throws Exception { - if (windows.getWindows().contains(c.window())) { + private class Fn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + if (windows.getWindows().contains(window)) { c.output(c.element()); } } From 269fbf386454ea77845e54764a125edba7039b03 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 3 Aug 2016 20:22:26 -0700 Subject: [PATCH 7/8] Port easy I/O transforms to new DoFn --- .../beam/runners/dataflow/DataflowRunner.java | 3 +- .../java/org/apache/beam/sdk/io/PubsubIO.java | 14 ++++---- .../beam/sdk/io/PubsubUnboundedSink.java | 17 +++++---- .../beam/sdk/io/PubsubUnboundedSource.java | 7 ++-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 36 +++++++++---------- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 12 +++---- .../beam/sdk/io/gcp/datastore/V1Beta3.java | 18 +++++----- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 +++--- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 6 ++-- .../sdk/io/gcp/datastore/V1Beta3TestUtil.java | 9 +++-- .../org/apache/beam/sdk/io/jms/JmsIO.java | 10 +++--- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 19 +++++----- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 10 +++--- 13 files changed, 82 insertions(+), 89 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index abcf4157f729d..fadd9c78d8886 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -78,6 +78,7 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; @@ -2715,7 +2716,7 @@ private static class UnsupportedIO transform; @Nullable - private OldDoFn doFn; + private DoFn doFn; /** * Builds an instance of this class from the overridden transform. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 1902bca73b560..2b2717574e551 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -709,11 +709,11 @@ public Duration getMaxReadTime() { * *

Public so can be suppressed by runners. */ - public class PubsubBoundedReader extends OldDoFn { + public class PubsubBoundedReader extends DoFn { private static final int DEFAULT_PULL_SIZE = 100; private static final int ACK_TIMEOUT_SEC = 60; - @Override + @ProcessElement public void processElement(ProcessContext c) throws IOException { try (PubsubClient pubsubClient = FACTORY.newClient(timestampLabel, idLabel, @@ -998,12 +998,12 @@ public Coder getCoder() { * *

Public so can be suppressed by runners. */ - public class PubsubBoundedWriter extends OldDoFn { + public class PubsubBoundedWriter extends DoFn { private static final int MAX_PUBLISH_BATCH_SIZE = 100; private transient List output; private transient PubsubClient pubsubClient; - @Override + @StartBundle public void startBundle(Context c) throws IOException { this.output = new ArrayList<>(); // NOTE: idLabel is ignored. @@ -1012,7 +1012,7 @@ public void startBundle(Context c) throws IOException { c.getPipelineOptions().as(PubsubOptions.class)); } - @Override + @ProcessElement public void processElement(ProcessContext c) throws IOException { // NOTE: The record id is always null. OutgoingMessage message = @@ -1025,7 +1025,7 @@ public void processElement(ProcessContext c) throws IOException { } } - @Override + @FinishBundle public void finishBundle(Context c) throws IOException { if (!output.isEmpty()) { publish(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 9e9536df48522..301475198b02f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -65,7 +65,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; - import javax.annotation.Nullable; /** @@ -78,7 +77,7 @@ *

  • We try to send messages in batches while also limiting send latency. *
  • No stats are logged. Rather some counters are used to keep track of elements and batches. *
  • Though some background threads are used by the underlying netty system all actual Pubsub - * calls are blocking. We rely on the underlying runner to allow multiple {@link OldDoFn} instances + * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances * to execute concurrently and hide latency. *
  • A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer * to dedup messages. @@ -155,7 +154,7 @@ enum RecordIdMethod { /** * Convert elements to messages and shard them. */ - private static class ShardFn extends OldDoFn> { + private static class ShardFn extends DoFn> { private final Aggregator elementCounter = createAggregator("elements", new Sum.SumLongFn()); private final Coder elementCoder; @@ -168,7 +167,7 @@ private static class ShardFn extends OldDoFn> this.recordIdMethod = recordIdMethod; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { elementCounter.addValue(1L); byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element()); @@ -207,7 +206,7 @@ public void populateDisplayData(Builder builder) { * Publish messages to Pubsub in batches. */ private static class WriterFn - extends OldDoFn>, Void> { + extends DoFn>, Void> { private final PubsubClientFactory pubsubFactory; private final TopicPath topic; private final String timestampLabel; @@ -253,14 +252,14 @@ private void publishBatch(List messages, int bytes) byteCounter.addValue((long) bytes); } - @Override + @StartBundle public void startBundle(Context c) throws Exception { checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel, c.getPipelineOptions().as(PubsubOptions.class)); } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { List pubsubMessages = new ArrayList<>(publishBatchSize); int bytes = 0; @@ -285,7 +284,7 @@ public void processElement(ProcessContext c) throws Exception { } } - @Override + @FinishBundle public void finishBundle(Context c) throws Exception { pubsubClient.close(); pubsubClient = null; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index d98bd6adef87a..f99b47142425a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -77,7 +77,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; - import javax.annotation.Nullable; /** @@ -1107,7 +1106,7 @@ public boolean requiresDeduping() { // StatsFn // ================================================================================ - private static class StatsFn extends OldDoFn { + private static class StatsFn extends DoFn { private final Aggregator elementCounter = createAggregator("elements", new Sum.SumLongFn()); @@ -1131,7 +1130,7 @@ public StatsFn( this.idLabel = idLabel; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { elementCounter.addValue(1L); c.output(c.element()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 2ba756217852e..ed2c32ee3d2b4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -103,7 +104,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.apache.avro.generic.GenericRecord; import org.joda.time.Instant; import org.slf4j.Logger; @@ -135,7 +135,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; - import javax.annotation.Nullable; /** @@ -334,7 +333,7 @@ public static String toTableSpec(TableReference ref) { *

    Each {@link TableRow} contains values indexed by column name. Here is a * sample processing function that processes a "line" column from rows: *

    {@code
    -   * static class ExtractWordsFn extends OldDoFn {
    +   * static class ExtractWordsFn extends DoFn {
        *   public void processElement(ProcessContext c) {
        *     // Get the "line" field of the TableRow object, split it into words, and emit them.
        *     TableRow row = c.element();
    @@ -706,8 +705,8 @@ public PCollection apply(PCollection input) {
           input.getPipeline()
               .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
               .apply("Cleanup", ParDo.of(
    -              new OldDoFn() {
    -                @Override
    +              new DoFn() {
    +                @ProcessElement
                     public void processElement(ProcessContext c)
                         throws Exception {
                       c.element().cleanup(c.getPipelineOptions());
    @@ -717,8 +716,8 @@ public void processElement(ProcessContext c)
           return outputs.get(mainOutput);
         }
     
    -    private static class IdentityFn extends OldDoFn {
    -      @Override
    +    private static class IdentityFn extends DoFn {
    +      @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.element());
           }
    @@ -1271,7 +1270,7 @@ private static List getExtractFilePaths(String extractDestinationDir, Jo
        * 

    Here is a sample transform that produces TableRow values containing * "word" and "count" columns: *

    {@code
    -   * static class FormatCountsFn extends OldDoFn, TableRow> {
    +   * static class FormatCountsFn extends DoFn, TableRow> {
        *   public void processElement(ProcessContext c) {
        *     TableRow row = new TableRow()
        *         .set("word", c.element().getKey())
    @@ -2307,11 +2306,11 @@ private static void verifyTablePresence(DatasetService datasetService, TableRefe
       /////////////////////////////////////////////////////////////////////////////
     
       /**
    -   * Implementation of OldDoFn to perform streaming BigQuery write.
    +   * Implementation of DoFn to perform streaming BigQuery write.
        */
       @SystemDoFnInternal
       private static class StreamingWriteFn
    -      extends OldDoFn, TableRowInfo>, Void> {
    +      extends DoFn, TableRowInfo>, Void> {
         /** TableSchema in JSON. Use String to make the class Serializable. */
         private final String jsonTableSchema;
     
    @@ -2339,14 +2338,14 @@ private static class StreamingWriteFn
         }
     
         /** Prepares a target BigQuery table. */
    -    @Override
    +    @StartBundle
         public void startBundle(Context context) {
           tableRows = new HashMap<>();
           uniqueIdsForTableRows = new HashMap<>();
         }
     
         /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
    -    @Override
    +    @ProcessElement
         public void processElement(ProcessContext context) {
           String tableSpec = context.element().getKey().getKey();
           List rows = getOrCreateMapListValue(tableRows, tableSpec);
    @@ -2357,7 +2356,7 @@ public void processElement(ProcessContext context) {
         }
     
         /** Writes the accumulated rows into BigQuery with streaming API. */
    -    @Override
    +    @FinishBundle
         public void finishBundle(Context context) throws Exception {
           BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
     
    @@ -2544,8 +2543,7 @@ private static class TableRowInfo {
        * id is created by concatenating this randomUUID with a sequential number.
        */
       private static class TagWithUniqueIdsAndTable
    -      extends OldDoFn, TableRowInfo>>
    -      implements OldDoFn.RequiresWindowAccess {
    +      extends DoFn, TableRowInfo>> {
         /** TableSpec to write to. */
         private final String tableSpec;
     
    @@ -2571,18 +2569,18 @@ private static class TagWithUniqueIdsAndTable
         }
     
     
    -    @Override
    +    @StartBundle
         public void startBundle(Context context) {
           randomUUID = UUID.randomUUID().toString();
         }
     
         /** Tag the input with a unique id. */
    -    @Override
    -    public void processElement(ProcessContext context) throws IOException {
    +    @ProcessElement
    +    public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
           String uniqueId = randomUUID + sequenceNo++;
           ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
           String tableSpec = tableSpecFromWindow(
    -          context.getPipelineOptions().as(BigQueryOptions.class), context.window());
    +          context.getPipelineOptions().as(BigQueryOptions.class), window);
           // We output on keys 0-50 to ensure that there's enough batching for
           // BigQuery.
           context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
    diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
    index 1f77e3e48367b..bfdf4aaf0a637 100644
    --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
    +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
    @@ -31,7 +31,7 @@
     import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
     import org.apache.beam.sdk.options.PipelineOptions;
     import org.apache.beam.sdk.runners.PipelineRunner;
    -import org.apache.beam.sdk.transforms.OldDoFn;
    +import org.apache.beam.sdk.transforms.DoFn;
     import org.apache.beam.sdk.transforms.PTransform;
     import org.apache.beam.sdk.transforms.ParDo;
     import org.apache.beam.sdk.transforms.display.DisplayData;
    @@ -55,7 +55,6 @@
     import com.google.protobuf.ByteString;
     
     import io.grpc.Status;
    -
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    @@ -65,7 +64,6 @@
     import java.util.List;
     import java.util.NoSuchElementException;
     import java.util.concurrent.ConcurrentLinkedQueue;
    -
     import javax.annotation.Nullable;
     
     /**
    @@ -512,7 +510,7 @@ private BigtableService getBigtableService() {
           return new BigtableServiceImpl(options);
         }
     
    -    private class BigtableWriterFn extends OldDoFn>, Void> {
    +    private class BigtableWriterFn extends DoFn>, Void> {
     
           public BigtableWriterFn(String tableId, BigtableService bigtableService) {
             this.tableId = checkNotNull(tableId, "tableId");
    @@ -520,13 +518,13 @@ public BigtableWriterFn(String tableId, BigtableService bigtableService) {
             this.failures = new ConcurrentLinkedQueue<>();
           }
     
    -      @Override
    +      @StartBundle
           public void startBundle(Context c) throws Exception {
             bigtableWriter = bigtableService.openForWriting(tableId);
             recordsWritten = 0;
           }
     
    -      @Override
    +      @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
             checkForFailures();
             Futures.addCallback(
    @@ -534,7 +532,7 @@ public void processElement(ProcessContext c) throws Exception {
             ++recordsWritten;
           }
     
    -      @Override
    +      @FinishBundle
           public void finishBundle(Context c) throws Exception {
             bigtableWriter.close();
             bigtableWriter = null;
    diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
    index 6f3663ad3d10a..052feb34a8479 100644
    --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
    +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
    @@ -37,9 +37,9 @@
     import org.apache.beam.sdk.options.GcpOptions;
     import org.apache.beam.sdk.options.PipelineOptions;
     import org.apache.beam.sdk.transforms.Create;
    +import org.apache.beam.sdk.transforms.DoFn;
     import org.apache.beam.sdk.transforms.Flatten;
     import org.apache.beam.sdk.transforms.GroupByKey;
    -import org.apache.beam.sdk.transforms.OldDoFn;
     import org.apache.beam.sdk.transforms.PTransform;
     import org.apache.beam.sdk.transforms.ParDo;
     import org.apache.beam.sdk.transforms.Values;
    @@ -478,11 +478,11 @@ public String getNamespace() {
         }
     
         /**
    -     * A {@link OldDoFn} that splits a given query into multiple sub-queries, assigns them unique
    +     * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique
          * keys and outputs them as {@link KV}.
          */
         @VisibleForTesting
    -    static class SplitQueryFn extends OldDoFn> {
    +    static class SplitQueryFn extends DoFn> {
           private final V1Beta3Options options;
           // number of splits to make for a given query
           private final int numSplits;
    @@ -505,13 +505,13 @@ public SplitQueryFn(V1Beta3Options options, int numSplits) {
             this.datastoreFactory = datastoreFactory;
           }
     
    -      @Override
    +      @StartBundle
           public void startBundle(Context c) throws Exception {
             datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId);
             querySplitter = datastoreFactory.getQuerySplitter();
           }
     
    -      @Override
    +      @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
             int key = 1;
             Query query = c.element();
    @@ -559,10 +559,10 @@ public void populateDisplayData(Builder builder) {
         }
     
         /**
    -     * A {@link OldDoFn} that reads entities from Datastore for each query.
    +     * A {@link DoFn} that reads entities from Datastore for each query.
          */
         @VisibleForTesting
    -    static class ReadFn extends OldDoFn {
    +    static class ReadFn extends DoFn {
           private final V1Beta3Options options;
           private final V1Beta3DatastoreFactory datastoreFactory;
           // Datastore client
    @@ -578,13 +578,13 @@ public ReadFn(V1Beta3Options options) {
             this.datastoreFactory = datastoreFactory;
           }
     
    -      @Override
    +      @StartBundle
           public void startBundle(Context c) throws Exception {
             datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
           }
     
           /** Read and output entities for the given query. */
    -      @Override
    +      @ProcessElement
           public void processElement(ProcessContext context) throws Exception {
             Query query = context.element();
             String namespace = options.getNamespace();
    diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
    index 1ea1f9472f346..6d6eb60defe6a 100644
    --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
    +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
    @@ -22,6 +22,7 @@
     import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
     
     import static com.google.common.base.Preconditions.checkArgument;
    +
     import static org.hamcrest.Matchers.hasItem;
     import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertNull;
    @@ -64,8 +65,8 @@
     import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
     import org.apache.beam.sdk.testing.TestPipeline;
     import org.apache.beam.sdk.transforms.Create;
    +import org.apache.beam.sdk.transforms.DoFn;
     import org.apache.beam.sdk.transforms.DoFnTester;
    -import org.apache.beam.sdk.transforms.OldDoFn;
     import org.apache.beam.sdk.transforms.ParDo;
     import org.apache.beam.sdk.transforms.SerializableFunction;
     import org.apache.beam.sdk.transforms.display.DisplayData;
    @@ -131,7 +132,6 @@
     import java.util.Map;
     import java.util.NoSuchElementException;
     import java.util.Set;
    -
     import javax.annotation.Nullable;
     
     /**
    @@ -235,7 +235,7 @@ private static class FakeJobService implements JobService, Serializable {
         private Object[] pollJobReturns;
         private String executingProject;
         // Both counts will be reset back to zeros after serialization.
    -    // This is a work around for OldDoFn's verifyUnmodified check.
    +    // This is a work around for DoFn's verifyUnmodified check.
         private transient int startJobCallsCount;
         private transient int pollJobStatusCallsCount;
     
    @@ -571,8 +571,8 @@ public void testReadFromTable() {
             .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
                 .withTestServices(fakeBqServices)
                 .withoutValidation())
    -        .apply(ParDo.of(new OldDoFn() {
    -          @Override
    +        .apply(ParDo.of(new DoFn() {
    +          @ProcessElement
               public void processElement(ProcessContext c) throws Exception {
                 c.output((String) c.element().get("name"));
               }
    diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
    index 83489a597e9c5..ee3a6f97642bb 100644
    --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
    +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
    @@ -23,7 +23,7 @@
     import org.apache.beam.sdk.io.CountingInput;
     import org.apache.beam.sdk.options.PipelineOptionsFactory;
     import org.apache.beam.sdk.testing.TestPipeline;
    -import org.apache.beam.sdk.transforms.OldDoFn;
    +import org.apache.beam.sdk.transforms.DoFn;
     import org.apache.beam.sdk.transforms.ParDo;
     import org.apache.beam.sdk.values.KV;
     
    @@ -108,8 +108,8 @@ public void testE2EBigtableWrite() throws Exception {
     
         Pipeline p = Pipeline.create(options);
         p.apply(CountingInput.upTo(numRows))
    -        .apply(ParDo.of(new OldDoFn>>() {
    -          @Override
    +        .apply(ParDo.of(new DoFn>>() {
    +          @ProcessElement
               public void processElement(ProcessContext c) {
                 int index = c.element().intValue();
     
    diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
    index daed1cbc91209..7eaf23e805689 100644
    --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
    +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
    @@ -27,7 +27,7 @@
     
     import org.apache.beam.sdk.options.GcpOptions;
     import org.apache.beam.sdk.options.PipelineOptions;
    -import org.apache.beam.sdk.transforms.OldDoFn;
    +import org.apache.beam.sdk.transforms.DoFn;
     import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
     import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
     
    @@ -60,7 +60,6 @@
     import java.util.Iterator;
     import java.util.List;
     import java.util.UUID;
    -
     import javax.annotation.Nullable;
     
     class V1Beta3TestUtil {
    @@ -109,9 +108,9 @@ static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable Str
       }
     
       /**
    -   * A OldDoFn that creates entity for a long number.
    +   * A DoFn that creates entity for a long number.
        */
    -  static class CreateEntityFn extends OldDoFn {
    +  static class CreateEntityFn extends DoFn {
         private final String kind;
         @Nullable
         private final String namespace;
    @@ -124,7 +123,7 @@ static class CreateEntityFn extends OldDoFn {
           ancestorKey = makeAncestorKey(namespace, kind, ancestor);
         }
     
    -    @Override
    +    @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
         }
    diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    index eeb02e639d48c..557fe13a3a372 100644
    --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    @@ -28,7 +28,7 @@
     import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
     import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
     import org.apache.beam.sdk.options.PipelineOptions;
    -import org.apache.beam.sdk.transforms.OldDoFn;
    +import org.apache.beam.sdk.transforms.DoFn;
     import org.apache.beam.sdk.transforms.PTransform;
     import org.apache.beam.sdk.transforms.ParDo;
     import org.apache.beam.sdk.transforms.display.DisplayData;
    @@ -453,7 +453,7 @@ public void validate(PCollection input) {
           checkArgument((queue != null || topic != null), "Either queue or topic is required");
         }
     
    -    private static class JmsWriter extends OldDoFn {
    +    private static class JmsWriter extends DoFn {
     
           private ConnectionFactory connectionFactory;
           private String queue;
    @@ -469,7 +469,7 @@ public JmsWriter(ConnectionFactory connectionFactory, String queue, String topic
             this.topic = topic;
           }
     
    -      @Override
    +      @StartBundle
           public void startBundle(Context c) throws Exception {
             if (producer == null) {
               this.connection = connectionFactory.createConnection();
    @@ -486,7 +486,7 @@ public void startBundle(Context c) throws Exception {
             }
           }
     
    -      @Override
    +      @ProcessElement
           public void processElement(ProcessContext ctx) throws Exception {
             String value = ctx.element();
     
    @@ -499,7 +499,7 @@ public void processElement(ProcessContext ctx) throws Exception {
             }
           }
     
    -      @Override
    +      @FinishBundle
           public void finishBundle(Context c) throws Exception {
             producer.close();
             producer = null;
    diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
    index 227121649e5ee..2383105d2e612 100644
    --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
    +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
    @@ -33,7 +33,7 @@
     import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
     import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
     import org.apache.beam.sdk.options.PipelineOptions;
    -import org.apache.beam.sdk.transforms.OldDoFn;
    +import org.apache.beam.sdk.transforms.DoFn;
     import org.apache.beam.sdk.transforms.PTransform;
     import org.apache.beam.sdk.transforms.ParDo;
     import org.apache.beam.sdk.transforms.SerializableFunction;
    @@ -94,7 +94,6 @@
     import java.util.concurrent.SynchronousQueue;
     import java.util.concurrent.TimeUnit;
     import java.util.concurrent.atomic.AtomicBoolean;
    -
     import javax.annotation.Nullable;
     
     /**
    @@ -550,8 +549,8 @@ public PCollection> apply(PBegin begin) {
           return typedRead
               .apply(begin)
               .apply("Remove Kafka Metadata",
    -              ParDo.of(new OldDoFn, KV>() {
    -                @Override
    +              ParDo.of(new DoFn, KV>() {
    +                @ProcessElement
                     public void processElement(ProcessContext ctx) {
                       ctx.output(ctx.element().getKV());
                     }
    @@ -1315,8 +1314,8 @@ private KafkaValueWrite(TypedWrite kvWriteTransform) {
         public PDone apply(PCollection input) {
           return input
             .apply("Kafka values with default key",
    -          ParDo.of(new OldDoFn>() {
    -            @Override
    +          ParDo.of(new DoFn>() {
    +            @ProcessElement
                 public void processElement(ProcessContext ctx) throws Exception {
                   ctx.output(KV.of(null, ctx.element()));
                 }
    @@ -1326,9 +1325,9 @@ public void processElement(ProcessContext ctx) throws Exception {
         }
       }
     
    -  private static class KafkaWriter extends OldDoFn, Void> {
    +  private static class KafkaWriter extends DoFn, Void> {
     
    -    @Override
    +    @StartBundle
         public void startBundle(Context c) throws Exception {
           // Producer initialization is fairly costly. Move this to future initialization api to avoid
           // creating a producer for each bundle.
    @@ -1341,7 +1340,7 @@ public void startBundle(Context c) throws Exception {
           }
         }
     
    -    @Override
    +    @ProcessElement
         public void processElement(ProcessContext ctx) throws Exception {
           checkForFailures();
     
    @@ -1351,7 +1350,7 @@ public void processElement(ProcessContext ctx) throws Exception {
               new SendCallback());
         }
     
    -    @Override
    +    @FinishBundle
         public void finishBundle(Context c) throws Exception {
           producer.flush();
           producer.close();
    diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
    index d7b1921df4442..9a89c3621bcd3 100644
    --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
    +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
    @@ -33,10 +33,10 @@
     import org.apache.beam.sdk.testing.PAssert;
     import org.apache.beam.sdk.testing.TestPipeline;
     import org.apache.beam.sdk.transforms.Count;
    +import org.apache.beam.sdk.transforms.DoFn;
     import org.apache.beam.sdk.transforms.Flatten;
     import org.apache.beam.sdk.transforms.Max;
     import org.apache.beam.sdk.transforms.Min;
    -import org.apache.beam.sdk.transforms.OldDoFn;
     import org.apache.beam.sdk.transforms.ParDo;
     import org.apache.beam.sdk.transforms.RemoveDuplicates;
     import org.apache.beam.sdk.transforms.SerializableFunction;
    @@ -280,8 +280,8 @@ public void testUnboundedSourceWithExplicitPartitions() {
         p.run();
       }
     
    -  private static class ElementValueDiff extends OldDoFn {
    -    @Override
    +  private static class ElementValueDiff extends DoFn {
    +    @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           c.output(c.element() - c.timestamp().getMillis());
         }
    @@ -308,8 +308,8 @@ public void testUnboundedSourceTimestamps() {
         p.run();
       }
     
    -  private static class RemoveKafkaMetadata extends OldDoFn, KV> {
    -    @Override
    +  private static class RemoveKafkaMetadata extends DoFn, KV> {
    +    @ProcessElement
         public void processElement(ProcessContext ctx) throws Exception {
           ctx.output(ctx.element().getKV());
         }
    
    From 47341e113334827101ddbf775c69ae34d178cd8f Mon Sep 17 00:00:00 2001
    From: Kenneth Knowles 
    Date: Wed, 3 Aug 2016 20:27:28 -0700
    Subject: [PATCH 8/8] Port easy transforms to new DoFn
    
    ---
     .../org/apache/beam/sdk/transforms/Count.java    |  4 ++--
     .../org/apache/beam/sdk/transforms/Create.java   |  4 ++--
     .../beam/sdk/transforms/FlatMapElements.java     |  4 ++--
     .../org/apache/beam/sdk/transforms/Flatten.java  |  4 ++--
     .../org/apache/beam/sdk/transforms/Keys.java     |  4 ++--
     .../org/apache/beam/sdk/transforms/KvSwap.java   |  4 ++--
     .../apache/beam/sdk/transforms/MapElements.java  |  4 ++--
     .../apache/beam/sdk/transforms/Partition.java    |  4 ++--
     .../beam/sdk/transforms/RemoveDuplicates.java    |  4 ++--
     .../org/apache/beam/sdk/transforms/Sample.java   |  6 +++---
     .../org/apache/beam/sdk/transforms/Values.java   |  4 ++--
     .../org/apache/beam/sdk/transforms/View.java     |  8 ++++----
     .../org/apache/beam/sdk/transforms/WithKeys.java |  4 ++--
     .../beam/sdk/transforms/WithTimestamps.java      |  6 +++---
     .../beam/sdk/transforms/join/CoGroupByKey.java   | 16 ++++++++--------
     15 files changed, 40 insertions(+), 40 deletions(-)
    
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
    index 7601ffc1df226..ac59c767504e1 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
    @@ -107,8 +107,8 @@ public PerElement() { }
         public PCollection> apply(PCollection input) {
           return
               input
    -          .apply("Init", ParDo.of(new OldDoFn>() {
    -            @Override
    +          .apply("Init", ParDo.of(new DoFn>() {
    +            @ProcessElement
                 public void processElement(ProcessContext c) {
                   c.output(KV.of(c.element(), (Void) null));
                 }
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
    index fb7f7847159d6..08d0a7ae356ba 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
    @@ -486,8 +486,8 @@ private TimestampedValues(
           this.elementCoder = elementCoder;
         }
     
    -    private static class ConvertTimestamps extends OldDoFn, T> {
    -      @Override
    +    private static class ConvertTimestamps extends DoFn, T> {
    +      @ProcessElement
           public void processElement(ProcessContext c) {
             c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
           }
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
    index b48da38bb06b4..694592ed86b01 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
    @@ -133,9 +133,9 @@ private FlatMapElements(
     
       @Override
       public PCollection apply(PCollection input) {
    -    return input.apply("Map", ParDo.of(new OldDoFn() {
    +    return input.apply("Map", ParDo.of(new DoFn() {
           private static final long serialVersionUID = 0L;
    -      @Override
    +      @ProcessElement
           public void processElement(ProcessContext c) {
             for (OutputT element : fn.apply(c.element())) {
               c.output(element);
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
    index 53e898e99cbc6..7e09d7e4dd3bf 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
    @@ -174,8 +174,8 @@ public PCollection apply(PCollection> in) {
           Coder elemCoder = ((IterableLikeCoder) inCoder).getElemCoder();
     
           return in.apply("FlattenIterables", ParDo.of(
    -          new OldDoFn, T>() {
    -            @Override
    +          new DoFn, T>() {
    +            @ProcessElement
                 public void processElement(ProcessContext c) {
                   for (T i : c.element()) {
                     c.output(i);
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
    index c8cbce80dc761..5ac1866a35904 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
    @@ -58,8 +58,8 @@ private Keys() { }
       @Override
       public PCollection apply(PCollection> in) {
         return
    -        in.apply("Keys", ParDo.of(new OldDoFn, K>() {
    -          @Override
    +        in.apply("Keys", ParDo.of(new DoFn, K>() {
    +          @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(c.element().getKey());
               }
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
    index 430d37b0ff127..d4386d2a81077 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
    @@ -62,8 +62,8 @@ private KvSwap() { }
       @Override
       public PCollection> apply(PCollection> in) {
         return
    -        in.apply("KvSwap", ParDo.of(new OldDoFn, KV>() {
    -          @Override
    +        in.apply("KvSwap", ParDo.of(new DoFn, KV>() {
    +          @ProcessElement
               public void processElement(ProcessContext c) {
                 KV e = c.element();
                 c.output(KV.of(e.getValue(), e.getKey()));
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
    index c83c39f8b610d..b7b9a5fa3d1fe 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
    @@ -104,8 +104,8 @@ private MapElements(
     
       @Override
       public PCollection apply(PCollection input) {
    -    return input.apply("Map", ParDo.of(new OldDoFn() {
    -      @Override
    +    return input.apply("Map", ParDo.of(new DoFn() {
    +      @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(fn.apply(c.element()));
           }
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
    index 2ddcc294ffcda..05c94700245a9 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
    @@ -134,7 +134,7 @@ private Partition(PartitionDoFn partitionDoFn) {
         this.partitionDoFn = partitionDoFn;
       }
     
    -  private static class PartitionDoFn extends OldDoFn {
    +  private static class PartitionDoFn extends DoFn {
         private final int numPartitions;
         private final PartitionFn partitionFn;
         private final TupleTagList outputTags;
    @@ -163,7 +163,7 @@ public TupleTagList getOutputTags() {
           return outputTags;
         }
     
    -    @Override
    +    @ProcessElement
         public void processElement(ProcessContext c) {
           X input = c.element();
           int partition = partitionFn.partitionFor(input, numPartitions);
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
    index d82c4575532ea..bba4b51309570 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
    @@ -85,8 +85,8 @@ public static  WithRepresentativeValues withRepresentativeValueF
       @Override
       public PCollection apply(PCollection in) {
         return in
    -        .apply("CreateIndex", ParDo.of(new OldDoFn>() {
    -          @Override
    +        .apply("CreateIndex", ParDo.of(new DoFn>() {
    +          @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(KV.of(c.element(), (Void) null));
               }
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
    index 724b2529d9a74..12ff2b90b4952 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
    @@ -164,9 +164,9 @@ public void populateDisplayData(DisplayData.Builder builder) {
       }
     
       /**
    -   * A {@link OldDoFn} that returns up to limit elements from the side input PCollection.
    +   * A {@link DoFn} that returns up to limit elements from the side input PCollection.
        */
    -  private static class SampleAnyDoFn extends OldDoFn {
    +  private static class SampleAnyDoFn extends DoFn {
         long limit;
         final PCollectionView> iterableView;
     
    @@ -175,7 +175,7 @@ public SampleAnyDoFn(long limit, PCollectionView> iterableView) {
           this.iterableView = iterableView;
         }
     
    -    @Override
    +    @ProcessElement
         public void processElement(ProcessContext c) {
           for (T i : c.sideInput(iterableView)) {
             if (limit-- <= 0) {
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
    index 856e32a6e5be2..34342db53c6b8 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
    @@ -58,8 +58,8 @@ private Values() { }
       @Override
       public PCollection apply(PCollection> in) {
         return
    -        in.apply("Values", ParDo.of(new OldDoFn, V>() {
    -          @Override
    +        in.apply("Values", ParDo.of(new DoFn, V>() {
    +          @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(c.element().getValue());
               }
    diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
    index 8a616378423b1..7a97c13d336cc 100644
    --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
    +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
    @@ -38,7 +38,7 @@
      *
      * 

    When a {@link ParDo} tranform is processing a main input * element in a window {@code w} and a {@link PCollectionView} is read via - * {@link OldDoFn.ProcessContext#sideInput}, the value of the view for {@code w} is + * {@link DoFn.ProcessContext#sideInput}, the value of the view for {@code w} is * returned. * *

    The SDK supports viewing a {@link PCollection}, per window, as a single value, @@ -118,7 +118,7 @@ * * PCollection PageVisits = urlVisits * .apply(ParDo.withSideInputs(urlToPage) - * .of(new OldDoFn() { + * .of(new DoFn() { * {@literal @}Override * void processElement(ProcessContext context) { * UrlVisit urlVisit = context.element(); @@ -154,11 +154,11 @@ private View() { } * *

    If the input {@link PCollection} is empty, * throws {@link java.util.NoSuchElementException} in the consuming - * {@link OldDoFn}. + * {@link DoFn}. * *

    If the input {@link PCollection} contains more than one * element, throws {@link IllegalArgumentException} in the - * consuming {@link OldDoFn}. + * consuming {@link DoFn}. */ public static AsSingleton asSingleton() { return new AsSingleton<>(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java index 37d45aa3b5af4..2a44963e6fc2d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java @@ -113,8 +113,8 @@ public WithKeys withKeyType(TypeDescriptor keyType) { @Override public PCollection> apply(PCollection in) { PCollection> result = - in.apply("AddKeys", ParDo.of(new OldDoFn>() { - @Override + in.apply("AddKeys", ParDo.of(new DoFn>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(fn.apply(c.element()), c.element())); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java index 41b549b979751..7b395f5e8dbf4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java @@ -92,7 +92,7 @@ public WithTimestamps withAllowedTimestampSkew(Duration allowedTimestampSkew) * Returns the allowed timestamp skew duration, which is the maximum * duration that timestamps can be shifted backwards from the timestamp of the input element. * - * @see OldDoFn#getAllowedTimestampSkew() + * @see DoFn#getAllowedTimestampSkew() */ public Duration getAllowedTimestampSkew() { return allowedTimestampSkew; @@ -105,7 +105,7 @@ public PCollection apply(PCollection input) { .setTypeDescriptorInternal(input.getTypeDescriptor()); } - private static class AddTimestampsDoFn extends OldDoFn { + private static class AddTimestampsDoFn extends DoFn { private final SerializableFunction fn; private final Duration allowedTimestampSkew; @@ -114,7 +114,7 @@ public AddTimestampsDoFn(SerializableFunction fn, Duration allowedTi this.allowedTimestampSkew = allowedTimestampSkew; } - @Override + @ProcessElement public void processElement(ProcessContext c) { Instant timestamp = fn.apply(c.element()); checkNotNull( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java index 1bd9f4a90bbe5..cb06f95354fe5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java @@ -19,9 +19,9 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder; @@ -57,7 +57,7 @@ * * PCollection finalResultCollection = * coGbkResultCollection.apply(ParDo.of( - * new OldDoFn, T>() { + * new DoFn, T>() { * @Override * public void processElement(ProcessContext c) { * KV e = c.element(); @@ -167,12 +167,12 @@ private PCollection> makeUnionTable( } /** - * A OldDoFn to construct a UnionTable (i.e., a + * A DoFn to construct a UnionTable (i.e., a * {@code PCollection>} from a * {@code PCollection>}. */ private static class ConstructUnionTableFn extends - OldDoFn, KV> { + DoFn, KV> { private final int index; @@ -180,7 +180,7 @@ public ConstructUnionTableFn(int index) { this.index = index; } - @Override + @ProcessElement public void processElement(ProcessContext c) { KV e = c.element(); c.output(KV.of(e.getKey(), new RawUnionValue(index, e.getValue()))); @@ -188,11 +188,11 @@ public void processElement(ProcessContext c) { } /** - * A OldDoFn to construct a CoGbkResult from an input grouped union + * A DoFn to construct a CoGbkResult from an input grouped union * table. */ private static class ConstructCoGbkResultFn - extends OldDoFn>, + extends DoFn>, KV> { private final CoGbkResultSchema schema; @@ -201,7 +201,7 @@ public ConstructCoGbkResultFn(CoGbkResultSchema schema) { this.schema = schema; } - @Override + @ProcessElement public void processElement(ProcessContext c) { KV> e = c.element(); c.output(KV.of(e.getKey(), new CoGbkResult(schema, e.getValue())));