In [None]:
# %pip (unlike !pip) installs into the environment of the running kernel;
# quoting the spec stops the shell from treating [interactive] as a glob.
%pip install -q "apache-beam[interactive]"

## Overview

I am experimenting with Apache Beam to build a simple pipeline. This notebook is based on [An interactive overview of Beam](https://beam.apache.org/get-started/an-interactive-overview-of-beam/), so check it out for more background.

As in the example, I start the pipeline with a simple Python list of data,
then gradually make the input more complicated.

In [None]:
import apache_beam as beam

# Toy dataset used by the first few pipelines: the integers 0 through 5.
inputs = list(range(6))

Let's build our first pipeline: create a PCollection from the list and print each element.

In [None]:
with beam.Pipeline() as pipeline:
  outputs = pipeline | 'Create pipeline input' >> beam.Create(inputs)

  # `outputs` is a PCollection, not a Python list, so it cannot be
  # printed directly. Instead we continue applying transforms to it --
  # here one that prints every element.
  outputs | 'Print output' >> beam.Map(print)

`Map` performs a one-to-one transform: each input element produces exactly one output element.

In [None]:
with beam.Pipeline() as pipeline:
  # One output per input: every element is replaced by its double.
  doubled = (
      pipeline
      | 'Create pipeline input' >> beam.Create(inputs)
      | 'Multiply the value by 2' >> beam.Map(lambda value: 2 * value))
  outputs = doubled | 'Print output' >> beam.Map(print)


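`FlatMap` allows one-to-many transformations: each input element can produce any number of output elements (here, the value and its square root).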

In [None]:
import math

with beam.Pipeline() as pipeline:
  doubled = (
      pipeline
      | 'Create pipeline input' >> beam.Create(inputs)
      | 'Multiply the value by 2' >> beam.Map(lambda value: 2 * value))
  # FlatMap iterates over whatever the function returns, so every doubled
  # value yields two output elements: itself, then its square root.
  outputs = (
      doubled
      | 'FlatMap' >> beam.FlatMap(lambda value: (value, math.sqrt(value)))
      | 'Print output' >> beam.Map(print))

`Filter` does what you expect: it keeps only the elements for which the predicate returns `True` (here, values divisible by 2).

In [None]:
with beam.Pipeline() as pipeline:
  expanded = (
      pipeline
      | 'Create pipeline input' >> beam.Create(inputs)
      | 'Multiply the value by 2' >> beam.Map(lambda value: 2 * value)
      | 'FlatMap' >> beam.FlatMap(lambda value: [value, math.sqrt(value)]))
  # Keep only elements evenly divisible by 2 (ints and floats alike).
  outputs = (
      expanded
      | 'Filter' >> beam.Filter(lambda value: value % 2 == 0)
      | 'Print output' >> beam.Map(print))

Combining is like `reduce`.
First, use `CombineGlobally`, which reduces the whole PCollection to a single value. Here `sum` is Python's built-in function; you can also pass a custom `CombineFn`.

In [None]:
with beam.Pipeline() as pipeline:
  doubled = (
      pipeline
      | 'Create pipeline input' >> beam.Create(inputs)
      | 'Multiply the value by 2' >> beam.Map(lambda value: 2 * value))
  # Reduce the whole PCollection to a single total with the built-in sum.
  outputs = (
      doubled
      | 'Combine' >> beam.CombineGlobally(sum)
      | 'Print output' >> beam.Map(print))

`GroupByKey` needs keyed data, i.e. `(key, value)` tuples. After grouping, `CombineValues` reduces the collected values of each key.

In [None]:
keyed_inputs = [('Daniel', 22), ('Daniel', 30), ('Bob', 28), ('Bob', 10)]
with beam.Pipeline() as pipeline:
  # Collect all values for each name...
  grouped = (
      pipeline
      | 'Create pipeline input' >> beam.Create(keyed_inputs)
      | 'Group by Person' >> beam.GroupByKey())
  # ...then add up each name's values.
  outputs = (
      grouped
      | 'sum over key' >> beam.CombineValues(sum)
      | 'Print output' >> beam.Map(print))

Computing an average for each key is more involved: you need to define a custom *CombineFn* that keeps a running `(total, count)` accumulator.

In [None]:
class AverageFn(beam.CombineFn):
  """CombineFn computing the arithmetic mean of numeric inputs.

  The accumulator is a ``(total, count)`` tuple, so partial accumulators can
  be merged simply by summing both components.
  """

  def create_accumulator(self):
    # Empty running state: (total, count).
    return (0, 0)

  def add_input(self, accumulator, input):
    """Fold one element into the running ``(total, count)``."""
    total, count = accumulator
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    """Combine several ``(total, count)`` accumulators into one."""
    total, count = 0, 0
    for acc_total, acc_count in accumulators:
      total += acc_total
      count += acc_count
    return total, count

  def extract_output(self, accumulator):
    """Return ``total / count``, or NaN when no inputs were added.

    The guard matters for an empty accumulator (e.g. CombineGlobally over an
    empty PCollection), which would otherwise raise ZeroDivisionError.
    """
    total, count = accumulator
    return total / count if count else float('NaN')

In [None]:
keyed_inputs = [('Daniel', 22), ('Daniel', 30), ('Bob', 28), ('Bob', 10)]
with beam.Pipeline() as pipeline:
  grouped = (
      pipeline
      | 'Create pipeline input' >> beam.Create(keyed_inputs)
      | 'Group by Person' >> beam.GroupByKey())
  # Average each person's grouped values with the custom CombineFn.
  outputs = (
      grouped
      | 'mean per person' >> beam.CombineValues(AverageFn())
      | 'Print output' >> beam.Map(print))

## Playing with FileIO

Let's create 3 CSV files of random numbers (`rnd_data_1.csv`, `rnd_data_2.csv`, `rnd_data_3.csv`); we will load the data later.

In [None]:
from apache_beam.io import fileio
import pandas as pd
import numpy as np
import os
import glob

# Fixed seed so Restart & Run All regenerates identical CSV contents.
SEED = 42
rng = np.random.default_rng(SEED)

# Write three 10x3 matrices of uniform random floats in [0, 1) to
# rnd_data_1.csv .. rnd_data_3.csv (comma-delimited, no header row).
for i in range(1, 4):
  random_matrix = rng.random((10, 3))
  np.savetxt(f"rnd_data_{i}.csv", random_matrix, delimiter=",")

In [None]:
with beam.Pipeline() as pipeline:
  # Match every CSV in the working directory and open it as a ReadableFile.
  matched = (
      pipeline
      | 'get csv files' >> fileio.MatchFiles('*.csv')
      | 'read csv files' >> fileio.ReadMatches())
  # Print only the path of each matched file.
  csv_files = (
      matched
      | 'Extract filenames' >> beam.Map(lambda readable: readable.metadata.path)
      | 'print files' >> beam.Map(print))



In [None]:
def read_csv(file):
  """Load a header-less CSV file into a NumPy array.

  The CSVs in this notebook are written by ``np.savetxt``, which emits no
  header row. ``header=None`` is therefore required: otherwise pandas treats
  the first data row as column names and silently drops it.

  Args:
    file: path (or anything ``pd.read_csv`` accepts) of the CSV to read.

  Returns:
    A 2-D NumPy array with one row per line of the file.
  """
  print(f'reading {file}')
  return pd.read_csv(file, header=None).to_numpy()

def run_pipeline(file_pattern):
  """Load every file matching ``file_pattern`` with ``read_csv`` and print them.

  The per-file arrays are gathered into a single list before printing.

  Args:
    file_pattern: glob-style pattern handed to ``fileio.MatchFiles``.
  """
  with beam.Pipeline() as pipeline:
    paths = (
        pipeline
        | 'Match files' >> fileio.MatchFiles(file_pattern)
        | 'Read matched files' >> fileio.ReadMatches()
        | 'Extract filenames' >> beam.Map(lambda readable: readable.metadata.path))
    _ = (
        paths
        | 'read csv' >> beam.Map(read_csv)
        | 'combine into list' >> beam.combiners.ToList()
        | 'Print output' >> beam.Map(print))

file_pattern = './*.csv'
run_pipeline(file_pattern)

# Cross-check Beam's matches with the standard library, using the very same
# pattern so both listings are guaranteed to describe the same file set.
print(f'glob: {glob.glob(file_pattern)}')

In [None]:
## clean up
# Delete only the CSVs this notebook generated. A blanket "*.csv" sweep of
# the working directory would also destroy unrelated user files (and would
# crash on a directory whose name ends in .csv).
files = [path for path in glob.glob('rnd_data_*.csv') if os.path.isfile(path)]
for file in files:
  os.unlink(file)