In [None]:
#!/usr/bin/env python3
# ===============================================
# PREPROCESS 9 - ULTRA GOD MODE SOTA 2026
# Calcule TOUTES les features manquantes depuis z√©ro
# ===============================================
#
# Ce script reproduit les Cellules A-E de l'ancien pipeline
# en 100% Polars, puis applique le pipeline ML standard.
#
# Input: data_clean/ml_ready/matches_ml_ready.parquet
# Output: data_clean/ml_final/
# ===============================================

from pathlib import Path
from datetime import datetime
import time
import json
import gc
import numpy as np
import polars as pl

# ===============================================
# CONFIGURATION
# ===============================================
ROOT = Path.cwd()
DATA_CLEAN = ROOT / "data_clean"

def find_latest_sota_file(ml_ready_dir: Path):
    """D√©tecte automatiquement la derni√®re version SOTA (v5, v6, v7, v8...)."""
    import re
    
    sota_files = list(ml_ready_dir.glob("matches_ml_ready_SOTA_v*.parquet"))
    
    if not sota_files:
        # Fallback to base file
        base = ml_ready_dir / "matches_ml_ready.parquet"
        if base.exists():
            return "BASE", base
        raise FileNotFoundError("No ML-ready file found!")
    
    # Extract version numbers and sort descending
    def get_version(p):
        match = re.search(r'SOTA_v(\d+)', p.stem)
        return int(match.group(1)) if match else 0
    
    sota_files.sort(key=get_version, reverse=True)
    latest = sota_files[0]
    version = f"SOTA_v{get_version(latest)}"
    
    return version, latest

# Usage:
ML_READY_VERSION, ML_READY_FILE = find_latest_sota_file(DATA_CLEAN / "ml_ready")
print(f"üìÅ Auto-detected: {ML_READY_VERSION} ‚Üí {ML_READY_FILE.name}")

# Players master file
PLAYERS_MASTER_FILE = ROOT / "data_atp_detailed" / "atp_master_players.csv"
PLAYERS_MASTER_PARQUET = ROOT / "data_atp_detailed" / "atp_master_players.parquet"

# Output
OUTPUT_DIR = DATA_CLEAN / "ml_final"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Parameters
GENDER = "atp"
RANDOM_SEED = 42
PI2 = 2 * np.pi

# Split temporel
TRAIN_END_YEAR = 2019
VAL_END_YEAR = 2022

# Feature selection
MAX_FEATURES = 200
MIN_VARIANCE_THRESHOLD = 0.001
MAX_NULL_RATE = 0.95

# SOTA features to ALWAYS include (protected from elimination)
PROTECTED_SOTA_FEATURES = [
    # === EXISTANTES ===
    "tourney_speed_index",
    "pref_ssi_A", "pref_ssi_B", "diff_pref_ssi", "diff_ssi_mismatch",
    "r20_win_rate_vs_top10_A", "r20_win_rate_vs_top10_B",
    "r20_win_rate_vs_top50_A", "r20_win_rate_vs_top50_B",
    "r20_upset_rate_A", "r20_upset_rate_B",
    "r20_choke_rate_A", "r20_choke_rate_B",
    "r20_comeback_rate_A", "r20_comeback_rate_B",
    
    # === NOUVELLES PP_12-15 ===
    "bt_rating_A", "bt_rating_B", "bt_prob_winner",
    "bt_surface_rating_A", "bt_surface_rating_B",
    "bt_recent_rating_A", "bt_recent_rating_B",
    "bt_form_momentum_A", "bt_form_momentum_B",
    "travel_distance_A", "travel_distance_B",
    "timezone_shift_A", "timezone_shift_B",
    "home_advantage_A", "home_advantage_B",
    "is_high_altitude", "tourney_altitude_m",
    "emb_cosine_sim", "emb_l2_distance",
    "seq_cosine_sim", "seq_l2_distance",
    "odds_implied_prob_A", "odds_implied_prob_B",
]

# Colonnes leakage
LEAKAGE_COLS = [
    "score_ta", "duration_minutes_ta",
    "w_ace", "w_df", "w_svpt", "w_1stIn", "w_1stWon", "w_2ndWon",
    "l_ace", "l_df", "l_svpt", "l_1stIn", "l_1stWon", "l_2ndWon",
    "w_bpSaved", "w_bpFaced", "l_bpSaved", "l_bpFaced",
    "winner_name", "loser_name",
]

ID_COLS = [
    "custom_match_id", "match_id_ta_dedup", "match_id_ta_source",
    "winner_id", "loser_id", "tourney_name_ta", "tourney_slug_ta",
]


# ===============================================
# SECTION 1: LOAD & INITIAL CLEAN
# ===============================================

