# Part 2B2 Training Data Preparation

In [1]:
# INSTALL AND IMPORT DEPENDENCIES
import subprocess
import sys

subprocess.run([sys.executable, "-m", "pip", "install", "polars==0.20.31"], check=True)

import polars as pl
import pandas as pd
import numpy as np
import pickle
import json
import os
import gc
import time
import psutil
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Set, Optional
from collections import defaultdict, Counter
import warnings
warnings.filterwarnings('ignore')

# MOUNT GOOGLE DRIVE
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


## ENHANCED CONFIGURATION - FIXED PARAMETERS

In [2]:
class Config:
    DATA_PATH = '/content/drive/MyDrive/Colab Notebooks/CML/Assignment 1/content/otto-data'
    OUTPUT_PATH = '/content/drive/MyDrive/Colab Notebooks/CML/Assignment 1/content/otto-output'

    # FIXED: Increased training data parameters for better positive samples
    VALIDATION_DAYS = 3                    # Increased from 2
    MAX_VALIDATION_SESSIONS = 25000        # Increased from 10000
    VALIDATION_CANDIDATES_PER_TYPE = 100   # Increased from 40
    MIN_CANDIDATES_PER_SESSION = 50        # New: minimum candidates required

    # FIXED: Improved memory management - less aggressive
    CHUNK_SIZE = 2000                      # Increased from 500
    LARGE_CHUNK_SIZE = 5000               # Increased from 2000
    EMERGENCY_CHUNK_SIZE = 1000           # Increased from 100

    # FIXED: Balanced memory monitoring
    MEMORY_THRESHOLD = 0.75               # Less aggressive (was 0.70)
    HIGH_MEMORY_THRESHOLD = 0.85          # Less aggressive (was 0.80)
    CRITICAL_MEMORY_THRESHOLD = 0.90      # Less aggressive (was 0.85)
    EMERGENCY_THRESHOLD = 0.95            # Less aggressive (was 0.90)

    # FIXED: Less frequent but more effective cleanup
    GC_FREQUENCY = 500                    # Less frequent (was 250)
    MEMORY_CHECK_FREQUENCY = 10           # Less frequent (was 5)

    # Enhanced candidate generation parameters
    MAX_CLICK_CANDIDATES = 30             # Increased from 20
    MAX_CART_CANDIDATES = 25              # Increased from 15
    MAX_ORDER_CANDIDATES = 25             # Increased from 15

    # Diversity parameters
    MIN_ITEM_DIVERSITY = 100              # Minimum unique items in validation
    MIN_POSITIVE_RATE = 0.005             # Target: 0.5% minimum positive rate

config = Config()

# Configure Polars for better performance
pl.enable_string_cache()
pl.Config.set_streaming_chunk_size(config.CHUNK_SIZE)
pl.Config.set_fmt_str_lengths(50)
pl.Config.set_tbl_rows(10)

## ENHANCED UTILITY FUNCTIONS

In [3]:
def get_memory_usage():
    """Get current memory usage in GB"""
    return psutil.Process().memory_info().rss / (1024**3)

def log(message: str):
    """Enhanced logging with memory tracking"""
    memory_gb = get_memory_usage()
    memory_pct = (memory_gb / 51.0) * 100  # Assuming 51GB total RAM
    status = "OK" if memory_pct < 75 else "HIGH" if memory_pct < 85 else "CRITICAL"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] [MEM: {memory_gb:.1f}GB/{memory_pct:.1f}%/{status}] {message}")

def force_garbage_collection():
    """Enhanced garbage collection"""
    for _ in range(3):
        gc.collect()
    time.sleep(0.2)

def check_memory_usage(operation_name: str, force_cleanup: bool = False):
    """Monitor memory usage with intelligent cleanup"""
    memory_gb = get_memory_usage()
    memory_pct = (memory_gb / 51.0) * 100

    if memory_pct > config.EMERGENCY_THRESHOLD or force_cleanup:
        log(f"Emergency memory cleanup during {operation_name}: {memory_gb:.1f}GB ({memory_pct:.1f}%)")
        force_garbage_collection()
        new_memory = get_memory_usage()
        log(f"Memory after cleanup: {new_memory:.1f}GB")
        return new_memory
    elif memory_pct > config.CRITICAL_MEMORY_THRESHOLD:
        log(f"High memory usage during {operation_name}: {memory_gb:.1f}GB ({memory_pct:.1f}%)")
        force_garbage_collection()
        return get_memory_usage()
    return memory_gb

## ENHANCED INPUT VALIDATION AND LOADING

