From a12fd8c580d3b1ea46c5be951f39046bfa0dacf3 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 16 Dec 2016 15:26:28 -0800 Subject: [PATCH 1/4] Revert "Removes code for wrapping DoFn as an OldDoFn" This reverts commit a22de15012c51e8b7e31143021f0a298e093bf51. --- .../beam/runners/core/DoFnAdapters.java | 150 +++++++++ .../apache/beam/sdk/transforms/OldDoFn.java | 295 +++++++++++++++++- .../sdk/transforms/reflect/DoFnInvokers.java | 141 ++++++++- .../transforms/reflect/DoFnInvokersTest.java | 36 +++ 4 files changed, 611 insertions(+), 11 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index 0f5624f56166..a4002daf9028 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.core; import java.io.IOException; +import java.util.Collection; +import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AggregatorRetriever; @@ -39,6 +41,7 @@ import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.joda.time.Instant; @@ -53,6 +56,18 @@ public class DoFnAdapters { /** Should not be instantiated. */ private DoFnAdapters() {} + /** + * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the + * original {@link DoFn}, otherwise returns {@code fn.getClass()}. + */ + public static Class getDoFnClass(OldDoFn fn) { + if (fn instanceof SimpleDoFnAdapter) { + return ((SimpleDoFnAdapter) fn).fn.getClass(); + } else { + return fn.getClass(); + } + } + /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ @SuppressWarnings({"unchecked", "rawtypes"}) public static OldDoFn toOldDoFn(DoFn fn) { @@ -64,6 +79,126 @@ public static OldDoFn toOldDoFn(DoFn OldDoFn.ProcessContext adaptProcessContext( + OldDoFn fn, + final DoFn.ProcessContext c, + final DoFnInvoker.ArgumentProvider extra) { + return fn.new ProcessContext() { + @Override + public InputT element() { + return c.element(); + } + + @Override + public T sideInput(PCollectionView view) { + return c.sideInput(view); + } + + @Override + public Instant timestamp() { + return c.timestamp(); + } + + @Override + public BoundedWindow window() { + return extra.window(); + } + + @Override + public PaneInfo pane() { + return c.pane(); + } + + @Override + public WindowingInternals windowingInternals() { + return extra.windowingInternals(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return c.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + c.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + c.outputWithTimestamp(output, timestamp); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + c.sideOutput(tag, output); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + c.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected Aggregator createAggregatorInternal( + String name, CombineFn combiner) { + return c.createAggregator(name, combiner); + } + }; + } + + /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */ + public static OldDoFn.Context adaptContext( + OldDoFn fn, + final DoFn.Context c) { + return fn.new Context() { + @Override + public PipelineOptions getPipelineOptions() { + return c.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + c.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + c.outputWithTimestamp(output, timestamp); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + c.sideOutput(tag, output); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + c.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected Aggregator createAggregatorInternal( + String name, CombineFn combiner) { + return c.createAggregator(name, combiner); + } + }; + } + + /** + * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise, + * returns {@code null}. + */ + @Nullable + public static DoFn getDoFn(OldDoFn fn) { + if (fn instanceof SimpleDoFnAdapter) { + return ((SimpleDoFnAdapter) fn).fn; + } else { + return null; + } + } + /** * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link * OldDoFn}. @@ -105,6 +240,21 @@ public void processElement(ProcessContext c) throws Exception { invoker.invokeProcessElement(adapter); } + @Override + protected TypeDescriptor getInputTypeDescriptor() { + return fn.getInputTypeDescriptor(); + } + + @Override + protected TypeDescriptor getOutputTypeDescriptor() { + return fn.getOutputTypeDescriptor(); + } + + @Override + Collection> getAggregators() { + return fn.getAggregators(); + } + @Override public Duration getAllowedTimestampSkew() { return fn.getAllowedTimestampSkew(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index 7b0453302a39..d1bb42bec7f6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.joda.time.Instant; @@ -70,6 +71,21 @@ */ @Deprecated public abstract class OldDoFn implements Serializable, HasDisplayData { + + public DoFn toDoFn() { + DoFn doFn = DoFnAdapters.getDoFn(this); + if (doFn != null) { + return doFn; + } + if (this instanceof RequiresWindowAccess) { + // No parameters as it just accesses `this` + return new AdaptedRequiresWindowAccessDoFn(); + } else { + // No parameters as it just accesses `this` + return new AdaptedDoFn(); + } + } + /** * Information accessible to all methods in this {@code OldDoFn}. * Used primarily to output elements. @@ -318,7 +334,7 @@ public OldDoFn() { this(new HashMap>()); } - public OldDoFn(Map> aggregators) { + OldDoFn(Map> aggregators) { this.aggregators = aggregators; } @@ -402,6 +418,32 @@ public void populateDisplayData(DisplayData.Builder builder) { ///////////////////////////////////////////////////////////////////////////// + /** + * Returns a {@link TypeDescriptor} capturing what is known statically + * about the input type of this {@code OldDoFn} instance's most-derived + * class. + * + *

See {@link #getOutputTypeDescriptor} for more discussion. + */ + protected TypeDescriptor getInputTypeDescriptor() { + return new TypeDescriptor(getClass()) {}; + } + + /** + * Returns a {@link TypeDescriptor} capturing what is known statically + * about the output type of this {@code OldDoFn} instance's + * most-derived class. + * + *

In the normal case of a concrete {@code OldDoFn} subclass with + * no generic type parameters of its own (including anonymous inner + * classes), this will be a complete non-generic type, which is good + * for choosing a default output {@code Coder} for the output + * {@code PCollection}. + */ + protected TypeDescriptor getOutputTypeDescriptor() { + return new TypeDescriptor(getClass()) {}; + } + /** * Returns an {@link Aggregator} with aggregation logic specified by the * {@link CombineFn} argument. The name provided must be unique across @@ -462,4 +504,255 @@ protected final Aggregator createAggregator(St Collection> getAggregators() { return Collections.>unmodifiableCollection(aggregators.values()); } + + /** + * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}. + */ + private class AdaptedContext extends Context { + + private final DoFn.Context newContext; + + public AdaptedContext( + DoFn.Context newContext) { + this.newContext = newContext; + super.setupDelegateAggregators(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return newContext.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + newContext.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + newContext.outputWithTimestamp(output, timestamp); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + newContext.sideOutput(tag, output); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + newContext.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected Aggregator createAggregatorInternal( + String name, CombineFn combiner) { + return newContext.createAggregator(name, combiner); + } + } + + /** + * A {@link ProcessContext} for an {@link OldDoFn} via a context for a proper {@link DoFn}. + */ + private class AdaptedProcessContext extends ProcessContext { + + private final DoFn.ProcessContext newContext; + + public AdaptedProcessContext(DoFn.ProcessContext newContext) { + this.newContext = newContext; + } + + @Override + public InputT element() { + return newContext.element(); + } + + @Override + public T sideInput(PCollectionView view) { + return newContext.sideInput(view); + } + + @Override + public Instant timestamp() { + return newContext.timestamp(); + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException(String.format( + "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s", + OldDoFn.class.getSimpleName(), + OldDoFn.ProcessContext.class.getSimpleName(), + OldDoFn.class.getSimpleName(), + DoFn.class.getSimpleName())); + } + + @Override + public PaneInfo pane() { + return newContext.pane(); + } + + @Override + public WindowingInternals windowingInternals() { + throw new UnsupportedOperationException(String.format( + "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s", + OldDoFn.class.getSimpleName(), + OldDoFn.ProcessContext.class.getSimpleName(), + OldDoFn.class.getSimpleName(), + DoFn.class.getSimpleName())); + } + + @Override + public PipelineOptions getPipelineOptions() { + return newContext.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + newContext.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + newContext.outputWithTimestamp(output, timestamp); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + newContext.sideOutput(tag, output); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + newContext.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected Aggregator createAggregatorInternal( + String name, CombineFn combiner) { + return newContext.createAggregator(name, combiner); + } + } + + private class AdaptedDoFn extends DoFn { + + @Setup + public void setup() throws Exception { + OldDoFn.this.setup(); + } + + @StartBundle + public void startBundle(Context c) throws Exception { + OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c)); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c)); + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c)); + } + + @Teardown + public void teardown() throws Exception { + OldDoFn.this.teardown(); + } + + @Override + public Duration getAllowedTimestampSkew() { + return OldDoFn.this.getAllowedTimestampSkew(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + OldDoFn.this.populateDisplayData(builder); + } + + @Override + public TypeDescriptor getInputTypeDescriptor() { + return OldDoFn.this.getInputTypeDescriptor(); + } + + @Override + Collection> getAggregators() { + return OldDoFn.this.getAggregators(); + } + + @Override + public TypeDescriptor getOutputTypeDescriptor() { + return OldDoFn.this.getOutputTypeDescriptor(); + } + } + + /** + * A {@link ProcessContext} for an {@link OldDoFn} that implements + * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}. + */ + private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext { + + private final BoundedWindow window; + + public AdaptedRequiresWindowAccessProcessContext( + DoFn.ProcessContext newContext, + BoundedWindow window) { + super(newContext); + this.window = window; + } + + @Override + public BoundedWindow window() { + return window; + } + } + + private class AdaptedRequiresWindowAccessDoFn extends DoFn { + + @Setup + public void setup() throws Exception { + OldDoFn.this.setup(); + } + + @StartBundle + public void startBundle(Context c) throws Exception { + OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c)); + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + OldDoFn.this.processElement( + OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window)); + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c)); + } + + @Teardown + public void teardown() throws Exception { + OldDoFn.this.teardown(); + } + + @Override + public Duration getAllowedTimestampSkew() { + return OldDoFn.this.getAllowedTimestampSkew(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + OldDoFn.this.populateDisplayData(builder); + } + + @Override + public TypeDescriptor getInputTypeDescriptor() { + return OldDoFn.this.getInputTypeDescriptor(); + } + + @Override + public TypeDescriptor getOutputTypeDescriptor() { + return OldDoFn.this.getOutputTypeDescriptor(); + } + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index b141d51150ba..50a7082cdad5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -18,7 +18,13 @@ package org.apache.beam.sdk.transforms.reflect; import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.util.UserCodeException; /** Static utilities for working with {@link DoFnInvoker}. */ public class DoFnInvokers { @@ -36,22 +42,137 @@ public static DoFnInvoker invokerFor( return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn); } + private DoFnInvokers() {} + /** - * Temporarily retained for compatibility with Dataflow worker. - * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}. + * Returns a {@link DoFnInvoker} for the given {@link Object}, which should be either a {@link + * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's function as an + * {@link Object} and then pass it to this method, so there is no need to statically specify what + * sort of object it is. * - * @deprecated Use {@link #invokerFor(DoFn)}. + * @deprecated this is to be used only as a migration path for decoupling upgrades */ - @SuppressWarnings("unchecked") @Deprecated - public static DoFnInvoker invokerFor( - Serializable deserializedFn) { + public static DoFnInvoker invokerFor(Serializable deserializedFn) { if (deserializedFn instanceof DoFn) { - return invokerFor((DoFn) deserializedFn); + return invokerFor((DoFn) deserializedFn); + } else if (deserializedFn instanceof OldDoFn) { + return new OldDoFnInvoker<>((OldDoFn) deserializedFn); + } else { + throw new IllegalArgumentException( + String.format( + "Cannot create a %s for %s; it should be either a %s or an %s.", + DoFnInvoker.class.getSimpleName(), + deserializedFn.toString(), + DoFn.class.getSimpleName(), + OldDoFn.class.getSimpleName())); } - throw new UnsupportedOperationException( - "Only DoFn supported, was: " + deserializedFn.getClass()); } - private DoFnInvokers() {} + /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */ + @Deprecated public static final DoFnInvokers INSTANCE = new DoFnInvokers(); + + /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */ + @Deprecated + public DoFnInvoker invokerFor(Object deserializedFn) { + return (DoFnInvoker) DoFnInvokers.invokerFor((Serializable) deserializedFn); + } + + + static class OldDoFnInvoker implements DoFnInvoker { + + private final OldDoFn fn; + + public OldDoFnInvoker(OldDoFn fn) { + this.fn = fn; + } + + @Override + public DoFn.ProcessContinuation invokeProcessElement( + ArgumentProvider extra) { + // The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly + DoFn.ProcessContext newCtx = + extra.processContext(new DoFn() {}); + OldDoFn.ProcessContext oldCtx = + DoFnAdapters.adaptProcessContext(fn, newCtx, extra); + try { + fn.processElement(oldCtx); + return DoFn.ProcessContinuation.stop(); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeOnTimer(String timerId, ArgumentProvider arguments) { + throw new UnsupportedOperationException( + String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName())); + } + + @Override + public void invokeStartBundle(DoFn.Context c) { + OldDoFn.Context oldCtx = DoFnAdapters.adaptContext(fn, c); + try { + fn.startBundle(oldCtx); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeFinishBundle(DoFn.Context c) { + OldDoFn.Context oldCtx = DoFnAdapters.adaptContext(fn, c); + try { + fn.finishBundle(oldCtx); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeSetup() { + try { + fn.setup(); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeTeardown() { + try { + fn.teardown(); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public RestrictionT invokeGetInitialRestriction(InputT element) { + throw new UnsupportedOperationException("OldDoFn is not splittable"); + } + + @Override + public Coder invokeGetRestrictionCoder( + CoderRegistry coderRegistry) { + throw new UnsupportedOperationException("OldDoFn is not splittable"); + } + + @Override + public void invokeSplitRestriction( + InputT element, RestrictionT restriction, DoFn.OutputReceiver receiver) { + throw new UnsupportedOperationException("OldDoFn is not splittable"); + } + + @Override + public > + TrackerT invokeNewTracker(RestrictionT restriction) { + throw new UnsupportedOperationException("OldDoFn is not splittable"); + } + + @Override + public DoFn getFn() { + throw new UnsupportedOperationException("getFn is not supported for OldDoFn"); + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 4c6bee13f79d..4233b390a216 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -731,4 +732,39 @@ public void onMyTimer(IntervalWindow w) { invoker.invokeOnTimer(timerId, mockArgumentProvider); assertThat(fn.window, equalTo(testWindow)); } + + private class OldDoFnIdentity extends OldDoFn { + public void processElement(ProcessContext c) {} + } + + @Test + public void testOldDoFnProcessElement() throws Exception { + new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn) + .invokeProcessElement(mockArgumentProvider); + verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class)); + } + + @Test + public void testOldDoFnStartBundle() throws Exception { + new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeStartBundle(mockProcessContext); + verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class)); + } + + @Test + public void testOldDoFnFinishBundle() throws Exception { + new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeFinishBundle(mockProcessContext); + verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class)); + } + + @Test + public void testOldDoFnSetup() throws Exception { + new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeSetup(); + verify(mockOldDoFn).setup(); + } + + @Test + public void testOldDoFnTeardown() throws Exception { + new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeTeardown(); + verify(mockOldDoFn).teardown(); + } } From 4aa0ee1436a8d94f7c1c75bd0151790d14635c64 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 16 Dec 2016 15:26:32 -0800 Subject: [PATCH 2/4] Revert "Removes ArgumentProvider.windowingInternals" This reverts commit f3e8a0383bf9cb3f9452e0364f7deba113cadff9. --- .../beam/runners/core/DoFnAdapters.java | 14 +++++ .../beam/runners/core/SimpleDoFnRunner.java | 57 +++++++++++++++++++ .../beam/runners/core/SplittableParDo.java | 7 +++ .../beam/sdk/transforms/DoFnTester.java | 7 +++ .../sdk/transforms/reflect/DoFnInvoker.java | 20 +++++++ .../transforms/reflect/DoFnInvokersTest.java | 6 ++ 6 files changed, 111 insertions(+) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index a4002daf9028..fc5847c75a7d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -354,6 +355,14 @@ public OnTimerContext onTimerContext(DoFn doFn) { "Timers are not supported for OldDoFn"); } + @Override + public WindowingInternals windowingInternals() { + // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this + // should be unreachable. + throw new UnsupportedOperationException( + "Can only get WindowingInternals in processElement"); + } + @Override public DoFn.InputProvider inputProvider() { throw new UnsupportedOperationException("inputProvider() exists only for testing"); @@ -466,6 +475,11 @@ public OnTimerContext onTimerContext(DoFn doFn) { throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); } + @Override + public WindowingInternals windowingInternals() { + return context.windowingInternals(); + } + @Override public DoFn.InputProvider inputProvider() { throw new UnsupportedOperationException("inputProvider() exists only for testing"); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index d504b403002c..29ef3ef73935 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -51,10 +51,13 @@ import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateSpec; @@ -415,6 +418,11 @@ public OutputReceiver outputReceiver() { throw new UnsupportedOperationException("OutputReceiver is for testing only."); } + @Override + public WindowingInternals windowingInternals() { + throw new UnsupportedOperationException("WindowingInternals are unsupported."); + } + @Override public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( @@ -625,5 +633,54 @@ public Timer timer(String timerId) { throw new UnsupportedOperationException("Timer parameters are not supported."); } + @Override + public WindowingInternals windowingInternals() { + return new WindowingInternals() { + @Override + public Collection windows() { + return windowedValue.getWindows(); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public TimerInternals timerInternals() { + return context.stepContext.timerInternals(); + } + + @Override + public StateInternals stateInternals() { + return stepContext.stateInternals(); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + throw new UnsupportedOperationException("A DoFn cannot output to a different window"); + } + + @Override + public void sideOutputWindowedValue( + TupleTag tag, + SideOutputT output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + throw new UnsupportedOperationException( + "A DoFn cannot side output to a different window"); + } + + @Override + public T sideInput(PCollectionView view, BoundedWindow sideInputWindow) { + return context.sideInput(view, sideInputWindow); + } + }; + } } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index f8d12ecb81f1..e6a2466b2f17 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateInternals; @@ -683,6 +684,12 @@ public DoFn.OutputReceiver outputReceiver() { throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); } + @Override + public WindowingInternals windowingInternals() { + // DoFnSignatures should have verified that this DoFn doesn't access extra context. + throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); + } + @Override public TrackerT restrictionTracker() { return tracker; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index b2c3fd5ed2e9..2d8684aaaff7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; @@ -326,6 +327,12 @@ public DoFn.OutputReceiver outputReceiver() { "Not expected to access OutputReceiver from DoFnTester"); } + @Override + public WindowingInternals windowingInternals() { + throw new UnsupportedOperationException( + "Not expected to access WindowingInternals from a new DoFn"); + } + @Override public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 354578e2a831..97ac9d3d53c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -27,9 +27,11 @@ import org.apache.beam.sdk.transforms.DoFn.StartBundle; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.State; /** @@ -119,6 +121,19 @@ interface ArgumentProvider { /** A placeholder for testing purposes. */ OutputReceiver outputReceiver(); + /** + * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so + * an {@link OldDoFn} can be run via {@link DoFnInvoker}. + * + *

This is not exposed via the reflective capabilities of {@link DoFn}. + * + * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state + * and timers, they will need to wait for the arrival of those features. Do not introduce + * new uses of this method. + */ + @Deprecated + WindowingInternals windowingInternals(); + /** * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with * the current {@link ProcessElement} call. @@ -164,6 +179,11 @@ public OutputReceiver outputReceiver() { return null; } + @Override + public WindowingInternals windowingInternals() { + return null; + } + @Override public State state(String stateId) { return null; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 4233b390a216..456a6ebff0a6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -51,6 +52,7 @@ import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.ValueState; @@ -76,14 +78,18 @@ public class DoFnInvokersTest { @Mock private IntervalWindow mockWindow; @Mock private DoFn.InputProvider mockInputProvider; @Mock private DoFn.OutputReceiver mockOutputReceiver; + @Mock private WindowingInternals mockWindowingInternals; @Mock private DoFnInvoker.ArgumentProvider mockArgumentProvider; + @Mock private OldDoFn mockOldDoFn; + @Before public void setUp() { MockitoAnnotations.initMocks(this); when(mockArgumentProvider.window()).thenReturn(mockWindow); when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider); when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver); + when(mockArgumentProvider.windowingInternals()).thenReturn(mockWindowingInternals); when(mockArgumentProvider.processContext(Matchers.any())).thenReturn(mockProcessContext); } From 954e57d7696fd14f7d1015f4e40f025ef8538802 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 16 Dec 2016 15:37:02 -0800 Subject: [PATCH 3/4] Revert "Moves DoFnAdapters to runners-core" This reverts commit 33ed3238e2b3899cff061be3056c5cc29fc60a04. --- .../apex/translation/WindowBoundTranslator.java | 2 +- .../operators/ApexGroupByKeyOperator.java | 2 +- .../translation/operators/ApexParDoOperator.java | 2 +- .../beam/runners/core/SimpleOldDoFnRunner.java | 4 ++-- .../runners/core/GroupAlsoByWindowsProperties.java | 2 +- .../translation/functions/FlinkDoFnFunction.java | 2 +- .../functions/FlinkMultiOutputDoFnFunction.java | 2 +- .../functions/FlinkProcessContextBase.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../beam/sdk/transforms/AggregatorRetriever.java | 13 ++----------- .../apache/beam/sdk/transforms}/DoFnAdapters.java | 8 ++------ .../org/apache/beam/sdk/transforms/OldDoFn.java | 2 +- .../org/apache/beam/sdk/transforms/NoOpOldDoFn.java | 2 +- 13 files changed, 16 insertions(+), 29 deletions(-) rename {runners/core-java/src/main/java/org/apache/beam/runners/core => sdks/java/core/src/main/java/org/apache/beam/sdk/transforms}/DoFnAdapters.java (98%) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java index ef049e19ce88..33b9269f9c43 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java @@ -22,8 +22,8 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.runners.core.AssignWindowsDoFn; -import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 4af7ff0330e2..48ac177ead7b 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -413,7 +413,7 @@ public void sideOutputWithTimestamp(TupleTag tag, T output, Instant times } @Override - public Aggregator createAggregatorInternal( + protected Aggregator createAggregatorInternal( String name, Combine.CombineFn combiner) { throw new UnsupportedOperationException(); } diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 4538fb5a052b..a3d3a97e2ce0 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -38,7 +38,6 @@ import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; @@ -49,6 +48,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.NullSideInputReader; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 7d9320062f27..1048fdcc09ad 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -322,7 +322,7 @@ public void sideOutputWithTimestamp(TupleTag tag, T output, Instant times } @Override - public Aggregator createAggregatorInternal( + protected Aggregator createAggregatorInternal( String name, CombineFn combiner) { checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null"); return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner); @@ -504,7 +504,7 @@ public T sideInput(PCollectionView view, BoundedWindow sideInputWindow) { } @Override - public Aggregator + protected Aggregator createAggregatorInternal( String name, CombineFn combiner) { return context.createAggregatorInternal(name, combiner); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index ef01106fdf77..97b67c653c9c 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -744,7 +744,7 @@ public void sideOutputWithTimestamp(TupleTag tag, T output, Instant times } @Override - public Aggregator createAggregatorInternal( + protected Aggregator createAggregatorInternal( String name, CombineFn combiner) { throw new UnsupportedOperationException(); } diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 2a4a68e7b5bb..ed200d58aa57 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -18,10 +18,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; -import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index a97bd46fc77b..7f6a4369ad85 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -18,10 +18,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; -import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index 53b98038adb9..6afca38dcce4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -252,7 +252,7 @@ protected abstract void outputWithTimestampAndWindow( public abstract void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp); @Override - public Aggregator + protected Aggregator createAggregatorInternal(String name, Combine.CombineFn combiner) { @SuppressWarnings("unchecked") SerializableFnAggregatorWrapper result = diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 001e3b68ce0c..870430844a7c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; @@ -42,6 +41,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java index b1d3ead72ee2..ce47e22fda06 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java @@ -18,10 +18,9 @@ package org.apache.beam.sdk.transforms; import java.util.Collection; -import java.util.Map; /** - * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}. + * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn DoFns}. */ public final class AggregatorRetriever { private AggregatorRetriever() { @@ -29,17 +28,9 @@ private AggregatorRetriever() { } /** - * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}. + * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}. */ public static Collection> getAggregators(DoFn fn) { return fn.getAggregators(); } - - /** - * Returns the {@link DelegatingAggregator delegating aggregators} created by the provided {@link - * DoFn}. - */ - public static Map> getDelegatingAggregators(DoFn fn) { - return fn.aggregators; - } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java similarity index 98% rename from runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index fc5847c75a7d..e15b08b46d79 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -15,20 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.transforms; import java.io.IOException; import java.util.Collection; import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AggregatorRetriever; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; @@ -209,7 +205,7 @@ private static class SimpleDoFnAdapter extends OldDoFn invoker; SimpleDoFnAdapter(DoFn fn) { - super(AggregatorRetriever.getDelegatingAggregators(fn)); + super(fn.aggregators); this.fn = fn; this.invoker = DoFnInvokers.invokerFor(fn); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index d1bb42bec7f6..2d2c1fde737b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -208,7 +208,7 @@ public abstract void sideOutputWithTimestamp( * context */ @Experimental(Kind.AGGREGATOR) - public abstract Aggregator + protected abstract Aggregator createAggregatorInternal(String name, CombineFn combiner); /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java index 0db130db59dd..504480b66cea 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java @@ -63,7 +63,7 @@ public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { } @Override - public Aggregator + protected Aggregator createAggregatorInternal(String name, CombineFn combiner) { return null; } From 45ed5c70c18a806d0fc2e7385886285206fd18e4 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 16 Dec 2016 16:33:51 -0800 Subject: [PATCH 4/4] Revert "Move InMemoryTimerInternals to runners-core" This reverts commit ec0bf7b4023ff75f4ec6723d2e77ed507eb57c51. --- ...GroupAlsoByWindowsViaOutputBufferDoFn.java | 1 + .../beam/runners/core/ReduceFnTester.java | 1 + .../runners/core/SplittableParDoTest.java | 16 ++------- .../triggers/TriggerStateMachineTester.java | 2 +- .../translation/SparkGroupAlsoByWindowFn.java | 2 +- .../beam/sdk/transforms/DoFnTester.java | 36 +++++++++++++++++++ .../util/state}/InMemoryTimerInternals.java | 3 +- .../state}/InMemoryTimerInternalsTest.java | 4 +-- 8 files changed, 45 insertions(+), 20 deletions(-) rename {runners/core-java/src/main/java/org/apache/beam/runners/core => sdks/java/core/src/main/java/org/apache/beam/sdk/util/state}/InMemoryTimerInternals.java (99%) rename {runners/core-java/src/test/java/org/apache/beam/runners/core => sdks/java/core/src/test/java/org/apache/beam/sdk/util/state}/InMemoryTimerInternalsTest.java (97%) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index efcd771d01b9..918919170ec6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.joda.time.Instant; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 890195a0d3c1..db0cf9186a7d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -73,6 +73,7 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 41d419baa8b2..cf96b660bea6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -28,7 +28,6 @@ import static org.junit.Assert.assertTrue; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -191,8 +190,6 @@ private static class ProcessFnTester< tester; private Instant currentProcessingTime; - private InMemoryTimerInternals timerInternals; - ProcessFnTester( Instant currentProcessingTime, DoFn fn, @@ -203,7 +200,6 @@ private static class ProcessFnTester< new SplittableParDo.ProcessFn<>( fn, inputCoder, restrictionCoder, IntervalWindow.getCoder()); this.tester = DoFnTester.of(processFn); - this.timerInternals = new InMemoryTimerInternals(); processFn.setStateInternalsFactory( new StateInternalsFactory() { @Override @@ -215,7 +211,7 @@ public StateInternals stateInternalsForKey(String key) { new TimerInternalsFactory() { @Override public TimerInternals timerInternalsForKey(String key) { - return timerInternals; + return tester.getTimerInternals(); } }); processFn.setOutputWindowedValue( @@ -251,7 +247,7 @@ public void sideOutputWindowedValue( // through the state/timer/output callbacks. this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE); this.tester.startBundle(); - timerInternals.advanceProcessingTime(currentProcessingTime); + this.tester.advanceProcessingTime(currentProcessingTime); this.currentProcessingTime = currentProcessingTime; } @@ -289,13 +285,7 @@ void startElement( */ boolean advanceProcessingTimeBy(Duration duration) throws Exception { currentProcessingTime = currentProcessingTime.plus(duration); - timerInternals.advanceProcessingTime(currentProcessingTime); - - List timers = new ArrayList<>(); - TimerInternals.TimerData nextTimer; - while ((nextTimer = timerInternals.removeNextProcessingTimer()) != null) { - timers.add(nextTimer); - } + List timers = tester.advanceProcessingTime(currentProcessingTime); if (timers.isEmpty()) { return false; } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 2a626d4311d5..be63c0644e27 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -34,7 +34,6 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.ActiveWindowSet; import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback; -import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.MergingActiveWindowSet; import org.apache.beam.runners.core.NonMergingActiveWindowSet; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -47,6 +46,7 @@ import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java index 5432d58994af..87d3f5059039 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.List; import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; -import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; @@ -38,6 +37,7 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 2d8684aaaff7..93b3f5954898 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -46,10 +46,12 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.InMemoryStateInternals; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; @@ -141,6 +143,10 @@ public StateInternals getStateInternals() { return (StateInternals) stateInternals; } + public TimerInternals getTimerInternals() { + return timerInternals; + } + /** * When a {@link DoFnTester} should clone the {@link DoFn} under test and how it should manage * the lifecycle of the {@link DoFn}. @@ -227,6 +233,7 @@ public void startBundle() throws Exception { context.setupDelegateAggregators(); // State and timer internals are per-bundle. stateInternals = InMemoryStateInternals.forKey(new Object()); + timerInternals = new InMemoryTimerInternals(); try { fnInvoker.invokeStartBundle(context); } catch (UserCodeException e) { @@ -535,6 +542,34 @@ public AggregateT getAggregatorValue(Aggregator agg) return extractAggregatorValue(agg.getName(), agg.getCombineFn()); } + public List advanceInputWatermark(Instant newWatermark) { + try { + timerInternals.advanceInputWatermark(newWatermark); + final List firedTimers = new ArrayList<>(); + TimerInternals.TimerData timer; + while ((timer = timerInternals.removeNextEventTimer()) != null) { + firedTimers.add(timer); + } + return firedTimers; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public List advanceProcessingTime(Instant newProcessingTime) { + try { + timerInternals.advanceProcessingTime(newProcessingTime); + final List firedTimers = new ArrayList<>(); + TimerInternals.TimerData timer; + while ((timer = timerInternals.removeNextProcessingTimer()) != null) { + firedTimers.add(timer); + } + return firedTimers; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private AggregateT extractAggregatorValue( String name, CombineFn combiner) { @SuppressWarnings("unchecked") @@ -779,6 +814,7 @@ private enum State { private Map, List>> outputs; private InMemoryStateInternals stateInternals; + private InMemoryTimerInternals timerInternals; /** The state of processing of the {@link DoFn} under test. */ private State state = State.UNINITIALIZED; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java similarity index 99% rename from runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java index b22fcb3d2c0a..44b44f06ead6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.util.state; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -29,7 +29,6 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; -import org.apache.beam.sdk.util.state.StateNamespace; import org.joda.time.Instant; /** diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java similarity index 97% rename from runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java index 2caa8742dea1..4a2763ccc76e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.util.state; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -25,8 +25,6 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaceForTest; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith;