# Moderation Model Training

This notebook demonstrates out-of-core learning for large datasets using PyIceberg and Scikit-Learn.

**Scenarios:**
1.  **Normal Load:** Fits in memory.
2.  **Memory Constraint:** A real out-of-memory (OOM) failure when loading the large dataset in one go.
3.  **Streaming:** Incremental learning solution.

In [None]:
import os
import logging
import pandas as pd
import numpy as np
from pyiceberg.catalog import load_catalog
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import joblib
import pyarrow as pa

# Root logger at ERROR keeps library log chatter out of the notebook output;
# the scenarios report progress with print() instead.
logging.basicConfig(level=logging.ERROR)

# --- Schema definition shared by all scenarios below ---

# Numeric inputs; the training code fills their nulls with 0.
numeric_features = [
    'time_since_upload_seconds',
    'hour_of_day',
    'day_of_week',
    'views_5min',
    'views_1hr',
    'comments_5min',
    'comments_1hr',
    'view_velocity_per_min',
    'comment_to_view_ratio',
    'recent_engagement_score',
    'caption_length',
    'user_image_count',
    'user_age_days',
]

# Categorical inputs; the streaming scenario casts them to str and one-hot encodes them.
categorical_features = [
    'is_weekend',
    'has_caption',
    'category',
]

# Binary target column.
label_column = 'label_needs_moderation_24h'

## 1. Normal Load (Success)
Loads the **Regular** dataset (fits in RAM).

In [None]:
def scenario_1_normal_load():
    """Load the regular-size table fully into pandas and fit a LogisticRegression.

    This is the in-memory baseline: only the numeric features are used (nulls
    filled with 0); scaling and categorical encoding are intentionally omitted.
    Rows whose label is null are dropped because scikit-learn rejects NaN
    targets. Any exception is caught and printed rather than raised.
    """
    print("--- Scenario 1: In-Memory Training (Small Data) ---")
    try:
        catalog = load_catalog("gourmetgram")
        # Load normal sized table
        table = catalog.load_table("moderation.training_data")

        # Load all data at once; acceptable only because this table fits in RAM.
        df = table.scan().to_pandas()

        if df.empty:
            print("Dataset empty.")
            return

        # deep=True counts the real bytes held by object (string) columns such
        # as 'category'; the shallow default only counts one pointer per value.
        print(f"Loaded {len(df)} rows. RAM: {df.memory_usage(deep=True).sum() / 1e6:.2f} MB")

        # A null target would make LogisticRegression.fit raise.
        df = df.dropna(subset=[label_column])
        if df.empty:
            print("No labelled rows.")
            return

        # preprocessing omitted for brevity
        X = df[numeric_features].fillna(0)
        y = df[label_column]

        model = LogisticRegression()
        model.fit(X, y)
        print("Training complete.")

    except Exception as e:
        print(f"Failed: {e}")


scenario_1_normal_load()

## 2. Memory Constraint (Failure)
Attempts to load the **Large** dataset (6M+ rows) into memory all at once. The kernel will crash (out of memory) if less than 2 GB of RAM is available.

In [None]:
def scenario_2_memory_crash_simulation():
    """Try to materialise the large table in pandas to demonstrate an OOM.

    On a memory-limited container the kernel process is expected to be killed
    inside ``to_pandas()``, so nothing after that call runs. If the load does
    succeed, the row count and measured memory footprint are printed.
    Python-level exceptions (``MemoryError`` is an ``Exception`` subclass) are
    caught and printed rather than raised.
    """
    print("--- Scenario 2: Massive Load (Real OOM) ---")
    try:
        catalog = load_catalog("gourmetgram")
        # Load massive table
        table = catalog.load_table("moderation.large_training_data")

        print("Attempting to load HUGE dataset into memory...")
        # THIS WILL CRASH THE KERNEL if memory is limited
        df = table.scan().to_pandas()

        # deep=True includes the bytes held by object (string) columns; the
        # shallow default only counts one pointer per value and underreports.
        print(f"Loaded {len(df)} rows. RAM: {df.memory_usage(deep=True).sum() / 1e6:.2f} MB")

        # Note: We won't reach here if OOM kills the process.
        # Reaching this line means the container had enough memory; a *lower*
        # limit is what reproduces the crash.
        print("Warning: Dataset fit in memory (Lower container limits to see crash)")

    except Exception as e:
        print(f"\n CRITICAL ERROR: {e}")


scenario_2_memory_crash_simulation()

### NOTE
If the OOM above kills the kernel, Jupyter will report that the kernel is restarting. After the restart, re-run the first code cell (imports and schema definition) before running the next cell.

