From c05c6fbf4bad59cb403b7cfbea880afc22f9ba8f Mon Sep 17 00:00:00 2001 From: Pei He Date: Thu, 7 Jul 2016 13:45:24 -0700 Subject: [PATCH 1/2] Rename DataflowExampleUtils and DataflowExampleOptions --- .../apache/beam/examples/WindowedWordCount.java | 11 ++++------- ...flowExampleOptions.java => ExampleOptions.java} | 4 ++-- ...DataflowExampleUtils.java => ExampleUtils.java} | 14 +++++++------- .../beam/examples/complete/AutoComplete.java | 4 ++-- .../examples/complete/StreamingWordExtract.java | 4 ++-- .../beam/examples/complete/TrafficMaxLaneFlow.java | 8 ++++---- .../beam/examples/complete/TrafficRoutes.java | 8 ++++---- .../beam/examples/cookbook/TriggerExample.java | 8 ++++---- .../beam/examples/complete/game/GameStats.java | 4 ++-- .../beam/examples/complete/game/LeaderBoard.java | 8 ++++---- 10 files changed, 35 insertions(+), 38 deletions(-) rename examples/java/src/main/java/org/apache/beam/examples/common/{DataflowExampleOptions.java => ExampleOptions.java} (91%) rename examples/java/src/main/java/org/apache/beam/examples/common/{DataflowExampleUtils.java => ExampleUtils.java} (97%) diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index d9dc26d77b05..b392985a18e5 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.BigQueryIO; @@ -41,8 +41,6 @@ import org.joda.time.Duration; import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -102,7 +100,6 @@ * and then exits. */ public class WindowedWordCount { - private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); static final int WINDOW_SIZE = 1; // Default window duration in minutes /** @@ -179,7 +176,7 @@ private static TableReference getTableReference(Options options) { * specification of the input file. */ public static interface Options extends WordCount.WordCountOptions, - DataflowExampleOptions, ExampleBigQueryTableOptions { + ExampleOptions, ExampleBigQueryTableOptions { @Description("Fixed window duration, in minutes") @Default.Integer(WINDOW_SIZE) Integer getWindowSize(); @@ -195,7 +192,7 @@ public static void main(String[] args) throws IOException { options.setBigQuerySchema(getSchema()); // DataflowExampleUtils creates the necessary input sources to simplify execution of this // Pipeline. - DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options, + ExampleUtils exampleDataflowUtils = new ExampleUtils(options, options.isUnbounded()); Pipeline pipeline = Pipeline.create(options); diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java similarity index 91% rename from examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java rename to examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java index 2e8ef3d7ed7c..bba7b21931d6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java @@ -22,9 +22,9 @@ import org.apache.beam.sdk.options.Description; /** - * Options that can be used to configure the Dataflow examples. + * Options that can be used to configure the Beam examples. */ -public interface DataflowExampleOptions extends DataflowPipelineOptions { +public interface ExampleOptions extends DataflowPipelineOptions { @Description("Whether to keep jobs running on the Dataflow service after local process exit") @Default.Boolean(false) boolean getKeepJobsRunning(); diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java similarity index 97% rename from examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java rename to examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java index a90968a445f8..e30b1e45aee3 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java @@ -56,12 +56,12 @@ import java.util.concurrent.TimeUnit; /** - * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub - * injector, and cancels the streaming and the injector pipelines once the program terminates. + * The utility class that sets up and tears down external resources, + * and cancels the streaming pipelines once the program terminates. * - *

It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes. + *

