<a href="https://colab.research.google.com/github/LucasLaibly/apache-beam-colab/blob/main/apache_beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --quiet apache-beam[gcp]

In [None]:
# Create the local output directory used by the pipelines below.
# Pure-Python equivalent of `mkdir -p data`: no error if it already exists.
import os

os.makedirs('data', exist_ok=True)

In [None]:
# List the working directory to confirm the input files (e.g. dept_data.txt,
# location.txt) and the data/ output folder are present.
!ls

data  dept_data.txt  location.txt  sample_data


In [None]:
# Interactively upload local files into the Colab working directory.
# The custom-coder example at the end of the notebook reads `data.txt`,
# which is uploaded here.
from google.colab import files
uploaded = files.upload()

Saving data.txt to data.txt


In [None]:
import apache_beam as beam

p2 = beam.Pipeline()

lines = (
    p2
     |beam.Create(['Using create transform',
                   'to generate in memory',
                   'This is the thired line.',
                   'Now fourth'])
     |beam.io.WriteToText('data/outCreate1')
)

p2.run()

# see output
!{('head -n 20 data/outCreate1-00000-of-00001')}





Using create transform
to generate in memory
This is the thired line.
Now fourth


In [None]:
import apache_beam as beam

pipe3 = beam.Pipeline()

lines2 = (
    pipe3
      | beam.Create([1,2,3,4])
      | beam.io.WriteToText('data/outCreate2')
)

pipe3.run()

# see output
!{('head -n 20 data/outCreate2-00000-of-00001')}



1
2
3
4


In [None]:
import apache_beam as beam

p4 = beam.Pipeline()

lines = (
    p4
    | beam.Create({
        'row1':[1,2,3],
        'row2':[4,5,6]})
    | beam.Map(lambda element: element)
    | beam.io.WriteToText('data/outCreate3')
)

p4.run()

# see output
!{('head -n 20 data/outCreate3-00000-of-00001')}



('row1', [1, 2, 3])
('row2', [4, 5, 6])


In [None]:
import apache_beam as beam

def SplitRow(element):
  """Split one comma-separated input line into its list of fields."""
  fields = element.split(',')
  return fields

def filtering(record):
  """Return True when the record's department field (index 3) is 'Accounts'."""
  department = record[3]
  return department == 'Accounts'

p1 = beam.Pipeline()

attendance_count = (
    p1
    | 'Read from file' >> beam.io.ReadFromText('dept_data.txt')
    | 'Map transform' >> beam.Map(SplitRow) # FlatMap will return output elements individually.
    #| beam.Map(lambda record: record.split(',))

    | beam.Filter(filtering)
    #| beam.Filter(lambda record: record[3] == 'Accounts)

    | beam.Map(lambda record: (record[1], 1))
    | beam.CombinePerKey(sum)
    | beam.io.WriteToText('data/output_new')
)

p1.run()

!{('head -n 20 data/output_new-00000-of-00001')}



('Marco', 31)
('Rebekah', 31)
('Itoe', 31)
('Edouard', 31)
('Kyle', 62)
('Kumiko', 31)
('Gaston', 31)
('Ayumi', 30)


In [None]:
import apache_beam as beam

def SplitRow(element):
  """Return the fields of a comma-separated line as a list of strings."""
  return element.split(sep=',')

branch = beam.Pipeline()

input_collection = (
  branch
    | 'Read from txt file' >> beam.io.ReadFromText('dept_data.txt')
    | 'Split rows'         >> beam.Map(SplitRow)
)

accounts_count = (
    input_collection
    | 'Get all accounts dept persons'   >> beam.Filter(lambda record: record[3] == 'Accounts')
    | 'Pair ea account employee with 1' >> beam.Map(lambda record: ('Accounts, ' +record[1], 1))
    | 'Write results for account'       >> beam.CombinePerKey(sum)
    #| 'Write results to file'           >> beam.io.WriteToText('data/Account')
)

hr_count = (
    input_collection
    | 'Get all HR dept persons'      >> beam.Filter(lambda record: record[3] == 'HR')
    | 'Pair each HR employee with 1' >> beam.Map(lambda record: ('HR, ' +record[1], 1))
    | 'Group and sum'                >> beam.CombinePerKey(sum)
    #| 'Write reuslts to file'        >> beam.io.WriteToText('data/HR')
)

