From 0d7cabef297bc340910e2cc1728d7b9743e3f44b Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 7 Jun 2017 08:03:43 -0700 Subject: [PATCH 1/4] [BEAM-2421] Make ReflectiveRootOverride pass output PCollection to override implementations --- .../beam/runners/dataflow/DataflowRunner.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index cce6ce79b883..ed29330b6504 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -428,12 +428,15 @@ private ReflectiveRootOverrideFactory( public PTransformReplacement> getReplacementTransform( AppliedPTransform, PTransform>> transform) { PTransform> original = transform.getTransform(); + PCollection output = + (PCollection) Iterables.getOnlyElement(transform.getOutputs().values()); return PTransformReplacement.of( transform.getPipeline().begin(), InstanceBuilder.ofType(replacement) .withArg(DataflowRunner.class, runner) .withArg( (Class>>) original.getClass(), original) + .withArg((Class>) output.getClass(), output) .build()); } @@ -809,11 +812,12 @@ private static class StreamingPubsubIORead extends PTransform> { private final PubsubUnboundedSource transform; - /** - * Builds an instance of this class from the overridden transform. - */ + /** Builds an instance of this class from the overridden transform. 
*/ + @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() public StreamingPubsubIORead( - DataflowRunner runner, PubsubUnboundedSource transform) { + DataflowRunner runner, + PubsubUnboundedSource transform, + PCollection originalOutput) { this.transform = transform; } @@ -992,11 +996,11 @@ public void translate( private static class StreamingUnboundedRead extends PTransform> { private final UnboundedSource source; - /** - * Builds an instance of this class from the overridden transform. - */ + /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded transform) { + public StreamingUnboundedRead(DataflowRunner runner, + Read.Unbounded transform, + PCollection originalOutput) { this.source = transform.getSource(); } @@ -1111,7 +1115,9 @@ private static class StreamingBoundedRead extends PTransform transform) { + public StreamingBoundedRead(DataflowRunner runner, + Read.Bounded transform, + PCollection originalOutput) { this.source = transform.getSource(); } From f3c3391f33c00ab925a37e66040b977b154e50b1 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 7 Jun 2017 08:53:14 -0700 Subject: [PATCH 2/4] [BEAM-2421] Swap to use an Impulse primitive + DoFn for Create when executing with the Fn API. 
--- .../beam/runners/dataflow/DataflowRunner.java | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ed29330b6504..d5225eb361f0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -49,6 +49,7 @@ import java.nio.channels.Channels; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -57,6 +58,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory; import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory; import org.apache.beam.runners.core.construction.PTransformMatchers; @@ -79,10 +81,12 @@ import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.extensions.gcp.storage.PathValidator; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileSystems; @@ -103,6 +107,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.GroupedValues; 
+import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -113,6 +118,7 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.NameUtils; @@ -312,6 +318,12 @@ private List getOverrides(boolean streaming) { PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), new StreamingPubsubIOWriteOverrideFactory(this))); } + if (hasExperiment(options, "beam_fn_api")) { + overridesBuilder.add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(Create.Values.class), + new ReflectiveRootOverrideFactory(StreamingFnApiCreate.class, this))); + } overridesBuilder .add( // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and @@ -985,6 +997,113 @@ public void translate( // ================================================================================ + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the Dataflow runner in + * streaming mode over the Fn API. + */ + private static class StreamingFnApiCreate extends PTransform> { + private final Create.Values transform; + private final PCollection originalOutput; + + /** Builds an instance of this class from the overridden transform. 
*/ + @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() + public StreamingFnApiCreate(DataflowRunner runner, + Create.Values transform, + PCollection originalOutput) { + this.transform = transform; + this.originalOutput = originalOutput; + } + + @Override + public final PCollection expand(PBegin input) { + try { + PCollection pc = Pipeline + .applyTransform(input, new Impulse(IsBounded.BOUNDED)) + .apply(ParDo.of(DecodeAndEmitDoFn + .fromIterable(transform.getElements(), originalOutput.getCoder()))); + pc.setCoder(originalOutput.getCoder()); + return pc; + } catch (IOException e) { + throw new IllegalStateException("Unable to encode elements.", e); + } + } + + /** + * A DoFn which stores encoded versions of elements and a representation of a Coder + * capable of decoding those elements. + * + *

TODO: Make this a SplittableDoFn. + */ + private static class DecodeAndEmitDoFn extends DoFn { + public static DecodeAndEmitDoFn fromIterable(Iterable elements, Coder elemCoder) + throws IOException { + ImmutableList.Builder allElementsBytes = ImmutableList.builder(); + for (T element : elements) { + byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element); + allElementsBytes.add(bytes); + } + return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder); + } + + private final Collection elements; + private final RunnerApi.MessageWithComponents coderSpec; + + private DecodeAndEmitDoFn(Collection elements, Coder coder) throws IOException { + this.elements = elements; + this.coderSpec = CoderTranslation.toProto(coder); + } + + @ProcessElement + public void processElement(ProcessContext context) throws IOException { + Coder coder = + (Coder) CoderTranslation.fromProto(coderSpec.getCoder(), coderSpec.getComponents()); + for (byte[] element : elements) { + context.output(CoderUtils.decodeFromByteArray(coder, element)); + } + } + } + } + + /** The Dataflow specific override for the impulse primitive. 
*/ + private static class Impulse extends PTransform> { + private final IsBounded isBounded; + + private Impulse(IsBounded isBounded) { + this.isBounded = isBounded; + } + + @Override + public PCollection expand(PBegin input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), WindowingStrategy.globalDefault(), isBounded); + } + + @Override + protected Coder getDefaultOutputCoder() { + return ByteArrayCoder.of(); + } + + private static class Translator implements TransformTranslator { + @Override + public void translate(Impulse transform, TranslationContext context) { + if (context.getPipelineOptions().isStreaming()) { + StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); + stepContext.addInput(PropertyNames.FORMAT, "pubsub"); + stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, "_starting_signal/"); + stepContext.addOutput(context.getOutput(transform)); + } else { + throw new UnsupportedOperationException( + "Impulse source for batch pipelines has not been defined."); + } + } + } + + static { + DataflowPipelineTranslator.registerTransformTranslator(Impulse.class, new Translator()); + } + } + /** * Specialized implementation for * {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the From c6cbaf4ca8dc7b570aad18931fcc854f234b6e30 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 7 Jun 2017 10:56:14 -0700 Subject: [PATCH 3/4] fixup! 
Address PR comments --- .../beam/runners/dataflow/DataflowRunner.java | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d5225eb361f0..320c46c51219 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -322,7 +322,7 @@ private List getOverrides(boolean streaming) { overridesBuilder.add( PTransformOverride.of( PTransformMatchers.classEqualTo(Create.Values.class), - new ReflectiveRootOverrideFactory(StreamingFnApiCreate.class, this))); + new StreamingFnApiCreateOverrideFactory())); } overridesBuilder .add( @@ -440,15 +440,12 @@ private ReflectiveRootOverrideFactory( public PTransformReplacement> getReplacementTransform( AppliedPTransform, PTransform>> transform) { PTransform> original = transform.getTransform(); - PCollection output = - (PCollection) Iterables.getOnlyElement(transform.getOutputs().values()); return PTransformReplacement.of( transform.getPipeline().begin(), InstanceBuilder.ofType(replacement) .withArg(DataflowRunner.class, runner) .withArg( (Class>>) original.getClass(), original) - .withArg((Class>) output.getClass(), output) .build()); } @@ -826,10 +823,7 @@ private static class StreamingPubsubIORead /** Builds an instance of this class from the overridden transform. 
*/ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingPubsubIORead( - DataflowRunner runner, - PubsubUnboundedSource transform, - PCollection originalOutput) { + public StreamingPubsubIORead(DataflowRunner runner, PubsubUnboundedSource transform) { this.transform = transform; } @@ -997,6 +991,31 @@ public void translate( // ================================================================================ + /** + * A PTransform override factory which maps Create.Values PTransforms for streaming pipelines + * into a Dataflow specific variant. + */ + private static class StreamingFnApiCreateOverrideFactory + implements PTransformOverrideFactory, Create.Values> { + + @Override + public PTransformReplacement> getReplacementTransform( + AppliedPTransform, Create.Values> transform) { + Create.Values original = transform.getTransform(); + PCollection output = + (PCollection) Iterables.getOnlyElement(transform.getOutputs().values()); + return PTransformReplacement.of( + transform.getPipeline().begin(), + new StreamingFnApiCreate<>(original, output)); + } + + @Override + public Map mapOutputs( + Map, PValue> outputs, PCollection newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + } + /** * Specialized implementation for * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the Dataflow runner in @@ -1006,9 +1025,7 @@ private static class StreamingFnApiCreate extends PTransform transform; private final PCollection originalOutput; - /** Builds an instance of this class from the overridden transform. 
*/ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingFnApiCreate(DataflowRunner runner, + private StreamingFnApiCreate( Create.Values transform, PCollection originalOutput) { this.transform = transform; @@ -1033,7 +1050,7 @@ public final PCollection expand(PBegin input) { * A DoFn which stores encoded versions of elements and a representation of a Coder * capable of decoding those elements. * - *

TODO: Make this a SplittableDoFn. + *

TODO(BEAM-2422): Make this a SplittableDoFn. */ private static class DecodeAndEmitDoFn extends DoFn { public static DecodeAndEmitDoFn fromIterable(Iterable elements, Coder elemCoder) @@ -1117,9 +1134,7 @@ private static class StreamingUnboundedRead extends PTransform transform, - PCollection originalOutput) { + public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded transform) { this.source = transform.getSource(); } @@ -1234,9 +1249,7 @@ private static class StreamingBoundedRead extends PTransform transform, - PCollection originalOutput) { + public StreamingBoundedRead(DataflowRunner runner, Read.Bounded transform) { this.source = transform.getSource(); } From e06ac4bcf4b44af3c4a0e890f3299f84ca8051cd Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 7 Jun 2017 11:01:17 -0700 Subject: [PATCH 4/4] fixup! Fix TODO clause --- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 320c46c51219..3e7c8ce98475 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1050,7 +1050,7 @@ public final PCollection expand(PBegin input) { * A DoFn which stores encoded versions of elements and a representation of a Coder * capable of decoding those elements. * - *

TODO(BEAM-2422): Make this a SplittableDoFn. + *

TODO: BEAM-2422 - Make this a SplittableDoFn. */ private static class DecodeAndEmitDoFn extends DoFn { public static DecodeAndEmitDoFn fromIterable(Iterable elements, Coder elemCoder)