def load_dataset() -> pl.DataFrame:
    """Charge le dataset ML ready."""
    
    print("\n" + "=" * 70)
    print("   SECTION 1: LOAD DATASET")
    print("=" * 70)
    
    print(f"\n  Loading {ML_READY_FILE}...")
    df = pl.read_parquet(ML_READY_FILE)
    print(f"  Shape: {df.shape}")
    
    # Ensure year column exists
    if "year" not in df.columns:
        if "tourney_date_ta" in df.columns:
            if df["tourney_date_ta"].dtype in [pl.Int64, pl.Int32]:
                df = df.with_columns([
                    (pl.col("tourney_date_ta") // 10000).cast(pl.Int32).alias("year")
                ])
            else:
                df = df.with_columns([
                    pl.col("tourney_date_ta").dt.year().cast(pl.Int32).alias("year")
                ])
    
    # Filter valid surfaces
    if "tourney_surface_ta" in df.columns:
        n_before = len(df)
        df = df.filter(pl.col("tourney_surface_ta").is_not_null())
        print(f"  After surface filter: {len(df)} (removed {n_before - len(df)})")
    
    return df


# ===============================================
# SECTION 2A: CELLULE A - Form/Fatigue Features
# ===============================================

def add_cellule_a_features(df: pl.DataFrame) -> pl.DataFrame:
    """
    Cellule A: Rolling features de forme et fatigue.
    Calcule: is_vs_top10/50, rounds_played_tourney, etc.
    """
    
    print("\n" + "=" * 70)
    print("   CELLULE A: FORM/FATIGUE FEATURES")
    print("=" * 70)
    
    n_before = len(df.columns)
    
    # --- is_vs_top10 / is_vs_top50 ---
    # Pour A: adversaire = loser, donc loser_rank_ta
    # Pour B: adversaire = winner, donc winner_rank_ta
    print("\n[A.1] Adding is_vs_top10/50...")
    
    if "loser_rank_ta" in df.columns:
        df = df.with_columns([
            (pl.col("loser_rank_ta").fill_null(9999) <= 10).cast(pl.Int8).alias("is_vs_top10_A"),
            (pl.col("loser_rank_ta").fill_null(9999) <= 50).cast(pl.Int8).alias("is_vs_top50_A"),
        ])
        print("  ‚úÖ is_vs_top10_A, is_vs_top50_A")
    
    if "winner_rank_ta" in df.columns:
        df = df.with_columns([
            (pl.col("winner_rank_ta").fill_null(9999) <= 10).cast(pl.Int8).alias("is_vs_top10_B"),
            (pl.col("winner_rank_ta").fill_null(9999) <= 50).cast(pl.Int8).alias("is_vs_top50_B"),
        ])
        print("  ‚úÖ is_vs_top10_B, is_vs_top50_B")
    
    # --- rounds_played_tourney ---
    print("\n[A.2] Adding rounds_played_tourney...")
    
    # Round order mapping
    round_order = {
        "Q": 0, "Q1": 0, "Q2": 1, "Q3": 2,
        "RR": 16, "R128": 20, "R64": 30, "R56": 35, "R48": 37,
        "R32": 40, "R28": 45, "R24": 47, "R16": 50, "QF": 60, "SF": 70, "F": 80,
        "3RD": 76, "BRONZE": 75
    }
    
    if "round_ta" in df.columns:
        df = df.with_columns([
            pl.col("round_ta").str.to_uppercase().replace(round_order, default=0).cast(pl.Int16).alias("round_ord")
        ])
    
    # Pour calculer rounds_played_tourney, on a besoin de trier par (joueur, tournoi, date, round)
    # et compter le cumsum. Mais ici on n'a pas la vue joueur-level, on a match-level.
    # On va approximer avec round_ord directement.
    if "round_ord" in df.columns:
        # Plus le round_ord est √©lev√©, plus le joueur a jou√© de tours
        # Approximation: round_ord / 10 donne ~nombre de tours jou√©s
        df = df.with_columns([
            (pl.col("round_ord") / 10).cast(pl.Int8).alias("rounds_played_approx_A"),
            (pl.col("round_ord") / 10).cast(pl.Int8).alias("rounds_played_approx_B"),
        ])
        print("  ‚úÖ rounds_played_approx_A/B (approximation)")
    
    # --- is_qualifier / is_wildcard / is_lucky_loser ---
    print("\n[A.3] Adding entry type flags...")
    
    # Check if entry columns exist (they might come from other sources)
    for suffix in ["_A", "_B"]:
        # Default to 0 if not available
        for flag in ["is_qualifier", "is_wildcard", "is_lucky_loser"]:
            col_name = f"{flag}{suffix}"
            if col_name not in df.columns:
                df = df.with_columns([pl.lit(0).cast(pl.Int8).alias(col_name)])
    print("  ‚úÖ is_qualifier/wildcard/lucky_loser (defaulted to 0)")
    
    # --- fatigue_qualifs ---
    print("\n[A.4] Adding fatigue_qualifs...")
    
    # Si on a des colonnes qualifs
    for suffix in ["_A", "_B"]:
        q_minutes_col = f"p_q_minutes{suffix}"
        if q_minutes_col in df.columns:
            df = df.with_columns([
                pl.col(q_minutes_col).fill_null(0).cast(pl.Float32).alias(f"fatigue_qualifs{suffix}")
            ])
        else:
            df = df.with_columns([pl.lit(0.0).cast(pl.Float32).alias(f"fatigue_qualifs{suffix}")])
    print("  ‚úÖ fatigue_qualifs_A/B")
    
    # --- r20_win_rate_vs_top10 ---
    # Check if SOTA version already exists (from Preprocess 2.1b)
    print("\n[A.5] Checking r20_win_rate_vs_top10...")
    
    for suffix in ["_A", "_B"]:
        col_name = f"r20_win_rate_vs_top10{suffix}"
        
        if col_name in df.columns:
            coverage = df[col_name].is_not_null().mean()
            print(f"  ‚úÖ {col_name} already exists (SOTA, coverage: {coverage:.1%})")
        else:
            # Try to find a suitable proxy
            win_rate_col = None
            for candidate in [f"win_rate_20{suffix}", f"win_rate_r20{suffix}", f"r20_win_rate{suffix}"]:
                if candidate in df.columns:
                    win_rate_col = candidate
                    break
            
            if win_rate_col:
                df = df.with_columns([
                    pl.col(win_rate_col).fill_null(0.5).cast(pl.Float32).alias(col_name)
                ])
                print(f"  ‚ö†Ô∏è {col_name} (approximated from {win_rate_col})")
            else:
                df = df.with_columns([pl.lit(None).cast(pl.Float32).alias(col_name)])
                print(f"  ‚ö†Ô∏è {col_name} (no proxy found, will be imputed later)")
    
    n_after = len(df.columns)
    print(f"\n  Cellule A: {n_after - n_before} features added")
    
    return df


# ===============================================
# SECTION 2B: CELLULE B - Cyclic & Fatigue Index
# ===============================================

def add_cellule_b_features(df: pl.DataFrame) -> pl.DataFrame:
    """
    Cellule B: Encodage cyclique temporel + Index de fatigue cumul√©e.
    """
    
    print("\n" + "=" * 70)
    print("   CELLULE B: CYCLIC ENCODING & FATIGUE INDEX")
    print("=" * 70)
    
    n_before = len(df.columns)
    
    # --- Cyclic temporal encoding ---
    print("\n[B.1] Adding cyclic temporal features...")
    
    if "tourney_date_ta" in df.columns:
        # Convert to datetime if needed
        if df["tourney_date_ta"].dtype in [pl.Int64, pl.Int32]:
            df = df.with_columns([
                pl.col("tourney_date_ta").cast(pl.Utf8).str.to_datetime("%Y%m%d").alias("_dt_temp")
            ])
        else:
            df = df.with_columns([pl.col("tourney_date_ta").alias("_dt_temp")])
        
        # Extract components
        df = df.with_columns([
            pl.col("_dt_temp").dt.month().alias("_month"),
            pl.col("_dt_temp").dt.week().alias("_week"),
            pl.col("_dt_temp").dt.weekday().alias("_dow"),
        ])
        
        # Cyclic encoding
        df = df.with_columns([
            (PI2 * pl.col("_month") / 12).sin().cast(pl.Float32).alias("month_sin"),
            (PI2 * pl.col("_month") / 12).cos().cast(pl.Float32).alias("month_cos"),
            (PI2 * pl.col("_week") / 52).sin().cast(pl.Float32).alias("week_sin"),
            (PI2 * pl.col("_week") / 52).cos().cast(pl.Float32).alias("week_cos"),
            (PI2 * pl.col("_dow") / 7).sin().cast(pl.Float32).alias("dow_sin"),
            (PI2 * pl.col("_dow") / 7).cos().cast(pl.Float32).alias("dow_cos"),
            ((pl.col("_week").clip(1, 48) - 1) / 47).cast(pl.Float32).alias("season_progress"),
        ])
        
        df = df.drop(["_dt_temp", "_month", "_week", "_dow"])
        print("  ‚úÖ month_sin/cos, week_sin/cos, dow_sin/cos, season_progress")
    else:
        print("  ‚ö†Ô∏è tourney_date_ta not found, skipping cyclic encoding")
    
    # --- Cumulative fatigue index ---
    print("\n[B.2] Adding cumulative fatigue index...")
    
    for suffix in ["_A", "_B"]:
        fatigue_components = []
        component_names = []
        
        # Matches last 7d (30%)
        matches_7d_candidates = [f"matches_last_7d{suffix}", f"matches_7d{suffix}"]
        matches_7d_col = next((c for c in matches_7d_candidates if c in df.columns), None)
        if matches_7d_col:
            fatigue_components.append((pl.col(matches_7d_col).fill_null(0) / 5).clip(0, 1) * 0.30)
            component_names.append("matches_7d")
        
        # Minutes last 7d (30%)
        minutes_7d_candidates = [f"minutes_last_7d{suffix}", f"minutes_7d{suffix}"]
        minutes_7d_col = next((c for c in minutes_7d_candidates if c in df.columns), None)
        if minutes_7d_col:
            fatigue_components.append((pl.col(minutes_7d_col).fill_null(0) / 600).clip(0, 1) * 0.30)
            component_names.append("minutes_7d")
        
        # Minutes last 30d (20%)
        minutes_30d_candidates = [f"minutes_last_30d{suffix}", f"minutes_30d{suffix}"]
        minutes_30d_col = next((c for c in minutes_30d_candidates if c in df.columns), None)
        if minutes_30d_col:
            fatigue_components.append((pl.col(minutes_30d_col).fill_null(0) / 2000).clip(0, 1) * 0.20)
            component_names.append("minutes_30d")
        
        # Days since last (inverse, 20%)
        days_candidates = [f"days_since_last{suffix}", f"days_since_last_ff{suffix}", f"days_rest{suffix}"]
        days_col = next((c for c in days_candidates if c in df.columns), None)
        if days_col:
            fatigue_components.append((1 - (pl.col(days_col).fill_null(7) / 14).clip(0, 1)) * 0.20)
            component_names.append("days_rest_inv")
        
        # Fatigue qualifs (bonus)
        fatigue_qualifs_col = f"fatigue_qualifs{suffix}"
        if fatigue_qualifs_col in df.columns:
            fatigue_components.append((pl.col(fatigue_qualifs_col).fill_null(0) / 300).clip(0, 1) * 0.10)
            component_names.append("qualifs")
        
        if fatigue_components:
            fatigue_sum = fatigue_components[0]
            for expr in fatigue_components[1:]:
                fatigue_sum = fatigue_sum + expr
            
            df = df.with_columns([
                fatigue_sum.clip(0, 1).cast(pl.Float32).alias(f"cumulative_fatigue_index{suffix}"),
                (fatigue_sum > 0.6).cast(pl.Int8).alias(f"is_high_fatigue{suffix}"),
                (fatigue_sum < 0.2).cast(pl.Int8).alias(f"is_fresh{suffix}"),
            ])
            print(f"  ‚úÖ cumulative_fatigue_index{suffix} (components: {', '.join(component_names)})")
        else:
            # Default neutral values
            df = df.with_columns([
                pl.lit(0.3).cast(pl.Float32).alias(f"cumulative_fatigue_index{suffix}"),
                pl.lit(0).cast(pl.Int8).alias(f"is_high_fatigue{suffix}"),
                pl.lit(0).cast(pl.Int8).alias(f"is_fresh{suffix}"),
            ])
            print(f"  ‚ö†Ô∏è cumulative_fatigue_index{suffix} (no components found, defaulted)")
    
    # Fatigue advantage
    df = df.with_columns([
        (pl.col("cumulative_fatigue_index_B") - pl.col("cumulative_fatigue_index_A"))
        .cast(pl.Float32).alias("fatigue_advantage_A")
    ])
    print("  ‚úÖ fatigue_advantage_A")
    
    n_after = len(df.columns)
    print(f"\n  Cellule B: {n_after - n_before} features added")
    
    return df


# ===============================================
# SECTION 2C: CELLULE C - Players Enrichment
# ===============================================

def add_cellule_c_features(df: pl.DataFrame) -> pl.DataFrame:
    """
    Cellule C: Enrichissement avec donn√©es joueurs (age, height, handed, etc.)
    """
    
    print("\n" + "=" * 70)
    print("   CELLULE C: PLAYERS ENRICHMENT")
    print("=" * 70)
    
    n_before = len(df.columns)
    
    # Try to load players master
    players = None
    
    if PLAYERS_MASTER_PARQUET.exists():
        print(f"\n  Loading players from {PLAYERS_MASTER_PARQUET}...")
        try:
            players = pl.read_parquet(PLAYERS_MASTER_PARQUET)
        except Exception as e:
            print(f"  ‚ö†Ô∏è Error loading parquet: {e}")
    
    if players is None and PLAYERS_MASTER_FILE.exists():
        print(f"\n  Loading players from {PLAYERS_MASTER_FILE}...")
        try:
            players = pl.read_csv(PLAYERS_MASTER_FILE, infer_schema_length=10000)
        except Exception as e:
            print(f"  ‚ö†Ô∏è Error loading CSV: {e}")
    
    if players is not None:
        print(f"  Loaded {len(players)} players")
        print(f"  Columns: {players.columns[:10]}...")
        
        # Normalize players data
        # Find ID column
        id_col = None
        for candidate in ["atp_id_ref", "generated_id", "player_id", "id"]:
            if candidate in players.columns:
                id_col = candidate
                break
        
        if id_col:
            print(f"  Using ID column: {id_col}")
            
            # Prepare player features
            player_features = [id_col]
            rename_map = {id_col: "player_id_join"}
            
            # Birth date -> for age calculation
            if "birth_date" in players.columns:
                players = players.with_columns([
                    pl.col("birth_date").str.to_datetime("%Y-%m-%d", strict=False).alias("birth_date_dt")
                ])
                player_features.append("birth_date_dt")
            
            # Height
            if "height_cm" in players.columns:
                players = players.with_columns([
                    pl.col("height_cm").cast(pl.Float32)
                ])
                player_features.append("height_cm")
            
            # Weight
            if "weight_kg" in players.columns:
                players = players.with_columns([
                    pl.col("weight_kg").cast(pl.Float32)
                ])
                player_features.append("weight_kg")
            
            # Handed
            if "plays_hand" in players.columns:
                players = players.with_columns([
                    pl.when(pl.col("plays_hand").str.to_lowercase().str.contains("left"))
                    .then(pl.lit("L"))
                    .when(pl.col("plays_hand").str.to_lowercase().str.contains("right"))
                    .then(pl.lit("R"))
                    .otherwise(pl.lit(None))
                    .alias("handed")
                ])
                player_features.append("handed")
            
            # Backhand
            if "plays_backhand" in players.columns:
                players = players.with_columns([
                    pl.when(pl.col("plays_backhand").str.to_lowercase().str.contains("two|2"))
                    .then(pl.lit("2H"))
                    .when(pl.col("plays_backhand").str.to_lowercase().str.contains("one|1"))
                    .then(pl.lit("1H"))
                    .otherwise(pl.lit(None))
                    .alias("backhand")
                ])
                player_features.append("backhand")
            
            # Pro year
            if "pro_year" in players.columns:
                players = players.with_columns([
                    pl.col("pro_year").cast(pl.Float32)
                ])
                player_features.append("pro_year")
            
            # Select and deduplicate
            players_subset = players.select([c for c in player_features if c in players.columns]).unique(id_col)
            players_subset = players_subset.rename({id_col: "player_id_join"})
            
            # Join for winner (A) and loser (B)
            for suffix, id_source in [("_A", "winner_id"), ("_B", "loser_id")]:
                if id_source in df.columns:
                    # Prepare join
                    players_for_join = players_subset.clone()
                    
                    # Rename columns with suffix
                    for col in players_for_join.columns:
                        if col != "player_id_join":
                            players_for_join = players_for_join.rename({col: f"{col}{suffix}"})
                    
                    # Join
                    df = df.join(
                        players_for_join,
                        left_on=id_source,
                        right_on="player_id_join",
                        how="left"
                    )
            
            print("  ‚úÖ Player features joined")
            
            # Calculate derived features
            if "tourney_date_ta" in df.columns:
                # Get date expression
                if df["tourney_date_ta"].dtype in [pl.Int64, pl.Int32]:
                    date_expr = pl.col("tourney_date_ta").cast(pl.Utf8).str.to_datetime("%Y%m%d")
                    year_expr = (pl.col("tourney_date_ta") // 10000).cast(pl.Float32)
                else:
                    date_expr = pl.col("tourney_date_ta")
                    year_expr = pl.col("tourney_date_ta").dt.year().cast(pl.Float32)
                
                for suffix in ["_A", "_B"]:
                    # Age at match
                    birth_col = f"birth_date_dt{suffix}"
                    if birth_col in df.columns:
                        df = df.with_columns([
                            ((date_expr - pl.col(birth_col)).dt.total_days() / 365.25)
                            .clip(13, 55)
                            .cast(pl.Float32)
                            .alias(f"age_at_match{suffix}")
                        ])
                        print(f"  ‚úÖ age_at_match{suffix}")
                    
                    # Pro career length
                    pro_year_col = f"pro_year{suffix}"
                    if pro_year_col in df.columns:
                        df = df.with_columns([
                            (year_expr - pl.col(pro_year_col))
                            .clip(0, 40)
                            .cast(pl.Float32)
                            .alias(f"pro_career_len{suffix}")
                        ])
                        print(f"  ‚úÖ pro_career_len{suffix}")
        else:
            print("  ‚ö†Ô∏è No valid ID column found in players file")
    else:
        print("  ‚ö†Ô∏è No players file found, creating default values")
        
        # Create default values
        for suffix in ["_A", "_B"]:
            df = df.with_columns([
                pl.lit(None).cast(pl.Float32).alias(f"age_at_match{suffix}"),
                pl.lit(None).cast(pl.Float32).alias(f"pro_career_len{suffix}"),
                pl.lit(None).cast(pl.Float32).alias(f"height_cm{suffix}"),
                pl.lit(None).cast(pl.Float32).alias(f"weight_kg{suffix}"),
                pl.lit(None).cast(pl.Utf8).alias(f"handed{suffix}"),
                pl.lit(None).cast(pl.Utf8).alias(f"backhand{suffix}"),
            ])
    
    n_after = len(df.columns)
    print(f"\n  Cellule C: {n_after - n_before} features added")
    
    return df


# ===============================================
# SECTION 2D: CELLULE D - Tourney Speed Index
# ===============================================

def add_cellule_d_features(df: pl.DataFrame) -> pl.DataFrame:
    """
    Cellule D: Index de vitesse du tournoi bas√© sur l'exc√®s d'aces.
    Note: Si les features SOTA existent d√©j√† (de Preprocess 2.1b), on les garde.
    """
    
    print("\n" + "=" * 70)
    print("   CELLULE D: TOURNEY SPEED INDEX")
    print("=" * 70)
    
    n_before = len(df.columns)
    
    # Check if SOTA speed index already exists
    if "tourney_speed_index" in df.columns:
        coverage = df["tourney_speed_index"].is_not_null().mean()
        print(f"\n  ‚úÖ tourney_speed_index already exists (coverage: {coverage:.1%})")
        print("     Skipping recalculation (using Preprocess 2.1b SOTA version)")
    else:
        # Check for precomputed speed index file
        speed_index_dir = DATA_CLEAN / "features" / "tourney_speed_index"
        
        if speed_index_dir.exists():
            print(f"\n  Loading precomputed speed index from {speed_index_dir}...")
            try:
                speed_df = pl.read_parquet(speed_index_dir)
                
                # Find the right columns
                speed_col = "tourney_speed_index" if "tourney_speed_index" in speed_df.columns else "speed_prior3y"
                
                if "tourney_slug_ta" in speed_df.columns and "year" in speed_df.columns:
                    speed_df = speed_df.select([
                        "tourney_slug_ta", "year",
                        pl.col(speed_col).alias("tourney_speed_index")
                    ]).unique(["tourney_slug_ta", "year"])
                    
                    df = df.join(speed_df, on=["tourney_slug_ta", "year"], how="left")
                    print(f"  ‚úÖ tourney_speed_index loaded (coverage: {df['tourney_speed_index'].is_not_null().mean():.1%})")
            except Exception as e:
                print(f"  ‚ö†Ô∏è Error loading speed index: {e}")
        
        # If still not loaded, approximate from surface
        if "tourney_speed_index" not in df.columns:
            print("\n  Approximating speed index from surface...")
            
            if "tourney_surface_ta" in df.columns:
                df = df.with_columns([
                    pl.when(pl.col("tourney_surface_ta") == "Grass").then(pl.lit(0.70))
                    .when(pl.col("tourney_surface_ta") == "Carpet").then(pl.lit(0.65))
                    .when(pl.col("tourney_surface_ta") == "Hard").then(pl.lit(0.50))
                    .when(pl.col("tourney_surface_ta") == "Clay").then(pl.lit(0.30))
                    .otherwise(pl.lit(0.50))
                    .cast(pl.Float32).alias("tourney_speed_index")
                ])
                print("  ‚ö†Ô∏è tourney_speed_index (from surface prior - less accurate)")
    
    # Check if SSI features already exist
    if "pref_ssi_A" in df.columns and "pref_ssi_B" in df.columns:
        print(f"\n  ‚úÖ pref_ssi_A/B already exist (from Preprocess 2.1b)")
        print("     Skipping SSI recalculation")
    
    # Calculate speed sensitivity per player (only if not already done)
    print("\n  Adding speed sensitivity...")
    
    for suffix in ["_A", "_B"]:
        if f"speed_sensitivity{suffix}" in df.columns:
            print(f"  ‚úÖ speed_sensitivity{suffix} already exists")
            continue
            
        # Find ace rate column
        ace_candidates = [f"r10_p_s_ace_p{suffix}", f"ace_rate_r10{suffix}", 
                         f"r10_ace_rate{suffix}", f"w_s_ace_p" if suffix == "_A" else f"l_s_ace_p"]
        ace_col = next((c for c in ace_candidates if c in df.columns), None)
        
        if ace_col:
            # Speed sensitivity = deviation from mean ace rate
            mean_ace = df.select(pl.col(ace_col).mean()).item()
            df = df.with_columns([
                (pl.col(ace_col).fill_null(mean_ace) - mean_ace)
                .cast(pl.Float32).alias(f"speed_sensitivity{suffix}")
            ])
            print(f"  ‚úÖ speed_sensitivity{suffix}")
        else:
            df = df.with_columns([pl.lit(0.0).cast(pl.Float32).alias(f"speed_sensitivity{suffix}")])
            print(f"  ‚ö†Ô∏è speed_sensitivity{suffix} (no ace rate found, defaulted to 0)")
    
    # Speed matchup interaction
    if "inter__speed_matchup" not in df.columns:
        df = df.with_columns([
            (pl.col("tourney_speed_index") * 
             (pl.col("speed_sensitivity_A") - pl.col("speed_sensitivity_B")))
            .cast(pl.Float32).alias("inter__speed_matchup")
        ])
        print("  ‚úÖ inter__speed_matchup")
    
    n_after = len(df.columns)
    print(f"\n  Cellule D: {n_after - n_before} features added")
    
    return df


# ===============================================
# SECTION 2E: CELLULE E - Meta-features
# ===============================================

def add_cellule_e_features(df: pl.DataFrame) -> pl.DataFrame:
    """
    Cellule E: Meta-features (logit transforms, SSI mismatch, gaps).
    """
    
    print("\n" + "=" * 70)
    print("   CELLULE E: META-FEATURES")
    print("=" * 70)
    
    n_before = len(df.columns)
    
    # --- Logit transform helper ---
    def safe_logit(col_expr):
        clipped = col_expr.clip(0.001, 0.999)
        return (clipped / (1 - clipped)).log()
    
    # --- BP conversion logit ---
    print("\n[E.1] Adding logit transform features...")
    
    bp_candidates = [
        ("r10_p_bp_conv_p_A", "r10_p_bp_conv_p_B"),
        ("A_r10_p_bp_conv_p", "B_r10_p_bp_conv_p"),
        ("bp_conv_pct_A", "bp_conv_pct_B"),
    ]
    for col_a, col_b in bp_candidates:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (safe_logit(pl.col(col_a)) - safe_logit(pl.col(col_b)))
                .cast(pl.Float32).alias("diff__clutch_bpconv_logit")
            ])
            print("  ‚úÖ diff__clutch_bpconv_logit")
            break
    else:
        df = df.with_columns([pl.lit(0.0).cast(pl.Float32).alias("diff__clutch_bpconv_logit")])
        print("  ‚ö†Ô∏è diff__clutch_bpconv_logit (defaulted, no bp_conv columns)")
    
    # --- First serve in logit ---
    fs_candidates = [
        ("r10_p_s_1stIn_p_A", "r10_p_s_1stIn_p_B"),
        ("A_r10_p_s_1stIn_p", "B_r10_p_s_1stIn_p"),
    ]
    for col_a, col_b in fs_candidates:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (safe_logit(pl.col(col_a)) - safe_logit(pl.col(col_b)))
                .cast(pl.Float32).alias("diff__first_in_consistency")
            ])
            print("  ‚úÖ diff__first_in_consistency")
            break
    else:
        df = df.with_columns([pl.lit(0.0).cast(pl.Float32).alias("diff__first_in_consistency")])
    
    # --- DF pressure ---
    df_candidates = [
        ("r10_p_s_df_p_A", "r10_p_s_df_p_B"),
        ("A_r10_p_s_df_p", "B_r10_p_s_df_p"),
    ]
    for col_a, col_b in df_candidates:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (pl.col(col_b) - pl.col(col_a)).cast(pl.Float32).alias("diff__df_pressure")
            ])
            print("  ‚úÖ diff__df_pressure")
            break
    else:
        df = df.with_columns([pl.lit(0.0).cast(pl.Float32).alias("diff__df_pressure")])
    
    # --- Rally mean & style ---
    print("\n[E.2] Adding charting meta-features...")
    
    rally_candidates = [
        ("r10_chart_avg_rally_A", "r10_chart_avg_rally_B"),  # NOUVEAU - du Preprocess 2.1E
        ("r10_ch_avg_rally_len_A", "r10_ch_avg_rally_len_B"),  # Fallback ancien format
        ("A_r10_ch_avg_rally_len", "B_r10_ch_avg_rally_len"),  # Fallback autre format
    ]
    for col_a, col_b in rally_candidates:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (pl.col(col_a) - pl.col(col_b)).cast(pl.Float32).alias("diff__rally_mean"),
                ((pl.col(col_a) + pl.col(col_b)) / 2).cast(pl.Float32).alias("style__rally_match"),
            ])
            print("  ‚úÖ diff__rally_mean, style__rally_match")
            break
    else:
        df = df.with_columns([
            pl.lit(0.0).cast(pl.Float32).alias("diff__rally_mean"),
            pl.lit(6.0).cast(pl.Float32).alias("style__rally_match"),  # ~6 shots per rally average
        ])
        print("  ‚ö†Ô∏è diff__rally_mean, style__rally_match (defaulted)")
    
    # --- Net aggression ---
    net_candidates = [
        ("r10_ch_net_won_pct_A", "r10_ch_net_won_pct_B"),
        ("A_r10_ch_net_won_pct", "B_r10_ch_net_won_pct"),
    ]
    for col_a, col_b in net_candidates:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (pl.col(col_a) - pl.col(col_b)).cast(pl.Float32).alias("diff__net_aggression")
            ])
            print("  ‚úÖ diff__net_aggression")
            break
    else:
        df = df.with_columns([pl.lit(0.0).cast(pl.Float32).alias("diff__net_aggression")])
    # --- NOUVEAU: Features charting √©tendues ---
    print("\n[E.2b] Adding extended charting features...")
    
    # Serve pattern gap
    serve_pattern_candidates = [
        ("r10_chart_serve_wide_pct_A", "r10_chart_serve_wide_pct_B"),
    ]
    for col_a, col_b in serve_pattern_candidates:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (pl.col(col_a) - pl.col(col_b)).cast(pl.Float32).alias("diff__serve_wide_pattern"),
                (pl.col("r10_chart_serve_t_pct_A") - pl.col("r10_chart_serve_t_pct_B"))
                .cast(pl.Float32).alias("diff__serve_t_pattern"),
            ])
            print("  ‚úÖ diff__serve_wide_pattern, diff__serve_t_pattern")
            break
    
    # Shot quality gap
    shot_candidates = [
        ("r10_chart_fh_winner_pct_A", "r10_chart_fh_winner_pct_B"),
    ]
    for col_a, col_b in shot_candidates:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (pl.col(col_a) - pl.col(col_b)).cast(pl.Float32).alias("diff__fh_winner"),
                (pl.col("r10_chart_bh_winner_pct_A") - pl.col("r10_chart_bh_winner_pct_B"))
                .cast(pl.Float32).alias("diff__bh_winner"),
            ])
            print("  ‚úÖ diff__fh_winner, diff__bh_winner")
            break
    
    # Pressure performance gap
    pressure_candidates = [
        ("r10_chart_bp_ptsw_pct_A", "r10_chart_bp_ptsw_pct_B"),
    ]
    for col_a, col_b in pressure_candidates:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (pl.col(col_a) - pl.col(col_b)).cast(pl.Float32).alias("diff__bp_pressure"),
                (pl.col("r10_chart_gp_ptsw_pct_A") - pl.col("r10_chart_gp_ptsw_pct_B"))
                .cast(pl.Float32).alias("diff__gp_pressure"),
            ])
            print("  ‚úÖ diff__bp_pressure, diff__gp_pressure")
            break
    # --- SSI Mismatch ---
    print("\n[E.3] Adding SSI mismatch...")
    
    # Check if SOTA pref_ssi already exists (from Preprocess 2.1b)
    if "pref_ssi_A" in df.columns and "pref_ssi_B" in df.columns:
        print("  ‚úÖ pref_ssi_A/B already exist (SOTA from Preprocess 2.1b)")
        
        # Just ensure diff_ssi_mismatch exists
        if "diff_ssi_mismatch" not in df.columns:
            if "tourney_speed_index" in df.columns:
                df = df.with_columns([
                    (
                        (pl.col("tourney_speed_index").fill_null(0) - pl.col("pref_ssi_A").fill_null(0)).abs() -
                        (pl.col("tourney_speed_index").fill_null(0) - pl.col("pref_ssi_B").fill_null(0)).abs()
                    )
                    .cast(pl.Float32).alias("diff_ssi_mismatch")
                ])
                print("  ‚úÖ diff_ssi_mismatch (computed from SOTA pref_ssi)")
    else:
        # Fallback: compute from Glicko surface ratings
        for suffix in ["_A", "_B"]:
            hard_col = f"g2_hard_rating{suffix}"
            clay_col = f"g2_clay_rating{suffix}"
            
            if hard_col in df.columns and clay_col in df.columns:
                mean_rating = (pl.col(hard_col) + pl.col(clay_col)) / 2
                df = df.with_columns([
                    ((pl.col(hard_col) - pl.col(clay_col)) / mean_rating.clip(1, None))
                    .cast(pl.Float32).alias(f"ssi_hard_vs_clay{suffix}")
                ])
                
                if "tourney_surface_ta" in df.columns:
                    df = df.with_columns([
                        pl.when(pl.col("tourney_surface_ta") == "Hard")
                        .then(-pl.col(f"ssi_hard_vs_clay{suffix}"))
                        .when(pl.col("tourney_surface_ta") == "Clay")
                        .then(pl.col(f"ssi_hard_vs_clay{suffix}"))
                        .otherwise(pl.lit(0.0))
                        .abs()
                        .cast(pl.Float32).alias(f"ssi_mismatch{suffix}")
                    ])
                
                # Use as fallback pref_ssi
                if f"pref_ssi{suffix}" not in df.columns:
                    df = df.with_columns([
                        pl.col(f"ssi_hard_vs_clay{suffix}").fill_null(0).cast(pl.Float32).alias(f"pref_ssi{suffix}")
                    ])
                print(f"  ‚úÖ ssi_hard_vs_clay{suffix}, ssi_mismatch{suffix}, pref_ssi{suffix} (fallback)")
            else:
                if f"pref_ssi{suffix}" not in df.columns:
                    df = df.with_columns([
                        pl.lit(0.0).cast(pl.Float32).alias(f"ssi_hard_vs_clay{suffix}"),
                        pl.lit(0.0).cast(pl.Float32).alias(f"ssi_mismatch{suffix}"),
                        pl.lit(0.0).cast(pl.Float32).alias(f"pref_ssi{suffix}"),
                    ])
        
        # SSI mismatch diff
        if "diff_ssi_mismatch" not in df.columns:
            if "ssi_mismatch_A" in df.columns and "ssi_mismatch_B" in df.columns:
                df = df.with_columns([
                    (pl.col("ssi_mismatch_B") - pl.col("ssi_mismatch_A"))
                    .cast(pl.Float32).alias("diff_ssi_mismatch")
                ])
            else:
                df = df.with_columns([pl.lit(0.0).cast(pl.Float32).alias("diff_ssi_mismatch")])
        print("  ‚úÖ diff_ssi_mismatch")
    
    # --- Rank interactions ---
    print("\n[E.4] Adding rank interactions...")
    
    if "winner_rank_ta" in df.columns and "loser_rank_ta" in df.columns:
        # SEULEMENT les features SYM√âTRIQUES ici (ne d√©pendent pas de qui gagne)
        df = df.with_columns([
            # both_top10: les deux joueurs sont top 10
            ((pl.col("winner_rank_ta").fill_null(9999) <= 10) &
             (pl.col("loser_rank_ta").fill_null(9999) <= 10))
            .cast(pl.Int8).alias("both_top10"),

            # both_top50: les deux joueurs sont top 50
            ((pl.col("winner_rank_ta").fill_null(9999) <= 50) &
             (pl.col("loser_rank_ta").fill_null(9999) <= 50))
            .cast(pl.Int8).alias("both_top50"),

            # rank_diff_abs: diff√©rence absolue (sym√©trique)
            (pl.col("winner_rank_ta").fill_null(100) - 
             pl.col("loser_rank_ta").fill_null(100)).abs()
            .cast(pl.Float32).alias("rank_diff_abs"),
        ])
        print("  ‚úÖ both_top10, both_top50, rank_diff_abs (symmetric - safe)")
        print("  ‚è≥ diff_log_rank, is_underdog_A ‚Üí post-shuffle")
    
    # --- Rest advantage ---
    print("\n[E.5] Adding rest advantage...")
    
    rest_candidates = [
        ("days_since_last_A", "days_since_last_B"),
        ("days_since_last_ff_A", "days_since_last_ff_B"),
    ]
    for col_a, col_b in rest_candidates:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (pl.col(col_a) - pl.col(col_b)).cast(pl.Float32).alias("rest_advantage_A")
            ])
            print("  ‚úÖ rest_advantage_A")
            break
    else:
        df = df.with_columns([pl.lit(0.0).cast(pl.Float32).alias("rest_advantage_A")])
    
    # --- Surface Glicko gaps ---
    print("\n[E.6] Adding Glicko surface gaps...")
    
    for surface in ["hard", "clay", "grass"]:
        col_a = f"g2_{surface}_rating_A"
        col_b = f"g2_{surface}_rating_B"
        
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (pl.col(col_a) - pl.col(col_b)).cast(pl.Float32).alias(f"g2_{surface}_gap")
            ])
            print(f"  ‚úÖ g2_{surface}_gap")
    
    # Current surface advantage
    if "tourney_surface_ta" in df.columns:
        gap_mapping = []
        for surf, gap_col in [("Hard", "g2_hard_gap"), ("Clay", "g2_clay_gap"), ("Grass", "g2_grass_gap")]:
            if gap_col in df.columns:
                gap_mapping.append((surf, gap_col))
        
        if gap_mapping:
            expr = pl.lit(0.0)
            for surf, gap_col in gap_mapping:
                expr = pl.when(pl.col("tourney_surface_ta") == surf).then(pl.col(gap_col)).otherwise(expr)
            df = df.with_columns([expr.cast(pl.Float32).alias("current_surface_advantage")])
            print("  ‚úÖ current_surface_advantage")
    
    # --- Mental/Clutch gaps ---
    print("\n[E.7] Adding mental/clutch gaps...")
    
    gap_pairs = [
        ("mental_toughness_score_A", "mental_toughness_score_B", "mental_gap"),
        ("clutch_score_A", "clutch_score_B", "clutch_gap"),
        ("upset_rate_r20_A", "upset_rate_r20_B", "upset_rate_gap"),
        ("comeback_rate_r20_A", "comeback_rate_r20_B", "comeback_rate_gap"),
        ("win_streak_current_A", "win_streak_current_B", "win_streak_gap"),
    ]
    for col_a, col_b, new_col in gap_pairs:
        if col_a in df.columns and col_b in df.columns:
            df = df.with_columns([
                (pl.col(col_a) - pl.col(col_b)).cast(pl.Float32).alias(new_col)
            ])
            print(f"  ‚úÖ {new_col}")
    
    n_after = len(df.columns)
    print(f"\n  Cellule E: {n_after - n_before} features added")
    
    return df


