In [1]:
import requests
import zipfile
import io
import gzip
import json
import pandas as pd
import os
from abc import ABC, abstractmethod

class BasePipeline(ABC):
    """
    Abstract base class for dataset pipelines.
    Enforces a standard interface for downloading and loading data.

    Subclasses implement ``load_data``. They can use ``_download_file`` to
    fetch remote content and ``self.save_dir`` as a local cache directory.
    """
    def __init__(self, save_dir="data"):
        """
        Args:
            save_dir: Directory that subclasses may use to cache downloaded
                files. It is created, including missing parents, if absent.
        """
        self.save_dir = save_dir
        # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
        os.makedirs(save_dir, exist_ok=True)

    def _download_file(self, url, timeout=60):
        """Download ``url`` with an HTTP GET and return the body as bytes.

        Args:
            url: Address to fetch.
            timeout: Seconds passed to ``requests.get`` as its timeout. Without
                a timeout, ``requests`` can block indefinitely on a stalled
                connection.

        Returns:
            The full response body as ``bytes``, or ``None`` if the request
            failed (connection error, timeout, or an HTTP error status). The
            error is printed rather than raised.
        """
        print(f"Downloading from {url}...")
        try:
            # stream=True is not used: the whole body is materialised in
            # memory and returned, so streaming would buy nothing.
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Error downloading {url}: {e}")
            return None
        return response.content

    @abstractmethod
    def load_data(self):
        """Downloads (if necessary) and returns the dataset as a DataFrame."""
        pass

class MovieLensPipeline(BasePipeline):
    """
    Pipeline for the MovieLens Latest Small Dataset.
    Standard benchmark for collaborative filtering.

    ``load_data`` returns a ``(ratings_df, movies_df)`` tuple read from
    ``ratings.csv`` and ``movies.csv`` inside the zip archive. It returns
    ``(None, None)`` if the archive could not be obtained. The archive is
    cached in ``save_dir`` so later runs do not re-download it.
    """
    URL = "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
    # Root folder inside the archive that contains the CSV files.
    ARCHIVE_ROOT = "ml-latest-small"

    def load_data(self):
        print("-" * 50)
        print("Starting MovieLens Pipeline...")

        content = self._load_archive_bytes()
        if not content:
            return None, None

        print("Extracting zip file in memory...")
        with zipfile.ZipFile(io.BytesIO(content)) as z:
            # 1. Load Ratings
            print("Loading ratings...")
            ratings_df = self._read_member_csv(z, "ratings.csv")

            # 2. Load Metadata (Movies)
            print("Loading movie metadata...")
            movies_df = self._read_member_csv(z, "movies.csv")

        print(f"MovieLens Loaded: {len(ratings_df)} ratings, {len(movies_df)} movies.")
        return ratings_df, movies_df

    def _load_archive_bytes(self):
        """Return the zip bytes from the local cache, or download and cache them.

        Returns ``None`` (or empty bytes) when the download fails.
        """
        local_filename = os.path.join(self.save_dir, os.path.basename(self.URL))
        if os.path.exists(local_filename):
            print(f"Found cached file at {local_filename}")
            with open(local_filename, "rb") as f:
                return f.read()

        content = self._download_file(self.URL)
        if content:
            # Write to a temporary name, then rename. An interrupted write
            # therefore never leaves a truncated file that looks like a cache.
            tmp_filename = local_filename + ".part"
            with open(tmp_filename, "wb") as f:
                f.write(content)
            os.replace(tmp_filename, local_filename)
        return content

    def _read_member_csv(self, z, name):
        """Read ``<ARCHIVE_ROOT>/<name>`` from the open ZipFile ``z`` into a DataFrame."""
        with z.open(f"{self.ARCHIVE_ROOT}/{name}") as f:
            return pd.read_csv(f)

