Skip to content

Commit

Permalink
Merge 1af755f into 2982238
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroh committed Feb 21, 2017
2 parents 2982238 + 1af755f commit 153e374
Show file tree
Hide file tree
Showing 11 changed files with 551 additions and 187 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;

/**
* A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link
* Flatten.FlattenPCollectionList} that takes no input {@link PCollection PCollections}.
*/
public class EmptyFlattenAsCreateFactory<T>
implements PTransformOverrideFactory<
PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>> {
private static final EmptyFlattenAsCreateFactory<Object> INSTANCE =
new EmptyFlattenAsCreateFactory<>();

public static <T> EmptyFlattenAsCreateFactory<T> instance() {
return (EmptyFlattenAsCreateFactory<T>) INSTANCE;
}

private EmptyFlattenAsCreateFactory() {}

@Override
public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
FlattenPCollectionList<T> transform) {
return (PTransform) Create.empty(VoidCoder.of());
}

@Override
public PCollectionList<T> getInput(
List<TaggedPValue> inputs, Pipeline p) {
checkArgument(
inputs.isEmpty(), "Must have an empty input to use %s", getClass().getSimpleName());
return PCollectionList.empty(p);
}

@Override
public Map<PValue, ReplacementOutput> mapOutputs(
List<TaggedPValue> outputs, PCollection<T> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import com.google.common.collect.Iterables;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;

/**
* A {@link PTransformOverrideFactory} which consumes from a {@link PValue} and produces a
* {@link PValue}. {@link #getInput(List, Pipeline)} and {@link #mapOutputs(List, PValue)} are
* implemented.
*/
public abstract class SingleInputOutputOverrideFactory<
InputT extends PValue,
OutputT extends PValue,
TransformT extends PTransform<InputT, OutputT>>
implements PTransformOverrideFactory<InputT, OutputT, TransformT> {
@Override
public final InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
return (InputT) Iterables.getOnlyElement(inputs).getValue();
}

@Override
public final Map<PValue, ReplacementOutput> mapOutputs(
List<TaggedPValue> outputs, OutputT newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,12 @@ public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform)
}

@Override
public InputT getInput(
List<TaggedPValue> inputs, Pipeline p) {
public InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
throw new UnsupportedOperationException(message);
}

@Override
public Map<PValue, ReplacementOutput> mapOutputs(
List<TaggedPValue> outputs, OutputT newOutput) {
public Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, OutputT newOutput) {
throw new UnsupportedOperationException(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import static org.junit.Assert.assertThat;

import com.google.common.collect.Iterables;
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PValue;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link SingleInputOutputOverrideFactory}. */
@RunWith(JUnit4.class)
public class SingleInputOutputOverrideFactoryTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();

@Rule
public transient TestPipeline pipeline =
TestPipeline.create().enableAbandonedNodeEnforcement(false);

private transient SingleInputOutputOverrideFactory<
PCollection<? extends Integer>, PCollection<Integer>, MapElements<Integer, Integer>>
factory =
new SingleInputOutputOverrideFactory<
PCollection<? extends Integer>, PCollection<Integer>,
MapElements<Integer, Integer>>() {
@Override
public PTransform<PCollection<? extends Integer>, PCollection<Integer>>
getReplacementTransform(MapElements<Integer, Integer> transform) {
return transform;
}
};

private SimpleFunction<Integer, Integer> fn = new SimpleFunction<Integer, Integer>() {
@Override
public Integer apply(Integer input) {
return input - 1;
}
};

@Test
public void testGetInput() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
assertThat(
factory.getInput(input.expand(), pipeline),
Matchers.<PCollection<? extends Integer>>equalTo(input));
}

@Test
public void testGetInputMultipleInputsFails() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
PCollection<Integer> otherInput = pipeline.apply("OtherCreate", Create.of(1, 2, 3));

thrown.expect(IllegalArgumentException.class);
factory.getInput(PCollectionList.of(input).and(otherInput).expand(), pipeline);
}

