From de8ed03621586314c09e39015e074e6a4a7894cc Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 8 Dec 2016 23:33:40 -0800 Subject: [PATCH] Remove misc uses of OldDoFn --- .../game/utils/WriteWindowedToBigQuery.java | 5 +---- .../dataflow/internal/AssignWindows.java | 8 ++++---- .../DataflowPipelineTranslatorTest.java | 17 +++++++++-------- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java index c32289f46381..7a4fb2c8c195 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java @@ -23,7 +23,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; @@ -43,9 +42,7 @@ public WriteWindowedToBigQuery(String tableName, } /** Convert each key/score pair into a BigQuery TableRow. */ - protected class BuildRowFn extends DoFn - implements RequiresWindowAccess { - + protected class BuildRowFn extends DoFn { @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java index 68ee7bc7e23c..27fe13d76b1d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.dataflow.internal; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.Window; @@ -63,9 +63,9 @@ public PCollection expand(PCollection input) { } else { // If the windowFn didn't change, we just run a pass-through transform and then set the // new windowing strategy. - return input.apply("Identity", ParDo.of(new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { + return input.apply("Identity", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } })).setWindowingStrategyInternal(outputStrategy); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index ac4f2dff42e9..8d0b83aa97d6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -70,7 +70,7 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -508,7 +508,7 @@ private static OutputReference getOutputPortReference(Step step) throws Exceptio } /** - * Returns a Step for a OldDoFn by creating and translating a pipeline. + * Returns a Step for a {@link DoFn} by creating and translating a pipeline. */ private static Step createPredefinedStep() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); @@ -533,8 +533,9 @@ private static Step createPredefinedStep() throws Exception { return step; } - private static class NoOpFn extends OldDoFn { - @Override public void processElement(ProcessContext c) throws Exception { + private static class NoOpFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } } @@ -899,8 +900,8 @@ public void testStepDisplayData() throws Exception { DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); Pipeline pipeline = Pipeline.create(options); - OldDoFn fn1 = new OldDoFn() { - @Override + DoFn fn1 = new DoFn() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } @@ -915,8 +916,8 @@ public void populateDisplayData(DisplayData.Builder builder) { } }; - OldDoFn fn2 = new OldDoFn() { - @Override + DoFn fn2 = new DoFn() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element()); }