# COMP 3610 – A3

- Zidane Timothy, Maia Neptune, Christophe Gittens

In [None]:
# import findspark
from pathlib import Path
import os
import tarfile
import pandas as pd
import shutil

import time, matplotlib.pyplot as plt, seaborn as sns, matplotlib.ticker as ticker
import numpy as np


In [None]:
from datasets import load_dataset
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime
import json
import dask.dataframe as dd
import tarfile
import os

In [3]:
from dask.distributed import Client
client = Client()
print(client)


<Client: 'tcp://127.0.0.1:57545' processes=4 threads=12, memory=15.67 GiB>


## Function for extraction of RAW .tar files, creates .arrow files

In [4]:
def extract_tar_bz2(tar_path, extract_dir):
    if not os.path.exists(tar_path):
        print(f"Error: File {tar_path} does not exist.")
        return
    if not tar_path.endswith(".tar.bz2"):
        print(f"Error: File {tar_path} is not a .tar.bz2 file.")
        return

    try:
        with tarfile.open(tar_path, "r:bz2") as tar:
            print(f"Extracting {tar_path} to {extract_dir}")
            tar.extractall(path=extract_dir)
    except Exception as e:
        print(f"Error during extraction: {e}")


## Preprocess Category: Creates a temp path for management of Disk Storage <br>



In [5]:
import os
import shutil
from datasets import load_dataset
from pathlib import Path
import pandas as pd

def preprocess_category(review_tar_path, meta_tar_path, output_folder, category, batch_size=1000):
    temp_path = os.path.join(output_folder, "temp_extract", category)
    os.makedirs(temp_path, exist_ok=True)
    os.makedirs(output_folder, exist_ok=True)

    print(f"Extracting tar files for {category}...")
    extract_tar_bz2(review_tar_path, temp_path)
    extract_tar_bz2(meta_tar_path, temp_path)

    arrow_files = list(Path(temp_path).rglob("*.arrow"))
    print(f"Found {len(arrow_files)} Arrow files")

    batch_num = 0
    total_rows = 0

    for arrow_file in arrow_files:
        try:
            is_meta = "meta" in str(arrow_file).lower()
            folder_name = "meta" if is_meta else "reviews"
            out_path = os.path.join(output_folder, f"{folder_name}_parquet")
            os.makedirs(out_path, exist_ok=True)

            dataset = load_dataset("arrow", data_files=str(arrow_file), split="train", streaming=True)

            batch = []
            seen_keys = set()

            for row in dataset:
                if not row:
                    continue
                if not is_meta:
                    key = (row.get("user_id"), row.get("asin"), row.get("text"))
                    if key in seen_keys:
                        continue
                    seen_keys.add(key)
                batch.append(row)

                if len(batch) >= batch_size:
                    df = pd.DataFrame(batch)
                    df.to_parquet(os.path.join(out_path, f"{category}_batch_{batch_num}.parquet"), index=False)
                    print(f"Saved batch {batch_num} ({len(batch)} rows)")
                    batch = []
                    batch_num += 1
                    total_rows += 1

            if batch:
                df = pd.DataFrame(batch)
                df.to_parquet(os.path.join(out_path, f"{category}_batch_{batch_num}.parquet"), index=False)
                print(f"Saved final batch {batch_num} ({len(batch)} rows)")

        except Exception as e:
            print(f"Error processing {arrow_file.name}: {e}")

    shutil.rmtree(temp_path)
    print(f"Temp folder removed: {temp_path}")


Meta and Review parsing

In [6]:

def convert_to_dd(folder, category):
    files = [os.path.join(folder, f) for f in os.listdir(folder)
             if f.endswith(".parquet") and category.lower() in f.lower()]
    if not files:
        print("No parquet files found")
        return None
    df = dd.read_parquet(files)
    print(f"Loaded {len(files)} files into Dask DataFrame")
    return df