# ===============================================
# SECTION 3: SHUFFLE A/B
# ===============================================

def shuffle_ab(df: pl.DataFrame, seed: int = 42) -> pl.DataFrame:
    """Shuffle A/B pour avoir target 50/50."""
    
    print("\n" + "=" * 70)
    print("   SECTION 3: SHUFFLE A/B")
    print("=" * 70)
    
    n_rows = len(df)
    
    np.random.seed(seed)
    swap_array = np.random.rand(n_rows) < 0.5
    n_swapped = swap_array.sum()
    
    print(f"  Swapping {n_swapped:,} rows ({100*n_swapped/n_rows:.1f}%)")
    
    df = df.with_columns([pl.Series("_swap_mask", swap_array)])
    
    # Identify paired columns
    cols_A = [c for c in df.columns if c.endswith("_A")]
    cols_B = [c for c in df.columns if c.endswith("_B")]
    
    bases_A = {c[:-2] for c in cols_A}
    bases_B = {c[:-2] for c in cols_B}
    paired_bases = bases_A & bases_B
    
    print(f"  Paired features to swap: {len(paired_bases)}")
    
    # Build swap expressions
    swap_exprs = []
    
    for base in paired_bases:
        col_a = f"{base}_A"
        col_b = f"{base}_B"
        
        if col_a in df.columns and col_b in df.columns:
            swap_exprs.extend([
                pl.when(pl.col("_swap_mask")).then(pl.col(col_b)).otherwise(pl.col(col_a)).alias(f"_new_{col_a}"),
                pl.when(pl.col("_swap_mask")).then(pl.col(col_a)).otherwise(pl.col(col_b)).alias(f"_new_{col_b}"),
            ])
    
    # Swap winner_id <-> loser_id
    if "winner_id" in df.columns and "loser_id" in df.columns:
        swap_exprs.extend([
            pl.when(pl.col("_swap_mask")).then(pl.col("loser_id")).otherwise(pl.col("winner_id")).alias("_new_winner_id"),
            pl.when(pl.col("_swap_mask")).then(pl.col("winner_id")).otherwise(pl.col("loser_id")).alias("_new_loser_id"),
        ])
    
    if swap_exprs:
        df = df.with_columns(swap_exprs)
    
    # Replace columns
    rename_map = {}
    drop_cols = []
    
    for base in paired_bases:
        col_a = f"{base}_A"
        col_b = f"{base}_B"
        new_a = f"_new_{col_a}"
        new_b = f"_new_{col_b}"
        
        if new_a in df.columns:
            drop_cols.extend([col_a, col_b])
            rename_map[new_a] = col_a
            rename_map[new_b] = col_b
    
    if "_new_winner_id" in df.columns:
        drop_cols.extend(["winner_id", "loser_id"])
        rename_map["_new_winner_id"] = "winner_id"
        rename_map["_new_loser_id"] = "loser_id"
    
    df = df.drop([c for c in drop_cols if c in df.columns])
    df = df.rename(rename_map)
    
    # Invert diff columns
    diff_cols = [c for c in df.columns if c.startswith("diff")]
    if diff_cols:
        invert_exprs = [
            pl.when(pl.col("_swap_mask")).then(-pl.col(c)).otherwise(pl.col(c)).alias(c)
            for c in diff_cols
        ]
        df = df.with_columns(invert_exprs)
    
    # Update target
    df = df.with_columns([
        pl.when(pl.col("_swap_mask")).then(pl.lit(0)).otherwise(pl.lit(1))
        .cast(pl.Int8).alias("target_A_wins")
    ])
    
    df = df.drop("_swap_mask")
    
    # Verify balance
    target_dist = df.group_by("target_A_wins").len().sort("target_A_wins")
    print(f"\n  Target distribution:")
    for row in target_dist.iter_rows():
        pct = 100 * row[1] / n_rows
        print(f"    {row[0]}: {row[1]:,} ({pct:.1f}%)")
    
    return df

