Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;

/**
* Utilities for converting {@link PTransform PTransforms} to and from {@link RunnerApi Runner API
* protocol buffers}.
*/
public class PTransforms {
private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS =
ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder().build();
// TODO: ParDoPayload, WindowIntoPayload, ReadPayload, CombinePayload
// TODO: "Flatten Payload", etc?
// TODO: Load via service loader.
private PTransforms() {}

/**
* Translates an {@link AppliedPTransform} into a runner API proto.
*
* <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
*/
static RunnerApi.PTransform toProto(
AppliedPTransform<?, ?, ?> appliedPTransform,
List<AppliedPTransform<?, ?, ?>> subtransforms,
SdkComponents components)
throws IOException {
RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder();
for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) {
checkArgument(
taggedInput.getValue() instanceof PCollection,
"Unexpected input type %s",
taggedInput.getValue().getClass());
transformBuilder.putInputs(
toProto(taggedInput.getKey()),
components.registerPCollection((PCollection<?>) taggedInput.getValue()));
}
for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) {
checkArgument(
taggedOutput.getValue() instanceof PCollection,
"Unexpected output type %s",
taggedOutput.getValue().getClass());
transformBuilder.putOutputs(
toProto(taggedOutput.getKey()),
components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
}
for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
}

transformBuilder.setUniqueName(appliedPTransform.getFullName());
// TODO: Display Data

PTransform<?, ?> transform = appliedPTransform.getTransform();
if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
FunctionSpec payload =
KNOWN_PAYLOAD_TRANSLATORS
.get(transform.getClass())
.translate(appliedPTransform, components);
transformBuilder.setSpec(payload);
}

return transformBuilder.build();
}

private static String toProto(TupleTag<?> tag) {
return tag.getId();
}

/**
* A translator consumes a {@link PTransform} application and produces the appropriate
* FunctionSpec for a distinguished or primitive transform within the Beam runner API.
*/
public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
FunctionSpec translate(AppliedPTransform<?, ?, T> transform, SdkComponents components);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

