From dad03a279d81678637949ab5e46efe65ab0bc1d8 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 11 Apr 2017 16:50:06 -0700 Subject: [PATCH 1/5] Translate PTransforms to and from Runner API Protos Update SdkComponents to handle the translations. --- .../core/construction/PTransforms.java | 95 ++++++++++ .../core/construction/SdkComponents.java | 51 ++++- .../core/construction/PTransformsTest.java | 176 ++++++++++++++++++ .../core/construction/SdkComponentsTest.java | 96 +++++++++- 4 files changed, 414 insertions(+), 4 deletions(-) create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java create mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java new file mode 100644 index 000000000000..357c53b02b0e --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; + +/** Created by tgroh on 4/7/17. */ +public class PTransforms { + private static final Map, TransformPayloadTranslator> + KNOWN_PAYLOAD_TRANSLATORS = + ImmutableMap., TransformPayloadTranslator>builder().build(); + // TODO: ParDoPayload, WindowIntoPayload, ReadPayload, CombinePayload + // TODO: "Flatten Payload", etc? + private PTransforms() {} + + /** + * Translates an {@link AppliedPTransform} into a runner API proto. + * + *

Does not register {@code application} within the provided {@link SdkComponents}. + */ + public static RunnerApi.PTransform toProto( + AppliedPTransform application, + List> subtransforms, + SdkComponents components) + throws IOException { + RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder(); + for (Map.Entry, PValue> taggedInput : application.getInputs().entrySet()) { + checkArgument(taggedInput.getValue() instanceof PCollection); + transformBuilder.putInputs( + toProto(taggedInput.getKey()), + components.registerPCollection((PCollection) taggedInput.getValue())); + } + for (Map.Entry, PValue> taggedOutput : application.getOutputs().entrySet()) { + checkArgument(taggedOutput.getValue() instanceof PCollection); + transformBuilder.putOutputs( + toProto(taggedOutput.getKey()), + components.registerPCollection((PCollection) taggedOutput.getValue())); + } + for (AppliedPTransform subtransform : subtransforms) { + transformBuilder.addSubtransforms(components.registerPTransform(subtransform)); + } + + transformBuilder.setUniqueName(application.getFullName()); + // TODO: Display Data + + PTransform transform = application.getTransform(); + if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) { + FunctionSpec payload = + KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).translate(application, components); + transformBuilder.setSpec(payload); + } + + return transformBuilder.build(); + } + + private static String toProto(TupleTag tag) { + return tag.getId(); + } + + /** + * A translator consumes a {@link PTransform} application and produces the appropriate + * FunctionSpec for a distinguished or primitive transform within the Beam runner API. + */ + public interface TransformPayloadTranslator> { + FunctionSpec translate(AppliedPTransform transform, SdkComponents components); + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 3f1748514845..0b5b6131294c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -18,10 +18,17 @@ package org.apache.beam.runners.core.construction; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.base.Equivalence; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; @@ -63,15 +70,36 @@ private SdkComponents() { * unique ID for the {@link AppliedPTransform}. Multiple registrations of the same * {@link AppliedPTransform} will return the same unique ID. */ - String registerPTransform(AppliedPTransform pTransform) { + public String registerPTransform( + AppliedPTransform pTransform, List> children) + throws IOException { + String name = registerPTransform(pTransform); + // If this transform is present in the components, nothing to do. return the existing name. + // Otherwise the transform must be translated and added to the components. + if (componentsBuilder.getTransformsOrDefault(name, null) != null) { + return name; + } + checkNotNull(children, "child nodes may not be null"); + componentsBuilder.putTransforms(name, PTransforms.toProto(pTransform, children, this)); + return name; + } + + /** + * Gets the ID for the provided {@link AppliedPTransform}. The provided {@link AppliedPTransform} + * will not be added to the components produced by this {@link SdkComponents} until it is + * translated via {@link #registerPTransform(AppliedPTransform, List)}. + */ + public String registerPTransform(AppliedPTransform pTransform) { String existing = transformIds.get(pTransform); if (existing != null) { return existing; } + String name = pTransform.getFullName(); if (name.isEmpty()) { - name = uniqify("unnamed_ptransform", transformIds.values()); + name = "unnamed-ptransform"; } + name = uniqify(name, transformIds.values()); transformIds.put(pTransform, name); return name; } @@ -156,4 +184,23 @@ private String uniqify(String baseName, Set existing) { RunnerApi.Components toComponents() { return componentsBuilder.build(); } + + /** + * Checks to ensure that all elements that were registered are present within the components + * returned by {@link #toComponents()}. + */ + public void validate() { + Map, String> missingTransforms = new HashMap<>(); + for (Entry, String> registeredTransform : transformIds.entrySet()) { + if (componentsBuilder.getTransformsMap().containsKey(registeredTransform.getValue())) { + missingTransforms.put( + registeredTransform.getKey(), registeredTransform.getValue()); + } + } + checkState( + missingTransforms.isEmpty(), + "%s transforms were registered but were never added to components. Missing transform %s", + missingTransforms.size(), + missingTransforms); + } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java new file mode 100644 index 000000000000..9f28d38cefb4 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests for {@link PTransforms}. + */ +@RunWith(Parameterized.class) +public class PTransformsTest { + + @Parameters(name = "{index}: {0}") + public static Iterable data() { + // This pipeline exists for construction, not to run any test. + TestPipeline pipeline = TestPipeline.create(); + // TODO: Leaf node with understood payload - i.e. validate payloads + return ImmutableList.builder() + .add(ToAndFromProtoSpec.leaf(read(pipeline))) + .add(ToAndFromProtoSpec.leaf(multiMultiParDo(pipeline))) + .add( + ToAndFromProtoSpec.composite( + countingInput(pipeline), ToAndFromProtoSpec.leaf(read(pipeline)))) + // TODO: Composite with multiple children + // TODO: Composite with a composite child + .build(); + } + + @AutoValue + abstract static class ToAndFromProtoSpec { + public static ToAndFromProtoSpec leaf(AppliedPTransform transform) { + return new AutoValue_PTransformsTest_ToAndFromProtoSpec( + transform, Collections.emptyList()); + } + + public static ToAndFromProtoSpec composite( + AppliedPTransform topLevel, ToAndFromProtoSpec spec, ToAndFromProtoSpec... specs) { + List childSpecs = new ArrayList<>(); + childSpecs.add(spec); + childSpecs.addAll(Arrays.asList(specs)); + return new AutoValue_PTransformsTest_ToAndFromProtoSpec(topLevel, childSpecs); + } + + abstract AppliedPTransform getTransform(); + abstract Collection getChildren(); + } + + @Parameter(0) + public ToAndFromProtoSpec spec; + + @Test + public void toAndFromProto() throws IOException { + SdkComponents components = SdkComponents.create(); + RunnerApi.PTransform converted = convert(spec, components); + Components protoComponents = components.toComponents(); + + // Sanity checks + assertThat(converted.getInputsCount(), equalTo(spec.getTransform().getInputs().size())); + assertThat(converted.getOutputsCount(), equalTo(spec.getTransform().getOutputs().size())); + assertThat(converted.getSubtransformsCount(), equalTo(spec.getChildren().size())); + + assertThat(converted.getUniqueName(), equalTo(spec.getTransform().getFullName())); + for (PValue inputValue : spec.getTransform().getInputs().values()) { + PCollection inputPc = (PCollection) inputValue; + protoComponents.getPcollectionsOrThrow(components.registerPCollection(inputPc)); + } + for (PValue outputValue : spec.getTransform().getOutputs().values()) { + PCollection outputPc = (PCollection) outputValue; + protoComponents.getPcollectionsOrThrow(components.registerPCollection(outputPc)); + } + } + + private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents components) + throws IOException { + List> childTransforms = new ArrayList<>(); + for (ToAndFromProtoSpec child : spec.getChildren()) { + childTransforms.add(child.getTransform()); + convert(child, components); + } + return PTransforms.toProto(spec.getTransform(), childTransforms, components); + } + + private static class TestDoFn extends DoFn> { + // Exists to stop the ParDo application from throwing + @ProcessElement public void process(ProcessContext context) {} + } + + private static AppliedPTransform countingInput(Pipeline pipeline) { + UnboundedCountingInput input = CountingInput.unbounded(); + PCollection pcollection = pipeline.apply(input); + return AppliedPTransform., UnboundedCountingInput>of( + "Count", pipeline.begin().expand(), pcollection.expand(), input, pipeline); + } + + private static AppliedPTransform read(Pipeline pipeline) { + Read.Unbounded transform = Read.from(CountingSource.unbounded()); + PCollection pcollection = pipeline.apply(transform); + return AppliedPTransform., Read.Unbounded>of( + "ReadTheCount", pipeline.begin().expand(), pcollection.expand(), transform, pipeline); + } + + private static AppliedPTransform multiMultiParDo(Pipeline pipeline) { + PCollectionView view = + pipeline.apply(Create.of("foo")).apply(View.asSingleton()); + PCollection input = pipeline.apply(CountingInput.unbounded()); + ParDo.MultiOutput> parDo = + ParDo.of(new TestDoFn()) + .withSideInputs(view) + .withOutputTags( + new TupleTag>() {}, + TupleTagList.of(new TupleTag>() {})); + PCollectionTuple output = input.apply(parDo); + + Map, PValue> inputs = new HashMap<>(); + inputs.putAll(parDo.getAdditionalInputs()); + inputs.putAll(input.expand()); + + return AppliedPTransform + ., PCollectionTuple, ParDo.MultiOutput>>of( + "MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline); + } +} diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index 1854e5a449e0..254bce728855 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -20,11 +20,14 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import java.io.IOException; +import java.util.Collections; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -87,7 +90,7 @@ public void registerCoderEqualsNotSame() throws IOException { } @Test - public void registerTransform() { + public void registerTransform() throws IOException { Create.Values create = Create.of(1, 2, 3); PCollection pt = pipeline.apply(create); String userName = "my_transform/my_nesting"; @@ -100,7 +103,7 @@ public void registerTransform() { } @Test - public void registerTransformIdEmptyFullName() { + public void registerTransformEmptyFullName() throws IOException { Create.Values create = Create.of(1, 2, 3); PCollection pt = pipeline.apply(create); AppliedPTransform transform = @@ -108,7 +111,96 @@ public void registerTransformIdEmptyFullName() { "", pipeline.begin().expand(), pt.expand(), create, pipeline); String assignedName = components.registerPTransform(transform); + // A name should be assigned when this PTransform is registered assertThat(assignedName, not(isEmptyOrNullString())); + + // However, until its component nodes are specified, that PTransform cannot be a component. + // Validation should fail + assertThat( + components.toComponents().getTransformsOrDefault(assignedName, null), is(nullValue())); + thrown.expect(IllegalStateException.class); + thrown.expectMessage(assignedName); + thrown.expectMessage(transform.toString()); + components.validate(); + } + + @Test + public void registerTransformNullComponents() throws IOException { + Create.Values create = Create.of(1, 2, 3); + PCollection pt = pipeline.apply(create); + String userName = "my_transform/my_nesting"; + AppliedPTransform transform = + AppliedPTransform., Create.Values>of( + userName, pipeline.begin().expand(), pt.expand(), create, pipeline); + thrown.expect(NullPointerException.class); + thrown.expectMessage("child nodes may not be null"); + components.registerPTransform(transform, null); + } + + @Test + public void registerTransformNoChildren() throws IOException { + Create.Values create = Create.of(1, 2, 3); + PCollection pt = pipeline.apply(create); + String userName = "my_transform/my_nesting"; + AppliedPTransform transform = + AppliedPTransform., Create.Values>of( + userName, pipeline.begin().expand(), pt.expand(), create, pipeline); + assertThat(components.registerPTransform(transform), not(isEmptyOrNullString())); + assertThat( + "Getting a transform ID without providing component transforms " + + "should not insert a component", + components.toComponents().getTransformsCount(), + equalTo(0)); + } + + /** + * Demonstrates that getting the ID of an {@link AppliedPTransform}, then getting the ID with a + * list of {@link AppliedPTransform} components returns the same ID, and on the latter call + * inserts the translation into the components. + */ + @Test + public void registerTransformThenRegisterPTransformWithEmptyChildren() throws IOException { + Create.Values create = Create.of(1, 2, 3); + PCollection pt = pipeline.apply(create); + String userName = "my_transform/my_nesting"; + AppliedPTransform transform = + AppliedPTransform., Create.Values>of( + userName, pipeline.begin().expand(), pt.expand(), create, pipeline); + String originalId = components.registerPTransform(transform); + assertThat(originalId, not(isEmptyOrNullString())); + // The original transform should not yet be present in the components, as it hasn't been + // translated with its children + assertThat(components.toComponents().getTransformsOrDefault(originalId, null), nullValue()); + + String reinserted = + components.registerPTransform( + transform, Collections.>emptyList()); + assertThat(reinserted, equalTo(originalId)); + components.toComponents().getTransformsOrThrow(originalId); + } + + @Test + public void registerTransformWithUnregisteredChildren() throws IOException { + Create.Values create = Create.of(1L, 2L, 3L); + CountingInput.UnboundedCountingInput createChild = CountingInput.unbounded(); + + PCollection pt = pipeline.apply(create); + String userName = "my_transform"; + String childUserName = "my_transform/my_nesting"; + AppliedPTransform transform = + AppliedPTransform., Create.Values>of( + userName, pipeline.begin().expand(), pt.expand(), create, pipeline); + AppliedPTransform childTransform = + AppliedPTransform., CountingInput.UnboundedCountingInput>of( + childUserName, pipeline.begin().expand(), pt.expand(), createChild, pipeline); + + String parentId = + components.registerPTransform( + transform, Collections.>singletonList(childTransform)); + assertThat(parentId, not(isEmptyOrNullString())); + components.toComponents().getTransformsOrThrow(parentId); + // Only the parent should be present within the components + assertThat(components.toComponents().getTransformsCount(), equalTo(1)); } @Test From ae3077509185756ba259e79db70db3ed5ec2fc34 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 14 Apr 2017 13:06:46 -0700 Subject: [PATCH 2/5] squash! Translate PTransforms to and from Runner API Protos ParDo and Combine additional inputs should have the PCollections that are input to their PCollectionViews, not the PCollectionViews. --- .../runners/core/construction/PTransforms.java | 10 ++++++++-- .../core/construction/SdkComponents.java | 2 +- .../core/construction/PTransformsTest.java | 17 +++++++++++------ .../org/apache/beam/sdk/transforms/Combine.java | 4 ++-- .../org/apache/beam/sdk/transforms/ParDo.java | 4 ++-- 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java index 357c53b02b0e..e0cb9956c8c0 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java @@ -53,13 +53,19 @@ public static RunnerApi.PTransform toProto( throws IOException { RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder(); for (Map.Entry, PValue> taggedInput : application.getInputs().entrySet()) { - checkArgument(taggedInput.getValue() instanceof PCollection); + checkArgument( + taggedInput.getValue() instanceof PCollection, + "Unexpected input type %s", + taggedInput.getValue().getClass()); transformBuilder.putInputs( toProto(taggedInput.getKey()), components.registerPCollection((PCollection) taggedInput.getValue())); } for (Map.Entry, PValue> taggedOutput : application.getOutputs().entrySet()) { - checkArgument(taggedOutput.getValue() instanceof PCollection); + checkArgument( + taggedOutput.getValue() instanceof PCollection, + "Unexpected output type %s", + taggedOutput.getValue().getClass()); transformBuilder.putOutputs( toProto(taggedOutput.getKey()), components.registerPCollection((PCollection) taggedOutput.getValue())); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 0b5b6131294c..423f97013c8a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -192,7 +192,7 @@ RunnerApi.Components toComponents() { public void validate() { Map, String> missingTransforms = new HashMap<>(); for (Entry, String> registeredTransform : transformIds.entrySet()) { - if (componentsBuilder.getTransformsMap().containsKey(registeredTransform.getValue())) { + if (!componentsBuilder.getTransformsMap().containsKey(registeredTransform.getValue())) { missingTransforms.put( registeredTransform.getKey(), registeredTransform.getValue()); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java index 9f28d38cefb4..9a84da3f1c09 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java @@ -67,14 +67,19 @@ public class PTransformsTest { @Parameters(name = "{index}: {0}") public static Iterable data() { // This pipeline exists for construction, not to run any test. - TestPipeline pipeline = TestPipeline.create(); // TODO: Leaf node with understood payload - i.e. validate payloads + ToAndFromProtoSpec readLeaf = ToAndFromProtoSpec.leaf(read(TestPipeline.create())); + ToAndFromProtoSpec readMultipleInAndOut = + ToAndFromProtoSpec.leaf(multiMultiParDo(TestPipeline.create())); + TestPipeline compositeReadPipeline = TestPipeline.create(); + ToAndFromProtoSpec compositeRead = + ToAndFromProtoSpec.composite( + countingInput(compositeReadPipeline), + ToAndFromProtoSpec.leaf(read(compositeReadPipeline))); return ImmutableList.builder() - .add(ToAndFromProtoSpec.leaf(read(pipeline))) - .add(ToAndFromProtoSpec.leaf(multiMultiParDo(pipeline))) - .add( - ToAndFromProtoSpec.composite( - countingInput(pipeline), ToAndFromProtoSpec.leaf(read(pipeline)))) + .add(readLeaf) + .add(readMultipleInAndOut) + .add(compositeRead) // TODO: Composite with multiple children // TODO: Composite with a composite child .build(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 8fe483109127..e69d6d87b726 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -1444,7 +1444,7 @@ public List> getSideInputs() { public Map, PValue> getAdditionalInputs() { ImmutableMap.Builder, PValue> additionalInputs = ImmutableMap.builder(); for (PCollectionView sideInput : sideInputs) { - additionalInputs.put(sideInput.getTagInternal(), sideInput); + additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection()); } return additionalInputs.build(); } @@ -1900,7 +1900,7 @@ public List> getSideInputs() { public Map, PValue> getAdditionalInputs() { ImmutableMap.Builder, PValue> additionalInputs = ImmutableMap.builder(); for (PCollectionView sideInput : sideInputs) { - additionalInputs.put(sideInput.getTagInternal(), sideInput); + additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection()); } return additionalInputs.build(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 3de845b6b46c..d83270a7f48d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -664,7 +664,7 @@ public List> getSideInputs() { public Map, PValue> getAdditionalInputs() { ImmutableMap.Builder, PValue> additionalInputs = ImmutableMap.builder(); for (PCollectionView sideInput : sideInputs) { - additionalInputs.put(sideInput.getTagInternal(), sideInput); + additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection()); } return additionalInputs.build(); } @@ -811,7 +811,7 @@ public List> getSideInputs() { public Map, PValue> getAdditionalInputs() { ImmutableMap.Builder, PValue> additionalInputs = ImmutableMap.builder(); for (PCollectionView sideInput : sideInputs) { - additionalInputs.put(sideInput.getTagInternal(), sideInput); + additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection()); } return additionalInputs.build(); } From 1c8472b47be3cf07d33a25c0c57a487d26ba364f Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 17 Apr 2017 11:22:39 -0700 Subject: [PATCH 3/5] fixup! Translate PTransforms to and from Runner API Protos --- .../core/construction/PTransforms.java | 24 ++++++++++++------- .../core/construction/SdkComponents.java | 16 ++++++------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java index e0cb9956c8c0..fe0d15b82510 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java @@ -32,27 +32,31 @@ import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -/** Created by tgroh on 4/7/17. */ +/** + * Utilities for converting {@link PTransform PTransforms} to and from {@link RunnerApi Runner API + * protocol buffers}. + */ public class PTransforms { private static final Map, TransformPayloadTranslator> KNOWN_PAYLOAD_TRANSLATORS = ImmutableMap., TransformPayloadTranslator>builder().build(); // TODO: ParDoPayload, WindowIntoPayload, ReadPayload, CombinePayload // TODO: "Flatten Payload", etc? + // TODO: Load via service loader. private PTransforms() {} /** * Translates an {@link AppliedPTransform} into a runner API proto. * - *

Does not register {@code application} within the provided {@link SdkComponents}. + *

Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}. */ - public static RunnerApi.PTransform toProto( - AppliedPTransform application, + static RunnerApi.PTransform toProto( + AppliedPTransform appliedPTransform, List> subtransforms, SdkComponents components) throws IOException { RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder(); - for (Map.Entry, PValue> taggedInput : application.getInputs().entrySet()) { + for (Map.Entry, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) { checkArgument( taggedInput.getValue() instanceof PCollection, "Unexpected input type %s", @@ -61,7 +65,7 @@ public static RunnerApi.PTransform toProto( toProto(taggedInput.getKey()), components.registerPCollection((PCollection) taggedInput.getValue())); } - for (Map.Entry, PValue> taggedOutput : application.getOutputs().entrySet()) { + for (Map.Entry, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) { checkArgument( taggedOutput.getValue() instanceof PCollection, "Unexpected output type %s", @@ -74,13 +78,15 @@ public static RunnerApi.PTransform toProto( transformBuilder.addSubtransforms(components.registerPTransform(subtransform)); } - transformBuilder.setUniqueName(application.getFullName()); + transformBuilder.setUniqueName(appliedPTransform.getFullName()); // TODO: Display Data - PTransform transform = application.getTransform(); + PTransform transform = appliedPTransform.getTransform(); if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) { FunctionSpec payload = - KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).translate(application, components); + KNOWN_PAYLOAD_TRANSLATORS + .get(transform.getClass()) + .translate(appliedPTransform, components); transformBuilder.setSpec(payload); } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 423f97013c8a..be8a77aa7dc4 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -70,17 +70,17 @@ private SdkComponents() { * unique ID for the {@link AppliedPTransform}. Multiple registrations of the same * {@link AppliedPTransform} will return the same unique ID. */ - public String registerPTransform( - AppliedPTransform pTransform, List> children) + String registerPTransform( + AppliedPTransform appliedPTransform, List> children) throws IOException { - String name = registerPTransform(pTransform); + String name = registerPTransform(appliedPTransform); // If this transform is present in the components, nothing to do. return the existing name. // Otherwise the transform must be translated and added to the components. if (componentsBuilder.getTransformsOrDefault(name, null) != null) { return name; } checkNotNull(children, "child nodes may not be null"); - componentsBuilder.putTransforms(name, PTransforms.toProto(pTransform, children, this)); + componentsBuilder.putTransforms(name, PTransforms.toProto(appliedPTransform, children, this)); return name; } @@ -89,18 +89,18 @@ public String registerPTransform( * will not be added to the components produced by this {@link SdkComponents} until it is * translated via {@link #registerPTransform(AppliedPTransform, List)}. */ - public String registerPTransform(AppliedPTransform pTransform) { - String existing = transformIds.get(pTransform); + String registerPTransform(AppliedPTransform appliedPTransform) { + String existing = transformIds.get(appliedPTransform); if (existing != null) { return existing; } - String name = pTransform.getFullName(); + String name = appliedPTransform.getFullName(); if (name.isEmpty()) { name = "unnamed-ptransform"; } name = uniqify(name, transformIds.values()); - transformIds.put(pTransform, name); + transformIds.put(appliedPTransform, name); return name; } From b2c772d2ca481d75957c9aec68393f25d74d08e0 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 17 Apr 2017 11:37:34 -0700 Subject: [PATCH 4/5] fixup! Translate PTransforms to and from Runner API Protos --- .../core/construction/PTransforms.java | 2 +- .../core/construction/SdkComponents.java | 39 +++---- .../core/construction/SdkComponentsTest.java | 100 +++++++----------- 3 files changed, 52 insertions(+), 89 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java index fe0d15b82510..7ec0863860b6 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java @@ -75,7 +75,7 @@ static RunnerApi.PTransform toProto( components.registerPCollection((PCollection) taggedOutput.getValue())); } for (AppliedPTransform subtransform : subtransforms) { - transformBuilder.addSubtransforms(components.registerPTransform(subtransform)); + transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform)); } transformBuilder.setUniqueName(appliedPTransform.getFullName()); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index be8a77aa7dc4..35af3006d2fe 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -18,17 +18,14 @@ package org.apache.beam.runners.core.construction; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Equivalence; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; @@ -69,11 +66,13 @@ private SdkComponents() { * Registers the provided {@link AppliedPTransform} into this {@link SdkComponents}, returning a * unique ID for the {@link AppliedPTransform}. Multiple registrations of the same * {@link AppliedPTransform} will return the same unique ID. + * + *

All of the children must already be registered within this {@link SdkComponents}. */ String registerPTransform( AppliedPTransform appliedPTransform, List> children) throws IOException { - String name = registerPTransform(appliedPTransform); + String name = getApplicationName(appliedPTransform); // If this transform is present in the components, nothing to do. return the existing name. // Otherwise the transform must be translated and added to the components. if (componentsBuilder.getTransformsOrDefault(name, null) != null) { @@ -89,7 +88,7 @@ String registerPTransform( * will not be added to the components produced by this {@link SdkComponents} until it is * translated via {@link #registerPTransform(AppliedPTransform, List)}. */ - String registerPTransform(AppliedPTransform appliedPTransform) { + private String getApplicationName(AppliedPTransform appliedPTransform) { String existing = transformIds.get(appliedPTransform); if (existing != null) { return existing; @@ -104,6 +103,15 @@ String registerPTransform(AppliedPTransform appliedPTransform) { return name; } + String getExistingPTransformId(AppliedPTransform appliedPTransform) { + checkArgument( + transformIds.containsKey(appliedPTransform), + "%s %s has not been previously registered", + AppliedPTransform.class.getSimpleName(), + appliedPTransform); + return transformIds.get(appliedPTransform); + } + /** * Registers the provided {@link PCollection} into this {@link SdkComponents}, returning a unique * ID for the {@link PCollection}. Multiple registrations of the same {@link PCollection} will @@ -184,23 +192,4 @@ private String uniqify(String baseName, Set existing) { RunnerApi.Components toComponents() { return componentsBuilder.build(); } - - /** - * Checks to ensure that all elements that were registered are present within the components - * returned by {@link #toComponents()}. - */ - public void validate() { - Map, String> missingTransforms = new HashMap<>(); - for (Entry, String> registeredTransform : transformIds.entrySet()) { - if (!componentsBuilder.getTransformsMap().containsKey(registeredTransform.getValue())) { - missingTransforms.put( - registeredTransform.getKey(), registeredTransform.getValue()); - } - } - checkState( - missingTransforms.isEmpty(), - "%s transforms were registered but were never added to components. Missing transform %s", - missingTransforms.size(), - missingTransforms); - } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index 254bce728855..895aec48572d 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -20,10 +20,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import java.io.IOException; @@ -35,6 +33,7 @@ import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -90,16 +89,42 @@ public void registerCoderEqualsNotSame() throws IOException { } @Test - public void registerTransform() throws IOException { + public void registerTransformNoChildren() throws IOException { Create.Values create = Create.of(1, 2, 3); PCollection pt = pipeline.apply(create); String userName = "my_transform/my_nesting"; AppliedPTransform transform = AppliedPTransform., Create.Values>of( userName, pipeline.begin().expand(), pt.expand(), create, pipeline); - String componentName = components.registerPTransform(transform); + String componentName = + components.registerPTransform( + transform, Collections.>emptyList()); assertThat(componentName, equalTo(userName)); - assertThat(components.registerPTransform(transform), equalTo(componentName)); + assertThat(components.getExistingPTransformId(transform), equalTo(componentName)); + } + + @Test + public void registerTransformAfterChildren() throws IOException { + Create.Values create = Create.of(1L, 2L, 3L); + CountingInput.UnboundedCountingInput createChild = CountingInput.unbounded(); + + PCollection pt = pipeline.apply(create); + String userName = "my_transform"; + String childUserName = "my_transform/my_nesting"; + AppliedPTransform transform = + AppliedPTransform., Create.Values>of( + userName, pipeline.begin().expand(), pt.expand(), create, pipeline); + AppliedPTransform childTransform = + AppliedPTransform., CountingInput.UnboundedCountingInput>of( + childUserName, pipeline.begin().expand(), pt.expand(), createChild, pipeline); + + String childId = components.registerPTransform(childTransform, + Collections.>emptyList()); + String parentId = components.registerPTransform(transform, + Collections.>singletonList(childTransform)); + Components components = this.components.toComponents(); + assertThat(components.getTransformsOrThrow(parentId).getSubtransforms(0), equalTo(childId)); + assertThat(components.getTransformsOrThrow(childId).getSubtransformsCount(), equalTo(0)); } @Test @@ -109,19 +134,10 @@ public void registerTransformEmptyFullName() throws IOException { AppliedPTransform transform = AppliedPTransform., Create.Values>of( "", pipeline.begin().expand(), pt.expand(), create, pipeline); - String assignedName = components.registerPTransform(transform); - - // A name should be assigned when this PTransform is registered - assertThat(assignedName, not(isEmptyOrNullString())); - // However, until its component nodes are specified, that PTransform cannot be a component. - // Validation should fail - assertThat( - components.toComponents().getTransformsOrDefault(assignedName, null), is(nullValue())); - thrown.expect(IllegalStateException.class); - thrown.expectMessage(assignedName); + thrown.expect(IllegalArgumentException.class); thrown.expectMessage(transform.toString()); - components.validate(); + components.getExistingPTransformId(transform); } @Test @@ -137,48 +153,9 @@ public void registerTransformNullComponents() throws IOException { components.registerPTransform(transform, null); } - @Test - public void registerTransformNoChildren() throws IOException { - Create.Values create = Create.of(1, 2, 3); - PCollection pt = pipeline.apply(create); - String userName = "my_transform/my_nesting"; - AppliedPTransform transform = - AppliedPTransform., Create.Values>of( - userName, pipeline.begin().expand(), pt.expand(), create, pipeline); - assertThat(components.registerPTransform(transform), not(isEmptyOrNullString())); - assertThat( - "Getting a transform ID without providing component transforms " - + "should not insert a component", - components.toComponents().getTransformsCount(), - equalTo(0)); - } - /** - * Demonstrates that getting the ID of an {@link AppliedPTransform}, then getting the ID with a - * list of {@link AppliedPTransform} components returns the same ID, and on the latter call - * inserts the translation into the components. + * Tests that trying to register a transform which has unregistered children throws. */ - @Test - public void registerTransformThenRegisterPTransformWithEmptyChildren() throws IOException { - Create.Values create = Create.of(1, 2, 3); - PCollection pt = pipeline.apply(create); - String userName = "my_transform/my_nesting"; - AppliedPTransform transform = - AppliedPTransform., Create.Values>of( - userName, pipeline.begin().expand(), pt.expand(), create, pipeline); - String originalId = components.registerPTransform(transform); - assertThat(originalId, not(isEmptyOrNullString())); - // The original transform should not yet be present in the components, as it hasn't been - // translated with its children - assertThat(components.toComponents().getTransformsOrDefault(originalId, null), nullValue()); - - String reinserted = - components.registerPTransform( - transform, Collections.>emptyList()); - assertThat(reinserted, equalTo(originalId)); - components.toComponents().getTransformsOrThrow(originalId); - } - @Test public void registerTransformWithUnregisteredChildren() throws IOException { Create.Values create = Create.of(1L, 2L, 3L); @@ -194,13 +171,10 @@ public void registerTransformWithUnregisteredChildren() throws IOException { AppliedPTransform., CountingInput.UnboundedCountingInput>of( childUserName, pipeline.begin().expand(), pt.expand(), createChild, pipeline); - String parentId = - components.registerPTransform( - transform, Collections.>singletonList(childTransform)); - assertThat(parentId, not(isEmptyOrNullString())); - components.toComponents().getTransformsOrThrow(parentId); - // Only the parent should be present within the components - assertThat(components.toComponents().getTransformsCount(), equalTo(1)); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(childTransform.toString()); + components.registerPTransform( + transform, Collections.>singletonList(childTransform)); } @Test From 6c31b141fa2000f3b356a0285370fdc86cd6aeb0 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 17 Apr 2017 11:55:35 -0700 Subject: [PATCH 5/5] fixup! Translate PTransforms to and from Runner API Protos --- .../runners/core/construction/PTransformsTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java index 9a84da3f1c09..4e3cdb63bcb5 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; import org.apache.beam.sdk.io.CountingSource; @@ -134,9 +135,16 @@ private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents comp List> childTransforms = new ArrayList<>(); for (ToAndFromProtoSpec child : spec.getChildren()) { childTransforms.add(child.getTransform()); + System.out.println("Converting child " + child); convert(child, components); + // Sanity call + components.getExistingPTransformId(child.getTransform()); } - return PTransforms.toProto(spec.getTransform(), childTransforms, components); + PTransform convert = PTransforms.toProto(spec.getTransform(), childTransforms, components); + // Make sure the converted transform is registered. Convert it independently, but if this is a + // child spec, the child must be in the components. + components.registerPTransform(spec.getTransform(), childTransforms); + return convert; } private static class TestDoFn extends DoFn> {