From baa3e4997e061eed69d33fd1ffbfd2af832bf0f6 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 15 Dec 2016 20:13:25 -0800 Subject: [PATCH] Remove deprecated AggregatorFactory from SDK --- .../operators/ApexParDoOperator.java | 2 +- .../apache/beam/runners/core/DoFnRunners.java | 1 - .../beam/runners/core/SimpleDoFnRunner.java | 1 - .../runners/core/SimpleOldDoFnRunner.java | 1 - .../runners/direct/AggregatorContainer.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 3 ++- .../spark/aggregators/SparkAggregators.java | 5 +++-- .../beam/sdk/transforms/Aggregator.java | 19 ------------------- 8 files changed, 7 insertions(+), 27 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 1e769496651f..4538fb5a052b 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.apex.translation.utils.NoOpStepContext; import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -46,7 +47,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index da16573c3afa..0e4bf75ba0b4 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -22,7 +22,6 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 041cdde69292..d504b403002c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 10af29aa2bdf..7d9320062f27 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -28,7 +28,6 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java index e86bc3ebaf28..c7fa4df7cd8a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java @@ -27,8 +27,8 @@ import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.util.ExecutionContext; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 87b15a7e08e6..001e3b68ce0c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -192,7 +193,7 @@ public void open() throws Exception { currentInputWatermark = Long.MIN_VALUE; currentOutputWatermark = currentInputWatermark; - Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() { + AggregatorFactory aggregatorFactory = new AggregatorFactory() { @Override public Aggregator createAggregatorForDoFn( Class fnClass, diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java index 657264f377a6..17d584481bc2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import java.util.Collection; import java.util.Map; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; @@ -99,9 +100,9 @@ public static T valueOf(final String name, } /** - * An implementation of {@link Aggregator.AggregatorFactory} for the SparkRunner. + * An implementation of {@link AggregatorFactory} for the SparkRunner. */ - public static class Factory implements Aggregator.AggregatorFactory { + public static class Factory implements AggregatorFactory { private final SparkRuntimeContext runtimeContext; private final Accumulator accumulator; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java index 43f53a8d5f94..4119c531c138 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.util.ExecutionContext; /** * An {@code Aggregator} enables monitoring of values of type {@code InputT}, @@ -68,22 +67,4 @@ public interface Aggregator { * aggregator. */ CombineFn getCombineFn(); - - /** - * @deprecated this is for use only by runners and exists only for a migration period. Please - * use the identical interface in org.apache.beam.runners.core - */ - @Deprecated - interface AggregatorFactory { - /** - * Create an aggregator with the given {@code name} and {@link CombineFn}. - * - *

This method is called to create an aggregator for a {@link DoFn}. It receives the - * class of the {@link DoFn} being executed and the context of the step it is being - * executed in. - */ - Aggregator createAggregatorForDoFn( - Class fnClass, ExecutionContext.StepContext stepContext, - String aggregatorName, CombineFn combine); - } }