diff --git a/.gitignore b/.gitignore index fcfeafc0a91ea..39e2f62cd8920 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ target/ bin/ # Ignore generated archetypes +sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/ sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/ # Ignore IntelliJ files. diff --git a/sdks/java/maven-archetypes/update-examples-archetype.sh b/sdks/java/maven-archetypes/examples/generate-sources.sh similarity index 73% rename from sdks/java/maven-archetypes/update-examples-archetype.sh rename to sdks/java/maven-archetypes/examples/generate-sources.sh index 3c222be0d0f84..b1a86a7c06be0 100755 --- a/sdks/java/maven-archetypes/update-examples-archetype.sh +++ b/sdks/java/maven-archetypes/examples/generate-sources.sh @@ -24,10 +24,13 @@ HERE="$(dirname $0)" # The directory of the examples-java module -EXAMPLES_ROOT="${HERE}/../../../examples/java" +EXAMPLES_ROOT="${HERE}/../../../../examples/java" # The root of the examples archetype -ARCHETYPE_ROOT="${HERE}/examples/src/main/resources/archetype-resources" +ARCHETYPE_ROOT="${HERE}/src/main/resources/archetype-resources" + +mkdir -p "${ARCHETYPE_ROOT}/src/main/java" +mkdir -p "${ARCHETYPE_ROOT}/src/test/java" # # Copy the Java subset of the examples project verbatim. @@ -44,16 +47,22 @@ rsync -a --exclude cookbook --exclude complete --exclude '*IT.java' \ # Replace 'package org.apache.beam.examples' with 'package ${package}' in all Java code # find "${ARCHETYPE_ROOT}/src/main/java" -name '*.java' -print0 \ - | xargs -0 sed -i 's/^package org\.apache\.beam\.examples/package ${package}/g' + | xargs -0 sed -i.bak 's/^package org\.apache\.beam\.examples/package ${package}/g' find "${ARCHETYPE_ROOT}/src/test/java" -name '*.java' -print0 \ - | xargs -0 sed -i 's/^package org\.apache\.beam\.examples/package ${package}/g' + | xargs -0 sed -i.bak 's/^package org\.apache\.beam\.examples/package ${package}/g' # # Replace 'import org.apache.beam.examples.' with 'import ${package}.' in all Java code # find "${ARCHETYPE_ROOT}/src/main/java" -name '*.java' -print0 \ - | xargs -0 sed -i 's/^import org\.apache\.beam\.examples/import ${package}/g' + | xargs -0 sed -i.bak 's/^import org\.apache\.beam\.examples/import ${package}/g' find "${ARCHETYPE_ROOT}/src/test/java" -name '*.java' -print0 \ - | xargs -0 sed -i 's/^import org\.apache\.beam\.examples/import ${package}/g' + | xargs -0 sed -i.bak 's/^import org\.apache\.beam\.examples/import ${package}/g' + +# +# The use of -i.bak is necessary for the above to work with both GNU and BSD sed. +# Delete the files now. +# +find "${ARCHETYPE_ROOT}/src" -name '*.bak' -delete diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml index b8555bca46bef..637231fc7c9b0 100644 --- a/sdks/java/maven-archetypes/examples/pom.xml +++ b/sdks/java/maven-archetypes/examples/pom.xml @@ -15,7 +15,9 @@ See the License for the specific language governing permissions and limitations under the License. --> - + 4.0.0 @@ -29,7 +31,8 @@ beam-sdks-java-maven-archetypes-examples Apache Beam :: SDKs :: Java :: Maven Archetypes :: Examples A Maven Archetype to create a project containing all the - example pipelines from the Apache Beam Java SDK. + example pipelines from the Apache Beam Java SDK. + maven-archetype @@ -65,7 +68,27 @@ + + + + exec-maven-plugin + org.codehaus.mojo + + + generate-archetype-contents + generate-sources + + exec + + + ${project.basedir}/generate-sources.sh + + + + + + org.apache.beam diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml index 272157d8d77b8..b18c57c1bd989 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml @@ -81,24 +81,7 @@ false - - - org.apache.maven.plugins - maven-dependency-plugin - 2.10 - - - analyze-only - - - true - true - - - - - - + diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java deleted file mode 100644 index 99ae79687cb05..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}; - -import java.util.Arrays; -import java.util.List; -import java.util.regex.Pattern; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * An example that verifies word counts in Shakespeare and includes Beam best practices. - * - *

