<a href="https://colab.research.google.com/github/Student-1469/day4/blob/main/TUT_4_Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install apache-beam[gcp]




In [2]:
# Step 1: Use curl to download the users_v.csv file and save it locally
!curl -o users_v.csv https://storage.googleapis.com/bdt-beam/users_v.csv

# Now proceed with Apache Beam pipeline
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime

# Define your pipeline options (optional)
pipeline_options = PipelineOptions()

# Function to split each row into a dictionary of fields
def split_row(row):
    fields = row.split(',')
    return {
        'user_id': fields[0],
        'name': fields[1],
        'gender': fields[2],
        'age': fields[3],
        'address': fields[4],
        'date_joined': fields[5]
    }

# Function to reformat the date to YYYY-MM-dd format
def format_date(element):
    element['date_joined'] = datetime.strptime(element['date_joined'].strip(), '%Y/%m/%d').strftime('%Y-%m-%d')
    return element

# Function to reformat the address to "City, State, Zip Code" format
def format_address(element):
    address_parts = element['address'].split('-')
    if len(address_parts) == 3:
        element['address'] = f"{address_parts[0]}, {address_parts[1]}, {address_parts[2]}"
    return element

# Function to format the output line in the required format
def format_output(element):
    return f"{element['user_id']}; {element['name']}; {element['gender']}; {element['age']}; {element['address']}; {element['date_joined']}"

# Define the Beam pipeline
with beam.Pipeline(options=pipeline_options) as p:
    # Read the input CSV file from the local path
    input_pcoll = (
        p
        | 'Read CSV File' >> beam.io.ReadFromText('users_v.csv', skip_header_lines=1)
        | 'Split Rows' >> beam.Map(split_row)  # Split rows into separate elements
        | 'Format Date' >> beam.Map(format_date)  # Change the date format
        | 'Format Address' >> beam.Map(format_address)  # Change the address format
        | 'Format Output' >> beam.Map(format_output)  # Format the output for writing
    )

    # Write the formatted data to a file titled "marketing_format.csv"
    input_pcoll | 'Write to CSV' >> beam.io.WriteToText('marketing_format',
                                                        file_name_suffix='.csv',
                                                        header='user_id; name; gender; age; address; date_joined',
                                                        shard_name_template='')

# Step 2: Download the output file
from google.colab import files

# This will download the file with the exact name marketing_format.csv
files.download('marketing_format.csv')


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  140k  100  140k    0     0   139k      0  0:00:01  0:00:01 --:--:--  139k






<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [3]:
# Step 1: Download the users_v.csv file using curl
!curl -o users_v.csv https://storage.googleapis.com/bdt-beam/users_v.csv

# Step 2: Import Apache Beam and other necessary libraries
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Define your pipeline options (optional)
pipeline_options = PipelineOptions()

# Function to split each row into a dictionary of fields
def split_row(row):
    fields = row.split(',')
    return {
        'user_id': fields[0],
        'name': fields[1],
        'gender': fields[2],
        'age': fields[3],
        'address': fields[4],
        'date_joined': fields[5]
    }

# Function to extract gender for gender composition
def extract_gender(element):
    return element['gender'], 1  # Output a tuple (gender, 1)

# Define the Beam pipeline
with beam.Pipeline(options=pipeline_options) as p:
    # Read the input CSV file from the local path
    gender_composition = (
        p
        | 'Read CSV File' >> beam.io.ReadFromText('users_v.csv', skip_header_lines=1)
        | 'Split Rows' >> beam.Map(split_row)  # Split rows into separate elements
        | 'Extract Gender' >> beam.Map(extract_gender)  # Extract gender and emit (gender, 1)
        | 'Count Genders' >> beam.CombinePerKey(sum)  # Combine counts by gender
        | 'Print Gender Composition' >> beam.Map(print)  # Print the results to the console
    )


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  140k  100  140k    0     0   122k      0  0:00:01  0:00:01 --:--:--  122k




('male', 1207)
('female', 1150)


In [4]:
# Step 1: Download the users_v.csv file using curl
!curl -o users_v.csv https://storage.googleapis.com/bdt-beam/users_v.csv

# Step 2: Import Apache Beam and other necessary libraries
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Define your pipeline options (optional)
pipeline_options = PipelineOptions()

# Function to split each row into a dictionary of fields
def split_row(row):
    fields = row.split(',')
    return {
        'user_id': fields[0],
        'name': fields[1],
        'gender': fields[2],
        'age': fields[3],
        'address': fields[4],
        'date_joined': fields[5]
    }

# Function to extract and normalize the date for accounts created per day
def extract_date_joined(element):
    date_joined = element['date_joined'].strip()  # Remove extra spaces
    return date_joined, 1  # Return a tuple (date_joined, 1)

