<a href="https://colab.research.google.com/github/EthanGaoZhiyuan/ScaleStyle/blob/feat%2Fphase2-data-pipeline/data-pipeline/notebooks/H%26M_RecSys.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ==============================================================================
# DATA DOWNLOAD & INITIAL ENVIRONMENT SETUP (COLAB RUNNER)
# ------------------------------------------------------------------------------
# Purpose: Manages the secure download and extraction of the H&M dataset.
# Assisted by: Gemini (AI) for Colab file system and CLI integration.
# ==============================================================================

import os
import zipfile
from pathlib import Path
from google.colab import files, drive

# --- Configuration ---
# All paths used by this cell are defined here, in one place.
COMPETITION_NAME = "h-and-m-personalized-fashion-recommendations"
# Absolute path: the later cells of this notebook read from "/content/hm_data".
# A relative "./hm_data" would resolve somewhere else once the working
# directory changes (the environment-setup cell calls os.chdir into the repo).
DATA_DIR = Path("/content/hm_data")
DRIVE_MOUNT_PATH = Path("/content/drive")
DRIVE_PROJECT_PATH = DRIVE_MOUNT_PATH / "MyDrive/ScaleStyle_Project/data"


def setup_kaggle_auth():
    """
    Set up Kaggle API authentication from an uploaded kaggle.json.

    If ~/.kaggle/kaggle.json already exists, nothing is done. Otherwise the
    user is prompted through the Colab upload widget, and the uploaded file is
    moved from the current working directory into ~/.kaggle and restricted to
    owner read/write (0o600).

    Returns:
        None. A missing upload is reported with a printed message rather than
        an exception.
    """
    # Local import: only the first-time upload path needs it.
    import shutil

    kaggle_dir = Path.home() / ".kaggle"
    kaggle_json_path = kaggle_dir / "kaggle.json"

    if kaggle_json_path.exists():
        print(f"‚úÖ Kaggle authentication found at {kaggle_json_path}")
        return

    print("üîπ Please upload your 'kaggle.json' file now...")
    uploaded = files.upload()

    if "kaggle.json" not in uploaded:
        print("‚ùå Error: kaggle.json was not uploaded.")
        return

    # Create .kaggle directory and move file.
    # shutil.move falls back to copy + delete when the working directory and
    # the home directory live on different filesystems, where os.rename would
    # raise OSError.
    kaggle_dir.mkdir(parents=True, exist_ok=True)
    shutil.move("kaggle.json", str(kaggle_json_path))

    # Security: restrict the credentials file to the owner only.
    os.chmod(kaggle_json_path, 0o600)
    print("‚úÖ Kaggle authentication setup complete.")


