From ad5eb06619b724236ad0d2a384b8ecf4c610f1e4 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 9 Dec 2016 17:21:40 -0800 Subject: [PATCH 1/6] Removes code for wrapping DoFn as an OldDoFn --- .../beam/sdk/transforms/DoFnAdapters.java | 150 --------- .../apache/beam/sdk/transforms/OldDoFn.java | 295 +----------------- .../sdk/transforms/reflect/DoFnInvokers.java | 141 +-------- .../transforms/reflect/DoFnInvokersTest.java | 36 --- 4 files changed, 11 insertions(+), 611 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index e15b08b46d79..d1c40a69f54f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -18,8 +18,6 @@ package org.apache.beam.sdk.transforms; import java.io.IOException; -import java.util.Collection; -import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn.Context; @@ -38,7 +36,6 @@ import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.joda.time.Instant; @@ -53,18 +50,6 @@ public class DoFnAdapters { /** Should not be instantiated. */ private DoFnAdapters() {} - /** - * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the - * original {@link DoFn}, otherwise returns {@code fn.getClass()}. - */ - public static Class getDoFnClass(OldDoFn fn) { - if (fn instanceof SimpleDoFnAdapter) { - return ((SimpleDoFnAdapter) fn).fn.getClass(); - } else { - return fn.getClass(); - } - } - /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ @SuppressWarnings({"unchecked", "rawtypes"}) public static OldDoFn toOldDoFn(DoFn fn) { @@ -76,126 +61,6 @@ public static OldDoFn toOldDoFn(DoFn OldDoFn.ProcessContext adaptProcessContext( - OldDoFn fn, - final DoFn.ProcessContext c, - final DoFnInvoker.ArgumentProvider extra) { - return fn.new ProcessContext() { - @Override - public InputT element() { - return c.element(); - } - - @Override - public T sideInput(PCollectionView view) { - return c.sideInput(view); - } - - @Override - public Instant timestamp() { - return c.timestamp(); - } - - @Override - public BoundedWindow window() { - return extra.window(); - } - - @Override - public PaneInfo pane() { - return c.pane(); - } - - @Override - public WindowingInternals windowingInternals() { - return extra.windowingInternals(); - } - - @Override - public PipelineOptions getPipelineOptions() { - return c.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - c.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - c.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - c.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - c.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregatorInternal( - String name, CombineFn combiner) { - return c.createAggregator(name, combiner); - } - }; - } - - /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */ - public static OldDoFn.Context adaptContext( - OldDoFn fn, - final DoFn.Context c) { - return fn.new Context() { - @Override - public PipelineOptions getPipelineOptions() { - return c.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - c.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - c.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - c.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - c.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregatorInternal( - String name, CombineFn combiner) { - return c.createAggregator(name, combiner); - } - }; - } - - /** - * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise, - * returns {@code null}. - */ - @Nullable - public static DoFn getDoFn(OldDoFn fn) { - if (fn instanceof SimpleDoFnAdapter) { - return ((SimpleDoFnAdapter) fn).fn; - } else { - return null; - } - } - /** * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link * OldDoFn}. @@ -237,21 +102,6 @@ public void processElement(ProcessContext c) throws Exception { invoker.invokeProcessElement(adapter); } - @Override - protected TypeDescriptor getInputTypeDescriptor() { - return fn.getInputTypeDescriptor(); - } - - @Override - protected TypeDescriptor getOutputTypeDescriptor() { - return fn.getOutputTypeDescriptor(); - } - - @Override - Collection> getAggregators() { - return fn.getAggregators(); - } - @Override public Duration getAllowedTimestampSkew() { return fn.getAllowedTimestampSkew(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index 2d2c1fde737b..0aef552f1c57 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.joda.time.Instant; @@ -71,21 +70,6 @@ */ @Deprecated public abstract class OldDoFn implements Serializable, HasDisplayData { - - public DoFn toDoFn() { - DoFn doFn = DoFnAdapters.getDoFn(this); - if (doFn != null) { - return doFn; - } - if (this instanceof RequiresWindowAccess) { - // No parameters as it just accesses `this` - return new AdaptedRequiresWindowAccessDoFn(); - } else { - // No parameters as it just accesses `this` - return new AdaptedDoFn(); - } - } - /** * Information accessible to all methods in this {@code OldDoFn}. * Used primarily to output elements. @@ -334,7 +318,7 @@ public OldDoFn() { this(new HashMap>()); } - OldDoFn(Map> aggregators) { + public OldDoFn(Map> aggregators) { this.aggregators = aggregators; } @@ -418,32 +402,6 @@ public void populateDisplayData(DisplayData.Builder builder) { ///////////////////////////////////////////////////////////////////////////// - /** - * Returns a {@link TypeDescriptor} capturing what is known statically - * about the input type of this {@code OldDoFn} instance's most-derived - * class. - * - *

See {@link #getOutputTypeDescriptor} for more discussion. - */ - protected TypeDescriptor getInputTypeDescriptor() { - return new TypeDescriptor(getClass()) {}; - } - - /** - * Returns a {@link TypeDescriptor} capturing what is known statically - * about the output type of this {@code OldDoFn} instance's - * most-derived class. - * - *

In the normal case of a concrete {@code OldDoFn} subclass with - * no generic type parameters of its own (including anonymous inner - * classes), this will be a complete non-generic type, which is good - * for choosing a default output {@code Coder} for the output - * {@code PCollection}. - */ - protected TypeDescriptor getOutputTypeDescriptor() { - return new TypeDescriptor(getClass()) {}; - } - /** * Returns an {@link Aggregator} with aggregation logic specified by the * {@link CombineFn} argument. The name provided must be unique across @@ -504,255 +462,4 @@ protected final Aggregator createAggregator(St Collection> getAggregators() { return Collections.>unmodifiableCollection(aggregators.values()); } - - /** - * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}. - */ - private class AdaptedContext extends Context { - - private final DoFn.Context newContext; - - public AdaptedContext( - DoFn.Context newContext) { - this.newContext = newContext; - super.setupDelegateAggregators(); - } - - @Override - public PipelineOptions getPipelineOptions() { - return newContext.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - newContext.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - newContext.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - newContext.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - newContext.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregatorInternal( - String name, CombineFn combiner) { - return newContext.createAggregator(name, combiner); - } - } - - /** - * A {@link ProcessContext} for an {@link OldDoFn} via a context for a proper {@link DoFn}. - */ - private class AdaptedProcessContext extends ProcessContext { - - private final DoFn.ProcessContext newContext; - - public AdaptedProcessContext(DoFn.ProcessContext newContext) { - this.newContext = newContext; - } - - @Override - public InputT element() { - return newContext.element(); - } - - @Override - public T sideInput(PCollectionView view) { - return newContext.sideInput(view); - } - - @Override - public Instant timestamp() { - return newContext.timestamp(); - } - - @Override - public BoundedWindow window() { - throw new UnsupportedOperationException(String.format( - "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s", - OldDoFn.class.getSimpleName(), - OldDoFn.ProcessContext.class.getSimpleName(), - OldDoFn.class.getSimpleName(), - DoFn.class.getSimpleName())); - } - - @Override - public PaneInfo pane() { - return newContext.pane(); - } - - @Override - public WindowingInternals windowingInternals() { - throw new UnsupportedOperationException(String.format( - "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s", - OldDoFn.class.getSimpleName(), - OldDoFn.ProcessContext.class.getSimpleName(), - OldDoFn.class.getSimpleName(), - DoFn.class.getSimpleName())); - } - - @Override - public PipelineOptions getPipelineOptions() { - return newContext.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - newContext.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - newContext.outputWithTimestamp(output, timestamp); - } - - @Override - public void sideOutput(TupleTag tag, T output) { - newContext.sideOutput(tag, output); - } - - @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - newContext.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected Aggregator createAggregatorInternal( - String name, CombineFn combiner) { - return newContext.createAggregator(name, combiner); - } - } - - private class AdaptedDoFn extends DoFn { - - @Setup - public void setup() throws Exception { - OldDoFn.this.setup(); - } - - @StartBundle - public void startBundle(Context c) throws Exception { - OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c)); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c)); - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c)); - } - - @Teardown - public void teardown() throws Exception { - OldDoFn.this.teardown(); - } - - @Override - public Duration getAllowedTimestampSkew() { - return OldDoFn.this.getAllowedTimestampSkew(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - OldDoFn.this.populateDisplayData(builder); - } - - @Override - public TypeDescriptor getInputTypeDescriptor() { - return OldDoFn.this.getInputTypeDescriptor(); - } - - @Override - Collection> getAggregators() { - return OldDoFn.this.getAggregators(); - } - - @Override - public TypeDescriptor getOutputTypeDescriptor() { - return OldDoFn.this.getOutputTypeDescriptor(); - } - } - - /** - * A {@link ProcessContext} for an {@link OldDoFn} that implements - * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}. - */ - private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext { - - private final BoundedWindow window; - - public AdaptedRequiresWindowAccessProcessContext( - DoFn.ProcessContext newContext, - BoundedWindow window) { - super(newContext); - this.window = window; - } - - @Override - public BoundedWindow window() { - return window; - } - } - - private class AdaptedRequiresWindowAccessDoFn extends DoFn { - - @Setup - public void setup() throws Exception { - OldDoFn.this.setup(); - } - - @StartBundle - public void startBundle(Context c) throws Exception { - OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c)); - } - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - OldDoFn.this.processElement( - OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window)); - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c)); - } - - @Teardown - public void teardown() throws Exception { - OldDoFn.this.teardown(); - } - - @Override - public Duration getAllowedTimestampSkew() { - return OldDoFn.this.getAllowedTimestampSkew(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - OldDoFn.this.populateDisplayData(builder); - } - - @Override - public TypeDescriptor getInputTypeDescriptor() { - return OldDoFn.this.getInputTypeDescriptor(); - } - - @Override - public TypeDescriptor getOutputTypeDescriptor() { - return OldDoFn.this.getOutputTypeDescriptor(); - } - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 50a7082cdad5..b141d51150ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -18,13 +18,7 @@ package org.apache.beam.sdk.transforms.reflect; import java.io.Serializable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.util.UserCodeException; /** Static utilities for working with {@link DoFnInvoker}. */ public class DoFnInvokers { @@ -42,137 +36,22 @@ public static DoFnInvoker invokerFor( return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn); } - private DoFnInvokers() {} - /** - * Returns a {@link DoFnInvoker} for the given {@link Object}, which should be either a {@link - * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's function as an - * {@link Object} and then pass it to this method, so there is no need to statically specify what - * sort of object it is. + * Temporarily retained for compatibility with Dataflow worker. + * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}. * - * @deprecated this is to be used only as a migration path for decoupling upgrades + * @deprecated Use {@link #invokerFor(DoFn)}. */ + @SuppressWarnings("unchecked") @Deprecated - public static DoFnInvoker invokerFor(Serializable deserializedFn) { + public static DoFnInvoker invokerFor( + Serializable deserializedFn) { if (deserializedFn instanceof DoFn) { - return invokerFor((DoFn) deserializedFn); - } else if (deserializedFn instanceof OldDoFn) { - return new OldDoFnInvoker<>((OldDoFn) deserializedFn); - } else { - throw new IllegalArgumentException( - String.format( - "Cannot create a %s for %s; it should be either a %s or an %s.", - DoFnInvoker.class.getSimpleName(), - deserializedFn.toString(), - DoFn.class.getSimpleName(), - OldDoFn.class.getSimpleName())); + return invokerFor((DoFn) deserializedFn); } + throw new UnsupportedOperationException( + "Only DoFn supported, was: " + deserializedFn.getClass()); } - /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */ - @Deprecated public static final DoFnInvokers INSTANCE = new DoFnInvokers(); - - /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */ - @Deprecated - public DoFnInvoker invokerFor(Object deserializedFn) { - return (DoFnInvoker) DoFnInvokers.invokerFor((Serializable) deserializedFn); - } - - - static class OldDoFnInvoker implements DoFnInvoker { - - private final OldDoFn fn; - - public OldDoFnInvoker(OldDoFn fn) { - this.fn = fn; - } - - @Override - public DoFn.ProcessContinuation invokeProcessElement( - ArgumentProvider extra) { - // The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly - DoFn.ProcessContext newCtx = - extra.processContext(new DoFn() {}); - OldDoFn.ProcessContext oldCtx = - DoFnAdapters.adaptProcessContext(fn, newCtx, extra); - try { - fn.processElement(oldCtx); - return DoFn.ProcessContinuation.stop(); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public void invokeOnTimer(String timerId, ArgumentProvider arguments) { - throw new UnsupportedOperationException( - String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName())); - } - - @Override - public void invokeStartBundle(DoFn.Context c) { - OldDoFn.Context oldCtx = DoFnAdapters.adaptContext(fn, c); - try { - fn.startBundle(oldCtx); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public void invokeFinishBundle(DoFn.Context c) { - OldDoFn.Context oldCtx = DoFnAdapters.adaptContext(fn, c); - try { - fn.finishBundle(oldCtx); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public void invokeSetup() { - try { - fn.setup(); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public void invokeTeardown() { - try { - fn.teardown(); - } catch (Throwable exc) { - throw UserCodeException.wrap(exc); - } - } - - @Override - public RestrictionT invokeGetInitialRestriction(InputT element) { - throw new UnsupportedOperationException("OldDoFn is not splittable"); - } - - @Override - public Coder invokeGetRestrictionCoder( - CoderRegistry coderRegistry) { - throw new UnsupportedOperationException("OldDoFn is not splittable"); - } - - @Override - public void invokeSplitRestriction( - InputT element, RestrictionT restriction, DoFn.OutputReceiver receiver) { - throw new UnsupportedOperationException("OldDoFn is not splittable"); - } - - @Override - public > - TrackerT invokeNewTracker(RestrictionT restriction) { - throw new UnsupportedOperationException("OldDoFn is not splittable"); - } - - @Override - public DoFn getFn() { - throw new UnsupportedOperationException("getFn is not supported for OldDoFn"); - } - } + private DoFnInvokers() {} } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 456a6ebff0a6..55b8e7efb244 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -738,39 +737,4 @@ public void onMyTimer(IntervalWindow w) { invoker.invokeOnTimer(timerId, mockArgumentProvider); assertThat(fn.window, equalTo(testWindow)); } - - private class OldDoFnIdentity extends OldDoFn { - public void processElement(ProcessContext c) {} - } - - @Test - public void testOldDoFnProcessElement() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn) - .invokeProcessElement(mockArgumentProvider); - verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class)); - } - - @Test - public void testOldDoFnStartBundle() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeStartBundle(mockProcessContext); - verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class)); - } - - @Test - public void testOldDoFnFinishBundle() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeFinishBundle(mockProcessContext); - verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class)); - } - - @Test - public void testOldDoFnSetup() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeSetup(); - verify(mockOldDoFn).setup(); - } - - @Test - public void testOldDoFnTeardown() throws Exception { - new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeTeardown(); - verify(mockOldDoFn).teardown(); - } } From 50979f7262203987ef3ec4a3fbfeeb3f4ae769e7 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 9 Dec 2016 17:23:15 -0800 Subject: [PATCH 2/6] Removes ArgumentProvider.windowingInternals --- .../beam/runners/core/SimpleDoFnRunner.java | 63 ------------------- .../beam/runners/core/SplittableParDo.java | 7 --- .../beam/sdk/transforms/DoFnAdapters.java | 14 ----- .../beam/sdk/transforms/DoFnTester.java | 7 --- .../sdk/transforms/reflect/DoFnInvoker.java | 20 ------ .../transforms/reflect/DoFnInvokersTest.java | 6 -- 6 files changed, 117 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index b42c57d45683..df5f3f611924 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -56,10 +56,8 @@ import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateSpec; @@ -450,11 +448,6 @@ public OutputReceiver outputReceiver() { throw new UnsupportedOperationException("OutputReceiver is for testing only."); } - @Override - public WindowingInternals windowingInternals() { - throw new UnsupportedOperationException("WindowingInternals are unsupported."); - } - @Override public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( @@ -670,57 +663,6 @@ public Timer timer(String timerId) { throw new RuntimeException(e); } } - - @Override - public WindowingInternals windowingInternals() { - return new WindowingInternals() { - @Override - public Collection windows() { - return windowedValue.getWindows(); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public TimerInternals timerInternals() { - return context.stepContext.timerInternals(); - } - - @Override - public StateInternals stateInternals() { - return stepContext.stateInternals(); - } - - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException("A DoFn cannot output to a different window"); - } - - @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException( - "A DoFn cannot side output to a different window"); - } - - @Override - public T sideInput(PCollectionView view, BoundedWindow sideInputWindow) { - return context.sideInput(view, sideInputWindow); - } - }; - } - } /** @@ -871,11 +813,6 @@ protected Aggregator createAggreg CombineFn combiner) { throw new UnsupportedOperationException("Cannot createAggregator in @OnTimer method"); } - - @Override - public WindowingInternals windowingInternals() { - throw new UnsupportedOperationException("WindowingInternals are unsupported."); - } } private static class TimerInternalsTimer implements Timer { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index e6a2466b2f17..f8d12ecb81f1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -53,7 +53,6 @@ import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateInternals; @@ -684,12 +683,6 @@ public DoFn.OutputReceiver outputReceiver() { throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); } - @Override - public WindowingInternals windowingInternals() { - // DoFnSignatures should have verified that this DoFn doesn't access extra context. - throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); - } - @Override public TrackerT restrictionTracker() { return tracker; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index d1c40a69f54f..0a71faab2815 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -201,14 +200,6 @@ public OnTimerContext onTimerContext(DoFn doFn) { "Timers are not supported for OldDoFn"); } - @Override - public WindowingInternals windowingInternals() { - // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this - // should be unreachable. - throw new UnsupportedOperationException( - "Can only get WindowingInternals in processElement"); - } - @Override public DoFn.InputProvider inputProvider() { throw new UnsupportedOperationException("inputProvider() exists only for testing"); @@ -321,11 +312,6 @@ public OnTimerContext onTimerContext(DoFn doFn) { throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); } - @Override - public WindowingInternals windowingInternals() { - return context.windowingInternals(); - } - @Override public DoFn.InputProvider inputProvider() { throw new UnsupportedOperationException("inputProvider() exists only for testing"); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 2d8684aaaff7..b2c3fd5ed2e9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; @@ -327,12 +326,6 @@ public DoFn.OutputReceiver outputReceiver() { "Not expected to access OutputReceiver from DoFnTester"); } - @Override - public WindowingInternals windowingInternals() { - throw new UnsupportedOperationException( - "Not expected to access WindowingInternals from a new DoFn"); - } - @Override public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 97ac9d3d53c7..354578e2a831 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -27,11 +27,9 @@ import org.apache.beam.sdk.transforms.DoFn.StartBundle; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.State; /** @@ -121,19 +119,6 @@ interface ArgumentProvider { /** A placeholder for testing purposes. */ OutputReceiver outputReceiver(); - /** - * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so - * an {@link OldDoFn} can be run via {@link DoFnInvoker}. - * - *

This is not exposed via the reflective capabilities of {@link DoFn}. - * - * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state - * and timers, they will need to wait for the arrival of those features. Do not introduce - * new uses of this method. - */ - @Deprecated - WindowingInternals windowingInternals(); - /** * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with * the current {@link ProcessElement} call. @@ -179,11 +164,6 @@ public OutputReceiver outputReceiver() { return null; } - @Override - public WindowingInternals windowingInternals() { - return null; - } - @Override public State state(String stateId) { return null; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 55b8e7efb244..4c6bee13f79d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -51,7 +50,6 @@ import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.ValueState; @@ -77,18 +75,14 @@ public class DoFnInvokersTest { @Mock private IntervalWindow mockWindow; @Mock private DoFn.InputProvider mockInputProvider; @Mock private DoFn.OutputReceiver mockOutputReceiver; - @Mock private WindowingInternals mockWindowingInternals; @Mock private DoFnInvoker.ArgumentProvider mockArgumentProvider; - @Mock private OldDoFn mockOldDoFn; - @Before public void setUp() { MockitoAnnotations.initMocks(this); when(mockArgumentProvider.window()).thenReturn(mockWindow); when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider); when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver); - when(mockArgumentProvider.windowingInternals()).thenReturn(mockWindowingInternals); when(mockArgumentProvider.processContext(Matchers.any())).thenReturn(mockProcessContext); } From 149d52b56787bf3620db6b3adbad373366074a5d Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 9 Dec 2016 17:28:16 -0800 Subject: [PATCH 3/6] Moves DoFnAdapters to runners-core --- .../apex/translation/WindowBoundTranslator.java | 2 +- .../operators/ApexGroupByKeyOperator.java | 2 +- .../translation/operators/ApexParDoOperator.java | 2 +- .../org/apache/beam/runners/core}/DoFnAdapters.java | 8 ++++++-- .../beam/runners/core/SimpleOldDoFnRunner.java | 4 ++-- .../runners/core/GroupAlsoByWindowsProperties.java | 2 +- .../translation/functions/FlinkDoFnFunction.java | 2 +- .../functions/FlinkMultiOutputDoFnFunction.java | 2 +- .../functions/FlinkProcessContextBase.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../beam/sdk/transforms/AggregatorRetriever.java | 13 +++++++++++-- .../org/apache/beam/sdk/transforms/OldDoFn.java | 2 +- .../org/apache/beam/sdk/transforms/NoOpOldDoFn.java | 2 +- 13 files changed, 29 insertions(+), 16 deletions(-) rename {sdks/java/core/src/main/java/org/apache/beam/sdk/transforms => runners/core-java/src/main/java/org/apache/beam/runners/core}/DoFnAdapters.java (97%) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java index 33b9269f9c43..ef049e19ce88 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java @@ -22,8 +22,8 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.runners.core.AssignWindowsDoFn; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 49ec1c818c6e..173434f66c33 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -413,7 +413,7 @@ public void sideOutputWithTimestamp(TupleTag tag, T output, Instant times } @Override - protected Aggregator createAggregatorInternal( + public Aggregator createAggregatorInternal( String name, Combine.CombineFn combiner) { throw new UnsupportedOperationException(); } diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index c41cd454a66d..1a3387cb76a9 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -38,6 +38,7 @@ import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; @@ -49,7 +50,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java similarity index 97% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java rename to runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index 0a71faab2815..0f5624f56166 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -15,14 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.runners.core; import java.io.IOException; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AggregatorRetriever; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; @@ -69,7 +73,7 @@ private static class SimpleDoFnAdapter extends OldDoFn invoker; SimpleDoFnAdapter(DoFn fn) { - super(fn.aggregators); + super(AggregatorRetriever.getDelegatingAggregators(fn)); this.fn = fn; this.invoker = DoFnInvokers.invokerFor(fn); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 1ff02122119d..9808e5683069 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -330,7 +330,7 @@ public void sideOutputWithTimestamp(TupleTag tag, T output, Instant times } @Override - protected Aggregator createAggregatorInternal( + public Aggregator createAggregatorInternal( String name, CombineFn combiner) { checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null"); return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner); @@ -512,7 +512,7 @@ public T sideInput(PCollectionView view, BoundedWindow sideInputWindow) { } @Override - protected Aggregator + public Aggregator createAggregatorInternal( String name, CombineFn combiner) { return context.createAggregatorInternal(name, combiner); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index 97b67c653c9c..ef01106fdf77 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -744,7 +744,7 @@ public void sideOutputWithTimestamp(TupleTag tag, T output, Instant times } @Override - protected Aggregator createAggregatorInternal( + public Aggregator createAggregatorInternal( String name, CombineFn combiner) { throw new UnsupportedOperationException(); } diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index ed200d58aa57..2a4a68e7b5bb 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -18,10 +18,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index 7f6a4369ad85..a97bd46fc77b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -18,10 +18,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index 6afca38dcce4..53b98038adb9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -252,7 +252,7 @@ protected abstract void outputWithTimestampAndWindow( public abstract void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp); @Override - protected Aggregator + public Aggregator createAggregatorInternal(String name, Combine.CombineFn combiner) { @SuppressWarnings("unchecked") SerializableFnAggregatorWrapper result = diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 057a3e746696..95f2bfdfc696 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; @@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java index ce47e22fda06..b1d3ead72ee2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java @@ -18,9 +18,10 @@ package org.apache.beam.sdk.transforms; import java.util.Collection; +import java.util.Map; /** - * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn DoFns}. + * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}. */ public final class AggregatorRetriever { private AggregatorRetriever() { @@ -28,9 +29,17 @@ private AggregatorRetriever() { } /** - * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}. + * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}. */ public static Collection> getAggregators(DoFn fn) { return fn.getAggregators(); } + + /** + * Returns the {@link DelegatingAggregator delegating aggregators} created by the provided {@link + * DoFn}. + */ + public static Map> getDelegatingAggregators(DoFn fn) { + return fn.aggregators; + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index 0aef552f1c57..7b0453302a39 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -192,7 +192,7 @@ public abstract void sideOutputWithTimestamp( * context */ @Experimental(Kind.AGGREGATOR) - protected abstract Aggregator + public abstract Aggregator createAggregatorInternal(String name, CombineFn combiner); /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java index 504480b66cea..0db130db59dd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java @@ -63,7 +63,7 @@ public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { } @Override - protected Aggregator + public Aggregator createAggregatorInternal(String name, CombineFn combiner) { return null; } From 2b26ec8934725a600954ced9c4063766a582396a Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Thu, 12 Jan 2017 13:10:40 -0800 Subject: [PATCH 4/6] Removes some OldDoFn code from DoFnRunners DoFnRunners.createDefault() can be replaced with simpleRunner() at the existing call sites, since it is never called with a ReduceFnExecutor at those call sites. --- .../operators/ApexParDoOperator.java | 2 +- .../apache/beam/runners/core/DoFnRunners.java | 137 +----------------- .../beam/runners/direct/ParDoEvaluator.java | 9 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../beam/runners/dataflow/util/DoFnInfo.java | 62 ++++---- .../spark/translation/DoFnFunction.java | 11 +- .../spark/translation/MultiDoFnFunction.java | 9 +- .../sdk/transforms/reflect/DoFnInvokers.java | 17 +-- 8 files changed, 55 insertions(+), 194 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 1a3387cb76a9..de4c15d9017a 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -305,7 +305,7 @@ public void setup(OperatorContext context) { sideOutputPortMapping.put(sideOutputTags.get(i), port); } - DoFnRunner doFnRunner = DoFnRunners.createDefault( + DoFnRunner doFnRunner = DoFnRunners.simpleRunner( pipelineOptions.get(), doFn, sideInputReader, diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 820bfcd3076e..2f3e93c5a4ab 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -18,9 +18,7 @@ package org.apache.beam.runners.core; import java.util.List; -import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; import org.apache.beam.runners.core.ExecutionContext.StepContext; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; @@ -53,7 +51,7 @@ public interface OutputManager { * compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key * partitioning needed, etc. */ - static DoFnRunner simpleRunner( + public static DoFnRunner simpleRunner( PipelineOptions options, DoFn fn, SideInputReader sideInputReader, @@ -119,137 +117,4 @@ DoFnRunner, KV> lateDataDroppingRunner( stepContext.timerInternals(), droppedDueToLatenessAggregator); } - - /** - * Creates a {@link DoFnRunner} for the provided {@link DoFn}. - */ - public static DoFnRunner createDefault( - PipelineOptions options, - DoFn doFn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag mainOutputTag, - List> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy windowingStrategy) { - - // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, - // and window-exploded processing is achieved within the simple runner - return simpleRunner( - options, - doFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - } - - /** - * Creates a {@link DoFnRunner} for the provided {@link OldDoFn}. - * - *

In particular, if the {@link OldDoFn} is a {@link ReduceFnExecutor}, a specialized - * implementation detail of streaming {@link GroupAlsoByWindow}, then it will create a special - * runner that operates on {@link KeyedWorkItem KeyedWorkItems}, drops late data and counts - * dropped elements. - * - * @deprecated please port uses of {@link OldDoFn} to use {@link DoFn} - */ - @Deprecated - public static DoFnRunner createDefault( - PipelineOptions options, - OldDoFn doFn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag mainOutputTag, - List> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy windowingStrategy) { - - DoFnRunner doFnRunner = simpleRunner( - options, - doFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - - if (!(doFn instanceof ReduceFnExecutor)) { - return doFnRunner; - } else { - // When a DoFn is a ReduceFnExecutor, we know it has to have an aggregator for dropped - // elements and we also learn that for some K and V, - // InputT = KeyedWorkItem - // OutputT = KV - - Aggregator droppedDueToLatenessAggregator = - ((ReduceFnExecutor) doFn).getDroppedDueToLatenessAggregator(); - - @SuppressWarnings({"unchecked", "cast", "rawtypes"}) - DoFnRunner runner = (DoFnRunner) lateDataDroppingRunner( - (DoFnRunner) doFnRunner, - stepContext, - (WindowingStrategy) windowingStrategy, - droppedDueToLatenessAggregator); - - return runner; - } - } - - /** - * Creates the right kind of {@link DoFnRunner} for an object that can be either a {@link DoFn} or - * {@link OldDoFn}. This can be used so that the client need not explicitly reference either such - * class, but merely deserialize a payload and pass it to this method. - * - * @deprecated for migration purposes only for services where users may still submit either {@link - * OldDoFn} or {@link DoFn}. If you know that you have a {@link DoFn} then you should use the - * variant for that instead. - */ - @Deprecated - public static DoFnRunner createDefault( - PipelineOptions options, - Object deserializedFn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag mainOutputTag, - List> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy windowingStrategy) { - if (deserializedFn instanceof DoFn) { - return createDefault( - options, - (DoFn) deserializedFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - } else if (deserializedFn instanceof OldDoFn) { - return createDefault( - options, - (OldDoFn) deserializedFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - } else { - throw new IllegalArgumentException(String.format("Cannot create %s for %s of class %s", - DoFnRunner.class.getSimpleName(), - deserializedFn, - deserializedFn.getClass())); - } - } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index e1464707a05f..97d53602ac39 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.direct; import com.google.common.collect.ImmutableList; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -30,6 +29,7 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.TimerInternals.TimerData; @@ -47,7 +47,7 @@ public static ParDoEvaluator create( DirectStepContext stepContext, AppliedPTransform application, WindowingStrategy windowingStrategy, - Serializable fn, // may be OldDoFn or DoFn + DoFn fn, StructuralKey key, List> sideInputs, TupleTag mainOutputTag, @@ -72,8 +72,11 @@ public static ParDoEvaluator create( ReadyCheckingSideInputReader sideInputReader = evaluationContext.createSideInputReader(sideInputs); + + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner DoFnRunner underlying = - DoFnRunners.createDefault( + DoFnRunners.simpleRunner( evaluationContext.getPipelineOptions(), fn, sideInputReader, diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 95f2bfdfc696..90cdf4c25786 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -244,7 +244,7 @@ public Aggregator createAggregatorFor sideInputReader = sideInputHandler; } - DoFnRunner doFnRunner = DoFnRunners.createDefault( + DoFnRunner doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), oldDoFn, sideInputReader, diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index b84def8dbcea..0c5be9065000 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.dataflow.util; -import static com.google.common.base.Preconditions.checkState; - import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.coders.Coder; @@ -29,14 +27,13 @@ import org.apache.beam.sdk.values.TupleTag; /** - * Wrapper class holding the necessary information to serialize a {@link OldDoFn} - * or {@link DoFn}. + * Wrapper class holding the necessary information to serialize a {@link DoFn}. * - * @param the type of the (main) input elements of the {@link OldDoFn} - * @param the type of the (main) output elements of the {@link OldDoFn} + * @param the type of the (main) input elements of the {@link DoFn} + * @param the type of the (main) output elements of the {@link DoFn} */ public class DoFnInfo implements Serializable { - private final Serializable doFn; + private final DoFn doFn; private final WindowingStrategy windowingStrategy; private final Iterable> sideInputViews; private final Coder inputCoder; @@ -47,6 +44,20 @@ public class DoFnInfo implements Serializable { * Creates a {@link DoFnInfo} for the given {@link Serializable} object, which is expected to be a * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob. */ + public static DoFnInfo forFn( + DoFn doFn, + WindowingStrategy windowingStrategy, + Iterable> sideInputViews, + Coder inputCoder, + long mainOutput, + Map> outputMap) { + return new DoFnInfo<>( + doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); + } + + /** TODO: remove this when Dataflow worker uses the DoFn overload. */ + @Deprecated + @SuppressWarnings("unchecked") public static DoFnInfo forFn( Serializable doFn, WindowingStrategy windowingStrategy, @@ -54,11 +65,17 @@ public static DoFnInfo forFn( Coder inputCoder, long mainOutput, Map> outputMap) { - return new DoFnInfo(doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); + return forFn( + (DoFn) doFn, + windowingStrategy, + sideInputViews, + inputCoder, + mainOutput, + outputMap); } private DoFnInfo( - Serializable doFn, + DoFn doFn, WindowingStrategy windowingStrategy, Iterable> sideInputViews, Coder inputCoder, @@ -72,34 +89,15 @@ private DoFnInfo( this.outputMap = outputMap; } - /** - * @deprecated use {@link #forFn}. - */ + /** TODO: remove this when Dataflow worker uses {@link #getDoFn}. */ @Deprecated - public DoFnInfo( - OldDoFn doFn, - WindowingStrategy windowingStrategy, - Iterable> sideInputViews, - Coder inputCoder, - long mainOutput, - Map> outputMap) { - this((Serializable) doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); - } - - /** Returns the embedded serialized function. It may be a {@code DoFn} or {@code OldDoFn}. */ public Serializable getFn() { return doFn; } - /** @deprecated use {@link #getFn()} */ - @Deprecated - public OldDoFn getDoFn() { - checkState( - doFn instanceof OldDoFn, - "Deprecated %s.getDoFn() called when the payload was actually a new %s", - DoFnInfo.class.getSimpleName(), - DoFn.class.getSimpleName()); - return (OldDoFn) doFn; + /** Returns the embedded function. */ + public DoFn getDoFn() { + return doFn; } public WindowingStrategy getWindowingStrategy() { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index af8e0897bd36..bd6cfbea051e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -80,18 +80,21 @@ public Iterable> call( Iterator> iter) throws Exception { DoFnOutputManager outputManager = new DoFnOutputManager(); + + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner DoFnRunner doFnRunner = - DoFnRunners.createDefault( + DoFnRunners.simpleRunner( runtimeContext.getPipelineOptions(), doFn, new SparkSideInputReader(sideInputs), outputManager, - new TupleTag() {}, + new TupleTag() { + }, Collections.>emptyList(), new SparkProcessContext.NoOpStepContext(), new SparkAggregators.Factory(runtimeContext, accumulator), - windowingStrategy - ); + windowingStrategy); return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 0f9417a1fadc..cceffc8134a7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.PairFlatMapFunction; - import scala.Tuple2; @@ -88,8 +87,11 @@ public Iterable, WindowedValue>> call( Iterator> iter) throws Exception { DoFnOutputManager outputManager = new DoFnOutputManager(); + + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner DoFnRunner doFnRunner = - DoFnRunners.createDefault( + DoFnRunners.simpleRunner( runtimeContext.getPipelineOptions(), doFn, new SparkSideInputReader(sideInputs), @@ -98,8 +100,7 @@ public Iterable, WindowedValue>> call( Collections.>emptyList(), new SparkProcessContext.NoOpStepContext(), new SparkAggregators.Factory(runtimeContext, accumulator), - windowingStrategy - ); + windowingStrategy); return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index b141d51150ba..33c5a6ab2d1f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -36,21 +36,12 @@ public static DoFnInvoker invokerFor( return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn); } - /** - * Temporarily retained for compatibility with Dataflow worker. - * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}. - * - * @deprecated Use {@link #invokerFor(DoFn)}. - */ - @SuppressWarnings("unchecked") + /** TODO: remove this when Dataflow worker uses the DoFn overload. */ @Deprecated + @SuppressWarnings({"unchecked"}) public static DoFnInvoker invokerFor( - Serializable deserializedFn) { - if (deserializedFn instanceof DoFn) { - return invokerFor((DoFn) deserializedFn); - } - throw new UnsupportedOperationException( - "Only DoFn supported, was: " + deserializedFn.getClass()); + Serializable fn) { + return invokerFor((DoFn) fn); } private DoFnInvokers() {} From e382c40187754ad4f3c20565675cb3f131528070 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Thu, 12 Jan 2017 13:17:11 -0800 Subject: [PATCH 5/6] Moves PerKeyCombineFnRunners to Flink runner Flink is its only user. This removes the only remaining mentions of OldDoFn in the SDK that are not OldDoFn itself. --- .../runners/core/PerKeyCombineFnRunner.java | 25 ------- .../flink}/PerKeyCombineFnRunners.java | 69 +++++++------------ .../FlinkMergingNonShuffleReduceFunction.java | 2 +- .../FlinkMergingPartialReduceFunction.java | 2 +- .../functions/FlinkMergingReduceFunction.java | 2 +- .../functions/FlinkPartialReduceFunction.java | 2 +- .../functions/FlinkReduceFunction.java | 2 +- .../beam/sdk/util/CombineContextFactory.java | 18 ----- 8 files changed, 28 insertions(+), 94 deletions(-) rename runners/{core-java/src/main/java/org/apache/beam/runners/core => flink/runner/src/main/java/org/apache/beam/runners/flink}/PerKeyCombineFnRunners.java (82%) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java index a927ecd31312..4550273d05e2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java @@ -75,31 +75,6 @@ AccumT mergeAccumulators( */ OutputT extractOutput(K key, AccumT accumulator, OldDoFn.ProcessContext c); - /** - * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in a {@link OldDoFn}. - * - *

It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT compact(K key, AccumT accumulator, OldDoFn.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to combine the inputs and extract output - * in a {@link OldDoFn}. - * - *

It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - OutputT apply(K key, Iterable inputs, OldDoFn.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a {@link OldDoFn}. - * - *

It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT addInputs(K key, Iterable inputs, OldDoFn.ProcessContext c); - ///////////////////////////////////////////////////////////////////////////// /** diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java similarity index 82% rename from runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java rename to runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java index 34d711bc241d..f67257860106 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java @@ -15,10 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.runners.flink; import com.google.common.collect.Iterables; import java.util.Collection; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; @@ -28,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.CombineContextFactory; import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.values.PCollectionView; /** * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations @@ -51,6 +53,22 @@ public class PerKeyCombineFnRunners { } } + /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */ + private static CombineWithContext.Context createFromProcessContext( + final OldDoFn.ProcessContext c) { + return new CombineWithContext.Context() { + @Override + public PipelineOptions getPipelineOptions() { + return c.getPipelineOptions(); + } + + @Override + public T sideInput(PCollectionView view) { + return c.sideInput(view); + } + }; + } + /** * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}. * @@ -92,25 +110,6 @@ public OutputT extractOutput(K key, AccumT accumulator, OldDoFn.ProcessCon return keyedCombineFn.extractOutput(key, accumulator); } - @Override - public AccumT compact(K key, AccumT accumulator, OldDoFn.ProcessContext c) { - return keyedCombineFn.compact(key, accumulator); - } - - @Override - public OutputT apply(K key, Iterable inputs, OldDoFn.ProcessContext c) { - return keyedCombineFn.apply(key, inputs); - } - - @Override - public AccumT addInputs(K key, Iterable inputs, OldDoFn.ProcessContext c) { - AccumT accum = keyedCombineFn.createAccumulator(key); - for (InputT input : inputs) { - accum = keyedCombineFn.addInput(key, accum, input); - } - return accum; - } - @Override public String toString() { return keyedCombineFn.toString(); @@ -169,49 +168,27 @@ public KeyedCombineFnWithContext fn() { @Override public AccumT createAccumulator(K key, OldDoFn.ProcessContext c) { return keyedCombineFnWithContext.createAccumulator(key, - CombineContextFactory.createFromProcessContext(c)); + createFromProcessContext(c)); } @Override public AccumT addInput( K key, AccumT accumulator, InputT value, OldDoFn.ProcessContext c) { return keyedCombineFnWithContext.addInput(key, accumulator, value, - CombineContextFactory.createFromProcessContext(c)); + createFromProcessContext(c)); } @Override public AccumT mergeAccumulators( K key, Iterable accumulators, OldDoFn.ProcessContext c) { return keyedCombineFnWithContext.mergeAccumulators( - key, accumulators, CombineContextFactory.createFromProcessContext(c)); + key, accumulators, createFromProcessContext(c)); } @Override public OutputT extractOutput(K key, AccumT accumulator, OldDoFn.ProcessContext c) { return keyedCombineFnWithContext.extractOutput(key, accumulator, - CombineContextFactory.createFromProcessContext(c)); - } - - @Override - public AccumT compact(K key, AccumT accumulator, OldDoFn.ProcessContext c) { - return keyedCombineFnWithContext.compact(key, accumulator, - CombineContextFactory.createFromProcessContext(c)); - } - - @Override - public OutputT apply(K key, Iterable inputs, OldDoFn.ProcessContext c) { - return keyedCombineFnWithContext.apply(key, inputs, - CombineContextFactory.createFromProcessContext(c)); - } - - @Override - public AccumT addInputs(K key, Iterable inputs, OldDoFn.ProcessContext c) { - CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c); - AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext); - for (InputT input : inputs) { - accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext); - } - return accum; + createFromProcessContext(c)); } @Override diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 041d0e8684cf..6412e63582cb 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java index fef7921098bf..1456eeaa7c2e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java index 59163e961430..2f56facfb497 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index 8b6ec3a0ea1c..627cfa66e1b7 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -25,7 +25,7 @@ import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index fb5c90cb85b1..de0d416bbc7e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java index 149d27692445..a9830579f6a2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java @@ -19,7 +19,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineWithContext.Context; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.values.PCollectionView; @@ -48,23 +47,6 @@ public static Context nullContext() { return NULL_CONTEXT; } - /** - * Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. - */ - public static Context createFromProcessContext(final OldDoFn.ProcessContext c) { - return new Context() { - @Override - public PipelineOptions getPipelineOptions() { - return c.getPipelineOptions(); - } - - @Override - public T sideInput(PCollectionView view) { - return c.sideInput(view); - } - }; - } - /** * Returns a {@code Combine.Context} that wraps a {@link StateContext}. */ From b17e5b0b768f8338f98c6e6f8c90c448bf460b65 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Thu, 12 Jan 2017 19:13:33 -0800 Subject: [PATCH 6/6] Points Dataflow runner to updated worker images --- .../org/apache/beam/runners/dataflow/dataflow.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties index 77345d2d935d..bf73f89036ed 100644 --- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties +++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties @@ -18,6 +18,6 @@ environment.major.version=6 -worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170106 +worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170112 -worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170106 +worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170112