# Flatten Transform
output = (
    (accounts_count,hr_count)
    | beam.Flatten()
    | beam.io.WriteToText('data/both')
)

branch.run()

# !{('head -n 20 data/Account-00000-of-00001')}
# !{('head -n 20 data/HR-00000-of-00001')}
!{('head -n 20 data/both-00000-of-00001')}



('HR, Beryl', 62)
('HR, Olga', 31)
('HR, Leslie', 31)
('HR, Mindy', 31)
('HR, Vicky', 31)
('HR, Richard', 31)
('HR, Kirk', 31)
('HR, Kaori', 31)
('HR, Oscar', 31)
('Accounts, Marco', 31)
('Accounts, Rebekah', 31)
('Accounts, Itoe', 31)
('Accounts, Edouard', 31)
('Accounts, Kyle', 62)
('Accounts, Kumiko', 31)
('Accounts, Gaston', 31)
('Accounts, Ayumi', 30)


In [None]:
import apache_beam as beam

class SplitRow(beam.DoFn):
  """DoFn that splits a comma-separated line into a list of fields."""

  def process(self, element):
    # process() returns an iterable of outputs; wrapping the field list in a
    # list emits the whole list as ONE element (Map-like), not one per field.
    return [element.split(',')]

class Filtering(beam.DoFn):
  """DoFn keeping only records whose department field (index 3) is 'Accounts'."""

  def process(self, element):
    # Previously compared against the misspelled 'Acccounts', so nothing matched.
    if element[3] == 'Accounts':
      return [element]
    # An empty iterable drops the element.
    return []

class PairEmployees(beam.DoFn):
  """DoFn emitting ('<department>,<name>', 1) for each split record."""

  def process(self, element):
    # Previously referenced the undefined name `elements` (NameError).
    return [(element[3] + "," + element[1], 1)]

class Counting(beam.DoFn):
  """DoFn that sums the grouped values of a (key, values) pair from GroupByKey."""

  def process(self, element):
    key, values = element
    total = sum(values)
    # Single output element, wrapped in a list as process() expects an iterable.
    return [(key, total)]

attendance_count = (
    p1
    | beam.io.ReadFromText('dept_data.txt')
    | beam.ParDo(SplitRow())
    | beam.ParDo(Filtering())
    | beam.ParDo(PairEmployees())
    | beam.GroupByKey()
    | beam.ParDo(Counting())
    | beam.io.WriteToText('data/output_new_pardo')
)

p1.run()

!{('head -n 20 data/output_new_pardo-00000-of-00001')}



## Advanced Combiners in Beam

In [None]:
import apache_beam as beam

class AverageFn(beam.CombineFn):
  """CombineFn computing the arithmetic mean of numeric inputs.

  The accumulator is a (running_total, count) tuple. The output is
  running_total / count, or NaN when no inputs were seen.
  """

  def create_accumulator(self):
    return (0.0, 0)  # (running_total, count)

  def add_input(self, sum_count, input):
    # Local names avoid shadowing the builtin `sum`.
    total, count = sum_count
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    # Fold the partial accumulators. An empty iterable yields the initial
    # (0.0, 0) instead of failing to unpack zip(*[]).
    total, count = 0.0, 0
    for partial_total, partial_count in accumulators:
      total += partial_total
      count += partial_count
    return total, count

  def extract_output(self, sum_count):
    total, count = sum_count
    return total / count if count else float('NaN')

accum = beam.Pipeline()

small_sum = (
    accum
      | beam.Create([15,5,7,7,9,23,13,5])
      | beam.CombineGlobally(AverageFn())
      | 'Write Results' >> beam.io.WriteToText('data/combine')
)

accum.run()

!{('head -n 20 data/combine-00000-of-00001')}






10.5


## Create Composite Transforms

In [None]:
import apache_beam as beam

class MyTransform(beam.PTransform):
  """Composite transform over (key, 1) pairs.

  Sums the values per key, keeps keys whose total passes `filter_on_count`,
  and formats each survivor with `format_output`. It is applied to both the
  Accounts and HR branches, so the labels are department-neutral.
  """

  def expand(self, input_col):
    # filter_on_count / format_output are module-level functions in this cell;
    # they are looked up when the transform is applied.
    return (
        input_col
        | 'Sum per employee'           >> beam.CombinePerKey(sum)
        | 'Filter on count'            >> beam.Filter(filter_on_count)
        | 'Format as regular employee' >> beam.Map(format_output)
    )