It is used to run Beam examples, such as TrafficMaxLaneFlow and TrafficRoutes. */ -public class DataflowExampleUtils { +public class ExampleUtils { private static final int SC_NOT_FOUND = 404; @@ -72,14 +72,14 @@ public class DataflowExampleUtils { private Set jobsToCancel = Sets.newHashSet(); private List pendingMessages = Lists.newArrayList(); - public DataflowExampleUtils(DataflowPipelineOptions options) { + public ExampleUtils(DataflowPipelineOptions options) { this.options = options; } /** * Do resources and runner options setup. */ - public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded) + public ExampleUtils(DataflowPipelineOptions options, boolean isUnbounded) throws IOException { this.options = options; setupResourcesAndRunner(isUnbounded); @@ -316,7 +316,7 @@ public void waitToFinish(PipelineResult result) { if (result instanceof DataflowPipelineJob) { final DataflowPipelineJob job = (DataflowPipelineJob) result; jobsToCancel.add(job); - if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) { + if (!options.as(ExampleOptions.class).getKeepJobsRunning()) { addShutdownHook(jobsToCancel); } try { diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index 98c4994deb53..f8cd0f1356cf 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -20,8 +20,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AvroCoder; @@ -451,7 +451,7 @@ public static void main(String[] args) throws IOException { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); options.setBigQuerySchema(FormatForBigquery.getSchema()); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new ExampleUtils(options); // We support running the same pipeline in either // batch or windowed streaming mode. diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java index 4ea199c06077..046428c797bb 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java @@ -17,8 +17,8 @@ */ package org.apache.beam.examples.complete; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.BigQueryIO; @@ -120,7 +120,7 @@ public static void main(String[] args) throws IOException { options.setStreaming(true); options.setBigQuerySchema(StringToRowConverter.getSchema()); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new ExampleUtils(options); dataflowUtils.setup(); Pipeline pipeline = Pipeline.create(options); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java index 2db7c9e99d68..96cecb2a8867 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples.complete; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AvroCoder; @@ -307,7 +307,7 @@ public PCollection apply(PBegin begin) { * *

Inherits standard configuration options. */ - private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions, + private interface TrafficMaxLaneFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/traffic_sensor/" @@ -342,7 +342,7 @@ public static void main(String[] args) throws IOException { .as(TrafficMaxLaneFlowOptions.class); options.setBigQuerySchema(FormatMaxesFn.getSchema()); // Using DataflowExampleUtils to set up required resources. - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded()); + ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded()); Pipeline pipeline = Pipeline.create(options); TableReference tableRef = new TableReference(); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java index 89cfbfcb083f..6fa63a83f6cc 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples.complete; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AvroCoder; @@ -317,7 +317,7 @@ public PCollection apply(PBegin begin) { * *

Inherits standard configuration options. */ - private interface TrafficRoutesOptions extends DataflowExampleOptions, + private interface TrafficRoutesOptions extends ExampleOptions, ExampleBigQueryTableOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/traffic_sensor/" @@ -353,7 +353,7 @@ public static void main(String[] args) throws IOException { options.setBigQuerySchema(FormatStatsFn.getSchema()); // Using DataflowExampleUtils to set up required resources. - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded()); + ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded()); Pipeline pipeline = Pipeline.create(options); TableReference tableRef = new TableReference(); diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index 5e608355d58c..c7926220d25b 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples.cookbook; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.BigQueryIO; @@ -420,7 +420,7 @@ public void processElement(ProcessContext c) throws Exception { * Inherits standard configuration options. */ public interface TrafficFlowOptions - extends ExampleBigQueryTableOptions, DataflowExampleOptions { + extends ExampleBigQueryTableOptions, ExampleOptions { @Description("Input file to read from") @Default.String("gs://dataflow-samples/traffic_sensor/" @@ -444,7 +444,7 @@ public static void main(String[] args) throws Exception { options.setBigQuerySchema(getSchema()); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new ExampleUtils(options); dataflowUtils.setup(); Pipeline pipeline = Pipeline.create(options); diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index b1cb312f58a1..5b27f8328d5b 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -17,7 +17,7 @@ */ package org.apache.beam.examples.complete.game; -import org.apache.beam.examples.common.DataflowExampleUtils; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -242,7 +242,7 @@ public static void main(String[] args) throws Exception { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); // Enforce that this pipeline is always run in streaming mode. options.setStreaming(true); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new ExampleUtils(options); Pipeline pipeline = Pipeline.create(options); // Read Events from Pub/Sub using custom timestamps diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index a14d53341337..051b4dee17c3 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -17,8 +17,8 @@ */ package org.apache.beam.examples.complete.game; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.examples.complete.game.utils.WriteToBigQuery; import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; @@ -102,7 +102,7 @@ public class LeaderBoard extends HourlyTeamScore { /** * Options supported by {@link LeaderBoard}. */ - static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions { + static interface Options extends HourlyTeamScore.Options, ExampleOptions { @Description("Pub/Sub topic to read from") @Validation.Required @@ -178,7 +178,7 @@ public static void main(String[] args) throws Exception { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); // Enforce that this pipeline is always run in streaming mode. options.setStreaming(true); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new ExampleUtils(options); Pipeline pipeline = Pipeline.create(options); // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub From d12b5a69be6f91a364bcdce4d39a724dd96472a7 Mon Sep 17 00:00:00 2001 From: Pei He Date: Thu, 7 Jul 2016 14:52:03 -0700 Subject: [PATCH 2/2] addressed comments --- .../main/java/org/apache/beam/examples/WindowedWordCount.java | 3 +-- .../org/apache/beam/examples/complete/TrafficMaxLaneFlow.java | 3 +-- .../java/org/apache/beam/examples/complete/TrafficRoutes.java | 3 +-- .../java/org/apache/beam/examples/cookbook/TriggerExample.java | 3 +-- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index b392985a18e5..b32128a0eb5c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -192,8 +192,7 @@ public static void main(String[] args) throws IOException { options.setBigQuerySchema(getSchema()); // DataflowExampleUtils creates the necessary input sources to simplify execution of this // Pipeline. - ExampleUtils exampleDataflowUtils = new ExampleUtils(options, - options.isUnbounded()); + ExampleUtils exampleDataflowUtils = new ExampleUtils(options, options.isUnbounded()); Pipeline pipeline = Pipeline.create(options); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java index 96cecb2a8867..1bbc68bdb7c4 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java @@ -307,8 +307,7 @@ public PCollection apply(PBegin begin) { * *

Inherits standard configuration options. */ - private interface TrafficMaxLaneFlowOptions extends ExampleOptions, - ExampleBigQueryTableOptions { + private interface TrafficMaxLaneFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/traffic_sensor/" + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv") diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java index 6fa63a83f6cc..8af0922b5ee9 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java @@ -317,8 +317,7 @@ public PCollection apply(PBegin begin) { * *

Inherits standard configuration options. */ - private interface TrafficRoutesOptions extends ExampleOptions, - ExampleBigQueryTableOptions { + private interface TrafficRoutesOptions extends ExampleOptions, ExampleBigQueryTableOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/traffic_sensor/" + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv") diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index c7926220d25b..aa91ac69353d 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -419,8 +419,7 @@ public void processElement(ProcessContext c) throws Exception { /** * Inherits standard configuration options. */ - public interface TrafficFlowOptions - extends ExampleBigQueryTableOptions, ExampleOptions { + public interface TrafficFlowOptions extends ExampleBigQueryTableOptions, ExampleOptions { @Description("Input file to read from") @Default.String("gs://dataflow-samples/traffic_sensor/"