From ca37bc3429075f3c7803c949602169e0f8ab1da6 Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 24 Jun 2016 15:22:59 -0700 Subject: [PATCH] Make beam-examples run with spark runner --- examples/java/pom.xml | 14 ++++++++++++ .../org/apache/beam/examples/TfIdfIT.java | 22 ++++++++++++------- runners/spark/pom.xml | 11 ---------- .../beam/runners/spark/io/NumShardsTest.java | 20 ++++++++++++----- 4 files changed, 42 insertions(+), 25 deletions(-) rename runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java => examples/java/src/test/java/org/apache/beam/examples/TfIdfIT.java (76%) diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 3b67797431c9..57acec3f4787 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -270,12 +270,26 @@ runtime + + org.apache.beam + beam-runners-flink_2.10 + ${project.version} + runtime + + org.apache.beam beam-runners-google-cloud-dataflow-java ${project.version} + + org.apache.beam + beam-runners-spark + ${project.version} + runtime + + org.slf4j slf4j-jdk14 diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/TfIdfIT.java similarity index 76% rename from runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java rename to examples/java/src/test/java/org/apache/beam/examples/TfIdfIT.java index df78338d4269..45b1ca23e10b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/TfIdfIT.java @@ -16,13 +16,17 @@ * limitations under the License. 
*/ -package org.apache.beam.runners.spark; +package org.apache.beam.examples; import org.apache.beam.examples.complete.TfIdf; +import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringDelegateCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Keys; import org.apache.beam.sdk.transforms.RemoveDuplicates; @@ -30,20 +34,23 @@ import org.apache.beam.sdk.values.PCollection; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import java.net.URI; import java.util.Arrays; /** - * A test based on {@code TfIdf} from the SDK. + * End-to-end tests of {@link TfIdf}. */ -public class TfIdfTest { +@RunWith(JUnit4.class) +public class TfIdfIT { @Test - public void testTfIdf() throws Exception { - SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class); - opts.setRunner(SparkRunner.class); - Pipeline pipeline = Pipeline.create(opts); + public void testE2ETfIdfSpark() throws Exception { + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkRunner.class); + Pipeline pipeline = TestPipeline.create(options); pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); @@ -63,5 +70,4 @@ public void testTfIdf() throws Exception { EvaluationResult res = SparkRunner.create().run(pipeline); res.close(); } - } diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 94c42bd663f6..66215deb1355 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -221,17 +221,6 @@ - - org.apache.beam - beam-examples-java - - - - org.slf4j - slf4j-jdk14 - - - 
org.apache.avro avro-mapred diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index b4268d6127c1..dbee4ac1abef 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import org.apache.beam.examples.WordCount; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; @@ -29,8 +28,11 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import com.google.common.base.Charsets; @@ -55,8 +57,7 @@ public class NumShardsTest { private static final String[] WORDS_ARRAY = { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; + "hi", "there", "hi", "hi", "sue", "bob", "hi", "sue", "", "bob", "hi"}; private static final List WORDS = Arrays.asList(WORDS_ARRAY); private File outputDir; @@ -70,14 +71,22 @@ public void setUp() throws IOException { outputDir.delete(); } + /** A SimpleFunction that converts a Word and Count into a printable string. 
*/ + private static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { + @Override + public String apply(KV<String, Long> input) { + return input.getKey() + ": " + input.getValue(); + } + } + @Test public void testText() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); Pipeline p = Pipeline.create(options); PCollection inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); - PCollection output = inputWords.apply(new WordCount.CountWords()) - .apply(MapElements.via(new WordCount.FormatAsTextFn())); + PCollection output = inputWords.apply(Count.perElement()) + .apply(MapElements.via(new FormatAsTextFn())); output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt")); EvaluationResult res = SparkRunner.create().run(p); res.close(); @@ -97,5 +106,4 @@ public void testText() throws Exception { assertEquals(3, count); assertTrue(expected.isEmpty()); } - }