## Setup

### Install Apache Beam

https://colab.research.google.com/?utm_source=scs-index

In [45]:
!{'pip install --quiet apache-beam'}
!{'mkdir -p data'}

### Upload Data

In [7]:
from google.colab import files
uploaded = files.upload()

Saving data.txt to data.txt


In [8]:
!ls

data  data.txt	dept_data.txt  exclude_ids.txt	sample_data


### Load Data from Google Drive

In [24]:
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [29]:
!ls "drive/MyDrive/Colab Notebooks/data"

data.txt  dept_data.txt  exclude_ids.txt  location.txt


In [7]:
!{'head -n 5 dept_data.txt'}

149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019


## General

### Lamda Function

In [1]:
example = lambda x,y : x*y
print(example(3,5))

15


## Pipelines

### Simple

In [8]:
import apache_beam as beam

def SplitRow(element):
  return element.split(',')

def filtering(record):
  return record[3] == 'Accounts'

with beam.Pipeline() as p1:

  attendance_count = (
      p1
      | 'Read from file' >> beam.io.ReadFromText('dept_data.txt')
      | 'Map transform Split' >> beam.Map(SplitRow)
      #| beam.FlatMap(SplitRow)
      #| beam.Map(lambda record: record.split(','))
      | 'Filter' >> beam.Filter(filtering)
      #| beam.Filter(lambda record: record[3] == 'Accounts')
      | beam.Map(lambda record:(record[1],1))
      | beam.CombinePerKey(sum)
      | beam.io.WriteToText('data/output_new')
  )

!{('head -n 5 data/output_new-00000-of-00001')}