# ==============================================================================
# POST-SHUFFLE RANK FEATURES (NO LEAKAGE)
# ==============================================================================

def add_post_shuffle_rank_features(df: pl.DataFrame) -> pl.DataFrame:
    """
    Calcule les features de rang APR√àS le shuffle pour √©viter le leakage.
    
    Apr√®s shuffle_ab():
    - target_A_wins = 1 ‚Üí A = winner original (pas de swap)
    - target_A_wins = 0 ‚Üí A = loser original (swap effectu√©)
    
    Cette fonction cr√©e rank_A et rank_B en tenant compte du swap,
    puis calcule diff_log_rank et is_underdog_A correctement.
    """
    
    print("\n" + "=" * 70)
    print("   POST-SHUFFLE RANK FEATURES (NO LEAKAGE)")
    print("=" * 70)
    
    # V√©rifications
    if "winner_rank_ta" not in df.columns or "loser_rank_ta" not in df.columns:
        print("  ‚ö†Ô∏è winner_rank_ta/loser_rank_ta not found, skipping")
        return df
    
    if "target_A_wins" not in df.columns:
        print("  ‚ö†Ô∏è target_A_wins not found (shuffle not done?), skipping")
        return df
    
    # Cr√©er rank_A et rank_B selon le swap
    df = df.with_columns([
        # rank_A: rang du joueur en position A
        pl.when(pl.col("target_A_wins") == 1)
          .then(pl.col("winner_rank_ta"))  # Pas de swap: A = winner
          .otherwise(pl.col("loser_rank_ta"))  # Swap: A = loser
          .alias("rank_A"),
        
        # rank_B: rang du joueur en position B
        pl.when(pl.col("target_A_wins") == 1)
          .then(pl.col("loser_rank_ta"))  # Pas de swap: B = loser
          .otherwise(pl.col("winner_rank_ta"))  # Swap: B = winner
          .alias("rank_B"),
    ])
    
    # Calculer les features d√©riv√©es
    df = df.with_columns([
        # diff_log_rank: log(rank_B) - log(rank_A)
        # Positif si A est mieux class√©
        (pl.col("rank_B").clip(1, 2000).log() - 
         pl.col("rank_A").clip(1, 2000).log())
        .cast(pl.Float32)
        .alias("diff_log_rank"),
        
        # is_underdog_A: 1 si A est un gros outsider
        (pl.col("rank_A").fill_null(9999) > 
         pl.col("rank_B").fill_null(1) * 2)
        .cast(pl.Int8)
        .alias("is_underdog_A"),
        
        # diff_rank_normalized: √©cart normalis√©
        ((pl.col("rank_A").fill_null(100) - 
          pl.col("rank_B").fill_null(100)) / 100)
        .clip(-5, 5)
        .cast(pl.Float32)
        .alias("diff_rank_normalized"),
    ])
    
    # Statistiques
    print(f"  ‚úÖ rank_A, rank_B created")
    print(f"  ‚úÖ diff_log_rank created")
    print(f"  ‚úÖ is_underdog_A created")
    print(f"  ‚úÖ diff_rank_normalized created")
    
    # Check anti-leakage
    corr = df.select(pl.corr("target_A_wins", "diff_log_rank")).item()
    print(f"\n  üîç Anti-leakage: corr(target, diff_log_rank) = {corr:.4f}")
    
    if abs(corr) > 0.5:
        print("  ‚ö†Ô∏è WARNING: High correlation! Check for leakage.")
    else:
        print("  ‚úÖ Correlation OK - no obvious leakage")
    
    return df

