
### **Apache Beam**

In this notebook, I set up Apache Beam and use it to perform operations such as:
 - Map, Filter, and other core transforms
 - ParDo transform
 - Composite transform
 
The data set contains shopping records for customers who bought IT-related products. We use it to group the customers who bought items in a specific category, and to identify customers who return to buy the same product and those who don't.

The output file for each of these operations is generated locally in Colab and can be viewed from the Files pane on the left.
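Note that `WriteToText` shards its output by default, so a name such as `output_MonitorSales_count.txt` typically appears in the Files pane as `output_MonitorSales_count.txt-00000-of-00001`. Below is a minimal sketch for reading every shard back into a list, assuming the default shard naming. The helper name `read_beam_output` is hypothetical and not part of Beam.

```python
import glob

def read_beam_output(prefix):
    """Return all lines from the shard files whose names start with `prefix`.

    Assumes Beam's default shard suffix, e.g. `<prefix>-00000-of-00001`.
    """
    lines = []
    # Sort so that shards are read in a stable order.
    for path in sorted(glob.glob(glob.escape(prefix) + '-*-of-*')):
        with open(path) as handle:
            lines.extend(line.rstrip('\n') for line in handle)
    return lines
```

After a pipeline has run, `read_beam_output('output_MonitorSales_count.txt')` would return the written lines, or an empty list if no shard exists yet.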

Installing Apache Beam

In [None]:
!pip install apache-beam

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Read the file from GitHub using pandas and write it to a new file on Colab, which is used as the input to Apache Beam.

In [None]:
import pandas as pd

input_file = 'https://raw.githubusercontent.com/Dhanasree-Rajamani/Data-Mining/main/Data%20Mining%20Assignment%203/Datasets/Shopping_Input_Apache.txt'
colab_input_file = '/content/sample_data/Shopping_Input_Apache.txt'
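# Read the raw file from GitHub, then write a local copy for Beam to read.
data = pd.read_csv(input_file, sep=",")
# Overwrite rather than append, so re-running this cell does not duplicate rows.
data.to_csv(colab_input_file, index=None, sep=',')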

Read the customers' shopping data, keep only the Monitor purchases, and count how many monitors each customer bought, keyed on the customer ID (`record[1]`). The results are written to an output file. This uses the core transforms Map, Filter, and CombinePerKey.

In [None]:
import apache_beam as beam

pipe1 = beam.Pipeline()

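shopping_count = (
    pipe1
    | beam.io.ReadFromText(colab_input_file)
    # Split each CSV line into a list of fields.
    | beam.Map(lambda record: record.split(","))
    # Keep only the Monitor purchases.
    | beam.Filter(lambda record: record[2] == 'Monitor')
    # Pair each customer with 1, then sum the 1s per customer.
    | beam.Map(lambda record: (record[1], 1))
    | beam.CombinePerKey(sum)
    | beam.io.WriteToText('output_MonitorSales_count.txt')
)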

pipe1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f91bd5da8d0>
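To make the steps of this pipeline concrete, the same logic can be sketched in plain Python without Beam. The sample records are hypothetical. The only assumptions carried over from the pipeline are that field 1 identifies the customer and field 2 holds the item name.

```python
from collections import Counter

# Hypothetical input lines; the real file's columns may differ.
lines = [
    "1,C001,Monitor",
    "2,C002,Keyboard",
    "3,C001,Monitor",
    "4,C003,Monitor",
]

records = [line.split(",") for line in lines]          # mirrors beam.Map (split)
monitors = [r for r in records if r[2] == "Monitor"]   # mirrors beam.Filter
pairs = [(r[1], 1) for r in monitors]                  # mirrors beam.Map (pair with 1)

monitor_counts = Counter()                             # mirrors beam.CombinePerKey(sum)
for customer, one in pairs:
    monitor_counts[customer] += one

print(sorted(monitor_counts.items()))  # [('C001', 2), ('C003', 1)]
```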

Filter the headphone purchases and count them per customer using the ParDo transform, and write the result to an output file.

In [None]:
import apache_beam as beam

class SplitRow(beam.DoFn):
 
  def process(self, element):
    return  [element.split(',')]
 

class FilterHeadphoneCustomers(beam.DoFn):
 
  def process(self, element):
    if element[2] == 'Headphones':
      return [element]  
    
class PairCustomers(beam.DoFn):
 
  def process(self, element):
    return [(element[1], 1)]
 
class Counting(beam.DoFn):
 
  def process(self, element):
    (key, values) = element         
    return [(key, sum(values))]
     

pipe2 = beam.Pipeline()

sales_count = (
    
   pipe2
    |beam.io.ReadFromText(colab_input_file)
    
    |beam.ParDo(SplitRow())
    |beam.ParDo(FilterHeadphoneCustomers())
    |beam.ParDo(PairCustomers())
    | 'Group ' >> beam.GroupByKey()
    | 'Sum using ParDo' >> beam.ParDo(Counting())  
    
    |beam.io.WriteToText('pardo_output_headphones_sales.txt')
 
)

pipe2.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f91bd55f7d0>
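In this pipeline, `GroupByKey` emits one `(key, values)` pair per customer, and `Counting` then sums the values. Below is a plain-Python sketch of those two steps, using hypothetical customer IDs. The function names `group_by_key` and `count_group` are illustrative only.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Mimic GroupByKey: collect every value that shares a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return list(grouped.items())

def count_group(element):
    """Mimic Counting.process: sum the grouped values for one key."""
    key, values = element
    return [(key, sum(values))]

# Hypothetical (customer, 1) pairs, shaped like the output of PairCustomers.
pairs = [("C001", 1), ("C002", 1), ("C001", 1)]
grouped = group_by_key(pairs)
headphone_counts = [out for element in grouped for out in count_group(element)]
```

In Beam itself, `CombinePerKey(sum)` (as used in the first pipeline) produces the same totals as `GroupByKey` followed by a summing `ParDo`.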

Use a composite transform to group and count the purchases of all customers (returning and non-returning) for every item in the input file (chair, monitor, mouse, headphones, and keyboard), and write the details to one output file per item.

In [None]:
import apache_beam as beam

class CustomTransform(beam.PTransform):
  
  def expand(self, input_coll):
    
    a = ( 
        input_coll
                       | 'Group and sum' >> beam.CombinePerKey(sum)
                       | 'Customers and sales' >> beam.Map(format_output)
              
    )
    return a

def SplitRow(element):
    return element.split(',')
  
def format_output(element):
  name, count = element
  return ', '.join((name,str(count)))

pipe3 = beam.Pipeline()

input_collection = ( 
                      pipe3 
                      | "Read from text file" >> beam.io.ReadFromText(colab_input_file)
                      | "Split rows" >> beam.Map(SplitRow)
                   )

monitor_customer_sales_count = (
                      input_collection
                      | 'Get all customers who shopped Monitors' >> beam.Filter(lambda record: record[2] == 'Monitor')
                      | 'Pair each Monitor customer with 1' >> beam.Map(lambda record: ("Monitor, " +record[1], 1))
                      | 'composite Monitor customers' >> CustomTransform()
                      | 'Write results for Monitor' >> beam.io.WriteToText('monitor_output.txt')
                 )

keyboard_customer_sales_count = (
                input_collection
                | 'Get all customers who shopped keyboards' >> beam.Filter(lambda record: record[2] == 'Keyboard')
                | 'Pair each Keyboard customer with 1' >> beam.Map(lambda record: ("Keyboard, " +record[1], 1))
                | 'composite Keyboard customers' >> CustomTransform()
                | 'Write results for Keyboard' >> beam.io.WriteToText('keyboard_output.txt')
           )

headphones_customer_sales_count = (
                      input_collection
                      | 'Get all customers who shopped Headphones' >> beam.Filter(lambda record: record[2] == 'Headphones')
                      | 'Pair each headphone customer with 1' >> beam.Map(lambda record: ("Headphones, " +record[1], 1))
                      | 'composite Headphones customers' >> CustomTransform()
                      | 'Write results for Headphones' >> beam.io.WriteToText('headphones_output.txt')
                 )

mouse_customer_sales_count = (
                input_collection
                | 'Get all customers who shopped mouse' >> beam.Filter(lambda record: record[2] == 'Mouse')
                | 'Pair each mouse customer with 1' >> beam.Map(lambda record: ("Mouse, " +record[1], 1))
                | 'composite mouse customers' >> CustomTransform()
                | 'Write results for mouse' >> beam.io.WriteToText('mouse_output.txt')
                )

chair_customer_sales_count = (
                      input_collection
                      | 'Get all customers who shopped chairs' >> beam.Filter(lambda record: record[2] == 'Chair')
                      | 'Pair each chair customer with 1' >> beam.Map(lambda record: ("Chair, " +record[1], 1))
                      | 'composite chair customers' >> CustomTransform()
                      | 'Write results for chair' >> beam.io.WriteToText('chair_output.txt')
                 )

pipe3.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f91bf938390>
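A composite transform packages several steps behind one name. The sketch below mirrors `CustomTransform` in plain Python and loops over the item names instead of repeating the branch five times. The sample records and the helper name `count_and_format` are hypothetical.

```python
from collections import Counter

def count_and_format(pairs):
    """Mirror CustomTransform: sum per key, then format each result as text."""
    totals = Counter()
    for key, value in pairs:
        totals[key] += value
    return [', '.join((key, str(count))) for key, count in totals.items()]

# Hypothetical split rows: [row id, customer, item].
records = [
    ["1", "C001", "Monitor"],
    ["2", "C002", "Chair"],
    ["3", "C001", "Monitor"],
    ["4", "C003", "Mouse"],
]

results = {}
for item in ["Monitor", "Keyboard", "Headphones", "Mouse", "Chair"]:
    # Same filter-and-pair steps as each branch of the pipeline.
    pairs = [(item + ", " + r[1], 1) for r in records if r[2] == item]
    results[item] = count_and_format(pairs)
```

With these sample rows, `results["Monitor"]` holds the single line `Monitor, C001, 2`, and items with no purchases map to an empty list.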

Use a composite transform to group, filter, and identify the returning customers for every item in the input file (chair, monitor, mouse, headphones, and keyboard), and write the details to one output file per item. A customer counts as returning when they bought the same item more than once.

In [None]:
import apache_beam as beam

class CustomTransform(beam.PTransform):
  
  def expand(self, input_coll):
    
    a = ( 
        input_coll
                       | 'Group and sum' >> beam.CombinePerKey(sum)
                       | 'count filter accounts' >> beam.Filter(filter_on_count)
                       | 'Returning Customers' >> beam.Map(format_output)
              
    )
    return a

def SplitRow(element):
    return element.split(',')

def filter_on_count(element):
  # Keep only customers who bought the same item more than once.
  name, count = element
  return count > 1
  
def format_output(element):
  name, count = element
  return ', '.join((name,str(count), 'Returning Customer'))

pipe4 = beam.Pipeline()

input_collection = ( 
                      pipe4
                      | "Read from text file" >> beam.io.ReadFromText(colab_input_file)
                      | "Split rows" >> beam.Map(SplitRow)
                   )

monitor_customer_sales_count = (
                      input_collection
                      | 'Get all customers who shopped Monitors' >> beam.Filter(lambda record: record[2] == 'Monitor')
                      | 'Pair each Monitor customer with 1' >> beam.Map(lambda record: ("Monitor, " +record[1], 1))
                      | 'composite Monitor customers' >> CustomTransform()
                      | 'Write results for Monitor' >> beam.io.WriteToText('regular_cust_monitor_output.txt')
                 )

keyboard_customer_sales_count = (
                input_collection
                | 'Get all customers who shopped keyboards' >> beam.Filter(lambda record: record[2] == 'Keyboard')
                | 'Pair each Keyboard customer with 1' >> beam.Map(lambda record: ("Keyboard, " +record[1], 1))
                | 'composite Keyboard customers' >> CustomTransform()
                | 'Write results for Keyboard' >> beam.io.WriteToText('regular_cust_keyboard_output.txt')
           )

headphones_customer_sales_count = (
                      input_collection
                      | 'Get all customers who shopped Headphones' >> beam.Filter(lambda record: record[2] == 'Headphones')
                      | 'Pair each headphone customer with 1' >> beam.Map(lambda record: ("Headphones, " +record[1], 1))
                      | 'composite Headphones customers' >> CustomTransform()
                      | 'Write results for Headphones' >> beam.io.WriteToText('regular_cust_headphones_output.txt')
                 )

mouse_customer_sales_count = (
                input_collection
                | 'Get all customers who shopped mouse' >> beam.Filter(lambda record: record[2] == 'Mouse')
                | 'Pair each mouse customer with 1' >> beam.Map(lambda record: ("Mouse, " +record[1], 1))
                | 'composite mouse customers' >> CustomTransform()
                | 'Write results for mouse' >> beam.io.WriteToText('regular_cust_mouse_output.txt')
                )

chair_customer_sales_count = (
                      input_collection
                      | 'Get all customers who shopped chairs' >> beam.Filter(lambda record: record[2] == 'Chair')
                      | 'Pair each chair customer with 1' >> beam.Map(lambda record: ("Chair, " +record[1], 1))
                      | 'composite chair customers' >> CustomTransform()
                      | 'Write results for chair' >> beam.io.WriteToText('regular_cust_chair_output.txt')
                 )

pipe4.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f91bd27b490>
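The pipeline above keeps only customers whose count is greater than 1. Below is a minimal plain-Python sketch of the same threshold, extended to also label the non-returning customers. The sample counts, the helper name `label_customers`, and the `New Customer` label are hypothetical and do not appear in the notebook.

```python
def label_customers(counts):
    """Split (name, count) pairs into returning (count > 1) and the rest."""
    returning, non_returning = [], []
    for name, count in counts:
        if count > 1:
            returning.append(', '.join((name, str(count), 'Returning Customer')))
        else:
            # Hypothetical label for customers who bought the item only once.
            non_returning.append(', '.join((name, str(count), 'New Customer')))
    return returning, non_returning

# Hypothetical per-customer totals, shaped like the output of CombinePerKey(sum).
counts = [("Monitor, C001", 2), ("Monitor, C003", 1)]
returning, non_returning = label_customers(counts)
```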