From 3c6bea08f14cf42d0f16cf0094990e7a0ced7682 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 7 Feb 2017 09:58:16 -0800 Subject: [PATCH] Explicitly pass Pipeline in AppliedPTransform --- .../runners/core/PTransformMatchersTest.java | 20 ++++++++++++----- .../runners/direct/CommittedResultTest.java | 17 +++++++++----- .../direct/WindowEvaluatorFactoryTest.java | 7 +++++- .../dataflow/DataflowPipelineJobTest.java | 9 +++++++- .../beam/sdk/runners/TransformHierarchy.java | 7 +++++- .../sdk/transforms/AppliedPTransform.java | 22 ++++++++++--------- 6 files changed, 58 insertions(+), 24 deletions(-) diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java index c286a37de600..fe0c449f0acc 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.runners.core.runnerapi; +package org.apache.beam.runners.core; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat; import java.io.Serializable; -import org.apache.beam.runners.core.PTransformMatchers; import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -60,7 +59,11 @@ public void doStuff(ProcessContext ctxt) { }); PCollection output = input.apply(pardo); - AppliedPTransform application = AppliedPTransform.of("DoStuff", input, output, pardo); + AppliedPTransform application = + AppliedPTransform + ., PCollection, + PTransform, PCollection>> + of("DoStuff", input.expand(), output.expand(), pardo, p); assertThat(matcher.matches(application), is(true)); } @@ -82,7 +85,10 @@ public PCollection expand(PCollection input) { PCollection output = input.apply(subclass); AppliedPTransform application = - AppliedPTransform.of("DoStuff", input, output, subclass); + AppliedPTransform + ., PCollection, + PTransform, PCollection>> + of("DoStuff", input.expand(), output.expand(), subclass, p); assertThat(matcher.matches(application), is(false)); } @@ -94,7 +100,11 @@ public void classEqualToDoesNotMatchUnrelatedClass() { Window.Bound window = Window.into(new GlobalWindows()); PCollection output = input.apply(window); - AppliedPTransform application = AppliedPTransform.of("DoStuff", input, output, window); + AppliedPTransform application = + AppliedPTransform + ., PCollection, + PTransform, PCollection>> + of("DoStuff", input.expand(), output.expand(), window, p); assertThat(matcher.matches(application), is(false)); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java index 736f554210bb..68d6ebac9bf8 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java @@ -54,12 +54,17 @@ public class CommittedResultTest implements Serializable { private transient PCollection created = p.apply(Create.of(1, 2)); private transient AppliedPTransform transform = - AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform() { - @Override - public PDone expand(PBegin begin) { - throw new IllegalArgumentException("Should never be applied"); - } - }); + AppliedPTransform.>of( + "foo", + p.begin().expand(), + PDone.in(p).expand(), + new PTransform() { + @Override + public PDone expand(PBegin begin) { + throw new IllegalArgumentException("Should never be applied"); + } + }, + p); private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create(); @Test diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index 9d0c68d46ba5..aa841edee593 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -311,7 +312,11 @@ private TransformResult runEvaluator( throws Exception { TransformEvaluator evaluator = factory.forApplication( - AppliedPTransform.of("Window", input, windowed, windowTransform), inputBundle); + AppliedPTransform + ., PCollection, + PTransform, PCollection>> + of("Window", input.expand(), windowed.expand(), windowTransform, p), + inputBundle); evaluator.processElement(valueInGlobalWindow); evaluator.processElement(valueInGlobalAndTwoIntervalWindows); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 36bf1290dcbf..2690e71ab4b0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -52,6 +52,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.net.SocketTimeoutException; +import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; @@ -71,6 +72,7 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.TaggedPValue; import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; @@ -671,7 +673,12 @@ public String getName() { String fullName, PTransform transform, Pipeline p) { PInput input = mock(PInput.class); when(input.getPipeline()).thenReturn(p); - return AppliedPTransform.of(fullName, input, mock(POutput.class), transform); + return AppliedPTransform.of( + fullName, + Collections.emptyList(), + Collections.emptyList(), + transform, + p); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index dc8f823db257..a4c28b858b1f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -323,7 +323,12 @@ public List getOutputs() { * Returns the {@link AppliedPTransform} representing this {@link Node}. */ public AppliedPTransform toAppliedPTransform() { - return AppliedPTransform.of(getFullName(), input, output, (PTransform) getTransform()); + return AppliedPTransform.of( + getFullName(), + input.expand(), + output.expand(), + (PTransform) getTransform(), + input.getPipeline()); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java index a6d885971dd7..4de81ac739a3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java @@ -30,23 +30,25 @@ * *

For internal use. * - * @param transform input type - * @param transform output type + * @param transform input type + * @param transform output type * @param transform type */ @AutoValue -public abstract class AppliedPTransform - > { +public abstract class AppliedPTransform< + InputT extends PInput, OutputT extends POutput, + TransformT extends PTransform> { - public static < - InputT extends PInput, - OutputT extends POutput, + public static > AppliedPTransform of( - String fullName, InputT input, OutputT output, TransformT transform) { + String fullName, + List input, + List output, + TransformT transform, + Pipeline p) { return new AutoValue_AppliedPTransform( - fullName, input.expand(), output.expand(), transform, input.getPipeline()); + fullName, input, output, transform, p); } public abstract String getFullName();