@Test
public void testMapOutputs() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
PCollection<Integer> output = input.apply("Map", MapElements.via(fn));
PCollection<Integer> reappliedOutput = input.apply("ReMap", MapElements.via(fn));
Map<PValue, ReplacementOutput> replacementMap =
factory.mapOutputs(output.expand(), reappliedOutput);
assertThat(
replacementMap,
Matchers.<PValue, ReplacementOutput>hasEntry(
reappliedOutput,
ReplacementOutput.of(
Iterables.getOnlyElement(output.expand()),
Iterables.getOnlyElement(reappliedOutput.expand()))));
}

@Test
public void testMapOutputsMultipleOriginalOutputsFails() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
PCollection<Integer> output = input.apply("Map", MapElements.via(fn));
PCollection<Integer> reappliedOutput = input.apply("ReMap", MapElements.via(fn));
thrown.expect(IllegalArgumentException.class);
Map<PValue, ReplacementOutput> replacementMap =
factory.mapOutputs(
PCollectionList.of(output).and(input).and(reappliedOutput).expand(), reappliedOutput);
}
}
9 changes: 9 additions & 0 deletions runners/google-cloud-dataflow-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@
<artifactId>beam-sdks-java-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-construction-java</artifactId>
</dependency>

<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
Expand Down Expand Up @@ -349,5 +354,9 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-construction-java</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
Expand All @@ -72,7 +73,6 @@
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
Expand Down Expand Up @@ -405,20 +405,12 @@ public String getFullName(PTransform<?, ?> transform) {
return currentTransform;
}


@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
}

@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
TransformTranslator translator =
getTransformTranslator(transform.getClass());
if (translator == null) {
throw new IllegalStateException(
"no translator registered for " + transform);
}
TransformTranslator translator = getTransformTranslator(transform.getClass());
checkState(
translator != null, "no translator registered for primitive transform %s", transform);
LOG.debug("Translating {}", transform);
currentTransform = node.toAppliedPTransform();
translator.translate(transform, this);
Expand Down Expand Up @@ -718,32 +710,36 @@ private <ElemT, ViewT> void translateTyped(
});

DataflowPipelineTranslator.registerTransformTranslator(
Combine.GroupedValues.class,
new TransformTranslator<GroupedValues>() {
DataflowRunner.CombineGroupedValues.class,
new TransformTranslator<CombineGroupedValues>() {
@Override
public void translate(
Combine.GroupedValues transform,
TranslationContext context) {
public void translate(CombineGroupedValues transform, TranslationContext context) {
translateHelper(transform, context);
}

private <K, InputT, OutputT> void translateHelper(
final Combine.GroupedValues<K, InputT, OutputT> transform,
final CombineGroupedValues<K, InputT, OutputT> primitiveTransform,
TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "CombineValues");
Combine.GroupedValues<K, InputT, OutputT> originalTransform =
primitiveTransform.getOriginalCombine();
StepTranslationContext stepContext =
context.addStep(primitiveTransform, "CombineValues");
translateInputs(
stepContext, context.getInput(transform), transform.getSideInputs(), context);
stepContext,
context.getInput(primitiveTransform),
originalTransform.getSideInputs(),
context);

AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
transform.getAppliedFn(
context.getInput(transform).getPipeline().getCoderRegistry(),
context.getInput(transform).getCoder(),
context.getInput(transform).getWindowingStrategy());
originalTransform.getAppliedFn(
context.getInput(primitiveTransform).getPipeline().getCoderRegistry(),
context.getInput(primitiveTransform).getCoder(),
context.getInput(primitiveTransform).getWindowingStrategy());

stepContext.addEncodingInput(fn.getAccumulatorCoder());
stepContext.addInput(
PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn)));
stepContext.addOutput(context.getOutput(transform));
stepContext.addOutput(context.getOutput(primitiveTransform));
}
});

Expand Down

0 comments on commit 153e374

Please sign in to comment.