<a href="https://colab.research.google.com/github/18708064/postblock1-774/blob/main/PostBlock1_Big_Data_Technologies.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Apache Beam Analytics**

Installing Apache Beam

In [121]:
!pip install apache-beam




Importing Libraries

In [122]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import pandas as pd

Accessing Files from GitHub

In [123]:
import pandas as pd

# URLs to the raw CSV files on GitHub
orders_url = 'https://raw.githubusercontent.com/18708064/postblock1-774/main/orders.csv'
users_url = 'https://raw.githubusercontent.com/18708064/postblock1-774/main/users.csv'

# Download the files using wget
!wget -O users.csv {users_url}
!wget -O orders.csv {orders_url}

--2024-09-28 16:10:53--  https://raw.githubusercontent.com/18708064/postblock1-774/main/users.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 143675 (140K) [text/plain]
Saving to: ‘users.csv’


2024-09-28 16:10:53 (4.53 MB/s) - ‘users.csv’ saved [143675/143675]

--2024-09-28 16:10:53--  https://raw.githubusercontent.com/18708064/postblock1-774/main/orders.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 73312907 (70M) [text/plain]
Saving to: ‘orders.csv’


2024-09-28 16:10:54 (219 MB/s) - ‘orders.csv’ saved [73312907/733129

Inspect the Data

In [124]:
# Load both downloads with pandas; users.csv is comma-separated while
# orders.csv uses semicolons.
users_df = pd.read_csv('users.csv', sep=',')
orders_df = pd.read_csv('orders.csv', sep=';')

# Preview the first rows of each frame under its own heading.
for heading, frame in (("Users DataFrame:", users_df),
                       ("\nOrders DataFrame:", orders_df)):
    print(heading)
    print(frame.head())

Users DataFrame:
   user_id             name  gender  age                     address  \
0        1     Anthony Wolf    male   73    New Rachelburgh-VA-49583   
1        2  James Armstrong    male   56  North Jillianfort-UT-86454   
2        3        Cody Shaw    male   75         North Anne-SC-53799   
3        4  Sierra Hamilton  female   76     New Angelafurt-ME-46190   
4        5      Chase Davis    male   31    South Bethmouth-WI-18562   

  date_joined  
0  2019/03/13  
1  2020/11/06  
2  2004/05/29  
3  2005/08/26  
4  2018/04/30  

Orders DataFrame:
   order_no  user_id             product_list date_purchased
0      1000     1887                  Cassava     2000-01-01
1      1001      838  Calabash, Water Spinach     2000-01-01
2      1002     2032            Onion, Rapini     2000-01-01
3      1003     1482   Swiss Chard, Artichoke     2000-01-01
4      1004      475  Turnip Greens, Plantain     2000-01-01


Defining Parsing Functions

In [125]:
def parse_users(line):
    """Parse one comma-separated line of users.csv into a keyed record.

    Args:
        line: A single text line with the six fields
            user_id, name, gender, age, address, date_joined.

    Returns:
        ``(user_id, user_info)`` where ``user_id`` is an int and ``user_info``
        is a dict with the keys 'name', 'gender', 'age' (int), 'address' and
        'date_joined'. Returns ``None`` when the line is empty, does not have
        exactly six fields, or carries a non-integer user_id or age.
    """
    # Imports stay local so the function is self-contained
    # (presumably so Beam can ship it to workers without notebook globals).
    import csv
    from io import StringIO

    # An empty line produces no CSV row at all; the default value keeps
    # next() from raising StopIteration.
    fields = next(csv.reader(StringIO(line), delimiter=','), None)

    # Reject lines that do not have the expected number of fields.
    if fields is None or len(fields) != 6:
        return None

    user_id, name, gender, age, address, date_joined = fields
    try:
        key = int(user_id)
        age_value = int(age)
    except ValueError:
        # Non-numeric id/age (for example a stray header row) is treated like
        # any other malformed line instead of aborting the whole pipeline.
        return None

    user_info = {
        'name': name,
        'gender': gender,
        'age': age_value,
        'address': address,
        'date_joined': date_joined
    }
    return (key, user_info)




Defining the Orders Parsing Function

In [126]:
def parse_orders(line):
    """Parse one semicolon-separated line of orders.csv into a keyed record.

    Args:
        line: A single text line with the four fields
            order_no, user_id, product_list, date_purchased.

    Returns:
        ``(user_id, order_info)`` where ``user_id`` is an int and
        ``order_info`` is a dict with the keys 'order_no', 'product_list' and
        'date_purchased' (all kept as strings). Returns ``None`` when the line
        is empty, does not have exactly four fields, or carries a non-integer
        user_id.
    """
    # Imports stay local so the function is self-contained
    # (presumably so Beam can ship it to workers without notebook globals).
    import csv
    from io import StringIO

    # An empty line produces no CSV row at all; the default value keeps
    # next() from raising StopIteration.
    fields = next(csv.reader(StringIO(line), delimiter=';'), None)

    # Reject lines that do not have the expected number of fields.
    if fields is None or len(fields) != 4:
        return None

    order_no, user_id, product_list, date_purchased = fields
    try:
        key = int(user_id)
    except ValueError:
        # A non-numeric user_id (for example a stray header row) is treated
        # like any other malformed line instead of aborting the pipeline.
        return None

    order_info = {
        'order_no': order_no,
        'product_list': product_list,
        'date_purchased': date_purchased
    }
    return (key, order_info)



Create the Beam Pipeline

In [127]:
options = PipelineOptions()
# The pipeline is only built inside the `with` block; Beam runs it when the
# block exits.
with beam.Pipeline(options=options) as p:
    # Read and parse the users data into (user_id, user_info) pairs,
    # dropping lines the parser rejected.
    users = (
        p
        | 'ReadUsers' >> beam.io.ReadFromText('users.csv', skip_header_lines=1)
        | 'ParseUsers' >> beam.Map(parse_users)
        | 'FilterValidUsers' >> beam.Filter(lambda x: x is not None)
    )

    # Read and parse the orders data into (user_id, order_info) pairs,
    # dropping lines the parser rejected.
    orders = (
        p
        | 'ReadOrders' >> beam.io.ReadFromText('orders.csv', skip_header_lines=1)
        | 'ParseOrders' >> beam.Map(parse_orders)
        | 'FilterValidOrders' >> beam.Filter(lambda x: x is not None)
    )

    # Join users and orders on user_id with CoGroupByKey; each result is
    # (user_id, {'users': [...], 'orders': [...]}).
    joined_data = ({'users': users, 'orders': orders}
                   | 'GroupByUserID' >> beam.CoGroupByKey())

    def format_result(element):
        """Flatten one joined element into a single output record.

        Args:
            element: ``(user_id, grouped_data)`` where ``grouped_data`` holds
                the 'users' and 'orders' groups for that id.

        Returns:
            A dict with 'user_id', 'user_info' (the first user record, or an
            empty dict when no user row matched) and 'orders'.
        """
        user_id, grouped_data = element
        users_info = grouped_data['users']
        orders_info = grouped_data['orders']

        user_info = users_info[0] if users_info else {}
        return {
            'user_id': user_id,
            'user_info': user_info,
            'orders': orders_info
        }

    # Write one formatted record per line.
    output = (
        joined_data
        | 'FormatResult' >> beam.Map(format_result)
        # The cell that previews the result opens
        # 'joined_output.txt-00000-of-00001'; forcing a single shard guarantees
        # that exact file name instead of leaving the shard count to the runner.
        | 'WriteOutput' >> beam.io.WriteToText('joined_output.txt', num_shards=1)
    )






Read and display the joined output

In [128]:
# Preview only the first five records of the joined output.
print("\nJoined Data (First 5 Lines):")
with open('joined_output.txt-00000-of-00001') as joined_file:
    # zip() stops once range(5) is exhausted, capping the preview at five lines.
    for line_no, record in zip(range(5), joined_file):
        print(record.strip())


Joined Data (First 5 Lines):
{'user_id': 1887, 'user_info': {'name': 'Andrea Hartman', 'gender': 'female', 'age': 58, 'address': 'Karenmouth-MN-95314', 'date_joined': '2016/07/06'}, 'orders': [{'order_no': '1000', 'product_list': 'Cassava', 'date_purchased': '2000-01-01'}, {'order_no': '1356', 'product_list': 'Bitter Melon, Turnip, Water Chestnut', 'date_purchased': '2000-01-04'}, {'order_no': '2151', 'product_list': 'English Cucumber, Plantain, Taro', 'date_purchased': '2000-01-09'}, {'order_no': '3075', 'product_list': 'Bell Pepper, Shallots', 'date_purchased': '2000-01-13'}, {'order_no': '5406', 'product_list': 'Watercress, Hearts of Palm', 'date_purchased': '2000-01-24'}, {'order_no': '7815', 'product_list': 'Swiss Chard, Fiddlehead, Fennel', 'date_purchased': '2000-02-09'}, {'order_no': '11147', 'product_list': 'Tomato, Mustard Greens, Lotus Seed, Grape Leaves', 'date_purchased': '2000-02-22'}, {'order_no': '15456', 'product_list': 'Fennel', 'date_purchased': '2000-03-14'}, {'ord

Calculate Average Orders by Gender

In [129]:
def calculate_order_counts(element):
    """Map one CoGroupByKey result to a ``(gender, order_count)`` pair.

    Args:
        element: ``(user_id, grouped_data)`` where ``grouped_data`` holds the
            'users' records and the 'orders' records joined for that id.

    Returns:
        ``(gender, num_orders)``. The gender is 'unknown' when no user record
        was joined for the id.
    """
    _, grouped_data = element
    num_orders = len(grouped_data['orders'])

    # Orders whose user_id has no user row arrive with an empty 'users' group;
    # indexing [0] unconditionally would raise IndexError for them.
    user_records = grouped_data['users']
    gender = user_records[0]['gender'] if user_records else 'unknown'
    return (gender, num_orders)

# NOTE: `joined_data` belongs to the pipeline built in the earlier
# `with beam.Pipeline(...)` cell, which has already finished. The transforms
# below only extend that pipeline's graph; they produce results only if the
# pipeline is run again. run_pipeline() is the self-contained version.

# One (gender, num_orders) element per user_id.
per_user_order_counts = (joined_data
                         | 'CalculateOrderCounts' >> beam.Map(calculate_order_counts))

# Calculate total orders per gender
gender_order_counts = (per_user_order_counts
                       | 'GroupByGender' >> beam.CombinePerKey(sum))

# Calculate average orders per gender: the mean of the per-user counts, i.e.
# total orders divided by the actual number of users of that gender (the
# previous version divided by a hard-coded 2).
gender_order_counts_avg = (per_user_order_counts
                           | 'CalculateAverageOrders' >> beam.combiners.Mean.PerKey())


In [130]:
def run_pipeline(users_path='users.csv', orders_path='orders.csv',
                 output_prefix='average_orders_by_gender.txt'):
    """Build and run the pipeline that averages orders per user by gender.

    Users and orders are parsed, joined on user_id, reduced to per-gender
    (total orders, user count) pairs and divided to obtain the average. Each
    result is printed and also written as a formatted text line.

    Args:
        users_path: Comma-separated users file with one header line.
        orders_path: Semicolon-separated orders file with one header line.
        output_prefix: File name prefix passed to WriteToText.

    Returns:
        None. The pipeline runs when the ``with`` block exits.
    """
    def calculate_order_counts(element):
        """Map a joined element to ``(gender, (num_orders, 1))``."""
        _, grouped_data = element
        num_orders = len(grouped_data['orders'])
        # Ids that appear only in the orders file have an empty 'users' group;
        # label them 'unknown' instead of failing on [0].
        user_records = grouped_data['users']
        gender = user_records[0]['gender'] if user_records else 'unknown'
        return (gender, (num_orders, 1))  # order count and user count

    def sum_pairs(pairs):
        """Add (orders, users) pairs component-wise.

        The result has the same shape as the inputs, so it remains valid when
        the combiner is applied to partial results.
        """
        pairs = list(pairs)  # materialise once; the values are iterated twice
        return (sum(pair[0] for pair in pairs), sum(pair[1] for pair in pairs))

    def format_result(element):
        """Render one ``(gender, average)`` pair as an output line."""
        gender, avg_orders = element
        return f'Gender: {gender}, Average Orders: {avg_orders}'

    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        # Read and parse the users data
        users = (
            p
            | 'ReadUsers' >> beam.io.ReadFromText(users_path, skip_header_lines=1)
            | 'ParseUsers' >> beam.Map(parse_users)
            | 'FilterValidUsers' >> beam.Filter(lambda x: x is not None)
        )

        # Read and parse the orders data
        orders = (
            p
            | 'ReadOrders' >> beam.io.ReadFromText(orders_path, skip_header_lines=1)
            | 'ParseOrders' >> beam.Map(parse_orders)
            | 'FilterValidOrders' >> beam.Filter(lambda x: x is not None)
        )

        # Join users and orders on user_id using CoGroupByKey
        joined_data = (
            {'users': users, 'orders': orders}
            | 'GroupByUserID' >> beam.CoGroupByKey()
        )

        # Total orders and user count per gender
        gender_order_counts = (
            joined_data
            | 'CalculateOrderCounts' >> beam.Map(calculate_order_counts)
            | 'GroupByGender' >> beam.CombinePerKey(sum_pairs)
        )

        # Average orders per gender: total orders divided by user count.
        # Every key comes from at least one element, so the count is >= 1.
        gender_order_counts_avg = (
            gender_order_counts
            | 'CalculateAverageOrders' >> beam.Map(lambda x: (x[0], x[1][0] / x[1][1]))
        )

        # Print results to verify
        gender_order_counts_avg | 'PrintResults' >> beam.Map(print)

        # Write the formatted results to a text file
        (
            gender_order_counts_avg
            | 'FormatResults' >> beam.Map(format_result)
            | 'WriteOutput' >> beam.io.WriteToText(output_prefix)
        )

# Execute the pipeline
run_pipeline()




('female', 678.5173913043478)
('male', 678.2220381110191)


In [131]:
# Attach a text sink to the notebook-level averages PCollection. This only
# adds a step to that PCollection's pipeline; nothing is written until the
# owning pipeline is run.
(
    gender_order_counts_avg
    | 'WriteAverageOrders' >> beam.io.WriteToText('average_orders_by_gender.txt')
)


<PCollection[[131]: WriteAverageOrders/Write/WriteImpl/FinalizeWrite.None] at 0x7ddb300cb3d0>

Display Output

In [132]:
# WriteToText appends a shard suffix (e.g. -00000-of-00001) to the file name
# prefix, hence the trailing wildcard.
!cat average_orders_by_gender.txt*


Gender: female, Average Orders: 678.5173913043478
Gender: male, Average Orders: 678.2220381110191