def download_and_extract(use_drive_cache: bool = True):
    """
    Downloads the dataset from Kaggle.

    Args:
        use_drive_cache (bool): If True, mounts Google Drive to persist the raw zip file
                                so you don't have to download 30GB every time.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Option A: Use Google Drive to cache the huge zip file (Recommended)
    if use_drive_cache:
        print("üîπ Mounting Google Drive for persistent storage...")
        drive.mount(str(DRIVE_MOUNT_PATH))
        DRIVE_PROJECT_PATH.mkdir(parents=True, exist_ok=True)

        zip_path = DRIVE_PROJECT_PATH / f"{COMPETITION_NAME}.zip"

        # Check if we already downloaded it to Drive
        if not zip_path.exists():
            print(f"üîπ Downloading dataset to Google Drive ({zip_path})... this may take a while.")
            # Use Kaggle CLI to download to specific path
            !kaggle competitions download -c {COMPETITION_NAME} -p {DRIVE_PROJECT_PATH}
        else:
            print(f"‚úÖ Found cached dataset in Drive: {zip_path}")

    # Option B: Direct download to Colab (Ephemeral)
    else:
        print("üîπ Downloading dataset directly to Colab (Ephemeral storage)...")
        !kaggle competitions download -c {COMPETITION_NAME}
        zip_path = Path(f"{COMPETITION_NAME}.zip")

    # --- Extraction Phase ---
    print("üîπ Extracting core CSV files (Skipping images for now to save space)...")

    # We use the system 'unzip' command which is faster than Python's zipfile for large files
    # We explicitly ONLY extract the CSVs first. The images are too large (29GB+).
    # We will handle image extraction later in the pipeline when needed.
    zip_source = str(zip_path)
    target_dir = str(DATA_DIR)

    !unzip -q -o "{zip_source}" "articles.csv" "customers.csv" "transactions_train.csv" -d "{target_dir}"

    print(f"‚úÖ Extraction complete! Data is available at: {DATA_DIR}")
    print(f"üìÇ Files: {os.listdir(DATA_DIR)}")


# --- Execution ---
# if __name__ == "__main__":
#     setup_kaggle_auth()

#     # Set to True to save the zip to Drive (saves bandwidth on restarts)
#     # Set to False for a quick, one-off test
#     download_and_extract(use_drive_cache=True)

In [2]:
# ==============================================================================
# DATA LOADER & SETUP (OPTIMIZED FOR GOOGLE DRIVE)
# ------------------------------------------------------------------------------
# Purpose: Mounts Google Drive and loads the H&M dataset.
#          Skips download if data already exists in Drive to save time.
# ==============================================================================
import os
from pathlib import Path

# --- Configuration ---
# Kaggle competition slug; the cached archive is named after it.
COMPETITION_NAME = "h-and-m-personalized-fashion-recommendations"

# Local directory in the Colab runtime where the CSVs are made available
# to the Spark code.
LOCAL_DATA_DIR = Path("/content") / "hm_data"

# Persistent location of the dataset inside Google Drive.
# NOTE: this must match the folder the archive was saved to previously.
DRIVE_MOUNT_POINT = "/content/drive"
DRIVE_PROJECT_PATH = Path(DRIVE_MOUNT_POINT, "MyDrive", "ScaleStyle_Project", "data")
ZIP_FILE_PATH = DRIVE_PROJECT_PATH / (COMPETITION_NAME + ".zip")


def setup_data_from_drive():
    """
    Mounts Google Drive and prepares the dataset.
    1. Mounts Drive.
    2. Checks if the ZIP file exists in Drive.
    3. Extracts CSVs to the local Colab environment for fast access.
    """

    # 1. Mount Google Drive
    if not os.path.exists(DRIVE_MOUNT_POINT):
        print("üîπ Mounting Google Drive...")
        drive.mount(DRIVE_MOUNT_POINT)
    else:
        print("‚úÖ Google Drive is already mounted.")

    # 2. Check for the dataset in Drive
    if not ZIP_FILE_PATH.exists():
        print(f"‚ùå Error: Dataset not found at {ZIP_FILE_PATH}")
        print("   Did you save it to a different folder last time?")
        print("   If this is a fresh start, you may need to run the download script once.")
        return

    print(f"‚úÖ Found cached dataset in Drive: {ZIP_FILE_PATH}")

    # 3. Extract to local environment (Faster IO than reading from Drive directly)
    # We only extract if the target directory is empty or missing
    if not LOCAL_DATA_DIR.exists():
        print("üîπ Extracting data to local Colab environment (this improves speed)...")
        LOCAL_DATA_DIR.mkdir(parents=True, exist_ok=True)

        # Unzip command is faster than Python zipfile
        zip_source = str(ZIP_FILE_PATH)
        target_dir = str(LOCAL_DATA_DIR)

        # Extract specific CSVs to save space/time
        !unzip -q -o "{zip_source}" "articles.csv" "customers.csv" "transactions_train.csv" -d "{target_dir}"

        print(f"‚úÖ Extraction complete! Data is ready at: {LOCAL_DATA_DIR}")
    else:
        print(f"‚úÖ Data already extracted at {LOCAL_DATA_DIR}. Skipping extraction.")

    print(f"üìÇ Available files: {os.listdir(LOCAL_DATA_DIR)}")


# --- Execution ---
if __name__ == "__main__":
    setup_data_from_drive()

✅ Google Drive is already mounted.
✅ Found cached dataset in Drive: /content/drive/MyDrive/ScaleStyle_Project/data/h-and-m-personalized-fashion-recommendations.zip
✅ Data already extracted at /content/hm_data. Skipping extraction.
📂 Available files: ['articles.csv', 'transactions_train.csv', 'customers.csv']


In [3]:
from google.colab import userdata

# 1. Securely retrieve Token (will not be displayed on screen)
try:
    token = userdata.get("GITHUB_TOKEN")
    print("‚úÖ Token retrieved successfully")
except Exception:
    print("‚ùå Token not found. Please check the Secrets panel settings on the left")

# 2. Configure repository information
username = "EthanGaoZhiyuan"
repo = "ScaleStyle"
branch = "feat/phase1-data-pipeline"  # Specify the target branch

# 3. Construct HTTPS URL with Token (Token used for authentication)
# Format: https://token@github.com/username/repo.git
clone_url = f"https://{token}@github.com/{username}/{repo}.git"

# 4. Execute clone command
# Use the -b flag to directly clone a specific branch
!git clone -b main {clone_url}

# 5. Verification
if os.path.exists(repo):
    print(f"\nüéâ Code downloaded to: {repo}")
    !ls {repo}
else:
    print("\n‚ùå Clone failed. Please check Token permissions or repository existence")

✅ Token retrieved successfully
fatal: destination path 'ScaleStyle' already exists and is not an empty directory.

🎉 Code downloaded to: ScaleStyle
data-pipeline	    docs	     inference-service	README.md
docker-compose.yml  gateway-service  infrastructure


In [4]:
import sys
import os

# 1. Change directory to the data-pipeline folder to access requirements.txt
# This ensures pip installs the exact versions specified in your repo.
repo_path = "/content/ScaleStyle/data-pipeline"
os.chdir(repo_path)

print(f"üìÇ Current Working Directory: {os.getcwd()}")

# 2. Install dependencies
# Using -q to keep the output clean.
print("‚¨áÔ∏è Installing dependencies from requirements.txt...")
!pip install -q -r requirements.txt

# 3. Add the project root to system path
# This allows Python to recognize 'src' as a module so we can do:
# "from src.feature_engineering import ..."
if repo_path not in sys.path:
    sys.path.append(repo_path)
    print(f"‚úÖ Added {repo_path} to system path.")

print("‚úÖ Environment setup complete.")

📂 Current Working Directory: /content/ScaleStyle/data-pipeline
⬇️ Installing dependencies from requirements.txt...
✅ Added /content/ScaleStyle/data-pipeline to system path.
✅ Environment setup complete.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Project-local preprocessing step, imported from the cloned repository.
from src.feature_engineering import preprocess_customers

# 1. Spark session for this notebook, with 4g requested for the driver.
spark = (
    SparkSession.builder.config("spark.driver.memory", "4g")
    .appName("HM_Feature_Engineering_Colab")
    .getOrCreate()
)

print("‚úÖ Spark Session initialized.")

# 2. Input location: the CSVs extracted to /content/hm_data by the data cell.
data_dir = "/content/hm_data"
customers_file = os.path.join(data_dir, "customers.csv")

# 3. Load, preprocess and validate - only when the input file is present.
if os.path.exists(customers_file):
    print(f"üìÇ Loading data from: {customers_file}")

    # CSV has a header row; column types are inferred by Spark.
    df_customers = spark.read.csv(customers_file, header=True, inferSchema=True)

    # Quick look at the raw data.
    print(f"üìä Total Rows: {df_customers.count()}")
    print("üîπ Sample of raw data (Top 5):")
    df_customers.show(5)

    # 4. Run the repository's customer preprocessing.
    print("‚öôÔ∏è Running 'preprocess_customers' logic...")
    df_processed = preprocess_customers(df_customers)

    # 5. Inspect the result.
    print("‚úÖ Processing complete! Showing result sample:")
    df_processed.show(5)

    # Logic check: after preprocessing, 'age' should contain no NULLs.
    null_age_count = df_processed.where(F.col("age").isNull()).count()
    print(f"üîç Validation: Count of NULLs in 'age' column after processing: {null_age_count}")

    if null_age_count != 0:
        print("‚ö†Ô∏è WARNING: There are still null values. Check the logic.")
    else:
        print("üéâ SUCCESS: Null imputation logic worked correctly on the full dataset.")
else:
    print(f"‚ùå Error: File not found at {customers_file}. Please run the download script first.")

In [None]:
# Persist the processed customers table to Drive as Parquet, replacing any
# previous output at the same path.
cust_output_path = "/content/drive/MyDrive/ScaleStyle_Project/data/processed/customers_parquet"
print(f"üíæ Saving customers to: {cust_output_path}")
df_processed.write.parquet(cust_output_path, mode="overwrite")

In [None]:
# 1. Update the code: pull the latest commits into the cloned repository.
# `%cd` changes the notebook's working directory; `!git pull` runs in a shell
# from that directory.
# NOTE: modules that were already imported keep their old code until they are
# reloaded.
%cd /content/ScaleStyle
!git pull
# Go back to the data-pipeline folder, the working directory set by the
# environment-setup cell.
%cd /content/ScaleStyle/data-pipeline

In [None]:
from src.feature_engineering import preprocess_articles
from pyspark.sql import functions as F
import os

# 1. Input location of the raw articles CSV.
data_dir = "/content/hm_data"
articles_file = os.path.join(data_dir, "articles.csv")

# 2. Read the raw CSV (header row present, column types inferred by Spark)
#    and report how many rows came in.
print(f"üìÇ Loading articles from: {articles_file}")
df_articles = spark.read.csv(
    articles_file,
    inferSchema=True,
    header=True,
)
print(f"   Original Count: {df_articles.count()}")

# 3. Run the repository's article preprocessing.
print("‚öôÔ∏è Running preprocess_articles...")
df_articles_processed = preprocess_articles(df_articles)

# 4. Inspect the result: schema first, then a five-row sample.
print("‚úÖ Processing complete!")
df_articles_processed.printSchema()
df_articles_processed.show(5)

In [None]:
# 1. Destination for the processed articles table (Parquet, on Drive).
output_path = "/content/drive/MyDrive/ScaleStyle_Project/data/processed/articles_parquet"

# 2. Write the table, replacing any previous output.
print(f"üíæ Saving to: {output_path}")
# coalesce(1) collapses the data to a single partition so one Parquet file is
# written; acceptable for a table of this size and easier to inspect.
df_articles_processed.coalesce(1).write.parquet(output_path, mode="overwrite")

print("üéâ Articles data saved successfully!")

In [None]:
%cd /content/ScaleStyle
!git pull

In [5]:
from src.feature_engineering import preprocess_transactions
from pyspark.sql import SparkSession
import os

# 1. Spark session for the transactions ETL, requesting 8g for driver and
#    executor.
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists - confirm these memory settings actually apply in that case.
spark = (
    SparkSession.builder.config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .appName("HM_Transactions_ETL")
    .getOrCreate()
)

# 2. Input CSV and output Parquet locations.
data_dir = "/content/hm_data"
transactions_file = os.path.join(data_dir, "transactions_train.csv")
output_path = "/content/drive/MyDrive/ScaleStyle_Project/data/processed/transactions_parquet"

# 3. Read the raw transactions (31M+ rows); this can take a while.
print(f"üìÇ Loading transactions from: {transactions_file}")
df_trans = spark.read.csv(
    transactions_file,
    inferSchema=True,
    header=True,
)
print(f"   Original Count: {df_trans.count()}")

# 4. Run the repository's transaction preprocessing. Per the message below it
#    covers aggregation and scaling, so expect it to be slow.
print("‚öôÔ∏è Running preprocess_transactions (Aggregation & Scaling)...")
df_trans_processed = preprocess_transactions(df_trans)

# 5. Inspect the result: schema first, then a five-row sample.
print("‚úÖ Processing complete! Showing sample:")
df_trans_processed.printSchema()
df_trans_processed.show(5)

# 6. Write the result. No coalesce(1) here: with this much data Spark is left
#    to write multiple files in parallel.
print(f"üíæ Saving to: {output_path}")
df_trans_processed.write.parquet(output_path, mode="overwrite")

print("üéâ Transactions ETL Finished!")

📂 Loading transactions from: /content/hm_data/transactions_train.csv
   Original Count: 31788324
⚙️ Running preprocess_transactions (Aggregation & Scaling)...
✅ Processing complete! Showing sample:
root
 |-- customer_id: string (nullable = true)
 |-- article_id: integer (nullable = true)
 |-- total_decay_weight: double (nullable = true)
 |-- article_purchase_count: long (nullable = false)
 |-- last_purchase_date: date (nullable = true)
 |-- price: double (nullable = true)
 |-- sales_channel_id: integer (nullable = true)

+--------------------+----------+-------------------+----------------------+------------------+--------------------+----------------+
|         customer_id|article_id| total_decay_weight|article_purchase_count|last_purchase_date|               price|sales_channel_id|
+--------------------+----------+-------------------+----------------------+------------------+--------------------+----------------+
|016d8f0519d9e0572...| 527687006|0.42895454691819734|         

In [6]:
%cd /content/ScaleStyle
!git pull
%cd /content/ScaleStyle/data-pipeline


from pyspark.sql import SparkSession
from src.feature_engineering import merge_datasets
import os

# 1. Init Spark (Max Memory)
spark = (
    SparkSession.builder.appName("HM_Data_Merge")
    .config("spark.driver.memory", "12g")
    .config("spark.executor.memory", "12g")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

# 2. Paths
base_path = "/content/drive/MyDrive/ScaleStyle_Project/data/processed"
trans_path = os.path.join(base_path, "transactions_parquet")
cust_path = os.path.join(base_path, "customers_parquet")
art_path = os.path.join(base_path, "articles_parquet")
output_path = os.path.join(base_path, "train_data_parquet")

# 3. Load Data
print("‚è≥ Loading Parquet files...")
df_trans = spark.read.parquet(trans_path)
df_cust = spark.read.parquet(cust_path)
df_art = spark.read.parquet(art_path)

print(f"   Trans Count: {df_trans.count()}")

# 4. Merge
print("‚öôÔ∏è Merging datasets...")
df_train = merge_datasets(df_trans, df_cust, df_art)

# 5. Sanity Check
final_count = df_train.count()
print(f"‚úÖ Merge Complete! Final Row Count: {final_count}")

if final_count != df_trans.count():
    print("‚ö†Ô∏è WARNING: Row count changed! Check for duplicates in Cust/Art tables.")
else:
    print("üéâ Integrity Check Passed: Row counts match.")

# 6. Save
print(f"üíæ Saving final train data to: {output_path}")

df_train.write.mode("overwrite").parquet(output_path)

print("üöÄ MISSION COMPLETE: Data Pipeline Finished!")

/content/ScaleStyle
Already up to date.
/content/ScaleStyle/data-pipeline
⏳ Loading Parquet files...
   Trans Count: 27306439
⚙️ Merging datasets...
✅ Merge Complete! Final Row Count: 27306439
🎉 Integrity Check Passed: Row counts match.
💾 Saving final train data to: /content/drive/MyDrive/ScaleStyle_Project/data/processed/train_data_parquet
🚀 MISSION COMPLETE: Data Pipeline Finished!


In [7]:
# Pull the latest commits, then return to the data-pipeline working directory.
%cd /content/ScaleStyle
!git pull
%cd /content/ScaleStyle/data-pipeline

import importlib
import src.feature_engineering

# Re-execute the already-imported module so the freshly pulled code is used
# in this session.
importlib.reload(src.feature_engineering)
# Re-bind the names after the reload; names imported with `from ... import`
# before the reload would still refer to the old function objects.
from src.feature_engineering import preprocess_transactions, merge_datasets

/content/ScaleStyle
Already up to date.
/content/ScaleStyle/data-pipeline


In [11]:
from src.feature_engineering import generate_negative_samples
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, NumericType, TimestampType

# 1. Paths: merged positives, the two feature tables, and the final output.
base_path = "/content/drive/MyDrive/ScaleStyle_Project/data/processed"
train_data_path = f"{base_path}/train_data_parquet"
cust_path = f"{base_path}/customers_parquet"
art_path = f"{base_path}/articles_parquet"
final_output_path = f"{base_path}/train_data_with_negatives_parquet"

# 2. Load data
print("‚è≥ Loading Phase 1 positive samples and feature tables...")
df_positives = spark.read.parquet(train_data_path)
df_cust = spark.read.parquet(cust_path)
df_art = spark.read.parquet(art_path)

# Ensure positive samples have label=1
if "label" not in df_positives.columns:
    df_positives = df_positives.withColumn("label", F.lit(1))

# 3. Generate Negative Samples
# ratio=4 means 1 positive : 4 negatives
print("‚öôÔ∏è Generating negative samples (Negative Sampling)...")
df_neg_ids = generate_negative_samples(df_positives, ratio=4)

# 4. Feature Backfill
print("‚öôÔ∏è Backfilling negative sample features...")
# Join customer and article features back to the negative IDs
df_neg_full = df_neg_ids.join(df_cust, on="customer_id", how="left") \
                        .join(df_art, on="article_id", how="left")

# Assign label=0 for negative samples
df_neg_full = df_neg_full.withColumn("label", F.lit(0))

# Identify columns present in Positives but missing in Negatives.
# Sorted so that the printed list and the order in which columns are added do
# not depend on set iteration order.
pos_cols = set(df_positives.columns)
neg_cols = set(df_neg_full.columns)
missing_cols = sorted(pos_cols - neg_cols)

print(f"üîß Detected missing columns in negative samples: {missing_cols}")
print("   Auto-filling missing columns (0 for numbers, 1970-01-01 for dates)...")

# NOTE(review): every label=0 row receives the same placeholder in these
# columns. If positive rows never carry those values, a model can read the
# label straight off them - confirm these columns are excluded from (or
# handled in) the feature set.
for col_name in missing_cols:
    # Use the positives' schema so the filled column has the identical type,
    # which the union below requires.
    target_type = df_positives.schema[col_name].dataType

    if isinstance(target_type, (DateType, TimestampType)):
        # Dates/timestamps: dummy default (Epoch)
        default_val = F.lit("1970-01-01").cast(target_type)
    elif isinstance(target_type, NumericType):
        # Numbers (Int, Long, Double, Decimal, ...): 0
        default_val = F.lit(0).cast(target_type)
    else:
        # Anything else (e.g. strings): a typed NULL instead of casting 0,
        # which would produce a made-up value such as the string "0".
        default_val = F.lit(None).cast(target_type)

    df_neg_full = df_neg_full.withColumn(col_name, default_val)

# 5. Merge Data (Union)
# Select the negatives' columns in the positives' order before the union.
common_columns = df_positives.columns
df_neg_final = df_neg_full.select(*common_columns)

print(f"üìä Stats: Positives {df_positives.count()} rows, Negatives {df_neg_final.count()} rows")

# Union
df_final_train = df_positives.unionByName(df_neg_final)

# 6. Save Final Result
print(f"üíæ Saving final training set to: {final_output_path}")
# Repartition to prevent OOM
df_final_train.repartition(100).write.mode("overwrite").parquet(final_output_path)

print("üöÄ Phase 2 Complete! Data saved successfully.")

⏳ Loading Phase 1 positive samples and feature tables...
⚙️ Generating negative samples (Negative Sampling)...
⚙️ Backfilling negative sample features...
🔧 Detected missing columns in negative samples: ['last_purchase_date', 'article_purchase_count']
   Auto-filling missing columns (0 for numbers, 1970-01-01 for dates)...
📊 Stats: Positives 27306439 rows, Negatives 135467852 rows
💾 Saving final training set to: /content/drive/MyDrive/ScaleStyle_Project/data/processed/train_data_with_negatives_parquet
🚀 Phase 2 Complete! Data saved successfully.