This class, {@link DebuggingWordCount}, is the third in a series of four successively more - * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount} - * and {@link WordCount}. After you've looked at this example, then see the - * {@link WindowedWordCount} pipeline, for introduction of additional concepts. - * - *

Basic concepts, also in the MinimalWordCount and WordCount examples: - * Reading text files; counting a PCollection; executing a Pipeline both locally - * and using a selected runner; defining DoFns. - * - *

New Concepts: - *

- *   1. Logging using SLF4J, even in a distributed environment
- *   2. Creating a custom aggregator (runners have varying levels of support)
- *   3. Testing your Pipeline via PAssert
- * 
- * - *

To execute this pipeline locally, specify general pipeline configuration: - *

{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * 
- * - *

To change the runner, specify: - *

{@code
- *   --runner=YOUR_SELECTED_RUNNER
- * }
- * 
- * - *

The input file defaults to a public data set containing the text of of King Lear, - * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. - * - */ -public class DebuggingWordCount { - /** A DoFn that filters for a specific key based upon a regular expression. */ - public static class FilterTextFn extends DoFn, KV> { - /** - * Concept #1: The logger below uses the fully qualified class name of FilterTextFn as the - * logger. Depending on your SLF4J configuration, log statements will likely be qualified by - * this name. - * - *

Note that this is entirely standard SLF4J usage. Some runners may provide a default SLF4J - * configuration that is most appropriate for their logging integration. - */ - private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); - - private final Pattern filter; - public FilterTextFn(String pattern) { - filter = Pattern.compile(pattern); - } - - /** - * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each - * runner provides varying levels of support for aggregators, and may expose them - * in a dashboard, etc. - */ - private final Aggregator matchedWords = - createAggregator("matchedWords", new Sum.SumLongFn()); - private final Aggregator unmatchedWords = - createAggregator("umatchedWords", new Sum.SumLongFn()); - - @ProcessElement - public void processElement(ProcessContext c) { - if (filter.matcher(c.element().getKey()).matches()) { - // Log at the "DEBUG" level each element that we match. When executing this pipeline - // these log lines will appear only if the log level is set to "DEBUG" or lower. - LOG.debug("Matched: " + c.element().getKey()); - matchedWords.addValue(1L); - c.output(c.element()); - } else { - // Log at the "TRACE" level each element that is not matched. Different log levels - // can be used to control the verbosity of logging providing an effective mechanism - // to filter less important information. - LOG.trace("Did not match: " + c.element().getKey()); - unmatchedWords.addValue(1L); - } - } - } - - /** - * Options supported by {@link DebuggingWordCount}. - * - *

Inherits standard configuration options and all options defined in - * {@link WordCount.WordCountOptions}. - */ - public interface WordCountOptions extends WordCount.WordCountOptions { - - @Description("Regex filter pattern to use in DebuggingWordCount. " - + "Only words matching this pattern will be counted.") - @Default.String("Flourish|stomach") - String getFilterPattern(); - void setFilterPattern(String value); - } - - public static void main(String[] args) { - WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(WordCountOptions.class); - Pipeline p = Pipeline.create(options); - - PCollection> filteredWords = - p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) - .apply(new WordCount.CountWords()) - .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); - - /** - * Concept #3: PAssert is a set of convenient PTransforms in the style of - * Hamcrest's collection matchers that can be used when writing Pipeline level tests - * to validate the contents of PCollections. PAssert is best used in unit tests - * with small data sets but is demonstrated here as a teaching tool. - * - *

Below we verify that the set of filtered words matches our expected counts. Note - * that PAssert does not provide any output and that successful completion of the - * Pipeline implies that the expectations were met. Learn more at - * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test - * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test. - */ - List> expectedResults = Arrays.asList( - KV.of("Flourish", 3L), - KV.of("stomach", 1L)); - PAssert.that(filteredWords).containsInAnyOrder(expectedResults); - - p.run().waitUntilFinish(); - } -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java deleted file mode 100644 index 97bd8243b87f5..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.KV; - - -/** - * An example that counts words in Shakespeare. - * - *

This class, {@link MinimalWordCount}, is the first in a series of four successively more - * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or - * argument processing, and focus on construction of the pipeline, which chains together the - * application of core transforms. - * - *

Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the - * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional - * concepts. - * - *

Concepts: - * - *

- *   1. Reading data from text files
- *   2. Specifying 'inline' transforms
- *   3. Counting items in a PCollection
- *   4. Writing data to text files
- * 
- * - *

No arguments are required to run this pipeline. It will be executed with the DirectRunner. You - * can see the results in the output files in your current working directory, with names like - * "wordcounts-00001-of-00005. When running on a distributed service, you would use an appropriate - * file service. - */ -public class MinimalWordCount { - - public static void main(String[] args) { - // Create a PipelineOptions object. This object lets us set various execution - // options for our pipeline, such as the runner you wish to use. This example - // will run with the DirectRunner by default, based on the class path configured - // in its dependencies. - PipelineOptions options = PipelineOptionsFactory.create(); - - // Create the Pipeline object with the options we defined above. - Pipeline p = Pipeline.create(options); - - // Apply the pipeline's transforms. - - // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set - // of input text files. TextIO.Read returns a PCollection where each element is one line from - // the input text (a set of Shakespeare's texts). - - // This example reads a public data set consisting of the complete works of Shakespeare. - p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) - - // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a - // DoFn (defined in-line) on each element that tokenizes the text line into individual words. - // The ParDo returns a PCollection, where each element is an individual word in - // Shakespeare's collected texts. - .apply("ExtractWords", ParDo.of(new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - for (String word : c.element().split("[^a-zA-Z']+")) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - })) - - // Concept #3: Apply the Count transform to our PCollection of individual words. The Count - // transform returns a new PCollection of key/value pairs, where each key represents a unique - // word in the text. The associated value is the occurrence count for that word. - .apply(Count.perElement()) - - // Apply a MapElements transform that formats our PCollection of word counts into a printable - // string, suitable for writing to an output file. - .apply("FormatResults", MapElements.via(new SimpleFunction, String>() { - @Override - public String apply(KV input) { - return input.getKey() + ": " + input.getValue(); - } - })) - - // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline. - // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of - // formatted strings) to a series of text files. - // - // By default, it will write to a set of files with names like wordcount-00001-of-00005 - .apply(TextIO.Write.to("wordcounts")); - - // Run the pipeline. - p.run().waitUntilFinish(); - } -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java deleted file mode 100644 index 28125319e2124..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import ${package}.common.ExampleBigQueryTableOptions; -import ${package}.common.ExampleOptions; -import ${package}.common.ExampleUtils; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.joda.time.Instant; - - -/** - * An example that counts words in text, and can run over either unbounded or bounded input - * collections. - * - *

This class, {@link WindowedWordCount}, is the last in a series of four successively more - * detailed 'word count' examples. First take a look at {@link MinimalWordCount}, - * {@link WordCount}, and {@link DebuggingWordCount}. - * - *

Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples: - * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally - * and using a selected runner; defining DoFns; creating a custom aggregator; - * user-defined PTransforms; defining PipelineOptions. - * - *

New Concepts: - *

- *   1. Unbounded and bounded pipeline input modes
- *   2. Adding timestamps to data
- *   3. Windowing
- *   4. Re-using PTransforms over windowed PCollections
- *   5. Writing to BigQuery
- * 
- * - *

By default, the examples will run with the {@code DirectRunner}. - * To change the runner, specify: - *

{@code
- *   --runner=YOUR_SELECTED_RUNNER
- * }
- * 
- * See examples/java/README.md for instructions about how to configure different runners. - * - *

Optionally specify the input file path via: - * {@code --inputFile=gs://INPUT_PATH}, - * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}. - * - *

Specify an output BigQuery dataset and optionally, a table for the output. If you don't - * specify the table, one will be created for you using the job name. If you don't specify the - * dataset, a dataset called {@code beam_examples} must already exist in your project. - * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}. - * - *

By default, the pipeline will do fixed windowing, on 1-minute windows. You can - * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10} - * for 10-minute windows. - * - *

The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) - * and then exits. - */ -public class WindowedWordCount { - static final int WINDOW_SIZE = 1; // Default window duration in minutes - - /** - * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for - * this example, for the bounded data case. - * - *

Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate - * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a - * 2-hour period. - */ - static class AddTimestampFn extends DoFn { - private static final Duration RAND_RANGE = Duration.standardHours(2); - private final Instant minTimestamp; - - AddTimestampFn() { - this.minTimestamp = new Instant(System.currentTimeMillis()); - } - - @ProcessElement - public void processElement(ProcessContext c) { - // Generate a timestamp that falls somewhere in the past two hours. - long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); - Instant randomTimestamp = minTimestamp.plus(randMillis); - /** - * Concept #2: Set the data element with that timestamp. - */ - c.outputWithTimestamp(c.element(), new Instant(randomTimestamp)); - } - } - - /** A DoFn that converts a Word and Count into a BigQuery table row. */ - static class FormatAsTableRowFn extends DoFn, TableRow> { - @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = new TableRow() - .set("word", c.element().getKey()) - .set("count", c.element().getValue()) - // include a field for the window timestamp - .set("window_timestamp", c.timestamp().toString()); - c.output(row); - } - } - - /** - * Helper method that defines the BigQuery schema used for the output. - */ - private static TableSchema getSchema() { - List fields = new ArrayList<>(); - fields.add(new TableFieldSchema().setName("word").setType("STRING")); - fields.add(new TableFieldSchema().setName("count").setType("INTEGER")); - fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP")); - TableSchema schema = new TableSchema().setFields(fields); - return schema; - } - - /** - * Concept #5: We'll stream the results to a BigQuery table. The BigQuery output source is one - * that supports both bounded and unbounded data. This is a helper method that creates a - * TableReference from input options, to tell the pipeline where to write its BigQuery results. - */ - private static TableReference getTableReference(Options options) { - TableReference tableRef = new TableReference(); - tableRef.setProjectId(options.getProject()); - tableRef.setDatasetId(options.getBigQueryDataset()); - tableRef.setTableId(options.getBigQueryTable()); - return tableRef; - } - - /** - * Options supported by {@link WindowedWordCount}. - * - *

Inherits standard example configuration options, which allow specification of the BigQuery - * table, as well as the {@link WordCount.WordCountOptions} support for - * specification of the input file. - */ - public interface Options extends WordCount.WordCountOptions, - ExampleOptions, ExampleBigQueryTableOptions { - @Description("Fixed window duration, in minutes") - @Default.Integer(WINDOW_SIZE) - Integer getWindowSize(); - void setWindowSize(Integer value); - } - - public static void main(String[] args) throws IOException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setBigQuerySchema(getSchema()); - // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline. - ExampleUtils exampleUtils = new ExampleUtils(options); - exampleUtils.setup(); - - Pipeline pipeline = Pipeline.create(options); - - /** - * Concept #1: the Beam SDK lets us run the same pipeline with either a bounded or - * unbounded input source. - */ - PCollection input = pipeline - /** Read from the GCS file. */ - .apply(TextIO.Read.from(options.getInputFile())) - // Concept #2: Add an element timestamp, using an artificial time just to show windowing. - // See AddTimestampFn for more detail on this. - .apply(ParDo.of(new AddTimestampFn())); - - /** - * Concept #3: Window into fixed windows. The fixed window size for this example defaults to 1 - * minute (you can change this with a command-line option). See the documentation for more - * information on how fixed windows work, and for information on the other types of windowing - * available (e.g., sliding windows). - */ - PCollection windowedWords = input - .apply(Window.into( - FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); - - /** - * Concept #4: Re-use our existing CountWords transform that does not have knowledge of - * windows over a PCollection containing windowed values. - */ - PCollection> wordCounts = windowedWords.apply(new WordCount.CountWords()); - - /** - * Concept #5: Format the results for a BigQuery table, then write to BigQuery. - * The BigQuery output source supports both bounded and unbounded data. - */ - wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) - .apply(BigQueryIO.Write - .to(getTableReference(options)) - .withSchema(getSchema()) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); - - PipelineResult result = pipeline.run(); - - // ExampleUtils will try to cancel the pipeline before the program exists. - exampleUtils.waitToFinish(result); - } -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java deleted file mode 100644 index 8fe71370c74fd..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.Validation.Required; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -/** - * An example that counts words in Shakespeare and includes Beam best practices. - * - *

This class, {@link WordCount}, is the second in a series of four successively more detailed - * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}. - * After you've looked at this example, then see the {@link DebuggingWordCount} - * pipeline, for introduction of additional concepts. - * - *

For a detailed walkthrough of this example, see - * - * http://beam.incubator.apache.org/use/walkthroughs/ - * - * - *

Basic concepts, also in the MinimalWordCount example: - * Reading text files; counting a PCollection; writing to text files - * - *

New Concepts: - *

- *   1. Executing a Pipeline both locally and using the selected runner
- *   2. Using ParDo with static DoFns defined out-of-line
- *   3. Building a composite transform
- *   4. Defining your own pipeline options
- * 
- * - *

Concept #1: you can execute this pipeline either locally or using by selecting another runner. - * These are now command-line options and not hard-coded as they were in the MinimalWordCount - * example. - * - *

To change the runner, specify: - *

{@code
- *   --runner=YOUR_SELECTED_RUNNER
- * }
- * 
- * - *

To execute this pipeline, specify a local output file (if using the - * {@code DirectRunner}) or output prefix on a supported distributed file system. - *

{@code
- *   --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
- * }
- * - *

The input file defaults to a public data set containing the text of of King Lear, - * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. - */ -public class WordCount { - - /** - * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns - * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it - * to a ParDo in the pipeline. - */ - static class ExtractWordsFn extends DoFn { - private final Aggregator emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** A SimpleFunction that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends SimpleFunction, String> { - @Override - public String apply(KV input) { - return input.getKey() + ": " + input.getValue(); - } - } - - /** - * A PTransform that converts a PCollection containing lines of text into a PCollection of - * formatted word counts. - * - *

Concept #3: This is a custom composite transform that bundles two transforms (ParDo and - * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse, - * modular testing, and an improved monitoring experience. - */ - public static class CountWords extends PTransform, - PCollection>> { - @Override - public PCollection> apply(PCollection lines) { - - // Convert lines of text into individual words. - PCollection words = lines.apply( - ParDo.of(new ExtractWordsFn())); - - // Count the number of times each word occurs. - PCollection> wordCounts = - words.apply(Count.perElement()); - - return wordCounts; - } - } - - /** - * Options supported by {@link WordCount}. - * - *

Concept #4: Defining your own configuration options. Here, you can add your own arguments - * to be processed by the command-line parser, and specify default values for them. You can then - * access the options values in your pipeline code. - * - *

Inherits standard configuration options. - */ - public interface WordCountOptions extends PipelineOptions { - - /** - * By default, this example reads from a public dataset containing the text of - * King Lear. Set this option to choose a different input file or glob. - */ - @Description("Path of the file to read from") - @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") - String getInputFile(); - void setInputFile(String value); - - /** - * Set this required option to specify where to write the output. - */ - @Description("Path of the file to write to") - @Required - String getOutput(); - void setOutput(String value); - } - - public static void main(String[] args) { - WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(WordCountOptions.class); - Pipeline p = Pipeline.create(options); - - // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the - // static FormatAsTextFn() to the ParDo transform. - p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) - .apply(new CountWords()) - .apply(MapElements.via(new FormatAsTextFn())) - .apply("WriteCounts", TextIO.Write.to(options.getOutput())); - - p.run().waitUntilFinish(); - } -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java deleted file mode 100644 index 6b51074f44d0b..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}.common; - -import com.google.api.services.bigquery.model.TableSchema; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * Options that can be used to configure BigQuery tables in Beam examples. - * The project defaults to the project being used to run the example. - */ -public interface ExampleBigQueryTableOptions extends GcpOptions { - @Description("BigQuery dataset name") - @Default.String("beam_examples") - String getBigQueryDataset(); - void setBigQueryDataset(String dataset); - - @Description("BigQuery table name") - @Default.InstanceFactory(BigQueryTableFactory.class) - String getBigQueryTable(); - void setBigQueryTable(String table); - - @Description("BigQuery table schema") - TableSchema getBigQuerySchema(); - void setBigQuerySchema(TableSchema schema); - - /** - * Returns the job name as the default BigQuery table name. - */ - class BigQueryTableFactory implements DefaultValueFactory { - @Override - public String create(PipelineOptions options) { - return options.getJobName().replace('-', '_'); - } - } -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java deleted file mode 100644 index 90f935c3cec0f..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}.common; - -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * Options that can be used to configure the Beam examples. - */ -public interface ExampleOptions extends PipelineOptions { - @Description("Whether to keep jobs running after local process exit") - @Default.Boolean(false) - boolean getKeepJobsRunning(); - void setKeepJobsRunning(boolean keepJobsRunning); - - @Description("Number of workers to use when executing the injector pipeline") - @Default.Integer(1) - int getInjectorNumWorkers(); - void setInjectorNumWorkers(int numWorkers); -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java deleted file mode 100644 index daeb398f7fca5..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}.common; - -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * Options that can be used to configure Pub/Sub topic/subscription in Beam examples. - */ -public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions { - @Description("Pub/Sub subscription") - @Default.InstanceFactory(PubsubSubscriptionFactory.class) - String getPubsubSubscription(); - void setPubsubSubscription(String subscription); - - /** - * Returns a default Pub/Sub subscription based on the project and the job names. - */ - class PubsubSubscriptionFactory implements DefaultValueFactory { - @Override - public String create(PipelineOptions options) { - return "projects/" + options.as(GcpOptions.class).getProject() - + "/subscriptions/" + options.getJobName(); - } - } -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java deleted file mode 100644 index 936bff5675077..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}.common; - -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * Options that can be used to configure Pub/Sub topic in Beam examples. - */ -public interface ExamplePubsubTopicOptions extends GcpOptions { - @Description("Pub/Sub topic") - @Default.InstanceFactory(PubsubTopicFactory.class) - String getPubsubTopic(); - void setPubsubTopic(String topic); - - /** - * Returns a default Pub/Sub topic based on the project and the job names. - */ - class PubsubTopicFactory implements DefaultValueFactory { - @Override - public String create(PipelineOptions options) { - return "projects/" + options.as(GcpOptions.class).getProject() - + "/topics/" + options.getJobName(); - } - } -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java deleted file mode 100644 index 570b3827b7848..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java +++ /dev/null @@ -1,352 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}.common; - -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Datasets; -import com.google.api.services.bigquery.Bigquery.Tables; -import com.google.api.services.bigquery.model.Dataset; -import com.google.api.services.bigquery.model.DatasetReference; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.Subscription; -import com.google.api.services.pubsub.model.Topic; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Uninterruptibles; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.Transport; -import org.joda.time.Duration; - -/** - * The utility class that sets up and tears down external resources, - * and cancels the streaming pipelines once the program terminates. - * - *

It is used to run Beam examples. - */ -public class ExampleUtils { - - private static final int SC_NOT_FOUND = 404; - - private final PipelineOptions options; - private Bigquery bigQueryClient = null; - private Pubsub pubsubClient = null; - private Set pipelinesToCancel = Sets.newHashSet(); - private List pendingMessages = Lists.newArrayList(); - - /** - * Do resources and runner options setup. - */ - public ExampleUtils(PipelineOptions options) { - this.options = options; - } - - /** - * Sets up external resources that are required by the example, - * such as Pub/Sub topics and BigQuery tables. - * - * @throws IOException if there is a problem setting up the resources - */ - public void setup() throws IOException { - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backOff = - FluentBackoff.DEFAULT - .withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff(); - Throwable lastException = null; - try { - do { - try { - setupPubsub(); - setupBigQueryTable(); - return; - } catch (GoogleJsonResponseException e) { - lastException = e; - } - } while (BackOffUtils.next(sleeper, backOff)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // Ignore InterruptedException - } - throw new RuntimeException(lastException); - } - - /** - * Sets up the Google Cloud Pub/Sub topic. - * - *

If the topic doesn't exist, a new topic with the given name will be created. - * - * @throws IOException if there is a problem setting up the Pub/Sub topic - */ - public void setupPubsub() throws IOException { - ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = - options.as(ExamplePubsubTopicAndSubscriptionOptions.class); - if (!pubsubOptions.getPubsubTopic().isEmpty()) { - pendingMessages.add("**********************Set Up Pubsub************************"); - setupPubsubTopic(pubsubOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been set up for this example: " - + pubsubOptions.getPubsubTopic()); - - if (!pubsubOptions.getPubsubSubscription().isEmpty()) { - setupPubsubSubscription( - pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription()); - pendingMessages.add("The Pub/Sub subscription has been set up for this example: " - + pubsubOptions.getPubsubSubscription()); - } - } - } - - /** - * Sets up the BigQuery table with the given schema. - * - *

If the table already exists, the schema has to match the given one. Otherwise, the example - * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema - * will be created. - * - * @throws IOException if there is a problem setting up the BigQuery table - */ - public void setupBigQueryTable() throws IOException { - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("******************Set Up Big Query Table*******************"); - setupBigQueryTable(bigQueryTableOptions.getProject(), - bigQueryTableOptions.getBigQueryDataset(), - bigQueryTableOptions.getBigQueryTable(), - bigQueryTableOptions.getBigQuerySchema()); - pendingMessages.add("The BigQuery table has been set up for this example: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - } - } - - /** - * Tears down external resources that can be deleted upon the example's completion. - */ - private void tearDown() { - pendingMessages.add("*************************Tear Down*************************"); - ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = - options.as(ExamplePubsubTopicAndSubscriptionOptions.class); - if (!pubsubOptions.getPubsubTopic().isEmpty()) { - try { - deletePubsubTopic(pubsubOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been deleted: " - + pubsubOptions.getPubsubTopic()); - } catch (IOException e) { - pendingMessages.add("Failed to delete the Pub/Sub topic : " - + pubsubOptions.getPubsubTopic()); - } - if (!pubsubOptions.getPubsubSubscription().isEmpty()) { - try { - deletePubsubSubscription(pubsubOptions.getPubsubSubscription()); - pendingMessages.add("The Pub/Sub subscription has been deleted: " - + pubsubOptions.getPubsubSubscription()); - } catch (IOException e) { - pendingMessages.add("Failed to delete the Pub/Sub subscription : " - + pubsubOptions.getPubsubSubscription()); - } - } - } - - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("The BigQuery table might contain the example's output, " - + "and it is not deleted automatically: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - pendingMessages.add("Please go to the Developers Console to delete it manually." - + " Otherwise, you may be charged for its usage."); - } - } - - private void setupBigQueryTable(String projectId, String datasetId, String tableId, - TableSchema schema) throws IOException { - if (bigQueryClient == null) { - bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build(); - } - - Datasets datasetService = bigQueryClient.datasets(); - if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) { - Dataset newDataset = new Dataset().setDatasetReference( - new DatasetReference().setProjectId(projectId).setDatasetId(datasetId)); - datasetService.insert(projectId, newDataset).execute(); - } - - Tables tableService = bigQueryClient.tables(); - Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId)); - if (table == null) { - Table newTable = new Table().setSchema(schema).setTableReference( - new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId)); - tableService.insert(projectId, datasetId, newTable).execute(); - } else if (!table.getSchema().equals(schema)) { - throw new RuntimeException( - "Table exists and schemas do not match, expecting: " + schema.toPrettyString() - + ", actual: " + table.getSchema().toPrettyString()); - } - } - - private void setupPubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) { - pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute(); - } - } - - private void setupPubsubSubscription(String topic, String subscription) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) { - Subscription subInfo = new Subscription() - .setAckDeadlineSeconds(60) - .setTopic(topic); - pubsubClient.projects().subscriptions().create(subscription, subInfo).execute(); - } - } - - /** - * Deletes the Google Cloud Pub/Sub topic. - * - * @throws IOException if there is a problem deleting the Pub/Sub topic - */ - private void deletePubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) { - pubsubClient.projects().topics().delete(topic).execute(); - } - } - - /** - * Deletes the Google Cloud Pub/Sub subscription. - * - * @throws IOException if there is a problem deleting the Pub/Sub subscription - */ - private void deletePubsubSubscription(String subscription) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) { - pubsubClient.projects().subscriptions().delete(subscription).execute(); - } - } - - /** - * Waits for the pipeline to finish and cancels it before the program exists. - */ - public void waitToFinish(PipelineResult result) { - pipelinesToCancel.add(result); - if (!options.as(ExampleOptions.class).getKeepJobsRunning()) { - addShutdownHook(pipelinesToCancel); - } - try { - result.waitUntilFinish(); - } catch (UnsupportedOperationException e) { - // Do nothing if the given PipelineResult doesn't support waitUntilFinish(), - // such as EvaluationResults returned by DirectRunner. - tearDown(); - printPendingMessages(); - } catch (Exception e) { - throw new RuntimeException("Failed to wait the pipeline until finish: " + result); - } - } - - private void addShutdownHook(final Collection pipelineResults) { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - tearDown(); - printPendingMessages(); - for (PipelineResult pipelineResult : pipelineResults) { - try { - pipelineResult.cancel(); - } catch (IOException e) { - System.out.println("Failed to cancel the job."); - System.out.println(e.getMessage()); - } - } - - for (PipelineResult pipelineResult : pipelineResults) { - boolean cancellationVerified = false; - for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) { - if (pipelineResult.getState().isTerminal()) { - cancellationVerified = true; - break; - } else { - System.out.println( - "The example pipeline is still running. Verifying the cancellation."); - } - Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); - } - if (!cancellationVerified) { - System.out.println("Failed to verify the cancellation for job: " + pipelineResult); - } - } - } - }); - } - - private void printPendingMessages() { - System.out.println(); - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - for (String message : pendingMessages) { - System.out.println(message); - } - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - } - - private static T executeNullIfNotFound( - AbstractGoogleClientRequest request) throws IOException { - try { - return request.execute(); - } catch (GoogleJsonResponseException e) { - if (e.getStatusCode() == SC_NOT_FOUND) { - return null; - } else { - throw e; - } - } - } -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java deleted file mode 100644 index 155242d9969ff..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}; - -import com.google.common.io.Files; -import java.io.File; -import java.nio.charset.StandardCharsets; -import ${package}.DebuggingWordCount.WordCountOptions; -import org.apache.beam.sdk.testing.TestPipeline; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link DebuggingWordCount}. - */ -@RunWith(JUnit4.class) -public class DebuggingWordCountTest { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testDebuggingWordCount() throws Exception { - File inputFile = tmpFolder.newFile(); - File outputFile = tmpFolder.newFile(); - Files.write( - "stomach secret Flourish message Flourish here Flourish", - inputFile, - StandardCharsets.UTF_8); - WordCountOptions options = - TestPipeline.testingPipelineOptions().as(WordCountOptions.class); - options.setInputFile(inputFile.getAbsolutePath()); - options.setOutput(outputFile.getAbsolutePath()); - DebuggingWordCount.main(TestPipeline.convertToArgs(options)); - } -} diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java deleted file mode 100644 index e86c2aac969cc..0000000000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}; - -import java.util.Arrays; -import java.util.List; -import ${package}.WordCount.CountWords; -import ${package}.WordCount.ExtractWordsFn; -import ${package}.WordCount.FormatAsTextFn; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.RunnableOnService; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.values.PCollection; -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests of WordCount. - */ -@RunWith(JUnit4.class) -public class WordCountTest { - - /** Example test that tests a specific {@link DoFn}. */ - @Test - public void testExtractWordsFn() throws Exception { - DoFnTester extractWordsFn = - DoFnTester.of(new ExtractWordsFn()); - - Assert.assertThat(extractWordsFn.processBundle(" some input words "), - CoreMatchers.hasItems("some", "input", "words")); - Assert.assertThat(extractWordsFn.processBundle(" "), - CoreMatchers.hasItems()); - Assert.assertThat(extractWordsFn.processBundle(" some ", " input", " words"), - CoreMatchers.hasItems("some", "input", "words")); - } - - static final String[] WORDS_ARRAY = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final List WORDS = Arrays.asList(WORDS_ARRAY); - - static final String[] COUNTS_ARRAY = new String[] { - "hi: 5", "there: 1", "sue: 2", "bob: 2"}; - - /** Example test that tests a PTransform by using an in-memory input and inspecting the output. */ - @Test - @Category(RunnableOnService.class) - public void testCountWords() throws Exception { - Pipeline p = TestPipeline.create(); - - PCollection input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); - - PCollection output = input.apply(new CountWords()) - .apply(MapElements.via(new FormatAsTextFn())); - - PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY); - p.run().waitUntilFinish(); - } -}