In [1]:
# %% 
import csv, random, os

# Generate a reproducible synthetic product-review dataset used by the Beam cell.
SEED = 42                               # fixed seed so Restart & Run All reproduces the same CSV
N_REVIEWS = 500                         # number of review rows to generate
OUTPUT_CSV = "data/product_reviews.csv"

random.seed(SEED)
os.makedirs(os.path.dirname(OUTPUT_CSV), exist_ok=True)

products = ["Headphones", "Charger", "Laptop", "Desk", "Lights"]

positive_phrases = [
    "good quality", "works great", "very useful", "amazing product",
    "excellent performance", "love it", "value for money", "super fast",
    "bright and clear", "comfortable to use"
]

neutral_phrases = [
    "okay", "decent", "average", "works fine", "not bad", "reasonable"
]

negative_phrases = [
    "bad", "poor quality", "stopped working", "heats up", "disappointing",
    "not working", "too expensive", "low quality"
]

# Each sentiment pairs a phrase pool with an inclusive star-rating range so the
# generated rating agrees with the generated review text.
sentiment_specs = [
    (positive_phrases, (4, 5)),
    (neutral_phrases, (3, 3)),
    (negative_phrases, (1, 2)),
]
SENTIMENT_WEIGHTS = [0.5, 0.3, 0.2]     # sampling probability of each entry above

rows = []
for review_id in range(1, N_REVIEWS + 1):
    product = random.choice(products)
    phrase_pool, (low, high) = random.choices(
        sentiment_specs, weights=SENTIMENT_WEIGHTS, k=1
    )[0]
    # A review is 1-3 phrases drawn (with replacement) from a single sentiment pool.
    review_text = " ".join(random.choices(phrase_pool, k=random.randint(1, 3)))
    rating = random.randint(low, high)
    rows.append([review_id, product, review_text, rating])

with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["review_id", "product_name", "review_text", "rating"])
    writer.writerows(rows)

print(f"CSV file created: {OUTPUT_CSV} ({len(rows)} rows)")


CSV file created: data/product_reviews.csv (500 rows)


In [4]:
# %%
import apache_beam as beam
import csv
import re

# Pipeline I/O locations, relative to the notebook's working directory.
# The input is the CSV written by the data-generation cell; WriteToText (below)
# builds its output file names from the prefix plus the '.txt' suffix.
input_file, output_prefix = (
    'data/product_reviews.csv',
    'outputs/product_wordcount',
)

def parse_csv(line):
    """Parse one line of the reviews CSV into ``(product_name, review_text)``.

    Args:
        line: A single line of text from ``product_reviews.csv``.

    Returns:
        A ``(product_name, review_text)`` tuple taken from columns 1 and 2, or
        ``None`` when the line is the header row, is blank, has fewer than
        three columns, or cannot be parsed as CSV.
    """
    try:
        # Wrapping the single line in a list lets the csv module handle quoting
        # (e.g. commas inside a quoted review_text).
        row = next(csv.reader([line]), None)
    except csv.Error:
        # Malformed quoting or a non-string input: treat as an unusable record.
        return None
    if not row or len(row) < 3:
        # Blank or truncated line: not enough columns for name and text.
        return None
    if row[0] == 'review_id':  # Skip header
        return None
    return (row[1], row[2])  # (product_name, review_text)

def extract_words(record):
    """Split a review into lowercase word tokens, each tagged with its product.

    Args:
        record: A ``(product_name, review_text)`` pair.

    Returns:
        A list of ``(product_name, word)`` tuples, in the order the words occur.
        A word is a run of ASCII letters and/or apostrophes.
    """
    product_name, review_text = record
    pairs = []
    for token in re.findall(r"[A-Za-z']+", review_text.lower()):
        pairs.append((product_name, token))
    return pairs

# Count how often each word appears in the reviews of each product.
# ReadFromText yields one element per physical line, so this assumes no
# review_text contains an embedded newline (true for the generated data).
with beam.Pipeline() as p:
    (
        p
        | 'Read CSV' >> beam.io.ReadFromText(input_file, skip_header_lines=1)
        # Reuse the helpers above instead of duplicating their logic in lambdas.
        | 'Parse CSV' >> beam.Map(parse_csv)
        # parse_csv returns None for blank/short/unparseable lines; drop them
        # rather than letting an IndexError abort the whole pipeline.
        | 'Drop malformed rows' >> beam.Filter(lambda record: record is not None)
        | 'Extract words' >> beam.FlatMap(extract_words)
        # Each element is a (product, word) tuple; emits ((product, word), count).
        | 'Count per product-word' >> beam.combiners.Count.PerElement()
        | 'Format results' >> beam.Map(lambda kv: f"{kv[0][0]} - {kv[0][1]}: {kv[1]}")
        | 'Write results' >> beam.io.WriteToText(output_prefix, file_name_suffix='.txt')
    )

print("Beam pipeline completed. Check 'outputs/' folder for results.")


Beam pipeline completed. Check 'outputs/' folder for results.
