In [1]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

run('pip install --upgrade pip')

# Install apache-beam.
run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')

>> pip install --upgrade pip
Collecting pip
  Downloading pip-23.2.1-py3-none-any.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m20.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 23.1.2
    Uninstalling pip-23.1.2:
      Successfully uninstalled pip-23.1.2
Successfully installed pip-23.2.1

>> pip install --quiet apache-beam
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.2/49.2 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

users_file = '/content/drive/My Drive/Datasets/users.csv'
orders_file = '/content/drive/My Drive/Datasets/orders.csv'




Mounted at /content/drive


In [4]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    users = (
        pipeline
        | 'Read Users Data' >> beam.io.ReadFromText(users_file)
        | 'Parse Users Data' >> beam.Map(lambda line: line.split(','))
        | 'User Data Keyed' >> beam.Map(lambda data: (data[0], data))
    )

    orders = (
        pipeline
        | 'Read Orders Data' >> beam.io.ReadFromText(orders_file)
        | 'Parse Orders Data' >> beam.Map(lambda line: line.split(','))
        | 'Order Data Keyed' >> beam.Map(lambda data: (data[0], data))
    )

    joined_data = (
        {'users': users, 'orders': orders}
        | 'CoGroupByKey' >> beam.CoGroupByKey()
    )
result = pipeline.run()
result.wait_until_finish()

'DONE'

In [5]:
class CalculateAverageOrders(beam.DoFn):
    def process(self, element):
        user_key, data_dict = element
        users = data_dict['users']
        orders = data_dict['orders']

        female_users = [user for user in users if user[2] == 'Female']
        male_users = [user for user in users if user[2] == 'Male']

        avg_female_orders = len(orders) / len(female_users) if female_users else 0
        avg_male_orders = len(orders) / len(male_users) if male_users else 0

        yield f'Average Orders for Female Customers: {avg_female_orders}', f'Average Orders for Male Customers: {avg_male_orders}'

with beam.Pipeline() as pipeline:
    average_orders = (
        joined_data
        | 'Calculate Average Orders' >> beam.ParDo(CalculateAverageOrders())
        | 'Format Output' >> beam.Map(lambda data: f'{data[0]}\n{data[1]}')
        | 'Write Output' >> beam.io.WriteToText('average_orders_output.txt')
    )
result = pipeline.run()
result.wait_until_finish()

'DONE'

In [7]:
class CountOrders(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | 'Count Orders' >> beam.combiners.Count.Globally()
            | 'Format Count' >> beam.Map(lambda count: f'Total Orders Processed: {count}')
        )

with beam.Pipeline() as pipeline:
    average_orders = (
        joined_data
        | 'Calculate Average Orders' >> beam.ParDo(CalculateAverageOrders())
        | 'Format Output' >> beam.Map(lambda data: f'{data[0]}\n{data[1]}')
    )
    total_orders = (
        average_orders
        | 'Count Total Orders' >> CountOrders()
        | 'Write Total Orders' >> beam.io.WriteToText('total_orders_output.txt')
    )
result = pipeline.run()
result.wait_until_finish()

'DONE'

In [8]:
import apache_beam as beam

class JoinUsersAndOrders(beam.DoFn):
    def process(self, element):
        user_key, data_dict = element
        users = data_dict['users']
        orders = data_dict['orders']

        for user in users:
            for order in orders:
                yield user, order

class CalculateAverageOrders(beam.DoFn):
    def process(self, element):
        user, order = element

        # Check if the user is female or male
        gender = user[2]
        if gender == 'Female':
            yield 'Female', 1
        elif gender == 'Male':
            yield 'Male', 1

