<a href="https://colab.research.google.com/github/ROBEAZY/bdt-2024-17906342/blob/main/17906342_Tut4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [62]:
#Question 1
import apache_beam as beam
import csv
import io

# Define transformation class
class Transform(beam.DoFn):
    def process(self, element):
        try:
            # Parse the line using CSV reader to handle commas inside quotes correctly
            reader = csv.reader(io.StringIO(element))
            fields = next(reader)  # Read the first (and only) row of the CSV reader

            # Ensure there are exactly 6 fields: user_id, name, gender, age, address, date_joined
            if len(fields) != 6:
                print(f"Skipping malformed line (not exactly 6 fields): {element}")
                return

            # Extract fields
            user_id = fields[0].strip()
            name = fields[1].strip()
            gender = fields[2].strip().capitalize()  # Capitalize gender to "Male" or "Female"
            age = fields[3].strip()

            # Split address into city, state, and zip
            address_parts = fields[4].strip().rsplit('-', 2)  # Split from the right to get last two parts as state and zip
            if len(address_parts) == 3:
                city = address_parts[0].strip()
                state = address_parts[1].strip()
                zip_code = address_parts[2].strip()
                address = f"{city},{state},{zip_code}"  # Format as City, State, Zip
            else:
                address = fields[4].strip()  # Fall back to original address if split doesn't work

            # Format the date to use hyphens instead of slashes
            date_joined = fields[5].strip().replace('/', '-')

            # Format the line as per the required output (semicolon-separated)
            formatted_line = f"{user_id};{name};{gender};{age}; {address};{date_joined}"
            yield formatted_line

        except Exception as e:
            print(f"Error processing line: {element}. Error: {e}")

# Custom print function for Beam
def myprint(element):
    print(element)

# Define the pipeline
with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Read lines' >> beam.io.ReadFromText('/content/users_v.csv', skip_header_lines=1)  # Skip header
        | 'Transform line' >> beam.ParDo(Transform())  # Apply transformations
        | 'Sample 5 lines' >> beam.combiners.Sample.FixedSizeGlobally(5)  # print 5 lines
        | 'Print 5 lines' >> beam.Map(lambda elements: [myprint(element) for element in elements])  # Print the elements
    )


INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600


1011;Colleen Vasquez;Female;62; Mirandaburgh,VT,52472;2013-02-15
1804;David Douglas;Male;26; Camachoshire,CT,52129;2019-08-23
1941;Patricia Harris;Female;58; Kennethmouth,HI,38828;2004-07-19
1449;Joyce Robinson;Female;37; North Peter,NH,40020;2017-11-17
41;Joshua Quinn;Male;62; Emilytown,NM,68576;2005-08-01


In [55]:
#Question 2a
import apache_beam as beam
import csv
import io

# Define transformation class
class Transform(beam.DoFn):
    def process(self, element):
        try:
            # Parse the line using CSV reader to handle commas inside quotes correctly
            reader = csv.reader(io.StringIO(element))
            fields = next(reader)  # Read the first (and only) row of the CSV reader

            # Ensure there are exactly 6 fields: user_id, name, gender, age, address, date_joined
            if len(fields) != 6:
                print(f"Skipping malformed line (not exactly 6 fields): {element}")
                return

            # Extract fields
            user_id = fields[0].strip()
            name = fields[1].strip()
            gender = fields[2].strip().capitalize()  # Capitalize gender to "Male" or "Female"
            age = fields[3].strip()

            # Split address into city, state, and zip
            address_parts = fields[4].strip().rsplit('-', 2)  # Split from the right to get last two parts as state and zip
            if len(address_parts) == 3:
                city = address_parts[0].strip()
                state = address_parts[1].strip()
                zip_code = address_parts[2].strip()
                address = f"{city},{state},{zip_code}"  # Format as City, State, Zip
            else:
                address = fields[4].strip()  # Fall back to original address if split doesn't work

            # Format the date to use hyphens instead of slashes
            date_joined = fields[5].strip().replace('/', '-')

            # Yield the gender as output
            yield gender

        except Exception as e:
            print(f"Error processing line: {element}. Error: {e}")

# Define the pipeline
with beam.Pipeline() as pipeline:
    # Read lines and transform them to extract the gender
    genders = (
        pipeline
        | 'Read lines' >> beam.io.ReadFromText('/content/users_v.csv', skip_header_lines=1)  # Skip header
        | 'Extract gender' >> beam.ParDo(Transform())  # Extract the gender from each record
    )

    # Count the number of male and female customers
    male_count = (
        genders
        | 'Filter males' >> beam.Filter(lambda gender: gender == 'Male')  # Filter male records
        | 'Count males' >> beam.combiners.Count.Globally()  # Count the male records
    )

    female_count = (
        genders
        | 'Filter females' >> beam.Filter(lambda gender: gender == 'Female')  # Filter female records
        | 'Count females' >> beam.combiners.Count.Globally()  # Count the female records
    )

    # Print the results
    male_count | 'Print male count' >> beam.Map(lambda count: print(f"Number of male customers: {count}"))
    female_count | 'Print female count' >> beam.Map(lambda count: print(f"Number of female customers: {count}"))



INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600


Number of female customers: 1150
Number of male customers: 1207


In [57]:
#Question 2b
import apache_beam as beam
import csv
import io

# Define transformation class
class ExtractDate(beam.DoFn):
    def process(self, element):
        try:
            # Parse the line using CSV reader to handle commas inside quotes correctly
            reader = csv.reader(io.StringIO(element))
            fields = next(reader)  # Read the first (and only) row of the CSV reader

            # Ensure there are exactly 6 fields: user_id, name, gender, age, address, date_joined
            if len(fields) != 6:
                print(f"Skipping malformed line (not exactly 6 fields): {element}")
                return

            # Extract the date_joined field and yield it
            date_joined = fields[5].strip().replace('/', '-')  # Ensure date is formatted with hyphens
            yield (date_joined, 1)  # Emit the date and count of 1 for each record

        except Exception as e:
            print(f"Error processing line: {element}. Error: {e}")

# Define the pipeline
with beam.Pipeline() as pipeline:
    # Read lines and extract the date_joined field with a count of 1 for each record
    date_counts = (
        pipeline
        | 'Read lines' >> beam.io.ReadFromText('/content/users_v.csv', skip_header_lines=1)  # Skip header
        | 'Extract date' >> beam.ParDo(ExtractDate())  # Extract date_joined and emit (date_joined, 1)
        | 'Group by date' >> beam.CombinePerKey(sum)  # Sum the counts for each date
    )

    # Get the top 5 days by customer count
    top_5_days = (
        date_counts
        | 'Top 5 days' >> beam.combiners.Top.Of(5, key=lambda x: x[1])  # Get top 5 based on customer count
    )

    # Print the top 5 results
    top_5_days | 'Print top 5' >> beam.Map(lambda days: [print(f"{day[0]}: {day[1]} customers") for day in days])



INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600


2018-10-17: 5 customers
2011-05-13: 4 customers
2001-10-11: 3 customers
2015-05-16: 3 customers
2015-01-26: 3 customers


In [61]:
#Question 2c
import apache_beam as beam
import csv
import io

# Define transformation class
class ExtractState(beam.DoFn):
    def process(self, element):
        try:
            # Parse the line using CSV reader to handle commas inside quotes correctly
            reader = csv.reader(io.StringIO(element))
            fields = next(reader)  # Read the first (and only) row of the CSV reader

            # Ensure there are exactly 6 fields: user_id, name, gender, age, address, date_joined
            if len(fields) != 6:
                print(f"Skipping malformed line (not exactly 6 fields): {element}")
                return

            # Extract the state from the address field (assumes address format: City-State-ZIP)
            address_parts = fields[4].strip().rsplit('-', 2)  # Split from the right to get last two parts as state and zip
            if len(address_parts) == 3:
                state = address_parts[1].strip()  # State is the second-to-last part of the address
                yield (state, 1)  # Emit the state and count of 1 for each record
            else:
                print(f"Skipping malformed address: {fields[4]}")

        except Exception as e:
            print(f"Error processing line: {element}. Error: {e}")

# Define the pipeline
with beam.Pipeline() as pipeline:
    # Read lines and extract the state with a count of 1 for each record
    state_counts = (
        pipeline
        | 'Read lines' >> beam.io.ReadFromText('/content/users_v.csv', skip_header_lines=1)  # Skip header
        | 'Extract state' >> beam.ParDo(ExtractState())  # Extract state and emit (state, 1)
        | 'Group by state' >> beam.CombinePerKey(sum)  # Sum the counts for each state
    )

    # Print the results
    state_counts | 'Print state counts' >> beam.Map(lambda state_count: print(f"{state_count[0]}: {state_count[1]} customers"))



INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600


VA: 44 customers
UT: 50 customers
SC: 50 customers
ME: 43 customers
WI: 56 customers
MI: 56 customers
SD: 48 customers
AK: 52 customers
NJ: 58 customers
VT: 54 customers
NY: 45 customers
AZ: 50 customers
KY: 43 customers
MT: 49 customers
CT: 63 customers
ID: 51 customers
GA: 53 customers
OR: 39 customers
AR: 53 customers
DC: 50 customers
NC: 54 customers
MA: 45 customers
OH: 28 customers
ND: 46 customers
NM: 42 customers
HI: 51 customers
CA: 49 customers
CO: 48 customers
NH: 39 customers
DE: 48 customers
WY: 50 customers
WA: 42 customers
OK: 47 customers
IN: 45 customers
AL: 55 customers
NV: 40 customers
KS: 49 customers
WV: 40 customers
TX: 48 customers
RI: 35 customers
IL: 40 customers
MO: 42 customers
MN: 41 customers
FL: 43 customers
NE: 43 customers
MS: 36 customers
TN: 44 customers
MD: 41 customers
IA: 53 customers
PA: 35 customers
LA: 31 customers
