# Part 2 - Data Cleaning and Preprocessing - a. Merge on parent_asin

# Mounting Google Drive
This cell mounts Google Drive to the Google Colab environment, enabling access to the compressed dataset files stored in Google Drive. This step is necessary to retrieve the raw_review and raw_meta zip files for the categories.

In [None]:
# Attach Google Drive to this Colab runtime so the zipped raw_review_* /
# raw_meta_* archives stored under MyDrive can be read.
from google.colab import drive

drive.mount(mountpoint='/content/drive')


Mounted at /content/drive


In [None]:
# Force a fresh mount of Drive at the same mount point (useful if the previous mount went stale).
drive.mount(mountpoint="/content/drive", force_remount=True)

Mounted at /content/drive


# Extracting Dataset Files
This cell extracts the compressed review and metadata files for a single category (e.g., Beauty and Personal Care) from Google Drive into a local directory (/content/Amazon_Reviews_2023) in Google Colab. Using zipfile, it unzips the category-specific files (e.g., raw_review_Beauty_and_Personal_Care.zip and raw_meta_Beauty_and_Personal_Care.zip). Due to storage constraints, only one category is extracted at a time, with the category name manually updated for each run. After merging, the extracted raw files are deleted to free up space for the next category.

In [None]:
import zipfile
import os

# Category to extract. Colab storage only fits one category at a time, so this
# single value is changed by hand for each run; both archive paths derive from it.
category_name = 'Beauty_and_Personal_Care'

# Zipped raw datasets stored in Google Drive
meta_zip_path = f'/content/drive/MyDrive/raw_meta_{category_name}.zip'
review_zip_path = f'/content/drive/MyDrive/raw_review_{category_name}.zip'

# Local directory on the Colab VM that the archives are unpacked into
extract_path = '/content/Amazon_Reviews_2023'

# Make sure the directory exists
os.makedirs(extract_path, exist_ok=True)

# Extract the metadata archive, then the review archive, for the selected category
for zip_path in (meta_zip_path, review_zip_path):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

# Check extracted folders
print(os.listdir(extract_path))


['raw_review_Beauty_and_Personal_Care', 'raw_meta_Beauty_and_Personal_Care']


# Installing Required Packages
This cell installs the datasets library, which is used to load the extracted review and metadata datasets for the current category.

In [None]:
pip install datasets

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m9.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.12.0-py3-none-any.wh

# Merging Reviews with Metadata
This cell merges the review and metadata datasets for a single category (e.g., Beauty and Personal Care) on the parent_asin key. Due to storage constraints in Google Colab, categories are processed sequentially by manually updating the category name in the categories list. The code processes data in batches (20,000 rows) to manage memory, producing a single Parquet file (e.g., merged_Beauty_and_Personal_Care.parquet). After merging, the raw review and metadata files are deleted to free up storage for the next category (this cell does not delete them itself). The process includes checkpointing for resuming interrupted merges, schema validation for consistency, and memory management to handle the large-scale dataset. Progress updates and a final summary report the number of records merged and the time taken.

In [None]:
import pandas as pd
from datasets import load_from_disk
from pathlib import Path
import os
import time
from datetime import timedelta
import pyarrow.parquet as pq
import pyarrow as pa
import gc

# Working directories on the Colab VM: extracted raw datasets live under
# base_path, and merged Parquet outputs go to its "merged" subfolder.
base_path = Path("/content/Amazon_Reviews_2023")
output_path = base_path.joinpath("merged")
output_path.mkdir(exist_ok=True, parents=True)

# Categories merged in this run. Because of Colab storage limits this list is
# edited by hand to hold one category at a time.
categories = ["Beauty_and_Personal_Care"]

def _parquet_num_rows(path):
    """Return the row count recorded in a Parquet file's footer, without loading any data."""
    return pq.ParquetFile(path).metadata.num_rows


def _normalize_batch(merged_batch):
    """Make column types uniform so every merged batch maps to the same Arrow schema.

    Object columns are cast to pandas' nullable ``string`` dtype and missing values
    are replaced by ``''``. In columns whose dtype name starts with ``list``, non-list
    entries are replaced by ``[]``. The frame is modified in place and returned.
    """
    for col in merged_batch.columns:
        if merged_batch[col].dtype == 'object':
            merged_batch[col] = merged_batch[col].astype('string').fillna('')
        elif merged_batch[col].dtype.name.startswith('list'):
            merged_batch[col] = merged_batch[col].apply(lambda x: x if isinstance(x, list) else [])
    return merged_batch