In [4]:
def validate_and_load_inputs():
    """
    Enhanced input validation and loading with better error handling
    """
    log("Validating input files...")

    # Check required files
    required_files = {
        "train_features.parquet": "Training data from Part 1",
        "item_stats.parquet": "Item statistics from Part 1",
        "consolidated_covisitation_matrices.pkl": "Co-visitation matrices from Part 2A"
    }

    for filename, description in required_files.items():
        filepath = f"{config.OUTPUT_PATH}/{filename}"
        if not os.path.exists(filepath):
            log(f"ERROR: Missing {filename} - {description}")
            log("Required steps:")
            log("  1. Run Part 1 (Data Processing) to generate training data")
            log("  2. Run Part 2A (Co-visitation Matrix Generation)")
            raise FileNotFoundError(f"Missing required file: {filename}")

        file_size = os.path.getsize(filepath) / (1024*1024)
        log(f"  ✓ {filename} - {file_size:.1f} MB")

    log("All required input files validated!")

    # Load data with enhanced error handling
    log("\nLoading input data...")

    try:
        # Load training features with lazy evaluation
        log("  Loading training features...")
        train_features_path = f"{config.OUTPUT_PATH}/train_features.parquet"
        file_size_mb = os.path.getsize(train_features_path) / (1024*1024)
        log(f"    File size: {file_size_mb:.1f} MB")

        # Use lazy loading for large datasets
        train_features_lazy = pl.scan_parquet(train_features_path)

        # Get basic statistics safely
        basic_stats = train_features_lazy.select([
            pl.count().alias('total_rows'),
            pl.col('session').n_unique().alias('unique_sessions'),
            pl.col("ts").min().alias("min_ts"),
            pl.col("ts").max().alias("max_ts")
        ]).collect()

        total_rows = basic_stats['total_rows'][0]
        unique_sessions = basic_stats['unique_sessions'][0]
        min_ts = basic_stats['min_ts'][0]
        max_ts = basic_stats['max_ts'][0]
        timespan_days = (max_ts - min_ts) / (1000 * 60 * 60 * 24)

        log(f"    Training data: {total_rows:,} events, {unique_sessions:,} sessions")
        log(f"    Timespan: {timespan_days:.1f} days")

        check_memory_usage("Training data stats")

        # Load item statistics
        log("  Loading item statistics...")
        item_stats = pl.read_parquet(f"{config.OUTPUT_PATH}/item_stats.parquet")
        log(f"    Item stats: {item_stats.shape} ({item_stats.estimated_size('mb'):.1f} MB)")

        # Load co-visitation matrices with fallback options
        log("  Loading co-visitation matrices...")
        covisit_files = [
            "consolidated_covisitation_matrices.pkl",
            "consolidated_covisitation_matrices_partial.pkl",
            "consolidated_covisitation_matrices_minimal.pkl"
        ]

        consolidated_covisitation_matrices = None
        matrix_source = None

        for filename in covisit_files:
            filepath = f"{config.OUTPUT_PATH}/{filename}"
            if os.path.exists(filepath):
                try:
                    with open(filepath, "rb") as f:
                        consolidated_covisitation_matrices = pickle.load(f)
                    matrix_source = filename
                    file_size = os.path.getsize(filepath) / (1024*1024)
                    log(f"    ✓ Loaded: {filename} ({file_size:.1f} MB)")
                    break
                except Exception as e:
                    log(f"    ✗ Failed to load {filename}: {e}")
                    continue

        if consolidated_covisitation_matrices is None:
            raise FileNotFoundError("No co-visitation matrices could be loaded!")

        check_memory_usage("Co-visitation matrices loading")

        # Enhanced matrix validation
        log("  Validating co-visitation matrices...")
        matrix_validation = {}
        total_source_items = 0

        if isinstance(consolidated_covisitation_matrices, dict):
            matrices_data = consolidated_covisitation_matrices.get('matrices', consolidated_covisitation_matrices)

            for name, matrix in matrices_data.items():
                try:
                    if isinstance(matrix, dict):
                        source_items = len(matrix)
                        # Sample a few entries to validate structure
                        sample_keys = list(matrix.keys())[:5]
                        valid_entries = 0
                        for key in sample_keys:
                            if isinstance(matrix[key], (list, dict)) and len(matrix[key]) > 0:
                                valid_entries += 1

                        matrix_validation[name] = {
                            "source_items": source_items,
                            "valid_entries": valid_entries,
                            "quality": "good" if valid_entries > 0 else "empty"
                        }
                        total_source_items += source_items
                        log(f"    {name}: {source_items:,} source items, quality: {matrix_validation[name]['quality']}")
                    else:
                        matrix_validation[name] = {"error": f"Invalid matrix type: {type(matrix)}"}
                        log(f"    {name}: Invalid type {type(matrix)}")
                except Exception as e:
                    matrix_validation[name] = {"error": str(e)}
                    log(f"    {name}: Validation error - {e}")

        log(f"  Total source items across all matrices: {total_source_items:,}")

        # Create validation results
        validation_results = {
            "timestamp": datetime.now().isoformat(),
            "matrix_source": matrix_source,
            "train_sessions": unique_sessions,
            "train_events": total_rows,
            "timespan_days": timespan_days,
            "matrix_validation": matrix_validation,
            "total_source_items": total_source_items
        }

        log("Input validation completed successfully!")
        return train_features_lazy, item_stats, consolidated_covisitation_matrices, validation_results

    except Exception as e:
        log(f"Error loading input data: {e}")
        raise e

# Load inputs
train_features_lazy, item_stats, consolidated_covisitation_matrices, validation_results = validate_and_load_inputs()