('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)


### Branching

In [21]:
import apache_beam as beam

def SlitRow(element):
  return element.split(',')

p = beam.Pipeline()

input_collection = (
    p
    | "Read from text file" >> beam.io.ReadFromText('dept_data.txt')
    | "Split rows" >> beam.Map(SplitRow)
)

accounts_count = (
    input_collection
    | "Get all Accounts dept persons" >> beam.Filter(lambda record: record[3] == "Accounts")
    | "Pair each accounts employee with 1" >> beam.Map(lambda record: ("Accounts, " + record[1], 1))
    | "Group and sum1" >> beam.CombinePerKey(sum)
    #| "Write results for account" >> beam.io.WriteToText("data/Account")
)

hr_count = (
    input_collection
    | "Get all HR dept persons" >> beam.Filter(lambda record: record[3] == "HR")
    | "Pair reach HR employee with 1" >> beam.Map(lambda record: ("HR, " + record[1],1))
    | "Group and sum" >> beam.CombinePerKey(sum)
    #| "Write results for HR" >> beam.io.WriteToText("data/HR")
)

output = (
    (accounts_count,hr_count)
    | beam.Flatten()
    | beam.io.WriteToText('data/both')
)

p.run()

#!{('head -n 5 data/Account-00000-of-00001')}
#!{('head -n 5 data/HR-00000-of-00001')}
!{('head -n 5 data/both-00000-of-00001')}



('Accounts, Marco', 31)
('Accounts, Rebekah', 31)
('Accounts, Itoe', 31)
('Accounts, Edouard', 31)
('Accounts, Kyle', 62)


### Wordcount

In [54]:
import apache_beam as beam
import re

def SlitRow(element):
  return element.split(',')

p = beam.Pipeline()

wordcount = (
    p
    | "Read from text file" >> beam.io.ReadFromText('data.txt')
    | "Lowercase" >> beam.Map(lambda record: (record.lower()))
    | "Remove Non-Ascii" >> beam.Map(lambda record: (re.sub(r'[^a-z]', ',', record)))
    | "Split rows" >> beam.FlatMap(SplitRow)
    | "Remove Empty Cells" >> beam.Filter(lambda record: record != "")
    | "Match each Word with 1" >> beam.Map(lambda record: (record,1))
    | "Group and sum" >> beam.CombinePerKey(sum)
    | beam.io.WriteToText('data/wordcount') 
)

p.run()

!{('head -n 5 data/wordcount-00000-of-00001')}



('king', 311)
('lear', 257)
('dramatis', 1)
('personae', 1)
('of', 483)


### ParDo Transformations


In [64]:
import apache_beam as beam

class SplitRow(beam.DoFn):

  def process(self, element):
    return [element.split(',')]

class FilterAccountsEmployee(beam.DoFn):
  
  def process(self, element):
    if element[3] == "Accounts":
      return [element]

class PairEmployees(beam.DoFn):

  def process(self, element):
    return [(element[3]+","+element[1],1)]

class Counting(beam.DoFn):

  def process(self, element):
    (key, values) = element
    return [(key, sum(values))]

p1 = beam.Pipeline()

attendance_count = (
    p1
    | 'Read from file' >> beam.io.ReadFromText('dept_data.txt')
    | 'ParDo Split' >> beam.ParDo(SplitRow())
    #| 'ParDo Split Lambda' >> beam.ParDo(lambda element: [element.split(',')])
    | 'Filter Accounts' >> beam.ParDo(FilterAccountsEmployee())
    | 'Map 1 on each Name' >> beam.ParDo(PairEmployees())
    | 'Group' >> beam.GroupByKey()
    | 'Sum' >> beam.ParDo(Counting())
    | beam.io.WriteToText('data/output_new')
)

p1.run()

!{('head -n 5 data/output_new-00000-of-00001')}



('Accounts,Marco', 31)
('Accounts,Rebekah', 31)
('Accounts,Itoe', 31)
('Accounts,Edouard', 31)
('Accounts,Kyle', 62)


### Combiner

In [76]:
import apache_beam as beam

class AverageFn(beam.CombineFn):

  def create_accumulator(self):
    return(0.0, 0)

  def add_input(self, sum_count, input):
    (sum, count) = sum_count
    return sum + input, count + 1
  
  def merge_accumulators(self, accumulators):
    ind_sums, ind_counts = zip(*accumulators)
    return sum(ind_sums), sum(ind_counts)
  
  def extract_output(self, sum_count):
    (sum, count) = sum_count
    return sum / count if count else float('NaN')

p = beam.Pipeline()

small_sum = (
    p
    | beam.Create([15,5,7,7,9,23,13,5])
    | "Combine Globally" >> beam.CombineGlobally(AverageFn())
    | "Write Results" >> beam.io.WriteToText('data/combine')
)

p.run()

!{'head -n 5 data/combine-00000-of-00001'}



10.5


### Composite Transforms

In [85]:
import apache_beam as beam

class MyTransform(beam.PTransform):

  def expand(self, input_coll):
    a = (
        input_coll
        | "Group and sum" >> beam.CombinePerKey(sum)
        | "Count Filter" >> beam.Filter(filter_on_count)
        | "Regular accounts employee" >> beam.Map(format_output)
    )

    return a

def SplitRow(element):
  return element.split(',')

def filter_on_count(element):
  name, count = element
  if count > 30:
    return element

def format_output(element):
  name, count = element
  return ((name.encode('ascii'),str(count),'Regular employee'))

p = beam.Pipeline()

input_collection = (
    p
    | "Read from text file" >> beam.io.ReadFromText('dept_data.txt')
    | "Split rows" >> beam.Map(SplitRow)
)

accounts_count = (
    input_collection
    | "Get all Accounts dept persons" >> beam.Filter(lambda record: record[3] == "Accounts")
    | "Pair each accounts employee with 1" >> beam.Map(lambda record: ("Accounts, " + record[1], 1))
    | "Composite Accounts" >> MyTransform()
)

hr_count = (
    input_collection
    | "Get all HR dept persons" >> beam.Filter(lambda record: record[3] == "HR")
    | "Pair reach HR employee with 1" >> beam.Map(lambda record: ("HR, " + record[1],1))
    | "Composite HR" >> MyTransform()
)

output = (
    (accounts_count,hr_count)
    | beam.Flatten()
    | beam.io.WriteToText('data/both')
)

p.run()

!{('head -n 5 data/both-00000-of-00001')}



(b'Accounts, Marco', '31', 'Regular employee')
(b'Accounts, Rebekah', '31', 'Regular employee')
(b'Accounts, Itoe', '31', 'Regular employee')
(b'Accounts, Edouard', '31', 'Regular employee')
(b'Accounts, Kyle', '62', 'Regular employee')


### CoGroupBykey

In [88]:
import apache_beam as beam

def retTuple(element):
  thisTuple=element.split(',')
  return(thisTuple[0],thisTuple[1:])

p1 = beam.Pipeline()

dep_rows = (
    p1
    | "Reading File 1" >> beam.io.ReadFromText("dept_data.txt")
    | "Pair each employee with key" >> beam.Map(retTuple)
)

loc_rows = (
    p1
    | "Reading File 2" >> beam.io.ReadFromText("location.txt")
    | "Pair each loc with key" >> beam.Map(retTuple)
)

results = (
    {"dep_data": dep_rows, "loc_data": loc_rows}
    | beam.CoGroupByKey()
    | "Write reults" >> beam.io.WriteToText("data/result")
)

p1.run()

!{("head -n 5 data/result-00000-of-00001")}



('149633CM', {'dep_data': [['Marco', '10', 'Accounts', '1-01-2019'], ['Marco', '10', 'Accounts', '2-01-2019'], ['Marco', '10', 'Accounts', '3-01-2019'], ['Marco', '10', 'Accounts', '4-01-2019'], ['Marco', '10', 'Accounts', '5-01-2019'], ['Marco', '10', 'Accounts', '6-01-2019'], ['Marco', '10', 'Accounts', '7-01-2019'], ['Marco', '10', 'Accounts', '8-01-2019'], ['Marco', '10', 'Accounts', '9-01-2019'], ['Marco', '10', 'Accounts', '10-01-2019'], ['Marco', '10', 'Accounts', '11-01-2019'], ['Marco', '10', 'Accounts', '12-01-2019'], ['Marco', '10', 'Accounts', '13-01-2019'], ['Marco', '10', 'Accounts', '14-01-2019'], ['Marco', '10', 'Accounts', '15-01-2019'], ['Marco', '10', 'Accounts', '16-01-2019'], ['Marco', '10', 'Accounts', '17-01-2019'], ['Marco', '10', 'Accounts', '18-01-2019'], ['Marco', '10', 'Accounts', '19-01-2019'], ['Marco', '10', 'Accounts', '20-01-2019'], ['Marco', '10', 'Accounts', '21-01-2019'], ['Marco', '10', 'Accounts', '22-01-2019'], ['Marco', '10', 'Accounts', '23-01-2

### Side Input

In [40]:
import apache_beam as beam

side_list=list()

with open ('drive/MyDrive/Colab Notebooks/data/exclude_ids.txt','r') as my_file:
  for line in my_file:
    side_list.append(line.rstrip())

p = beam.Pipeline()

class FilterUsingLength(beam.DoFn):
  
  def process(self, element, side_list, lower_bound, upper_bound=float('inf')):
    id = element.split(',')[0]
    name = element.split(',')[1]
    #id=id.decode('utf-8','ignore').encode("utf-8")
    element_list = element.split(',')
    if (lower_bound <= len(name) <= upper_bound) and id not in side_list:
      return [element_list]

small_names = (
    p
    | "Read from text file" >> beam.io.ReadFromText('drive/MyDrive/Colab Notebooks/data/dept_data.txt')
    | "ParDo with side inputs" >> beam.ParDo(FilterUsingLength(),side_list,3,10)
    | beam.Filter(lambda record: record[3] == 'Accounts')
    | beam.Map(lambda record: (record[0] + " " + record[1], 1))
    | beam.CombinePerKey(sum)
    | "Write results" >> beam.io.WriteToText('data/output_new_final')
)

p.run()

!{('head -n 5 data/output_new_final-00000-of-00001')}



('503996WI Edouard', 31)
('957149WC Kyle', 31)
('241316NX Kumiko', 31)
('796656IE Gaston', 31)
('718737IX Ayumi', 30)


### Additional Outputs

In [59]:
import apache_beam as beam

class ProcessWords(beam.DoFn):

  def process(self,element,cutoff_length,marker):

    name = element.split(',')[1]

    if name.startswith(marker):
      return [name]

    elif len(name) <= cutoff_length:
      return [beam.pvalue.TaggedOutput('Short_Names',name)]

    else:
      return [beam.pvalue.TaggedOutput('Long_Names',name)]

p = beam.Pipeline()

results = (
    p
    | beam.io.ReadFromText('drive/MyDrive/Colab Notebooks/data/dept_data.txt')
    | beam.ParDo(ProcessWords(), cutoff_length=4, marker='A').with_outputs('Short_Names','Long_Names',main='Names_A')
)

short_collection = results.Short_Names
long_collection = results.Long_Names
startA_collection = results.Names_A

short_collection | "Write 1" >> beam.io.WriteToText('short')
long_collection | "Write 2" >> beam.io.WriteToText('long')
startA_collection | "Write 3" >> beam.io.WriteToText('start_a')

p.run()

!{'head -n 5 short-00000-of-00001'}
!{'head -n 5 long-00000-of-00001'}
!{'head -n 5 start_a-00000-of-00001'}



Itoe
Kyle
Kyle
Olga
Kirk
Marco
Rebekah
Edouard
Kumiko
Gaston
Ayumi
Ayumi
Ayumi
Ayumi
Ayumi


## Coders, Type Hints

In [None]:
import apache_beam as beam
import typing

class Employee(object):

  def __init__(self, id, name):
    self.id = id
    self.name = name

class EmployeeCoder(beam.coders.Coder):

  def encode(self, emplyee):
    return('%s:%s' % (employee.id, employee.name)).encode('utf-8')

  def decode(self, s):
    return Employee(*s.decode('utf-8').split(':'))
  
  def is_determinisitic(self):
    return True

beam.coders.registry.register_coder(Employee, EmployeeCoder)

def split_file(input):
  name, id, salary = input.split(',')
  return Employee(id, name), int(salary)

result = (
    p
    | beam.io.ReadFromTest('data.txt')
    | beam.Map(split_file)
    | beam.CombinePerKey(sum).with_input_types(typing.Tuple[Employee, int])
)

p.run()