def merge_category(category, base_path, output_path, category_num, total_categories, batch_size=20000):
    """Inner-join one category's reviews with its metadata on ``parent_asin``, in batches.

    Inputs and temp file:
        Reviews come from ``base_path/raw_review_<category>`` and metadata from
        ``base_path/raw_meta_<category>``, each read with ``load_from_disk(...)["full"]``.
        Merged batches are streamed into ``output_path/merged_<category>.parquet.tmp``.

    On exit (success, error or interrupt):
        The temp file is moved over ``merged_<category>.parquet``. At the same time,
        ``checkpoint_<category>.txt`` is set to the next unprocessed review row, or
        removed once every row has been processed. The output file and the
        checkpoint therefore always describe the same rows. If the kernel dies
        mid-run, the previous output and checkpoint are left untouched.

    Resuming:
        Parquet files cannot be appended to, and ParquetWriter truncates its target.
        A resumed run therefore copies the existing output's rows into the temp file
        first, and then continues from the checkpoint. This temporarily needs disk
        space for two copies of the merged file.

    Args:
        category: Category suffix used in the raw and merged file names.
        base_path: Directory (Path) containing the extracted raw datasets.
        output_path: Directory (Path) for the merged Parquet file and the checkpoint.
        category_num: 1-based position of this category, used in progress messages.
        total_categories: Number of categories in the run, used in progress messages.
        batch_size: Number of review rows merged per batch.

    Returns:
        Number of merged rows stored in the output file. Exceptions raised while
        loading or merging are printed, not re-raised; in that case the number of
        rows saved so far is returned.
    """
    start_time = time.time()
    review_path = base_path / f"raw_review_{category}"
    meta_path = base_path / f"raw_meta_{category}"
    final_output = output_path / f"merged_{category}.parquet"
    tmp_output = output_path / f"merged_{category}.parquet.tmp"
    checkpoint_file = output_path / f"checkpoint_{category}.txt"

    # Skip if already fully merged (output present and no pending checkpoint)
    if final_output.exists() and not checkpoint_file.exists():
        print(f"[SKIP] {category} already merged (Category {category_num}/{total_categories})")
        return _parquet_num_rows(final_output)

    records = 0
    writer = None
    review_df = meta_df = None
    next_idx = 0                # first review row not yet reflected in the temp file
    prior_rows_copied = False   # True once the temp file holds every previously merged row
    completed = False           # True once every review row has been processed

    try:
        # Load review and metadata datasets
        print(f"Loading reviews for {category}")
        review_dataset = load_from_disk(str(review_path))["full"]
        print(f"Loading metadata for {category}")
        meta_dataset = load_from_disk(str(meta_path))["full"]

        # Convert to DataFrames, then drop the datasets to free memory
        review_df = review_dataset.to_pandas()
        meta_df = meta_dataset.to_pandas().set_index("parent_asin")
        del review_dataset, meta_dataset
        gc.collect()

        # Resume: carry the rows merged by a previous run over into the temp file
        schema = None
        if final_output.exists():
            existing = pq.ParquetFile(final_output)
            records = existing.metadata.num_rows
            schema = existing.schema_arrow
            if checkpoint_file.exists():
                next_idx = int(checkpoint_file.read_text().strip())
            print(f"Resuming {category} from row {next_idx:,} ({records:,} already merged)")
            writer = pq.ParquetWriter(tmp_output, schema, compression='snappy')
            for row_group in range(existing.num_row_groups):
                writer.write_table(existing.read_row_group(row_group))
        prior_rows_copied = True
        start_idx = next_idx

        # Process reviews in batches
        print(f"Merging {category} on parent_asin (batch size: {batch_size:,})")
        total_rows = len(review_df)
        for i in range(start_idx, total_rows, batch_size):
            batch_df = review_df.iloc[i:i + batch_size]
            batch_label = f"{i:,}-{min(i + batch_size, total_rows):,}"
            merged_batch = batch_df.merge(meta_df, on="parent_asin", how="inner")

            if not merged_batch.empty:
                table = pa.Table.from_pandas(_normalize_batch(merged_batch), preserve_index=False)

                # The first batch of a fresh output defines the schema
                if writer is None:
                    schema = table.schema
                    writer = pq.ParquetWriter(tmp_output, schema, compression='snappy')

                # Every later batch must match that schema
                try:
                    if table.schema != schema:
                        print(f"Schema mismatch in batch {batch_label}")
                        print("Expected schema:", schema)
                        print("Actual schema:", table.schema)
                        raise ValueError(f"Schema mismatch for batch {batch_label}")
                    writer.write_table(table)
                except Exception as e:
                    print(f"Failed to write batch {batch_label}: {str(e)}")
                    raise

                records += len(merged_batch)
                print(f"Processed batch {batch_label} of {total_rows:,} rows ({records:,} merged)")

            # Rows before this index are written (or had no metadata match)
            next_idx = i + len(batch_df)

        completed = True

    except Exception as e:
        print(f"[ERROR] Failed to process {category}: {str(e)} "
              f"(Category {category_num}/{total_categories})")
    finally:
        # Runs on success, on handled errors, and on KeyboardInterrupt (Colab "stop")
        if writer is not None:
            writer.close()
            if prior_rows_copied:
                # Publish atomically, then record the matching resume point
                os.replace(tmp_output, final_output)
                if completed:
                    checkpoint_file.unlink(missing_ok=True)
                else:
                    checkpoint_file.write_text(str(next_idx))
            else:
                # Copying the previous output failed: keep the old file and checkpoint
                tmp_output.unlink(missing_ok=True)
        elif completed:
            checkpoint_file.unlink(missing_ok=True)
        # Free memory
        review_df = meta_df = None
        gc.collect()

    if completed:
        elapsed_time = time.time() - start_time
        print(f"[DONE] {category} merged - {records:,} records in {elapsed_time:.2f} seconds "
              f"(Category {category_num}/{total_categories})")

    return records

