In [1]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

## ParDo

In [2]:
class ComputeWordLengthFn(beam.DoFn):
    """DoFn that turns each input element into its length."""

    def process(self, x):
        """Return a single-item list containing ``len(x)``.

        The value is wrapped in a list because the result of ``process`` is
        consumed as an iterable of output elements.
        """
        length = len(x)
        return [length]

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    # Seed the pipeline with a small in-memory collection of words.
    words = pipeline | beam.Create(['this', 'is', 'a', 'list'])

    # ParDo applies ComputeWordLengthFn to every word.
    lengths = words | beam.ParDo(ComputeWordLengthFn())

    # Write one length per line; the next cell prints the resulting shard.
    lengths | beam.io.WriteToText('output.txt')

In [3]:
!cat output.txt-00000-of-00001

4
2
1
4


In [4]:
with beam.Pipeline(options=PipelineOptions()) as pipeline:
    words = pipeline | beam.Create(['this', 'is', 'a', 'list'])

    # Same idea as the ParDo above, but with Map and a lambda: each word
    # becomes a "<word>,<length>" line.
    word_and_length = words | beam.Map(
        lambda word: '%s,%s' % (word, len(word)))

    word_and_length | beam.io.WriteToText('output-2.txt')

In [5]:
!cat output-2.txt-00000-of-00001

this,4
is,2
a,1
list,4


## GroupByKey

In [6]:
def split_key_value(line):
    """Split a ``key,value`` CSV line into an explicit ``(key, value)`` tuple.

    Raises:
        ValueError: if the line does not contain exactly two comma-separated
            fields.
    """
    key, value = line.split(',')
    return key, value


def format_group(group):
    """Render one grouped ``(key, values)`` element as ``key,[v1, v2, ...]``."""
    key, values = group
    # list() so the line shows the values themselves, whatever iterable type
    # the runner uses to hand back the grouped values.
    return '%s,%s' % (key, list(values))


with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | beam.io.ReadFromText('group-by-key-input.csv')

    # GroupByKey operates on (key, value) pairs, so emit a real 2-tuple
    # instead of the list that str.split returns.
    key_values = lines | beam.Map(split_key_value)

    group_by = key_values | beam.GroupByKey()

    format_out_lines = group_by | beam.Map(format_group)

    (format_out_lines | beam.io.WriteToText('output-3.txt'))

In [7]:
!cat output-3.txt-00000-of-00001

tree,[u'2']
and,[u'1', u'2', u'6']
cat,[u'1', u'5', u'9']
jump,[u'3']
dog,[u'5', u'2']


## CoGroupByKey

In [8]:
# Two keyed datasets that share some, but not all, of their keys.
emails_list = [
    ('amy', 'amy@example.com'),
    ('carl', 'carl@example.com'),
    ('julia', 'julia@example.com'),
    ('carl', 'carl@email.com'),
]
phones_list = [
    ('amy', '111-222-3333'),
    ('james', '222-333-4444'),
    ('amy', '333-444-5555'),
    ('carl', '444-555-6666'),
]


def join_info(name_info):
    """Format one joined element as ``name; [emails]; [phones]``.

    ``name_info`` is a ``(name, info)`` pair where ``info`` is indexed by the
    tags ``'emails'`` and ``'phones'``; each side is sorted before formatting.
    """
    name, info = name_info
    sorted_emails = sorted(info['emails'])
    sorted_phones = sorted(info['phones'])
    return '%s; %s; %s' % (name, sorted_emails, sorted_phones)


with beam.Pipeline(options=PipelineOptions()) as p:
    emails = p | 'CreateEmails' >> beam.Create(emails_list)
    phones = p | 'CreatePhones' >> beam.Create(phones_list)

    # The dict keys become the tags used to look up each side in join_info.
    tagged_inputs = {'emails': emails, 'phones': phones}
    results = tagged_inputs | beam.CoGroupByKey()

    contact_lines = results | beam.Map(join_info)

    contact_lines | beam.io.WriteToText('output-4.txt')

In [9]:
!cat output-4.txt-00000-of-00001

amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']
james; []; ['222-333-4444']
julia; ['julia@example.com']; []
carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']


## Combine

For combining values across a `PCollection`

In [10]:
with beam.Pipeline(options=PipelineOptions()) as p:
    pc = p | beam.Create([1, 10, 100, 1000])

    def bounded_sum(values, bound=500):
        """Sum ``values`` and cap the result at ``bound``."""
        total = sum(values)
        return min(total, bound)

    # Both steps apply the same transform, so each one needs its own unique
    # label within the pipeline.
    small_sum = pc | "SmallSum" >> beam.CombineGlobally(bounded_sum)
    large_sum = pc | "LargeSum" >> beam.CombineGlobally(
        bounded_sum, bound=5000)

    small_sum | "WriteSmallSum" >> beam.io.WriteToText('output-5.txt')

    # NOTE: large_sum is intentionally not written out. Writing it to
    # 'output-5.txt' as well (with a "WriteLargeSum" step) would overwrite the
    # value written for small_sum.