def SplitRow(element):
  """Return the comma-separated fields of one input line as a list."""
  separator = ','
  return element.split(separator)

def filter_on_count(element):
  """Return `element` if its count (second item) exceeds 30, else None.

  Used as a beam.Filter predicate, so only the truthiness of the result matters.
  """
  _, count = element
  return element if count > 30 else None

def format_output(element):
  """Render a (name, count) pair as 'name, count, Regular Employee'."""
  name, count = element
  return name + ', ' + str(count) + ', Regular Employee'

branch = beam.Pipeline()

input_collection = (
  branch
    | 'Read from txt file' >> beam.io.ReadFromText('dept_data.txt')
    | 'Split rows'         >> beam.Map(SplitRow)
)

accounts_count = (
    input_collection
    | 'Get all accounts dept persons'   >> beam.Filter(lambda record: record[3] == 'Accounts')
    | 'Pair ea account employee with 1' >> beam.Map(lambda record: ('Accounts, ' +record[1], 1))
    | 'composite accounts' >> MyTransform()
    | 'Write results to file'           >> beam.io.WriteToText('data/Account')
)

hr_count = (
    input_collection
    | 'Get all HR dept persons'      >> beam.Filter(lambda record: record[3] == 'HR')
    | 'Pair each HR employee with 1' >> beam.Map(lambda record: ('HR, ' +record[1], 1))
    | 'composite accounts 2' >> MyTransform()
    | 'Write reuslts to file'        >> beam.io.WriteToText('data/HR')
)

# Flatten Transform
output = (
    (accounts_count,hr_count)
    | beam.Flatten()
    | beam.io.WriteToText('data/both')
)

branch.run()

!{('head -n 20 data/Account-00000-of-00001')}
!{('head -n 20 data/HR-00000-of-00001')}



Accounts, Marco, 31, Regular Employee
Accounts, Rebekah, 31, Regular Employee
Accounts, Itoe, 31, Regular Employee
Accounts, Edouard, 31, Regular Employee
Accounts, Kyle, 62, Regular Employee
Accounts, Kumiko, 31, Regular Employee
Accounts, Gaston, 31, Regular Employee
HR, Beryl, 62, Regular Employee
HR, Olga, 31, Regular Employee
HR, Leslie, 31, Regular Employee
HR, Mindy, 31, Regular Employee
HR, Vicky, 31, Regular Employee
HR, Richard, 31, Regular Employee
HR, Kirk, 31, Regular Employee
HR, Kaori, 31, Regular Employee
HR, Oscar, 31, Regular Employee


## CoGroupByKey for Joins


In [None]:
import apache_beam as beam

def retTuple(element):
  """Key a CSV line by its first field: returns (first_field, [remaining fields])."""
  key, *rest = element.split(',')
  return (key, rest)

pipe = beam.Pipeline()

dep_rows = (
    pipe
    | 'Reading' >> beam.io.ReadFromText('dept_data.txt')
    | 'Pairing' >> beam.Map(retTuple)
)

loc_rows = (
    pipe
    | 'Reading 2' >> beam.io.ReadFromText('location.txt')
    | 'Pairing 2' >> beam.Map(retTuple)
)

results = ({'dep_data': dep_rows, 'loc_data': loc_rows}
           | beam.CoGroupByKey()
           | 'Write Results' >> beam.io.WriteToText('data/results-text')
)

pipe.run()

!{('head -n 20 data/results-text-00000-of-00001')}



