# Apache Beam Programming Model

The following operations are demonstrated with Apache Beam:

* Map, Filter and other core transforms
* The ParDo transform
* Composite transforms

The dataset lists fruit purchases made by customers. The pipelines below group the buyers of a specific fruit and count how many purchases each of them made.

# Install the Apache Beam package

In [None]:
# Print a shell command, then run it (notebook-only helper).
def run(cmd):
  # Echo the command so the cell output shows exactly what was executed.
  print('>> {}'.format(cmd))
  # IPython shell escape: `{cmd}` is expanded from the Python variable before
  # the line is passed to the system shell. This only works inside
  # Jupyter/IPython; it is a syntax error in a plain Python script.
  !{cmd}
  # Blank line to separate this command's output from whatever follows.
  print('')

# Install apache-beam quietly (pip progress output suppressed).
run('pip install --quiet apache-beam')


>> pip install --quiet apache-beam



* Read the customers' purchase data.
* Count the Grapes 🍇 purchases made by each customer.
* Write the per-customer counts to an output text file.

In [None]:
import apache_beam as beam

# Pipeline: count Grapes purchases per buyer and write (buyer, count) pairs.
p1 = beam.Pipeline()

visit_count = (
    p1
    # One element per line of the purchase list.
    | beam.io.ReadFromText('/content/Buying_Fruits_List.txt')
    # Break each line into its comma-separated fields.
    | beam.Map(lambda line: line.split(','))
    # Keep only rows whose third field is exactly 'Grapes'.
    | beam.Filter(lambda fields: fields[2] == 'Grapes')
    # Key each remaining row by its second field, paired with a count of one.
    | beam.Map(lambda fields: (fields[1], 1))
    # Add up the ones for every key.
    | beam.CombinePerKey(sum)
    # Output files use 'Output.txt1' as their name prefix.
    | beam.io.WriteToText('Output.txt1')
)

p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f2df9ae8750>

* Count the Apples 🍎 and Grapes 🍇 purchases made by each customer, and write both results to a single output.

In [None]:
import apache_beam as beam


p2 = beam.Pipeline()

# Read the purchase list once and split each line into its comma-separated
# fields; both fruit branches below consume this shared PCollection.
input_collection = (
    p2
    | "Read input data" >> beam.io.ReadFromText('/content/Buying_Fruits_List.txt')
    | "Split rows into columns" >> beam.Map(lambda record: record.split(','))
)

# Grapes branch: count per "grapes, <second field>" key.
# NOTE(review): the second field is presumably the customer name — confirm
# against the data file.
grapes_buyers_count = (
    input_collection
    | 'Filter Grapes buyers' >> beam.Filter(lambda record: record[2] == 'Grapes')
    | 'Pair each grapes buyer with 1' >> beam.Map(lambda record: ("grapes, " + record[1], 1))
    | 'Aggregate all Grapes buyers' >> beam.CombinePerKey(sum)
)

# Apples branch: count per "apples, <second field>" key.
apples_buyers_count = (
    input_collection
    | 'Filter Apples buyers' >> beam.Filter(lambda record: record[2] == 'Apples')
    | 'Pair each apples buyer with 1' >> beam.Map(lambda record: ("apples, " + record[1], 1))
    | 'Aggregate all Apples buyers' >> beam.CombinePerKey(sum)
)

# Merge the two per-fruit results and write them to one output.
# Each branch must be listed exactly once: listing the same PCollection
# twice duplicates its records, and a branch that is not listed never
# reaches the sink.
output = (
    (grapes_buyers_count, apples_buyers_count)
    | beam.Flatten()
    | beam.io.WriteToText('data/both')
)

p2.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f2df9b5ff50>

# ParDo Transform

* Count the Strawberries 🍓 purchases made by each customer using the ParDo transform.

In [None]:
import apache_beam as beam

class SplitRow(beam.DoFn):
  """DoFn that turns one comma-separated text line into a list of fields."""

  def process(self, element):
    # A DoFn's process() yields an iterable of outputs; this one emits
    # exactly one output per input: the list of split fields.
    fields = element.split(',')
    return [fields]
 