## 3. Streaming Solution (Success)
Uses `SGDClassifier` and a PyArrow `RecordBatchReader` (returned by PyIceberg's `to_arrow_batch_reader()`) to train incrementally on the **Large** dataset.

In [None]:
def _iter_training_batches(table, snapshot_id):
    """Yield non-empty pandas batches holding only the feature and label columns.

    Rows with a null label are dropped because ``partial_fit`` rejects NaN targets.
    Batches are produced one at a time from a PyArrow RecordBatchReader, so the
    full table is never materialised.
    """
    fields = tuple(numeric_features + categorical_features + [label_column])
    reader = table.scan(selected_fields=fields, snapshot_id=snapshot_id).to_arrow_batch_reader()
    for batch in reader:
        df_batch = batch.to_pandas().dropna(subset=[label_column])
        if not df_batch.empty:
            yield df_batch


def _split_features(df_batch):
    """Return ``(numeric_frame, categorical_frame)`` for one batch with nulls filled.

    Numeric nulls become 0; categorical nulls become 'Unknown' and all
    categorical values are cast to str so they share one encoding.
    """
    X_num = df_batch[numeric_features].fillna(0)
    X_cat = df_batch[categorical_features].fillna('Unknown').astype(str)
    return X_num, X_cat


def _fit_preprocessors(table, snapshot_id):
    """First streaming pass: fit the scaler and the one-hot vocabulary on every batch.

    Fitting on the first batch alone biases the scaler toward that batch. It
    also makes the encoder silently map categories missing from that batch to
    all-zero vectors.

    Returns ``(scaler, ohe)``, or ``(None, None)`` when the scan yields no rows.
    """
    scaler = StandardScaler()
    vocab = {col: set() for col in categorical_features}
    sample_cat = None
    for df_batch in _iter_training_batches(table, snapshot_id):
        X_num, X_cat = _split_features(df_batch)
        scaler.partial_fit(X_num)  # running mean/variance across all batches
        for col in categorical_features:
            vocab[col].update(X_cat[col].unique())
        sample_cat = X_cat.iloc[:1]
    if sample_cat is None:
        return None, None
    # Fixing the categories up front keeps the encoded width constant across
    # batches; SGDClassifier.partial_fit requires the same feature count each call.
    ohe = OneHotEncoder(
        categories=[sorted(vocab[col]) for col in categorical_features],
        handle_unknown='ignore',
        sparse_output=False,
    )
    # Categories are given explicitly, so fitting on one row only records the column layout.
    ohe.fit(sample_cat)
    return scaler, ohe


def scenario_3_streaming_solution():
    """Train the moderation classifier on the large table without loading it whole.

    Makes two streaming passes over one pinned table snapshot:

    1. Fit the StandardScaler (``partial_fit``) and collect the full category
       vocabulary for the OneHotEncoder.
    2. Train an SGDClassifier batch by batch with ``partial_fit``.

    The model, scaler and encoder are saved under ``models/``. Any exception is
    caught and printed rather than raised.
    """
    print("--- Scenario 3: Streaming Training (Large Data) ---")
    try:
        catalog = load_catalog("gourmetgram")
        # Load massive table
        table = catalog.load_table("moderation.large_training_data")

        # Pin one snapshot so both passes read exactly the same rows.
        snapshot = table.current_snapshot()
        snapshot_id = snapshot.snapshot_id if snapshot is not None else None

        # Pass 1: fit preprocessing over the whole stream, not just the first batch.
        scaler, ohe = _fit_preprocessors(table, snapshot_id)
        if scaler is None:
            print("Dataset empty.")
            return

        # Initialize incremental model
        model = SGDClassifier(loss='log_loss', random_state=42)
        classes = np.array([0, 1])

        # Pass 2: stream batches - never loads the full dataset into memory
        batch_count = 0
        total_rows = 0
        for df_batch in _iter_training_batches(table, snapshot_id):
            X_num, X_cat = _split_features(df_batch)
            X_transformed = np.hstack((
                scaler.transform(X_num),
                ohe.transform(X_cat),
            ))

            # Partial Fit
            model.partial_fit(X_transformed, df_batch[label_column], classes=classes)

            batch_count += 1
            total_rows += len(df_batch)
            if batch_count % 10 == 0:
                print(f"Batch {batch_count}: {len(df_batch)} rows. RAM stable.")

        print(f"\n SUCCESS: {total_rows} rows trained on LARGE dataset.")

        # Save Artifacts
        os.makedirs("models", exist_ok=True)
        joblib.dump(model, "models/moderation_model_demo.joblib")
        joblib.dump(scaler, "models/scaler_demo.joblib")
        joblib.dump(ohe, "models/encoder_demo.joblib")
        print("Artifacts saved.")

    except Exception as e:
        print(f"Failed: {e}")


scenario_3_streaming_solution()