Dealing with the brand

In [7]:
def extract_brand(details, store):
    try:
        if isinstance(details, dict) and "brand" in details and details["brand"]:
            return details["brand"]
    except Exception:
        pass
    if isinstance(store, str) and store.strip():
        return store
    return "Unknown"

Clean data

In [8]:
import dask.dataframe as dd
import os
from dask.diagnostics import ProgressBar

def clean_data_dask(category, review_path, meta_path):
    output_dir = r"C:\Users\maian\Downloads\cleaned_files"
    os.makedirs(output_dir, exist_ok=True)

    print("Reading parquet files as Dask DataFrames")
    review_df = dd.read_parquet(review_path)
    meta_df = dd.read_parquet(meta_path)

    print("Merging review and meta on 'parent_asin'")
    merged = dd.merge(review_df, meta_df, on="parent_asin", how="left")

    print("Filtering bad data")
    if "rating" in merged.columns:
        merged = merged[merged["rating"].between(1, 5)]
    if "text" in merged.columns:
        merged = merged[merged["text"].notnull() & (merged["text"].str.strip() != "")]

    print("Extracting brand")
    def fast_extract_brand(details, store):
        if isinstance(details, dict) and details.get("brand"):
            return details["brand"]
        elif isinstance(store, str) and store.strip():
            return store
        return "Unknown"

    merged["brand"] = merged.map_partitions(
        lambda df: df.apply(lambda row: fast_extract_brand(row.get("details"), row.get("store")), axis=1),
        meta=("brand", "object")
    )

    print("Computing derived columns")
    if "text" in merged.columns:
        merged["review_length"] = merged["text"].str.split().map(
            lambda x: len(x) if x else 0, meta=("review_length", "int")
        )
    if "timestamp" in merged.columns:
        merged["year"] = dd.to_datetime(merged["timestamp"], unit="ms", errors="coerce").dt.year

    print("Selecting necessary columns")
    necessary_columns = [
        "user_id", "asin", "parent_asin", "rating", "text", "verified_purchase",
        "helpful_vote", "review_length", "year", "brand", "main_category",
        "title", "average_rating", "rating_number", "price"
    ]
    merged = merged[[col for col in necessary_columns if col in merged.columns]]

    # print("Repartitioning to reduce write overhead")
    # merged = merged.repartition(npartitions=50)

    # output_file = os.path.join(output_dir, f"{category}_cleaned.parquet")
    # print(f"Saving cleaned file to {output_file}")

    # with ProgressBar():
    #     merged.to_parquet(output_file, compression="snappy", write_index=False, overwrite=True)

    # print(f"Done! File saved to: {output_file}")

    return merged


## Define Categories that will be cleaned

In [None]:
categories = [
    "Grocery_and_Gourmet_Food",
    "Handmade_Products",
    "Health_and_Household",
    "Home_and_Kitchen",
    "Industrial_and_Scientific",
    "Kindle_Store",
    "Magazine_Subscriptions"
    "Movies_and_TV",
    "Musical_Instruments"
]


## Running Preprocess then Clean_Data for the Categories defined above

In [10]:
import os
import shutil
import gc
from pathlib import Path

base_dir = r"C:\Users\maian\OneDrive - The University of the West Indies, St. Augustine\Desktop\big_data_a3"
raw_dir = os.path.join(base_dir, "raw_files")
output_dir = os.path.join(base_dir, "output_folder")

# Store cleaned Dask DataFrames for each category
cleaned_all_categories = {}