class AmazonBeautyPipeline(BasePipeline):
    """
    Pipeline for Amazon Beauty (5-core) Dataset.
    '5-core' means all users and items have at least 5 reviews.
    Good for testing typically sparse e-commerce data.

    ``load_data`` returns a DataFrame with the columns reviewerID, asin,
    rating, reviewText, summary and timestamp. It returns ``None`` if no data
    could be obtained or parsed.
    """
    # Switching to the Stanford SNAP mirror (2014 version) which is more stable
    # than the UCSD datarepo which frequently throws 404s.
    URL = "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Beauty_5.json.gz"

    # Raw JSON fields kept in the output, in output order.
    KEEP_COLUMNS = ['reviewerID', 'asin', 'overall', 'reviewText', 'summary', 'unixReviewTime']
    # Renames applied after selection, for easier merging with other datasets.
    RENAME_COLUMNS = {'overall': 'rating', 'unixReviewTime': 'timestamp'}

    def load_data(self):
        print("-" * 50)
        print("Starting Amazon Beauty Pipeline...")

        # Define local path to cache the large file
        local_filename = os.path.join(self.save_dir, "Beauty_5.json.gz")

        if os.path.exists(local_filename):
            print(f"Found cached file at {local_filename}")
            with open(local_filename, "rb") as f:
                content = f.read()
        else:
            content = self._download_file(self.URL)
            if content:
                # Write to a temporary name, then rename. An interrupted write
                # therefore never leaves a truncated file that looks like a cache.
                tmp_filename = local_filename + ".part"
                with open(tmp_filename, "wb") as f:
                    f.write(content)
                os.replace(tmp_filename, local_filename)

        if not content:
            return None

        print("Parsing JSON-lines (this may take a moment)...")
        data = self._parse_json_lines(content)
        if not data:
            print("No valid JSON records found.")
            return None

        df = pd.DataFrame(data)

        # Keep: reviewerID, asin (item ID), overall (rating), reviewText,
        # summary, unixReviewTime.
        # reindex keeps this schema even if a field is missing from every
        # record (it becomes all-NaN) instead of raising KeyError. Chaining
        # rename on the new frame avoids in-place mutation of a selection.
        df = df.reindex(columns=self.KEEP_COLUMNS).rename(columns=self.RENAME_COLUMNS)

        print(f"Amazon Beauty Loaded: {len(df)} reviews.")
        return df

    @staticmethod
    def _parse_json_lines(content):
        """Decode gzip-compressed JSON-lines bytes into a list of dict records.

        Lines that are not valid JSON, or are not JSON objects, are skipped.
        The keys of the first accepted record are printed for the report.
        """
        records = []
        with gzip.open(io.BytesIO(content), 'rt', encoding='utf-8') as f:
            for line in f:
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(record, dict):
                    continue
                # Print the structure of the first *successfully parsed* record.
                if not records:
                    print(f"\n[Raw JSON Structure] Sample keys in first record: {list(record.keys())}")
                records.append(record)
        return records

def print_dataset_stats(df, name, file_type, user_col, item_col, rating_col, extra_info=None):
    """
    Print a statistical report for an interaction DataFrame.

    The report covers:
      - the source file type;
      - each column's dtype and a sample value;
      - how the columns map onto the User-Item-Interaction triplet;
      - interaction, user and item counts;
      - user-item matrix sparsity;
      - ``describe()`` of the rating column.

    Args:
        df: DataFrame with one row per interaction.
        name: Dataset display name used in the report header.
        file_type: Free-text description of the source file format.
        user_col: Name of the user-id column in ``df``.
        item_col: Name of the item-id column in ``df``.
        rating_col: Name of the rating column in ``df``.
        extra_info: Optional text printed as "Side Information".

    Returns:
        None. The report is written to stdout.
    """
    print(f"\n{'='*20} {name} Dataset Report {'='*20}")
    print(f"File Source Type:   {file_type}")

    print("\nData Structure (Variables):")
    print(f"{'Variable':<15} | {'Type':<10} | {'Sample Value'}")
    print("-" * 45)
    for col in df.columns:
        # First non-null value as the sample, computed once per column.
        non_null = df[col].dropna()
        sample = str(non_null.iloc[0]) if not non_null.empty else "N/A"
        # Truncate long strings for display
        sample_str = sample[:20] + "..." if len(sample) > 20 else sample
        print(f"{col:<15} | {str(df[col].dtype):<10} | {sample_str}")

    # Report the observed rating range instead of assuming 1-5
    # (MovieLens ratings, for example, go down to 0.5).
    observed = df[rating_col].dropna()
    rating_range = f"{observed.min()}-{observed.max()}" if not observed.empty else "no ratings observed"

    print("\nRecommender Pipeline Fit:")
    print("  This dataset structures the essential 'User-Item-Interaction' triplets:")
    print(f"  1. User Entity (Query):     '{user_col}' -> The ID of the customer/user.")
    print(f"  2. Item Entity (Candidate): '{item_col}' -> The ID of the product/movie.")
    print(f"  3. Target Label (Signal):   '{rating_col}' -> Explicit feedback ({rating_range}) to train the model.")

    if extra_info:
        print(f"  4. Side Information:        {extra_info}")
        print("     (Useful for Hybrid Filtering or cold-start scenarios)")

    n_users = df[user_col].nunique()
    n_items = df[item_col].nunique()
    n_interactions = len(df)

    # Sparsity = 1 - interactions / (users * items).
    # This is critical for RecSys: high sparsity (>99%) often requires
    # specific techniques like Matrix Factorization or BPR.
    matrix_size = n_users * n_items
    sparsity = (1 - (n_interactions / matrix_size)) * 100 if matrix_size > 0 else 0

    print("\nStatistics:")
    print(f"  Total Interactions: {n_interactions:,}")
    print(f"  Unique Users:       {n_users:,}")
    print(f"  Unique Items:       {n_items:,}")
    print(f"  Matrix Sparsity:    {sparsity:.4f}%")

    print("\nRating Distribution:")
    print(df[rating_col].describe().round(2))
    print("=" * 60)

