# Amazon US Reviews Dataset – Cleaning & Validation

## Objectives
- Identify and remove only reviews that are *truly empty* or *junk*.
- Retain all short, valid reviews.
- Log counts and reasons for each removal for auditability.

## Key Cleaning Rules
- Remove if: the review is null, empty, whitespace-only, or contains no real language (e.g., ".", "asdf", or "!!!" on its own).
- Retain if: short but meaningful (e.g., "ok", "good", "5 stars", emojis, single-word adjectives).
- Optionally, deduplicate reviews with identical text for the same (product_id, customer_id) pair.
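A plain-Python reference version of these rules is useful for unit-testing the patterns before running them in Spark. This is a sketch: `classify_review` and `JUNK_RE` are hypothetical names, and the punctuation run is left unbounded so that "!!!!!!" is treated the same as "!!!". Note that Python's `str.strip()` and `\s` also treat Unicode whitespace (e.g., non-breaking spaces) as whitespace, while Java's regex `\s`, which Spark's `rlike` uses, does not by default, so rare edge cases can classify differently.

```python
import re
from typing import Optional

# Hypothetical junk pattern, applied to the stripped, lower-cased review text:
# punctuation/whitespace-only strings, or a few known keyboard mashes/placeholders.
JUNK_RE = re.compile(r"(?:[.!?*\s]+|asdf|qwerty|lorem ipsum)")


def classify_review(text: Optional[str]) -> str:
    """Return "empty", "junk", or "keep" for a single review body."""
    if text is None or not text.strip():
        return "empty"  # null, "", or whitespace-only
    if JUNK_RE.fullmatch(text.strip().lower()):
        return "junk"  # e.g. ".", "!!!", "ASDF"
    return "keep"  # short but meaningful text is retained


for sample in [None, "   ", ".", "!!!", "ASDF", "ok", "5 stars", "👍"]:
    print(repr(sample), "->", classify_review(sample))
```

Keeping the allow-list behavior implicit (anything not matched is kept) matches the goal of removing only clearly empty or junk reviews.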

## Cleaning Pipeline Steps
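1. Flag null, empty, or whitespace-only reviews.
2. Flag likely junk with a regex: punctuation-only strings (e.g., ".", "!!!") and known keyboard mashes or placeholders (e.g., "asdf", "qwerty", "lorem ipsum").
3. (Optional; not implemented in the cleaning cell below) Deduplicate by review_id or by (product_id, customer_id, review_body).
4. Report the number and percentage of rows removed in each category.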
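Step 3 is optional. In PySpark it maps onto `DataFrame.dropDuplicates(subset)`, e.g. `df.dropDuplicates(["product_id", "customer_id", "review_body"])`, which keeps one row per key but does not guarantee which one. The plain-Python sketch below spells out the same "exact match on a key" semantics and also returns a removed count for the audit log; the function name and record layout are hypothetical.

```python
from typing import Dict, Iterable, List, Sequence, Tuple

Record = Dict[str, str]


def deduplicate(records: Iterable[Record], key_fields: Sequence[str]) -> Tuple[List[Record], int]:
    """Keep the first record seen for each key; return (kept_records, n_removed)."""
    seen = set()
    kept: List[Record] = []
    removed = 0
    for rec in records:
        key = tuple(rec.get(f) for f in key_fields)  # exact match on the chosen fields
        if key in seen:
            removed += 1
            continue
        seen.add(key)
        kept.append(rec)
    return kept, removed


rows = [
    {"review_id": "R1", "product_id": "P1", "customer_id": "C1", "review_body": "good"},
    {"review_id": "R2", "product_id": "P1", "customer_id": "C1", "review_body": "good"},
    {"review_id": "R3", "product_id": "P1", "customer_id": "C1", "review_body": "great"},
]
by_text, n_text = deduplicate(rows, ["product_id", "customer_id", "review_body"])
by_id, n_id = deduplicate(rows, ["review_id"])
print(n_text, n_id)
```

Deduplicating by review_id only catches repeated ingestion of the same record, while the composite key also catches the same customer posting identical text twice for one product.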

In [None]:
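import os
import subprocess

# macOS only: /usr/libexec/java_home prints the install path of the requested JDK (17 here).
# On Linux or Windows, set JAVA_HOME to your JDK directory instead.
java_home = subprocess.check_output(
    ["/usr/libexec/java_home", "-v", "17"], text=True
).strip()
os.environ["JAVA_HOME"] = java_home
os.environ["PATH"] = f"{java_home}/bin:" + os.environ["PATH"]
print("[JAVA CONFIGURED] JAVA_HOME set to:", os.environ["JAVA_HOME"])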

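Because `/usr/libexec/java_home` exists only on macOS, a more portable lookup may help when the notebook runs elsewhere. A minimal best-effort sketch, assuming a hypothetical helper `resolve_java_home` (not part of PySpark): it prefers an existing `JAVA_HOME`, then the macOS utility, then the `java` binary on `PATH`. The last fallback does not check the Java version.

```python
import os
import platform
import shutil
import subprocess
from typing import Optional


def resolve_java_home(version: str = "17") -> Optional[str]:
    """Best-effort JAVA_HOME lookup (hypothetical helper)."""
    # 1. Respect an explicitly configured JAVA_HOME.
    if os.environ.get("JAVA_HOME"):
        return os.environ["JAVA_HOME"]
    # 2. On macOS, ask the java_home utility for the requested version.
    if platform.system() == "Darwin" and os.path.exists("/usr/libexec/java_home"):
        try:
            return subprocess.check_output(
                ["/usr/libexec/java_home", "-v", version], text=True
            ).strip()
        except subprocess.CalledProcessError:
            pass  # requested version not installed; fall through
    # 3. Otherwise derive it from `java` on PATH (<home>/bin/java), resolving symlinks.
    java_bin = shutil.which("java")
    if java_bin:
        return os.path.dirname(os.path.dirname(os.path.realpath(java_bin)))
    return None
```

A caller would set `os.environ["JAVA_HOME"]` only when the helper returns a path, and otherwise report that no JDK was found.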

In [None]:
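import os

# PYSPARK_SUBMIT_ARGS is read only when the JVM starts, so it must be set before the
# first SparkSession is created. In local mode the executors run inside the driver JVM,
# so --driver-memory is the setting that takes effect.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 8G --executor-memory 8G pyspark-shell"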

import glob
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("Amazon Review Cleaning").getOrCreate()
raw_root = "../data/raw"
processed_root = "../data/processed"
parquet_files = glob.glob(os.path.join(raw_root, "**/*.parquet"), recursive=True)
print(f"Found {len(parquet_files)} raw Parquet files.")

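# Matched against the whitespace-trimmed, lower-cased review body: punctuation/whitespace-only
# strings of any length (".", "!!!", "?? !!") or a few known keyboard mashes and placeholders.
junk_regex = r"^(?:[.!?*\s]+|asdf|qwerty|lorem ipsum)$"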

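for pf in parquet_files:
    print(f"Cleaning: {pf}")
    df = spark.read.parquet(pf)

    # Strip leading/trailing whitespace of any kind; F.trim removes only spaces,
    # which would let tab- or newline-only reviews through.
    body = F.regexp_replace(F.col("review_body"), r"^\s+|\s+$", "")
    is_empty = F.col("review_body").isNull() | (F.length(body) == 0)
    # Junk = non-empty text whose stripped, lower-cased form matches junk_regex.
    is_junk = ~is_empty & F.lower(body).rlike(junk_regex)

    # A single pass over the input counts rows in each removal category.
    stats = df.agg(
        F.count(F.lit(1)).alias("total"),
        F.sum(is_empty.cast("int")).alias("empty"),
        F.sum(is_junk.cast("int")).alias("junk"),
    ).first()
    orig_count = stats["total"]
    n_empty = stats["empty"] or 0  # sum() is null when the file has no rows
    n_junk = stats["junk"] or 0
    removed = n_empty + n_junk
    clean_count = orig_count - removed
    pct = removed / orig_count if orig_count else 0.0  # guard against empty files
    print(f"Rows before: {orig_count}, after cleaning: {clean_count}, removed: {removed} "
          f"({pct:.2%}; empty: {n_empty}, junk: {n_junk})")

    # Keep rows that are neither empty nor junk; always write 16 partitions.
    df_clean = df.filter(~is_empty & ~is_junk).repartition(16)

    # Spark writes out_path as a directory of part-*.parquet files, not a single file.
    rel_path = os.path.relpath(pf, raw_root)
    out_path = os.path.join(processed_root, rel_path)
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    df_clean.write.mode("overwrite").option("compression", "snappy").parquet(out_path)
    print(f"Saved cleaned output: {out_path}")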

spark.stop()
print("Batch cleaning complete.")
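The objectives call for logging counts and reasons for each removal, but the batch loop only prints them. A minimal sketch, assuming per-file counts of empty and junk rows are available (hypothetical names `n_empty` and `n_junk`), that builds one audit entry per file and appends it to a JSON Lines log; `audit_record`, `append_audit`, and the log layout are illustrative, not part of the original pipeline.

```python
import json
from datetime import datetime, timezone


def audit_record(path: str, total: int, n_empty: int, n_junk: int) -> dict:
    """Build one audit entry with counts and percentages per removal reason."""
    def pct(n: int) -> float:
        return round(100.0 * n / total, 4) if total else 0.0  # avoid division by zero

    removed = n_empty + n_junk
    return {
        "file": path,
        "logged_at": datetime.now(timezone.utc).isoformat(),
        "rows_before": total,
        "rows_after": total - removed,
        "removed": {"empty": n_empty, "junk": n_junk},
        "removed_pct": {"empty": pct(n_empty), "junk": pct(n_junk), "total": pct(removed)},
    }


def append_audit(log_path: str, record: dict) -> None:
    """Append one JSON object per line so successive runs accumulate in one file."""
    with open(log_path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")
```

Inside the loop, a call such as `append_audit(os.path.join(processed_root, "cleaning_audit.jsonl"), audit_record(pf, orig_count, n_empty, n_junk))` would record each file. JSON Lines keeps the log append-only and easy to load back with pandas or Spark.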