From 20208d68142e756800507048d9b8339041f2db70 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 9 Aug 2016 20:42:04 -0700 Subject: [PATCH] Remove WindowingInternals support from DoFnReflector The test themselves are replaced by mostly-hidden placeholders, to ensure that our code for handling generic parameters remains in place until new context parameters that use generics are added back. --- .../org/apache/beam/sdk/transforms/DoFn.java | 44 ++++- .../beam/sdk/transforms/DoFnReflector.java | 92 ++++++---- .../sdk/transforms/DoFnReflectorTest.java | 157 +++++++++++++----- .../transforms/DoFnReflectorBenchmark.java | 13 +- 4 files changed, 214 insertions(+), 92 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 6f9a6b660db7..a06467e9b381 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; @@ -302,11 +301,43 @@ public interface ExtraContextFactory { BoundedWindow window(); /** - * Construct the {@link WindowingInternals} to use within a {@link DoFn} that - * needs it. This is called if the {@link ProcessElement} method has a parameter of type - * {@link WindowingInternals}. + * A placeholder for testing purposes. The return type itself is package-private and not + * implemented. */ - WindowingInternals windowingInternals(); + InputProvider inputProvider(); + + /** + * A placeholder for testing purposes. The return type itself is package-private and not + * implemented. + */ + OutputReceiver outputReceiver(); + } + + static interface OutputReceiver { + void output(T output); + } + + static interface InputProvider { + T get(); + } + + /** For testing only, this {@link ExtraContextFactory} returns {@code null} for all parameters. */ + public static class FakeExtraContextFactory + implements ExtraContextFactory { + @Override + public BoundedWindow window() { + return null; + } + + @Override + public InputProvider inputProvider() { + return null; + } + + @Override + public OutputReceiver outputReceiver() { + return null; + } } ///////////////////////////////////////////////////////////////////////////// @@ -331,8 +362,7 @@ public interface ExtraContextFactory { *
    *
  • It must have at least one argument. *
  • Its first argument must be a {@link DoFn.ProcessContext}. - *
  • Its remaining arguments must be {@link BoundedWindow}, or - * {@link WindowingInternals WindowingInternals<InputT, OutputT>}. + *
  • Its remaining argument, if any, must be {@link BoundedWindow}. *
