From c6d567d08cc52063910b5c8facb1ab27fc7ee60a Mon Sep 17 00:00:00 2001
From: Ahmet Altay
Date: Mon, 8 May 2017 13:42:02 -0700
Subject: [PATCH 1/3] Remove apex README.md.

This information is already in the website in quickstart and apex runner
pages. https://github.com/apache/beam-site/pull/232 moves small bits of
missing content.
---
 runners/apex/README.md | 76 ------------------------------------------
 1 file changed, 76 deletions(-)
 delete mode 100644 runners/apex/README.md

diff --git a/runners/apex/README.md b/runners/apex/README.md
deleted file mode 100644
index b9bc74f75d9b..000000000000
--- a/runners/apex/README.md
+++ /dev/null
@@ -1,76 +0,0 @@

Apex Beam Runner (Apex-Runner)
==============================

Apex-Runner is a runner for Apache Beam that executes Beam pipelines with Apache Apex as the underlying engine. The runner has broad support for the Beam model and supports both streaming and batch pipelines.

[Apache Apex](http://apex.apache.org/) is a stream processing platform and framework for low-latency, high-throughput, and fault-tolerant analytics applications on Apache Hadoop. Apex is Java-based and also provides its own API for application development (a native compositional and declarative Java API, plus SQL) together with a comprehensive [operator library](https://github.com/apache/apex-malhar). Apex has a unified streaming architecture and can be used for both real-time and batch processing. With its stateful stream processing architecture, Apex can support all of the concepts in the Beam model (event time, triggers, watermarks, etc.).

## Status

Apex-Runner is relatively new. It is fully functional and can currently be used to run pipelines in embedded mode, but it does not yet take advantage of all the performance and scalability that Apex can deliver. This is expected to be addressed in upcoming work leveraging Apex features such as incremental checkpointing, partitioning, and operator affinity.
Please see [JIRA](https://issues.apache.org/jira/issues/?jql=project%20%3D%20BEAM%20AND%20component%20%3D%20runner-apex%20AND%20resolution%20%3D%20Unresolved) for open issues; contributions are welcome!

## Getting Started

The following shows how to run the WordCount example that is provided with the source code on Apex (the example is identical to the one provided as part of the Beam examples).

### Installing Beam

To get the latest version of Beam with Apex-Runner, first clone the Beam repository:

```
git clone https://github.com/apache/beam
```

Then switch to the newly created directory and run Maven to build Apache Beam:

```
cd beam
mvn clean install -DskipTests
```

Now Apache Beam and the Apex Runner are installed in your local Maven repository.

### Running an Example

Download something to count:

```
curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt
```

Run the pipeline using the Apex runner:

```
cd examples/java
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/tmp/kinglear.txt --output=/tmp/wordcounts.txt --runner=ApexRunner" -Papex-runner
```

Once the pipeline completes, there will be multiple sharded output files with the base name given above:

```
$ ls /tmp/wordcounts.txt*
/tmp/wordcounts.txt-00000-of-00003  /tmp/wordcounts.txt-00001-of-00003  /tmp/wordcounts.txt-00002-of-00003
```

## Running pipelines on an Apex YARN cluster

Coming soon.
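The sharded files listed above follow Beam's default shard-naming template, `<base>-SSSSS-of-NNNNN`. As a quick illustration (plain Python, no Beam required; the function name is ours, not Beam's), the names can be generated like this:

```python
def shard_names(base, num_shards):
    # Mimic Beam's default sharded output naming: <base>-SSSSS-of-NNNNN.
    return ["%s-%05d-of-%05d" % (base, i, num_shards) for i in range(num_shards)]

print(shard_names("out", 3))
# ['out-00000-of-00003', 'out-00001-of-00003', 'out-00002-of-00003']
```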
From b95fcbf6ab2f448d563ff223789c603bb101da01 Mon Sep 17 00:00:00 2001
From: Ahmet Altay
Date: Mon, 8 May 2017 15:00:33 -0700
Subject: [PATCH 2/3] Remove readme.md files that are covered in web site
---
 runners/spark/README.md                      | 124 ------------------
 sdks/java/javadoc/README.md                  |  23 ----
 sdks/python/apache_beam/tests/data/README.md |  20 ---
 3 files changed, 167 deletions(-)
 delete mode 100644 runners/spark/README.md
 delete mode 100644 sdks/java/javadoc/README.md
 delete mode 100644 sdks/python/apache_beam/tests/data/README.md

diff --git a/runners/spark/README.md b/runners/spark/README.md
deleted file mode 100644
index 15f70819f6ce..000000000000
--- a/runners/spark/README.md
+++ /dev/null
@@ -1,124 +0,0 @@

Spark Beam Runner (Spark-Runner)
================================

## Intro

The Spark-Runner allows users to execute data pipelines written against the Apache Beam API with Apache Spark. It supports both batch and streaming pipelines on top of the Spark engine.

## Overview

### Features

- ParDo
- GroupByKey
- Combine
- Windowing
- Flatten
- View
- Side inputs/outputs
- Encoding

### Fault-Tolerance

The Spark runner provides the same fault-tolerance guarantees as [Apache Spark](http://spark.apache.org/).

### Monitoring

The Spark runner supports user-defined counters via Beam Aggregators implemented on top of Spark's [Accumulators](http://spark.apache.org/docs/1.6.3/programming-guide.html#accumulators).
The Aggregators (defined by the pipeline author) and Spark's internal metrics are reported using Spark's [metrics system](http://spark.apache.org/docs/1.6.3/monitoring.html#metrics).
Spark also provides a web UI for monitoring; more details [here](http://spark.apache.org/docs/1.6.3/monitoring.html).
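Conceptually, such a counter behaves like a Spark Accumulator: workers only ever add to it, and the combined value is read back after the run. A toy, pure-Python sketch of that contract (illustrative only, not the Beam or Spark API):

```python
class Accumulator:
    """Toy add-only counter in the spirit of Spark Accumulators / Beam Aggregators."""

    def __init__(self, name, zero=0):
        self.name = name
        self.value = zero

    def add(self, amount=1):
        self.value += amount

# A worker-side loop bumps the counter once for each empty record it drops.
empty_lines = Accumulator("empty-lines")
records = ["alpha", "", "beta", "", ""]
kept = []
for r in records:
    if r:
        kept.append(r)
    else:
        empty_lines.add()

print(empty_lines.value)  # the "driver" reads the combined count after the run
```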
## Beam Model Support

### Batch

The Spark runner provides full support for the Beam model in batch processing via Spark [RDDs](http://spark.apache.org/docs/1.6.3/programming-guide.html#resilient-distributed-datasets-rdds).

### Streaming

Full support for the Beam model in streaming pipelines is under development. To follow along, you can subscribe to our [mailing list](http://beam.apache.org/get-started/support/).

### Issue Tracking

See [Beam JIRA](https://issues.apache.org/jira/browse/BEAM) (component: runner-spark).

## Getting Started

To get the latest version of the Spark Runner, first clone the Beam repository:

    git clone https://github.com/apache/beam

Then switch to the newly created directory and run Maven to build Apache Beam:

    cd beam
    mvn clean install -DskipTests

Now Apache Beam and the Spark Runner are installed in your local Maven repository.

To run a Beam pipeline with the default options against a Spark instance in local mode, we would do the following:

    Pipeline p = <logic for pipeline creation>
    PipelineResult result = p.run();
    result.waitUntilFinish();

To create a pipeline runner to run against a different Spark cluster, with a custom master URL, we would do the following:

    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setSparkMaster("spark://host:port");
    Pipeline p = <logic for pipeline creation>
    PipelineResult result = p.run();
    result.waitUntilFinish();

## Word Count Example

First download a text document to use as input:

    curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt

Switch to the Spark runner directory:

    cd runners/spark

Then run the [word count example][wc] from the SDK using a Spark instance in local mode:

    mvn exec:exec -DmainClass=org.apache.beam.runners.spark.examples.WordCount \
      -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkRunner \
      -DsparkMaster=local

Check the output by running:

    head /tmp/out-00000-of-00001

__Note: running examples using `mvn exec:exec` only works for Spark local mode at the
moment. See the next section for how to run on a cluster.__

[wc]: https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java

## Running on a Cluster

Beam pipelines can be run on a Spark cluster using the `spark-submit` command.

TBD pending native HDFS support (currently blocked by [BEAM-59](https://issues.apache.org/jira/browse/BEAM-59)).

diff --git a/sdks/java/javadoc/README.md b/sdks/java/javadoc/README.md
deleted file mode 100644
index bb17c3f331f3..000000000000
--- a/sdks/java/javadoc/README.md
+++ /dev/null
@@ -1,23 +0,0 @@

# SDK Javadoc

This directory contains package-info files for external Javadoc that we would like our Javadoc to link to using `-linkoffline`.

diff --git a/sdks/python/apache_beam/tests/data/README.md b/sdks/python/apache_beam/tests/data/README.md
deleted file mode 100644
index 58563423ac40..000000000000
--- a/sdks/python/apache_beam/tests/data/README.md
+++ /dev/null
@@ -1,20 +0,0 @@

# Generating Test Data Files

* [privatekey.p12](https://github.com/google/oauth2client/blob/master/tests/data/privatekey.p12)

From 5c29f132ed4c4e7f485fd34255912d71f60c1862 Mon Sep 17 00:00:00 2001
From: Ahmet Altay
Date: Mon, 8 May 2017 15:26:50 -0700
Subject: [PATCH 3/3] Remove sdk & example README.md that are covered in web site
---
 .gitignore                                 |   1 +
 examples/java/README.md                    |  64 +---
 .../beam/examples/complete/game/README.md  | 131 --------
 pom.xml                                    |   2 +
 sdks/python/README.md                      | 298 ------------------
 .../examples/complete/game/README.md       |  69 ----
 6 files changed, 5 insertions(+), 560 deletions(-)
 delete mode 100644 examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md
 delete mode 100644 sdks/python/README.md
 delete mode 100644 sdks/python/apache_beam/examples/complete/game/README.md

diff --git a/.gitignore b/.gitignore
index 9cfae09cb4a1..1ecb993777b0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,6 +24,7 @@
 sdks/python/**/*.so
 sdks/python/**/*.egg
 sdks/python/LICENSE
 sdks/python/NOTICE
+sdks/python/README.md
 # Ignore IntelliJ files.
 .idea/

diff --git a/examples/java/README.md b/examples/java/README.md
index d891fb87842d..75b70dde8528 100644
--- a/examples/java/README.md
+++ b/examples/java/README.md
@@ -40,69 +40,9 @@ demonstrates some best practices for instrumenting your pipeline code.
 1. [`WindowedWordCount`](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java)
    shows how to run the same pipeline over either unbounded PCollections in
    streaming mode or bounded PCollections in batch mode.

-## Building and Running
+## Running Examples

-Change directory into `examples/java` and run the examples:
-
-    mvn compile exec:java \
-        -Dexec.mainClass=<main class> \
-        -Dexec.args="<argument string>"
-
-Alternatively, you may choose to bundle all dependencies into a single JAR and
-execute it outside of the Maven environment.
-
-### Direct Runner
-
-You can execute the `WordCount` pipeline on your local machine as follows:
-
-    mvn compile exec:java \
-        -Dexec.mainClass=org.apache.beam.examples.WordCount \
-        -Dexec.args="--inputFile=<local input file> --output=<local output file>"
-
-To create the bundled JAR of the examples and execute it locally:
-
-    mvn package
-
-    java -cp examples/java/target/beam-examples-java-bundled-<version>.jar \
-        org.apache.beam.examples.WordCount \
-        --inputFile=<local input file> --output=<local output file>
-
-### Google Cloud Dataflow Runner
-
-After you have followed the general Cloud Dataflow
-[prerequisites and setup](https://beam.apache.org/documentation/runners/dataflow/), you can execute
-the pipeline on fully managed resources in Google Cloud Platform:
-
-    mvn compile exec:java \
-        -Dexec.mainClass=org.apache.beam.examples.WordCount \
-        -Dexec.args="--project=<your project> \
-        --tempLocation=<GCS temp location> \
-        --runner=DataflowRunner"
-
-Make sure to use your project ID, not the project number or the descriptive name.
-The Google Cloud Storage location should be entered in the form of
-`gs://bucket/path/to/staging/directory`.
-
-To create the bundled JAR of the examples and execute it in Google Cloud Platform:
-
-    mvn package
-
-    java -cp examples/java/target/beam-examples-java-bundled-<version>.jar \
-        org.apache.beam.examples.WordCount \
-        --project=<your project> \
-        --tempLocation=<GCS temp location> \
-        --runner=DataflowRunner
-
-## Other Examples
-
-Other examples can be run similarly by replacing the `WordCount` class path with the
-example's class path, e.g. `org.apache.beam.examples.cookbook.CombinePerKeyExamples`,
-and adjusting the runtime options under the `-Dexec.args` parameter, as specified in
-the example itself.
-
-Note that when running Maven on the Microsoft Windows platform, backslashes (`\`)
-under the `-Dexec.args` parameter should be escaped with another backslash.
-For example, an input file pattern of `c:\*.txt` should be entered as `c:\\*.txt`.
+See [Apache Beam WordCount Example](https://beam.apache.org/get-started/wordcount-example/) for information on running these examples.

 ## Beyond Word Count

diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md
deleted file mode 100644
index fdce05cd2405..000000000000
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md
+++ /dev/null
@@ -1,131 +0,0 @@

# 'Gaming' examples

This directory holds a series of example Apache Beam pipelines in a simple 'mobile gaming' domain. They all require Java 8. Each pipeline successively introduces new concepts and gives some examples of using Java 8 syntax in constructing Beam pipelines. Other than the use of Java 8 lambda expressions, the concepts apply equally well in Java 7.

In the gaming scenario, many users play, as members of different teams, over the course of a day, and their actions are logged for processing. Some of the logged game events may be late-arriving if users play on mobile devices and go transiently offline for a period.

The scenario includes not only "regular" users but also "robot" users, which have a higher click rate than the regular users and may move from team to team.

The first two pipelines in the series use pre-generated batch data samples. The second two pipelines read from a [PubSub](https://cloud.google.com/pubsub/) topic. For those examples, you will also need to run the `injector.Injector` program, which generates and publishes the gaming data to PubSub. The javadocs for each pipeline have more detailed information on how to run that pipeline.

All of these pipelines write their results to BigQuery table(s).

## The pipelines in the 'gaming' series

### UserScore

The first pipeline in the series is `UserScore`.
This pipeline does batch processing of data collected from gaming events. It calculates the sum of scores per user over an entire batch of gaming data (collected, say, for each day). The batch processing will not include any late data that arrives after the day's cutoff point.

### HourlyTeamScore

The next pipeline in the series is `HourlyTeamScore`. This pipeline also processes data collected from gaming events in batch. It builds on `UserScore`, but uses [fixed windows](https://beam.apache.org/documentation/programming-guide/#windowing), by default an hour in duration. It calculates the sum of scores per team for each window, optionally allowing specification of two timestamps before and after which data is filtered out. This allows a model where late data collected after the intended analysis window can be included in the analysis, and any late-arriving data prior to the beginning of the analysis window can be removed as well.

By using windowing and adding element timestamps, we can do finer-grained analysis than with the `UserScore` pipeline: we're now tracking scores for each hour rather than over the course of a whole day. However, our batch processing is high-latency, in that we don't get results from plays at the beginning of the batch's time period until the complete batch is processed.

### LeaderBoard

The third pipeline in the series is `LeaderBoard`. This pipeline processes an unbounded stream of 'game events' from a PubSub topic. The calculation of the team scores uses fixed windowing based on event time (the time a game play event occurred), not processing time (the time the event is processed by the pipeline). The pipeline calculates the sum of scores per team for each window. By default, the team scores are calculated using one-hour windows.
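The hour-long fixed windows described above can be sketched without any Beam dependency: assign each score to the window containing its event-time timestamp, then sum per (team, window). All names and numbers below are illustrative:

```python
from collections import defaultdict

WINDOW_SECONDS = 3600  # one-hour fixed windows

def window_start(event_time):
    # Start of the fixed window that contains this event-time timestamp.
    return event_time - (event_time % WINDOW_SECONDS)

def hourly_team_scores(events):
    # events: iterable of (team, score, event_time) tuples.
    totals = defaultdict(int)
    for team, score, event_time in events:
        totals[(team, window_start(event_time))] += score
    return dict(totals)

events = [
    ("red", 5, 3600010),   # window starting at t=3600000
    ("red", 3, 3600500),   # same window
    ("blue", 7, 7200100),  # next window
]
print(hourly_team_scores(events))
```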
In contrast, to demo another windowing option, the user scores are calculated using a global window, which periodically (every ten minutes) emits cumulative user score sums.

Unlike the previous pipelines in the series, which used static, finite input data, here we're using an unbounded data source, which lets us provide _speculative_ results and handle late data at much lower latency. For example, we could use the early/speculative results to keep a 'leaderboard' updated in near real time, while our handling of late data lets us generate correct results, e.g. for 'team prizes'. We're now outputting window results as they're calculated, giving us much lower latency than with the previous batch examples.

### GameStats

The fourth pipeline in the series is `GameStats`. This pipeline builds on the `LeaderBoard` functionality (supporting output of speculative and late data) and adds some "business intelligence" analysis: abuse detection. The pipeline derives the mean user score sum for a window and uses that information to identify likely spammers/robots. (The injector is designed so that the "robots" have a higher click rate than the "real" users.) The robot users are then filtered out when calculating the team scores.

Additionally, user sessions are tracked: that is, we find bursts of user activity using session windows. Then the mean session duration is recorded in the context of subsequent fixed windowing. (This could be used to tell us which games give us greater user retention.)

### Running the PubSub Injector

The `LeaderBoard` and `GameStats` example pipelines read unbounded data from a PubSub topic.

Use the `injector.Injector` program to generate this data and publish it to a PubSub topic. See the `Injector` javadocs for more information on how to run the injector. Set up the injector before you start one of these pipelines.
Then, when you start the pipeline, pass as an argument the name of that PubSub topic. See the pipeline javadocs for the details.

## Viewing the results in BigQuery

All of the pipelines write their results to BigQuery. `UserScore` and `HourlyTeamScore` each write one table; `LeaderBoard` and `GameStats` each write two. The pipelines have default table names that you can override when you start up the pipeline, in case those tables already exist.

Depending on the windowing intervals defined in a given pipeline, you may have to wait a while (more than an hour) before you start to see results written to the BigQuery tables.

diff --git a/pom.xml b/pom.xml
index 991144793f00..56ba4fceef83 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1543,6 +1543,7 @@
   **/*.egg-info/
   **/sdks/python/LICENSE
   **/sdks/python/NOTICE
+  **/sdks/python/README.md
   false
@@ -1680,6 +1681,7 @@
   LICENSE
   NOTICE
+  README.md

diff --git a/sdks/python/README.md b/sdks/python/README.md
deleted file mode 100644
index 7836a04ead89..000000000000
--- a/sdks/python/README.md
+++ /dev/null
@@ -1,298 +0,0 @@

# Apache Beam - Python SDK

[Apache Beam](http://beam.apache.org) is a unified model for defining both batch and streaming data-parallel processing pipelines. Beam provides a set of language-specific SDKs for constructing pipelines. These pipelines can be executed on distributed processing backends like [Apache Spark](http://spark.apache.org/), [Apache Flink](http://flink.apache.org), and [Google Cloud Dataflow](http://cloud.google.com/dataflow).

Apache Beam for Python provides access to Beam capabilities from the Python programming language.
## Table of Contents

  * [Overview of the Beam Programming Model](#overview-of-the-programming-model)
  * [Getting Started](#getting-started)
  * [A Quick Tour of the Source Code](#a-quick-tour-of-the-source-code)
  * [Simple Examples](#simple-examples)
    * [Basic pipeline](#basic-pipeline)
    * [Basic pipeline (with Map)](#basic-pipeline-with-map)
    * [Basic pipeline (with FlatMap)](#basic-pipeline-with-flatmap)
    * [Basic pipeline (with FlatMap and yield)](#basic-pipeline-with-flatmap-and-yield)
    * [Counting words](#counting-words)
    * [Counting words with GroupByKey](#counting-words-with-groupbykey)
    * [Type hints](#type-hints)
    * [BigQuery](#bigquery)
    * [Combiner examples](#combiner-examples)
  * [Organizing Your Code](#organizing-your-code)
  * [More Information](#more-information)

## Overview of the Programming Model

The key concepts of the programming model are:

* `PCollection` - represents a collection of data, which could be bounded or unbounded in size.
* `PTransform` - represents a computation that transforms input PCollections into output PCollections.
* `Pipeline` - manages a directed acyclic graph of PTransforms and PCollections that is ready for execution.
* `Runner` - specifies where and how the Pipeline should execute.

For a more detailed introduction, please read the [Beam Programming Model](http://beam.apache.org/documentation/programming-guide).

## Getting Started

See the [Apache Beam Python SDK Quickstart](https://beam.apache.org/get-started/quickstart-py/).
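The key concepts above can be made concrete with a toy, pure-Python sketch of how `PTransform`s chain: the `|` operator applies a transform to a collection, and the `>>` operator attaches a label. This is only a conceptual model, not the real `apache_beam` classes:

```python
class Transform:
    # Toy stand-in for a PTransform: wraps a function over an iterable.
    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label

    def __rrshift__(self, label):    # enables: 'label' >> Transform(...)
        return Transform(self.fn, label)

    def __ror__(self, pcollection):  # enables: pcollection | transform
        return self.fn(pcollection)

def Map(f):
    return Transform(lambda xs: [f(x) for x in xs])

def Filter(pred):
    return Transform(lambda xs: [x for x in xs if pred(x)])

result = ([1, 2, 3, 4]
          | 'keep evens' >> Filter(lambda x: x % 2 == 0)
          | 'square' >> Map(lambda x: x * x))
print(result)  # [4, 16]
```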
## A Quick Tour of the Source Code

First make sure the package is installed; if not, run `python setup.py install`. Then, with your virtual environment active, you can follow along this tour by running a `pydoc` server on a local port of your choosing (this example uses port 8888):

```
pydoc -p 8888
```

Open your browser and go to http://localhost:8888/apache_beam.html.

Some interesting classes to navigate to:

* `PCollection`, in file [`apache_beam/pvalue.py`](http://localhost:8888/apache_beam.pvalue.html)
* `PTransform`, in file [`apache_beam/transforms/ptransform.py`](http://localhost:8888/apache_beam.transforms.ptransform.html)
* `FlatMap`, `GroupByKey`, and `Map`, in file [`apache_beam/transforms/core.py`](http://localhost:8888/apache_beam.transforms.core.html)
* combiners, in file [`apache_beam/transforms/combiners.py`](http://localhost:8888/apache_beam.transforms.combiners.html)

## Simple Examples

The following examples demonstrate some basic, fundamental concepts of Apache Beam's Python SDK. For more detailed examples, Beam provides a [directory of examples](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples) for Python.

### Basic pipeline

A basic pipeline takes an iterable as input, applies the `beam.Create` `PTransform`, and produces a `PCollection` that can be written to a file or modified by further `PTransform`s. The `>>` operator is used to label `PTransform`s and the `|` operator is used to chain them.

```python
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectRunner')
# Create a PCollection with names and write it to a file.
(p
 | 'add names' >> beam.Create(['Ann', 'Joe'])
 | 'save' >> beam.io.WriteToText('./names'))
# Execute the pipeline.
p.run()
```

### Basic pipeline (with Map)

The `Map` `PTransform` returns one output per input.
It takes a callable that is applied to each element of the input `PCollection` and returns one element for the output `PCollection`.

```python
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
# Read a file containing names, add a greeting to each name, and write to a file.
(p
 | 'load names' >> beam.io.ReadFromText('./names')
 | 'add greeting' >> beam.Map(lambda name, msg: '%s, %s!' % (msg, name), 'Hello')
 | 'save' >> beam.io.WriteToText('./greetings'))
p.run()
```

### Basic pipeline (with FlatMap)

A `FlatMap` is like a `Map`, except it returns zero or more outputs per input: it accepts a callable that is applied to each element of the input `PCollection` and returns a (possibly empty) iterable of elements for the output `PCollection`.

```python
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
# Read a file containing names, add two greetings to each name, and write to a file.
(p
 | 'load names' >> beam.io.ReadFromText('./names')
 | 'add greetings' >> beam.FlatMap(
     lambda name, messages: ['%s %s!' % (msg, name) for msg in messages],
     ['Hello', 'Hola'])
 | 'save' >> beam.io.WriteToText('./greetings'))
p.run()
```

### Basic pipeline (with FlatMap and yield)

The callable of a `FlatMap` can be a generator, that is, a function using `yield`.

```python
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
# Read a file containing names, add two greetings to each name
# (with FlatMap using a yield generator), and write to a file.
def add_greetings(name, messages):
  for msg in messages:
    yield '%s %s!' % (msg, name)

(p
 | 'load names' >> beam.io.ReadFromText('./names')
 | 'add greetings' >> beam.FlatMap(add_greetings, ['Hello', 'Hola'])
 | 'save' >> beam.io.WriteToText('./greetings'))
p.run()
```

### Counting words

This example shows how to read a text file from [Google Cloud Storage](https://cloud.google.com/storage/) and count its words.

```python
import re
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
(p
 | 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
 | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
 | 'count words' >> beam.combiners.Count.PerElement()
 | 'save' >> beam.io.WriteToText('./word_count'))
p.run()
```

### Counting words with GroupByKey

This is a somewhat forced example of `GroupByKey` that counts words as the previous example did, but without using `beam.combiners.Count.PerElement`. As the example shows, you can use a wildcard to specify the text file source.

```python
import re
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
class MyCountTransform(beam.PTransform):
  def expand(self, pcoll):
    return (pcoll
            | 'one word' >> beam.Map(lambda word: (word, 1))
            # GroupByKey accepts a PCollection of (word, 1) elements and
            # outputs a PCollection of (word, [1, 1, ...]) elements.
            | 'group words' >> beam.GroupByKey()
            | 'count words' >> beam.Map(lambda kv: (kv[0], len(kv[1]))))

(p
 | 'read' >> beam.io.ReadFromText('./names*')
 | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
 | MyCountTransform()
 | 'write' >> beam.io.WriteToText('./word_count'))
p.run()
```

### Type hints

In some cases, providing type hints can improve the efficiency of the data encoding.
```python
import apache_beam as beam
from apache_beam.typehints import typehints
p = beam.Pipeline('DirectRunner')
(p
 | 'read' >> beam.io.ReadFromText('./names')
 | 'add types' >> beam.Map(lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
 | 'group words' >> beam.GroupByKey()
 | 'save' >> beam.io.WriteToText('./typed_names'))
p.run()
```

### BigQuery

This example reads weather data from a BigQuery table, calculates the number of tornadoes per month, and writes the results to a table you specify.

```python
import apache_beam as beam
project = 'DESTINATION-PROJECT-ID'
input_table = 'clouddataflow-readonly:samples.weather_stations'
output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'

p = beam.Pipeline(argv=['--project', project])
(p
 | 'read' >> beam.Read(beam.io.BigQuerySource(input_table))
 | 'months with tornadoes' >> beam.FlatMap(
     lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
 | 'monthly count' >> beam.CombinePerKey(sum)
 | 'format' >> beam.Map(lambda kv: {'month': kv[0], 'tornado_count': kv[1]})
 | 'save' >> beam.Write(
     beam.io.BigQuerySink(
         output_table,
         schema='month:INTEGER, tornado_count:INTEGER',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
```

The next pipeline, like the one above, calculates the number of tornadoes per month, but it uses a query to filter the input instead of reading the whole table.
```python
import apache_beam as beam
project = 'DESTINATION-PROJECT-ID'
output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'
input_query = 'SELECT month, COUNT(month) AS tornado_count ' \
    'FROM [clouddataflow-readonly:samples.weather_stations] ' \
    'WHERE tornado=true GROUP BY month'
p = beam.Pipeline(argv=['--project', project])
(p
 | 'read' >> beam.Read(beam.io.BigQuerySource(query=input_query))
 | 'save' >> beam.Write(beam.io.BigQuerySink(
     output_table,
     schema='month:INTEGER, tornado_count:INTEGER',
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
```

### Combiner Examples

Combiner transforms use "reducing" functions, such as sum, min, or max, to combine multiple values of a `PCollection` into a single value.

```python
import apache_beam as beam
p = beam.Pipeline('DirectRunner')

SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]

(p
 | beam.Create(SAMPLE_DATA)
 | beam.CombinePerKey(sum)
 | beam.io.WriteToText('./sums'))
p.run()
```

The [combiners_test.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/combiners_test.py) file contains more combiner examples.

## Organizing Your Code

Many projects will grow to multiple source code files. It is recommended that you organize your project so that all code involved in running your pipeline can be built as a Python package; this way, the package can easily be installed on the VM workers executing the job.

Follow the [Juliaset example](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset). If your code is organized in this fashion, you can use the `--setup_file` command-line option to create a source distribution out of the project files, stage the resulting tarball, and later install it on the workers executing the job.
## More Information

Please report any issues on [JIRA](https://issues.apache.org/jira/browse/BEAM/component/12328910).

If you're interested in contributing to the Beam SDK, start by reading the [Contribute](http://beam.apache.org/contribute/) guide.

diff --git a/sdks/python/apache_beam/examples/complete/game/README.md b/sdks/python/apache_beam/examples/complete/game/README.md
deleted file mode 100644
index 39677e477af6..000000000000
--- a/sdks/python/apache_beam/examples/complete/game/README.md
+++ /dev/null
@@ -1,69 +0,0 @@

# 'Gaming' examples

This directory holds a series of example Beam pipelines in a simple 'mobile gaming' domain. Each pipeline successively introduces new concepts.

In the gaming scenario, many users play, as members of different teams, over the course of a day, and their actions are logged for processing. Some of the logged game events may be late-arriving if users play on mobile devices and go transiently offline for a period.

The scenario includes not only "regular" users but also "robot" users, which have a higher click rate than the regular users and may move from team to team.

The first two pipelines in the series use pre-generated batch data samples.

All of these pipelines write their results to Google BigQuery table(s).

## The pipelines in the 'gaming' series

### user_score

The first pipeline in the series is `user_score`. This pipeline does batch processing of data collected from gaming events. It calculates the sum of scores per user over an entire batch of gaming data (collected, say, for each day). The batch processing will not include any late data that arrives after the day's cutoff point.

### hourly_team_score

The next pipeline in the series is `hourly_team_score`. This pipeline also processes data collected from gaming events in batch.
It builds on `user_score`, but uses [fixed windows](https://beam.apache.org/documentation/programming-guide/#windowing), by default an hour in duration. It calculates the sum of scores per team for each window, optionally allowing specification of two timestamps before and after which data is filtered out. This allows a model where late data collected after the intended analysis window can be included in the analysis, and any late-arriving data prior to the beginning of the analysis window can be removed as well.

By using windowing and adding element timestamps, we can do finer-grained analysis than with the `user_score` pipeline: we're now tracking scores for each hour rather than over the course of a whole day. However, our batch processing is high-latency, in that we don't get results from plays at the beginning of the batch's time period until the complete batch is processed.

## Viewing the results in BigQuery

All of the pipelines write their results to BigQuery. `user_score` and `hourly_team_score` each write one table. The pipelines have default table names that you can override when you start up the pipeline, in case those tables already exist.
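The two-timestamp filtering that `hourly_team_score` performs can be sketched as a plain function: keep only the events whose event time falls inside the [start, stop) analysis range. The tuple layout here is illustrative:

```python
def filter_by_cutoffs(events, start_time, stop_time):
    # events: (team, score, event_time) tuples; keep start <= t < stop.
    return [e for e in events if start_time <= e[2] < stop_time]

events = [("red", 5, 50), ("red", 9, 150), ("blue", 7, 250)]
print(filter_by_cutoffs(events, 100, 200))  # [('red', 9, 150)]
```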