Revert accidental clobber of MinimalWordCount.java
kennknowles committed Jun 30, 2018
1 parent d76402c commit adab3a3
Showing 2 changed files with 62 additions and 27 deletions.
1 change: 0 additions & 1 deletion examples/java/build.gradle
@@ -61,7 +61,6 @@ dependencies {
shadow library.java.joda_time
shadow library.java.slf4j_api
shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
shadow project(path: ":beam-runners-google-cloud-dataflow-java", configuration: "shadow")
shadow library.java.slf4j_jdk14
shadowTest project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadowTest")
shadowTest library.java.hamcrest_core
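With the accidental Dataflow runner dependency removed, the examples module again depends only on beam-runners-direct-java, so MinimalWordCount runs on the DirectRunner by default, as the restored comments below note. A minimal sketch (editor's illustration, not part of this commit; the class name DirectRunnerDefault is hypothetical) of pinning that default explicitly:

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DirectRunnerDefault {
  public static void main(String[] args) {
    // Without an explicit setRunner call, the runner is resolved from the
    // classpath; beam-runners-direct-java provides the DirectRunner default.
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class); // explicit pin, equivalent to the default here
  }
}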
88 changes: 62 additions & 26 deletions examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -17,16 +17,17 @@
*/
package org.apache.beam.examples;

import org.apache.beam.runners.dataflow.DataflowRunner;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

/**
* An example that counts words in Shakespeare.
@@ -57,28 +58,63 @@
public class MinimalWordCount {

public static void main(String[] args) {

// Create a PipelineOptions object. This object lets us set various execution
// options for our pipeline, such as the runner you wish to use. This example
// will run with the DirectRunner by default, based on the class path configured
// in its dependencies.
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DataflowRunner.class);
options.as(StreamingOptions.class).setStreaming(true);

// In order to run your pipeline, you need to make the following runner-specific changes:
//
// CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
// or FlinkRunner.
// CHANGE 2/3: Specify runner-required options.
// For BlockingDataflowRunner, set project and temp location as follows:
// DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
// dataflowOptions.setRunner(BlockingDataflowRunner.class);
// dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
// dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
// For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
// for more details.
// options.as(FlinkPipelineOptions.class)
// .setRunner(FlinkRunner.class);

// Create the Pipeline object with the options we defined above
Pipeline p = Pipeline.create(options);
p.apply(Create.of("foo"))
.apply(
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void process(@Element String element, OffsetRangeTracker tracker) {
for (long i = tracker.currentRestriction().getFrom();
tracker.tryClaim(i);
++i) {
// do nothing
}
}

@GetInitialRestriction
public OffsetRange getInitialRestriction(String element) {
return new OffsetRange(0, 10);
}
}));
// Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
// of input text files. TextIO.Read returns a PCollection where each element is one line from
// the input text (a set of Shakespeare's texts).

// This example reads a public data set consisting of the complete works of Shakespeare.
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

// Concept #2: Apply a FlatMapElements transform to the PCollection of text lines.
// This transform splits the lines in PCollection<String>, where each element is an
// individual word in Shakespeare's collected texts.
.apply(
FlatMapElements.into(TypeDescriptors.strings())
.via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
// We use a Filter transform to avoid empty words
.apply(Filter.by((String word) -> !word.isEmpty()))
// Concept #3: Apply the Count transform to our PCollection of individual words. The Count
// transform returns a new PCollection of key/value pairs, where each key represents a
// unique word in the text. The associated value is the occurrence count for that word.
.apply(Count.perElement())
// Apply a MapElements transform that formats our PCollection of word counts into a
// printable string, suitable for writing to an output file.
.apply(
MapElements.into(TypeDescriptors.strings())
.via(
(KV<String, Long> wordCount) ->
wordCount.getKey() + ": " + wordCount.getValue()))
// Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
// TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a series of text files.
//
// By default, it will write to a set of files with names like wordcounts-00001-of-00005
.apply(TextIO.write().to("wordcounts"));

p.run().waitUntilFinish();
}
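The restored comment block above walks through the runner-specific changes a user would make: choosing a runner, then supplying its required options such as a project and temp location. A minimal sketch (editor's illustration, not part of this commit; the class name and the example flag values are placeholders) of doing the same thing from command-line arguments instead of hard-coding them:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerFromArgsSketch {
  public static void main(String[] args) {
    // e.g. --runner=DataflowRunner --project=YOUR_PROJECT --tempLocation=gs://YOUR_BUCKET/tmp
    // or   --runner=FlinkRunner
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);
    // ... apply the same transforms as MinimalWordCount, then run:
    p.run().waitUntilFinish();
  }
}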
