<a href="https://colab.research.google.com/github/Patric-Ramz/bdt-2023-26720051/blob/main/Day%204/Beam_Task_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
!pip install --upgrade pip
!pip install --quiet apache-beam[interactive]

[0m

In [15]:
import pandas as pd

from google.colab import drive
drive.mount('/content/drive')

users_v = pd.read_csv('/content/drive/My Drive/Datasets/users_v.csv')
orders_v = pd.read_csv('/content/drive/My Drive/Datasets/orders_v.csv')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [16]:
# Create a directory for data
!mkdir -p data

# Download kinglear.txt from Cloud Storage
!gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/

# Display the number of lines in kinglear.txt
!wc -l data/kinglear.txt

# Display the first three lines of kinglear.txt
!head -3 data/kinglear.txt

# Import necessary Python packages
import apache_beam as beam
import re

# Set input and output patterns
inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

Copying gs://dataflow-samples/shakespeare/kinglear.txt...
/ [0 files][    0.0 B/153.6 KiB]                                                / [1 files][153.6 KiB/153.6 KiB]                                                
Operation completed over 1 objects/153.6 KiB.                                    
5525 data/kinglear.txt
	KING LEAR




In [17]:
import apache_beam as beam
from datetime import datetime

def format_user(element):
    # Split each line based on comma
    fields = element.split(',')

    user_id = fields[0]
    full_name = fields[1].split()
    first_name = full_name[0]
    last_name = ' '.join(full_name[1:])

    gender = "Male" if fields[2] == "male" else "Female"

    # Categorizing the age
    age = int(fields[3])
    age_category = "16-55" if 16 <= age <= 55 else "56+"

    # Splitting address into City, State and Zip code
    address_parts = fields[4].rsplit('-', 2)
    city = address_parts[0]
    state = address_parts[1]
    zip_code = address_parts[2]

    # Changing date format
    date_format = "%Y/%m/%d"
    date_joined = datetime.strptime(fields[5], date_format).strftime('%Y-%m-%d')

    return f'{user_id}; {first_name} {last_name}; {gender}; {age_category}; {city}, {state}, {zip_code}; {date_joined}'

with beam.Pipeline() as p:
    (
        p | 'Read CSV File' >> beam.io.ReadFromText('/content/drive/My Drive/Datasets/users_v.csv', skip_header_lines=1)
          | 'Format Users' >> beam.Map(format_user)
          | 'Write to Output CSV' >> beam.io.WriteToText('/content/drive/My Drive/Datasets/marketing_format', header='User Id;Name Surname;Male/Female;16-55;City,state,zip code;YYYY-MM-dd', file_name_suffix='.csv', shard_name_template='')
    )



In [18]:
from apache_beam import combiners

def extract_gender(element):
    fields = element.split(';')
    gender = fields[2].strip()  # Male or Female
    return (gender, 1)

def extract_date_joined(element):
    fields = element.split(';')
    date_joined = fields[-1].strip()
    return (date_joined, 1)

def extract_state(element):
    fields = element.split(';')
    state = fields[-3].strip()
    return (state, 1)

with beam.Pipeline() as p:
    formatted_data = (
        p | 'Read CSV File' >> beam.io.ReadFromText('/content/drive/My Drive/Datasets/users_v.csv', skip_header_lines=1)
          | 'Format Users' >> beam.Map(format_user)
    )

    gender_counts = (
        formatted_data | 'Extract Gender' >> beam.Map(extract_gender)
                       | 'Count Genders' >> beam.CombinePerKey(sum)
    )

    join_dates_counts = (
        formatted_data | 'Extract Join Date' >> beam.Map(extract_date_joined)
                       | 'Count Join Dates' >> beam.CombinePerKey(sum)
    )

    state_counts = (
        formatted_data | 'Extract State' >> beam.Map(extract_state)
                       | 'Count States' >> beam.CombinePerKey(sum)
    )

    gender_counts | 'Write Gender Counts' >> beam.io.WriteToText('/content/drive/My Drive/Datasets/gender_counts')
    join_dates_counts | 'Write Join Dates Counts' >> beam.io.WriteToText('/content/drive/My Drive/Datasets/join_dates_counts')
    state_counts | 'Write State Counts' >> beam.io.WriteToText('/content/drive/My Drive/Datasets/state_counts')



In [19]:
join_dates_df = pd.read_csv('/content/drive/My Drive/Datasets/join_dates_counts-00000-of-00001', header=None, names=['Join Date', 'Count'])
join_dates_df.sort_values(by='Join Date')

Unnamed: 0,Join Date,Count
1882,('2000-01-02',1)
1578,('2000-01-08',1)
1762,('2000-01-13',1)
1767,('2000-01-16',1)
50,('2000-01-17',1)
...,...,...
1550,('2021-09-18',1)
441,('2021-09-19',1)
276,('2021-09-23',2)
1426,('2021-09-25',1)


In [20]:
state_df = pd.read_csv('/content/drive/My Drive/Datasets/state_counts-00000-of-00001', header=None, names=['State', 'Count'])
state_df.sort_values(by='Count', ascending=False)

Unnamed: 0,State,Count
0,('56+',945)
1,('16-55',1412)


In [21]:
# Read the output file into a DataFrame
gender_counts_df = pd.read_csv('/content/drive/My Drive/Datasets/gender_counts-00000-of-00001', header=None, names=['Gender', 'Count'])

# Display the DataFrame
gender_counts_df

Unnamed: 0,Gender,Count
0,('Male',1207)
1,('Female',1150)
