Skip to content

Commit

Permalink
Revert "Moves OldDoFn to runners-core"
Browse files Browse the repository at this point in the history
This reverts commit 5f8b8c5.
  • Loading branch information
aljoscha committed Jan 31, 2017
1 parent d6ddce3 commit 27ad603
Show file tree
Hide file tree
Showing 34 changed files with 49 additions and 53 deletions.
Expand Up @@ -23,8 +23,8 @@
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.runners.core.AssignWindowsDoFn;
import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down
Expand Up @@ -44,7 +44,6 @@
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
Expand All @@ -53,6 +52,7 @@
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
Expand Down
Expand Up @@ -43,14 +43,14 @@
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.UserCodeException;
Expand Down
Expand Up @@ -21,7 +21,8 @@

import com.google.common.collect.Iterables;
import java.util.Collection;
import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.transforms.DoFn.Context;
import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
Expand Down
Expand Up @@ -19,6 +19,7 @@

import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.core;

import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
Expand Down
Expand Up @@ -27,10 +27,11 @@
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
Expand Down
Expand Up @@ -23,7 +23,7 @@
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.core.BaseExecutionContext.StepContext;

import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.runners.flink;

import java.io.Serializable;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.OldDoFn;

/**
* An interface that runs a {@link PerKeyCombineFn} with unified APIs using
Expand Down
Expand Up @@ -17,13 +17,13 @@
*/
package org.apache.beam.runners.flink;

import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.PerKeyCombineFnRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.values.PCollectionView;

/**
Expand Down
Expand Up @@ -19,10 +19,10 @@

import java.util.Map;
import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
Expand Down
Expand Up @@ -24,12 +24,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
Expand Down
Expand Up @@ -19,10 +19,10 @@

import java.util.Map;
import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down
Expand Up @@ -19,8 +19,8 @@

import java.util.Collection;
import java.util.Map;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand Down
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.functions;

import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
Expand Down
Expand Up @@ -24,12 +24,12 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand Down
Expand Up @@ -24,11 +24,11 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimerInternals;
Expand Down
Expand Up @@ -26,12 +26,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand Down
Expand Up @@ -19,8 +19,8 @@

import java.util.Collection;
import java.util.Map;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
Expand All @@ -44,6 +43,7 @@
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
Expand Down
Expand Up @@ -42,12 +42,12 @@
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
Expand All @@ -43,7 +44,8 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
private final Map<Long, TupleTag<?>> outputMap;

/**
* Creates a {@link DoFnInfo} for the given {@link DoFn}.
* Creates a {@link DoFnInfo} for the given {@link Serializable} object, which is expected to be a
* {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob.
*/
public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
Serializable doFn,
Expand Down
Expand Up @@ -35,7 +35,7 @@
* @param <AggInputT> the type of input element
* @param <AggOutputT> the type of output element
*/
public class DelegatingAggregator<AggInputT, AggOutputT>
class DelegatingAggregator<AggInputT, AggOutputT>
implements Aggregator<AggInputT, AggOutputT>, Serializable {
private static final AtomicInteger ID_GEN = new AtomicInteger();
private final int id;
Expand Down
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
package org.apache.beam.sdk.transforms;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -30,14 +30,7 @@
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DelegatingAggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down
Expand Up @@ -40,7 +40,7 @@ public interface NameOverride {
}

private static final String[] STANDARD_NAME_SUFFIXES =
new String[]{"DoFn", "CombineFn", "Fn"};
new String[]{"OldDoFn", "DoFn", "CombineFn", "Fn"};

/**
* Pattern to match a non-anonymous inner class.
Expand Down
Expand Up @@ -15,17 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
package org.apache.beam.sdk.transforms;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DelegatingAggregator;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down
Expand Up @@ -15,10 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
package org.apache.beam.sdk.transforms;

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
Expand Down
Expand Up @@ -15,15 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
package org.apache.beam.sdk.transforms;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down

0 comments on commit 27ad603

Please sign in to comment.