for category in categories:
    print(f"\n=== Processing category: {category} ===")

    review_tar = os.path.join(raw_dir, f"raw_review_{category}.tar.bz2")
    meta_tar = os.path.join(raw_dir, f"raw_meta_{category}.tar.bz2")

    try:
        preprocess_category(review_tar, meta_tar, output_dir, category)

        review_df = convert_to_dd(os.path.join(output_dir, "reviews_parquet"), category)
        meta_df = convert_to_dd(os.path.join(output_dir, "meta_parquet"), category)

        if review_df is not None and meta_df is not None:
            cleaned = clean_data_dask(category, review_df, meta_df)
            cleaned_all_categories[category] = cleaned
            print(f"Cleaned and stored: {category}")
    except Exception as e:
        print(f"Error while processing {category}: {e}")

    finally:
        gc.collect()
        for sub in ["reviews_parquet", "meta_parquet", "temp_extract"]:
            path = os.path.join(output_dir, sub)
            if os.path.exists(path):
                try:
                    shutil.rmtree(path)
                    print(f"Deleted: {path}")
                except Exception as e:
                    print(f"Couldn't delete {path}: {e}")



=== Processing category: Magazine_Subscriptions ===
Extracting tar files for Magazine_Subscriptions...
Extracting C:\Users\maian\OneDrive - The University of the West Indies, St. Augustine\Desktop\big_data_a3\raw_files\raw_review_Magazine_Subscriptions.tar.bz2 to C:\Users\maian\OneDrive - The University of the West Indies, St. Augustine\Desktop\big_data_a3\output_folder\temp_extract\Magazine_Subscriptions


  tar.extractall(path=extract_dir)


Extracting C:\Users\maian\OneDrive - The University of the West Indies, St. Augustine\Desktop\big_data_a3\raw_files\raw_meta_Magazine_Subscriptions.tar.bz2 to C:\Users\maian\OneDrive - The University of the West Indies, St. Augustine\Desktop\big_data_a3\output_folder\temp_extract\Magazine_Subscriptions
Found 2 Arrow files
Saved batch 0 (1000 rows)
Saved batch 1 (1000 rows)
Saved batch 2 (1000 rows)
Saved final batch 3 (391 rows)
Saved batch 3 (1000 rows)
Saved batch 4 (1000 rows)
Saved batch 5 (1000 rows)
Saved batch 6 (1000 rows)
Saved batch 7 (1000 rows)
Saved batch 8 (1000 rows)
Saved batch 9 (1000 rows)
Saved batch 10 (1000 rows)
Saved batch 11 (1000 rows)
Saved batch 12 (1000 rows)
Saved batch 13 (1000 rows)
Saved batch 14 (1000 rows)
Saved batch 15 (1000 rows)
Saved batch 16 (1000 rows)
Saved batch 17 (1000 rows)
Saved batch 18 (1000 rows)
Saved batch 19 (1000 rows)
Saved batch 20 (1000 rows)
Saved batch 21 (1000 rows)
Saved batch 22 (1000 rows)
Saved batch 23 (1000 rows)
Saved b

In [11]:
combined_cleaned_df = dd.concat(list(cleaned_all_categories.values()))


## EDA

In [12]:
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.ticker as ticker
import numpy as np

In [13]:
# Gets first few rows from first few partitions
preview = combined_cleaned_df.head(1000, compute=True)
print(preview)


2025-04-22 21:38:17,075 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\protocol\core.py", line 175, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "c:\Users\maian\anaconda3\Lib\site-packages\msgpack\fallback.py", line 136, in unpackb
    raise ExtraData(ret, unpacker._get_extradata())
msgpack.exceptions.ExtraData: unpack(b) received extra data.
2025-04-22 21:38:17,083 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\core.py", line 831, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\scheduler.py", line 5902, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\cor

FutureCancelledError: ('getitem-fused-getitem-b1e8c4efb2c11dfd393346fe0b7b0702', 0) cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.

In [14]:
import matplotlib.pyplot as plt

sample = combined_cleaned_df.sample(frac=0.01, random_state=42).persist()
small_df = sample.compute()

small_df["rating"].value_counts().sort_index().plot(kind="bar")
plt.xlabel("Star Rating")
plt.ylabel("Number of Reviews")
plt.title("Sampled Distribution of Star Ratings")
plt.show()