def run_pipelines():
    """
    Run the MovieLens pipeline and then the Amazon Beauty pipeline.

    A statistics report is printed for each dataset whose ``load_data`` call
    returned data. Returns None.
    """
    # --- Step 1: MovieLens. load_data yields (ratings, movies); only the
    # ratings table is reported.
    movielens = MovieLensPipeline()
    ratings, _movies = movielens.load_data()
    if ratings is not None:
        print_dataset_stats(
            ratings,
            "MovieLens Latest Small",
            "CSV (Zipped)",
            user_col="userId",
            item_col="movieId",
            rating_col="rating",
            extra_info="Time-stamp data allows for temporal splitting (Train/Test by time).",
        )

    # --- Step 2: Amazon Beauty. load_data yields one DataFrame or None.
    beauty_df = AmazonBeautyPipeline().load_data()
    if beauty_df is None:
        return

    print_dataset_stats(
        beauty_df,
        "Amazon Beauty (5-core)",
        "JSON Lines (Gzipped)",
        user_col="reviewerID",
        item_col="asin",
        rating_col="rating",
        extra_info=(
            "Includes 'reviewText' for NLP and 'timestamp' for sequential/RL modeling.\n"
            "     'asin' is the Amazon Standard Identification Number (Product ID)."
        ),
    )

if __name__ == "__main__":
    # Top-level boundary: run every pipeline and report any failure.
    try:
        run_pipelines()
    except Exception as e:
        # Keep the traceback as well as the message. The bare message alone
        # (e.g. a KeyError's key) is rarely enough to locate the failing line.
        import traceback

        print(f"\nAn error occurred: {e}")
        traceback.print_exc()
    else:
        print("\nPipeline execution complete. Data is ready for the recommender model.")

--------------------------------------------------
Starting MovieLens Pipeline...
Downloading from https://files.grouplens.org/datasets/movielens/ml-latest-small.zip...
Extracting zip file in memory...
Loading ratings...
Loading movie metadata...
MovieLens Loaded: 100836 ratings, 9742 movies.

File Source Type:   CSV (Zipped)

Data Structure (Variables):
Variable        | Type       | Sample Value
---------------------------------------------
userId          | int64      | 1
movieId         | int64      | 1
rating          | float64    | 4.0
timestamp       | int64      | 964982703

Recommender Pipeline Fit:
  This dataset structures the essential 'User-Item-Interaction' triplets:
  1. User Entity (Query):     'userId' -> The ID of the customer/user.
  2. Item Entity (Candidate): 'movieId' -> The ID of the product/movie.
  3. Target Label (Signal):   'rating' -> Explicit feedback (1-5) to train the model.
  4. Side Information:        Time-stamp data allows for temporal splitting (Trai