From 26c93681795dc9b98c3f00c2cd61c38b36656e0b Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 19 May 2017 21:35:43 -0700 Subject: [PATCH 01/11] Make SdkComponents public for TransformPayloadTranslator --- .../apache/beam/runners/core/construction/SdkComponents.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index b0f164f4bcbd..0d3ba6093c3e 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -46,7 +46,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; /** SDK objects that will be represented at some later point within a {@link Components} object. */ -class SdkComponents { +public class SdkComponents { private final RunnerApi.Components.Builder componentsBuilder; private final BiMap, String> transformIds; From 1dc134cf341d1eb4fa936f6fe7b83a3edbb64687 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 19 May 2017 20:54:36 -0700 Subject: [PATCH 02/11] Centralize primitive URNs in PTransformTranslation class --- .../runners/core/construction/PTransformTranslation.java | 7 +++++++ .../beam/runners/core/construction/ParDoTranslation.java | 9 +++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 86638dec8098..8be023ad413b 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -38,6 +38,13 @@ * protocol buffers}. */ public class PTransformTranslation { + + public static final String PAR_DO_TRANSFORM_URN = "urn:beam:transform:pardo:v1"; + public static final String FLATTEN_TRANSFORM_URN = "urn:beam:transform:flatten:v1"; + public static final String GROUP_BY_KEY_TRANSFORM_URN = "urn:beam:transform:groupbykey:v1"; + public static final String READ_TRANSFORM_URN = "urn:beam:transform:read:v1"; + public static final String WINDOW_TRANSFORM_URN = "urn:beam:transform:window:v1"; + private static final Map, TransformPayloadTranslator> KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index 28d577ff2211..83277bbefd65 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN; import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; @@ -74,10 +75,6 @@ * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos. */ public class ParDoTranslation { - /** - * The URN for a {@link ParDoPayload}. - */ - public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1"; /** * The URN for an unknown Java {@link DoFn}. 
*/ @@ -108,7 +105,7 @@ public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { ParDoPayload payload = toProto(transform.getTransform(), components); return RunnerApi.FunctionSpec.newBuilder() - .setUrn(PAR_DO_PAYLOAD_URN) + .setUrn(PAR_DO_TRANSFORM_URN) .setParameter(Any.pack(payload)) .build(); } @@ -166,7 +163,7 @@ public static TupleTag getMainOutputTag(ParDoPayload payload) public static RunnerApi.PCollection getMainInput( RunnerApi.PTransform ptransform, Components components) throws IOException { checkArgument( - ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN), + ptransform.getSpec().getUrn().equals(PAR_DO_TRANSFORM_URN), "Unexpected payload type %s", ptransform.getSpec().getUrn()); ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class); From 877408e078b783b3a782ae1ba0e335ea44c4a0e2 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 24 May 2017 15:28:38 -0700 Subject: [PATCH 03/11] Add URN for Splittable ProcessElement pseudo-primitive --- .../apache/beam/runners/core/construction/SplittableParDo.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index 23ba66ab983d..dfca7d2d94eb 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -64,6 +64,9 @@ public class SplittableParDo extends PTransform, PCollectionTuple> { private final ParDo.MultiOutput parDo; + public static final String SPLITTABLE_PROCESS_URN = + "urn:beam:runners_core:transforms:splittable_process:v1"; + /** * Creates the transform for the given original multi-output {@link ParDo}. 
* From 0bf4ddbea7d0f790fad0cc8df20f0d01f38dd568 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 19 May 2017 20:55:25 -0700 Subject: [PATCH 04/11] Allow getting URN for class of transform via translator --- .../core/construction/PTransformTranslation.java | 14 +++++++++++++- .../core/construction/ParDoTranslation.java | 5 +++++ .../core/construction/WindowIntoTranslation.java | 7 +++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 8be023ad413b..35bb0e3dad4e 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -116,11 +116,23 @@ private static String toProto(TupleTag tag) { return tag.getId(); } + public static String urnForTransform(PTransform transform) { + TransformPayloadTranslator translator = + KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()); + if (translator == null) { + throw new IllegalStateException( + String.format("No translator known for %s", transform.getClass().getName())); + } + + return translator.getUrn(transform); + } + /** * A translator consumes a {@link PTransform} application and produces the appropriate * FunctionSpec for a distinguished or primitive transform within the Beam runner API. 
*/ public interface TransformPayloadTranslator> { - FunctionSpec translate(AppliedPTransform transform, SdkComponents components); + String getUrn(T transform); + FunctionSpec translate(AppliedPTransform application, SdkComponents components); } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index 83277bbefd65..1c81f8ce05d5 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -100,6 +100,11 @@ public static TransformPayloadTranslator create() { private ParDoPayloadTranslator() {} + @Override + public String getUrn(ParDo.MultiOutput transform) { + return PAR_DO_TRANSFORM_URN; + } + @Override public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java index 215beba43cd8..33faa02f8801 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.Assign; import org.apache.beam.sdk.transforms.windowing.WindowFn; /** @@ -36,6 +37,12 @@ public class WindowIntoTranslation { static class 
WindowAssignTranslator implements TransformPayloadTranslator> { + + @Override + public String getUrn(Assign transform) { + return PTransforms.WINDOW_TRANSFORM_URN; + } + @Override public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { From afaebb13f73868799b83c7af95cca2732e3ecb9a Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 21:00:02 -0700 Subject: [PATCH 05/11] Add registration for Read and WindowInto translators --- .../core/construction/ReadTranslation.java | 74 ++++++++++++++++++- .../construction/WindowIntoTranslation.java | 43 ++++++++++- 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index d6c34008f29e..aff5fc9686f7 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -20,10 +20,15 @@ import static com.google.common.base.Preconditions.checkArgument; +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Map; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded; import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload; @@ -32,12 +37,13 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; import 
org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.SerializableUtils; /** * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} - * {@link PTransform PTransforms} into {@link ReadPayload} protos. + * {@link PTransform PTransformTranslation} into {@link ReadPayload} protos. */ public class ReadTranslation { private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1"; @@ -124,4 +130,70 @@ private static SdkFunctionSpec toProto(UnboundedSource source) { "BoundedSource"); } + /** + * A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. + */ + public static class UnboundedReadPayloadTranslator + implements PTransformTranslation.TransformPayloadTranslator> { + public static TransformPayloadTranslator create() { + return new UnboundedReadPayloadTranslator(); + } + + private UnboundedReadPayloadTranslator() {} + + @Override + public String getUrn(Read.Unbounded transform) { + return PTransformTranslation.WINDOW_TRANSFORM_URN; + } + + @Override + public FunctionSpec translate( + AppliedPTransform> transform, SdkComponents components) { + ReadPayload payload = toProto(transform.getTransform()); + return RunnerApi.FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.READ_TRANSFORM_URN) + .setParameter(Any.pack(payload)) + .build(); + } + } + + /** + * A {@link TransformPayloadTranslator} for {@link Read.Bounded}. 
+ */ + public static class BoundedReadPayloadTranslator + implements PTransformTranslation.TransformPayloadTranslator> { + public static TransformPayloadTranslator create() { + return new BoundedReadPayloadTranslator(); + } + + private BoundedReadPayloadTranslator() {} + + @Override + public String getUrn(Read.Bounded transform) { + return PTransformTranslation.WINDOW_TRANSFORM_URN; + } + + @Override + public FunctionSpec translate( + AppliedPTransform> transform, SdkComponents components) { + ReadPayload payload = toProto(transform.getTransform()); + return RunnerApi.FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.READ_TRANSFORM_URN) + .setParameter(Any.pack(payload)) + .build(); + } + } + + /** Registers {@link UnboundedReadPayloadTranslator} and {@link BoundedReadPayloadTranslator}. */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap., TransformPayloadTranslator>builder() + .put(Read.Unbounded.class, new UnboundedReadPayloadTranslator()) + .put(Read.Bounded.class, new BoundedReadPayloadTranslator()) + .build(); + } + } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java index 33faa02f8801..5ed4d24906b1 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java @@ -18,14 +18,18 @@ package org.apache.beam.runners.core.construction; +import com.google.auto.service.AutoService; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; 
+import java.util.Collections; +import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.Assign; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -40,7 +44,7 @@ static class WindowAssignTranslator implements TransformPayloadTranslator transform) { - return PTransforms.WINDOW_TRANSFORM_URN; + return PTransformTranslation.WINDOW_TRANSFORM_URN; } @Override @@ -65,4 +69,41 @@ public static WindowIntoPayload toProto(Window.Assign transform, SdkComponent SdkFunctionSpec spec = payload.getWindowFn(); return WindowingStrategyTranslation.windowFnFromProto(spec); } + + /** + * A {@link TransformPayloadTranslator} for {@link Window}. + */ + public static class WindowIntoPayloadTranslator + implements PTransformTranslation.TransformPayloadTranslator> { + public static TransformPayloadTranslator create() { + return new WindowIntoPayloadTranslator(); + } + + private WindowIntoPayloadTranslator() {} + + @Override + public String getUrn(Window.Assign transform) { + return PTransformTranslation.WINDOW_TRANSFORM_URN; + } + + @Override + public FunctionSpec translate( + AppliedPTransform> transform, SdkComponents components) { + WindowIntoPayload payload = toProto(transform.getTransform(), components); + return RunnerApi.FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.WINDOW_TRANSFORM_URN) + .setParameter(Any.pack(payload)) + .build(); + } + } + + /** Registers {@link WindowIntoPayloadTranslator}. 
*/ + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap(Window.Assign.class, new WindowIntoPayloadTranslator()); + } + } } From d91c840f009bdfda14f8668172d076a7e9f12f5e Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 21:09:20 -0700 Subject: [PATCH 06/11] Register ReadTranslator --- .../beam/runners/core/construction/ReadTranslation.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index aff5fc9686f7..3ddde8d767aa 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -143,7 +143,7 @@ private UnboundedReadPayloadTranslator() {} @Override public String getUrn(Read.Unbounded transform) { - return PTransformTranslation.WINDOW_TRANSFORM_URN; + return PTransformTranslation.READ_TRANSFORM_URN; } @Override @@ -151,7 +151,7 @@ public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { ReadPayload payload = toProto(transform.getTransform()); return RunnerApi.FunctionSpec.newBuilder() - .setUrn(PTransformTranslation.READ_TRANSFORM_URN) + .setUrn(getUrn(transform.getTransform())) .setParameter(Any.pack(payload)) .build(); } @@ -170,7 +170,7 @@ private BoundedReadPayloadTranslator() {} @Override public String getUrn(Read.Bounded transform) { - return PTransformTranslation.WINDOW_TRANSFORM_URN; + return PTransformTranslation.READ_TRANSFORM_URN; } @Override @@ -178,7 +178,7 @@ public 
FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { ReadPayload payload = toProto(transform.getTransform()); return RunnerApi.FunctionSpec.newBuilder() - .setUrn(PTransformTranslation.READ_TRANSFORM_URN) + .setUrn(getUrn(transform.getTransform())) .setParameter(Any.pack(payload)) .build(); } From 69bb1be4d3bd20e5c080bcce0eb464fac3d2ff73 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 21:09:33 -0700 Subject: [PATCH 07/11] Register WindowIntoTranslator --- .../beam/runners/core/construction/WindowIntoTranslation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java index 5ed4d24906b1..aa17bc90d6a3 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java @@ -91,7 +91,7 @@ public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { WindowIntoPayload payload = toProto(transform.getTransform(), components); return RunnerApi.FunctionSpec.newBuilder() - .setUrn(PTransformTranslation.WINDOW_TRANSFORM_URN) + .setUrn(getUrn(transform.getTransform())) .setParameter(Any.pack(payload)) .build(); } From 121631c62f94a6e553d3bbd8708cee2dbdf53923 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 21:09:54 -0700 Subject: [PATCH 08/11] Add trivial FlattenTranslator to access URN --- .../core/construction/FlattenTranslator.java | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java new file mode 100644 index 000000000000..f1d553dc8abd --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.Window.Assign; + +/** + * Utility methods for translating a {@link Flatten.PCollections} to and from {@link RunnerApi} representations. 
+ */ +public class FlattenTranslator implements TransformPayloadTranslator> { + + public static TransformPayloadTranslator create() { + return new FlattenTranslator(); + } + + private FlattenTranslator() {} + + @Override + public String getUrn(Flatten.PCollections transform) { + return PTransformTranslation.FLATTEN_TRANSFORM_URN; + } + + @Override + public FunctionSpec translate( + AppliedPTransform> transform, SdkComponents components) { + return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); + } + + /** Registers {@link FlattenTranslator}. */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap(Flatten.PCollections.class, new FlattenTranslator()); + } + } +} From 8e09596a981fe69edb7a1560e864bc852978ba81 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 23 May 2017 21:15:48 -0700 Subject: [PATCH 09/11] Add transform-analysis helpers to ReadTranslation These helpers allow a runner to extract a source and inspect boundedness without reference to the specific user-facing Java classes Read.Bounded and Read.Unbounded. 
--- .../construction/PCollectionTranslation.java | 4 +- .../core/construction/ReadTranslation.java | 43 +++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java index 303c02d44a11..968966f459e1 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java @@ -64,7 +64,7 @@ public static Coder getCoder( components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), components); } - private static RunnerApi.IsBounded toProto(IsBounded bounded) { + static RunnerApi.IsBounded toProto(IsBounded bounded) { switch (bounded) { case BOUNDED: return RunnerApi.IsBounded.BOUNDED; @@ -76,7 +76,7 @@ private static RunnerApi.IsBounded toProto(IsBounded bounded) { } } - private static IsBounded fromProto(RunnerApi.IsBounded isBounded) { + static IsBounded fromProto(RunnerApi.IsBounded isBounded) { switch (isBounded) { case BOUNDED: return IsBounded.BOUNDED; diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index 3ddde8d767aa..572384bdd549 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -26,6 +26,8 @@ import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; 
+import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.common.runner.v1.RunnerApi; @@ -40,6 +42,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; /** * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} @@ -102,6 +106,29 @@ public static BoundedSource boundedSourceFromProto(ReadPayload payload) "BoundedSource"); } + public static BoundedSource boundedSourceFromTransform( + AppliedPTransform, PTransform>> transform) + throws IOException { + return (BoundedSource) boundedSourceFromProto(getReadPayload(transform)); + } + + public static + UnboundedSource unboundedSourceFromTransform( + AppliedPTransform, PTransform>> transform) + throws IOException { + return (UnboundedSource) unboundedSourceFromProto(getReadPayload(transform)); + } + + private static ReadPayload getReadPayload( + AppliedPTransform, PTransform>> transform) + throws IOException { + return PTransformTranslation.toProto( + transform, Collections.>emptyList(), SdkComponents.create()) + .getSpec() + .getParameter() + .unpack(ReadPayload.class); + } + private static SdkFunctionSpec toProto(UnboundedSource source) { return SdkFunctionSpec.newBuilder() .setSpec( @@ -130,6 +157,22 @@ private static SdkFunctionSpec toProto(UnboundedSource source) { "BoundedSource"); } + public static PCollection.IsBounded sourceIsBounded(AppliedPTransform transform) { + try { + return PCollectionTranslation.fromProto( + PTransformTranslation.toProto( + transform, + Collections.>emptyList(), + SdkComponents.create()) + .getSpec() + .getParameter() + .unpack(ReadPayload.class) + .getIsBounded()); + } catch (IOException e) { + throw new RuntimeException("Internal error determining 
boundedness of Read", e); + } + } + /** * A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */ From 663ad88178ceadffe4cfa592555986ed7dde58b4 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 19 May 2017 21:28:19 -0700 Subject: [PATCH 10/11] Add RawPTransform, which can just vend its URN and payload This is the type that will be returned when a pipeline is deserialized. This also is convenient for direct runner overrides which do not really merit translator registrations, yet URNs need to be known in order to key the evaluator registry off URN. --- .../construction/PTransformTranslation.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 35bb0e3dad4e..9f5f3b50b723 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -21,15 +21,20 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.Message; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.ServiceLoader; +import javax.annotation.Nullable; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; @@ 
-135,4 +140,50 @@ public interface TransformPayloadTranslator> { String getUrn(T transform); FunctionSpec translate(AppliedPTransform application, SdkComponents components); } + + /** + * A {@link PTransform} that indicates its URN and payload directly. + * + *

<p>This is the result of rehydrating transforms from a pipeline proto. There is no {@link + * #expand} method since the definition of the transform may be lost. The transform is already + * fully expanded in the pipeline proto. + */ + public abstract static class RawPTransform< + InputT extends PInput, OutputT extends POutput, PayloadT extends Message> + extends PTransform { + + public abstract String getUrn(); + + @Nullable + PayloadT getPayload() { + return null; + } + } + + /** + * A translator that uses the explicit URN and payload from a {@link RawPTransform}. + */ + public static class RawPTransformTranslator + implements TransformPayloadTranslator> { + @Override + public String getUrn(RawPTransform transform) { + return transform.getUrn(); + } + + @Override + public FunctionSpec translate( + AppliedPTransform> transform, + SdkComponents components) { + PayloadT payload = transform.getTransform().getPayload(); + + FunctionSpec.Builder transformSpec = + FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())); + + if (payload != null) { + transformSpec.setParameter(Any.pack(payload)); + } + + return transformSpec.build(); + } + } } From 0e29cc52a3e4b0d9ae5ff3907f10e4e87b734186 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 19 May 2017 20:53:32 -0700 Subject: [PATCH 11/11] URNs for DirectRunner TransformEvaluator and RootInputProvider This makes two of the Java DirectRunner's registries key off URN instead of Java class. A root TransformEvaluator requires shards generated by its associated RootInputProvider, hence changing both at once. 
--- runners/direct-java/pom.xml | 5 + .../direct/BoundedReadEvaluatorFactory.java | 14 +- .../beam/runners/direct/DirectGroupByKey.java | 21 ++- .../runners/direct/EmptyInputProvider.java | 8 +- .../direct/ParDoMultiOverrideFactory.java | 13 +- .../runners/direct/ReadEvaluatorFactory.java | 97 +++++++++++++ .../runners/direct/RootInputProvider.java | 7 +- .../runners/direct/RootProviderRegistry.java | 28 ++-- .../beam/runners/direct/SourceShard.java | 33 +++++ .../direct/TestStreamEvaluatorFactory.java | 28 ++-- .../direct/TransformEvaluatorRegistry.java | 128 +++++++++++++----- .../direct/UnboundedReadEvaluatorFactory.java | 31 +++-- .../runners/direct/ViewOverrideFactory.java | 12 +- .../main/resources/beam/findbugs-filter.xml | 7 + 14 files changed, 344 insertions(+), 88 deletions(-) create mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/ReadEvaluatorFactory.java create mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index bec21139d989..cba4b099ff0f 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -207,6 +207,11 @@ test + + com.google.protobuf + protobuf-java + + joda-time joda-time diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 76db861897e0..fcaaa8442082 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -33,10 +33,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.direct.StepTransformResult.Builder; 
import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -180,16 +180,17 @@ public TransformResult> finishBundle() { } @AutoValue - abstract static class BoundedSourceShard { + abstract static class BoundedSourceShard implements SourceShard { static BoundedSourceShard of(BoundedSource source) { return new AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard<>(source); } - abstract BoundedSource getSource(); + @Override + public abstract BoundedSource getSource(); } static class InputProvider - implements RootInputProvider, PBegin, Read.Bounded> { + implements RootInputProvider, PBegin> { private final EvaluationContext evaluationContext; InputProvider(EvaluationContext evaluationContext) { @@ -198,9 +199,10 @@ static class InputProvider @Override public Collection>> getInitialInputs( - AppliedPTransform, Read.Bounded> transform, int targetParallelism) + AppliedPTransform, PTransform>> transform, + int targetParallelism) throws Exception { - BoundedSource source = transform.getTransform().getSource(); + BoundedSource source = ReadTranslation.boundedSourceFromTransform(transform); PipelineOptions options = evaluationContext.getPipelineOptions(); long estimatedBytes = source.getEstimatedSizeBytes(options); long bytesPerBundle = estimatedBytes / targetParallelism; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index 791615a2a194..f239070d925b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -20,9 +20,11 @@ import static 
com.google.common.base.Preconditions.checkArgument; +import com.google.protobuf.Message; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.construction.ForwardingPTransform; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -37,6 +39,9 @@ class DirectGroupByKey extends ForwardingPTransform>, PCollection>>> { private final GroupByKey original; + static final String DIRECT_GBKO_URN = "urn:beam:directrunner:transforms:gbko:v1"; + static final String DIRECT_GABW_URN = "urn:beam:directrunner:transforms:gabw:v1"; + DirectGroupByKey(GroupByKey from) { this.original = from; } @@ -68,7 +73,8 @@ public PCollection>> expand(PCollection> input) { } static final class DirectGroupByKeyOnly - extends PTransform>, PCollection>> { + extends PTransformTranslation.RawPTransform< + PCollection>, PCollection>, Message> { @Override public PCollection> expand(PCollection> input) { return PCollection.createPrimitiveOutputInternal( @@ -86,10 +92,16 @@ protected Coder getDefaultOutputCoder( GroupByKey.getInputValueCoder(input.getCoder()), input.getWindowingStrategy().getWindowFn().windowCoder()); } + + @Override + public String getUrn() { + return DIRECT_GBKO_URN; + } } static final class DirectGroupAlsoByWindow - extends PTransform>, PCollection>>> { + extends PTransformTranslation.RawPTransform< + PCollection>, PCollection>>, Message> { private final WindowingStrategy inputWindowingStrategy; private final WindowingStrategy outputWindowingStrategy; @@ -135,5 +147,10 @@ public PCollection>> expand(PCollection> i return PCollection.createPrimitiveOutputInternal( input.getPipeline(), outputWindowingStrategy, input.isBounded()); } + + @Override + public String getUrn() { + return DIRECT_GABW_URN; + } } } diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java index c36879a5f7b6..a5a53bc8ea6c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java @@ -20,13 +20,12 @@ import java.util.Collection; import java.util.Collections; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; /** A {@link RootInputProvider} that provides a singleton empty bundle. */ -class EmptyInputProvider - implements RootInputProvider, Flatten.PCollections> { +class EmptyInputProvider implements RootInputProvider> { EmptyInputProvider() {} /** @@ -36,7 +35,8 @@ class EmptyInputProvider */ @Override public Collection> getInitialInputs( - AppliedPTransform, PCollection, Flatten.PCollections> + AppliedPTransform< + PCollectionList, PCollection, PTransform, PCollection>> transform, int targetParallelism) { return Collections.emptyList(); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index be433db1ca00..df2054b333ab 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -19,11 +19,13 @@ import static com.google.common.base.Preconditions.checkState; +import com.google.protobuf.Message; import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import 
org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.construction.PTransformReplacements; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -165,8 +167,12 @@ public PCollectionTuple expand(PCollection> input) { } } + static final String DIRECT_STATEFUL_PAR_DO_URN = + "urn:beam:directrunner:transforms:stateful_pardo:v1"; + static class StatefulParDo - extends PTransform>>, PCollectionTuple> { + extends PTransformTranslation.RawPTransform< + PCollection>>, PCollectionTuple, Message> { private final transient MultiOutput, OutputT> underlyingParDo; private final transient PCollection> originalInput; @@ -201,6 +207,11 @@ public PCollectionTuple expand(PCollection TransformEvaluator forApplication( + AppliedPTransform application, CommittedBundle inputBundle) throws Exception { + switch (ReadTranslation.sourceIsBounded(application)) { + case BOUNDED: + return boundedFactory.forApplication(application, inputBundle); + case UNBOUNDED: + return unboundedFactory.forApplication(application, inputBundle); + default: + throw new IllegalArgumentException("PCollection is neither bounded nor unbounded?!?"); + } + } + + @Override + public void cleanup() throws Exception { + boundedFactory.cleanup(); + unboundedFactory.cleanup(); + } + + static InputProvider inputProvider(EvaluationContext context) { + return new InputProvider(context); + } + + private static class InputProvider implements RootInputProvider, PBegin> { + + private final UnboundedReadEvaluatorFactory.InputProvider unboundedInputProvider; + private final BoundedReadEvaluatorFactory.InputProvider boundedInputProvider; + + InputProvider(EvaluationContext context) { + this.unboundedInputProvider = new UnboundedReadEvaluatorFactory.InputProvider(context); + 
this.boundedInputProvider = new BoundedReadEvaluatorFactory.InputProvider(context); + } + + @Override + public Collection>> getInitialInputs( + AppliedPTransform, PTransform>> + appliedTransform, + int targetParallelism) + throws Exception { + switch (ReadTranslation.sourceIsBounded(appliedTransform)) { + case BOUNDED: + // This cast could be made unnecessary, but too much bounded polymorphism + return (Collection) + boundedInputProvider.getInitialInputs(appliedTransform, targetParallelism); + case UNBOUNDED: + // This cast could be made unnecessary, but too much bounded polymorphism + return (Collection) + unboundedInputProvider.getInitialInputs(appliedTransform, targetParallelism); + default: + throw new IllegalArgumentException("PCollection is neither bounded nor unbounded?!?"); + } + } + } +} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java index ce6951805117..0b3de3226aca 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java @@ -29,8 +29,7 @@ * Provides {@link CommittedBundle bundles} that will be provided to the {@link PTransform * PTransforms} that are at the root of a {@link Pipeline}. */ -interface RootInputProvider< - T, ShardT, InputT extends PInput, TransformT extends PTransform>> { +interface RootInputProvider { /** * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs. @@ -44,6 +43,8 @@ interface RootInputProvider< * greater than or equal to 1. 
*/ Collection> getInitialInputs( - AppliedPTransform, TransformT> transform, int targetParallelism) + AppliedPTransform, PTransform>> + transform, + int targetParallelism) throws Exception; } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java index 4b0c06d3611f..5cbeab7db249 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java @@ -18,13 +18,14 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.runners.core.construction.PTransformTranslation.FLATTEN_TRANSFORM_URN; +import static org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DIRECT_TEST_STREAM_URN; import com.google.common.collect.ImmutableMap; import java.util.Collection; import java.util.Map; -import org.apache.beam.sdk.io.Read; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.Flatten.PCollections; import org.apache.beam.sdk.transforms.PTransform; /** @@ -33,34 +34,31 @@ */ class RootProviderRegistry { public static RootProviderRegistry defaultRegistry(EvaluationContext context) { - ImmutableMap.Builder, RootInputProvider> + ImmutableMap.Builder> defaultProviders = ImmutableMap.builder(); defaultProviders - .put(Read.Bounded.class, new BoundedReadEvaluatorFactory.InputProvider(context)) - .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory.InputProvider(context)) - .put( - TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class, - new TestStreamEvaluatorFactory.InputProvider(context)) - .put(PCollections.class, new EmptyInputProvider()); + 
.put(PTransformTranslation.READ_TRANSFORM_URN, ReadEvaluatorFactory.inputProvider(context)) + .put(DIRECT_TEST_STREAM_URN, new TestStreamEvaluatorFactory.InputProvider(context)) + .put(FLATTEN_TRANSFORM_URN, new EmptyInputProvider()); return new RootProviderRegistry(defaultProviders.build()); } - private final Map, RootInputProvider> providers; + private final Map> providers; private RootProviderRegistry( - Map, RootInputProvider> providers) { + Map> providers) { this.providers = providers; } public Collection> getInitialInputs( AppliedPTransform transform, int targetParallelism) throws Exception { - Class transformClass = transform.getTransform().getClass(); + String transformUrn = PTransformTranslation.urnForTransform(transform.getTransform()); RootInputProvider provider = checkNotNull( - providers.get(transformClass), - "Tried to get a %s for a Transform of type %s, but there is no such provider", + providers.get(transformUrn), + "Tried to get a %s for a transform \"%s\", but there is no such provider", RootInputProvider.class.getSimpleName(), - transformClass.getSimpleName()); + transformUrn); return provider.getInitialInputs(transform, targetParallelism); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java new file mode 100644 index 000000000000..a054333d70fc --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; + +/** + * A shard for a source in the {@link Read} transform. + * + *

Since {@link UnboundedSource} and {@link BoundedSource} have radically different needs, this + * is a mostly-empty interface. + */ +interface SourceShard { + Source getSource(); +} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 8b21d5affd60..b1db58f7c4a0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -22,12 +22,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.collect.Iterables; +import com.google.protobuf.Message; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -180,7 +182,10 @@ public Map mapOutputs( return ReplacementOutputs.singleton(outputs, newOutput); } - static class DirectTestStream extends PTransform> { + static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1"; + + static class DirectTestStream + extends PTransformTranslation.RawPTransform, Message> { private final transient DirectRunner runner; private final TestStream original; @@ -197,12 +202,15 @@ public PCollection expand(PBegin input) { input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) .setCoder(original.getValueCoder()); } + + @Override + public String getUrn() { + return DIRECT_TEST_STREAM_URN; + } } } - static class InputProvider 
- implements RootInputProvider< - T, TestStreamIndex, PBegin, DirectTestStreamFactory.DirectTestStream> { + static class InputProvider implements RootInputProvider, PBegin> { private final EvaluationContext evaluationContext; InputProvider(EvaluationContext evaluationContext) { @@ -211,15 +219,17 @@ static class InputProvider @Override public Collection>> getInitialInputs( - AppliedPTransform, DirectTestStreamFactory.DirectTestStream> - transform, + AppliedPTransform, PTransform>> transform, int targetParallelism) { + + // This will always be run on an execution-time transform, so it can be downcast + DirectTestStreamFactory.DirectTestStream testStream = + (DirectTestStreamFactory.DirectTestStream) transform.getTransform(); + CommittedBundle> initialBundle = evaluationContext .>createRootBundle() - .add( - WindowedValue.valueInGlobalWindow( - TestStreamIndex.of(transform.getTransform().original))) + .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(testStream.original))) .commit(BoundedWindow.TIMESTAMP_MAX_VALUE); return Collections.singleton(initialBundle); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 718cca2f5ddb..d144b2042d2b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -19,23 +19,33 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.runners.core.construction.PTransformTranslation.FLATTEN_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.PTransformTranslation.READ_TRANSFORM_URN; +import 
static org.apache.beam.runners.core.construction.PTransformTranslation.WINDOW_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.SplittableParDo.SPLITTABLE_PROCESS_URN; +import static org.apache.beam.runners.direct.DirectGroupByKey.DIRECT_GABW_URN; +import static org.apache.beam.runners.direct.DirectGroupByKey.DIRECT_GBKO_URN; +import static org.apache.beam.runners.direct.ParDoMultiOverrideFactory.DIRECT_STATEFUL_PAR_DO_URN; +import static org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DIRECT_TEST_STREAM_URN; +import static org.apache.beam.runners.direct.ViewOverrideFactory.DIRECT_WRITE_VIEW_URN; +import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; -import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; -import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; -import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo; -import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView; -import org.apache.beam.sdk.io.Read; +import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.runners.AppliedPTransform; -import 
org.apache.beam.sdk.transforms.Flatten.PCollections; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.Window; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,43 +55,93 @@ */ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class); + public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) { - @SuppressWarnings({"rawtypes"}) - ImmutableMap, TransformEvaluatorFactory> primitives = - ImmutableMap., TransformEvaluatorFactory>builder() - .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) - .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) + ImmutableMap primitives = + ImmutableMap.builder() + // Beam primitives + .put(READ_TRANSFORM_URN, new ReadEvaluatorFactory(ctxt)) .put( - ParDo.MultiOutput.class, + PAR_DO_TRANSFORM_URN, new ParDoEvaluatorFactory<>(ctxt, ParDoEvaluator.defaultRunnerFactory())) - .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt)) - .put(PCollections.class, new FlattenEvaluatorFactory(ctxt)) - .put(WriteView.class, new ViewEvaluatorFactory(ctxt)) - .put(Window.Assign.class, new WindowEvaluatorFactory(ctxt)) - // Runner-specific primitives used in expansion of GroupByKey - .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory(ctxt)) - .put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory(ctxt)) - .put( - TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class, - new TestStreamEvaluatorFactory(ctxt)) - // Runner-specific primitive used in expansion of SplittableParDo - .put( - SplittableParDoViaKeyedWorkItems.ProcessElements.class, - new SplittableProcessElementsEvaluatorFactory<>(ctxt)) + .put(FLATTEN_TRANSFORM_URN, new FlattenEvaluatorFactory(ctxt)) + .put(WINDOW_TRANSFORM_URN, new WindowEvaluatorFactory(ctxt)) + 
+ // Runner-specific primitives + .put(DIRECT_WRITE_VIEW_URN, new ViewEvaluatorFactory(ctxt)) + .put(DIRECT_STATEFUL_PAR_DO_URN, new StatefulParDoEvaluatorFactory<>(ctxt)) + .put(DIRECT_GBKO_URN, new GroupByKeyOnlyEvaluatorFactory(ctxt)) + .put(DIRECT_GABW_URN, new GroupAlsoByWindowEvaluatorFactory(ctxt)) + .put(DIRECT_TEST_STREAM_URN, new TestStreamEvaluatorFactory(ctxt)) + + // Runners-core primitives + .put(SPLITTABLE_PROCESS_URN, new SplittableProcessElementsEvaluatorFactory<>(ctxt)) .build(); return new TransformEvaluatorRegistry(primitives); } + /** Registers classes specialized to the direct runner. */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class DirectTransformsRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map< + ? extends Class, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put( + DirectGroupByKey.DirectGroupByKeyOnly.class, + new PTransformTranslation.RawPTransformTranslator<>()) + .put( + DirectGroupByKey.DirectGroupAlsoByWindow.class, + new PTransformTranslation.RawPTransformTranslator()) + .put( + ParDoMultiOverrideFactory.StatefulParDo.class, + new PTransformTranslation.RawPTransformTranslator<>()) + .put( + ViewOverrideFactory.WriteView.class, + new PTransformTranslation.RawPTransformTranslator<>()) + .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator<>()) + .put( + SplittableParDoViaKeyedWorkItems.ProcessElements.class, + new SplittableParDoProcessElementsTranslator()) + .build(); + } + } + + /** + * A translator just to vend the URN. This will need to be moved to runners-core-construction-java + * once SDF is reorganized appropriately. 
+ */ + private static class SplittableParDoProcessElementsTranslator + implements TransformPayloadTranslator> { + + private SplittableParDoProcessElementsTranslator() {} + + @Override + public String getUrn(ProcessElements transform) { + return SPLITTABLE_PROCESS_URN; + } + + @Override + public FunctionSpec translate( + AppliedPTransform> transform, SdkComponents components) { + throw new UnsupportedOperationException( + String.format("%s should never be translated", + ProcessElements.class.getCanonicalName())); + } + } + // the TransformEvaluatorFactories can construct instances of all generic types of transform, // so all instances of a primitive can be handled with the same evaluator factory. - @SuppressWarnings("rawtypes") - private final Map, TransformEvaluatorFactory> factories; + private final Map factories; private final AtomicBoolean finished = new AtomicBoolean(false); private TransformEvaluatorRegistry( @SuppressWarnings("rawtypes") - Map, TransformEvaluatorFactory> factories) { + Map factories) { this.factories = factories; } @@ -91,10 +151,12 @@ public TransformEvaluator forApplication( throws Exception { checkState( !finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry"); - Class transformClass = application.getTransform().getClass(); + + String urn = PTransformTranslation.urnForTransform(application.getTransform()); + TransformEvaluatorFactory factory = checkNotNull( - factories.get(transformClass), "No evaluator for PTransform type %s", transformClass); + factories.get(urn), "No evaluator for PTransform \"%s\"", urn); return factory.forApplication(application, inputBundle); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index cba826ccc98c..7d4bba112385 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Read.Unbounded; @@ -253,7 +254,8 @@ public TransformResult> finishBun } @AutoValue - abstract static class UnboundedSourceShard { + abstract static class UnboundedSourceShard + implements SourceShard { static UnboundedSourceShard unstarted( UnboundedSource source, UnboundedReadDeduplicator deduplicator) { return of(source, deduplicator, null, null); @@ -268,7 +270,8 @@ static UnboundedSourceShard getSource(); + @Override + public abstract UnboundedSource getSource(); abstract UnboundedReadDeduplicator getDeduplicator(); @@ -283,9 +286,8 @@ UnboundedSourceShard withCheckpoint(CheckpointT newCheckpoint) { } } - static class InputProvider - implements RootInputProvider< - OutputT, UnboundedSourceShard, PBegin, Unbounded> { + static class InputProvider + implements RootInputProvider, PBegin> { private final EvaluationContext evaluationContext; InputProvider(EvaluationContext evaluationContext) { @@ -293,27 +295,28 @@ static class InputProvider } @Override - public Collection>> getInitialInputs( - AppliedPTransform, Unbounded> transform, + public Collection>> getInitialInputs( + AppliedPTransform, PTransform>> + transform, int targetParallelism) throws Exception { - UnboundedSource source = transform.getTransform().getSource(); - List> splits = + UnboundedSource source = ReadTranslation.unboundedSourceFromTransform(transform); + List> splits = source.split(targetParallelism, evaluationContext.getPipelineOptions()); 
UnboundedReadDeduplicator deduplicator = source.requiresDeduping() ? UnboundedReadDeduplicator.CachedIdDeduplicator.create() : NeverDeduplicator.create(); - ImmutableList.Builder>> initialShards = + ImmutableList.Builder>> initialShards = ImmutableList.builder(); - for (UnboundedSource split : splits) { - UnboundedSourceShard shard = + for (UnboundedSource split : splits) { + UnboundedSourceShard shard = UnboundedSourceShard.unstarted(split, deduplicator); initialShards.add( evaluationContext - .>createRootBundle() - .add(WindowedValue.>valueInGlobalWindow(shard)) + .>createRootBundle() + .add(WindowedValue.>valueInGlobalWindow(shard)) .commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); } return initialShards.build(); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java index b3bbac827fd3..501b4365835a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java @@ -18,10 +18,12 @@ package org.apache.beam.runners.direct; +import com.google.protobuf.Message; import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.construction.ForwardingPTransform; import org.apache.beam.runners.core.construction.PTransformReplacements; +import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -93,7 +95,7 @@ protected PTransform, PCollectionView> delegate() { * to {@link ViewT}. 
*/ static final class WriteView - extends PTransform>, PCollectionView> { + extends RawPTransform>, PCollectionView, Message> { private final CreatePCollectionView og; WriteView(CreatePCollectionView og) { @@ -110,5 +112,13 @@ public PCollectionView expand(PCollection> input) { public PCollectionView getView() { return og.getView(); } + + @Override + public String getUrn() { + return DIRECT_WRITE_VIEW_URN; + } } + + public static final String DIRECT_WRITE_VIEW_URN = + "urn:beam:directrunner:transforms:write_view:v1"; } diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 8ff0cb02eb3c..3430750d37a8 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -405,4 +405,11 @@ + + + + + + +