In [3]:
import gc
from pathlib import Path

import polars as pl
from amazon.constants import *

`extract_brand()` returns a Polars expression for a `brand` column. It uses the `store` value when that value is not blank. Otherwise it uses the brand text that follows `Brand:` in the `details` column. When neither is available, it falls back to `"Unknown"`.

`process_category()` scans a category's review and meta Parquet files as LazyFrames and inner-joins them on `parent_asin`. It then:
- keeps only rows whose rating is one of 1–5 and whose text is not blank;
- adds the `brand` column using `extract_brand()`;
- drops duplicate (`user_id`, `text`, `asin`) rows;
- derives a `review_length` column (word count of `text`) and a `year` column (from `timestamp`).

The cleaned LazyFrame is streamed to a Parquet file in the intermediate folder.

In [None]:
def extract_brand() -> pl.Expr:
    """Build the expression that derives the ``brand`` column.

    Resolution order (first non-null wins):
      1. ``store`` with leading/trailing whitespace stripped, if non-empty.
      2. The text captured after ``Brand`` (and any ``:``, whitespace or ``-``
         separators) in ``details``, stripped, if non-empty.
      3. The literal ``"Unknown"``.

    Returns:
        A ``pl.Expr`` aliased to ``"brand"``. Evaluate it with
        ``with_columns``/``select`` on a frame that has string ``store`` and
        ``details`` columns.
    """
    # Return the stripped value (not merely test it) so padded store names
    # do not carry whitespace into brand. Null store -> null -> falls through.
    store_stripped: pl.Expr = pl.col("store").str.strip_chars()
    store_brand: pl.Expr = pl.when(store_stripped != "").then(store_stripped)

    # The capture class includes \s, so it can end in whitespace or even be
    # whitespace-only (e.g. "Brand:   " captures " "). Strip it and treat a
    # blank capture as missing so the "Unknown" fallback can apply.
    # NOTE(review): \s also matches newlines, so words following the brand
    # may be captured as well — confirm the `details` text format.
    details_raw: pl.Expr = (
        pl.col("details")
        .str.extract(r"Brand[:\s\-]*([A-Za-z0-9&\s]+)", 1)
        .str.strip_chars()
    )
    details_brand: pl.Expr = pl.when(details_raw != "").then(details_raw)

    return pl.coalesce([store_brand, details_brand, pl.lit("Unknown")]).alias("brand")


def process_category(category: str) -> None:
    """Clean one category's reviews and stream the result to the intermediate layer.

    Steps:
      a) Scan ``{RAW}/{REVIEW}/{category}.parquet`` and
         ``{RAW}/{META}/{category}.parquet`` and inner-join them on
         ``parent_asin`` (reviews without matching metadata are dropped).
      b) Keep rows whose ``rating`` is one of 1-5 and whose ``text`` is not
         blank, then add the ``brand`` column via ``extract_brand()``.
      c) Drop duplicate (``user_id``, ``text``, ``asin``) rows.
      d) Add ``review_length`` (count of word-regex matches in ``text``) and
         ``year`` (derived from ``timestamp``).

    The output is written to ``{INTERMEDIATE}/{category}.parquet``; the
    ``INTERMEDIATE`` directory is created first if it does not exist.

    Args:
        category: Category file stem used to build the input and output paths.
    """
    lf_review: pl.LazyFrame = pl.scan_parquet(f"{RAW}/{REVIEW}/{category}.parquet")
    lf_meta: pl.LazyFrame = pl.scan_parquet(f"{RAW}/{META}/{category}.parquet")

    # a) Merge on parent asin
    lf: pl.LazyFrame = lf_review.join(lf_meta, on="parent_asin", how="inner")

    # b) Handle invalid / missing values. Null rating/text makes the predicate
    #    null, which `filter` treats as false, so those rows are dropped too.
    lf = lf.filter(pl.col("rating").is_in([1, 2, 3, 4, 5]))
    lf = lf.filter(pl.col("text").str.strip_chars().str.len_chars() > 0)
    lf = lf.with_columns(extract_brand())

    # c) Remove duplicates (same user, same text, same product).
    #    NOTE(review): without maintain_order, which duplicate counts as
    #    "first" is not guaranteed.
    lf = lf.unique(subset=["user_id", "text", "asin"], keep="first")

    # d) Derived columns.
    #    NOTE(review): the cast assumes an integer `timestamp` is epoch
    #    milliseconds — confirm against the raw schema.
    lf = lf.with_columns(
        pl.col("text").str.count_matches(r"\b\w+\b").alias("review_length"),
        pl.col("timestamp").cast(pl.Datetime("ms")).dt.year().alias("year"),
    )

    # sink_parquet does not create missing parent directories by default.
    Path(INTERMEDIATE).mkdir(parents=True, exist_ok=True)
    lf.sink_parquet(f"{INTERMEDIATE}/{category}.parquet", engine="streaming")

Clean each category's Parquet files using the functions above. Garbage collection runs after every category to free memory before the next one.

In [None]:
# Clean the categories one at a time; each call streams its own output file.
for category in ALL_CATEGORIES:
    print(f"Cleaning dataset for {category}...")
    process_category(category)

    # Force a collection pass between categories to release memory early.
    gc.collect()

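Once every category in `ALL_CATEGORIES` has been cleaned, the intermediate Parquet files are scanned into a single LazyFrame. Duplicate (`user_id`, `text`, `asin`) rows are dropped again across categories. The result is written as one combined Parquet file to the processed folder (Gold layer).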

In [None]:
# Combine all cleaned categories. The per-category dedup cannot see duplicates
# that span different category files, so the same key is applied again here.
# NOTE(review): without maintain_order, which duplicate is "first" is not guaranteed.
merged: pl.LazyFrame = pl.scan_parquet(f"{INTERMEDIATE}/*.parquet").unique(
    subset=["user_id", "text", "asin"], keep="first"
)

# sink_parquet does not create missing parent directories by default.
Path("data/processed").mkdir(parents=True, exist_ok=True)
merged.sink_parquet("data/processed/amazon-2023.parquet", engine="streaming")

The combined dataset has 502,984,947 rows and 27 columns.

In [None]:
# (502_984_947, 27)
# Read the shape from the file already written above instead of re-running the
# scan + cross-category dedup and materialising every row just to call `.shape`.
amazon_2023: pl.LazyFrame = pl.scan_parquet("data/processed/amazon-2023.parquet")
(amazon_2023.select(pl.len()).collect().item(), len(amazon_2023.collect_schema()))

(502_984_947, 27)
