From ae7bc1d781f793d5091b70bab1c788b795866a8f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 5 Jun 2017 19:47:31 -0700 Subject: [PATCH 1/2] Clarify javadoc on PTransformTranslation --- .../runners/core/construction/PTransformTranslation.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index fd3f9f3e3648..fcbe84bef623 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.ServiceLoader; import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -123,7 +124,10 @@ static RunnerApi.PTransform toProto( } /** - * Translates a non-composite {@link AppliedPTransform} into a runner API proto. + * Translates a composite {@link AppliedPTransform} into a runner API proto with no component + * transforms. + * + *

This should not be used when translating a {@link Pipeline}. * *

Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}. */ From c3b036a243c768546f0273e22fb44eaa2fcfb245 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 25 May 2017 06:56:23 -0700 Subject: [PATCH 2/2] Add CreatePCollectionView translation --- .../CreatePCollectionViewTranslation.java | 126 ++++++++++++++++ .../construction/PTransformTranslation.java | 10 +- .../CreatePCollectionViewTranslationTest.java | 136 ++++++++++++++++++ 3 files changed, 270 insertions(+), 2 deletions(-) create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java create mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java new file mode 100644 index 000000000000..aa24909c2bf6 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Utility methods for translating a {@link View} transforms to and from {@link RunnerApi} + * representations. + * + * @deprecated this should generally be done as part of {@link ParDo} translation, or moved into a + * dedicated runners-core-construction auxiliary class + */ +@Deprecated +public class CreatePCollectionViewTranslation { + + /** + * @deprecated Since {@link CreatePCollectionView} is not a part of the Beam model, there is no + * SDK-agnostic specification. Using this method means your runner is tied to Java. + */ + @Deprecated + public static PCollectionView getView( + AppliedPTransform< + PCollection, PCollectionView, + PTransform, PCollectionView>> + application) + throws IOException { + + RunnerApi.PTransform transformProto = + PTransformTranslation.toProto( + application, + Collections.>emptyList(), + SdkComponents.create()); + + checkArgument( + PTransformTranslation.CREATE_VIEW_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()), + "Illegal attempt to extract %s from transform %s with name \"%s\" and URN \"%s\"", + PCollectionView.class.getSimpleName(), + application.getTransform(), + application.getFullName(), + transformProto.getSpec().getUrn()); + + return (PCollectionView) + SerializableUtils.deserializeFromByteArray( + transformProto + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + PCollectionView.class.getSimpleName()); + } + + @Deprecated + static class CreatePCollectionViewTranslator + implements TransformPayloadTranslator> { + @Override + public String getUrn(View.CreatePCollectionView transform) { + return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN; + } + + @Override + public FunctionSpec translate( + AppliedPTransform> transform, + SdkComponents components) { + return FunctionSpec.newBuilder() + .setUrn(getUrn(transform.getTransform())) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray( + transform.getTransform().getView()))) + .build())) + .build(); + } + } + + /** Registers {@link CreatePCollectionViewTranslator}. */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + @Deprecated + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap( + View.CreatePCollectionView.class, new CreatePCollectionViewTranslator()); + } + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index fcbe84bef623..7c5c5930a1cb 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -56,6 +56,9 @@ public class PTransformTranslation { // Less well-known. And where shall these live? public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1"; + @Deprecated + public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1"; + private static final Map, TransformPayloadTranslator> KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); @@ -141,9 +144,11 @@ private static String toProto(TupleTag tag) { return tag.getId(); } + /** + * Returns the URN for the transform if it is known, otherwise throws. + */ public static String urnForTransform(PTransform transform) { - TransformPayloadTranslator translator = - KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()); + TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()); if (translator == null) { throw new IllegalStateException( String.format("No translator known for %s", transform.getClass().getName())); @@ -158,6 +163,7 @@ public static String urnForTransform(PTransform transform) { */ public interface TransformPayloadTranslator> { String getUrn(T transform); + FunctionSpec translate(AppliedPTransform application, SdkComponents components) throws IOException; } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java new file mode 100644 index 000000000000..0d209a0425fc --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.BytesValue; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Suite; + +/** Tests for {@link CreatePCollectionViewTranslation}. */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + CreatePCollectionViewTranslationTest.TestCreatePCollectionViewPayloadTranslation.class, +}) +public class CreatePCollectionViewTranslationTest { + + /** Tests for translating various {@link ParDo} transforms to/from {@link ParDoPayload} protos. */ + @RunWith(Parameterized.class) + public static class TestCreatePCollectionViewPayloadTranslation { + + // Two parameters suffices because the nature of the serialization/deserialization of + // the view is not what is being tested; it is just important that the round trip + // is not vacuous. + @Parameters(name = "{index}: {0}") + public static Iterable> data() { + return ImmutableList.>of( + CreatePCollectionView.of( + PCollectionViews.singletonView( + testPCollection, + testPCollection.getWindowingStrategy(), + false, + null, + testPCollection.getCoder())), + CreatePCollectionView.of( + PCollectionViews.listView( + testPCollection, + testPCollection.getWindowingStrategy(), + testPCollection.getCoder()))); + } + + @Parameter(0) + public CreatePCollectionView createViewTransform; + + public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + private static final PCollection testPCollection = p.apply(Create.of("one")); + + @Test + public void testEncodedProto() throws Exception { + SdkComponents components = SdkComponents.create(); + components.registerPCollection(testPCollection); + + AppliedPTransform appliedPTransform = + AppliedPTransform.of( + "foo", + testPCollection.expand(), + createViewTransform.getView().expand(), + createViewTransform, + p); + + FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec(); + + // Checks that the payload is what it should be + PCollectionView deserializedView = + (PCollectionView) + SerializableUtils.deserializeFromByteArray( + payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(), + PCollectionView.class.getSimpleName()); + + assertThat( + deserializedView, Matchers.>equalTo(createViewTransform.getView())); + } + + @Test + public void testExtractionDirectFromTransform() throws Exception { + SdkComponents components = SdkComponents.create(); + components.registerPCollection(testPCollection); + + AppliedPTransform appliedPTransform = + AppliedPTransform.of( + "foo", + testPCollection.expand(), + createViewTransform.getView().expand(), + createViewTransform, + p); + + CreatePCollectionViewTranslation.getView((AppliedPTransform) appliedPTransform); + + FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec(); + + // Checks that the payload is what it should be + PCollectionView deserializedView = + (PCollectionView) + SerializableUtils.deserializeFromByteArray( + payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(), + PCollectionView.class.getSimpleName()); + + assertThat( + deserializedView, Matchers.>equalTo(createViewTransform.getView())); + } + } +}