# Define the Beam pipeline
with beam.Pipeline(options=pipeline_options) as p:
    # Read the input CSV file from the local path
    accounts_per_day = (
        p
        | 'Read CSV File' >> beam.io.ReadFromText('users_v.csv', skip_header_lines=1)
        | 'Split Rows' >> beam.Map(split_row)  # Split rows into separate elements
        | 'Extract Date Joined' >> beam.Map(extract_date_joined)  # Extract and normalize the date_joined field
        | 'Count Accounts per Day' >> beam.CombinePerKey(sum)  # Combine counts by date_joined
        | 'Print Accounts per Day' >> beam.Map(print)  # Print the results to the console
    )


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  140k  100  140k    0     0   135k      0  0:00:01  0:00:01 --:--:--  136k




('2019/03/13', 1)
('2020/11/06', 1)
('2004/05/29', 2)
('2005/08/26', 1)
('2018/04/30', 1)
('2007/05/25', 1)
('2005/01/05', 2)
('2003/12/12', 2)
('2015/11/14', 1)
('2003/05/15', 1)
('2003/10/15', 2)
('2013/09/27', 2)
('2002/03/13', 1)
('2020/12/26', 3)
('2015/11/13', 1)
('2017/07/12', 1)
('2005/02/23', 2)
('2008/08/09', 1)
('2000/08/09', 1)
('2014/05/18', 2)
('2002/02/22', 1)
('2006/04/11', 1)
('2003/03/09', 1)
('2019/03/07', 2)
('2010/11/14', 1)
('2006/12/26', 1)
('2000/06/20', 2)
('2016/02/03', 2)
('2004/07/30', 2)
('2003/05/06', 1)
('2000/09/10', 1)
('2016/04/10', 1)
('2007/10/10', 1)
('2019/12/22', 1)
('2016/02/25', 1)
('2004/06/21', 2)
('2011/11/26', 1)
('2020/05/05', 1)
('2007/06/07', 1)
('2016/07/27', 2)
('2005/08/01', 1)
('2014/11/15', 2)
('2006/02/16', 2)
('2018/11/06', 1)
('2004/07/12', 1)
('2012/03/22', 1)
('2016/03/12', 2)
('2009/08/31', 1)
('2017/01/09', 1)
('2007/12/30', 1)
('2000/01/17', 1)
('2008/11/17', 2)
('2001/06/09', 1)
('2017/12/22', 2)
('2007/07/27', 1)
('2000/12/

In [5]:
# Step 1: Download the users_v.csv file using curl
!curl -o users_v.csv https://storage.googleapis.com/bdt-beam/users_v.csv

# Step 2: Import Apache Beam and other necessary libraries
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Define your pipeline options (optional)
pipeline_options = PipelineOptions()

# Function to split each row into a dictionary of fields
def split_row(row):
    fields = row.split(',')
    return {
        'user_id': fields[0],
        'name': fields[1],
        'gender': fields[2],
        'age': fields[3],
        'address': fields[4],
        'date_joined': fields[5]
    }

# Function to extract the state from the address
def extract_state(element):
    # Assuming the address is formatted as 'City-State-ZipCode'
    address_parts = element['address'].split('-')
    state = address_parts[1].strip()  # Extract the state part
    return state, 1  # Return a tuple (state, 1) to count later

# Define the Beam pipeline
with beam.Pipeline(options=pipeline_options) as p:
    # Read the input CSV file from the local path
    customers_per_state = (
        p
        | 'Read CSV File' >> beam.io.ReadFromText('users_v.csv', skip_header_lines=1)
        | 'Split Rows' >> beam.Map(split_row)  # Split rows into separate elements
        | 'Extract State' >> beam.Map(extract_state)  # Extract the state and emit (state, 1)
        | 'Count Customers per State' >> beam.CombinePerKey(sum)  # Combine counts by state
        | 'Print Customers per State' >> beam.Map(print)  # Print the results to the console
    )


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  140k  100  140k    0     0   124k      0  0:00:01  0:00:01 --:--:--  124k




('VA', 44)
('UT', 50)
('SC', 50)
('ME', 43)
('WI', 56)
('MI', 56)
('SD', 48)
('AK', 52)
('NJ', 58)
('VT', 54)
('NY', 45)
('AZ', 50)
('KY', 43)
('MT', 49)
('CT', 63)
('ID', 51)
('GA', 53)
('OR', 39)
('AR', 53)
('DC', 50)
('NC', 54)
('MA', 45)
('OH', 28)
('ND', 46)
('NM', 42)
('HI', 51)
('CA', 49)
('CO', 48)
('NH', 39)
('DE', 48)
('WY', 50)
('WA', 42)
('OK', 47)
('IN', 45)
('AL', 55)
('NV', 40)
('KS', 49)
('WV', 40)
('TX', 48)
('RI', 35)
('IL', 40)
('MO', 42)
('MN', 41)
('FL', 43)
('NE', 43)
('MS', 36)
('TN', 44)
('MD', 41)
('IA', 53)
('PA', 35)
('LA', 31)