# Run every configured category through merge_category, printing running totals
# after each one and a summary at the end.
total_categories = len(categories)
total_records, processed_categories, total_time = 0, 0, 0

print(f"Starting merge of {total_categories} categories...")
for i, category in enumerate(categories, start=1):
    start_time = time.time()
    records = merge_category(category, base_path, output_path, i, total_categories)
    total_records += records
    # Only categories that yielded rows count as processed and add to the timer
    if records > 0:
        processed_categories += 1
        total_time += time.time() - start_time

    print(f"\nProgress: {processed_categories}/{total_categories} categories processed")
    print(f"Total records merged so far: {total_records:,}")
    print("-" * 50)

# Final summary
summary_lines = [
    "\n🎉 Merge complete!",
    f"Total categories processed: {processed_categories}/{total_categories}",
    f"Total merged records: {total_records:,}",
    f"Total time taken: {timedelta(seconds=int(total_time))}",
    f"Output saved to: {output_path}",
]
for line in summary_lines:
    print(line)



Starting merge of 1 categories...
Loading reviews for Beauty_and_Personal_Care
Loading metadata for Beauty_and_Personal_Care
Merging Beauty_and_Personal_Care on parent_asin (batch size: 20,000)
Processed batch 0-20,000 of 23,911,390 rows (20,000 merged)
Processed batch 20,000-40,000 of 23,911,390 rows (40,000 merged)
Processed batch 40,000-60,000 of 23,911,390 rows (60,000 merged)
Processed batch 60,000-80,000 of 23,911,390 rows (80,000 merged)
Processed batch 80,000-100,000 of 23,911,390 rows (100,000 merged)
Processed batch 100,000-120,000 of 23,911,390 rows (120,000 merged)
Processed batch 120,000-140,000 of 23,911,390 rows (140,000 merged)
Processed batch 140,000-160,000 of 23,911,390 rows (160,000 merged)
Processed batch 160,000-180,000 of 23,911,390 rows (180,000 merged)
Processed batch 180,000-200,000 of 23,911,390 rows (200,000 merged)
Processed batch 200,000-220,000 of 23,911,390 rows (220,000 merged)
Processed batch 220,000-240,000 of 23,911,390 rows (240,000 merged)
Processe