From 5dbf06fe985de22602c04d4da632434e3b4c7d88 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 9 Feb 2017 11:43:24 -0800 Subject: [PATCH 1/2] Add ParDo Matchers to PTransformMatchers These match Splittable ParDos and ParDos that use State and Timers. Update tests to remove excess generic args. --- .../beam/runners/core/PTransformMatchers.java | 64 +++++ .../runners/core/PTransformMatchersTest.java | 224 +++++++++++++++--- 2 files changed, 257 insertions(+), 31 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java index 362e8dc32223..ff6b541dc24b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java @@ -21,7 +21,11 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; /** * A {@link PTransformMatcher} that matches {@link PTransform PTransforms} based on the class of the @@ -56,4 +60,64 @@ public boolean matches(AppliedPTransform application) { return application.getTransform().getClass().equals(clazz); } } + + public static PTransformMatcher splittableParDoSingle() { + return new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform application) { + PTransform transform = application.getTransform(); + if (transform instanceof ParDo.Bound) { + DoFn fn = ((ParDo.Bound) transform).getFn(); + DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); + return signature.processElement().isSplittable(); + } + return false; + } + }; + } + + public static PTransformMatcher stateOrTimerParDoSingle() { + return new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform application) { + PTransform transform = application.getTransform(); + if (transform instanceof ParDo.Bound) { + DoFn fn = ((ParDo.Bound) transform).getFn(); + DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); + return signature.usesState() || signature.usesTimers(); + } + return false; + } + }; + } + + public static PTransformMatcher splittableParDoMulti() { + return new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform application) { + PTransform transform = application.getTransform(); + if (transform instanceof ParDo.BoundMulti) { + DoFn fn = ((ParDo.BoundMulti) transform).getFn(); + DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); + return signature.processElement().isSplittable(); + } + return false; + } + }; + } + + public static PTransformMatcher stateOrTimerParDoMulti() { + return new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform application) { + PTransform transform = application.getTransform(); + if (transform instanceof ParDo.BoundMulti) { + DoFn fn = ((ParDo.BoundMulti) transform).getFn(); + DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); + return signature.usesState() || signature.usesTimers(); + } + return false; + } + }; + } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java index fe0c449f0acc..83795f1de9f6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java @@ -23,18 +23,33 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; +import com.google.common.base.MoreObjects; import java.io.Serializable; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.hamcrest.Matchers; +import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,47 +63,52 @@ public class PTransformMatchersTest implements Serializable { @Rule public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + /** + * Gets the {@link AppliedPTransform} that has a created {@code PCollection>} + * as input. + */ + private AppliedPTransform getAppliedTransform(PTransform pardo) { + PCollection> input = + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); + PCollection output = + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); + + return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p); + } + @Test public void classEqualToMatchesSameClass() { PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class); - PCollection input = p.apply(Create.of(1)); - ParDo.Bound pardo = ParDo.of(new DoFn() { - @ProcessElement - public void doStuff(ProcessContext ctxt) { - } - }); - PCollection output = input.apply(pardo); - AppliedPTransform application = - AppliedPTransform - ., PCollection, - PTransform, PCollection>> - of("DoStuff", input.expand(), output.expand(), pardo, p); + getAppliedTransform( + ParDo.of( + new DoFn, Integer>() { + @ProcessElement + public void doStuff(ProcessContext ctxt) {} + })); assertThat(matcher.matches(application), is(true)); } @Test - public void classEqualToMatchesSubClass() { - class MyPTransform extends PTransform, PCollection> { + public void classEqualToDoesNotMatchSubclass() { + class MyPTransform extends PTransform>, PCollection> { @Override - public PCollection expand(PCollection input) { - return input; + public PCollection expand(PCollection> input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); } } PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class); - PCollection input = p.apply(Create.of(1)); MyPTransform subclass = new MyPTransform() {}; assertThat(subclass.getClass(), not(Matchers.>equalTo(MyPTransform.class))); assertThat(subclass, instanceOf(MyPTransform.class)); - PCollection output = input.apply(subclass); AppliedPTransform application = - AppliedPTransform - ., PCollection, - PTransform, PCollection>> - of("DoStuff", input.expand(), output.expand(), subclass, p); + getAppliedTransform(subclass); assertThat(matcher.matches(application), is(false)); } @@ -96,16 +116,158 @@ public PCollection expand(PCollection input) { @Test public void classEqualToDoesNotMatchUnrelatedClass() { PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class); - PCollection input = p.apply(Create.of(1)); - Window.Bound window = Window.into(new GlobalWindows()); - PCollection output = input.apply(window); - AppliedPTransform application = - AppliedPTransform - ., PCollection, - PTransform, PCollection>> - of("DoStuff", input.expand(), output.expand(), window, p); + getAppliedTransform(Window.>into(new GlobalWindows())); assertThat(matcher.matches(application), is(false)); } + + private DoFn, Integer> doFn = + new DoFn, Integer>() { + @ProcessElement + public void simpleProcess(ProcessContext ctxt) { + ctxt.output(ctxt.element().getValue() + 1); + } + }; + private abstract static class SomeTracker implements RestrictionTracker {} + private DoFn, Integer> splittableDoFn = + new DoFn, Integer>() { + @ProcessElement + public void processElement(ProcessContext context, SomeTracker tracker) {} + + @GetInitialRestriction + public Void getInitialRestriction(KV element) { + return null; + } + + @NewTracker + public SomeTracker newTracker(Void restriction) { + return null; + } + }; + private DoFn, Integer> doFnWithState = + new DoFn, Integer>() { + private final String stateId = "mystate"; + + @StateId(stateId) + private final StateSpec> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement(ProcessContext c, @StateId(stateId) ValueState state) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + c.output(currentValue); + state.write(currentValue + 1); + } + }; + private DoFn, Integer> doFnWithTimers = + new DoFn, Integer>() { + private final String timerId = "myTimer"; + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) { + timer.setForNowPlus(Duration.standardSeconds(1)); + context.output(3); + } + + @OnTimer(timerId) + public void onTimer(OnTimerContext context) { + context.output(42); + } + }; + + /** + * Demonstrates that a {@link ParDo.Bound} does not match any ParDo matcher. + */ + @Test + public void parDoSingle() { + AppliedPTransform parDoApplication = getAppliedTransform(ParDo.of(doFn)); + + assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false)); + } + + @Test + public void parDoSingleSplittable() { + AppliedPTransform parDoApplication = getAppliedTransform(ParDo.of(splittableDoFn)); + assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(true)); + + assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false)); + } + + @Test + public void parDoSingleWithState() { + AppliedPTransform parDoApplication = getAppliedTransform(ParDo.of(doFnWithState)); + assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true)); + + assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false)); + } + + @Test + public void parDoSingleWithTimers() { + AppliedPTransform parDoApplication = + getAppliedTransform(ParDo.of(doFnWithTimers)); + assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true)); + + assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false)); + } + + @Test + public void parDoMulti() { + AppliedPTransform parDoApplication = + getAppliedTransform( + ParDo.of(doFn).withOutputTags(new TupleTag(), TupleTagList.empty())); + + assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false)); + } + + @Test + public void parDoMultiSplittable() { + AppliedPTransform parDoApplication = + getAppliedTransform( + ParDo.of(splittableDoFn).withOutputTags(new TupleTag(), TupleTagList.empty())); + assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(true)); + + assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false)); + } + + @Test + public void parDoMultiWithState() { + AppliedPTransform parDoApplication = + getAppliedTransform( + ParDo.of(doFnWithState).withOutputTags(new TupleTag(), TupleTagList.empty())); + assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true)); + + assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false)); + } + + @Test + public void parDoMultiWithTimers() { + AppliedPTransform parDoApplication = + getAppliedTransform( + ParDo.of(doFnWithTimers).withOutputTags(new TupleTag(), TupleTagList.empty())); + assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true)); + + assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false)); + } } From 96ab5d1aa8d63641164870a05bf77b2d2f725f1b Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 14 Feb 2017 11:32:01 -0800 Subject: [PATCH 2/2] fixup! Add ParDo Matchers to PTransformMatchers --- .../beam/runners/core/PTransformMatchers.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java index ff6b541dc24b..1d7e24e0f0b8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; /** @@ -61,6 +62,10 @@ public boolean matches(AppliedPTransform application) { } } + /** + * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that + * is splittable, as signified by {@link ProcessElementMethod#isSplittable()}. + */ public static PTransformMatcher splittableParDoSingle() { return new PTransformMatcher() { @Override @@ -76,6 +81,11 @@ public boolean matches(AppliedPTransform application) { }; } + /** + * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that + * uses state or timers, as specified by {@link DoFnSignature#usesState()} and + * {@link DoFnSignature#usesTimers()}. + */ public static PTransformMatcher stateOrTimerParDoSingle() { return new PTransformMatcher() { @Override @@ -91,6 +101,10 @@ public boolean matches(AppliedPTransform application) { }; } + /** + * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn} + * that is splittable, as signified by {@link ProcessElementMethod#isSplittable()}. + */ public static PTransformMatcher splittableParDoMulti() { return new PTransformMatcher() { @Override @@ -106,6 +120,11 @@ public boolean matches(AppliedPTransform application) { }; } + /** + * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn} + * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and + * {@link DoFnSignature#usesTimers()}. + */ public static PTransformMatcher stateOrTimerParDoMulti() { return new PTransformMatcher() { @Override