# ==============================================================================
# POST-SHUFFLE ODDS CONVERSION (NO LEAKAGE)
# ==============================================================================

def convert_odds_to_AB(df: pl.DataFrame) -> pl.DataFrame:
    """
    Convertit les odds winner/loser ‚Üí A/B apr√®s le shuffle.
    
    DOIT √™tre appel√© APR√àS shuffle_ab() car on utilise target_A_wins
    pour savoir qui est A et qui est B.
    """
    
    print("\n" + "=" * 70)
    print("   POST-SHUFFLE ODDS CONVERSION")
    print("=" * 70)
    
    if "odds_implied_prob_winner" not in df.columns:
        print("  ‚ö†Ô∏è No odds columns found, skipping")
        return df
    
    # Convertir winner/loser ‚Üí A/B selon le shuffle
    # target_A_wins = 1 ‚Üí A = winner (pas de swap)
    # target_A_wins = 0 ‚Üí A = loser (swap effectu√©)
    
    df = df.with_columns([
        # Probabilit√©s implicites
        pl.when(pl.col("target_A_wins") == 1)
          .then(pl.col("odds_implied_prob_winner"))
          .otherwise(pl.col("odds_implied_prob_loser"))
          .cast(pl.Float32)
          .alias("odds_implied_prob_A"),
        
        pl.when(pl.col("target_A_wins") == 1)
          .then(pl.col("odds_implied_prob_loser"))
          .otherwise(pl.col("odds_implied_prob_winner"))
          .cast(pl.Float32)
          .alias("odds_implied_prob_B"),
        
        # Cotes brutes (si disponibles)
        pl.when(pl.col("target_A_wins") == 1)
          .then(pl.col("odds_winner"))
          .otherwise(pl.col("odds_loser"))
          .cast(pl.Float32)
          .alias("odds_A"),
        
        pl.when(pl.col("target_A_wins") == 1)
          .then(pl.col("odds_loser"))
          .otherwise(pl.col("odds_winner"))
          .cast(pl.Float32)
          .alias("odds_B"),
    ])
    
    # Features d√©riv√©es
    df = df.with_columns([
        # Diff√©rence de probabilit√© implicite
        (pl.col("odds_implied_prob_A") - pl.col("odds_implied_prob_B"))
        .cast(pl.Float32)
        .alias("diff_odds_implied_prob"),
        
        # Log odds ratio (utile pour le mod√®le)
        (pl.col("odds_B").log() - pl.col("odds_A").log())
        .cast(pl.Float32)
        .alias("log_odds_ratio_AB"),
    ])
    
    # Supprimer les colonnes winner/loser (√©viter confusion)
    cols_to_drop = ["odds_winner", "odds_loser", 
                    "odds_implied_prob_winner", "odds_implied_prob_loser"]
    df = df.drop([c for c in cols_to_drop if c in df.columns])
    
    # Stats
    n_with_odds = df["odds_implied_prob_A"].is_not_null().sum()
    print(f"  ‚úÖ Odds converted to A/B format")
    print(f"  üìä Matchs with odds: {n_with_odds:,}/{len(df):,} ({100*n_with_odds/len(df):.1f}%)")
    
    # V√©rification anti-leakage
    corr = df.select(pl.corr("target_A_wins", "odds_implied_prob_A")).item()
    print(f"  üîç Anti-leakage: corr(target, odds_implied_prob_A) = {corr:.4f}")
    
    if abs(corr) > 0.5:
        print("  ‚ö†Ô∏è WARNING: High correlation detected!")
    else:
        print("  ‚úÖ Correlation OK - no obvious leakage")
    
    return df
    
