From d81477f1d63412456dc9aa517f6a30b729af87f3 Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 17 Jun 2016 15:39:59 -0700 Subject: [PATCH 1/2] Remove Dataflow runner references in WordCount examples. --- examples/java/pom.xml | 33 +++++++++++-------- .../beam/examples/MinimalWordCount.java | 26 ++++++++++----- examples/java8/pom.xml | 27 +++++++++++---- .../beam/examples/MinimalWordCountJava8.java | 30 +++++++++-------- 4 files changed, 74 insertions(+), 42 deletions(-) diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 51678108581e..bb54c3e47fc5 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -220,19 +220,6 @@ beam-sdks-java-core - - org.apache.beam - beam-runners-direct-java - ${project.version} - runtime - - - - org.apache.beam - beam-runners-google-cloud-dataflow-java - ${project.version} - - com.google.api-client google-api-client @@ -288,6 +275,26 @@ slf4j-api + + org.apache.beam + beam-runners-direct-java + ${project.version} + runtime + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${project.version} + + + + org.apache.beam + beam-runners-flink_2.10 + ${project.version} + runtime + + org.slf4j slf4j-jdk14 diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java index 6d4bfd4702a4..355a1ff70adb 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java @@ -17,10 +17,9 @@ */ package org.apache.beam.examples; -import org.apache.beam.runners.dataflow.BlockingDataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; @@ -63,13 +62,22 @@ public static void main(String[] args) { // Create a PipelineOptions object. This object lets us set various execution // options for our pipeline, such as the associated Cloud Platform project and the location // in Google Cloud Storage to stage files. - DataflowPipelineOptions options = PipelineOptionsFactory.create() - .as(DataflowPipelineOptions.class); - options.setRunner(BlockingDataflowRunner.class); - // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud. - options.setProject("SET_YOUR_PROJECT_ID_HERE"); - // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files. - options.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); + PipelineOptions options = PipelineOptionsFactory.create(); + + // In order to run your pipeline, you need to make following runner specific changes: + // + // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner + // or FlinkPipelineRunner. + // CHANGE 2/3: Specify runner-required options. + // For BlockingDataflowRunner, set project and temp location as follows: + // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + // dataflowOptions.setRunner(BlockingDataflowRunner.class); + // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); + // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); + // For FlinkPipelineRunner, set the runner as follows. See {@code FlinkPipelineOptions} + // for more details. + // options.as(FlinkPipelineOptions.class) + // .setRunner(FlinkPipelineRunner.class); // Create the Pipeline object with the options we defined above. Pipeline p = Pipeline.create(options); diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index 08b811bc96e1..ed5b48e925ce 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -116,12 +116,6 @@ ${project.version} - - org.apache.beam - beam-runners-google-cloud-dataflow-java - ${project.version} - - com.google.guava guava @@ -185,6 +179,25 @@ google-api-client - + + org.apache.beam + beam-runners-direct-java + ${project.version} + runtime + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${project.version} + runtime + + + + org.apache.beam + beam-runners-flink_2.10 + ${project.version} + runtime + + diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index cf3bbf955fb5..6362b962ea32 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -17,10 +17,9 @@ */ package org.apache.beam.examples; -import org.apache.beam.runners.dataflow.BlockingDataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Filter; @@ -39,16 +38,21 @@ public class MinimalWordCountJava8 { public static void main(String[] args) { - DataflowPipelineOptions options = PipelineOptionsFactory.create() - .as(DataflowPipelineOptions.class); - - options.setRunner(BlockingDataflowRunner.class); - - // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud. - options.setProject("SET_YOUR_PROJECT_ID_HERE"); - - // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files. - options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY"); + PipelineOptions options = PipelineOptionsFactory.create(); + // In order to run your pipeline, you need to make following runner specific changes: + // + // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner + // or FlinkPipelineRunner. + // CHANGE 2/3: Specify runner-required options. + // For BlockingDataflowRunner, set project and temp location as follows: + // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + // dataflowOptions.setRunner(BlockingDataflowRunner.class); + // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); + // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); + // For FlinkPipelineRunner, set the runner as follows. See {@code FlinkPipelineOptions} + // for more details. + // options.as(FlinkPipelineOptions.class) + // .setRunner(FlinkPipelineRunner.class); Pipeline p = Pipeline.create(options); @@ -61,7 +65,7 @@ public static void main(String[] args) { .via((KV wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) .withOutputType(TypeDescriptors.strings())) - // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to. + // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); p.run(); From 924503fc1fc000f18efb32b866dac91038244dd3 Mon Sep 17 00:00:00 2001 From: Pei He Date: Tue, 21 Jun 2016 11:02:18 -0700 Subject: [PATCH 2/2] Fix build by removing flink and dataflow dependency. --- examples/java/pom.xml | 7 ------- examples/java8/pom.xml | 7 ------- 2 files changed, 14 deletions(-) diff --git a/examples/java/pom.xml b/examples/java/pom.xml index bb54c3e47fc5..4f8667671806 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -288,13 +288,6 @@ ${project.version} - - org.apache.beam - beam-runners-flink_2.10 - ${project.version} - runtime - - org.slf4j slf4j-jdk14 diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index ed5b48e925ce..82b1c4662754 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -186,13 +186,6 @@ runtime - - org.apache.beam - beam-runners-google-cloud-dataflow-java - ${project.version} - runtime - - org.apache.beam beam-runners-flink_2.10