### ParDo
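ParDo is Beam's transform for generic parallel processing: it applies a user-defined function (a DoFn) to each element of an input PCollection and collects everything that function emits into an output PCollection.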

ParDo is useful for a variety of common data processing operations, including:

+ Filtering a data set. You can use ParDo to consider each element in a PCollection and either output that element to a new collection or discard it.
+ Formatting or type-converting each element in a data set. If your input PCollection contains elements that are of a different type or format than you want, you can use ParDo to perform a conversion on each element and output the result to a new PCollection.
+ Extracting parts of each element in a data set. If you have a PCollection of records with multiple fields, for example, you can use a ParDo to parse out just the fields you want to consider into a new PCollection.
+ Performing computations on each element in a data set. You can use ParDo to perform simple or complex computations on every element, or certain elements, of a PCollection and output the results as a new PCollection.

```python
import apache_beam as beam

class SplitRow(beam.DoFn):
    def process(self, element):
        # Emits one element per input line: the list of its comma-separated fields.
        # process() may yield values or return an iterable (e.g. return [...]);
        # each yielded value becomes a separate output element, as in FlatMap.
        yield element.split(',')

class FilterAccountsEmployee(beam.DoFn):
    def process(self, element):
        # Keep only rows whose fourth field (index 3) is 'Accounts'.
        if element[3] == 'Accounts':
            yield element

class PairEmployees(beam.DoFn):
    def process(self, element):
        # Key: "department,name"; value: 1 per input row.
        yield (element[3] + "," + element[1], 1)

class Counting(beam.DoFn):
    def process(self, element):
        # After GroupByKey, each element is (key, iterable of 1s).
        (key, values) = element
        yield (key, sum(values))


with beam.Pipeline() as pipe:

    attendance_count = (
        pipe
        | beam.io.ReadFromText('data/dept_data.txt')

        | beam.ParDo(SplitRow())
        # Equivalent lambda-based alternative:
        # | 'Split row' >> beam.FlatMap(lambda element: [element.split(',')])

        | beam.ParDo(FilterAccountsEmployee())
        | beam.ParDo(PairEmployees())
        | 'Group' >> beam.GroupByKey()  # e.g. ('Accounts,Marco', [1, 1, 1, ...]), ('Accounts,Rebekah', [1, 1, 1, ...])
        | 'Sum using ParDo' >> beam.ParDo(Counting())

        | beam.io.WriteToText('data/output_new_final')
    )

# Show the first 20 results; remember there are no ordering guarantees.
!head -n 20 data/output_new_final-00000-of-00001
```



```
('Accounts,Marco', 31)
('Accounts,Rebekah', 31)
('Accounts,Itoe', 31)
('Accounts,Edouard', 31)
('Accounts,Kyle', 62)
('Accounts,Kumiko', 31)
('Accounts,Gaston', 31)
('Accounts,Ayumi', 30)
```