# ===============================================
# SECTION 4: TEMPORAL SPLIT
# ===============================================

def temporal_split(df: pl.DataFrame) -> tuple:
    """Split temporel strict."""
    
    print("\n" + "=" * 70)
    print("   SECTION 4: TEMPORAL SPLIT")
    print("=" * 70)
    
    train = df.filter(pl.col("year") <= TRAIN_END_YEAR)
    val = df.filter((pl.col("year") > TRAIN_END_YEAR) & (pl.col("year") <= VAL_END_YEAR))
    test = df.filter(pl.col("year") > VAL_END_YEAR)
    
    print(f"\n  Train: {len(train):,} rows (‚â§{TRAIN_END_YEAR})")
    print(f"  Val:   {len(val):,} rows ({TRAIN_END_YEAR+1}-{VAL_END_YEAR})")
    print(f"  Test:  {len(test):,} rows (>{VAL_END_YEAR})")
    
    return train, val, test


# ===============================================
# SECTION 5: FEATURE SELECTION
# ===============================================

def select_features(train: pl.DataFrame, val: pl.DataFrame, test: pl.DataFrame) -> tuple:
    """Feature selection with SOTA features protection."""
    
    print("\n" + "=" * 70)
    print("   SECTION 5: FEATURE SELECTION")
    print("=" * 70)
    
    exclude_cols = ["target_A_wins", "year", "tourney_date_ta", "gender", 
                    "tourney_surface_ta", "tourney_level_ta", "round_ta",
                    "best_of_ta", "is_indoor", "match_status"]
    exclude_cols += ID_COLS
    
    numeric_types = [pl.Float32, pl.Float64, pl.Int8, pl.Int16, pl.Int32, pl.Int64]
    numeric_cols = [c for c in train.columns 
                    if c not in exclude_cols
                    and train[c].dtype in numeric_types]
    
    print(f"\n[5.1] Initial numeric features: {len(numeric_cols)}")
    
    # Identify protected SOTA features that exist in data
    protected_in_data = [c for c in PROTECTED_SOTA_FEATURES if c in train.columns]
    print(f"[5.1b] Protected SOTA features found: {len(protected_in_data)}")
    for f in protected_in_data:
        coverage = train[f].is_not_null().mean()
        print(f"       {f}: {coverage:.1%} coverage")
    
    # Separate protected from regular features
    regular_cols = [c for c in numeric_cols if c not in protected_in_data]
    
    # Remove high null rate (only for non-protected features)
    null_exprs = [pl.col(c).is_null().mean().alias(c) for c in regular_cols]
    null_rates_df = train.select(null_exprs)
    null_rates = {c: null_rates_df[c][0] for c in regular_cols}
    
    keep_cols = [c for c in regular_cols if (null_rates[c] or 0) <= MAX_NULL_RATE]
    print(f"[5.2] After null filter (regular): {len(keep_cols)}")
    
    # Remove near-constant (only for non-protected features)
    var_exprs = [pl.col(c).var().alias(c) for c in keep_cols]
    var_df = train.select(var_exprs)
    variances = {c: var_df[c][0] for c in keep_cols}
    
    keep_cols = [c for c in keep_cols if (variances[c] or 0) >= MIN_VARIANCE_THRESHOLD]
    print(f"[5.3] After variance filter (regular): {len(keep_cols)}")
    
    # Feature importance via correlation
    correlations = {}
    batch_size = 50
    for i in range(0, len(keep_cols), batch_size):
        batch_cols = keep_cols[i:i+batch_size]
        corr_exprs = [pl.corr("target_A_wins", c).alias(c) for c in batch_cols]
        corr_df = train.select(corr_exprs)
        for c in batch_cols:
            corr_val = corr_df[c][0]  # Renamed to avoid shadowing 'val' DataFrame
            correlations[c] = abs(corr_val) if corr_val is not None and not np.isnan(corr_val) else 0.0
    
    # Also compute correlations for protected features (for reporting)
    for c in protected_in_data:
        try:
            corr_val = train.select(pl.corr("target_A_wins", c))[0, 0]
            correlations[c] = abs(corr_val) if corr_val is not None and not np.isnan(corr_val) else 0.0
        except:
            correlations[c] = 0.0
    
    sorted_features = sorted(correlations.items(), key=lambda x: x[1], reverse=True)
    
    print(f"\n  Top 10 by correlation:")
    for feat, corr in sorted_features[:10]:
        protected_mark = " [SOTA]" if feat in protected_in_data else ""
        print(f"    {feat}: {corr:.4f}{protected_mark}")
    
    # Select top N regular features + ALL protected SOTA features
    n_regular = MAX_FEATURES - len(protected_in_data)
    regular_selected = [f for f, _ in sorted_features if f not in protected_in_data][:n_regular]
    
    selected_features = protected_in_data + regular_selected
    print(f"\n  Selected: {len(selected_features)} features")
    print(f"    - Protected SOTA: {len(protected_in_data)}")
    print(f"    - Regular (top by corr): {len(regular_selected)}")
    
    # Apply selection
    keep_all = ["target_A_wins", "year"] + [c for c in ID_COLS if c in train.columns] + selected_features
    keep_all = list(dict.fromkeys([c for c in keep_all if c in train.columns]))
    
    train_sel = train.select(keep_all)
    val_sel = val.select([c for c in keep_all if c in val.columns])
    test_sel = test.select([c for c in keep_all if c in test.columns])
    
    return train_sel, val_sel, test_sel, selected_features


