
Commit

This closes #1536
kennknowles committed Dec 12, 2016
2 parents 59f1fb2 + 42595dc commit d9657ff
Showing 10 changed files with 909 additions and 283 deletions.
examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -17,26 +17,25 @@
*/
package org.apache.beam.examples;

- import com.google.api.services.bigquery.model.TableFieldSchema;
- import com.google.api.services.bigquery.model.TableReference;
- import com.google.api.services.bigquery.model.TableRow;
- import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
+ import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
- import org.apache.beam.examples.common.ExampleUtils;
+ import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
- import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
+ import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +62,8 @@
* 2. Adding timestamps to data
* 3. Windowing
* 4. Re-using PTransforms over windowed PCollections
- * 5. Writing to BigQuery
+ * 5. Accessing the window of an element
+ * 6. Writing data to per-window text files
* </pre>
*
* <p>By default, the examples will run with the {@code DirectRunner}.
@@ -74,25 +74,23 @@
* </pre>
* See examples/java/README.md for instructions about how to configure different runners.
*
- * <p>Optionally specify the input file path via:
- * {@code --inputFile=gs://INPUT_PATH},
- * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}.
+ * <p>To execute this pipeline locally, specify a local output file (if using the
+ * {@code DirectRunner}) or output prefix on a supported distributed file system.
+ * <pre>{@code
+ *   --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+ * }</pre>
*
- * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't
- * specify the table, one will be created for you using the job name. If you don't specify the
- * dataset, a dataset called {@code beam_examples} must already exist in your project.
- * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}.
+ * <p>The input file defaults to a public data set containing the text of King Lear,
+ * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
*
* <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can
* change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
* for 10-minute windows.
*
- * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exits.
+ * <p>The example will try to cancel the pipeline on the signal to terminate the process (CTRL-C).
*/
public class WindowedWordCount {
- static final int WINDOW_SIZE = 1; // Default window duration in minutes

+ static final int WINDOW_SIZE = 10; // Default window duration in minutes
/**
* Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
* this example, for the bounded data case.
@@ -102,84 +100,77 @@ public class WindowedWordCount {
* 1-hour period.
*/
static class AddTimestampFn extends DoFn<String, String> {
- private static final Duration RAND_RANGE = Duration.standardHours(2);
- private static final Duration RAND_RANGE = Duration.standardHours(1);
private final Instant minTimestamp;
+ private final Instant maxTimestamp;

- AddTimestampFn() {
- this.minTimestamp = new Instant(System.currentTimeMillis());
+ AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
+ this.minTimestamp = minTimestamp;
+ this.maxTimestamp = maxTimestamp;
}

@ProcessElement
public void processElement(ProcessContext c) {
- // Generate a timestamp that falls somewhere in the past two hours.
- long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
- Instant randomTimestamp = minTimestamp.plus(randMillis);
+ Instant randomTimestamp =
+ new Instant(
+ ThreadLocalRandom.current()
+ .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));

/**
* Concept #2: Set the data element with that timestamp.
*/
c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
}
}

- /** A DoFn that converts a Word and Count into a BigQuery table row. */
- static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- TableRow row = new TableRow()
- .set("word", c.element().getKey())
- .set("count", c.element().getValue())
- // include a field for the window timestamp
- .set("window_timestamp", c.timestamp().toString());
- c.output(row);
- }
- }
+ /** A {@link DefaultValueFactory} that returns the current system time. */
+ public static class DefaultToCurrentSystemTime implements DefaultValueFactory<Long> {
+ @Override
+ public Long create(PipelineOptions options) {
+ return System.currentTimeMillis();
+ }
+ }

- /**
- * Helper method that defines the BigQuery schema used for the output.
- */
- private static TableSchema getSchema() {
- List<TableFieldSchema> fields = new ArrayList<>();
- fields.add(new TableFieldSchema().setName("word").setType("STRING"));
- fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
- fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
- TableSchema schema = new TableSchema().setFields(fields);
- return schema;
- }
-
- /**
- * Concept #5: We'll stream the results to a BigQuery table. The BigQuery output source is one
- * that supports both bounded and unbounded data. This is a helper method that creates a
- * TableReference from input options, to tell the pipeline where to write its BigQuery results.
- */
- private static TableReference getTableReference(Options options) {
- TableReference tableRef = new TableReference();
- tableRef.setProjectId(options.getProject());
- tableRef.setDatasetId(options.getBigQueryDataset());
- tableRef.setTableId(options.getBigQueryTable());
- return tableRef;
- }
+ /** A {@link DefaultValueFactory} that returns the minimum timestamp plus one hour. */
+ public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory<Long> {
+ @Override
+ public Long create(PipelineOptions options) {
+ return options.as(Options.class).getMinTimestampMillis()
+ + Duration.standardHours(1).getMillis();
+ }
+ }

/**
- * Options supported by {@link WindowedWordCount}.
+ * Options for {@link WindowedWordCount}.
*
- * <p>Inherits standard example configuration options, which allow specification of the BigQuery
- * table, as well as the {@link WordCount.WordCountOptions} support for
- * specification of the input file.
+ * <p>Inherits standard example configuration options, which allow specification of the
+ * runner, as well as the {@link WordCount.WordCountOptions} support for
+ * specification of the input and output files.
*/
public interface Options extends WordCount.WordCountOptions,
ExampleOptions, ExampleBigQueryTableOptions {
@Description("Fixed window duration, in minutes")
@Default.Integer(WINDOW_SIZE)
Integer getWindowSize();
void setWindowSize(Integer value);

@Description("Minimum randomly assigned timestamp, in milliseconds-since-epoch")
@Default.InstanceFactory(DefaultToCurrentSystemTime.class)
Long getMinTimestampMillis();
void setMinTimestampMillis(Long value);

@Description("Maximum randomly assigned timestamp, in milliseconds-since-epoch")
@Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class)
Long getMaxTimestampMillis();
void setMaxTimestampMillis(Long value);
}
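
// Illustrative sketch, not part of this commit: @Default.InstanceFactory
// defaults are computed lazily, on first access to the getter, and then cached.
// That is why DefaultToMinTimestampPlusOneHour can read the (possibly defaulted)
// minimum through options.as(Options.class). Roughly:
//
// Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// long min = options.getMinTimestampMillis(); // "now" unless --minTimestampMillis is given
// long max = options.getMaxTimestampMillis(); // min + 1 hour unless --maxTimestampMillis is given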

public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- options.setBigQuerySchema(getSchema());
- // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline.
- ExampleUtils exampleUtils = new ExampleUtils(options);
- exampleUtils.setup();
+ final String output = options.getOutput();
+ final Duration windowSize = Duration.standardMinutes(options.getWindowSize());
+ final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
+ final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());

Pipeline pipeline = Pipeline.create(options);

Expand All @@ -192,17 +183,18 @@ public static void main(String[] args) throws IOException {
.apply(TextIO.Read.from(options.getInputFile()))
// Concept #2: Add an element timestamp, using an artificial time just to show windowing.
// See AddTimestampFn for more detail on this.
- .apply(ParDo.of(new AddTimestampFn()));
+ .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));

/**
* Concept #3: Window into fixed windows. The fixed window size for this example defaults to 10
* minutes (you can change this with a command-line option). See the documentation for more
* information on how fixed windows work, and for information on the other types of windowing
* available (e.g., sliding windows).
*/
- PCollection<String> windowedWords = input
- .apply(Window.<String>into(
- FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
+ PCollection<String> windowedWords =
+ input.apply(
+ Window.<String>into(
+ FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));

/**
* Concept #4: Re-use our existing CountWords transform that does not have knowledge of
@@ -211,19 +203,40 @@ public static void main(String[] args) throws IOException {
PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());

/**
- * Concept #5: Format the results for a BigQuery table, then write to BigQuery.
- * The BigQuery output source supports both bounded and unbounded data.
+ * Concept #5: Customize the output format using windowing information.
+ *
+ * <p>At this point, the data is organized by window. We're writing text files and have no
+ * late data, so for simplicity we can use the window as the key and {@link GroupByKey} to get
+ * one output file per window. (If we had late data, this key would not be unique.)
+ *
+ * <p>To access the window in a {@link DoFn}, add a {@link BoundedWindow} parameter. This will
+ * be automatically detected and populated with the window for the current element.
*/
- wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
- .apply(BigQueryIO.Write
- .to(getTableReference(options))
- .withSchema(getSchema())
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
+ PCollection<KV<IntervalWindow, KV<String, Long>>> keyedByWindow =
+ wordCounts.apply(
+ ParDo.of(
+ new DoFn<KV<String, Long>, KV<IntervalWindow, KV<String, Long>>>() {
+ @ProcessElement
+ public void processElement(ProcessContext context, IntervalWindow window) {
+ context.output(KV.of(window, context.element()));
+ }
+ }));

- PipelineResult result = pipeline.run();
+ /**
+ * Concept #6: Format the results and write to a sharded file partitioned by window, using a
+ * simple ParDo operation. Because there may be failures followed by retries, the
+ * writes must be idempotent, but the details of writing to files are elided here.
+ */
+ keyedByWindow
+ .apply(GroupByKey.<IntervalWindow, KV<String, Long>>create())
+ .apply(ParDo.of(new WriteWindowedFilesDoFn(output)));

- // ExampleUtils will try to cancel the pipeline before the program exists.
- exampleUtils.waitToFinish(result);
+ PipelineResult result = pipeline.run();
+ try {
+ result.waitUntilFinish();
+ } catch (Exception exc) {
+ result.cancel();
+ }
}

}
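
Note on Concept #5 above: the commit's anonymous DoFn asks for the concrete
IntervalWindow type, while the comment describes the more general BoundedWindow
parameter. A minimal sketch of that general form, using only SDK classes already
imported in this file (the name LogWindowFn is hypothetical, not part of this
commit):

  static class LogWindowFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
      // The runner detects the extra parameter and populates it with the window
      // of the current element; under FixedWindows this is an IntervalWindow,
      // whose toString() renders as "[start, end)".
      c.output(window.toString() + ": " + c.element());
    }
  }

Asking for IntervalWindow, as the commit does, avoids a cast when the windowing
strategy is known to produce interval windows; asking for a window type that does
not match the actual strategy fails at runtime.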
examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.common;

import com.google.common.annotations.VisibleForTesting;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.KV;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

/**
* A {@link DoFn} that writes elements to files with names deterministically derived from the lower
* and upper bounds of their key (an {@link IntervalWindow}).
*
* <p>This is test utility code, not for end-users, so examples can be focused
* on their primary lessons.
*/
public class WriteWindowedFilesDoFn
extends DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void> {

static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
static final Coder<String> STRING_CODER = StringUtf8Coder.of();

private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();

private final String output;

public WriteWindowedFilesDoFn(String output) {
this.output = output;
}

@VisibleForTesting
public static String fileForWindow(String output, IntervalWindow window) {
return String.format(
"%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end()));
}

@ProcessElement
public void processElement(ProcessContext context) throws Exception {
// Build a file name from the window
IntervalWindow window = context.element().getKey();
String outputShard = fileForWindow(output, window);

// Open the file and write all the values
IOChannelFactory factory = IOChannelUtils.getFactory(outputShard);
OutputStream out = Channels.newOutputStream(factory.create(outputShard, "text/plain"));
for (KV<String, Long> wordCount : context.element().getValue()) {
STRING_CODER.encode(
wordCount.getKey() + ": " + wordCount.getValue(), out, Coder.Context.OUTER);
out.write(NEWLINE);
}
out.close();
}
}
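
A concrete sense of Concept #6's idempotency argument: fileForWindow derives the
file name only from the output prefix and the window bounds, so a retried bundle
rewrites the same file rather than creating a duplicate. A small sketch using
org.joda.time.Instant, with hypothetical values (the prefix and timestamps are
illustrative, and the printed hours assume a UTC default time zone, since
ISODateTimeFormat.hourMinute() prints in the default zone):

  IntervalWindow window =
      new IntervalWindow(
          Instant.parse("2016-12-12T10:00:00Z"), Instant.parse("2016-12-12T10:10:00Z"));
  String file = WriteWindowedFilesDoFn.fileForWindow("/tmp/wordcounts", window);
  // => "/tmp/wordcounts-10:00-10:10"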
