# Basic Setup
Import the Dataflow Python SDK. Each example below then creates its own pipeline and selects a runner.

```python
# Standard imports
import google.cloud.dataflow as df
```

## Hello world
Create a `PCollection` from an in-memory iterable with the `Create` transform, and use the pipe operator (`|`) to chain transforms:

```python
# Create a pipeline executing on a direct runner (local, non-cloud).
p = df.Pipeline('DirectPipelineRunner')

# Create a PCollection with names and write it to a file.
(p
 | df.Create('add names', ['Ann', 'Joe'])
 | df.Write('save', df.io.TextFileSink('./names')))
# Execute the pipeline.
p.run()
```
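Python makes this chaining possible through operator overloading. The minimal plain-Python sketch below does not use Dataflow at all: `Transform` is a hypothetical class whose `__ror__` method runs whenever a list appears on the left of `|`. The SDK's real classes do considerably more (the pipeline above is only executed when `p.run()` is called); this only shows the operator mechanics.

```python
class Transform:
    """Hypothetical stand-in for a transform: applies `fn` to a whole list."""

    def __init__(self, label, fn):
        self.label = label
        self.fn = fn

    def __ror__(self, elements):
        # Python calls this for `elements | transform`, because a list does
        # not define `|` itself and so defers to the right-hand operand.
        return self.fn(elements)


# `|` is left-associative: each step receives the previous step's result.
result = (['Joe', 'Ann']
          | Transform('uppercase', lambda xs: [x.upper() for x in xs])
          | Transform('sort', sorted))
print(result)  # ['ANN', 'JOE']
```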

## Hello world (with Map)
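The `Map` transform takes a callable, which is applied to each element of the input `PCollection` and must return exactly one element for the output `PCollection`. Any extra arguments passed to `Map` after the callable are forwarded to it; below, `'Hello'` becomes the `msg` argument.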

```python
# Create a pipeline executing on a direct runner (local, non-cloud).
p = df.Pipeline('DirectPipelineRunner')

# Read the file with names, add a greeting for each name, and write the results.
# The extra argument 'Hello' is passed to the lambda as `msg`.
(p
 | df.Read('load names', df.io.TextFileSource('./names*'))
 | df.Map('add greeting', lambda name, msg: '%s %s!' % (msg, name), 'Hello')
 | df.Write('save', df.io.TextFileSink('./greetings')))
p.run()
```
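The contract of `Map` (exactly one output per input element, with any extra arguments after the callable forwarded to it, as `'Hello'` is above) can be modeled with a list comprehension. `map_elements` is a hypothetical helper for illustration; it is not part of the SDK and ignores distribution entirely.

```python
def map_elements(elements, fn, *args):
    """Plain-Python model of Map: calls fn(element, *args) once per element."""
    return [fn(element, *args) for element in elements]


names = ['Ann', 'Joe']
greetings = map_elements(names, lambda name, msg: '%s %s!' % (msg, name), 'Hello')
print(greetings)  # ['Hello Ann!', 'Hello Joe!']
```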

## Hello world (with FlatMap)
A `FlatMap` is like a `Map` except its callable returns a (possibly empty) iterable of elements for the output `PCollection`.

```python
p = df.Pipeline('DirectPipelineRunner')
# Read the names file, add several greetings for each name, and write the results.
(p
 | df.Read('load names', df.io.TextFileSource('./names*'))
 | df.FlatMap('add greetings',
              lambda name, msgs: ['%s %s!' % (m, name) for m in msgs],
              ['Hello', 'Hola'])
 | df.Write('save', df.io.TextFileSink('./greetings')))
p.run()
```
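In the same plain-Python model, `FlatMap` concatenates the iterables its callable returns, so one input element can produce zero, one, or many outputs. `flat_map_elements` is again a hypothetical helper, not an SDK function.

```python
def flat_map_elements(elements, fn, *args):
    """Plain-Python model of FlatMap: fn returns an iterable per element,
    and all returned iterables are concatenated into one output list."""
    return [out for element in elements for out in fn(element, *args)]


names = ['Ann', 'Joe']
greetings = flat_map_elements(
    names, lambda name, msgs: ['%s %s!' % (m, name) for m in msgs], ['Hello', 'Hola'])
print(greetings)  # ['Hello Ann!', 'Hola Ann!', 'Hello Joe!', 'Hola Joe!']

# Returning an empty iterable drops the element from the output.
short_names = flat_map_elements(['Ann', 'Joseph'], lambda n: [n] if len(n) <= 3 else [])
print(short_names)  # ['Ann']
```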

## Hello world (with FlatMap and yield)
The callable of a `FlatMap` can also be a generator function, that is, a function that uses `yield`.

```python
p = df.Pipeline('DirectPipelineRunner')

# A generator function: yields one greeting per message for the given name.
def add_greetings(name, messages):
    for m in messages:
        yield '%s %s!' % (m, name)

(p
 | df.Read('load names', df.io.TextFileSource('./names*'))
 | df.FlatMap('greet', add_greetings, ['Hello', 'Hola', 'Hi'])
 | df.Write('save', df.io.TextFileSink('./greetings')))
p.run()
```
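A generator is just another iterable, so it fits the same plain-Python model of `FlatMap` unchanged. The helper below is hypothetical; the sketch also shows that calling `add_greetings` returns a generator object rather than a list.

```python
def flat_map_elements(elements, fn, *args):
    """Plain-Python model of FlatMap (hypothetical helper, not the SDK):
    concatenate the iterables that fn returns for each element."""
    return [out for element in elements for out in fn(element, *args)]


def add_greetings(name, messages):
    # Generator function: calling it returns a generator that yields lazily.
    for m in messages:
        yield '%s %s!' % (m, name)


gen = add_greetings('Ann', ['Hello'])
print(type(gen).__name__)  # generator

greetings = flat_map_elements(['Ann', 'Joe'], add_greetings, ['Hello', 'Hola', 'Hi'])
print(greetings)
# ['Hello Ann!', 'Hola Ann!', 'Hi Ann!', 'Hello Joe!', 'Hola Joe!', 'Hi Joe!']
```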

## Counting words
This example counts how many times each word occurs in a text, and also shows how to read a text file from [Google Cloud Storage](https://cloud.google.com/storage/).

```python
import re

p = df.Pipeline('DirectPipelineRunner')

(p
 | df.Read('read', df.io.TextFileSource('gs://dataflow-samples/shakespeare/kinglear.txt'))
 # Split each line into words.
 | df.FlatMap('split', lambda x: re.findall(r'\w+', x))
 # Count the occurrences of each distinct word.
 | df.combiners.Count.PerElement('count words')
 | df.Write('write', df.io.TextFileSink('./results')))
p.run()
```
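The split-and-count logic can be tried locally on a few made-up lines, using only the standard library. In this sketch `collections.Counter` stands in for `Count.PerElement`; it is an illustration, not how the SDK implements counting.

```python
import re
from collections import Counter


def count_words(lines):
    """Split each line into words with the same regex as the pipeline,
    then count how often each distinct word occurs."""
    words = (word for line in lines for word in re.findall(r'\w+', line))
    return Counter(words)


counts = count_words(['the king is dead', 'the fool is not'])
print(counts['the'], counts['is'], counts['king'])  # 2 2 1
```

As in the pipeline, words are not lowercased, so `The` and `the` would be counted as different words.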

## Counting words with GroupByKey
Here we use `GroupByKey` to count the words. This is a somewhat contrived use of `GroupByKey`; normally one would use the `df.combiners.Count.PerElement` transform, as in the previous example. The example also shows how to package several steps as a custom `PTransform` subclass, and it reads its input with a wildcard file pattern (`./names*`).

```python
import re

p = df.Pipeline('DirectPipelineRunner')

class MyCountTransform(df.PTransform):
    def apply(self, pcoll):
        return (pcoll
                | df.Map('one word', lambda w: (w, 1))
                # GroupByKey accepts a PCollection of (w, 1) and
                # outputs a PCollection of (w, (1, 1, ...)).
                | df.GroupByKey('group words')
                # Count each word by summing its 1s.
                | df.Map('count words', lambda word_ones: (word_ones[0], sum(word_ones[1]))))

(p
 | df.Read('read', df.io.TextFileSource('./names*'))
 | df.FlatMap('split', lambda x: re.findall(r'\w+', x))
 | MyCountTransform()
 | df.Write('write', df.io.TextFileSink('./results')))
p.run()
```
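What `MyCountTransform` does can be modeled in plain Python: pair each word with `1`, group the values by key, then sum each group. `group_by_key` is a hypothetical helper, and this in-memory version says nothing about how a runner moves data between workers.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Plain-Python model of GroupByKey: collect all values for each key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


words = ['Ann', 'Joe', 'Ann']
grouped = group_by_key((w, 1) for w in words)  # {'Ann': [1, 1], 'Joe': [1]}
counts = {word: sum(ones) for word, ones in grouped.items()}
print(counts)  # {'Ann': 2, 'Joe': 1}
```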

## Type hints
In some cases, you can make data encoding more efficient by providing type hints, which tell the SDK what type of elements a transform produces. For example:

```python
from google.cloud.dataflow.typehints import typehints

p = df.Pipeline('DirectPipelineRunner')

(p
 | df.Read('A', df.io.TextFileSource('./names*'))
 # Declare that this Map outputs (str, int) key-value pairs.
 | df.Map('B1', lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
 | df.GroupByKey('GBK')
 | df.Write('C', df.io.TextFileSink('./results')))
p.run()
```
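As a rough illustration of why knowing the element type can help (this is not Dataflow's actual coder), the plain-Python sketch below encodes pairs declared as `KV[str, int]` with a fixed layout. Because the decoder already knows the types, only the data itself is stored per element, whereas a general-purpose format such as `pickle` also has to record type information. The layout, its size limits, and the helper names are assumptions for illustration.

```python
import pickle
import struct


def encode_kv_str_int(key, value):
    """Hypothetical fixed-layout encoding for a (str, int) pair: a 2-byte key
    length, the UTF-8 key bytes, then a 4-byte signed integer.
    Assumes keys shorter than 65536 bytes and values that fit in 32 bits."""
    key_bytes = key.encode('utf-8')
    return struct.pack('>H', len(key_bytes)) + key_bytes + struct.pack('>i', value)


def decode_kv_str_int(data):
    """Inverse of encode_kv_str_int: the layout is known, so no type tags are stored."""
    (key_len,) = struct.unpack('>H', data[:2])
    key = data[2:2 + key_len].decode('utf-8')
    (value,) = struct.unpack('>i', data[2 + key_len:6 + key_len])
    return key, value


encoded = encode_kv_str_int('Ann', 1)
print(len(encoded))                   # 9
print(decode_kv_str_int(encoded))     # ('Ann', 1)
# A generic, self-describing encoding of the same pair, for comparison.
print(len(pickle.dumps(('Ann', 1))))
```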

## BigQuery
Here is a pipeline that reads input from a BigQuery table and writes the result to a local text file. It calculates the number of tornadoes per month from weather data. To run it, you need a Google Cloud project of your own; set `project` to its ID.

```python
input_table = 'clouddataflow-readonly:samples.weather_stations'
project = 'YOUR_PROJECT_ID'  # Replace with your own Google Cloud project ID.

p = df.Pipeline(argv=['--project', project])
(p
 | df.Read('read', df.io.BigQuerySource(input_table))
 # Emit (month, 1) for each row that recorded a tornado; emit nothing otherwise.
 | df.FlatMap('months with tornadoes',
              lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
 # Sum the 1s for each month.
 | df.CombinePerKey('monthly count', sum)
 | df.Map('format', lambda kv: {'month': kv[0], 'tornado_count': kv[1]})
 | df.Write('write', df.io.TextFileSink('./tornadoes')))
p.run()
```
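The per-row logic of this pipeline can be checked locally without BigQuery. The sketch applies the same expression as the `FlatMap` step and a per-month sum to a few made-up rows; the values are invented, and only the `month` and `tornado` fields the pipeline reads are included.

```python
from collections import defaultdict

# Made-up rows standing in for BigQuery results.
rows = [
    {'month': 1, 'tornado': True},
    {'month': 1, 'tornado': False},
    {'month': 3, 'tornado': True},
    {'month': 3, 'tornado': True},
]


def months_with_tornadoes(row):
    # Same logic as the FlatMap: one (month, 1) pair per tornado row, else none.
    return [(int(row['month']), 1)] if row['tornado'] else []


monthly = defaultdict(int)
for row in rows:
    for month, one in months_with_tornadoes(row):
        monthly[month] += one  # Plays the role of CombinePerKey(sum).

formatted = [{'month': k, 'tornado_count': v} for k, v in sorted(monthly.items())]
print(formatted)  # [{'month': 1, 'tornado_count': 1}, {'month': 3, 'tornado_count': 2}]
```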

The following pipeline computes the same result, the number of tornadoes per month, but uses a BigQuery query to filter and count the rows instead of reading the whole table.

```python
project = 'YOUR_PROJECT_ID'  # Replace with your own Google Cloud project ID.
# Count tornadoes per month inside BigQuery (legacy SQL table syntax).
input_query = ('SELECT month, COUNT(month) AS tornado_count '
               'FROM [clouddataflow-readonly:samples.weather_stations] '
               'WHERE tornado=true GROUP BY month')
p = df.Pipeline(argv=['--project', project])
(p
 | df.Read('read', df.io.BigQuerySource(query=input_query))
 | df.Write('write', df.io.TextFileSink('./tornadoes')))
p.run()
```
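To see why the query version gives the same answer as the previous pipeline, the sketch below computes both on the same made-up rows: first filtering and counting per month as the `WHERE ... GROUP BY` query does, then emitting `(month, 1)` pairs and summing them as the `FlatMap` and `CombinePerKey` steps do. The rows are invented for illustration.

```python
from collections import Counter

# Made-up rows standing in for the weather_stations table.
rows = [
    {'month': 1, 'tornado': True},
    {'month': 1, 'tornado': False},
    {'month': 3, 'tornado': True},
    {'month': 3, 'tornado': True},
]

# Query style: filter first (WHERE tornado=true), then count per month (GROUP BY month).
query_style = Counter(row['month'] for row in rows if row['tornado'])

# Pipeline style: emit (month, 1) for tornado rows, then sum per month.
pipeline_style = Counter()
for row in rows:
    for month, one in ([(row['month'], 1)] if row['tornado'] else []):
        pipeline_style[month] += one

print(query_style == pipeline_style)  # True
print(dict(query_style))  # {1: 1, 3: 2}
```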

## Combiner Examples
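A common use of Dataflow combiners is to compute the sum (or max, or min) of the values for each key. Standard Python functions such as `sum`, `max`, and `min` can be passed directly as combiner functions. More generally, any function that reduces an iterable to a single value can be used, provided that the reduction is commutative and associative, because the runner may apply it to subsets of the values and then to the partial results.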

```python
p = df.Pipeline('DirectPipelineRunner')

SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]

# Sum the values for each key: ('a', 6) and ('b', 30).
(p
 | df.Create(SAMPLE_DATA)
 | df.CombinePerKey(sum)
 | df.Write(df.io.TextFileSink('./results')))
p.run()
```
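A runner may apply a combiner function to subsets of a key's values and then to the partial results, so the function should give the same answer either way. The plain-Python sketch below (`combine_in_stages` is a hypothetical helper that only imitates staged combining) shows that `sum`, `max`, and `min` pass this check while a naive mean does not.

```python
def combine_in_stages(values, combine_fn, chunk_size):
    """Combine fixed-size chunks of `values`, then combine the partial results.
    A simplified stand-in for a runner that combines values in stages."""
    partials = [combine_fn(values[i:i + chunk_size])
                for i in range(0, len(values), chunk_size)]
    return combine_fn(partials)


def mean(xs):
    """A naive mean: correct on all values at once, wrong when applied in stages."""
    xs = list(xs)
    return float(sum(xs)) / len(xs)


values = [1, 2, 3]
results = {fn.__name__: (fn(values), combine_in_stages(values, fn, chunk_size=2))
           for fn in (sum, max, min, mean)}
for name, (all_at_once, staged) in results.items():
    print(name, all_at_once, staged)
# sum 6 6
# max 3 3
# min 1 1
# mean 2.0 2.25
```

A mean therefore needs a combiner that carries a (sum, count) pair and divides only at the end.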