# ===============================================
# SECTION 6: SCALING
# ===============================================

def apply_scaling(train: pl.DataFrame, val: pl.DataFrame, test: pl.DataFrame, 
                  feature_cols: list) -> tuple:
    """Applique QuantileTransformer avec imputation sp√©ciale pour SOTA features."""
    
    print("\n" + "=" * 70)
    print("   SECTION 6: QUANTILE SCALING")
    print("=" * 70)
    
    from sklearn.preprocessing import QuantileTransformer
    import joblib
    
    numeric_types = [pl.Float32, pl.Float64, pl.Int32, pl.Int64, pl.Int16, pl.Int8]
    scale_cols = [c for c in feature_cols 
                  if c not in ID_COLS + ["target_A_wins", "year"]
                  and c in train.columns
                  and train[c].dtype in numeric_types]
    
    print(f"\n  Scaling {len(scale_cols)} features")
    
    # Compute medians for regular imputation
    median_exprs = [pl.col(c).median().alias(c) for c in scale_cols]
    medians_df = train.select(median_exprs)
    medians = {c: medians_df[c][0] for c in scale_cols}
    
    # Special handling for SOTA features with high null rates
    # Use sensible defaults instead of median (which might be null)
    sota_defaults = {
        "r20_win_rate_vs_top10_A": 0.3,  # Below average (most players lose vs top10)
        "r20_win_rate_vs_top10_B": 0.3,
        "r20_win_rate_vs_top50_A": 0.4,
        "r20_win_rate_vs_top50_B": 0.4,
        "r20_upset_rate_A": 0.3,
        "r20_upset_rate_B": 0.3,
        "pref_ssi_A": 0.0,  # Neutral preference
        "pref_ssi_B": 0.0,
        "diff_pref_ssi": 0.0,
        "diff_ssi_mismatch": 0.0,
        "tourney_speed_index": 0.0,  # Neutral speed
    }
    
    for col, default in sota_defaults.items():
        if col in medians:
            if medians[col] is None or np.isnan(medians[col]):
                medians[col] = default
                print(f"  ‚ö†Ô∏è {col}: using default {default} (high null rate)")
    
    # Impute with combined medians/defaults
    impute_exprs = [pl.col(c).fill_null(medians.get(c, 0.0)).alias(c) for c in scale_cols]
    train = train.with_columns(impute_exprs)
    val = val.with_columns([pl.col(c).fill_null(medians.get(c, 0.0)).alias(c) for c in scale_cols if c in val.columns])
    test = test.with_columns([pl.col(c).fill_null(medians.get(c, 0.0)).alias(c) for c in scale_cols if c in test.columns])
    
    # Scale
    train_np = train.select(scale_cols).to_numpy()
    
    scaler = QuantileTransformer(
        n_quantiles=min(1000, len(train)),
        output_distribution='normal',
        random_state=RANDOM_SEED
    )
    
    train_scaled = scaler.fit_transform(train_np)
    del train_np
    
    for i, col in enumerate(scale_cols):
        train = train.with_columns([pl.Series(col, train_scaled[:, i].astype(np.float32))])
    del train_scaled
    
    # Transform val/test
    for df_name, df_ref in [("val", val), ("test", test)]:
        df_cols = [c for c in scale_cols if c in df_ref.columns]
        if df_cols:
            df_np = df_ref.select(df_cols).to_numpy()
            df_full = np.zeros((len(df_ref), len(scale_cols)), dtype=np.float32)
            for i, col in enumerate(scale_cols):
                if col in df_cols:
                    df_full[:, i] = df_np[:, df_cols.index(col)]
            
            df_scaled = scaler.transform(df_full)
            del df_np, df_full
            
            for i, col in enumerate(scale_cols):
                if col in df_ref.columns:
                    if df_name == "val":
                        val = val.with_columns([pl.Series(col, df_scaled[:, i].astype(np.float32))])
                    else:
                        test = test.with_columns([pl.Series(col, df_scaled[:, i].astype(np.float32))])
            del df_scaled
    
    # Save scaler
    scaler_path = OUTPUT_DIR / "quantile_scaler.joblib"
    joblib.dump(scaler, scaler_path)
    print(f"  Scaler saved: {scaler_path}")
    
    # Save medians
    medians_path = OUTPUT_DIR / "imputation_medians.json"
    medians_serializable = {k: float(v) if v is not None else 0.0 for k, v in medians.items()}
    medians_path.write_text(json.dumps(medians_serializable, indent=2))
    
    return train, val, test