[2025-08-08 02:19:32] [MEM: 0.2GB/0.3%/OK] Validating input files...
[2025-08-08 02:19:37] [MEM: 0.2GB/0.3%/OK]   ✓ train_features.parquet - 3893.1 MB
[2025-08-08 02:19:37] [MEM: 0.2GB/0.3%/OK]   ✓ item_stats.parquet - 0.0 MB
[2025-08-08 02:19:37] [MEM: 0.2GB/0.3%/OK]   ✓ consolidated_covisitation_matrices.pkl - 1122.3 MB
[2025-08-08 02:19:37] [MEM: 0.2GB/0.3%/OK] All required input files validated!
[2025-08-08 02:19:37] [MEM: 0.2GB/0.3%/OK] 
Loading input data...
[2025-08-08 02:19:37] [MEM: 0.2GB/0.3%/OK]   Loading training features...
[2025-08-08 02:19:37] [MEM: 0.2GB/0.3%/OK]     File size: 3893.1 MB
[2025-08-08 02:20:49] [MEM: 2.1GB/4.1%/OK]     Training data: 216,384,937 events, 12,899,779 sessions
[2025-08-08 02:20:49] [MEM: 2.1GB/4.1%/OK]     Timespan: 28.0 days
[2025-08-08 02:20:49] [MEM: 2.1GB/4.1%/OK] Emergency memory cleanup during Training data stats: 2.1GB (4.1%)
[2025-08-08 02:20:49] [MEM: 2.1GB/4.1%/OK] Memory after cleanup: 2.1GB
[2025-08-08 02:20:49] [MEM: 2.1GB/4.1%/O

## ENHANCED CANDIDATE GENERATION FUNCTIONS

In [5]:
def create_enhanced_candidate_generator(covisitation_matrices: Dict, item_stats: pl.DataFrame):
    """
    Create enhanced candidate generation function with better positive sample generation
    """
    log("Creating enhanced candidate generation system...")

    # Extract matrices safely
    if 'matrices' in covisitation_matrices:
        matrices = covisitation_matrices['matrices']
    else:
        matrices = covisitation_matrices

    # Validate and prepare matrices
    usable_matrices = {}
    for name, matrix in matrices.items():
        if isinstance(matrix, dict) and len(matrix) > 0:
            usable_matrices[name] = matrix
            sample_key = next(iter(matrix.keys()))
            sample_value = matrix[sample_key]
            log(f"  Matrix '{name}': {len(matrix):,} items, sample structure: {type(sample_value)}")

    # Create item popularity mapping
    item_popularity = {}
    if len(item_stats) > 0:
        for row in item_stats.iter_rows(named=True):
            aid = row['aid']
            popularity = row.get('total_interactions', row.get('clicks', 0))
            item_popularity[aid] = popularity

    log(f"Candidate generation ready: {len(usable_matrices)} matrices, {len(item_popularity)} items")

    def generate_validation_candidates(session_data: pl.DataFrame, session_id: int) -> Dict[str, Set[int]]:
        """
        Enhanced candidate generation with better positive sample strategies
        """
        candidates = {"clicks": set(), "carts": set(), "orders": set()}

        try:
            # Get session items by type
            session_items = session_data.to_dicts()
            click_items = [item['aid'] for item in session_items if item['type'] == 'clicks']
            cart_items = [item['aid'] for item in session_items if item['type'] == 'carts']
            order_items = [item['aid'] for item in session_items if item['type'] == 'orders']

            # Strategy 1: Recent interaction-based candidates (higher weight)
            recent_items = click_items[-10:] if len(click_items) >= 10 else click_items

            for item in recent_items:
                # Click-to-click relationships
                if 'click_to_click' in usable_matrices and item in usable_matrices['click_to_click']:
                    matrix_candidates = usable_matrices['click_to_click'][item]
                    if isinstance(matrix_candidates, list):
                        for candidate_info in matrix_candidates[:config.MAX_CLICK_CANDIDATES]:
                            if isinstance(candidate_info, (list, tuple)) and len(candidate_info) >= 1:
                                candidates["clicks"].add(candidate_info[0])
                            elif isinstance(candidate_info, (int, np.integer)):
                                candidates["clicks"].add(int(candidate_info))
                    elif isinstance(matrix_candidates, dict):
                        for candidate in list(matrix_candidates.keys())[:config.MAX_CLICK_CANDIDATES]:
                            candidates["clicks"].add(candidate)

                # Click-to-buy relationships (higher conversion potential)
                if 'click_to_buy' in usable_matrices and item in usable_matrices['click_to_buy']:
                    matrix_candidates = usable_matrices['click_to_buy'][item]
                    if isinstance(matrix_candidates, list):
                        for candidate_info in matrix_candidates[:config.MAX_CART_CANDIDATES]:
                            if isinstance(candidate_info, (list, tuple)) and len(candidate_info) >= 1:
                                candidate_id = candidate_info[0]
                                candidates["carts"].add(candidate_id)
                                candidates["orders"].add(candidate_id)
                            elif isinstance(candidate_info, (int, np.integer)):
                                candidate_id = int(candidate_info)
                                candidates["carts"].add(candidate_id)
                                candidates["orders"].add(candidate_id)
                    elif isinstance(matrix_candidates, dict):
                        for candidate in list(matrix_candidates.keys())[:config.MAX_CART_CANDIDATES]:
                            candidates["carts"].add(candidate)
                            candidates["orders"].add(candidate)

            # Strategy 2: Buy-to-buy relationships (for users with purchase history)
            if order_items or cart_items:
                purchase_items = list(set(order_items + cart_items))[-5:]  # Recent purchases

                for item in purchase_items:
                    if 'buy_to_buy' in usable_matrices and item in usable_matrices['buy_to_buy']:
                        matrix_candidates = usable_matrices['buy_to_buy'][item]
                        if isinstance(matrix_candidates, list):
                            for candidate_info in matrix_candidates[:config.MAX_ORDER_CANDIDATES]:
                                if isinstance(candidate_info, (list, tuple)) and len(candidate_info) >= 1:
                                    candidate_id = candidate_info[0]
                                    candidates["orders"].add(candidate_id)
                                    candidates["carts"].add(candidate_id)
                                elif isinstance(candidate_info, (int, np.integer)):
                                    candidate_id = int(candidate_info)
                                    candidates["orders"].add(candidate_id)
                                    candidates["carts"].add(candidate_id)
                        elif isinstance(matrix_candidates, dict):
                            for candidate in list(matrix_candidates.keys())[:config.MAX_ORDER_CANDIDATES]:
                                candidates["orders"].add(candidate)
                                candidates["carts"].add(candidate)

            # Strategy 3: Popular item fallback (ensure minimum candidates)
            popular_items = sorted(item_popularity.items(), key=lambda x: x[1], reverse=True)

            for event_type in ["clicks", "carts", "orders"]:
                if len(candidates[event_type]) < config.MIN_CANDIDATES_PER_SESSION:
                    needed = config.MIN_CANDIDATES_PER_SESSION - len(candidates[event_type])
                    for item_id, _ in popular_items[:needed * 2]:  # Get extra to filter
                        if item_id not in candidates[event_type]:
                            candidates[event_type].add(item_id)
                            if len(candidates[event_type]) >= config.MIN_CANDIDATES_PER_SESSION:
                                break

            # Strategy 4: Add some actual session items as positive candidates (CRITICAL FOR POSITIVE SAMPLES)
            # This ensures we have positive labels by including items that were actually interacted with
            if click_items:
                # Add recent clicked items as click candidates
                for item in click_items[-5:]:
                    candidates["clicks"].add(item)

            if cart_items:
                # Add cart items as cart candidates
                for item in cart_items[-3:]:
                    candidates["carts"].add(item)

            if order_items:
                # Add order items as order candidates
                for item in order_items[-3:]:
                    candidates["orders"].add(item)

            # Ensure minimum diversity
            for event_type in candidates:
                candidates[event_type] = set(list(candidates[event_type])[:config.VALIDATION_CANDIDATES_PER_TYPE])

        except Exception as e:
            # Fallback: use popular items
            popular_items = sorted(item_popularity.items(), key=lambda x: x[1], reverse=True)
            for event_type in candidates:
                candidates[event_type] = set([item_id for item_id, _ in popular_items[:config.MIN_CANDIDATES_PER_SESSION]])

        return candidates

    return generate_validation_candidates

# Create candidate generator
generate_validation_candidates = create_enhanced_candidate_generator(consolidated_covisitation_matrices, item_stats)

[2025-08-08 02:21:38] [MEM: 11.7GB/22.9%/OK] Creating enhanced candidate generation system...
[2025-08-08 02:21:38] [MEM: 11.7GB/22.9%/OK]   Matrix 'click_to_click': 1,805,562 items, sample structure: <class 'list'>
[2025-08-08 02:21:38] [MEM: 11.7GB/22.9%/OK]   Matrix 'click_to_buy': 379,726 items, sample structure: <class 'list'>
[2025-08-08 02:21:38] [MEM: 11.7GB/22.9%/OK]   Matrix 'buy_to_buy': 204,530 items, sample structure: <class 'list'>
[2025-08-08 02:21:38] [MEM: 11.7GB/22.9%/OK] Candidate generation ready: 3 matrices, 1000 items


## ENHANCED VALIDATION DATA GENERATION

In [6]:
def generate_enhanced_validation_data(train_features_lazy: pl.LazyFrame,
                                     validation_results: Dict,
                                     generate_candidates_func) -> Tuple[pl.DataFrame, pl.DataFrame, Dict]:
    """
    Generate enhanced validation data with better positive sample generation
    """
    log("Starting enhanced validation data generation...")

    try:
        # Calculate validation cutoff with more data
        train_min_ts = validation_results.get('train_events', 0)
        timespan_days = validation_results.get('timespan_days', 28)

        # Get timestamp statistics
        ts_stats = train_features_lazy.select([
            pl.col("ts").min().alias("min_ts"),
            pl.col("ts").max().alias("max_ts")
        ]).collect()

        min_ts = ts_stats['min_ts'][0]
        max_ts = ts_stats['max_ts'][0]

        # Set validation cutoff (last N days)
        validation_cutoff = max_ts - (config.VALIDATION_DAYS * 24 * 60 * 60 * 1000)

        log(f"  Validation period: last {config.VALIDATION_DAYS} days")
        log(f"  Cutoff timestamp: {validation_cutoff}")
        log(f"  Target sessions: {config.MAX_VALIDATION_SESSIONS:,}")

        # Get validation sessions with better sampling
        log("  Selecting validation sessions...")

        # Get sessions from validation period with activity filtering
        validation_sessions_query = (
            train_features_lazy
            .filter(pl.col("ts") >= validation_cutoff)
            .group_by("session")
            .agg([
                pl.col("ts").count().alias("session_length"),
                pl.col("type").n_unique().alias("unique_types"),
                pl.col("aid").n_unique().alias("unique_items"),
                pl.col("ts").max().alias("last_ts")
            ])
            .filter(
                (pl.col("session_length") >= 3) &  # Minimum session activity
                (pl.col("unique_items") >= 2) &    # Minimum item diversity
                (pl.col("unique_types") >= 1)      # At least one interaction type
            )
            .sort("last_ts", descending=True)  # Prefer recent sessions
            .limit(config.MAX_VALIDATION_SESSIONS)
        )

        validation_sessions = validation_sessions_query.collect()
        selected_sessions = validation_sessions['session'].to_list()

        log(f"  Selected {len(selected_sessions):,} validation sessions")

        if len(selected_sessions) == 0:
            raise ValueError("No validation sessions found with the criteria")

        check_memory_usage("Validation session selection")

        # Process sessions in chunks for memory efficiency
        log("  Generating candidates and labels...")

        chunk_size = config.CHUNK_SIZE
        total_chunks = (len(selected_sessions) + chunk_size - 1) // chunk_size

        all_validation_data = []
        all_ground_truth = []

        for chunk_idx in range(total_chunks):
            start_idx = chunk_idx * chunk_size
            end_idx = min(start_idx + chunk_size, len(selected_sessions))
            chunk_sessions = selected_sessions[start_idx:end_idx]

            if chunk_idx % 10 == 0:
                progress = (chunk_idx / total_chunks) * 100
                log(f"    Processing chunk {chunk_idx + 1}/{total_chunks} ({progress:.1f}%)")

            # Get session data for this chunk
            chunk_data = (
                train_features_lazy
                .filter(pl.col("session").is_in(chunk_sessions))
                .collect()
            )

            # Process each session in the chunk
            for session_id in chunk_sessions:
                session_data = chunk_data.filter(pl.col("session") == session_id)

                if len(session_data) == 0:
                    continue

                # Split session into train/test parts
                session_events = session_data.sort("ts")
                total_events = len(session_events)

                if total_events < 3:
                    continue

                # Use 70% for history, 30% for prediction
                split_idx = max(1, int(total_events * 0.7))
                train_part = session_events[:split_idx]
                test_part = session_events[split_idx:]

                # Generate candidates based on train part
                candidates = generate_candidates_func(train_part, session_id)

                # Create ground truth from test part
                test_interactions = {}
                for row in test_part.iter_rows(named=True):
                    event_type = row['type']
                    aid = row['aid']
                    if event_type not in test_interactions:
                        test_interactions[event_type] = set()
                    test_interactions[event_type].add(aid)

                # Generate validation samples
                for event_type in ["clicks", "carts", "orders"]:
                    if event_type in candidates and len(candidates[event_type]) > 0:
                        # Create ground truth entry
                        true_items = test_interactions.get(event_type, set())
                        if len(true_items) > 0:
                            all_ground_truth.append({
                                "session": session_id,
                                "type": event_type,
                                "ground_truth": list(true_items)
                            })

                        # Create validation samples
                        for candidate_aid in candidates[event_type]:
                            label = 1 if candidate_aid in true_items else 0

                            all_validation_data.append({
                                "session": session_id,
                                "aid": candidate_aid,
                                "type": event_type,
                                "label": label
                            })

            # Memory cleanup
            if chunk_idx % 5 == 0:
                check_memory_usage("Chunk processing")

        # Create final DataFrames
        log("  Creating final DataFrames...")

        if len(all_validation_data) == 0:
            raise ValueError("No validation data generated")

        val_data = pl.DataFrame(all_validation_data)
        val_ground_truth = pl.DataFrame(all_ground_truth) if all_ground_truth else pl.DataFrame()

        # Calculate statistics
        total_samples = len(val_data)
        positive_samples = val_data.filter(pl.col("label") == 1).height
        positive_rate = (positive_samples / total_samples) * 100 if total_samples > 0 else 0
        unique_sessions = val_data['session'].n_unique()
        unique_items = val_data['aid'].n_unique()

        log(f"  Validation data generated:")
        log(f"    Total samples: {total_samples:,}")
        log(f"    Positive samples: {positive_samples:,} ({positive_rate:.2f}%)")
        log(f"    Unique sessions: {unique_sessions:,}")
        log(f"    Unique items: {unique_items:,}")
        log(f"    Ground truth entries: {len(val_ground_truth):,}")

        # Validate data quality
        if positive_rate < config.MIN_POSITIVE_RATE * 100:
            log(f"  WARNING: Low positive rate {positive_rate:.2f}%, target was {config.MIN_POSITIVE_RATE * 100:.2f}%")

        if unique_items < config.MIN_ITEM_DIVERSITY:
            log(f"  WARNING: Low item diversity {unique_items}, target was {config.MIN_ITEM_DIVERSITY}")

        # Create split information
        split_info = {
            "creation_timestamp": datetime.now().isoformat(),
            "validation_days": config.VALIDATION_DAYS,
            "val_cutoff_timestamp": validation_cutoff,
            "total_timespan_days": timespan_days,
            "train": {
                "sessions": validation_results.get('train_sessions', 0),
                "events": validation_results.get('train_events', 0),
                "cutoff_ts": validation_cutoff
            },
            "val": {
                "sessions": unique_sessions,
                "samples": total_samples,
                "positive_samples": positive_samples,
                "positive_rate": positive_rate,
                "unique_items": unique_items
            }
        }

        log("Enhanced validation data generation completed!")
        return val_data, val_ground_truth, split_info

    except Exception as e:
        log(f"Error generating validation data: {e}")
        raise e

# Generate validation data
val_data, val_ground_truth, split_info = generate_enhanced_validation_data(
    train_features_lazy, validation_results, generate_validation_candidates
)

check_memory_usage("Validation data generation", force_cleanup=True)

[2025-08-08 02:21:38] [MEM: 11.7GB/22.9%/OK] Starting enhanced validation data generation...
[2025-08-08 02:21:41] [MEM: 10.7GB/21.0%/OK]   Validation period: last 3 days
[2025-08-08 02:21:41] [MEM: 10.7GB/21.0%/OK]   Cutoff timestamp: 1661464799984
[2025-08-08 02:21:41] [MEM: 10.7GB/21.0%/OK]   Target sessions: 25,000
[2025-08-08 02:21:41] [MEM: 10.7GB/21.0%/OK]   Selecting validation sessions...
[2025-08-08 02:21:55] [MEM: 10.5GB/20.5%/OK]   Selected 25,000 validation sessions
[2025-08-08 02:21:55] [MEM: 10.5GB/20.5%/OK] Emergency memory cleanup during Validation session selection: 10.5GB (20.5%)
[2025-08-08 02:22:03] [MEM: 10.4GB/20.4%/OK] Memory after cleanup: 10.4GB
[2025-08-08 02:22:03] [MEM: 10.4GB/20.4%/OK]   Generating candidates and labels...
[2025-08-08 02:22:03] [MEM: 10.4GB/20.4%/OK]     Processing chunk 1/13 (0.0%)
[2025-08-08 02:22:57] [MEM: 9.9GB/19.5%/OK] Emergency memory cleanup during Chunk processing: 9.9GB (19.5%)
[2025-08-08 02:23:06] [MEM: 9.9GB/19.5%/OK] Memory 

11.24477767944336

## ENHANCED DATA ANALYSIS

In [7]:
def analyze_enhanced_training_data(val_data: pl.DataFrame,
                                 split_info: Dict,
                                 validation_results: Dict) -> Dict:
    """
    Enhanced analysis of training data quality
    """
    log("Analyzing enhanced training data...")

    try:
        # Basic statistics
        total_samples = len(val_data)
        positive_samples = val_data.filter(pl.col("label") == 1).height
        positive_rate = (positive_samples / total_samples) * 100 if total_samples > 0 else 0
        unique_sessions = val_data['session'].n_unique()
        unique_items = val_data['aid'].n_unique()

        log(f"  Enhanced Data Quality Assessment:")
        log(f"    Total samples: {total_samples:,}")
        log(f"    Positive samples: {positive_samples:,} ({positive_rate:.2f}%)")
        log(f"    Unique sessions: {unique_sessions:,}")
        log(f"    Unique items: {unique_items:,}")

        # Per-type analysis
        type_analysis = {}
        for event_type in ["clicks", "carts", "orders"]:
            type_data = val_data.filter(pl.col("type") == event_type)
            if len(type_data) > 0:
                type_positive = type_data.filter(pl.col("label") == 1).height
                type_total = len(type_data)
                type_rate = (type_positive / type_total) * 100 if type_total > 0 else 0
                type_sessions = type_data['session'].n_unique()

                type_analysis[event_type] = {
                    "total_samples": type_total,
                    "positive_samples": type_positive,
                    "positive_rate": type_rate,
                    "sessions": type_sessions
                }

                log(f"    {event_type}: {type_total:,} samples, {type_positive:,} positive ({type_rate:.2f}%), {type_sessions:,} sessions")

        # Session-level analysis
        session_stats = (
            val_data
            .group_by("session")
            .agg([
                pl.col("label").sum().alias("positive_count"),
                pl.col("label").count().alias("total_count")
            ])
        )

        avg_positive_per_session = session_stats['positive_count'].mean()
        avg_total_per_session = session_stats['total_count'].mean()

        log(f"  Session-Level Analysis:")
        log(f"    Avg positive per session: {avg_positive_per_session:.1f}")
        log(f"    Avg total per session: {avg_total_per_session:.1f}")

        # Item popularity analysis
        item_stats = (
            val_data
            .group_by("aid")
            .agg([
                pl.col("label").sum().alias("positive_count"),
                pl.col("label").count().alias("total_count")
            ])
            .sort("total_count", descending=True)
        )

        top_items = item_stats.head(5)
        log(f"  Top 5 most frequent items:")
        for i, row in enumerate(top_items.iter_rows(named=True)):
            log(f"    {i+1}. Item {row['aid']}: {row['total_count']} appearances, {row['positive_count']} positive")

        # Quality checks
        quality_checks = {
            "sufficient_samples": total_samples >= 100000,
            "adequate_positive_rate": positive_rate >= 0.1,
            "item_diversity": unique_items >= 50,
            "session_coverage": unique_sessions >= 1000,
            "balanced_types": len(type_analysis) == 3
        }

        overall_quality = "EXCELLENT" if all(quality_checks.values()) else "GOOD" if sum(quality_checks.values()) >= 3 else "NEEDS_IMPROVEMENT"

        log(f"  Quality Assessment:")
        for check, passed in quality_checks.items():
            status = "✓" if passed else "✗"
            log(f"    {status} {check}: {'PASS' if passed else 'FAIL'}")
        log(f"    Overall quality: {overall_quality}")

        # Create comprehensive statistics
        statistics = {
            "analysis_timestamp": datetime.now().isoformat(),
            "data_quality": overall_quality,
            "validation_data": {
                "total_samples": total_samples,
                "positive_samples": positive_samples,
                "positive_rate": positive_rate,
                "unique_sessions": unique_sessions,
                "unique_items": unique_items
            },
            "event_type_analysis": type_analysis,
            "session_statistics": {
                "avg_positive_per_session": avg_positive_per_session,
                "avg_total_per_session": avg_total_per_session
            },
            "quality_checks": quality_checks,
            "split_info": split_info,
            "input_validation": validation_results
        }

        log("Enhanced data analysis completed!")
        return statistics

    except Exception as e:
        log(f"Error in data analysis: {e}")
        return {"error": str(e), "timestamp": datetime.now().isoformat()}

# Analyze the generated data
training_data_statistics = analyze_enhanced_training_data(val_data, split_info, validation_results)

[2025-08-08 02:34:30] [MEM: 11.2GB/22.0%/OK] Analyzing enhanced training data...
[2025-08-08 02:34:31] [MEM: 11.2GB/22.1%/OK]   Enhanced Data Quality Assessment:
[2025-08-08 02:34:31] [MEM: 11.2GB/22.1%/OK]     Total samples: 6,617,259
[2025-08-08 02:34:31] [MEM: 11.2GB/22.1%/OK]     Positive samples: 55,688 (0.84%)
[2025-08-08 02:34:31] [MEM: 11.2GB/22.1%/OK]     Unique sessions: 25,000
[2025-08-08 02:34:31] [MEM: 11.2GB/22.1%/OK]     Unique items: 543,691
[2025-08-08 02:34:31] [MEM: 11.3GB/22.2%/OK]     clicks: 2,313,996 samples, 49,187 positive (2.13%), 25,000 sessions
[2025-08-08 02:34:31] [MEM: 11.3GB/22.2%/OK]     carts: 2,153,154 samples, 4,646 positive (0.22%), 25,000 sessions
[2025-08-08 02:34:31] [MEM: 11.3GB/22.2%/OK]     orders: 2,150,109 samples, 1,855 positive (0.09%), 25,000 sessions
[2025-08-08 02:34:31] [MEM: 10.2GB/20.0%/OK]   Session-Level Analysis:
[2025-08-08 02:34:31] [MEM: 10.2GB/20.0%/OK]     Avg positive per session: 2.2
[2025-08-08 02:34:31] [MEM: 10.2GB/20.0%

## ENHANCED OUTPUT SAVING

In [8]:
def save_enhanced_outputs(val_data: pl.DataFrame,
                         val_ground_truth: pl.DataFrame,
                         split_info: Dict,
                         statistics: Dict) -> Dict:
    """
    Save all outputs with enhanced error handling
    """
    log("Saving enhanced outputs...")

    try:
        # Ensure output directory exists
        os.makedirs(config.OUTPUT_PATH, exist_ok=True)

        output_paths = {}

        # 1. Save validation data (main output)
        val_data_path = f"{config.OUTPUT_PATH}/val_data.parquet"
        val_data.write_parquet(val_data_path, compression="snappy")
        file_size = os.path.getsize(val_data_path) / (1024*1024)
        log(f"  ✓ val_data.parquet saved ({file_size:.1f} MB)")
        output_paths["val_data_path"] = val_data_path

        # 2. Save train/validation splits
        splits_path = f"{config.OUTPUT_PATH}/train_val_splits.pkl"
        with open(splits_path, "wb") as f:
            pickle.dump(split_info, f)
        log(f"  ✓ train_val_splits.pkl saved")
        output_paths["splits_path"] = splits_path

        # 3. Save validation ground truth
        if len(val_ground_truth) > 0:
            gt_path = f"{config.OUTPUT_PATH}/validation_ground_truth.parquet"
            val_ground_truth.write_parquet(gt_path, compression="snappy")
            file_size = os.path.getsize(gt_path) / (1024*1024)
            log(f"  ✓ validation_ground_truth.parquet saved ({file_size:.1f} MB)")
            output_paths["ground_truth_path"] = gt_path

        # 4. Save training data statistics (use pickle instead of JSON to avoid serialization issues)
        stats_path = f"{config.OUTPUT_PATH}/training_data_statistics.pkl"
        with open(stats_path, "wb") as f:
            pickle.dump(statistics, f)
        log(f"  ✓ training_data_statistics.pkl saved")
        output_paths["statistics_path"] = stats_path

        # 5. Save summary report
        summary = {
            "notebook": "Part 2B2: Enhanced Training Data Preparation",
            "completion_timestamp": datetime.now().isoformat(),
            "version": "FIXED - Enhanced positive sample generation",
            "data_quality": statistics.get("data_quality", "UNKNOWN"),
            "key_improvements": [
                f"Increased validation sessions to {config.MAX_VALIDATION_SESSIONS:,}",
                f"Increased candidates per type to {config.VALIDATION_CANDIDATES_PER_TYPE}",
                "Enhanced candidate generation strategies",
                "Fixed variable scoping and error handling",
                "Improved positive sample generation"
            ],
            "outputs_generated": {
                "val_data.parquet": f"{len(val_data):,} samples with labels",
                "train_val_splits.pkl": "Enhanced train/validation split info",
                "validation_ground_truth.parquet": f"{len(val_ground_truth):,} ground truth entries",
                "training_data_statistics.pkl": "Comprehensive data analysis"
            },
            "quality_metrics": statistics.get("validation_data", {}),
            "next_step": "Run Part 2B3: Feature Engineering"
        }

        summary_path = f"{config.OUTPUT_PATH}/part_2b2_enhanced_summary.pkl"
        with open(summary_path, "wb") as f:
            pickle.dump(summary, f)
        log(f"  ✓ part_2b2_enhanced_summary.pkl saved")
        output_paths["summary_path"] = summary_path

        log("All enhanced outputs saved successfully!")
        return output_paths

    except Exception as e:
        log(f"Error saving outputs: {e}")
        raise e

# Save all outputs
output_paths = save_enhanced_outputs(val_data, val_ground_truth, split_info, training_data_statistics)

[2025-08-08 02:34:31] [MEM: 10.2GB/20.0%/OK] Saving enhanced outputs...
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   ✓ val_data.parquet saved (24.4 MB)
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   ✓ train_val_splits.pkl saved
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   ✓ validation_ground_truth.parquet saved (1.9 MB)
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   ✓ training_data_statistics.pkl saved
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   ✓ part_2b2_enhanced_summary.pkl saved
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK] All enhanced outputs saved successfully!


## ENHANCED FINAL SUMMARY

In [9]:
log("\n" + "="*80)
log("PART 2B2 COMPLETED: ENHANCED TRAINING DATA PREPARATION")
log("="*80)

# Display key results
val_data_info = training_data_statistics.get('validation_data', {})
log(f"\nKEY RESULTS (ENHANCED):")
log(f"  Validation samples: {val_data_info.get('total_samples', 0):,}")
log(f"  Positive samples: {val_data_info.get('positive_samples', 0):,} ({val_data_info.get('positive_rate', 0):.2f}%)")
log(f"  Unique sessions: {val_data_info.get('unique_sessions', 0):,}")
log(f"  Unique items: {val_data_info.get('unique_items', 0):,}")
log(f"  Data quality: {training_data_statistics.get('data_quality', 'UNKNOWN')}")

# Per-type breakdown
log(f"\nPER-TYPE ANALYSIS:")
type_analysis = training_data_statistics.get('event_type_analysis', {})
for event_type in ["clicks", "carts", "orders"]:
    if event_type in type_analysis:
        stats = type_analysis[event_type]
        log(f"  {event_type}: {stats['total_samples']:,} samples, {stats['positive_samples']:,} positive ({stats['positive_rate']:.2f}%)")

# Output files
log(f"\nOUTPUT FILES GENERATED:")
log(f"  val_data.parquet ({os.path.getsize(output_paths['val_data_path'])/(1024*1024):.1f} MB)")
log(f"  train_val_splits.pkl")
if "ground_truth_path" in output_paths:
    log(f"  validation_ground_truth.parquet ({os.path.getsize(output_paths['ground_truth_path'])/(1024*1024):.1f} MB)")
log(f"  training_data_statistics.pkl")
log(f"  part_2b2_enhanced_summary.pkl")
log(f"  All files saved to: {config.OUTPUT_PATH}")

# Quality assessment
quality_checks = training_data_statistics.get('quality_checks', {})
log(f"\nQUALITY ASSESSMENT:")
for check, passed in quality_checks.items():
    status = "yes" if passed else "no"
    log(f"  {check.replace('_', ' ').title()}: {status}")

overall_quality = training_data_statistics.get('data_quality', 'UNKNOWN')
log(f"\nOverall Quality: {overall_quality}")

# Performance summary
final_memory = get_memory_usage()
log(f"\nPERFORMANCE SUMMARY:")
log(f"  Final memory usage: {final_memory:.1f} GB")
log(f"  Max validation sessions: {config.MAX_VALIDATION_SESSIONS:,}")
log(f"  Candidates per type: {config.VALIDATION_CANDIDATES_PER_TYPE}")
log(f"  Enhanced features: Enabled")

# Final cleanup
log(f"\nPerforming final cleanup...")
try:
    del train_features_lazy, consolidated_covisitation_matrices
    force_garbage_collection()
    final_memory = get_memory_usage()
    log(f"Final memory usage after cleanup: {final_memory:.1f} GB")
except Exception as e:
    log(f"Cleanup warning: {e}")

log(f"\nPart 2B2 Enhanced finished successfully!")
log("="*80)

[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK] 
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK] PART 2B2 COMPLETED: ENHANCED TRAINING DATA PREPARATION
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK] 
KEY RESULTS (ENHANCED):
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   Validation samples: 6,617,259
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   Positive samples: 55,688 (0.84%)
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   Unique sessions: 25,000
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   Unique items: 543,691
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   Data quality: EXCELLENT
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK] 
PER-TYPE ANALYSIS:
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   clicks: 2,313,996 samples, 49,187 positive (2.13%)
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   carts: 2,153,154 samples, 4,646 positive (0.22%)
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK]   orders: 2,150,109 samples, 1,855 positive (0.09%)
[2025-08-08 02:34:32] [MEM: 10.2GB/20.0%/OK] 
OUTPUT FILES G