with beam.Pipeline() as pipeline:
    users = (
        pipeline
        | 'Read Users Data' >> beam.io.ReadFromText(users_file)
        | 'Parse Users Data' >> beam.Map(lambda line: line.split(','))

    orders = (
        pipeline
        | 'Read Orders Data' >> beam.io.ReadFromText(orders_file)
        | 'Parse Orders Data' >> beam.Map(lambda line: line.split(','))
    )

    users_keyed = users | 'Key Users Data' >> beam.Map(lambda data: (data[0], data))
    orders_keyed = orders | 'Key Orders Data' >> beam.Map(lambda data: (data[0], data))


    joined_data = (
        {'users': users_keyed, 'orders': orders_keyed}
        | 'CoGroupByKey' >> beam.CoGroupByKey()
    )


    average_orders = (
        joined_data
        | 'Join Users and Orders' >> beam.ParDo(JoinUsersAndOrders())
        | 'Calculate Average Orders' >> beam.ParDo(CalculateAverageOrders())
        | 'Group by Gender' >> beam.CombinePerKey(sum)
        | 'Format Output' >> beam.Map(lambda data: f'Gender: {data[0]}, Average Orders: {data[1]}')
        | 'Write Output' >> beam.io.WriteToText('average_orders_output.txt')
    )

result = pipeline.run()
result.wait_until_finish()

'DONE'

In [None]:
class CalculateAgeGroupOrders(beam.DoFn):
    def process(self, element):
        user_key, data_dict = element
        users = data_dict['users']
        orders = data_dict['orders']

        age_groups = {'16-26': 0, '26-36': 0, '36-46': 0, '46-56': 0}

        for user in users:
            age = int(user[1])
            for group in age_groups.keys():
                lower, upper = map(int, group.split('-'))
                if lower <= age < upper:
                    age_groups[group] += len(orders)

        yield age_groups

with beam.Pipeline() as pipeline:

    age_group_orders = (
        joined_data
        | 'Calculate Age Group Orders' >> beam.ParDo(CalculateAgeGroupOrders())
        | 'Combine Age Group Orders' >> beam.combiners.ToDict()
        | 'Write Age Group Orders' >> beam.io.WriteToText('age_group_orders_output.txt


In [18]:
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from datetime import timedelta
import csv
import pyarrow as pa
import pyarrow.parquet as pq
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# Define file paths
users_file = '/content/drive/My Drive/Datasets/users.csv'
orders_file = '/content/drive/My Drive/Datasets/orders.csv'


def parse_csv_line(line):
    try:
        reader = csv.reader([line])
        fields = next(reader)
        if len(fields) == 4:
            return {
                'order_no': fields[0],
                'user_id': fields[1],
                'product_list': fields[2],
                'date_purchase': fields[3]
            }
        else:
            # Log or print a message for lines with incorrect format
            print(f"Skipping line with incorrect format: {line}")
            return None
    except Exception as e:
        # Log or print the exception for troubleshooting
        print(f"Error parsing line: {line}\n{e}")
        return None


class CalculateRollingAverage(beam.PTransform):
    def __init__(self, window_size):
        self.window_size = window_size

    def expand(self, pcoll):
        return (
            pcoll
            | 'Filter None Values' >> beam.Filter(lambda x: x is not None)  # Remove None values
            | 'Window' >> beam.WindowInto(FixedWindows(int(self.window_size.total_seconds())))
            | 'Extract Date and Count' >> beam.Map(lambda data: (data['date_purchase'], 1))
            | 'Calculate Sum and Count' >> beam.CombinePerKey(sum)
            | 'Calculate Average' >> beam.Map(lambda kv: {'date_purchase': kv[0], 'rolling_average': kv[1] / self.window_size.total_seconds()})
        )

with beam.Pipeline() as pipeline:

    data = (
        pipeline
        | 'Read CSV File' >> beam.io.ReadFromText(orders_file)
        | 'Parse CSV Lines' >> beam.Map(parse_csv_line)
    )

    rolling_average_seven_days = (
        data
        | 'Calculate Rolling Average (7 days)' >> CalculateRollingAverage(window_size=timedelta(days=7))
    )

    rolling_average_thirty_days = (
        data
        | 'Calculate Rolling Average (30 days)' >> CalculateRollingAverage(window_size=timedelta(days=30))
    )

result = pipeline.run()
result.wait_until_finish()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Skipping line with incorrect format: 1594536;1334;Winter Melon, Jícama;2021-09-06
Skipping line with incorrect format: 1594537;1278;Dandelion Greens, Garlic;2021-09-06
Skipping line with incorrect format: 1594538;2191;Water Spinach, Kohlrabi Greens;2021-09-06
Skipping line with incorrect format: 1594539;918;Brussels Sprouts, Winter Melon;2021-09-06
Skipping line with incorrect format: 1594540;480;Plantain, Parsley;2021-09-06
Skipping line with incorrect format: 1594541;867;Lotus Root, Fiddlehead;2021-09-06
Skipping line with incorrect format: 1594543;1712;Wax Beans;2021-09-06
Skipping line with incorrect format: 1594544;1501;Chayote, Cassava, Lotus Root;2021-09-06
Skipping line with incorrect format: 1594545;519;Bitter Melon, Lotus Seed, Swiss Chard;2021-09-06
Skipping line with incorrect format: 1594546;213;Beet, Winter Melon;2021-09-06
Skipping line with incorrect format: 1594547;2184;Brussels Sprouts, Chinese Broccoli,

'DONE'

In [None]:

class WriteToParquet(beam.DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, element):
        schema = pa.schema([
            ('date_purchase', pa.timestamp('s')),
            ('rolling_average', pa.float64())
        ])

        table = pa.Table.from_pandas(element, schema=schema)
        output_file = f'{self.output_path}/rolling_average.parquet'

        with pq.ParquetWriter(output_file, schema) as writer:
            writer.write_table(table)

with beam.Pipeline() as pipeline:
    # ... Previous code ...


    rolling_average_seven_days = (
        data
        | 'Calculate Rolling Average (7 days)' >> CalculateRollingAverage(window_size=timedelta(days=7))
    )


    rolling_average_thirty_days = (
        data
        | 'Calculate Rolling Average (30 days)' >> CalculateRollingAverage(window_size=timedelta(days=30))
    )
output_path = '/content/drive/My Drive/output'


rolling_average_seven_days | 'Write Rolling Average (7 days) to Parquet' >> beam.ParDo(WriteToParquet(output_path=output_path, prefix='rolling_average_7_days'))

rolling_average_thirty_days | 'Write Rolling Average (30 days) to Parquet' >> beam.ParDo(WriteToParquet(output_path=output_path, prefix='rolling_average_30_days'))