package org.apache.beam.runners.core.construction;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.Equivalence;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -62,20 +66,52 @@ private SdkComponents() {
* Registers the provided {@link AppliedPTransform} into this {@link SdkComponents}, returning a
* unique ID for the {@link AppliedPTransform}. Multiple registrations of the same
* {@link AppliedPTransform} will return the same unique ID.
*
* <p>All of the children must already be registered within this {@link SdkComponents}.
*/
String registerPTransform(AppliedPTransform<?, ?, ?> pTransform) {
String existing = transformIds.get(pTransform);
String registerPTransform(
AppliedPTransform<?, ?, ?> appliedPTransform, List<AppliedPTransform<?, ?, ?>> children)
throws IOException {
String name = getApplicationName(appliedPTransform);
// If this transform is present in the components, nothing to do. return the existing name.
// Otherwise the transform must be translated and added to the components.
if (componentsBuilder.getTransformsOrDefault(name, null) != null) {
return name;
}
checkNotNull(children, "child nodes may not be null");
componentsBuilder.putTransforms(name, PTransforms.toProto(appliedPTransform, children, this));
return name;
}

/**
* Gets the ID for the provided {@link AppliedPTransform}. The provided {@link AppliedPTransform}
* will not be added to the components produced by this {@link SdkComponents} until it is
* translated via {@link #registerPTransform(AppliedPTransform, List)}.
*/
private String getApplicationName(AppliedPTransform<?, ?, ?> appliedPTransform) {
String existing = transformIds.get(appliedPTransform);
if (existing != null) {
return existing;
}
String name = pTransform.getFullName();

String name = appliedPTransform.getFullName();
if (name.isEmpty()) {
name = uniqify("unnamed_ptransform", transformIds.values());
name = "unnamed-ptransform";
}
transformIds.put(pTransform, name);
name = uniqify(name, transformIds.values());
transformIds.put(appliedPTransform, name);
return name;
}

String getExistingPTransformId(AppliedPTransform<?, ?, ?> appliedPTransform) {
checkArgument(
transformIds.containsKey(appliedPTransform),
"%s %s has not been previously registered",
AppliedPTransform.class.getSimpleName(),
appliedPTransform);
return transformIds.get(appliedPTransform);
}

/**
* Registers the provided {@link PCollection} into this {@link SdkComponents}, returning a unique
* ID for the {@link PCollection}. Multiple registrations of the same {@link PCollection} will
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/**
* Tests for {@link PTransforms}.
*/
@RunWith(Parameterized.class)
public class PTransformsTest {

@Parameters(name = "{index}: {0}")
public static Iterable<ToAndFromProtoSpec> data() {
// This pipeline exists for construction, not to run any test.
// TODO: Leaf node with understood payload - i.e. validate payloads
ToAndFromProtoSpec readLeaf = ToAndFromProtoSpec.leaf(read(TestPipeline.create()));
ToAndFromProtoSpec readMultipleInAndOut =
ToAndFromProtoSpec.leaf(multiMultiParDo(TestPipeline.create()));
TestPipeline compositeReadPipeline = TestPipeline.create();
ToAndFromProtoSpec compositeRead =
ToAndFromProtoSpec.composite(
countingInput(compositeReadPipeline),
ToAndFromProtoSpec.leaf(read(compositeReadPipeline)));
return ImmutableList.<ToAndFromProtoSpec>builder()
.add(readLeaf)
.add(readMultipleInAndOut)
.add(compositeRead)
// TODO: Composite with multiple children
// TODO: Composite with a composite child
.build();
}

@AutoValue
abstract static class ToAndFromProtoSpec {
public static ToAndFromProtoSpec leaf(AppliedPTransform<?, ?, ?> transform) {
return new AutoValue_PTransformsTest_ToAndFromProtoSpec(
transform, Collections.<ToAndFromProtoSpec>emptyList());
}

public static ToAndFromProtoSpec composite(
AppliedPTransform<?, ?, ?> topLevel, ToAndFromProtoSpec spec, ToAndFromProtoSpec... specs) {
List<ToAndFromProtoSpec> childSpecs = new ArrayList<>();
childSpecs.add(spec);
childSpecs.addAll(Arrays.asList(specs));
return new AutoValue_PTransformsTest_ToAndFromProtoSpec(topLevel, childSpecs);
}

abstract AppliedPTransform<?, ?, ?> getTransform();
abstract Collection<ToAndFromProtoSpec> getChildren();
}

@Parameter(0)
public ToAndFromProtoSpec spec;

@Test
public void toAndFromProto() throws IOException {
SdkComponents components = SdkComponents.create();
RunnerApi.PTransform converted = convert(spec, components);
Components protoComponents = components.toComponents();

// Sanity checks
assertThat(converted.getInputsCount(), equalTo(spec.getTransform().getInputs().size()));
assertThat(converted.getOutputsCount(), equalTo(spec.getTransform().getOutputs().size()));
assertThat(converted.getSubtransformsCount(), equalTo(spec.getChildren().size()));

assertThat(converted.getUniqueName(), equalTo(spec.getTransform().getFullName()));
for (PValue inputValue : spec.getTransform().getInputs().values()) {
PCollection<?> inputPc = (PCollection<?>) inputValue;
protoComponents.getPcollectionsOrThrow(components.registerPCollection(inputPc));
}
for (PValue outputValue : spec.getTransform().getOutputs().values()) {
PCollection<?> outputPc = (PCollection<?>) outputValue;
protoComponents.getPcollectionsOrThrow(components.registerPCollection(outputPc));
}
}

private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents components)
throws IOException {
List<AppliedPTransform<?, ?, ?>> childTransforms = new ArrayList<>();
for (ToAndFromProtoSpec child : spec.getChildren()) {
childTransforms.add(child.getTransform());
System.out.println("Converting child " + child);
convert(child, components);
// Sanity call
components.getExistingPTransformId(child.getTransform());
}
PTransform convert = PTransforms.toProto(spec.getTransform(), childTransforms, components);
// Make sure the converted transform is registered. Convert it independently, but if this is a
// child spec, the child must be in the components.
components.registerPTransform(spec.getTransform(), childTransforms);
return convert;
}

private static class TestDoFn extends DoFn<Long, KV<Long, String>> {
// Exists to stop the ParDo application from throwing
@ProcessElement public void process(ProcessContext context) {}
}

private static AppliedPTransform<?, ?, ?> countingInput(Pipeline pipeline) {
UnboundedCountingInput input = CountingInput.unbounded();
PCollection<Long> pcollection = pipeline.apply(input);
return AppliedPTransform.<PBegin, PCollection<Long>, UnboundedCountingInput>of(
"Count", pipeline.begin().expand(), pcollection.expand(), input, pipeline);
}

private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) {
Read.Unbounded<Long> transform = Read.from(CountingSource.unbounded());
PCollection<Long> pcollection = pipeline.apply(transform);
return AppliedPTransform.<PBegin, PCollection<Long>, Read.Unbounded<Long>>of(
"ReadTheCount", pipeline.begin().expand(), pcollection.expand(), transform, pipeline);
}

private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
PCollectionView<String> view =
pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
PCollection<Long> input = pipeline.apply(CountingInput.unbounded());
ParDo.MultiOutput<Long, KV<Long, String>> parDo =
ParDo.of(new TestDoFn())
.withSideInputs(view)
.withOutputTags(
new TupleTag<KV<Long, String>>() {},
TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
PCollectionTuple output = input.apply(parDo);

Map<TupleTag<?>, PValue> inputs = new HashMap<>();
inputs.putAll(parDo.getAdditionalInputs());
inputs.putAll(input.expand());

return AppliedPTransform
.<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
"MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline);
}
}
Loading