# Set up notebook

In [1]:
!pip install apache_beam[dataframe] -q
!pip install apache-beam[gcp] -q
!pip install apache-beam[interactive] -q
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

run('pip install --upgrade pip')

# Install apache-beam.
run('pip install --quiet apache-beam')

[0m>> pip install --upgrade pip
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
[0m
>> pip install --quiet apache-beam
[0m


In [2]:
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.dataframe import convert
import pandas as pd
import csv
from apache_beam.dataframe import convert
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
import re
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe
import apache_beam.io.parquetio
import apache_beam.runners.interactive.interactive_beam as ib

# Inspect the current file

In [3]:
marketing = pd.read_csv('/content/sample_data/marketing_format.csv', delimiter=';')

In [4]:
marketing.head(10)

Unnamed: 0,user_id,name,gender,age,address,date_joined
0,1,Anthony Wolf,male,73,"New Rachelburgh,VA,49583",2019-03-13
1,2,James Armstrong,male,56,"North Jillianfort,UT,86454",2020-11-06
2,3,Cody Shaw,male,75,"North Anne,SC,53799",2004-05-29
3,4,Sierra Hamilton,female,76,"New Angelafurt,ME,46190",2005-08-26
4,5,Chase Davis,male,31,"South Bethmouth,WI,18562",2018-04-30
5,6,Sierra Andrews,female,21,"Ryanville,MI,69690",2007-05-25
6,7,Ann Stone,female,41,"Smithmouth,SD,17340",2005-01-05
7,8,Karen Santos,female,34,"Mariaville,AK,29888",2003-12-12
8,9,Ronald Meyer,male,41,"North Cherylhaven,NJ,04197",2015-11-14
9,10,Steven Rivera,male,43,"Wayneside,VT,29040",2003-05-15


In [5]:
gender = marketing['gender']
gender.value_counts()

male      1207
female    1150
Name: gender, dtype: int64

#Task 2 : DataFrames vs Beam specific operations 

## Helper function

In [6]:
def myprint(x):
  print('{}'.format(x))

What is the gender composition of the customers

In [7]:
class SplitRecords(beam.DoFn):
    """Spilt the element into records, return rideable_type record."""
    def process(self, element):
        records = element.split(";")
        yield records[2]


with beam.Pipeline() as pipeline:

  (pipeline 
      | 'Read lines' >> beam.io.ReadFromText('/content/sample_data/marketing_format.csv', skip_header_lines=1)
      | 'Transform' >> beam.ParDo(SplitRecords())
      | 'map 1 to each element in order to count' >> beam.Map(lambda x: (x, 1)) 
      | 'perform a count' >> beam.CombinePerKey(sum)
      | "print" >> beam.Map(myprint)
  )


('male', 1207)
('female', 1150)


In [8]:
#create pipeline
pipeline = beam.Pipeline(InteractiveRunner())

# Create a deferred Beam DataFrame with the contents of our csv file.
beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('/content/sample_data/marketing_format.csv', sep=';')

# Collect the Beam DataFrame into a Pandas DataFrame.
df = ib.collect(beam_df)

#Group the values by gender
df['gender'].value_counts() #Normalzing provide you with percentages in lieu of values

male      1207
female    1150
Name: gender, dtype: int64

What was the number of customers that joined each day

In [9]:
class SplitRecords(beam.DoFn):
    """Spilt the element into records, return rideable_type record."""
    def process(self, element):
        records = element.split(";")
        yield (records[5])


with beam.Pipeline() as pipeline:

  (pipeline 
      | 'Read lines' >> beam.io.ReadFromText('/content/sample_data/marketing_format.csv', skip_header_lines=1)
      | 'Transform' >> beam.ParDo(SplitRecords())
      | 'map 1 to each element in order to count' >> beam.Map(lambda x: (x, 1)) 
      | 'perform a count' >> beam.CombinePerKey(sum)
      | "print" >> beam.Map(myprint)
  )


('2019-03-13', 1)
('2020-11-06', 1)
('2004-05-29', 2)
('2005-08-26', 1)
('2018-04-30', 1)
('2007-05-25', 1)
('2005-01-05', 2)
('2003-12-12', 2)
('2015-11-14', 1)
('2003-05-15', 1)
('2003-10-15', 2)
('2013-09-27', 2)
('2002-03-13', 1)
('2020-12-26', 3)
('2015-11-13', 1)
('2017-07-12', 1)
('2005-02-23', 2)
('2008-08-09', 1)
('2000-08-09', 1)
('2014-05-18', 2)
('2002-02-22', 1)
('2006-04-11', 1)
('2003-03-09', 1)
('2019-03-07', 2)
('2010-11-14', 1)
('2006-12-26', 1)
('2000-06-20', 2)
('2016-02-03', 2)
('2004-07-30', 2)
('2003-05-06', 1)
('2000-09-10', 1)
('2016-04-10', 1)
('2007-10-10', 1)
('2019-12-22', 1)
('2016-02-25', 1)
('2004-06-21', 2)
('2011-11-26', 1)
('2020-05-05', 1)
('2007-06-07', 1)
('2016-07-27', 2)
('2005-08-01', 1)
('2014-11-15', 2)
('2006-02-16', 2)
('2018-11-06', 1)
('2004-07-12', 1)
('2012-03-22', 1)
('2016-03-12', 2)
('2009-08-31', 1)
('2017-01-09', 1)
('2007-12-30', 1)
('2000-01-17', 1)
('2008-11-17', 2)
('2001-06-09', 1)
('2017-12-22', 2)
('2007-07-27', 1)
('2000-12-

In [10]:
#create pipeline
pipeline = beam.Pipeline(InteractiveRunner())

# Create a deferred Beam DataFrame with the contents of our csv file.
beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('/content/sample_data/marketing_format.csv', sep=';')

# Collect the Beam DataFrame into a Pandas DataFrame.
df = ib.collect(beam_df)

#Sort values 
df = df.sort_values(by='date_joined')

# We can now use any Pandas transforms with our data.
df['date_joined'].value_counts()

2018-10-17    5
2011-05-13    4
2010-05-14    3
2005-03-21    3
2015-01-26    3
             ..
2007-05-22    1
2007-05-21    1
2007-05-06    1
2007-04-28    1
2021-09-30    1
Name: date_joined, Length: 2040, dtype: int64

What is the number of customers in each state

In [11]:
class SplitRecords(beam.DoFn):
    """Spilt the element into records, return rideable_type record."""
    def process(self, element):
        records = element.split(";")
        yield records[4]


class SplitWords(beam.DoFn):
    #This function further split the nested record into words so that the state can be isolated
    """Spilt the record into words, return rideable_type word."""
    def process(self, word):
        words = word.split(",")
        yield words[1]


with beam.Pipeline() as pipeline:

  (pipeline 
      | 'Read lines' >> beam.io.ReadFromText('/content/sample_data/marketing_format.csv', skip_header_lines=1)
      | 'Split records' >> beam.ParDo(SplitRecords())
      | 'Split words' >> beam.ParDo(SplitWords())
      | 'Map the value 1 to each state' >> beam.Map(lambda x: (x, 1)) 
      | 'perform a count' >> beam.CombinePerKey(sum)
      | "print" >> beam.Map(myprint)
  )

('VA', 44)
('UT', 50)
('SC', 50)
('ME', 43)
('WI', 56)
('MI', 56)
('SD', 48)
('AK', 52)
('NJ', 58)
('VT', 54)
('NY', 45)
('AZ', 50)
('KY', 43)
('MT', 49)
('CT', 63)
('ID', 51)
('GA', 53)
('OR', 39)
('AR', 53)
('DC', 50)
('NC', 54)
('MA', 45)
('OH', 28)
('ND', 46)
('NM', 42)
('HI', 51)
('CA', 49)
('CO', 48)
('NH', 39)
('DE', 48)
('WY', 50)
('WA', 42)
('OK', 47)
('IN', 45)
('AL', 55)
('NV', 40)
('KS', 49)
('WV', 40)
('TX', 48)
('RI', 35)
('IL', 40)
('MO', 42)
('MN', 41)
('FL', 43)
('NE', 43)
('MS', 36)
('TN', 44)
('MD', 41)
('IA', 53)
('PA', 35)
('LA', 31)


In [12]:
#create pipeline
pipeline = beam.Pipeline(InteractiveRunner())

# Create a deferred Beam DataFrame with the contents of our csv file.
beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('/content/sample_data/marketing_format.csv', sep=';')

# Collect the Beam DataFrame into a Pandas DataFrame.
df = ib.collect(beam_df)

#Split column 
df_split = df['address'].str.split(",", n = 2, expand = True)

# # We can now use any Pandas transforms with our data.
df_split.iloc[:, 1].value_counts().sort_values()

OH    28
LA    31
RI    35
PA    35
MS    36
NH    39
OR    39
NV    40
WV    40
IL    40
MD    41
MN    41
WA    42
NM    42
MO    42
NE    43
KY    43
ME    43
FL    43
VA    44
TN    44
IN    45
NY    45
MA    45
ND    46
OK    47
CO    48
TX    48
SD    48
DE    48
KS    49
MT    49
CA    49
SC    50
UT    50
AZ    50
DC    50
WY    50
HI    51
ID    51
AK    52
GA    53
AR    53
IA    53
VT    54
NC    54
AL    55
MI    56
WI    56
NJ    58
CT    63
Name: 1, dtype: int64