('149633CM', {'dep_data': [['Marco', '10', 'Accounts', '1-01-2019'], ['Marco', '10', 'Accounts', '2-01-2019'], ['Marco', '10', 'Accounts', '3-01-2019'], ['Marco', '10', 'Accounts', '4-01-2019'], ['Marco', '10', 'Accounts', '5-01-2019'], ['Marco', '10', 'Accounts', '6-01-2019'], ['Marco', '10', 'Accounts', '7-01-2019'], ['Marco', '10', 'Accounts', '8-01-2019'], ['Marco', '10', 'Accounts', '9-01-2019'], ['Marco', '10', 'Accounts', '10-01-2019'], ['Marco', '10', 'Accounts', '11-01-2019'], ['Marco', '10', 'Accounts', '12-01-2019'], ['Marco', '10', 'Accounts', '13-01-2019'], ['Marco', '10', 'Accounts', '14-01-2019'], ['Marco', '10', 'Accounts', '15-01-2019'], ['Marco', '10', 'Accounts', '16-01-2019'], ['Marco', '10', 'Accounts', '17-01-2019'], ['Marco', '10', 'Accounts', '18-01-2019'], ['Marco', '10', 'Accounts', '19-01-2019'], ['Marco', '10', 'Accounts', '20-01-2019'], ['Marco', '10', 'Accounts', '21-01-2019'], ['Marco', '10', 'Accounts', '22-01-2019'], ['Marco', '10', 'Accounts', '23-01-2

## Side Inputs

In [None]:
import apache_beam as beam

# Load the IDs to exclude: one per line, trailing whitespace removed.
with open('exclude_ids.txt', 'r') as my_file:
  side_list = list(map(str.rstrip, my_file))

pipe = beam.Pipeline()

class FilterUsingLength(beam.DoFn):
  """Keep rows whose name length is within bounds and whose ID is not excluded.

  process() args (the extras are passed after the DoFn in beam.ParDo):
    element: one CSV line; field 0 is the ID and field 1 the name.
    side_list: container of ID strings to exclude.
    lower_bound, upper_bound: inclusive bounds on len(name).

  Emits the split field list for rows that pass; drops the others.
  """

  def process(self, element, side_list, lower_bound, upper_bound=float('inf')):
    element_list = element.split(',')
    emp_id, name = element_list[0], element_list[1]
    # Compare str with str. The old `id.encode('utf-8')` produced bytes, which
    # never equal the str IDs in side_list, so no ID was ever excluded.
    if lower_bound <= len(name) <= upper_bound and emp_id not in side_list:
      return [element_list]
    return []

small_names = (
    pipe
    | 'Read from file' >> beam.io.ReadFromText('dept_data.txt')
    | 'ParDo with wide inputs' >> beam.ParDo(FilterUsingLength(), side_list, 3,10)
    | 'Get Accounts' >> beam.Filter(lambda record: record[3] == 'Accounts')
    | 'Map ID , User Name, pairs' >> beam.Map(lambda record: (record[0] + ' ' + record[1],1))
    | 'Combine' >> beam.CombinePerKey(sum)
    | beam.io.WriteToText('data/output-side-inputs')
)

pipe.run()

!{('head -n 20 data/output-side-inputs-00000-of-00001')}



('149633CM Marco', 31)
('212539MU Rebekah', 31)
('231555ZZ Itoe', 31)
('503996WI Edouard', 31)
('704275DC Kyle', 31)
('957149WC Kyle', 31)
('241316NX Kumiko', 31)
('796656IE Gaston', 31)
('718737IX Ayumi', 30)


## Additional PCollection Outputs

In [None]:
import apache_beam as beam 

class ProcessWords(beam.DoFn):
  """Route each employee name (CSV field 1) to tagged and main outputs.

  Outputs:
    'Short_Names' -- names with len(name) <= cutoff_length
    'Long_Names'  -- all other names
    main output   -- names starting with `marker` (in addition to the above)
  """

  def process(self, element, cutoff_length, marker):
    name = element.split(',')[1]

    if len(name) <= cutoff_length:
      yield beam.pvalue.TaggedOutput('Short_Names', name)
    else:
      yield beam.pvalue.TaggedOutput('Long_Names', name)

    # This check used to be unreachable (both branches returned) and misspelled
    # `maker`. Yielding also avoids returning a bare str, which Beam would
    # iterate character by character.
    if name.startswith(marker):
      yield name

p = beam.Pipeline()

results = (
    p
    | beam.io.ReadFromText('dept_data.txt')
    | beam.ParDo(ProcessWords(), cutoff_length=4, marker='A').with_outputs('Short_Names', 'Long_Names', main='Names_A')
)

short_collection = results.Short_Names
long_collection = results.Long_Names
startWithA = results.Names_A

# Write to File
short_collection | 'Write Shorts' >> beam.io.WriteToText('short')
long_collection  | 'writ Long' >> beam.io.WriteToText('long')
startWithA | 'Write Start with A' >> beam.io.WriteToText('start_a')

p.run()

!{('head -n 5 short-00000-of-00001')}
!{('head -n 5 long-00000-of-00001')}
!{('head -n 5 start_a-00000-of-00001')}



Itoe
Kyle
Kyle
Olga
Kirk
Marco
Rebekah
Edouard
Kumiko
Gaston


## Case Studies of Beam in Production
First Case: Credit Card Skippers


To inspect the full output afterwards, run `!cat data/card-skipper-00000-of-00001`.

In [None]:
import apache_beam as beam

# Pipeline for the credit-card skipper case; its transforms are applied further down in this cell.
p = beam.Pipeline()

def calculate_points(element):
  """Score one credit-card CSV row for defaulting.

  Returns (key_name, points), where key_name is '<customer_id>, <first> <last>'
  and points (0-3) counts how many of these hold:
    * payment_cleared < 70% of spent
    * spent == max_limit and payment_cleared < spent
    * spent == max_limit and payment_cleared < 70% of spent
  """
  (customer_id, first_name, last_name, relationship_id, card_type,
   max_limit, spent, cash_withdrawn, payment_cleared, payment_date) = element.split(',')

  spent_amt = int(spent)
  cleared_amt = int(payment_cleared)
  limit_amt = int(max_limit)

  underpaid = cleared_amt < (spent_amt * 0.7)
  maxed_out = spent_amt == limit_amt

  points = int(underpaid) + int(maxed_out and cleared_amt < spent_amt) + int(maxed_out and underpaid)

  return customer_id + ', ' + first_name + ' ' + last_name, points

def format_result(sum_pair):
  """Render a (key_name, points) pair as '<key_name>, <points> fraud_points'."""
  key_name, points = sum_pair
  return '%s, %s fraud_points' % (key_name, points)

card_defaulter = (
    p
    | beam.io.ReadFromText('cards.txt', skip_header_lines=1)
    | beam.Map(calculate_points)
    | beam.CombinePerKey(sum)
    | beam.Filter(lambda element: element[1] > 0)
    | beam.Map(format_result)
    | beam.io.WriteToText('data/card-skipper')
)

p.run()

!{('head -n 10 {}-00000-of-*'.format('data/card_skipper'))}




/bin/bash: -c: line 0: syntax error near unexpected token `'data/card_skipper''
/bin/bash: -c: line 0: `{('head -n 10 {}-00000-of-*'.format('data/card_skipper'))}'


Second Case: Loan Defaulters

In [None]:
import apache_beam as beam

# for datetime manipulation
from datetime import datetime

# Pipeline for the card + loan defaulter case; its transforms are applied further down in this cell.
p = beam.Pipeline()

def calculate_points(element):
  """Score one credit-card CSV row for defaulting.

  Example row: CT28383,Miyako,Burns,R_7488,Issuers,500,490,38,101,30-01-2018
  Returns (key_name, points), e.g. ('CT28383, Miyako Burns', 1), where points
  counts these conditions:
    * payment_cleared < 70% of spent
    * spent == max_limit and any amount is still unpaid
    * spent == max_limit and payment_cleared < 70% of spent
  """
  (customer_id, first_name, last_name, realtionship_id, card_type,
   max_limit, spent, cash_withdrawn, payment_cleared, payment_date) = element.split(',')

  spent = int(spent)
  payment_cleared = int(payment_cleared)
  max_limit = int(max_limit)

  at_limit = spent == max_limit
  rules = (
      payment_cleared < (spent * 0.7),                  # cleared < 70% of spend
      at_limit and payment_cleared < spent,             # maxed out, balance outstanding
      at_limit and payment_cleared < (spent * 0.7),     # maxed out and < 70% cleared
  )
  defaulter_points = sum(1 for broken in rules if broken)

  return customer_id + ', ' + first_name + ' ' + last_name, defaulter_points

def format_result(sum_pair):
  """Render a (key_name, points) pair as '<key_name>, <points> fraud_points'."""
  key_name, points = sum_pair
  return ', '.join([str(key_name), str(points) + ' fraud_points'])

def calculate_late_payment(elements):
  """Return a copy of a split loan row with a late-payment flag appended.

  Example input:
    [CT88330,Humberto,Banks,Serviceman,LN_1559,Medical Loan,26-01-2018,2000,30-01-2018]
  elements[6] is the due date and elements[8] the payment date, both
  'DD-MM-YYYY' (surrounding whitespace is ignored). The appended flag is '1'
  when the payment came after the due date, otherwise '0'.
  The input list is not modified.
  """
  due_date = datetime.strptime(elements[6].strip(), '%d-%m-%Y')
  payment_date = datetime.strptime(elements[8].strip(), '%d-%m-%Y')

  late_flag = '0' if payment_date <= due_date else '1'

  # Build a new list instead of append(): Beam transforms must not mutate
  # their input elements.
  return elements + [late_flag]

def format_output(sum_pair):
  """Render a (key_name, missed_months) pair as '<key_name>, <n> missed'."""
  key_name, miss_months = sum_pair
  return '%s, %s missed' % (key_name, miss_months)

def calculate_month(input_list):
  """Return a copy of a split loan row with the payment month appended.

  Example input:
    [CT88330,Humberto,Banks,Serviceman,LN_1559,Medical Loan,26-01-2018,2000,30-01-2018]
  input_list[8] is the payment date as 'DD-MM-YYYY' (surrounding whitespace
  ignored). The month is appended as an unpadded string ('1' for January).
  The input list is not modified.
  """
  payment_date = datetime.strptime(input_list[8].strip(), '%d-%m-%Y')
  # New list rather than append(): Beam transforms must not mutate their inputs.
  return input_list + [str(payment_date.month)]

def calculate_personal_loan_defaulter(input, max_allowed_missed_months=4, max_allowed_consecutive_missing=2):
    """Decide whether a personal-loan customer is a defaulter.

    Args:
        input: (name, months) pair from GroupByKey; months is an iterable of
            month numbers (1-12) in which a payment was made,
            e.g. ('CT68554, Ronald Chiki', [1, 5, 6, 7, 8, 9, 10, 11, 12]).
        max_allowed_missed_months: more missed months than this => defaulter.
        max_allowed_consecutive_missing: a longer run of consecutive missed
            months than this => defaulter.

    Returns:
        (name, missed_months) if too many months were missed in total;
        otherwise (name, longest_gap) if the longest run of consecutive
        missed months is too long; otherwise (name, 0).
    """
    name, months_list = input

    # sorted(set(...)) works for any iterable (GroupByKey values need not be a
    # list), does not mutate the grouped values, and counts each paid month once.
    sorted_months = sorted(set(months_list))
    missed_payments = 12 - len(sorted_months)

    if missed_payments > max_allowed_missed_months:
        return name, missed_payments

    # Gaps between consecutive paid months. The sentinels 0 and 13 include the
    # gap before the first payment and after the last one.
    bounds = [0] + sorted_months + [13]
    consecutive_missed_months = max(
        nxt - prev - 1 for prev, nxt in zip(bounds, bounds[1:]))

    if consecutive_missed_months > max_allowed_consecutive_missing:
        return name, consecutive_missed_months

    return name, 0

def return_tuple(element):
  """Key a comma-separated string by its first field: (first, [rest...])."""
  parts = element.split(',')
  head, tail = parts[0], parts[1:]
  return head, tail

# Credit-card defaulters, re-keyed by customer ID for the join below.
# format_result builds 'CT.., First Last, N fraud_points', and return_tuple
# splits it back into ('CT..', [' First Last', ' N fraud_points']).
card_defaulter = (
    p
    | 'Read credit card data'         >> beam.io.ReadFromText('cards.txt', skip_header_lines=1)
    | 'Calculate defaulter points'    >> beam.Map(calculate_points)
    | 'Combine points for defaulters' >> beam.CombinePerKey(sum)
    | 'Filter card defaulters'        >> beam.Filter(lambda element: element[1] > 0)
    | 'Format output'                 >> beam.Map(format_result)
    | 'Key card defaulters by ID'     >> beam.Map(return_tuple)
)

# Medical-loan defaulters: customers with 3 or more late payments
# (calculate_late_payment appends a '1'/'0' late flag at index 9).
medical_loan_defaulter = (
    p
    | 'Read medical loan data'            >> beam.io.ReadFromText('loan.txt', skip_header_lines=1)
    | 'Split Row'                         >> beam.Map(lambda row: row.split(','))
    | 'Filter medical loan'               >> beam.Filter(lambda element: element[5].strip() == 'Medical Loan')
    | 'Calculate late payment'            >> beam.Map(calculate_late_payment)
    | 'Make key value pairs'              >> beam.Map(lambda elements: (elements[0] + ', ' + elements[1] + ' ' + elements[2], int(elements[9])))
    | 'Group medical loan based on month' >> beam.CombinePerKey(sum)
    | 'Check for medical loan defaulter'  >> beam.Filter(lambda element: element[1] >= 3)
    | 'Format medical loan output'        >> beam.Map(format_output)
)

# Personal-loan defaulters: too many missed months, or too long a gap
# (calculate_month appends the payment month at index 9).
personal_loan_defaulter = (
    p
    | 'Read personal loan data'              >> beam.io.ReadFromText('loan.txt', skip_header_lines=1)
    | 'Split'                                >> beam.Map(lambda row: row.split(','))
    | 'Filter personal loan'                 >> beam.Filter(lambda element: element[5].strip() == 'Personal Loan')
    | 'Split and Append New Month Column'    >> beam.Map(calculate_month)
    | 'Make key value pairs loan'            >> beam.Map(lambda elements: (elements[0] + ', ' + elements[1] + ' ' + elements[2], int(elements[9])))
    | 'Group personal loan based on month'   >> beam.GroupByKey()
    | 'Check for personal loan defaulter'    >> beam.Map(calculate_personal_loan_defaulter)
    | 'Filter only personal loan defaulters' >> beam.Filter(lambda element: element[1] > 0)
    | 'Format personal loan output'          >> beam.Map(format_output)
)

final_loan_defaulters = (
    (personal_loan_defaulter, medical_loan_defaulter)
    | 'Combine all defaulters'    >> beam.Flatten()
    | 'Key loan defaulters by ID' >> beam.Map(return_tuple)
)

# Group both card and loan defaulters by customer ID.
both_defaulters = (
    {'card_defaulter': card_defaulter, 'loan_defaulter': final_loan_defaulters}
    | 'Join defaulters by ID' >> beam.CoGroupByKey()
    | 'Write p3 results'      >> beam.io.WriteToText('outputs/both')
)

# Wait for completion so outputs/both is fully written before the next cell
# reads it; the trailing ';' suppresses display of the returned state.
p.run().wait_until_finish();



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f5f8909d4d0>

In [None]:
!{('head -n 10 outputs/both-00000-of-00001')}

('CT68554', {'card_defaulter': [], 'loan_defaulter': [[' Ronald Chiki', ' 3 missed']]})
('CT56276', {'card_defaulter': [], 'loan_defaulter': [[' Fay Carr', ' 10 missed']]})
('CT30950', {'card_defaulter': [], 'loan_defaulter': [[' Arlene Calderon', ' 10 missed']]})
('CT27126', {'card_defaulter': [], 'loan_defaulter': [[' Nicole Acevedo', ' 6 missed']]})
('CT29233', {'card_defaulter': [], 'loan_defaulter': [[' Wilma Abbott', ' 5 missed']]})
('CT74474', {'card_defaulter': [[' Nanaho Brennan', ' 3 fraud_points']], 'loan_defaulter': [[' Nanaho Brennan', ' 5 missed']]})
('CT27486', {'card_defaulter': [], 'loan_defaulter': [[' Virginie Ashley', ' 7 missed']]})
('CT37576', {'card_defaulter': [], 'loan_defaulter': [[' Stan Alvarado', ' 3 missed']]})
('CT85320', {'card_defaulter': [], 'loan_defaulter': [[' Bill Briggs', ' 3 missed']]})
('CT54266', {'card_defaulter': [], 'loan_defaulter': [[' Charley Cannon', ' 10 missed']]})


## Coder class in Beam

See the built-in coders in the Beam Python SDK: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/coders

### example:

```python
class StrUtf8Coder(Coder):

  def encode(self, value):
    return value.encode('utf-8')

  def decode(self, value):
    return value.decode('utf-8')

  def to_type_hint(self):
    return str  # `unicode` in the Python 2 era


# Registration happens at module level, after the class definition.
Coder.register_structured_urn(common_urns.coders.STRING_UTF8.urn, StrUtf8Coder)
```

## Beam Type Hints
Inline: provided during pipeline construction, on transforms (e.g. `.with_input_types(int)`)


Outline: Provided as properties of the DoFn using decorators


### Categories

Simple : Includes primitive types (int, str, user defined classes)

Parameterized: nested types, mainly for Python container objects (list, tuple), e.g. `List[int]`, `Tuple[str, int]`

Special : Includes special types (Any, Union[], Optional[T])

In [None]:
import apache_beam as beam

p = beam.Pipeline()

# Inline type hint: the Filter declares int input, but Create produces str.
# Beam's construction-time type check rejects this with TypeCheckError. The
# error is the point of the demo, so catch and show it instead of aborting
# the notebook.
try:
  evens = (
      p
      | 'Create strings' >> beam.Create(['one', 'two', 'three'])
      | 'Keep evens'     >> beam.Filter(lambda x: x % 2 == 0).with_input_types(int)  ## inline type hint
  )
except beam.typehints.decorators.TypeCheckError as err:
  print('Type hint violation caught at construction time:', err)
else:
  p.run().wait_until_finish()

In [None]:
import apache_beam as beam

# Pipeline for the decorator-based type-hint example; the DoFn is defined next.
p = beam.Pipeline()

@beam.typehints.with_output_types(int)
@beam.typehints.with_input_types(int) ## type hint as decorator
class FilterEvensDoFn(beam.DoFn):
  """DoFn that passes through only even integers.

  The decorators declare int input and int output. It must subclass
  beam.DoFn; the previous base class `int` is not a DoFn and cannot be used
  with beam.ParDo.
  """

  def process(self, element):
    if element % 2 == 0:
      yield element

# Decorator type hints: FilterEvensDoFn declares int input, but Create produces
# str, so Beam's construction-time type check raises TypeCheckError. Catch it
# so the cell shows the message instead of failing.
try:
  evens = (
      p
      | 'Create numeric strings' >> beam.Create(['1', '2', '3'])
      | 'Filter evens'           >> beam.ParDo(FilterEvensDoFn())  # ParDo takes a DoFn *instance*
  )
except beam.typehints.decorators.TypeCheckError as err:
  print('Type hint violation caught at construction time:', err)
else:
  p.run().wait_until_finish()

In [None]:
import apache_beam as beam 
import typing

# Pipeline for the custom-coder example; its transforms are applied at the end of this cell.
p = beam.Pipeline()

class Employee(object):
  """Plain value holder for an employee's id and name."""

  def __init__(self, id, name):
    self.id, self.name = id, name

class EmployeeCoder(beam.coders.Coder):
  """Coder serializing an Employee as the UTF-8 bytes of 'id:name'."""

  def encode(self, employee):
    return ('%s:%s' % (employee.id, employee.name)).encode('utf-8')

  def decode(self, s):
    # Split only on the first ':' so a name containing ':' round-trips intact.
    # This assumes the id itself contains no ':'.
    emp_id, name = s.decode('utf-8').split(':', 1)
    return Employee(emp_id, name)

  def is_deterministic(self):
    # Equal Employee fields always encode to identical bytes, which Beam
    # requires of coders used for grouping keys.
    return True
  
# Register EmployeeCoder as the coder Beam uses for values typed as Employee.
beam.coders.registry.register_coder(Employee, EmployeeCoder)

def split_file(input):
  """Parse a 'name, id, salary' line into (Employee(id, name), int(salary))."""
  name, emp_id, salary = input.split(', ')
  employee = Employee(emp_id, name)
  return employee, int(salary)
  
# Sum salaries per Employee. The Tuple[Employee, int] input hint lets Beam pick
# the EmployeeCoder registered above for the keys. The totals are printed so the
# result is observable; before, the summed PCollection was never consumed.
result = (
    p
    | 'Read employees'          >> beam.io.ReadFromText('data.txt')
    | 'Parse rows'              >> beam.Map(split_file)
    | 'Sum salary per employee' >> beam.CombinePerKey(sum).with_input_types(typing.Tuple[Employee, int])
    | 'Format totals'           >> beam.Map(lambda kv: '%s, %s, %s' % (kv[0].name, kv[0].id, kv[1]))
    | 'Print totals'            >> beam.Map(print)
)

# Block until the pipeline has finished.
p.run().wait_until_finish()