<a href="https://colab.research.google.com/github/Sameersah/eda-and-apache-beam/blob/main/assignment-3/assignment_3_apache_beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Step 1: Set up the environment for Apache Beam

In [1]:
# Install Apache Beam
!pip install apache-beam

# Import necessary libraries
import apache_beam as beam
import pandas as pd

# Load the dataset
dataset_path = '/content/data/gym_members_exercise_tracking.csv'

# Read the dataset to verify it loads correctly
df = pd.read_csv(dataset_path)
print("Dataset loaded successfully!")
print(df.head())  # Display the first few rows of the dataset


Collecting apache-beam
  Downloading apache_beam-2.60.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting cloudpickle~=2.2.1 (from apache-beam)
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting fastavro<2,>=0.23.6 (from apache-beam)
  Downloading fastavro-1.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting fasteners<1.0,>=0.3 (from apache-beam)
  Do

Step 2: Create a simple pipeline to read data and print it using Apache Beam.


In [31]:
# Define the Beam pipeline
pipeline = beam.Pipeline()

# Create the pipeline
(
    pipeline
    | "Read Data" >> beam.io.ReadFromText(dataset_path, skip_header_lines=1)  # Skip the header
    | "Print Rows" >> beam.Map(print)  # Print each row
)

# Run the pipeline
pipeline.run()


56,Male,88.3,1.71,180,157,60,1.69,1313.0,Yoga,12.6,3.5,4,3,30.2
46,Female,74.9,1.53,179,151,66,1.3,883.0,HIIT,33.9,2.1,4,2,32.0
32,Female,68.1,1.66,167,122,54,1.11,677.0,Cardio,33.4,2.3,4,2,24.71
25,Male,53.2,1.7,190,164,56,0.59,532.0,Strength,28.8,2.1,3,1,18.41
38,Male,46.1,1.79,188,158,68,0.64,556.0,Strength,29.2,2.8,3,1,14.39
56,Female,58.0,1.68,168,156,74,1.59,1116.0,HIIT,15.5,2.7,5,3,20.55
36,Male,70.3,1.72,174,169,73,1.49,1385.0,Cardio,21.3,2.3,3,2,23.76
40,Female,69.7,1.51,189,141,64,1.27,895.0,Cardio,30.6,1.9,3,2,30.57
28,Male,121.7,1.94,185,127,52,1.03,719.0,Strength,28.9,2.6,4,2,32.34
28,Male,101.8,1.84,169,136,64,1.08,808.0,Cardio,29.7,2.7,3,1,30.07
41,Male,120.8,1.67,188,146,54,0.82,593.0,HIIT,20.5,3.0,2,1,43.31
53,Male,51.7,1.7,175,152,72,1.15,865.0,HIIT,23.6,3.5,3,2,17.89
57,Male,112.5,1.61,195,165,61,1.24,1013.0,Cardio,22.1,2.7,3,2,43.4
41,Male,94.5,2.0,179,136,69,1.18,794.0,HIIT,27.6,3.7,3,1,23.62
20,Male,117.7,1.81,196,161,54,1.35,1195.0,Yoga,26.4,3.3,3,2,35.93
39,Fema

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79972ad7ea40>

Step 3: Implement a Composite Transform.

In [32]:
 # Define a composite transform
class ExtractFields(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | "Split Rows" >> beam.Map(lambda row: row.split(','))  # Split rows by comma
            | "Extract Fields" >> beam.Map(lambda fields: (fields[0], fields[2]))  # Extract member ID and calories burned
        )

# Define and run the pipeline
pipeline = beam.Pipeline()

(
    pipeline
    | "Read Data" >> beam.io.ReadFromText(dataset_path, skip_header_lines=1)
    | "Process Data" >> ExtractFields()  # Apply the composite transform
    | "Print Extracted Fields" >> beam.Map(print)  # Print the processed rows
)

# Run the pipeline
pipeline.run()