class FilterStrawberriesBuyers(beam.DoFn):
  """DoFn that passes through only rows whose third field is 'Strawberries'.

  Input elements are field lists (as produced by SplitRow); matching rows
  are emitted unchanged, non-matching rows produce no output.
  """

  def process(self, element):
    # Always return a list (empty when the row does not match) so the method
    # never falls through to an implicit None, matching the other DoFns here.
    if element[2] == 'Strawberries':
      return [element]
    return []
    
class PairBuyers(beam.DoFn):
  """DoFn that keys each field list by its second field, with a count of 1."""

  def process(self, element):
    key = element[1]
    return [(key, 1)]
 
class Counting(beam.DoFn):
  """DoFn that totals the grouped values for each key.

  Expects (key, iterable-of-numbers) pairs, as emitted by the GroupByKey step
  that precedes it in the pipeline, and emits one (key, total) pair.
  """

  def process(self, element):
    key, grouped = element
    total = sum(grouped)
    return [(key, total)]
     

# Pipeline: Strawberries purchase count per buyer, built from custom DoFns.
p3 = beam.Pipeline()

strawberries_count = (
    p3
    | beam.io.ReadFromText('/content/Buying_Fruits_List.txt')
    | beam.ParDo(SplitRow())                        # line -> list of fields
    | beam.ParDo(FilterStrawberriesBuyers())        # keep Strawberries rows only
    | beam.ParDo(PairBuyers())                      # row -> (second field, 1)
    | 'Group ' >> beam.GroupByKey()                 # key -> all of its 1s
    | 'Sum using ParDo' >> beam.ParDo(Counting())   # key -> total
    | beam.io.WriteToText('parddo_output.txt')
)

p3.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f2df9390d90>

# Composite Transform

* Group the buyers of every item in the input file (Apples 🍎, Bananas 🍌, Grapes 🍇 and Strawberries 🍓) and count their purchases.
* Use a composite transform to compute the counts, and write each fruit's counts to its own output file.

In [None]:
import apache_beam as beam

class CustomTransform(beam.PTransform):
  """Composite transform: sum the values per key, then format each result.

  Takes a PCollection of (key, number) pairs and returns a PCollection of
  strings built by the module-level ``format_output`` helper.
  """

  def expand(self, input_coll):
    summed = input_coll | 'Group and sum' >> beam.CombinePerKey(sum)
    return summed | 'Customers and sales' >> beam.Map(format_output)

def SplitRow(element):
    """Split one comma-separated line into a list of field strings."""
    fields = element.split(',')
    return fields
  
def format_output(element):
  """Render a (name, count) pair as the string "name, count"."""
  key, total = element
  parts = [key, str(total)]
  return ', '.join(parts)

p4 = beam.Pipeline()

# Read the purchase list once; every per-fruit branch below reuses it.
input_collection = (
    p4
    | "Read from text file" >> beam.io.ReadFromText('/content/Buying_Fruits_List.txt')
    | "Split rows" >> beam.Map(SplitRow)
)


def _count_fruit_buyers(rows, fruit):
  """Attach one per-fruit branch: filter, count via CustomTransform, write.

  Args:
    rows: PCollection of field lists, as produced by the "Split rows" step.
    fruit: Value matched against the third field. It is also used to build
      the output keys ("<fruit>, <second field>"), the step labels, and the
      output file prefix "<fruit>_output.txt".

  Returns:
    The result of the branch's final WriteToText step.
  """
  # ``fruit`` is a parameter of this helper, so every call's lambdas see
  # their own value instead of sharing one variable across branches.
  return (
      rows
      | 'Filter {} buyers'.format(fruit) >> beam.Filter(
          lambda record: record[2] == fruit)
      | 'Pair each {} buyer with 1'.format(fruit) >> beam.Map(
          lambda record: (fruit + ", " + record[1], 1))
      | 'composite {} buyers'.format(fruit) >> CustomTransform()
      | 'Write results for {}'.format(fruit) >> beam.io.WriteToText(
          '{}_output.txt'.format(fruit))
  )


Grapes_count = _count_fruit_buyers(input_collection, 'Grapes')
Apples_count = _count_fruit_buyers(input_collection, 'Apples')
Bananas_count = _count_fruit_buyers(input_collection, 'Bananas')
Strawberries_count = _count_fruit_buyers(input_collection, 'Strawberries')

p4.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f2df9a62990>