2025-04-22 21:40:02,247 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\protocol\core.py", line 175, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "c:\Users\maian\anaconda3\Lib\site-packages\msgpack\fallback.py", line 136, in unpackb
    raise ExtraData(ret, unpacker._get_extradata())
msgpack.exceptions.ExtraData: unpack(b) received extra data.
2025-04-22 21:40:02,248 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\core.py", line 831, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\scheduler.py", line 5902, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\cor

FutureCancelledError: ('repartitiontofewer-104a9870ec0d53945563bc2fec569a49', 0) cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.

In [15]:
# Safer approach: extract necessary columns only
if combined_cleaned_df is not None:
    print("Running Dask-compatible EDA...")

    small_df = combined_cleaned_df[["rating"]]  # only the needed column
    rating_counts = small_df["rating"].value_counts().compute().sort_index()

    # Only use matplotlib *after* computing
    import matplotlib.pyplot as plt
    rating_counts.plot(kind="bar")
    plt.xlabel("Star Rating")
    plt.ylabel("Number of Reviews")
    plt.title("Distribution of Star Ratings")
    plt.show()


2025-04-22 21:40:08,244 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\protocol\core.py", line 175, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "c:\Users\maian\anaconda3\Lib\site-packages\msgpack\fallback.py", line 136, in unpackb
    raise ExtraData(ret, unpacker._get_extradata())
msgpack.exceptions.ExtraData: unpack(b) received extra data.
2025-04-22 21:40:08,248 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\core.py", line 831, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\scheduler.py", line 5902, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\cor

Running Dask-compatible EDA...


FutureCancelledError: ('repartitiontofewer-b05e27d13310221815e546959f402372', 0) cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.

In [16]:
if combined_cleaned_df is not None:
    top_categories = combined_cleaned_df["main_category"].value_counts().compute().head(10)
    top_categories.plot(kind="bar")
    plt.xlabel("Main Category")
    plt.ylabel("Review Count")
    plt.title("Top 10 Categories by Review Count")
    plt.xticks(rotation=45)
    plt.show()

2025-04-22 21:40:16,161 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\protocol\core.py", line 175, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "c:\Users\maian\anaconda3\Lib\site-packages\msgpack\fallback.py", line 136, in unpackb
    raise ExtraData(ret, unpacker._get_extradata())
msgpack.exceptions.ExtraData: unpack(b) received extra data.
2025-04-22 21:40:16,163 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\core.py", line 831, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\scheduler.py", line 5902, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "c:\Users\maian\anaconda3\Lib\site-packages\distributed\cor

FutureCancelledError: ('repartitiontofewer-02908ff2be5cb50a598f52e3e6fcf656', 0) cancelled for reason: scheduler-connection-lost.
Client lost the connection to the scheduler. Please check your connection and re-run your work.

In [None]:
if "brand" in cleaned.columns:
    top_brands = combined_cleaned_df[combined_cleaned_df["brand"] != "Unknown"]["brand"].value_counts().compute().head(10)
    top_brands.plot(kind="bar")
    plt.xlabel("Brand")
    plt.ylabel("Review Count")
    plt.title("Top 10 Brands by Review Count")
    plt.xticks(rotation=45)
    plt.show()

In [None]:
if "year" in combined_cleaned_df.columns:
    yearly_avg = combined_cleaned_df.groupby("year")["rating"].mean().compute()
    yearly_avg.plot(kind="line", marker='o')
    plt.xlabel("Year")
    plt.ylabel("Average Rating")
    plt.title("Average Rating Over Time")
    plt.grid(True)
    plt.show()

In [None]:
if "review_length" in combined_cleaned_df.columns:
    corr = combined_cleaned_df[["review_length", "rating"]].corr().compute()
    print(f"Pearson correlation between review length and rating: {corr.loc['review_length', 'rating']:.4f}")