From 5f8b8c5b06cfd49c4293a20dff2eea99f1076444 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Tue, 17 Jan 2017 16:12:39 -0800 Subject: [PATCH] Moves OldDoFn to runners-core --- .../translation/WindowBoundTranslator.java | 2 +- .../operators/ApexGroupByKeyOperator.java | 2 +- .../operators/ApexParDoOperator.java | 2 +- .../beam/runners/core/AssignWindowsDoFn.java | 3 +-- .../beam/runners/core/DoFnAdapters.java | 1 - .../apache/beam/runners/core/DoFnRunner.java | 1 - .../apache/beam/runners/core/DoFnRunners.java | 1 - .../GroupAlsoByWindowViaWindowSetDoFn.java | 1 - .../runners/core/GroupAlsoByWindowsDoFn.java | 1 - .../core/LateDataDroppingDoFnRunner.java | 1 - .../apache/beam/runners/core}/OldDoFn.java | 9 ++++++++- .../runners/core/SimpleOldDoFnRunner.java | 3 +-- .../core}/DoFnDelegatingAggregatorTest.java | 4 +++- .../beam/runners/core}/NoOpOldDoFn.java | 3 ++- .../runners/core}/OldDoFnContextTest.java | 5 ++++- .../beam/runners/core}/OldDoFnTest.java | 7 ++++++- .../runners/core/SimpleOldDoFnRunnerTest.java | 2 +- .../flink/OldPerKeyCombineFnRunner.java | 2 +- .../flink/OldPerKeyCombineFnRunners.java | 2 +- .../functions/FlinkDoFnFunction.java | 2 +- .../FlinkMergingNonShuffleReduceFunction.java | 2 +- .../FlinkMultiOutputDoFnFunction.java | 2 +- .../FlinkMultiOutputProcessContext.java | 2 +- .../FlinkNoElementAssignContext.java | 2 +- .../functions/FlinkPartialReduceFunction.java | 2 +- .../functions/FlinkProcessContextBase.java | 2 +- .../functions/FlinkReduceFunction.java | 2 +- .../FlinkSingleOutputProcessContext.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../streaming/WindowDoFnOperator.java | 2 +- .../beam/runners/dataflow/util/DoFnInfo.java | 4 +--- .../sdk/transforms/DelegatingAggregator.java | 2 +- .../org/apache/beam/sdk/util/NameUtils.java | 2 +- .../apache/beam/sdk/util/NameUtilsTest.java | 20 ++++++++----------- 34 files changed, 53 insertions(+), 49 deletions(-) rename {sdks/java/core/src/main/java/org/apache/beam/sdk/transforms => runners/core-java/src/main/java/org/apache/beam/runners/core}/OldDoFn.java (97%) rename {sdks/java/core/src/test/java/org/apache/beam/sdk/transforms => runners/core-java/src/test/java/org/apache/beam/runners/core}/DoFnDelegatingAggregatorTest.java (96%) rename {sdks/java/core/src/test/java/org/apache/beam/sdk/transforms => runners/core-java/src/test/java/org/apache/beam/runners/core}/NoOpOldDoFn.java (96%) rename {sdks/java/core/src/test/java/org/apache/beam/sdk/transforms => runners/core-java/src/test/java/org/apache/beam/runners/core}/OldDoFnContextTest.java (92%) rename {sdks/java/core/src/test/java/org/apache/beam/sdk/transforms => runners/core-java/src/test/java/org/apache/beam/runners/core}/OldDoFnTest.java (95%) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java index ef049e19ce884..50af81d572922 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java @@ -23,8 +23,8 @@ import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.runners.core.AssignWindowsDoFn; import org.apache.beam.runners.core.DoFnAdapters; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 173434f66c33e..4c2b46121908c 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -44,6 +44,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -52,7 +53,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.CoderUtils; diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index de4c15d9017ac..808001e5825cc 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -43,6 +43,7 @@ import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.sdk.coders.Coder; @@ -50,7 +51,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.UserCodeException; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java index 0eb1667c3a1ce..bbf3574616bd5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java @@ -21,8 +21,7 @@ import com.google.common.collect.Iterables; import java.util.Collection; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; +import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index 0f5624f561660..23aba5895b90f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index 7c73a34915569..66f95dbbea97f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -19,7 +19,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 2f3e93c5a4ab9..f3972aea545df 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index d79683ada1fe7..ecce4fc4e4a22 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -21,7 +21,6 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java index 9a2f8fd020373..7e96136bd7156 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 290171ad22800..9436ccffbeae1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -22,7 +22,6 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java similarity index 97% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java rename to runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java index 7b0453302a398..b099721de8a13 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.runners.core; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -30,7 +30,14 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DelegatingAggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 9808e56830693..2fe9226593e03 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -27,11 +27,10 @@ import java.util.Set; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java similarity index 96% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java rename to runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java index f51a6b0b15392..b44e8a42f888c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.runners.core; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.anyLong; @@ -23,7 +23,9 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DelegatingAggregator; import org.junit.Before; import org.junit.Rule; import org.junit.Test; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java similarity index 96% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java rename to runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java index 0db130db59ddd..5cbea8c5e1cf8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java @@ -15,9 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.runners.core; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java similarity index 92% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java rename to runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java index b5cb286d416d1..a1cd49deb3bc0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java @@ -15,12 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.runners.core; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Sum; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java similarity index 95% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java rename to runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java index 1c767b17c9aa6..651bc7257d132 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.runners.core; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.isA; @@ -25,7 +25,12 @@ import java.io.Serializable; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java index 4610069aa64df..97da9ee1c98e8 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java @@ -23,7 +23,7 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.runners.core.BaseExecutionContext.StepContext; -import org.apache.beam.sdk.transforms.OldDoFn; + import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java index 5d676dce55ded..71c3aa429ae3b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.flink; import java.io.Serializable; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.OldDoFn; /** * An interface that runs a {@link PerKeyCombineFn} with unified APIs using diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java index 8ebeadf7d7756..90894f239d44e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java @@ -17,13 +17,13 @@ */ package org.apache.beam.runners.flink; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PerKeyCombineFnRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.values.PCollectionView; /** diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 2a4a68e7b5bb0..8b2bcc6baacf1 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -19,10 +19,10 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnAdapters; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 1b4317280962a..5ec6a773c8e0e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -24,12 +24,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index a97bd46fc77b6..aeeabbf59ecb5 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -19,10 +19,10 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnAdapters; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java index a3d2b1849dd4f..7882b5f7a53e7 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java @@ -19,8 +19,8 @@ import java.util.Collection; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java index c89027262cd9c..ad7255b279022 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.flink.translation.functions; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Instant; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index f5a908799ce37..7db30d1132939 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -24,12 +24,12 @@ import java.util.Comparator; import java.util.Iterator; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index 53b98038adb9a..e955679ff8fb2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -24,11 +24,11 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimerInternals; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index a3fa0d41cc27a..81e37f48c16dd 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -26,12 +26,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java index 529b1cc9e528f..0db7f5a0764ce 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java @@ -19,8 +19,8 @@ import java.util.Collection; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 90cdf4c257868..ac85b3c090da5 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -33,6 +33,7 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.CoderUtils; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index cd6b5aa982702..d4273b228d025 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -42,12 +42,12 @@ import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 0c5be90650005..4d80a39cb3a0d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -21,7 +21,6 @@ import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -41,8 +40,7 @@ public class DoFnInfo implements Serializable { private final Map> outputMap; /** - * Creates a {@link DoFnInfo} for the given {@link Serializable} object, which is expected to be a - * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob. + * Creates a {@link DoFnInfo} for the given {@link DoFn}. */ public static DoFnInfo forFn( DoFn doFn, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java index e03d3b15775cb..cfaf0a64bf159 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java @@ -35,7 +35,7 @@ * @param the type of input element * @param the type of output element */ -class DelegatingAggregator +public class DelegatingAggregator implements Aggregator, Serializable { private static final AtomicInteger ID_GEN = new AtomicInteger(); private final int id; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java index 1c59af74a5819..72179a35c12da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java @@ -40,7 +40,7 @@ public interface NameOverride { } private static final String[] STANDARD_NAME_SUFFIXES = - new String[]{"OldDoFn", "DoFn", "CombineFn", "Fn"}; + new String[]{"DoFn", "CombineFn", "Fn"}; /** * Pattern to match a non-anonymous inner class. diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java index b81aa366c43ba..6848ea440ad48 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java @@ -42,22 +42,18 @@ public class NameUtilsTest { @Test public void testDropsStandardSuffixes() { - assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", true)); assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", true)); assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", true)); - assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", false)); assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", false)); assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", false)); } @Test public void testDropsStandardSuffixesInAllComponents() { - assertEquals("Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", true)); assertEquals("Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", true)); assertEquals("Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", true)); - assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", false)); assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", false)); assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", false)); } @@ -79,12 +75,12 @@ public void testDropsOuterClassNamesFalse() { /** * Inner class for simple name test. */ - private class EmbeddedOldDoFn { + private class EmbeddedDoFn { - private class DeeperEmbeddedOldDoFn extends EmbeddedOldDoFn {} + private class DeeperEmbeddedDoFn extends EmbeddedDoFn {} - private EmbeddedOldDoFn getEmbedded() { - return new DeeperEmbeddedOldDoFn(); + private EmbeddedDoFn getEmbedded() { + return new DeeperEmbeddedDoFn(); } } @@ -112,18 +108,18 @@ private interface AnonymousClass { @Test public void testSimpleName() { - assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedOldDoFn())); + assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedDoFn())); } @Test public void testAnonSimpleName() throws Exception { - assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedOldDoFn() {})); + assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedDoFn() {})); } @Test public void testNestedSimpleName() { - EmbeddedOldDoFn fn = new EmbeddedOldDoFn(); - EmbeddedOldDoFn inner = fn.getEmbedded(); + EmbeddedDoFn fn = new EmbeddedDoFn(); + EmbeddedDoFn inner = fn.getEmbedded(); assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner)); }