# ===============================================
# SECTION 7: EXPORT
# ===============================================

def export_datasets(train: pl.DataFrame, val: pl.DataFrame, test: pl.DataFrame, feature_cols: list):
    """Export final datasets."""
    
    print("\n" + "=" * 70)
    print("   SECTION 7: EXPORT")
    print("=" * 70)
    
    train.write_parquet(OUTPUT_DIR / "train.parquet")
    val.write_parquet(OUTPUT_DIR / "val.parquet")
    test.write_parquet(OUTPUT_DIR / "test.parquet")
    
    (OUTPUT_DIR / "feature_list.json").write_text(json.dumps(feature_cols, indent=2))
    
    metadata = {
        "created": datetime.now().isoformat(),
        "train_rows": len(train),
        "val_rows": len(val),
        "test_rows": len(test),
        "n_features": len(feature_cols),
        "train_end_year": TRAIN_END_YEAR,
        "val_end_year": VAL_END_YEAR,
        "pipeline": "preprocess3_ULTRA_GODMODE_SOTA2026",
    }
    (OUTPUT_DIR / "metadata.json").write_text(json.dumps(metadata, indent=2))
    
    print(f"\n  ‚úÖ Exported to {OUTPUT_DIR}")
    print(f"     train.parquet: {len(train):,} rows")
    print(f"     val.parquet: {len(val):,} rows")
    print(f"     test.parquet: {len(test):,} rows")

def convert_new_features_to_AB(df: pl.DataFrame) -> pl.DataFrame:
    """
    Convertit les nouvelles features GOD (PP_12-15) de winner/loser vers A/B.
    IMPORTANT: √Ä appeler APR√àS le shuffle (quand target_A_wins existe).
    """
    
    print("\n" + "=" * 70)
    print("   CONVERSION GOD FEATURES: winner/loser ‚Üí A/B")
    print("=" * 70)
    
    converted_count = 0
    
    # Features avec paires winner/loser
    winner_cols = [c for c in df.columns if c.endswith("_winner")]
    
    for winner_col in winner_cols:
        loser_col = winner_col.replace("_winner", "_loser")
        
        if loser_col not in df.columns:
            continue
        
        a_col = winner_col.replace("_winner", "_A")
        b_col = winner_col.replace("_winner", "_B")
        
        if a_col in df.columns:
            continue
        
        df = df.with_columns([
            pl.when(pl.col("target_A_wins") == 1)
                .then(pl.col(winner_col))
                .otherwise(pl.col(loser_col))
                .alias(a_col),
            
            pl.when(pl.col("target_A_wins") == 1)
                .then(pl.col(loser_col))
                .otherwise(pl.col(winner_col))
                .alias(b_col),
        ])
        converted_count += 2
    
    # Features "diff" √† inverser si swap
    diff_features = ["bt_diff", "bt_recent_diff", "bt_surface_diff",
                     "travel_advantage_winner", "timezone_advantage_winner", 
                     "rest_advantage_winner"]
    
    for col in diff_features:
        if col in df.columns:
            new_col = col.replace("_winner", "") + "_A" if "_winner" in col else col
            df = df.with_columns([
                pl.when(pl.col("target_A_wins") == 1)
                    .then(pl.col(col))
                    .otherwise(-pl.col(col))
                    .alias(new_col)
            ])
            converted_count += 1
    
    print(f"  ‚úÖ Converted {converted_count} features from winner/loser to A/B")
    
    return df
# ===============================================
# MAIN
# ===============================================

def main():
    """Pipeline complet."""
    
    print("\n" + "=" * 70)
    print("   PREPROCESS 9 - ULTRA GOD MODE SOTA 2026")
    print("   TennisTitan - COMPLETE Feature Engineering")
    print("=" * 70)
    print(f"   Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 70)
    
    t0 = time.perf_counter()
    
    # 1. Load
    df = load_dataset()
    gc.collect()
    
    # 2. Add ALL missing features (Cellules A-E)
    df = add_cellule_a_features(df)
    gc.collect()
    
    df = add_cellule_b_features(df)
    gc.collect()
    
    df = add_cellule_c_features(df)
    gc.collect()
    
    df = add_cellule_d_features(df)
    gc.collect()
    
    df = add_cellule_e_features(df)
    gc.collect()
    
    # Print feature summary
    print("\n" + "=" * 70)
    print("   FEATURE SUMMARY")
    print("=" * 70)
    print(f"  Total columns: {len(df.columns)}")
    
    # 3. Shuffle
    df = shuffle_ab(df, seed=RANDOM_SEED)
    gc.collect()

    # ========== NOUVEAU: CONVERSION GOD FEATURES ==========
    df = convert_new_features_to_AB(df)
    gc.collect()
    # ======================================================

    # ========== NOUVEAU: POST-SHUFFLE RANK FEATURES ==========
    df = add_post_shuffle_rank_features(df)
    gc.collect()
    # =========================================================

    # ========== NOUVEAU: CONVERT ODDS WINNER/LOSER ‚Üí A/B ==========
    df = convert_odds_to_AB(df)
    gc.collect()
    # ==============================================================
    
    # Split temporel
    train, val, test = temporal_split(df)
    del df
    gc.collect()
    
    # 5. Select
    train, val, test, feature_cols = select_features(train, val, test)
    gc.collect()
    
    # 6. Scale
    train, val, test = apply_scaling(train, val, test, feature_cols)
    gc.collect()
    
    # 7. Export
    export_datasets(train, val, test, feature_cols)
    
    elapsed = time.perf_counter() - t0
    
    print("\n" + "=" * 70)
    print(f"   COMPLETE! Total time: {elapsed:.1f}s")
    print("=" * 70)
    
    print("\nüìä FINAL SUMMARY:")
    print(f"   Train: {len(train):,} rows")
    print(f"   Val: {len(val):,} rows")
    print(f"   Test: {len(test):,} rows")
    print(f"   Features: {len(feature_cols)}")


if __name__ == "__main__":
    main()