('56', '88.3')
('46', '74.9')
('32', '68.1')
('25', '53.2')
('38', '46.1')
('56', '58.0')
('36', '70.3')
('40', '69.7')
('28', '121.7')
('28', '101.8')
('41', '120.8')
('53', '51.7')
('57', '112.5')
('41', '94.5')
('20', '117.7')
('39', '42.5')
('19', '64.0')
('41', '43.8')
('47', '66.8')
('55', '75.2')
('19', '89.0')
('38', '71.9')
('50', '71.0')
('29', '120.9')
('39', '64.3')
('42', '63.7')
('44', '65.2')
('59', '53.9')
('45', '84.9')
('33', '78.0')
('32', '108.2')
('20', '65.4')
('54', '50.2')
('24', '58.9')
('38', '81.4')
('26', '127.6')
('56', '59.3')
('35', '96.9')
('21', '62.6')
('42', '45.5')
('31', '48.8')
('26', '44.3')
('43', '113.2')
('19', '60.5')
('37', '124.2')
('45', '52.4')
('24', '54.7')
('25', '88.1')
('52', '59.7')
('31', '79.7')
('34', '51.0')
('53', '84.2')
('57', '122.1')
('21', '96.7')
('19', '73.0')
('23', '114.8')
('59', '65.5')
('21', '50.3')
('46', '61.0')
('35', '44.6')
('43', '58.2')
('51', '44.8')
('27', '87.5')
('53', '51.8')
('31', '64.4')
('48', '67.1'

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7997299cc130>

 Step 4: Add Windowing to the Pipeline

In [33]:
from apache_beam.transforms.window import FixedWindows
import time  # To simulate streaming

# Simulate streaming data by reading the dataset line-by-line
def simulate_streaming(lines):
    for line in lines:
       # time.sleep(0.1)  # Simulate delay between data rows
        yield line

# Define and run the pipeline with windowing
pipeline = beam.Pipeline()

(
    pipeline
    | "Read Data" >> beam.io.ReadFromText(dataset_path, skip_header_lines=1)
    | "Simulate Streaming" >> beam.FlatMap(simulate_streaming)  # Simulate streaming
    | "Add Timestamps" >> beam.Map(lambda row: beam.window.TimestampedValue(row, time.time()))  # Add timestamps
    | "Fixed Windows" >> beam.WindowInto(FixedWindows(5))  # Apply 5-second fixed windowing
    | "Print Windowed Data" >> beam.Map(print)  # Print data for each window
)

# Run the pipeline
pipeline.run()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
4
0
,
M
a
l
e
,
8
0
.
3
,
1
.
7
6
,
1
6
8
,
1
6
7
,
7
4
,
1
.
6
3
,
1
4
9
7
.
0
,
S
t
r
e
n
g
t
h
,
1
3
.
1
,
3
.
5
,
5
,
3
,
2
5
.
9
2
2
7
,
M
a
l
e
,
1
0
4
.
3
,
1
.
6
8
,
1
6
2
,
1
3
5
,
6
2
,
1
.
2
8
,
9
5
0
.
0
,
C
a
r
d
i
o
,
2
9
.
7
,
3
.
5
,
3
,
2
,
3
6
.
9
5
1
9
,
F
e
m
a
l
e
,
6
5
.
7
,
1
.
5
8
,
1
7
0
,
1
2
1
,
5
7
,
1
.
4
6
,
8
8
3
.
0
,
C
a
r
d
i
o
,
2
5
.
9
,
1
.
9
,
3
,
2
,
2
6
.
3
2
3
0
,
M
a
l
e
,
1
2
0
.
3
,
1
.
9
6
,
1
7
1
,
1
3
6
,
6
5
,
0
.
6
6
,
4
9
4
.
0
,
Y
o
g
a
,
2
3
.
9
,
2
.
6
,
2
,
1
,
3
1
.
3
2
5
7
,
F
e
m
a
l
e
,
5
1
.
9
,
1
.
5
7
,
1
6
9
,
1
5
1
,
6
7
,
1
.
0
5
,
7
1
3
.
0
,
H
I
I
T
,
2
9
.
1
,
2
.
6
,
3
,
2
,
2
1
.
0
6
1
9
,
F
e
m
a
l
e
,
5
9
.
1
,
1
.
5
7
,
1
8
4
,
1
6
2
,
5
0
,
1
.
6
5
,
1
3
3
6
.
0
,
C
a
r
d
i
o
,
1
7
.
9
,
2
.
7
,
5
,
3
,
2
3
.
9
8
3
7
,
F
e
m
a
l
e
,
6
6
.
9
,
1
.
7
5
,
1
8
4
,
1
2
7
,
5
0
,
1
.
3
8
,
8
7
6
.
0
,
S
t
r
e
n
g
t
h
,
2
6
.
5
,
2
.
1
,
4
,
2
,
2
1
.
8
4
1

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79972a9d7610>

Step 5: Implement a ParDo Transform in Apache Beam

In [34]:
# Define a custom DoFn to filter and process rows with float values for calories
class FilterHighCalories(beam.DoFn):
    def process(self, element):
        fields = element.split(',')
        member_id = fields[0]
        try:
            calories = float(fields[2])  # Parse as float
            if calories > 500:  # Check if calories burned is greater than 500
                yield f"Member ID: {member_id}, Calories Burned: {calories}"
        except ValueError:
            yield f"Invalid calories value in row: {element}"

# Define and run the pipeline
pipeline = beam.Pipeline()

(
    pipeline
    | "Read Data" >> beam.io.ReadFromText(dataset_path, skip_header_lines=1)
    | "Filter High Calories" >> beam.ParDo(FilterHighCalories())  # Apply the custom ParDo
    | "Print Filtered Data" >> beam.Map(print)  # Print the filtered rows
)

# Run the pipeline
pipeline.run()


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79972a9ad5a0>

Step 6: Implement Triggers in Apache Beam

In [35]:
from apache_beam.transforms.window import FixedWindows

# Function to convert a row into a key-value pair
def to_key_value(row):
    fields = row.split(',')
    member_id = fields[0]  # Assuming the first column is member_id
    return member_id, 1  # Return a key-value pair with 1 as the value for counting

# Define and run the pipeline
pipeline = beam.Pipeline()

(
    pipeline
    | "Read Data" >> beam.io.ReadFromText(dataset_path, skip_header_lines=1)
    | "Add Timestamps" >> beam.Map(lambda row: beam.window.TimestampedValue(row, time.time()))  # Add timestamps
    | "To Key-Value Pairs" >> beam.Map(to_key_value)  # Convert rows to key-value pairs
    | "Apply Fixed Windows" >> beam.WindowInto(FixedWindows(5))  # Fixed window of 5 seconds
    | "Count Elements Per Key" >> beam.CombinePerKey(sum)  # Sum values per key within each window
    | "Print Results" >> beam.Map(print)  # Print the counts
)

# Run the pipeline
pipeline.run()


('56', 27)
('46', 21)
('32', 21)
('25', 26)
('38', 20)
('36', 21)
('40', 25)
('28', 23)
('41', 26)
('53', 23)
('57', 23)
('20', 25)
('39', 23)
('19', 26)
('47', 23)
('55', 18)
('50', 33)
('29', 23)
('42', 27)
('44', 16)
('59', 19)
('45', 30)
('33', 21)
('54', 30)
('24', 15)
('26', 21)
('35', 17)
('21', 20)
('31', 24)
('43', 34)
('37', 19)
('52', 32)
('34', 24)
('23', 25)
('51', 19)
('27', 18)
('48', 17)
('58', 19)
('18', 27)
('22', 27)
('30', 19)
('49', 26)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79972b7172e0>

Step 7: Simulate Streaming with Apache Beam

In [36]:
from apache_beam.transforms.window import FixedWindows

# Simulate a streaming data source
def simulate_streaming(file_path):
    with open(file_path, 'r') as file:
        next(file)  # Skip the header
        for line in file:
            time.sleep(0.1)  # Simulate delay between data rows
            yield line

# Define a DoFn to print results with windowing
class PrintResults(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        yield f"Window {window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}: {element}"

# Define and run the pipeline
pipeline = beam.Pipeline()

(
    pipeline
    | "Simulate Streaming" >> beam.Create(list(simulate_streaming(dataset_path)))  # Simulate streaming
    | "Add Timestamps" >> beam.Map(lambda row: beam.window.TimestampedValue(row, time.time()))  # Add timestamps
    | "To Key-Value Pairs" >> beam.Map(to_key_value)  # Convert rows to key-value pairs
    | "Apply Fixed Windows" >> beam.WindowInto(FixedWindows(5))  # Fixed window of 5 seconds
    | "Count Per Key" >> beam.CombinePerKey(sum)  # Aggregate counts per key
    | "Print Windowed Results" >> beam.ParDo(PrintResults())  # Print windowed results
)

# Run the pipeline
pipeline.run()


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x799723fbaa40>

Step 8: Integrate All Concepts and Finalize the Pipeline

In [37]:
from apache_beam.transforms.window import FixedWindows
import time

# Composite transform to clean and format data
class CleanAndFormatData(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | "Split Rows" >> beam.Map(lambda row: row.split(','))  # Split rows by commas
            | "Filter Valid Data" >> beam.Filter(lambda fields: len(fields) > 2 and fields[2].replace('.', '', 1).isdigit())  # Ensure valid calories
            | "Format as Key-Value" >> beam.Map(lambda fields: (fields[0], float(fields[2])))  # Convert to (member_id, calories)
        )

# DoFn to process aggregated results and format output
class FormatOutput(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        key, value = element
        # Handle timestamps correctly to avoid overflow
        window_start = window.start.to_utc_datetime().isoformat()
        window_end = window.end.to_utc_datetime().isoformat()
        yield f"Window {window_start} - {window_end} | Member ID: {key}, Total Calories Burned: {value}"

# Define and run the pipeline
pipeline = beam.Pipeline()

(
    pipeline
    | "Read Data" >> beam.io.ReadFromText(dataset_path, skip_header_lines=1)  # Read data from file
    | "Add Timestamps" >> beam.Map(lambda row: beam.window.TimestampedValue(row, time.time()))  # Add valid timestamps
    | "Clean and Format Data" >> CleanAndFormatData()  # Apply the composite transform
    | "Apply Fixed Windows" >> beam.WindowInto(FixedWindows(5))  # Group data into fixed windows
    | "Sum Calories Per Member" >> beam.CombinePerKey(sum)  # Sum calories per member in each window
    | "Format and Print Output" >> beam.ParDo(FormatOutput())  # Format and print the results
)

# Run the pipeline
pipeline.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79972c048580>

Step 9: Add Metrics and Refine Results

In [38]:
from apache_beam.transforms.window import FixedWindows

# Define and run the pipeline
pipeline = beam.Pipeline()

(
    pipeline
    | "Read Data" >> beam.io.ReadFromText(dataset_path, skip_header_lines=1)  # Read data from the file
    | "Add Timestamps" >> beam.Map(lambda row: beam.window.TimestampedValue(row, time.time()))  # Add valid timestamps
    | "Clean and Format Data" >> CleanAndFormatData()  # Clean and format data
    | "Apply Fixed Windows" >> beam.WindowInto(FixedWindows(5))  # Group into fixed windows
    | "Sum Calories Per Member" >> beam.CombinePerKey(sum)  # Aggregate calories by member within windows
    | "Format for Sorting" >> beam.Map(lambda x: (float(x[1]), str(x[0])))  # Ensure consistent data types
    | "Sort and Get Top 5" >> beam.FlatMap(
        lambda data: sorted([data], key=lambda x: x[0], reverse=True)[:5]
    )  # Sort within each window and get the top 5
    | "Print Results" >> beam.Map(print)  # Print final output
)

# Run the pipeline
pipeline_result = pipeline.run()
pipeline_result.wait_until_finish()


(2187.199999999999, '56')
(1428.7, '46')
(1690.1999999999998, '32')
(1865.8999999999996, '25')
(1365.8, '38')
(1706.4, '36')
(1693.1000000000004, '40')
(1843.3999999999999, '28')
(1894.3999999999996, '41')
(1632.1, '53')
(1761.3, '57')
(2094.0999999999995, '20')
(1683.7, '39')
(1876.3, '19')
(1684.7, '47')
(1445.5999999999997, '55')
(2257.0000000000005, '50')
(1871.4, '29')
(1748.5000000000002, '42')
(1234.6, '44')
(1520.0000000000002, '59')
(2056.5000000000005, '45')
(1496.4, '33')
(2230.899999999999, '54')
(926.5000000000001, '24')
(1550.9, '26')
(1307.4, '35')
(1570.3000000000002, '21')
(1527.4000000000003, '31')
(2468.2999999999993, '43')
(1391.2, '37')
(2252.2999999999997, '52')
(1828.4, '34')
(1908.4, '23')
(1341.4999999999998, '51')
(1462.5, '27')
(1326.9, '48')
(1354.7000000000003, '58')
(1968.4000000000003, '18')
(2032.6, '22')
(1548.9, '30')
(1825.8000000000004, '49')


'DONE'

Step 10: Visualize Results Using Matplotlib


In [39]:
from apache_beam.transforms.window import FixedWindows

# Function to collect pipeline output for visualization
class CollectResults(beam.DoFn):
    def __init__(self):
        self.results = []

    def process(self, element):
        self.results.append(element)
        yield element

# Define and run the pipeline with proper collection
collector = CollectResults()

pipeline = beam.Pipeline()

(
    pipeline
    | "Read Data" >> beam.io.ReadFromText(dataset_path, skip_header_lines=1)
    | "Add Timestamps" >> beam.Map(lambda row: beam.window.TimestampedValue(row, time.time()))  # Add valid timestamps
    | "Clean and Format Data" >> CleanAndFormatData()  # Clean and format data
    | "Apply Fixed Windows" >> beam.WindowInto(FixedWindows(5))  # Group into fixed windows
    | "Sum Calories Per Member" >> beam.CombinePerKey(sum)  # Aggregate calories by member within windows
    | "Group By Window" >> beam.GroupByKey()  # Group results by window
    | "Collect Results" >> beam.ParDo(collector)  # Collect results for visualization
)

# Run the pipeline
pipeline_result = pipeline.run()
pipeline_result.wait_until_finish()

# Debug: Check collected results
if collector.results:
    print("Collected Results:")
    print(collector.results)

    # Process results for visualization
    final_results = []
    for _, grouped_data in collector.results:
        top_5 = sorted(grouped_data, key=lambda x: x[1], reverse=True)[:5]
        final_results.extend(top_5)

    # Visualize the top 5 performers
    if final_results:
        calories = [item[1] for item in final_results]
        member_ids = [item[0] for item in final_results]

        # Plot the results
        import matplotlib.pyplot as plt

        plt.figure(figsize=(10, 6))
        plt.bar(member_ids, calories, color="skyblue")
        plt.xlabel("Member ID")
        plt.ylabel("Total Calories Burned")
        plt.title("Top 5 Performers by Calories Burned")
        plt.xticks(rotation=45)
        plt.show()
    else:
        print("No data to visualize after sorting.")
else:
    print("No results collected.")


No results collected.


 Final Output Collection and Visualization

In [30]:
import matplotlib.pyplot as plt

# Function to collect pipeline output for visualization
class CollectResults(beam.DoFn):
    def __init__(self):
        self.results = []

    def process(self, element):
        self.results.append(element)
        yield element

# Collector to store results
collector = CollectResults()

# Define and run the pipeline
pipeline = beam.Pipeline()

(
    pipeline
    | "Read Data" >> beam.io.ReadFromText(dataset_path, skip_header_lines=1)
    | "Add Timestamps" >> beam.Map(lambda row: beam.window.TimestampedValue(row, time.time()))  # Add valid timestamps
    | "Clean and Format Data" >> CleanAndFormatData()  # Clean and format data
    | "Apply Fixed Windows" >> beam.WindowInto(FixedWindows(5))  # Group into fixed windows
    | "Sum Calories Per Member" >> beam.CombinePerKey(sum)  # Aggregate calories by member within windows
    | "Collect Results" >> beam.ParDo(collector)  # Collect results for visualization
)

# Run the pipeline
pipeline_result = pipeline.run()
pipeline_result.wait_until_finish()

# Process and visualize results
if collector.results:
    # Print collected results for debugging
    print("Collected Results:")
    print(collector.results)

    # Extract data for visualization
    member_ids, calories = zip(*collector.results)  # Unpack results into separate lists

    # Plot the results
    plt.figure(figsize=(10, 6))
    plt.bar(member_ids, calories, color="skyblue")
    plt.xlabel("Member ID")
    plt.ylabel("Total Calories Burned")
    plt.title("Top Performers by Calories Burned")
    plt.xticks(rotation=45)
    plt.show()
else:
    print("No results collected for visualization.")


No results collected for visualization.