In [11]:
!cat output-5.txt-00000-of-00001

500


### CombineFn

For more sophisticated combining, i.e. when the combine function:

> must perform additional pre- or post-processing, might change the output type, or takes the key into account.

In [12]:
class AverageFn(beam.CombineFn):
    """CombineFn computing the arithmetic mean of its inputs.

    The accumulator is a ``(running_total, element_count)`` pair.
    """

    def create_accumulator(self):
        """Return the empty accumulator: float total 0.0 and count 0."""
        return 0.0, 0

    def add_input(self, sum_count, input):
        """Fold one more input value into the accumulator."""
        total, count = sum_count
        return (total + input, count + 1)

    def merge_accumulators(self, accumulators):
        """Combine several accumulators by adding totals and counts."""
        totals, counts = zip(*accumulators)
        return (sum(totals), sum(counts))

    def extract_output(self, sum_count):
        """Return total / count, or NaN when no elements were counted."""
        total, count = sum_count
        if not count:
            return float('NaN')
        return total / count

    
with beam.Pipeline(options=PipelineOptions()) as pipeline:
    numbers = pipeline | beam.Create([1, 2, 3, 20])

    # Reduce the whole collection to a single mean using the custom CombineFn.
    average = numbers | beam.CombineGlobally(AverageFn())

    average | beam.io.WriteToText('output-6.txt')

In [13]:
!cat output-6.txt-00000-of-00001

6.5


### CombinePerKey

Combine values per Key

In [14]:
def split_key_and_int(x):
    """Parse a ``key,value`` line into a ``(key, int(value))`` tuple.

    Raises:
        ValueError: if ``x`` does not have exactly two comma-separated fields
            or the second field is not an integer literal.
    """
    fields = x.split(',')
    key, raw_value = fields
    return (key, int(raw_value))

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    csv_lines = pipeline | beam.io.ReadFromText('group-by-key-input.csv')

    # Turn each "key,value" line into a (key, int) pair.
    keyed_ints = csv_lines | beam.Map(split_key_and_int)

    # Built-in mean combiner, applied separately to the values of each key.
    mean_fn = beam.combiners.MeanCombineFn()
    average_per_key = keyed_ints | beam.CombinePerKey(mean_fn)

    average_per_key | beam.io.WriteToText('output-7.txt')

In [15]:
!cat output-7.txt-00000-of-00001

(u'tree', 2.0)
(u'and', 3.0)
(u'cat', 5.0)
(u'jump', 3.0)
(u'dog', 3.5)


## Flatten

Use to merge 2 or more `PCollection`s into one (all inputs must have the same data type)

In [16]:
with beam.Pipeline(options=PipelineOptions()) as p:
    work_emails_list = [
        ('amy', 'amy@work.com'),
        ('carl', 'carl@work.com')
    ]
    personal_emails_list = [
        ('jay', 'julia@personal.com'),
        ('steve', 'carl@personal.com'),
    ]

    work_emails = p | 'CreateWorkEmails' >> beam.Create(work_emails_list)
    personal_emails = p | 'CreatePersonalEmails' >> beam.Create(personal_emails_list)

    # Flatten merges PCollections, so pass the two PCollections created above
    # (not the raw Python lists they were built from) to keep the merge inside
    # this pipeline.
    merged = (work_emails, personal_emails) | beam.Flatten()

    (merged | beam.io.WriteToText('output-8.txt'))

In [17]:
!cat output-8.txt-00000-of-00001

('amy', 'amy@work.com')
('carl', 'carl@work.com')
('jay', 'julia@personal.com')
('steve', 'carl@personal.com')


## Partition

To separate a `PCollection` into 2 or more `PCollection`s

In [18]:
with beam.Pipeline(options=PipelineOptions()) as p:
    emails_list = [
        ('amy', 'amy@work.com'),
        ('carl', 'carl@work.com'),
        ('jay', 'julia@personal.com'),
        ('steve', 'carl@personal.com'),
    ]

    emails = p | 'CreateEmails' >> beam.Create(emails_list)

    def partition_fn(value, num_partitions):
        """Pick the partition index for one ``(name, email)`` pair.

        Returns 0 when the email ends with 'work.com' and 1 otherwise;
        ``num_partitions`` is accepted but not consulted.
        """
        _name, email = value
        return 0 if email.endswith('work.com') else 1

    # Split into exactly two output PCollections, indexed 0 and 1.
    email_partition = emails | beam.Partition(partition_fn, 2)
    work_partition = email_partition[0]
    other_partition = email_partition[1]

    # Each write needs its own label because both apply the same transform.
    work_partition | "WritePartition0" >> beam.io.WriteToText(
        'output-9-partition-0.txt')
    other_partition | "WritePartition1" >> beam.io.WriteToText(
        'output-9-partition-1.txt')

In [19]:
!cat output-9-partition-0.txt-00000-of-00001

('amy', 'amy@work.com')
('carl', 'carl@work.com')


In [20]:
!cat output-9-partition-1.txt-00000-of-00001

('jay', 'julia@personal.com')
('steve', 'carl@personal.com')