*/ @Documented diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java index c6168b35a299..3dfda55caed4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -35,6 +34,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -91,6 +91,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -121,20 +122,35 @@ private enum AdditionalParameter { /** Any {@link BoundedWindow} parameter is populated by the window of the current element. */ WINDOW_OF_ELEMENT(Availability.PROCESS_ELEMENT_ONLY, BoundedWindow.class, "window") { @Override - public TypeToken - tokenFor(TypeToken in, TypeToken out) { + public TypeToken tokenFor(TypeToken in, TypeToken out) { return TypeToken.of(BoundedWindow.class); } }, - WINDOWING_INTERNALS(Availability.PROCESS_ELEMENT_ONLY, - WindowingInternals.class, "windowingInternals") { + INPUT_PROVIDER(Availability.PROCESS_ELEMENT_ONLY, DoFn.InputProvider.class, "inputProvider") { @Override - public TypeToken tokenFor( - TypeToken in, TypeToken out) { - return new TypeToken>() {} - .where(new TypeParameter() {}, in) - .where(new TypeParameter() {}, out); + public TypeToken tokenFor(TypeToken in, TypeToken out) { + return new TypeToken>() {}.where( + new TypeParameter() {}, in); + } + + @Override + public boolean isHidden() { + return true; + } + }, + + OUTPUT_RECEIVER( + Availability.PROCESS_ELEMENT_ONLY, DoFn.OutputReceiver.class, "outputReceiver") { + @Override + public TypeToken tokenFor(TypeToken in, TypeToken out) { + return new TypeToken>() {}.where( + new TypeParameter() {}, out); + } + + @Override + public boolean isHidden() { + return true; } }; @@ -146,6 +162,14 @@ public TypeToken tokenFor( abstract TypeToken tokenFor( TypeToken in, TypeToken out); + /** + * Indicates whether this enum is for testing only, hence should not appear in error messages, + * etc. Defaults to {@code false}. + */ + boolean isHidden() { + return false; + } + private final Class rawType; private final Availability availability; private final transient MethodDescription method; @@ -241,16 +265,17 @@ private static Collection describeSupportedTypes( final TypeToken in, final TypeToken out) { return FluentIterable .from(extraProcessContexts.values()) + .filter(new Predicate() { + @Override + public boolean apply(@Nonnull AdditionalParameter additionalParameter) { + return !additionalParameter.isHidden(); + } + }) .transform(new Function() { - @Override - @Nullable - public String apply(@Nullable AdditionalParameter input) { - if (input == null) { - return null; - } else { - return formatType(input.tokenFor(in, out)); - } + @Nonnull + public String apply(@Nonnull AdditionalParameter input) { + return formatType(input.tokenFor(in, out)); } }) .toSortedSet(String.CASE_INSENSITIVE_ORDER); @@ -285,10 +310,9 @@ static List verifyBundleMethodArguments(M *
  • The method has at least one argument. *
  • The first argument is of type firstContextArg. *
  • The remaining arguments have raw types that appear in {@code contexts} - *
  • Any generics on the extra context arguments match what is expected. Eg., - * {@code WindowingInternals} either matches the - * {@code InputT} and {@code OutputT} parameters of the - * {@code OldDoFn.ProcessContext}, or it uses a wildcard, etc. + *
  • Any generics on the extra context arguments match what is expected. Currently, this + * is exercised only by placeholders. For example, {@code InputReceiver must either match + * the {@code InputT} {@code OldDoFn.ProcessContext} or use a wildcard, etc. * * * @param m the method to verify @@ -298,7 +322,8 @@ static List verifyBundleMethodArguments(M * @param iParam TypeParameter representing the input type * @param oParam TypeParameter representing the output type */ - @VisibleForTesting static List verifyMethodArguments( + @VisibleForTesting + static List verifyMethodArguments( Method m, Map, AdditionalParameter> contexts, TypeToken firstContextArg, @@ -607,11 +632,13 @@ public BoundedWindow window() { } @Override - public WindowingInternals windowingInternals() { - // The DoFn doesn't allow us to ask for these outside ProcessElements, so this - // should be unreachable. - throw new UnsupportedOperationException( - "Can only get the windowingInternals in ProcessElements"); + public DoFn.InputProvider inputProvider() { + throw new UnsupportedOperationException("inputProvider() exists only for testing"); + } + + @Override + public DoFn.OutputReceiver outputReceiver() { + throw new UnsupportedOperationException("outputReceiver() exists only for testing"); } } @@ -679,8 +706,13 @@ public BoundedWindow window() { } @Override - public WindowingInternals windowingInternals() { - return context.windowingInternals(); + public DoFn.InputProvider inputProvider() { + throw new UnsupportedOperationException("inputProvider() exists only for testing"); + } + + @Override + public DoFn.OutputReceiver outputReceiver() { + throw new UnsupportedOperationException("outputReceiver() exists only for testing"); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java index df9e441f08b0..c47e0cf5a9d8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowingInternals; import org.junit.Before; import org.junit.Rule; @@ -71,7 +70,9 @@ public Invocations(String name) { @Mock private BoundedWindow mockWindow; @Mock - private WindowingInternals mockWindowingInternals; + private DoFn.InputProvider mockInputProvider; + @Mock + private DoFn.OutputReceiver mockOutputReceiver; private ExtraContextFactory extraContextFactory; @@ -85,8 +86,13 @@ public BoundedWindow window() { } @Override - public WindowingInternals windowingInternals() { - return mockWindowingInternals; + public DoFn.InputProvider inputProvider() { + return mockInputProvider; + } + + @Override + public DoFn.OutputReceiver outputReceiver() { + return mockOutputReceiver; } }; } @@ -257,16 +263,35 @@ public void processElement(ProcessContext c, BoundedWindow w) } @Test - public void testDoFnWithWindowingInternals() throws Exception { + public void testDoFnWithOutputReceiver() throws Exception { + final Invocations invocations = new Invocations("AnonymousClass"); + DoFnReflector reflector = underTest(new DoFn() { + + @ProcessElement + public void processElement(ProcessContext c, DoFn.OutputReceiver o) + throws Exception { + invocations.wasProcessElementInvoked = true; + assertSame(c, mockContext); + assertSame(o, mockOutputReceiver); + } + }); + + assertFalse(reflector.usesSingleWindow()); + + checkInvokeProcessElementWorks(reflector, invocations); + } + + @Test + public void testDoFnWithInputProvider() throws Exception { final Invocations invocations = new Invocations("AnonymousClass"); DoFnReflector reflector = underTest(new DoFn() { @ProcessElement - public void processElement(ProcessContext c, WindowingInternals w) + public void processElement(ProcessContext c, DoFn.InputProvider i) throws Exception { invocations.wasProcessElementInvoked = true; assertSame(c, mockContext); - assertSame(w, mockWindowingInternals); + assertSame(i, mockInputProvider); } }); @@ -513,7 +538,7 @@ public void testBadExtraProcessContextType() throws Exception { thrown.expectMessage( "Integer is not a valid context parameter for method " + getClass().getName() + "#badExtraProcessContext(ProcessContext, Integer)" - + ". Should be one of [BoundedWindow, WindowingInternals]"); + + ". Should be one of [BoundedWindow]"); DoFnReflector.verifyProcessMethodArguments( getClass().getDeclaredMethod("badExtraProcessContext", @@ -534,102 +559,148 @@ public void testBadReturnType() throws Exception { } @SuppressWarnings("unused") - private void goodGenerics(DoFn.ProcessContext c, - WindowingInternals i1) {} + private void goodGenerics( + DoFn.ProcessContext c, + DoFn.InputProvider input, + DoFn.OutputReceiver output) {} @Test public void testValidGenerics() throws Exception { - Method method = getClass().getDeclaredMethod("goodGenerics", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "goodGenerics", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void goodWildcards(DoFn.ProcessContext c, - WindowingInternals i1) {} + private void goodWildcards( + DoFn.ProcessContext c, + DoFn.InputProvider input, + DoFn.OutputReceiver output) {} @Test public void testGoodWildcards() throws Exception { - Method method = getClass().getDeclaredMethod("goodWildcards", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "goodWildcards", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void goodBoundedWildcards(DoFn.ProcessContext c, - WindowingInternals i1) {} + private void goodBoundedWildcards( + DoFn.ProcessContext c, + DoFn.InputProvider input, + DoFn.OutputReceiver output) {} @Test public void testGoodBoundedWildcards() throws Exception { - Method method = getClass().getDeclaredMethod("goodBoundedWildcards", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "goodBoundedWildcards", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") private void goodTypeVariables( DoFn.ProcessContext c, - WindowingInternals i1) {} + DoFn.InputProvider input, + DoFn.OutputReceiver output) {} @Test public void testGoodTypeVariables() throws Exception { - Method method = getClass().getDeclaredMethod("goodTypeVariables", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "goodTypeVariables", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void badGenericTwoArgs(DoFn.ProcessContext c, - WindowingInternals i1) {} + private void badGenericTwoArgs( + DoFn.ProcessContext c, + DoFn.InputProvider input, + DoFn.OutputReceiver output) {} @Test public void testBadGenericsTwoArgs() throws Exception { - Method method = getClass().getDeclaredMethod("badGenericTwoArgs", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "badGenericTwoArgs", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); thrown.expect(IllegalStateException.class); thrown.expectMessage("Incompatible generics in context parameter " - + "WindowingInternals " + + "OutputReceiver " + "for method " + getClass().getName() - + "#badGenericTwoArgs(ProcessContext, WindowingInternals). Should be " - + "WindowingInternals"); + + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver). Should be " + + "OutputReceiver"); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void badGenericWildCards(DoFn.ProcessContext c, - WindowingInternals i1) {} + private void badGenericWildCards( + DoFn.ProcessContext c, + DoFn.InputProvider input, + DoFn.OutputReceiver output) {} @Test public void testBadGenericWildCards() throws Exception { - Method method = getClass().getDeclaredMethod("badGenericWildCards", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "badGenericWildCards", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); thrown.expect(IllegalStateException.class); thrown.expectMessage("Incompatible generics in context parameter " - + "WindowingInternals for method " + + "OutputReceiver for method " + getClass().getName() - + "#badGenericWildCards(ProcessContext, WindowingInternals). Should be " - + "WindowingInternals"); + + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver). Should be " + + "OutputReceiver"); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") private void badTypeVariables(DoFn.ProcessContext c, - WindowingInternals i1) {} + DoFn.InputProvider input, DoFn.OutputReceiver output) {} @Test public void testBadTypeVariables() throws Exception { - Method method = getClass().getDeclaredMethod("badTypeVariables", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "badTypeVariables", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); thrown.expect(IllegalStateException.class); thrown.expectMessage("Incompatible generics in context parameter " - + "WindowingInternals for method " + getClass().getName() - + "#badTypeVariables(ProcessContext, WindowingInternals). Should be " - + "WindowingInternals"); + + "OutputReceiver for method " + getClass().getName() + + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver). Should be " + + "OutputReceiver"); DoFnReflector.verifyProcessMethodArguments(method); } diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java index 233b8bec7004..91ecd162d875 100644 --- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java @@ -58,18 +58,7 @@ public class DoFnReflectorBenchmark { private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT); private ExtraContextFactory extraContextFactory = - new ExtraContextFactory() { - - @Override - public BoundedWindow window() { - return null; - } - - @Override - public WindowingInternals windowingInternals() { - return null; - } - }; + new DoFn.FakeExtraContextFactory<>(); private DoFnReflector doFnReflector; private OldDoFn adaptedDoFnWithContext;