# Try Apache Beam - Python

In this notebook, we set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).

To navigate through different sections, use the table of contents. From the **View** drop-down list, select **Table of contents**.

To run a code cell, click the **Run cell** button at the top left of the cell, or select the cell and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).

# Setup

First, set up your environment. This means installing `apache-beam` (the install command is commented out in the cell below) and copying the input CSV file, `amazon.csv`, into a local `data/` directory. The pipeline uses this file as its input.

In [21]:
# Run and print a shell command.
def run(cmd):
    print('>> {}'.format(cmd))
    !{cmd}
    print('')

# Install apache-beam.
# run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
# The source path below is machine-specific; point it at your own copy of amazon.csv.
run('mkdir -p data')
run('cp /Users/noviadsilva/Desktop/MLOps/Labs/Data_Labs/Apache_Beam_Labs/data/amazon.csv data/')

>> mkdir -p data

>> cp /Users/noviadsilva/Desktop/MLOps/Labs/Data_Labs/Apache_Beam_Labs/data/amazon.csv data/
cp: data/amazon.csv and /Users/noviadsilva/Desktop/MLOps/Labs/Data_Labs/Apache_Beam_Labs/data/amazon.csv are identical (not copied).
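The `run` helper relies on IPython's `!{cmd}` shell escape, so it only works inside a notebook. If you do this setup from a plain Python script instead, the sketch below performs the same `mkdir -p` and `cp` using only the standard library. `copy_input` is a hypothetical helper name. Pass it the same source path that the cell above uses.

```python
import shutil
from pathlib import Path


def copy_input(src: str, dest_dir: str = "data") -> Path:
    """Copy src into dest_dir (creating it if needed) and return the new path.

    Hypothetical pure-Python stand-in for `mkdir -p data && cp <src> data/`,
    for use outside IPython where `!{cmd}` is unavailable.
    """
    source = Path(src)
    target_dir = Path(dest_dir)
    target_dir.mkdir(parents=True, exist_ok=True)  # equivalent of `mkdir -p`
    target = target_dir / source.name
    # If source and destination are already the same file, do nothing,
    # mirroring cp's "are identical (not copied)" message.
    if target.exists() and target.resolve() == source.resolve():
        return target
    shutil.copy2(source, target)  # copies contents and metadata
    return target
```

The same-file check matters because `shutil.copy2` raises `shutil.SameFileError` when the source and destination are the same file, as they are when the cell is re-run.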



# Word count with comments

In this lab, I extended the basic Apache Beam word count example so that it processes real-world Amazon product data from a CSV file instead of plain text. The extension adds CSV parsing, data validation, and two outputs: a full word count and a top-50 list. It shows how the same pipeline pattern can analyze structured e-commerce data to extract business insights such as product naming trends and popular keywords.
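Before reading the Beam version, it can help to see the same computation in plain Python. The sketch below makes the same assumption as the pipeline, namely that `product_name` is the second CSV column. It applies the same validation, regex tokenizer, and lowercasing, and counts with `collections.Counter`. Both `count_product_words` and the sample rows are made up for illustration and do not come from `amazon.csv`.

```python
import csv
import re
from collections import Counter


def count_product_words(lines):
    """Plain-Python sketch of the Beam word count below.

    `lines` are raw CSV data rows (header already skipped). Returns a Counter
    mapping each lowercase word in the product_name column to its frequency.
    """
    counts = Counter()
    for fields in csv.reader(lines):
        if len(fields) < 2 or not fields[1].strip():
            continue  # same validation as parse_amazon_line
        words = re.findall(r"[a-zA-Z']+", fields[1])
        counts.update(word.lower() for word in words)
    return counts


# Hypothetical sample rows in the same shape as the real data
rows = [
    'B001,"USB Type C Cable, Black",Cables',
    'B002,"Fast Charging USB Cable",Cables',
    'B003,,Cables',  # empty product name is filtered out
]
counts = count_product_words(rows)
print(counts["usb"])          # total count for a single word
print(counts.most_common(2))  # rough analogue of Top.Of(n, key=count)
```

`Counter.most_common(n)` plays the role of `Top.Of(n, key=...)`, although the two may order tied counts differently. The Beam version expresses each step as a transform, so the same logic can also run on distributed runners.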

In [None]:
"""
Word Count Analysis on Amazon Product Names
Uses Apache Beam - Counts words in Amazon product names
"""

import apache_beam as beam
import csv
import re
import os

# ==================== HELPER FUNCTION ====================

def parse_amazon_line(line):
    """
    Parse CSV line and extract product name
    Returns the product name as a string, or None if the line can't be parsed or has no product name
    """
    try:
        # Use csv reader to handle quoted fields
        reader = csv.reader([line])
        fields = next(reader)
        
        if len(fields) < 2:
            return None
        
        # Get product name (field index 1)
        product_name = fields[1].strip()
        
        if not product_name:
            return None
            
        return product_name
    except csv.Error:
        # Malformed CSV line: drop it instead of failing the whole pipeline
        return None


# ==================== MAIN PIPELINE ====================

# Set up input/output paths (every output file name includes "amazon")
inputs_pattern = 'data/amazon.csv'
outputs_prefix = 'outputs/amazon_word_count'
os.makedirs('outputs', exist_ok=True)

print("Starting Amazon Product Name Word Count Pipeline...")
print("=" * 60)

# Running locally in the DirectRunner
with beam.Pipeline() as pipeline:
    
    # Store the word counts in a PCollection
    # Each element is a tuple of (word, count) of types (str, int)
    word_counts = (
        # Start from the pipeline root; there is no input PCollection yet
        pipeline
        
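        # Read lines from the Amazon CSV file, skipping the header row.
        # ReadFromText splits on newlines, so a quoted field containing a
        # newline would arrive as separate, partial lines.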
        | 'Read Amazon CSV' >> beam.io.ReadFromText(inputs_pattern, skip_header_lines=1)
        # Element type: str - CSV line
        
        # Parse CSV to get Amazon product names
        | 'Extract Amazon product names' >> beam.Map(parse_amazon_line)
        # Element type: str - product name
        
        # Filter out invalid/None values
        | 'Filter valid Amazon products' >> beam.Filter(lambda x: x is not None)
        # Element type: str - valid product name
        
        # Use a regular expression to iterate over all words in the product name
        # FlatMap emits one output element for every item in the iterable the function returns
        | 'Find words in Amazon products' >> beam.FlatMap(lambda name: re.findall(r"[a-zA-Z']+", name))
        # Element type: str - word
        
        # Convert words to lowercase for better grouping
        | 'Lowercase words' >> beam.Map(lambda word: word.lower())
        # Element type: str - lowercase word
        
        # Pair each word with the value 1 so that identical words can be grouped
        # by key and their 1s summed into a count for every word
        | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
        # Element type: (str, int) - key: word, value: 1
        
        # Group by key while combining the value using the sum() function
        | 'Group and sum' >> beam.CombinePerKey(sum)
        # Element type: (str, int) - key: word, value: counts
    )
    
    # A PCollection can feed more than one branch of the same pipeline
    (
        # The input PCollection is the word_counts created from the previous step
        word_counts
        
        # Format the results into a string so we can write them to a file
        | 'Format results' >> beam.Map(lambda word_count: str(word_count))
        # Element type: str - text line
        
        # Finally, write the results to a file
        | 'Write Amazon word count results' >> beam.io.WriteToText(outputs_prefix)
    )
    
    # Second branch: extract the 50 most frequent words
    (
        word_counts
        
        # Get top 50 most common words in Amazon products
        | 'Get Top 50 Amazon words' >> beam.transforms.combiners.Top.Of(50, key=lambda x: x[1])
        # Element type: list of (str, int)
        
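        # Format the whole ranked list as one multi-line string. Flattening it
        # into separate elements would lose the ranking, because a PCollection
        # has no guaranteed order.
        | 'Format top Amazon words' >> beam.Map(
            lambda top: '\n'.join(f"{word:20} {count:>8,} times" for word, count in top)
        )
        # Element type: str - all 50 formatted lines in ranked order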
        
        # Write a single, unsharded file: outputs/amazon_top_50_words.txt
        | 'Write top Amazon words' >> beam.io.WriteToText(
            'outputs/amazon_top_50_words',
            shard_name_template='',
            file_name_suffix='.txt',
            header='=== TOP 50 MOST COMMON WORDS IN AMAZON PRODUCT NAMES ==='
        )
    )

print("\nAmazon Word Count Pipeline execution complete!")
print("\nGenerated Amazon Analysis Files:")
print("   outputs/amazon_word_count-00000-of-00001  (All Amazon word counts)")
print("   outputs/amazon_top_50_words.txt           (Top 50 Amazon words formatted)")
print("=" * 60)

# Sample the first 200 Amazon word count results
print("\nFirst 200 Amazon Word Counts:")
print("=" * 60)

# Redefine run() with os.system so this step doesn't rely on IPython's `!` syntax
def run(cmd):
    print('>> {}'.format(cmd))
    os.system(cmd)
    print('')

# Display first 200 word counts from Amazon products
run('head -n 200 {}-00000-of-*'.format(outputs_prefix))

🚀 Starting Amazon Product Name Word Count Pipeline...





Amazon Word Count Pipeline execution complete!

Generated Amazon Analysis Files:
   outputs/amazon_word_count-00000-of-00001  (All Amazon word counts)
   outputs/amazon_top_50_words.txt           (Top 50 Amazon words formatted)

First 200 Amazon Word Counts:
>> head -n 200 outputs/amazon_word_count-00000-of-*
('wayona', 24)
('nylon', 56)
('braided', 94)
('usb', 419)
('to', 219)
('lightning', 53)
('fast', 212)
('charging', 247)
('and', 331)
('data', 136)
('sync', 92)
('cable', 414)
('compatible', 146)
('for', 674)
('iphone', 92)
('x', 79)
('ipad', 83)
('air', 54)
('pro', 129)
('mini', 78)
('ft', 34)
('pack', 76)
('of', 101)
('grey', 106)
('ambrane', 29)
('unbreakable', 23)
('w', 210)
('a', 267)
('m', 183)
('type', 243)
('c', 314)
('smartphones', 32)
('tablets', 19)
('laptops', 18)
('other', 19)
('devices', 65)
('pd', 31)
('technology', 43)
('mbps', 69)
('quick', 41)
('charge', 76)
('rct', 5)
('black', 530)
('sounce', 11)
('phone', 40)
('ios', 12)
('boat', 69)
('deuce', 4)
('in', 184)
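`parse_amazon_line` uses `csv.reader` rather than splitting on commas because product names are quoted and often contain commas. The standalone sketch below runs the parse and tokenize steps on a shortened, made-up row in the same shape as `amazon.csv`. The row is not an exact line from the file.

```python
import csv
import re

# Illustrative row: the quoted product name contains a comma
line = 'B07JW9H4J1,"Wayona Nylon Braided USB Cable (3 FT Pack of 1, Grey)",Cables,₹399'

# A naive split breaks the quoted field apart at every comma
naive_name = line.split(",")[1]

# csv.reader honours the quotes, as parse_amazon_line does
csv_name = next(csv.reader([line]))[1]

# Same tokenizer as the 'Find words' step, followed by lowercasing
words = [w.lower() for w in re.findall(r"[a-zA-Z']+", csv_name)]

print(naive_name)
print(csv_name)
print(words)
```

The pattern `[a-zA-Z']+` keeps only letters and apostrophes, so digits such as `3` and `1` are dropped, and a token like `65W` would shrink to `w`. That is one plausible source of single-letter entries such as `('w', 210)` and `('m', 183)` in the output above.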

In [17]:
# Preview the first five lines of the raw CSV to check its column layout
with open('/Users/noviadsilva/Desktop/MLOps/Labs/Data_Labs/Apache_Beam_Labs/data/amazon.csv', 'r', encoding='utf-8') as f:
    for i in range(5):
        print(f.readline().strip())

product_id,product_name,category,discounted_price,actual_price,discount_percentage,rating,rating_count,about_product,user_id,user_name,review_id,review_title,review_content,img_link,product_link
B07JW9H4J1,"Wayona Nylon Braided USB to Lightning Fast Charging and Data Sync Cable Compatible for iPhone 13, 12,11, X, 8, 7, 6, 5, iPad Air, Pro, Mini (3 FT Pack of 1, Grey)",Computers&Accessories|Accessories&Peripherals|Cables&Accessories|Cables|USBCables,₹399,"₹1,099",64%,4.2,"24,269","High Compatibility : Compatible With iPhone 12, 11, X/XsMax/Xr ,iPhone 8/8 Plus,iPhone 7/7 Plus,iPhone 6s/6s Plus,iPhone 6/6 Plus,iPhone 5/5s/5c/se,iPad Pro,iPad Air 1/2,iPad mini 1/2/3,iPod nano7,iPod touch and more apple devices.|Fast Charge&Data Sync : It can charge and sync simultaneously at a rapid speed, Compatible with any charging adaptor, multi-port charging station or power bank.|Durability : Durable nylon braided design with premium aluminum housing and toughened nylon fiber wound tightly around the
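The header row confirms that `product_name` is the second column, which is why `parse_amazon_line` reads `fields[1]`. `ReadFromText` hands the pipeline one physical line at a time, so a quoted field containing a newline would be split across elements. If that's a concern, you can use `csv.DictReader` over the whole file object instead. It selects the column by header name and keeps multi-line quoted fields intact. The `product_names` helper and the in-memory sample below are hypothetical and shown for illustration only.

```python
import csv
import io


def product_names(csv_file):
    """Yield non-empty product_name values from an open CSV file object.

    Hypothetical alternative to ReadFromText + parse_amazon_line: DictReader
    picks the column by header name and reassembles quoted multi-line fields.
    """
    for row in csv.DictReader(csv_file):
        # Missing columns come back as None, so fall back to ""
        name = (row.get("product_name") or "").strip()
        if name:
            yield name


# Made-up sample using real header names; B002's about_product spans two lines
sample = io.StringIO(
    "product_id,product_name,about_product\n"
    'B001,"USB Cable, Black",Durable\n'
    'B002,Wall Charger,"Line one\nline two"\n'
    "B003,,Empty name\n"
)
names = list(product_names(sample))
print(names)
```

To read the real file, open it with `newline=''`, as the `csv` module documentation recommends. For example: `open('data/amazon.csv', newline='', encoding='utf-8')`.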