Skip to content

Commit

Permalink
Merge 580243e into 9335738
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroh committed Feb 17, 2017
2 parents 9335738 + 580243e commit 39c7e48
Show file tree
Hide file tree
Showing 13 changed files with 2,931 additions and 2,321 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;

/**
* A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link
* Flatten.FlattenPCollectionList} that takes no input {@link PCollection PCollections}.
*/
public class EmptyFlattenAsCreateFactory<T>
implements PTransformOverrideFactory<
PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>> {
private static final EmptyFlattenAsCreateFactory<Object> INSTANCE =
new EmptyFlattenAsCreateFactory<>();

public static <T> EmptyFlattenAsCreateFactory<T> instance() {
return (EmptyFlattenAsCreateFactory<T>) INSTANCE;
}

private EmptyFlattenAsCreateFactory() {}

@Override
public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
FlattenPCollectionList<T> transform) {
return (PTransform) Create.empty(VoidCoder.of());
}

@Override
public PCollectionList<T> getInput(
List<TaggedPValue> inputs, Pipeline p) {
checkArgument(
inputs.isEmpty(), "Must have an empty input to use %s", getClass().getSimpleName());
return PCollectionList.empty(p);
}

@Override
public Map<PValue, ReplacementOutput> mapOutputs(
List<TaggedPValue> outputs, PCollection<T> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import com.google.common.collect.Iterables;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;

/**
* A {@link PTransformOverrideFactory} which consumes from a {@link PValue} and produces a
* {@link PValue}. {@link #getInput(List, Pipeline)} and {@link #mapOutputs(List, PValue)} are
* implemented.
*/
public abstract class SingleInputOutputOverrideFactory<
InputT extends PValue,
OutputT extends PValue,
TransformT extends PTransform<InputT, OutputT>>
implements PTransformOverrideFactory<InputT, OutputT, TransformT> {
@Override
public final InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
return (InputT) Iterables.getOnlyElement(inputs).getValue();
}

@Override
public final Map<PValue, ReplacementOutput> mapOutputs(
List<TaggedPValue> outputs, OutputT newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,12 @@ public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform)
}

@Override
public InputT getInput(
List<TaggedPValue> inputs, Pipeline p) {
public InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
throw new UnsupportedOperationException(message);
}

@Override
public Map<PValue, ReplacementOutput> mapOutputs(
List<TaggedPValue> outputs, OutputT newOutput) {
public Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, OutputT newOutput) {
throw new UnsupportedOperationException(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import static org.junit.Assert.assertThat;

import com.google.common.collect.Iterables;
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PValue;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link SingleInputOutputOverrideFactory}. */
@RunWith(JUnit4.class)
public class SingleInputOutputOverrideFactoryTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();

@Rule
public transient TestPipeline pipeline =
TestPipeline.create().enableAbandonedNodeEnforcement(false);

private transient SingleInputOutputOverrideFactory<
PCollection<? extends Integer>, PCollection<Integer>, MapElements<Integer, Integer>>
factory =
new SingleInputOutputOverrideFactory<
PCollection<? extends Integer>, PCollection<Integer>,
MapElements<Integer, Integer>>() {
@Override
public PTransform<PCollection<? extends Integer>, PCollection<Integer>>
getReplacementTransform(MapElements<Integer, Integer> transform) {
return transform;
}
};

private SimpleFunction<Integer, Integer> fn = new SimpleFunction<Integer, Integer>() {
@Override
public Integer apply(Integer input) {
return input - 1;
}
};

@Test
public void testGetInput() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
assertThat(
factory.getInput(input.expand(), pipeline),
Matchers.<PCollection<? extends Integer>>equalTo(input));
}

@Test
public void testGetInputMultipleInputsFails() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
PCollection<Integer> otherInput = pipeline.apply("OtherCreate", Create.of(1, 2, 3));

thrown.expect(IllegalArgumentException.class);
factory.getInput(PCollectionList.of(input).and(otherInput).expand(), pipeline);
}

@Test
public void testMapOutputs() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
PCollection<Integer> output = input.apply("Map", MapElements.via(fn));
PCollection<Integer> reappliedOutput = input.apply("ReMap", MapElements.via(fn));
Map<PValue, ReplacementOutput> replacementMap =
factory.mapOutputs(output.expand(), reappliedOutput);
assertThat(
replacementMap,
Matchers.<PValue, ReplacementOutput>hasEntry(
reappliedOutput,
ReplacementOutput.of(
Iterables.getOnlyElement(output.expand()),
Iterables.getOnlyElement(reappliedOutput.expand()))));
}

@Test
public void testMapOutputsMultipleOriginalOutputsFails() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
PCollection<Integer> output = input.apply("Map", MapElements.via(fn));
PCollection<Integer> reappliedOutput = input.apply("ReMap", MapElements.via(fn));
thrown.expect(IllegalArgumentException.class);
Map<PValue, ReplacementOutput> replacementMap =
factory.mapOutputs(
PCollectionList.of(output).and(input).and(reappliedOutput).expand(), reappliedOutput);
}
}
9 changes: 9 additions & 0 deletions runners/google-cloud-dataflow-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@
<artifactId>beam-sdks-java-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-construction-java</artifactId>
</dependency>

<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
Expand Down Expand Up @@ -349,5 +354,9 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-construction-java</artifactId>
</dependency>
</dependencies>
</project>

0 comments on commit 39c7e48

Please sign in to comment.