# COMP30770 Group Project - Amazon Electronics Review Analysis

## Project Overview
This project analyses Amazon Electronics reviews to identify top-rated products and common themes in customer feedback.

**Datasets:**
- `Electronics.jsonl` — structured review data (rating, text, timestamp)
- `meta_Electronics.jsonl` — semi-structured product metadata (title, category, price)

**Pipeline:**
1. Load & clean data
2. Aggregate ratings per product
3. Word frequency analysis *(bottleneck)*
4. Join with metadata *(bottleneck)*
5. Output Top 20 ranking
6. MapReduce optimisation with Spark RDD

## Environment Setup

In [1]:
import sys, subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install",
                       "pyspark==3.5.1", "pandas", "pyarrow", "-q"])
print("All packages ready.")
print("Python version:", sys.version)

All packages ready.
Python version: 3.11.9 (tags/v3.11.9:de54cf5, Apr  2 2024, 10:12:12) [MSC v.1938 64 bit (AMD64)]


## Download Datasets

In [2]:
import os, urllib.request

# Adjust this path to the directory where the .jsonl files should be stored
os.chdir(r"C:\Users\ADMIN")
print("Working directory:", os.getcwd())

files = {
    "Electronics.jsonl": "https://huggingface.co/datasets/McAuley-Lab/Amazon-Reviews-2023/resolve/main/raw/review_categories/Electronics.jsonl",
    "meta_Electronics.jsonl": "https://huggingface.co/datasets/McAuley-Lab/Amazon-Reviews-2023/resolve/main/raw/meta_categories/meta_Electronics.jsonl"
}

for filename, url in files.items():
    if os.path.exists(filename):
        print(f"{filename} already exists, skipping.")
    else:
        print(f"Downloading {filename}...")
        urllib.request.urlretrieve(url, filename)
        print(f"{filename} done.")

print("\nFiles ready:", [f for f in os.listdir() if f.endswith(".jsonl")])

Working directory: C:\Users\ADMIN
Electronics.jsonl already exists, skipping.
meta_Electronics.jsonl already exists, skipping.

Files ready: ['Electronics.jsonl', 'meta_Electronics.jsonl']


## Section 2: Traditional Solution (Single-threaded Python)

Prototype using standard Python with no parallelism to establish a performance baseline.

Steps:
- **Step 1**: Load and clean raw review data
- **Step 2**: Aggregate ratings per product
- **Step 3**: Word frequency analysis ← *bottleneck*
- **Step 4**: Join reviews with product metadata ← *bottleneck*
- **Step 5**: Output Top 20 ranked products
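Every cell in this section repeats the same `time.time()` / `tracemalloc` start-and-stop code. One way to factor that out is a context manager. The sketch below is illustrative only: `measure` and the `demo_*` names are hypothetical and are not used elsewhere in the notebook, and it swaps in `time.perf_counter()`, which is intended for measuring intervals. Allocation tracing also slows Python code down. That means the baseline timings below, which are taken with `tracemalloc` running, probably overstate the pure-Python cost somewhat, while the Spark timings in Section 3 are taken without it.

```python
import time
import tracemalloc
from contextlib import contextmanager

@contextmanager
def measure(step_name, times, memory):
    """Record elapsed wall-clock seconds and peak traced memory (MB) for a block."""
    tracemalloc.start()
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        times[step_name] = elapsed
        memory[step_name] = peak / 1024 / 1024

demo_times, demo_memory = {}, {}
with measure("Demo step", demo_times, demo_memory):
    squares = [i * i for i in range(100_000)]

print(f"Demo step: {demo_times['Demo step']:.3f}s, "
      f"peak {demo_memory['Demo step']:.1f} MB")
```

The `finally` clause ensures that tracing is stopped and the measurement is recorded even if the wrapped step raises.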

In [3]:
import json, time, re, tracemalloc
from collections import defaultdict, Counter

step_times = {}
step_memory = {}
print("Libraries imported.")

Libraries imported.


### Step 1: Load & Clean Data
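Keep only records that have a rating, a `parent_asin` and non-empty text. Strip HTML tags and collapse whitespace in the review text. Only the first `MAX_ROWS` (500,000) lines of the file are read.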
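The filter and cleaning rules can be checked in isolation on a few hand-written JSON lines. The sample records below are made up for illustration and use the same field names that the loader reads. `parse_review` is a hypothetical helper that mirrors the body of the loading loop. Note that the truthiness checks also reject a rating of `0` and an empty `text` string.

```python
import json
import re

def clean_text(text):
    # Replace HTML tags with a space, then collapse runs of whitespace
    text = re.sub(r'<[^>]+>', ' ', text)
    return re.sub(r'\s+', ' ', text).strip()

def parse_review(line):
    """Return a cleaned review dict, or None if a required field is missing or empty."""
    r = json.loads(line)
    if not (r.get("rating") and r.get("parent_asin") and r.get("text")):
        return None
    return {
        "rating": float(r["rating"]),
        "asin": r["parent_asin"],
        "text": clean_text(r["text"]),
    }

sample_lines = [
    '{"rating": 5.0, "parent_asin": "B001", "text": "Great<br />cable,   works fine"}',
    '{"rating": 4.0, "parent_asin": "B002", "text": ""}',   # rejected: empty text
    '{"rating": 3.0, "text": "no asin here"}',              # rejected: no parent_asin
]
parsed = [p for p in map(parse_review, sample_lines) if p is not None]
print(parsed)
```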

In [4]:
print("Step 1: Loading and cleaning data...")
tracemalloc.start()
start = time.time()

MAX_ROWS = 500000

def clean_text(text):
    text = re.sub(r'<[^>]+>', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

reviews = []
with open("Electronics.jsonl", "r", encoding="utf-8") as f:
    for i, line in enumerate(f):
        if i >= MAX_ROWS:
            break
        r = json.loads(line)
        if r.get("rating") and r.get("parent_asin") and r.get("text"):
            reviews.append({
                "rating": float(r["rating"]),
                "asin": r["parent_asin"],
                "text": clean_text(r["text"]),
                "timestamp": r.get("timestamp", 0),
                "verified": r.get("verified_purchase", False)
            })

current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
step_times["Step 1"] = time.time() - start
step_memory["Step 1"] = peak / 1024 / 1024

print(f"Total reviews loaded : {len(reviews):,}")
print(f"Time                 : {step_times['Step 1']:.2f}s")
print(f"Peak memory          : {step_memory['Step 1']:.1f} MB")

Step 1: Loading and cleaning data...
Total reviews loaded : 499,993
Time                 : 36.23s
Peak memory          : 331.5 MB


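### Step 2: Aggregate Ratings per Product
Sum the ratings and count the reviews for each `parent_asin` in a single pass. Then compute each product's average rating, rounded to 2 decimal places.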
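Keeping a running `(sum, count)` per product, rather than a running average, is what makes this aggregation easy to parallelise later. Partial sums and counts from different chunks can be merged in any order, whereas averages of averages are generally wrong. Section 3 relies on the same idea when it reduces `(rating, 1)` pairs. A minimal sketch on made-up ratings follows. `average_by_key` and `merge_partial` are hypothetical helper names.

```python
from collections import defaultdict

def average_by_key(pairs):
    """Average the ratings for each key from an iterable of (key, rating) pairs."""
    # Keep a running [sum, count] per key; the mean is taken only at the end
    acc = defaultdict(lambda: [0.0, 0])
    for key, rating in pairs:
        acc[key][0] += rating
        acc[key][1] += 1
    return {key: round(total / count, 2) for key, (total, count) in acc.items()}

def merge_partial(a, b):
    """Combine two partial (sum, count) tuples into one."""
    return (a[0] + b[0], a[1] + b[1])

ratings = [("B001", 5.0), ("B001", 4.0), ("B002", 3.0), ("B001", 5.0)]
print(average_by_key(ratings))            # {'B001': 4.67, 'B002': 3.0}
# Partials (9.0, 2) and (5.0, 1) merge to (14.0, 3): mean 14/3, not mean(4.5, 5.0)
print(merge_partial((9.0, 2), (5.0, 1)))  # (14.0, 3)
```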

In [6]:
print("Step 2: Aggregating ratings by product...")
tracemalloc.start()
start = time.time()

asin_stats = defaultdict(lambda: {"count": 0, "total_rating": 0.0})
for r in reviews:
    asin_stats[r["asin"]]["count"] += 1
    asin_stats[r["asin"]]["total_rating"] += r["rating"]

asin_avg = {
    asin: {
        "count": v["count"],
        "avg_rating": round(v["total_rating"] / v["count"], 2)
    }
    for asin, v in asin_stats.items()
}

current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
step_times["Step 2"] = time.time() - start
step_memory["Step 2"] = peak / 1024 / 1024

print(f"Unique products : {len(asin_avg):,}")
print(f"Time            : {step_times['Step 2']:.2f}s")
print(f"Peak memory     : {step_memory['Step 2']:.1f} MB")

Step 2: Aggregating ratings by product...
Unique products : 177,724
Time            : 1.33s
Peak memory     : 88.3 MB


### Step 3: Word Frequency Analysis
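Tokenise each review by lower-casing it and splitting on whitespace. Strip surrounding punctuation, drop stopwords and tokens of two characters or fewer, then count word frequencies.
This step is CPU-bound, since it runs pure-Python string handling for every token in ~500k reviews, and it is the first expected bottleneck.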
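The tokenisation rule is easiest to see on a couple of short strings. This sketch strips each token once and only then applies the length and stopword checks, so the length test is applied to the cleaned word. The Spark `map_words` function in Section 3 applies the checks in the same order. `tokenize` is a hypothetical helper name, and the sample texts are made up.

```python
from collections import Counter

STOPWORDS = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
             "of", "with", "is", "it", "this", "that", "was", "i", "my", "they",
             "have", "be", "are", "not", "so", "as", "its", "has", "we", "you"}
PUNCT = ".,!?\"'"

def tokenize(text):
    """Lower-case, split on whitespace, strip edge punctuation once, then filter."""
    tokens = []
    for w in text.lower().split():
        w = w.strip(PUNCT)
        if len(w) > 2 and w not in STOPWORDS:
            tokens.append(w)
    return tokens

counter = Counter()
for text in ["Great cable, works great!", "It is OK. Great value."]:
    counter.update(tokenize(text))
print(counter.most_common(2))
```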

In [7]:
print("Step 3: Word frequency analysis...")
tracemalloc.start()
start = time.time()

stopwords = {"the","a","an","and","or","but","in","on","at","to","for",
             "of","with","is","it","this","that","was","i","my","they",
             "have","be","are","not","so","as","its","has","we","you"}

word_counter = Counter()
for r in reviews:
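    words = r["text"].lower().split()
    # Strip edge punctuation once, then apply the length and stopword checks to
    # the stripped token (same order as map_words in Section 3)
    stripped = (w.strip(".,!?\"'") for w in words)
    filtered = [w for w in stripped if len(w) > 2 and w not in stopwords]
    word_counter.update(filtered)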

current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
step_times["Step 3"] = time.time() - start
step_memory["Step 3"] = peak / 1024 / 1024

print(f"Top 10 words : {word_counter.most_common(10)}")
print(f"Time         : {step_times['Step 3']:.2f}s")
print(f"Peak memory  : {step_memory['Step 3']:.1f} MB")

Step 3: Word frequency analysis...
Top 10 words : [('great', 149010), ('very', 124083), ('one', 123014), ('use', 112360), ('good', 108195), ('just', 98595), ('like', 97960), ('these', 95473), ('can', 95052), ('all', 92840)]
Time         : 35.41s
Peak memory  : 23.0 MB


### Step 4: Join Reviews with Product Metadata
Load the metadata file and enrich each product with its title, category and price by looking it up on `parent_asin`.
Most of the cost comes from reading and JSON-parsing every line of the large metadata file, so this is the second expected bottleneck.
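Structurally, this step is an in-memory hash join. The metadata is loaded into a dictionary (the build side), and each product's stats are looked up in it (the probe side). Products without metadata are dropped, which gives an inner join. A minimal sketch on toy records with made-up values follows. `hash_join` is a hypothetical helper name. One possible variant, which the notebook does not use, is to skip metadata rows whose ASIN is not in `asin_avg` during the build, which should reduce peak memory.

```python
def hash_join(stats, meta_records):
    """Inner join: build a hash table on metadata, probe it with each product's stats."""
    # Build phase: index metadata by parent_asin (later duplicates overwrite earlier ones)
    table = {}
    for m in meta_records:
        asin = m.get("parent_asin")
        if asin:
            table[asin] = {"title": m.get("title", "Unknown"),
                           "category": m.get("main_category", "Unknown"),
                           "price": m.get("price")}
    # Probe phase: keep only products that have a metadata entry
    return {asin: {**s, **table[asin]} for asin, s in stats.items() if asin in table}

demo_stats = {"B001": {"count": 120, "avg_rating": 4.9},
              "B999": {"count": 3, "avg_rating": 2.0}}
demo_meta = [{"parent_asin": "B001", "title": "HDMI Cable", "main_category": "All Electronics"},
             {"parent_asin": "B002", "title": "USB Cable"}]
print(hash_join(demo_stats, demo_meta))
```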

In [8]:
print("Step 4: Joining with product metadata...")
tracemalloc.start()
start = time.time()

meta = {}
with open("meta_Electronics.jsonl", "r", encoding="utf-8") as f:
    for line in f:
        m = json.loads(line)
        asin = m.get("parent_asin")
        if asin:
            meta[asin] = {
                "title": m.get("title", "Unknown"),
                "category": m.get("main_category", "Unknown"),
                "price": m.get("price")
            }

enriched = {}
for asin, stats in asin_avg.items():
    if asin in meta:
        enriched[asin] = {**stats, **meta[asin]}

current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
step_times["Step 4"] = time.time() - start
step_memory["Step 4"] = peak / 1024 / 1024

print(f"Successfully joined : {len(enriched):,}")
print(f"Time                : {step_times['Step 4']:.2f}s")
print(f"Peak memory         : {step_memory['Step 4']:.1f} MB")

Step 4: Joining with product metadata...
Successfully joined : 177,724
Time                : 104.62s
Peak memory         : 850.2 MB


### Step 5: Output Top 20 Product Ranking
Keep products with at least 100 reviews and sort them by average rating, highest first.
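Only 20 rows are needed, so a full `sorted(...)[:20]` is not strictly necessary. The sketch below uses `heapq.nlargest`, which keeps at most `k` candidates while scanning. The secondary key on review count is an added assumption: the notebook's `sorted` call keeps tied products in dictionary order instead. `top_k` is a hypothetical helper, and the toy data is made up.

```python
import heapq

def top_k(enriched, k=20, min_reviews=100):
    """Return the k highest-rated products among those with >= min_reviews reviews."""
    eligible = ((asin, info) for asin, info in enriched.items()
                if info["count"] >= min_reviews)
    # Ties on avg_rating are broken by review count (more reviews ranks higher)
    return heapq.nlargest(k, eligible,
                          key=lambda x: (x[1]["avg_rating"], x[1]["count"]))

demo = {
    "A": {"avg_rating": 4.85, "count": 113},
    "B": {"avg_rating": 4.85, "count": 110},
    "C": {"avg_rating": 4.89, "count": 122},
    "D": {"avg_rating": 5.00, "count": 12},   # excluded: too few reviews
}
print([asin for asin, _ in top_k(demo, k=3)])
```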

In [9]:
print("Step 5: Generating Top 20 ranking...")
start = time.time()

top20 = sorted(
    [(asin, info) for asin, info in enriched.items() if info["count"] >= 100],
    key=lambda x: x[1]["avg_rating"],
    reverse=True
)[:20]

step_times["Step 5"] = time.time() - start

print(f"Time: {step_times['Step 5']:.2f}s\n")
print(f"{'Rank':<5} {'Avg Rating':<12} {'Review Count':<15} {'Product Title'}")
print("-" * 75)
for rank, (asin, info) in enumerate(top20, 1):
    print(f"{rank:<5} {info['avg_rating']:<12} {info['count']:<15} {info['title'][:45]}")

Step 5: Generating Top 20 ranking...
Time: 0.01s

Rank  Avg Rating   Review Count    Product Title
---------------------------------------------------------------------------
1     4.89         122             Amazon Basics High-Speed HDMI Cable 2-Pack - 
2     4.85         113             Crucial RAM 16GB Kit (2x8GB) DDR3 1600 MHz CL
3     4.85         110             Lifetime 28240 Adjustable Folding Laptop Tabl
4     4.81         133             Lamicall Tablet Stand Adjustable, Tablet Stan
5     4.78         125             Fintie Slimshell Case for 6" Kindle Paperwhit
6     4.77         113             Amazon Basics USB 2.0 Extension Cable - A-Mal
7     4.77         154             Amazon Basics USB-A to USB-B 2.0 Cable for Pr
8     4.77         133             SAMSUNG (MB-ME64GA/AM) 64GB 100MB/s (U3) Micr
9     4.76         454             Amazon Basics HDMI Cable, 18Gbps High-Speed, 
10    4.75         193             Cat 6 Ethernet Cable 1 Ft (6Pack), Outdoor&In
11    4.75     

### Traditional Solution Performance Summary

In [10]:
total = sum(step_times.values())
print("=" * 50)
print("Performance Summary - Traditional Solution")
print("=" * 50)
for step, t in step_times.items():
    flag = " <-- bottleneck" if step in ["Step 3", "Step 4"] else ""
    print(f"{step}: {t:.2f}s{flag}")
print("-" * 50)
print(f"Total: {total:.2f}s")

Performance Summary - Traditional Solution
Step 1: 36.23s
Step 2: 1.33s
Step 3: 35.41s <-- bottleneck
Step 4: 104.62s <-- bottleneck
Step 5: 0.01s
--------------------------------------------------
Total: 177.60s


## Section 3: MapReduce Optimisation with Spark RDD

Two bottlenecks identified from Section 2:
- **Step 3** (~35s): Word frequency; embarrassingly parallel, ideal for MapReduce
- **Step 4** (~105s): Metadata join; dominated by reading a large file, which could benefit from parallel loading

**MapReduce logic:**
- Word frequency: `Map (word, 1)` → `Reduce sum`
- Metadata join: `Map (asin, review_stats)` + `Map (asin, meta)` → `Reduce join`
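The map → shuffle → reduce flow above can be imitated in a single process to make each phase explicit. This is a pure-Python illustration, not Spark: `map_phase`, `shuffle` and `reduce_phase` are hypothetical names, and the input strings are made up. In Spark, the shuffle also moves data between partitions and processes.

```python
from collections import defaultdict
from itertools import chain

def map_phase(records, mapper):
    # Map: each record independently yields zero or more (key, value) pairs
    return chain.from_iterable(mapper(r) for r in records)

def shuffle(pairs):
    # Shuffle: group all values by key
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups, reducer):
    # Reduce: combine the value list for each key into one result
    return {key: reducer(values) for key, values in groups.items()}

docs = ["great cable", "great price great value"]
word_counts = reduce_phase(
    shuffle(map_phase(docs, lambda text: [(w, 1) for w in text.split()])),
    sum,
)
print(word_counts)
```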

**Expected speedup:** 3-5x on both tasks using all available CPU cores.

### Initialise Spark

In [11]:
import sys, os
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark import SparkContext, SparkConf

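try:
    sc.stop()  # stop a SparkContext left over from an earlier run of this cell
except NameError:
    pass       # no SparkContext exists yet in this session

conf = SparkConf() \
    .setAppName("AmazonReviewsAnalysis") \
    .setMaster("local[*]") \
    .set("spark.driver.memory", "8g") \
    .set("spark.executor.memory", "4g")  # typically no effect in local mode: tasks run in the driver JVM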

sc = SparkContext(conf=conf)
print("Spark version       :", sc.version)
print("Master              :", sc.master)
print("Default parallelism :", sc.defaultParallelism)

Spark version       : 3.5.1
Master              : local[*]
Default parallelism : 16


### MapReduce Optimisation 1: Word Frequency

**Map:** For each review, emit `(word, 1)` for every valid token.

**Reduce:** Sum counts per word across all partitions in parallel.

Since each review can be processed independently, this is embarrassingly parallel.
Expected speedup: ~3-4x over the ~35s baseline.
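Spark's `reduceByKey` combines values within each partition before the shuffle, so at most one partial count per word per partition is sent across. The pure-Python sketch below imitates that two-level aggregation on made-up data. `partition`, `count_partition` and `merge` are hypothetical helpers, and no Spark is involved. Separately, `sc.parallelize(reviews, ...)` first has to serialise the driver-side list of ~500k dicts and ship it to Python worker processes. That round trip likely accounts for part of the Spark time. Reading `Electronics.jsonl` directly with `sc.textFile` and parsing inside the map function is a possible alternative, but it has not been measured here.

```python
from collections import Counter
from functools import reduce

def partition(items, n):
    # Split a list into n roughly equal contiguous slices (like parallelize(..., n))
    size = max(1, -(-len(items) // n))  # ceiling division, at least 1
    return [items[i:i + size] for i in range(0, len(items), size)]

def count_partition(texts):
    # Map + local combine: one Counter per partition instead of one (word, 1) per token
    local = Counter()
    for t in texts:
        local.update(t.lower().split())
    return local

def merge(a, b):
    # Reduce: merge partial counts from two partitions
    a.update(b)
    return a

docs = ["great cable", "great price", "cheap cable", "great value"]
partials = [count_partition(p) for p in partition(docs, 2)]
totals = reduce(merge, partials, Counter())
print(totals.most_common(2))
```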

In [12]:
import re as _re

print("MapReduce Step 1: Word frequency...")
spark_times = {}

STOPWORDS = {"the","a","an","and","or","but","in","on","at","to","for",
             "of","with","is","it","this","that","was","i","my","they",
             "have","be","are","not","so","as","its","has","we","you"}

def map_words(review_dict):
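    # Map: emit (word, 1) for each valid token.
    # Note: review text was already HTML-cleaned in Step 1, so this regex pass
    # is extra work that the traditional Step 3 does not do.
    text = _re.sub(r'<[^>]+>', ' ', review_dict["text"]).lower()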
    result = []
    for w in text.split():
        w = w.strip(".,!?\"'")
        if len(w) > 2 and w not in STOPWORDS:
            result.append((w, 1))
    return result

start = time.time()

top10_words = sc.parallelize(reviews, sc.defaultParallelism) \
    .flatMap(map_words) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False) \
    .take(10)

spark_times["Word Frequency"] = time.time() - start

print(f"Top 10 words : {top10_words}")
print(f"Spark time   : {spark_times['Word Frequency']:.2f}s")
print(f"Traditional  : {step_times['Step 3']:.2f}s")
print(f"Speedup      : {step_times['Step 3'] / spark_times['Word Frequency']:.2f}x")

MapReduce Step 1: Word frequency...
Top 10 words : [('great', 149010), ('very', 124083), ('one', 123014), ('use', 112360), ('good', 108195), ('just', 98595), ('like', 97960), ('these', 95473), ('can', 95052), ('all', 92840)]
Spark time   : 36.32s
Traditional  : 35.41s
Speedup      : 0.97x


### MapReduce Optimisation 2: Metadata Join

**Map (reviews):** Emit `(asin, (rating, 1))` for each review record.

**Map (metadata):** Emit `(asin, (title, category, price))` for each product.

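**Reduce:** Sum the `(rating, 1)` pairs per `asin` to get each product's average rating, then join with the metadata on the `asin` key. `RDD.join` shuffles both RDDs so that records with the same key end up in the same partition.

Expected speedup: ~3-4x over the ~105s baseline. The timed region below reuses the `meta` dict already parsed in Step 4, so reading and parsing `meta_Electronics.jsonl` is not included in the Spark time.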
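In PySpark, `RDD.join` broadly follows a grouping-based (reduce-side) join. Values from each input are tagged with their source, grouped by key, and every left/right combination is emitted for keys present on both sides. A single-process sketch of that idea on toy pairs follows. `reduce_side_join` is a hypothetical name, and the values are made up.

```python
from collections import defaultdict

def reduce_side_join(left, right):
    """Inner-join two lists of (key, value) pairs by grouping both sides on the key."""
    groups = defaultdict(lambda: ([], []))
    for key, value in left:
        groups[key][0].append(value)   # values tagged as coming from the left input
    for key, value in right:
        groups[key][1].append(value)   # values tagged as coming from the right input
    # Emit every left/right combination; keys missing on either side produce nothing
    return [(key, (lv, rv))
            for key, (lvals, rvals) in groups.items()
            for lv in lvals
            for rv in rvals]

review_avgs = [("B001", 4.89), ("B999", 2.0)]
meta_pairs = [("B001", ("HDMI Cable", "All Electronics", None)),
              ("B002", ("USB Cable", "Unknown", 9.99))]
print(reduce_side_join(review_avgs, meta_pairs))
```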

In [14]:
print("MapReduce Step 2: Metadata join...")

# Convert the meta dict built in Step 4 into (asin, (title, category, price)) pairs;
# this runs before the timer starts, so it is not part of the Spark time
meta_list = [(asin, (info["title"], info["category"], info["price"]))
             for asin, info in meta.items()]

def map_review_for_join(review_dict):
    # Map: emit (asin, (rating, 1))
    return (review_dict["asin"], (review_dict["rating"], 1))

start = time.time()

# Reduce reviews: aggregate to (asin, avg_rating)
review_stats_rdd = sc.parallelize(reviews, sc.defaultParallelism) \
    .map(map_review_for_join) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda x: round(x[0] / x[1], 2))

# Parallelise metadata
meta_rdd = sc.parallelize(meta_list, sc.defaultParallelism)

# Join by asin key
joined_rdd = review_stats_rdd.join(meta_rdd)
result_count = joined_rdd.count()

spark_times["Metadata Join"] = time.time() - start

print(f"Joined products : {result_count:,}")
print(f"Spark time      : {spark_times['Metadata Join']:.2f}s")
print(f"Traditional     : {step_times['Step 4']:.2f}s")
print(f"Speedup         : {step_times['Step 4'] / spark_times['Metadata Join']:.2f}x")

MapReduce Step 2: Metadata join...
Joined products : 177,724
Spark time      : 58.83s
Traditional     : 104.62s
Speedup         : 1.78x


### Final Performance Comparison

In [15]:
print("=" * 58)
print("Performance Comparison: Traditional vs Spark RDD")
print("=" * 58)
print(f"{'Task':<25} {'Traditional':>12} {'Spark RDD':>10} {'Speedup':>8}")
print("-" * 58)
print(f"{'Word Frequency (Step 3)':<25} {step_times['Step 3']:>11.2f}s "
      f"{spark_times['Word Frequency']:>9.2f}s "
      f"{step_times['Step 3']/spark_times['Word Frequency']:>7.2f}x")
print(f"{'Metadata Join (Step 4)':<25} {step_times['Step 4']:>11.2f}s "
      f"{spark_times['Metadata Join']:>9.2f}s "
      f"{step_times['Step 4']/spark_times['Metadata Join']:>7.2f}x")
print("=" * 58)

Performance Comparison: Traditional vs Spark RDD
Task                       Traditional  Spark RDD  Speedup
----------------------------------------------------------
Word Frequency (Step 3)         35.41s     36.32s    0.97x
Metadata Join (Step 4)         104.62s     58.83s    1.78x
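The speedup column is the ratio of wall-clock times, so a value below 1 means the Spark version was slower. Using the measured figures:

$$
S = \frac{T_{\text{traditional}}}{T_{\text{Spark}}}, \qquad
S_{\text{word freq}} = \frac{35.41}{36.32} \approx 0.97, \qquad
S_{\text{join}} = \frac{104.62}{58.83} \approx 1.78
$$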
