In [None]:
# === Production configuration, logging, and utilities (top cell) ===
import os, logging, json, random, platform, gc
import numpy as np
import pandas as pd
from pathlib import Path

CONFIG = {
    'random_seed': 42,
    'oof_pred_vol_halflife': 10,     # days for EWMA pred volatility
    'realized_vol_halflife': 60,     # days for EWMA realized volatility
    'vol_cap_multiple': 1.2,         # cap strategy vol to X * market vol
    'risk_k_grid': [0.4, 0.6, 0.8, 1.0, 1.2, 1.4],
'neutral_band_grid': [0.0, 0.02, 0.05, 0.10, 0.15, 0.20],  # deadband in z-score units
    'artifacts_dir': 'artifacts',
    'transaction_cost_bps': 1.0,   # one-way cost in basis points, applied on allocation change
    # Feature engineering controls (kept moderate to avoid blow-up)
    'feature_lag_list': [1, 5, 10, 20],
    'rolling_windows': [5, 20, 60],
    'ewma_halflives_feat': [10, 60],
    'roc_windows': [5, 20],
    'zscore_windows': [20, 60],
    # Seasonality feature controls
    'use_date_splits': True,
    'date_min_train_days': 252,
    'date_val_days': 120,
    'seasonality_week_period': 5,
    'seasonality_month_period': 21,
    'seasonality_quarter_period': 63,
    # CV and calibration controls
    'cv_n_splits': 5,
    'fold_embargo_days': 5,
    'annualization': 252,
    # Training/Ensembling controls
    'time_decay_halflife_days': 180,     # sample weight half-life for recent data emphasis
    'eval_correlation': True,            # log corr(pred, target) per fold
    'final_topk_features': 150,          # None or int: use top-K features by gain
    'final_ensemble_size': 5,            # N_FINAL for stability
    'final_ensemble_size_xgb': 3,
    'final_ensemble_size_cat': 3,
    'use_sharpe_fobj': False,            # experimental custom objective
    'use_corr_feval': True,              # report corr in LightGBM
    'stability_filter_min_folds': 0,     # 0 disables stability filtering
    'use_stacking_meta_learner': False,  # enable later when stacking is added
    # Advanced controls
    'enable_optuna': False,
    'optuna_trials': 20,
    'optuna_timeout_min': 20,
    'optuna_folds': 1,                   # broaden later for stability
    'optuna_stability_penalty': 0.0,
    'enable_xgboost': True,
    'enable_catboost': True,
    'ensemble_weighting': 'sharpe',      # 'sharpe' or 'equal'
    'downcast_numeric': True,
    'memory_aggressive_gc': True,
}

def ensure_dir(path: str):
    Path(path).mkdir(parents=True, exist_ok=True)


def aggressive_gc(note: str = ""):
    if CONFIG.get('memory_aggressive_gc', False):
        try:
            gc.collect()
            if hasattr(np, 'memmap'):
                pass
            LOGGER.info(f'GC triggered {"("+note+")" if note else ""}')
        except Exception as _:
            pass

ensure_dir(CONFIG['artifacts_dir'])


def get_logger(name: str = 'hull_prod', level: int = logging.INFO, log_path: str | None = None) -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:
        logger.setLevel(level)
        ch = logging.StreamHandler()
        ch.setLevel(level)
        fmt = logging.Formatter('[%(asctime)s] %(levelname)s:%(name)s: %(message)s')
        ch.setFormatter(fmt)
        logger.addHandler(ch)
        if log_path:
            fh = logging.FileHandler(log_path)
            fh.setLevel(level)
            fh.setFormatter(fmt)
            logger.addHandler(fh)
    return logger

LOGGER = get_logger(log_path=os.path.join(CONFIG['artifacts_dir'], 'run.log'))

# Deterministic seeds
os.environ['PYTHONHASHSEED'] = str(CONFIG['random_seed'])
random.seed(CONFIG['random_seed'])
np.random.seed(CONFIG['random_seed'])

# Environment info (LightGBM version may be unavailable here; best-effort)
try:
    import lightgbm as lgb  # for version logging only
    lgb_ver = lgb.__version__
except Exception:
    lgb_ver = 'N/A'

LOGGER.info('Environment info:')
LOGGER.info(f"Python: {platform.python_version()}")
LOGGER.info(f"NumPy: {np.__version__}, Pandas: {pd.__version__}")
LOGGER.info(f"LightGBM: {lgb_ver}")
LOGGER.info('Production utilities configured.')

# === Volatility and Sharpe utilities (reusable) ===

def ewma_std(series: pd.Series, halflife: int) -> pd.Series:
    if series.isna().all():
        return pd.Series(index=series.index, dtype=float)
    v = series.ewm(halflife=halflife, min_periods=max(5, halflife//2)).var()
    return np.sqrt(v).fillna(method='bfill').fillna(method='ffill')


def annualized_sharpe(returns: pd.Series, days_per_year: int | None = None) -> float:
    dpy = int(CONFIG.get('annualization', 252) if days_per_year is None else days_per_year)
    mu = returns.mean() * dpy
    sd = returns.std() * np.sqrt(dpy)
    return float(mu / (sd + 1e-9))


def fast_corr(y_true: pd.Series | np.ndarray, y_pred: pd.Series | np.ndarray) -> float:
    a = np.asarray(y_true, dtype=float)
    b = np.asarray(y_pred, dtype=float)
    if a.size < 3 or b.size < 3:
        return float('nan')
    a = a - a.mean()
    b = b - b.mean()
    denom = (np.sqrt((a * a).sum()) * np.sqrt((b * b).sum())) + 1e-12
    if denom == 0:
        return 0.0
    return float((a * b).sum() / denom)


def make_time_decay_weights(n: int, halflife_days: int | None) -> np.ndarray:
    if halflife_days is None or halflife_days <= 0:
        return np.ones(n, dtype=float)
    idx = np.arange(n, dtype=float)
    dist = (n - 1) - idx
    lam = np.log(2.0) / max(1.0, float(halflife_days))
    w = np.exp(-lam * dist)
    w = np.clip(w, 1e-6, None)
    return w.astype(float)

# Inverse-volatility modifier for sample weights (stabilize corr)
def apply_inverse_vol_weights(base_weights: np.ndarray, realized: pd.Series | None, halflife: int) -> np.ndarray:
    if realized is None or realized.isna().all():
        return base_weights
    vol = ewma_std(realized.fillna(0.0), halflife)
    vol = vol.replace(0, np.nan).fillna(vol.median() if vol.notna().any() else 1.0)
    inv = 1.0 / (vol.values + 1e-6)
    inv = inv / np.nanmedian(inv)
    w = base_weights * inv[:len(base_weights)]
    return w.astype(float)


def corr_feval(preds: np.ndarray, train_data) -> tuple[str, float, bool]:
    labels = train_data.get_label()
    c = fast_corr(labels, preds)
    # higher is better
    return 'corr', float(c if np.isfinite(c) else 0.0), True


def sharpe_loss(preds: np.ndarray, train_data):
    # Experimental surrogate: minimize negative Sharpe of returns = labels * preds
    y = train_data.get_label().astype(float)
    r = y * preds
    mu = r.mean()
    sd = r.std() + 1e-9
    # d/dpred of mu = y/len, but LightGBM expects gradients per-sample without 1/n scaling
    # Here we apply average later by dividing by len(y)
    # d/dpred of sd = (r - mu) * y / (sd * len)
    grad = - ( (y / sd) - ((r - mu) * mu * y) / (sd**3) )
    grad = grad / max(len(y), 1)
    # Use small positive hessian for stability
    hess = np.full_like(preds, 1e-6, dtype=float)
    return grad, hess


# Grid-search calibration of allocation mapping parameters (k, band)
# using only the provided prediction/realized series

def calibrate_k_and_band(oof_pred: pd.Series,
                         realized: pd.Series,
                         pred_halflife: int,
                         realized_halflife: int,
                         k_grid: list,
                         band_grid: list,
                         vol_cap_multiple: float = 1.2,
                         annualization: int | None = None,
                         turnover_bps: float | None = None) -> dict:
    assert len(oof_pred) == len(realized)
    oof_pred = oof_pred.astype(float)
    realized = realized.astype(float)

    pred_sigma = ewma_std(oof_pred, pred_halflife)
    mkt_vol = ewma_std(realized, realized_halflife)

    dpy = int(CONFIG.get('annualization', 252) if annualization is None else annualization)

    best = {'score': -np.inf, 'k': None, 'band': None}

    for k in k_grid:
        z = oof_pred / (pred_sigma.replace(0, np.nan).fillna(1e-6))
        for band in band_grid:
            z_band = z.where(z.abs() >= band, 0.0)
            alloc = (1.0 + k * z_band).clip(0.0, 2.0)
            strat_ret = alloc * realized
            strat_vol = ewma_std(strat_ret, realized_halflife)
            cap = (vol_cap_multiple * mkt_vol) / (strat_vol.replace(0, np.nan).fillna(1e-6))
            cap = cap.clip(0.0, 2.0)
            alloc_scaled = (1.0 + (alloc - 1.0) * cap).clip(0.0, 2.0)
            strat_ret_scaled = alloc_scaled * realized
            # Apply turnover penalty if requested
            if turnover_bps is not None and turnover_bps > 0:
                alloc_change = alloc_scaled.diff().abs().fillna(0.0)
                tc = (turnover_bps / 10000.0) * alloc_change
                strat_ret_scaled = strat_ret_scaled - tc
            mu = strat_ret_scaled.mean() * dpy
            sd = strat_ret_scaled.std() * np.sqrt(dpy)
            score = (mu / (sd + 1e-9)) if sd > 0 else -np.inf
            if score > best['score']:
                best = {'score': float(score), 'k': float(k), 'band': float(band)}
    return best


: 

In [None]:
# === Competition tuning switches and GPU auto-detection ===
# Flip on deeper Optuna, stacking, and larger ensemble sizes within runtime safety
CONFIG.update({
    'enable_optuna': True,         # re-enable tuning with safe budget
    'optuna_folds': 2,
    'optuna_trials': 120,
    'optuna_timeout_min': 60,
    'use_stacking_meta_learner': True,
    'final_ensemble_size': 15,     # more LGB seeds
    'final_ensemble_size_xgb': 7,  # more XGB seeds
    'final_ensemble_size_cat': 0,  # keep CAT off
    'final_topk_features': 100,    # tighter feature set for stability
    'stability_filter_min_folds': 3,
    'fold_embargo_days': 10,       # more embargo for cleaner CV
    'ensemble_weighting': 'nnls',  # correlation-oriented blending
    'pred_smooth_halflife': 5,     # EWMA smoothing before calibration
    'use_gpu_if_available': False, # force CPU
    'enable_catboost': False,      # disable CatBoost to avoid runtime/driver issues
})

# GPU detection (Kaggle-friendly, no internet)
GPU_AVAILABLE = False
try:
    import os
    # Common hints in Kaggle
    if os.environ.get('NVIDIA_VISIBLE_DEVICES') not in (None, '', 'none'):
        GPU_AVAILABLE = True
    if os.environ.get('KAGGLE_ACCELERATOR_TYPE', '').upper().find('GPU') >= 0:
        GPU_AVAILABLE = True
    # Linux NVIDIA presence fallback (ignored on Windows)
    if os.name == 'posix' and not GPU_AVAILABLE:
        GPU_AVAILABLE = os.path.exists('/proc/driver/nvidia/version')
except Exception:
    GPU_AVAILABLE = False

LOGGER.info(f"GPU available: {GPU_AVAILABLE}")

# CPU fallback to keep within time limits
if not GPU_AVAILABLE:
    CONFIG.update({
        'final_ensemble_size': min(int(CONFIG.get('final_ensemble_size', 9)), 5),
        'final_ensemble_size_xgb': min(int(CONFIG.get('final_ensemble_size_xgb', 5)), 2),
        'final_ensemble_size_cat': min(int(CONFIG.get('final_ensemble_size_cat', 5)), 2),
        'optuna_trials': min(int(CONFIG.get('optuna_trials', 300)), 120),
        'optuna_timeout_min': min(int(CONFIG.get('optuna_timeout_min', 120)), 60),
        'cv_n_splits': min(int(CONFIG.get('cv_n_splits', 5)), 4),
    })
    LOGGER.info('[CPU] Reduced ensemble sizes and HPO budget for runtime safety')



In [None]:
"""
Hull Tactical — Market Prediction: Strong baseline notebook
What this notebook does
- Loads local train/test (no internet)
- Time-series-safe preprocessing and light feature engineering
- Walk-forward (expanding window) cross-validation with LightGBM
- Volatility-aware allocation mapping in [0, 2] with a practical risk cap
- Diagnostics: CV RMSE, feature importances, simple backtest and Sharpe stats
- Trains final ensemble and writes submission.csv with columns ['date_id','allocation']

Notes & cautions
- This is a reproducible, efficient baseline designed to run < 1 hour in Kaggle.
- To reach top leaderboard: engineer better features (macro, cross-asset, seasonalities),
  robust stacking/ensembling, volatility forecasting, and careful walk-forward tuning.
"""

import warnings
warnings.filterwarnings('ignore')

import os
import gc
import time
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
import lightgbm as lgb

SEED = CONFIG['random_seed'] if 'CONFIG' in globals() else 42
np.random.seed(SEED)

TRAIN_PATH = 'train.csv'
TEST_PATH  = 'test.csv'
EVAL_DIR   = 'kaggle_evaluation'  # present in competition env; not required here

# Auto-detect paths on Kaggle
try:
    import os
    if os.path.exists('/kaggle/input'):
        roots = [p for p in os.listdir('/kaggle/input') if os.path.isdir(os.path.join('/kaggle/input', p))]
        for r in roots:
            p_train = os.path.join('/kaggle/input', r, 'train.csv')
            p_test  = os.path.join('/kaggle/input', r, 'test.csv')
            if os.path.exists(p_train) and os.path.exists(p_test):
                TRAIN_PATH = p_train
                TEST_PATH = p_test
                break
except Exception:
    pass

start_time = time.time()

# 1) Load data
LOGGER.info(f'Loading data from: {TRAIN_PATH} | {TEST_PATH}')
train = pd.read_csv(TRAIN_PATH)
test  = pd.read_csv(TEST_PATH)

# Sort by time to ensure proper temporal order
if 'date_id' in train.columns:
    train = train.sort_values('date_id').reset_index(drop=True)
if 'date_id' in test.columns:
    test = test.sort_values('date_id').reset_index(drop=True)

LOGGER.info(f'Train shape: {train.shape}')
LOGGER.info(f'Test  shape: {test.shape}')

# 2) Target selection (excess returns preferred). We keep a realized return series for backtest if possible.
# Priority: use 'market_forward_excess_returns' if provided; otherwise compute from 'forward_returns' - 'risk_free_rate'.
if 'market_forward_excess_returns' in train.columns:
    TARGET = 'market_forward_excess_returns'
    LOGGER.info('Using TARGET = market_forward_excess_returns')
else:
    if 'forward_returns' in train.columns and 'risk_free_rate' in train.columns:
        train['market_forward_excess_returns'] = train['forward_returns'] - train['risk_free_rate']
        TARGET = 'market_forward_excess_returns'
        LOGGER.info('Computed TARGET = forward_returns - risk_free_rate')
    else:
        raise ValueError('Target not found: need market_forward_excess_returns or (forward_returns and risk_free_rate).')

# Realized returns for toy backtest
if 'forward_returns' in train.columns:
    realized_returns = train['forward_returns'].copy()
elif 'market_forward_excess_returns' in train.columns and 'risk_free_rate' in train.columns:
    realized_returns = train['market_forward_excess_returns'] + train['risk_free_rate']
elif 'market_forward_excess_returns' in train.columns:
    realized_returns = train['market_forward_excess_returns'].copy()
else:
    realized_returns = None

# 3) Feature selection and preprocessing
# Use only numeric features, excluding identifiers and targets
exclude_cols = {'date_id', TARGET, 'forward_returns', 'risk_free_rate'}
num_cols = train.select_dtypes(include=[np.number]).columns.tolist()
feature_cols = [c for c in num_cols if c not in exclude_cols]
LOGGER.info(f'Initial numeric feature count: {len(feature_cols)}')

# Concatenate train+test in time order for leak-free lag/rolling creation (test follows train)
DF = pd.concat([
    train[feature_cols + (['date_id'] if 'date_id' in train.columns else [])],
    test[feature_cols + (['date_id'] if 'date_id' in test.columns else [])]
], axis=0, ignore_index=True)

# Replace infs and fill base NaNs using train-only medians
train_rows = train.shape[0]
for col in feature_cols:
    DF[col] = DF[col].replace([np.inf, -np.inf], np.nan)
base_medians = train[feature_cols].median()
DF[feature_cols] = DF[feature_cols].fillna(base_medians)

# Light, fast feature engineering: 1-lag and 5-day rolling mean (shifted) per column
lag_cols = []
for col in feature_cols:
    lag1 = f'{col}_lag1'
    rmean5 = f'{col}_rmean5'
    DF[lag1] = DF[col].shift(1)
    DF[rmean5] = DF[col].rolling(window=5, min_periods=1).mean().shift(1)
    lag_cols.extend([lag1, rmean5])

# Fill lag feature NaNs using train-only medians to avoid leakage
lag_medians = DF.iloc[:train_rows][lag_cols].median()
DF[lag_cols] = DF[lag_cols].fillna(lag_medians)

# Re-split into engineered train/test
train_fe = DF.iloc[:train_rows].copy()
test_fe  = DF.iloc[train_rows:].copy()

# Restore target and date_id
train_fe[TARGET] = train[TARGET].values
if 'date_id' in train.columns:
    train_fe['date_id'] = train['date_id'].values
if 'date_id' in test.columns:
    test_fe['date_id'] = test['date_id'].values

# Final feature list
features = [c for c in train_fe.columns if c not in ['date_id', TARGET]]

# Cast features to float32 for efficiency and reduce memory
if bool(CONFIG.get('downcast_numeric', False)):
    def downcast_df(df):
        for c in df.select_dtypes(include=[np.number]).columns:
            col = df[c]
            if pd.api.types.is_float_dtype(col):
                df[c] = pd.to_numeric(col, downcast='float')
            elif pd.api.types.is_integer_dtype(col):
                df[c] = pd.to_numeric(col, downcast='integer')
        return df
    train_fe[features] = downcast_df(train_fe[features])
    test_fe[features] = downcast_df(test_fe[features])
else:
    train_fe[features] = train_fe[features].astype('float32')
    test_fe[features] = test_fe[features].astype('float32')

aggressive_gc('post-basic-feature-casting')

LOGGER.info(f'Final features count: {len(features)}')

# Quick EDA prints
LOGGER.info('Basic EDA:')
LOGGER.info(f"Target mean/std: {train_fe[TARGET].mean():.6f} {train_fe[TARGET].std():.6f}")
missing_rate = train[feature_cols].isna().mean().mean()
LOGGER.info(f'Average missing rate (pre-impute) over base features: {missing_rate:.4f}')

# 4) Time-series CV: expanding window / walk-forward

def expanding_walk_forward_splits(n_samples: int,
                                  n_splits: int = 5,
                                  min_train_ratio: float = 0.6,
                                  val_size_ratio: float = 0.1,
                                  min_train: int = 252,
                                  min_val: int = 120):
    min_train_size = max(int(n_samples * min_train_ratio), min_train)
    val_size = max(int(n_samples * val_size_ratio), min_val)
    if min_train_size + val_size >= n_samples:
        # fallback to ensure at least one fold
        min_train_size = max(min_train, n_samples - 2 * min_val)
        val_size = min_val
    starts = np.linspace(min_train_size, n_samples - val_size, num=n_splits, dtype=int)
    seen = set()
    for s in starts:
        if s in seen:
            continue
        seen.add(s)
        tr_idx = np.arange(0, s)
        val_end = min(s + val_size, n_samples)
        val_idx = np.arange(s, val_end)
        if len(val_idx) > 0:
            yield tr_idx, val_idx

n_samples = train_fe.shape[0]
splits = list(expanding_walk_forward_splits(n_samples, n_splits=5))
LOGGER.info(f'CV folds: {len(splits)}')

# 5) Train LightGBM models on each fold
lgb_params = {
    'objective': 'regression',
    'metric': 'rmse',  # use RMSE for early stopping; corr only as feval
    'boosting_type': 'gbdt',
    'learning_rate': 0.02,
    'num_leaves': 64,
    'max_depth': -1,
    'min_data_in_leaf': 50,
    'feature_fraction': 0.7,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'seed': SEED,
    'bagging_seed': SEED + 1,
    'feature_fraction_seed': SEED + 2,
    'verbosity': -1,
    'num_threads': -1,
}

# GPU hooks (if available)
if bool(CONFIG.get('use_gpu_if_available', False)) and 'GPU_AVAILABLE' in globals() and GPU_AVAILABLE:
    try:
        lgb_params.update({'device_type': 'gpu'})
        LOGGER.info('[GPU] Enabled LightGBM GPU')
    except Exception:
        pass

models = []
val_scores = []
feature_importance_df = pd.DataFrame()

for fold, (tr_idx, val_idx) in enumerate(splits):
    X_tr = train_fe.iloc[tr_idx][features]
    y_tr = train_fe.iloc[tr_idx][TARGET]
    X_val = train_fe.iloc[val_idx][features]
    y_val = train_fe.iloc[val_idx][TARGET]

    LOGGER.info(f'Fold {fold+1}/{len(splits)} — train {len(tr_idx)} val {len(val_idx)}')
    dtrain = lgb.Dataset(X_tr, label=y_tr)
    dvalid = lgb.Dataset(X_val, label=y_val)

    try:
        bst = lgb.train(
            params=lgb_params,
            train_set=dtrain,
            num_boost_round=2000,
            valid_sets=[dtrain, dvalid],
            valid_names=['train', 'valid'],
            callbacks=[
                lgb.early_stopping(stopping_rounds=200, verbose=True),
                lgb.log_evaluation(period=200),
            ],
            feval=corr_feval if bool(CONFIG.get('use_corr_feval', False)) else None,
        )
    except Exception as e:
        # Fallback to CPU if GPU device not available
        if isinstance(e, Exception):
            lgb_params.pop('device_type', None)
            bst = lgb.train(
                params=lgb_params,
                train_set=dtrain,
                num_boost_round=2000,
                valid_sets=[dtrain, dvalid],
                valid_names=['train', 'valid'],
                callbacks=[
                    lgb.early_stopping(stopping_rounds=200, verbose=True),
                    lgb.log_evaluation(period=200),
                ],
                feval=corr_feval if bool(CONFIG.get('use_corr_feval', False)) else None,
            )

    models.append(bst)

    val_pred = bst.predict(X_val, num_iteration=bst.best_iteration)
    rmse = mean_squared_error(y_val, val_pred, squared=False)
    LOGGER.info(f'Fold {fold+1} RMSE: {rmse:.6f}')
    val_scores.append(rmse)

    fi = pd.DataFrame({
        'feature': features,
        'importance': bst.feature_importance(importance_type='gain'),
        'fold': fold,
    })
    feature_importance_df = pd.concat([feature_importance_df, fi], axis=0)

    del X_tr, y_tr, X_val, y_val, dtrain, dvalid
    gc.collect()

LOGGER.info(f"CV RMSE mean: {float(np.mean(val_scores)):.6f}")
LOGGER.info(f"CV RMSE std : {float(np.std(val_scores)):.6f}")

# 6) OOF predictions for diagnostics/backtest
oof = np.zeros(n_samples, dtype=float)
counts = np.zeros(n_samples, dtype=float)
for fold, (tr_idx, val_idx) in enumerate(splits):
    bst = models[fold]
    oof[val_idx] += bst.predict(train_fe.iloc[val_idx][features], num_iteration=bst.best_iteration)
    counts[val_idx] += 1

mask = counts > 0
oof[mask] /= counts[mask]
train_fe['pred'] = oof

# Feature importances (mean over folds)
fi_mean = (feature_importance_df.groupby('feature')['importance']
           .mean().sort_values(ascending=False))
LOGGER.info('Top 20 features by average gain:')
LOGGER.info(f"\n{fi_mean.head(20)}")

plt.figure(figsize=(8, 6))
fi_mean.head(20).sort_values().plot(kind='barh')
plt.title('Top 20 feature importances (avg gain)')
plt.tight_layout()
plt.savefig(os.path.join(CONFIG['artifacts_dir'], 'feature_importances_top20.png'))
plt.close()

# 7) Simple volatility-aware mapping from predictions to allocation in [0, 2]
# Estimate rolling prediction volatility to scale signals
pred_sigma = pd.Series(train_fe['pred']).rolling(window=20, min_periods=5).std()
pred_sigma = pred_sigma.fillna(pred_sigma.iloc[5:25].median() if pred_sigma.notna().any() else 1.0)

risk_k = 0.8  # aggressiveness; tune in walk-forward if time allows
alloc_oof = 1.0 + risk_k * (train_fe['pred'] / (pred_sigma.replace(0, np.nan).fillna(1e-6)))
alloc_oof = alloc_oof.clip(0.0, 2.0)
train_fe['alloc'] = alloc_oof

# Enforce vol cap: realized strategy vol <= 1.2 * market vol (rolling)
if realized_returns is not None:
    train_fe['realized_returns'] = realized_returns.values
    strat_ret_raw = train_fe['alloc'] * train_fe['realized_returns']
    strat_vol_180 = strat_ret_raw.rolling(180, min_periods=30).std().fillna(strat_ret_raw.std())
    market_vol_180 = train_fe['realized_returns'].rolling(180, min_periods=30).std().fillna(train_fe['realized_returns'].std())

    scale = (1.2 * market_vol_180) / (strat_vol_180.replace(0, np.nan).fillna(1e-6))
    scale = scale.clip(0.0, 2.0)
    alloc_scaled = 1.0 + (train_fe['alloc'] - 1.0) * scale
    train_fe['alloc_scaled'] = alloc_scaled.clip(0.0, 2.0)

    # Simple transaction cost model: one-way cost on changes in allocation
    tc = CONFIG.get('transaction_cost_bps', 0.0) / 10000.0
    alloc_change_raw = train_fe['alloc'].diff().abs().fillna(0.0)
    alloc_change_scaled = train_fe['alloc_scaled'].diff().abs().fillna(0.0)
    tc_raw = tc * alloc_change_raw
    tc_scaled = tc * alloc_change_scaled

    # Backtest diagnostics (after costs)
    train_fe['strat_ret_raw'] = strat_ret_raw - tc_raw
    train_fe['strat_ret_scaled'] = (train_fe['alloc_scaled'] * train_fe['realized_returns']) - tc_scaled

    train_fe['cum_sp500'] = (1.0 + train_fe['realized_returns']).cumprod()
    train_fe['cum_raw']   = (1.0 + train_fe['strat_ret_raw']).cumprod()
    train_fe['cum_scaled']= (1.0 + train_fe['strat_ret_scaled']).cumprod()

    plt.figure(figsize=(10, 6))
    x_axis = train_fe['date_id'] if 'date_id' in train_fe.columns else np.arange(len(train_fe))
    plt.plot(x_axis, train_fe['cum_sp500'], label='Market (buy-hold)')
    plt.plot(x_axis, train_fe['cum_raw'], label='Strategy (raw)')
    plt.plot(x_axis, train_fe['cum_scaled'], label='Strategy (vol scaled)')
    plt.legend(); plt.title('Cumulative performance (toy backtest)'); plt.tight_layout()
    plt.savefig(os.path.join(CONFIG['artifacts_dir'], 'backtest_cumulative.png'))
    plt.close()

    def ann_sharpe(returns: pd.Series, days_per_year: int = 252):
        mu = returns.mean() * days_per_year
        sd = returns.std() * np.sqrt(days_per_year)
        return float(mu / (sd + 1e-9))

    LOGGER.info(f"Ann Sharpe raw   (toy): {ann_sharpe(train_fe['strat_ret_raw'].fillna(0)):.4f}")
    LOGGER.info(f"Ann Sharpe scaled(toy): {ann_sharpe(train_fe['strat_ret_scaled'].fillna(0)):.4f}")
    LOGGER.info(f"Ann Sharpe market      : {ann_sharpe(train_fe['realized_returns'].fillna(0)):.4f}")
else:
    LOGGER.warning('Skipping backtest: realized forward returns not available in train.')

# 8) Train final ensemble on full train and predict test
X_full = train_fe[features]
y_full = train_fe[TARGET]

avg_best_iter = int(np.clip(np.mean([m.best_iteration for m in models]), 200, 2000)) if len(models) else 1000
LOGGER.info(f'Using avg_best_iter: {avg_best_iter}')

final_models = []
N_FINAL = int(CONFIG.get('final_ensemble_size', 3))  # config-driven ensemble size
for i in range(N_FINAL):
    params = lgb_params.copy()
    params['seed'] = SEED + 13 * i
    params['bagging_seed'] = SEED + 13 * i + 1
    params['feature_fraction_seed'] = SEED + 13 * i + 2
    w_full = make_time_decay_weights(len(X_full), int(CONFIG.get('time_decay_halflife_days', 0)))
    dtrain = lgb.Dataset(X_full, label=y_full, weight=w_full)
    try:
        bst = lgb.train(params, dtrain, num_boost_round=avg_best_iter)
    except Exception as e:
        if isinstance(e, Exception):
            params.pop('device_type', None)
            bst = lgb.train(params, dtrain, num_boost_round=avg_best_iter)
    final_models.append(bst)
    del dtrain

def rank_normalize(arr: np.ndarray) -> np.ndarray:
    n = len(arr)
    if n <= 1:
        return np.zeros_like(arr, dtype=float)
    ranks = np.argsort(np.argsort(arr))
    return ranks.astype(float) / max(n - 1, 1)

# Predict test with rank-based ensembling (often more robust for Sharpe-like metrics)
X_test = test_fe[features]
preds_list = []
for bst in final_models:
    # ensure a concrete iteration count
    preds_list.append(bst.predict(X_test, num_iteration=avg_best_iter))

# Average of ranks (0..1), then center roughly around 0 by subtracting 0.5 for mapping
ranked_preds = np.stack([rank_normalize(p) for p in preds_list], axis=1).mean(axis=1)
preds_test = ranked_preds - 0.5

# Allocation mapping in [0, 2]
# Use sigma estimated from recent prediction volatility on train as a proxy
pred_sigma_train = pd.Series(train_fe['pred']).rolling(window=20, min_periods=5).std()
if pred_sigma_train.notna().any():
    sigma_est = float(pred_sigma_train.iloc[-50:].median()) if pred_sigma_train.iloc[-50:].notna().any() else float(pred_sigma_train.dropna().median())
else:
    sigma_est = float(train_fe['pred'].std() if 'pred' in train_fe else 1.0)

k = 0.8  # same aggressiveness factor used above
alloc_test = 1.0 + k * (preds_test / (sigma_est + 1e-9))
alloc_test = np.clip(alloc_test, 0.0, 2.0)

# Apply a global downscale if recent historical strategy vol would breach cap
if realized_returns is not None:
    hist_window = min(len(train_fe), 180)
    hist_strat_vol = (train_fe['alloc'].iloc[-hist_window:] * train_fe['realized_returns'].iloc[-hist_window:]).std()
    hist_mkt_vol   = train_fe['realized_returns'].iloc[-hist_window:].std()
    scale_global = min(1.0, (CONFIG['vol_cap_multiple'] * hist_mkt_vol) / (hist_strat_vol + 1e-9)) if hist_strat_vol > 0 else 1.0
    alloc_test = 1.0 + (alloc_test - 1.0) * scale_global
    alloc_test = np.clip(alloc_test, 0.0, 2.0)

# Do not write submission here; defer to calibrated step
submission = pd.DataFrame({
    'date_id': test_fe['date_id'].values if 'date_id' in test_fe.columns else np.arange(len(test_fe)),
    'allocation': alloc_test.astype(float)
})
LOGGER.info(f"Prepared preliminary submission with {submission.shape[0]} rows (not yet saved)")



In [None]:
# === Enhanced feature engineering and nested Sharpe-centric CV ===
import math

LOGGER.info('Starting enhanced feature engineering and nested calibration CV...')

# Rebuild engineered features in a modular way to avoid leakage

def build_features(df: pd.DataFrame, base_cols: list[str]) -> tuple[pd.DataFrame, list[str]]:
    out = df.copy()
    created = []
    lag_list = CONFIG.get('feature_lag_list', [1, 5, 10, 20])
    roll_ws  = CONFIG.get('rolling_windows', [5, 20, 60])
    ewm_halfs= CONFIG.get('ewma_halflives_feat', [10, 60])
    roc_ws   = CONFIG.get('roc_windows', [5, 20])
    z_ws     = CONFIG.get('zscore_windows', [20, 60])

    # Seasonality proxies from date_id if available (shifted to avoid leakage)
    if 'date_id' in out.columns:
        d = out['date_id']
        for P, name in [
            (int(CONFIG.get('seasonality_week_period', 5)), 'wk'),
            (int(CONFIG.get('seasonality_month_period', 21)), 'mo'),
            (int(CONFIG.get('seasonality_quarter_period', 63)), 'qr'),
        ]:
            if P and P > 1:
                out[f'sin_{name}'] = np.sin(2 * np.pi * (d % P) / P).shift(1)
                out[f'cos_{name}'] = np.cos(2 * np.pi * (d % P) / P).shift(1)
                created.extend([f'sin_{name}', f'cos_{name}'])

    for col in base_cols:
        s = out[col]
        # Lags
        for L in lag_list:
            name = f'{col}_lag{L}'
            out[name] = s.shift(L)
            created.append(name)
        # Rolling stats (shifted)
        for W in roll_ws:
            mean_name = f'{col}_rmean{W}'
            std_name  = f'{col}_rstd{W}'
            out[mean_name] = s.rolling(W, min_periods=1).mean().shift(1)
            out[std_name]  = s.rolling(W, min_periods=1).std().shift(1)
            created.extend([mean_name, std_name])
        # EWMA std (shifted)
        for H in ewm_halfs:
            name = f'{col}_ewmstd_h{H}'
            out[name] = s.ewm(halflife=H, min_periods=max(5, H//2)).std().shift(1)
            created.append(name)
        # Rate-of-change (shifted)
        for W in roc_ws:
            name = f'{col}_roc{W}'
            out[name] = s.pct_change(W).shift(1)
            created.append(name)
        # Z-scores (shifted)
        for W in z_ws:
            mu = s.rolling(W, min_periods=1).mean().shift(1)
            sd = s.rolling(W, min_periods=1).std().shift(1)
            name = f'{col}_z{W}'
            out[name] = (s.shift(1) - mu) / (sd.replace(0, np.nan) + 1e-6)
            created.append(name)

    # Vol-of-vol and rolling higher moments on the target proxy if present
    target_proxy = None
    if 'realized_returns' in out.columns:
        target_proxy = out['realized_returns']
    elif 'forward_returns' in out.columns:
        target_proxy = out['forward_returns']
    if target_proxy is not None:
        for W in [20, 60, 120]:
            name_vv = f'target_volofvol_{W}'
            name_sk = f'target_skew_{W}'
            name_ku = f'target_kurt_{W}'
            r = target_proxy.shift(1)
            out[name_vv] = r.rolling(W, min_periods=10).std().rolling(W, min_periods=10).std().shift(1)
            out[name_sk] = r.rolling(W, min_periods=10).skew().shift(1)
            out[name_ku] = r.rolling(W, min_periods=10).kurt().shift(1)
            created.extend([name_vv, name_sk, name_ku])

    return out, created

# Build from scratch using the time-safe concat approach
exclude_cols = {'date_id', TARGET, 'forward_returns', 'risk_free_rate'}
num_cols = train.select_dtypes(include=[np.number]).columns.tolist()
base_feature_cols = [c for c in num_cols if c not in exclude_cols]

DF_base = pd.concat([
    train[base_feature_cols + (['date_id'] if 'date_id' in train.columns else [])],
    test[base_feature_cols + (['date_id'] if 'date_id' in test.columns else [])]
], axis=0, ignore_index=True)

# Clean base features
train_rows = train.shape[0]
for col in base_feature_cols:
    DF_base[col] = DF_base[col].replace([np.inf, -np.inf], np.nan)
base_medians = train[base_feature_cols].median()
DF_base[base_feature_cols] = DF_base[base_feature_cols].fillna(base_medians)

# Build engineered features
DF_eng, created_cols = build_features(DF_base, base_feature_cols)

# Clean engineered features: replace infs, impute with train-only medians, and handle all-NaN cols
DF_eng[created_cols] = DF_eng[created_cols].replace([np.inf, -np.inf], np.nan)
eng_medians = DF_eng.iloc[:train_rows][created_cols].median()
DF_eng[created_cols] = DF_eng[created_cols].fillna(eng_medians)
# Drop engineered columns that remain all-NaN in train
na_all_cols = [c for c in created_cols if DF_eng.iloc[:train_rows][c].isna().all()]
if len(na_all_cols):
    LOGGER.warning(f'Dropping {len(na_all_cols)} engineered features with all-NaN in train')
    DF_eng.drop(columns=na_all_cols, inplace=True, errors='ignore')
    created_cols = [c for c in created_cols if c not in na_all_cols]
# Final safety fill for any residual NaNs
DF_eng[created_cols] = DF_eng[created_cols].fillna(0.0)

# Split back to train/test
train_fe = DF_eng.iloc[:train_rows].copy()
test_fe  = DF_eng.iloc[train_rows:].copy()

# Restore target/date
train_fe[TARGET] = train[TARGET].values
if 'date_id' in train.columns:
    train_fe['date_id'] = train['date_id'].values
if 'date_id' in test.columns:
    test_fe['date_id'] = test['date_id'].values

# Realized returns for backtest
if realized_returns is not None:
    train_fe['realized_returns'] = realized_returns.values

# Final feature set and types
features = [c for c in train_fe.columns if c not in ['date_id', TARGET, 'realized_returns']]
if bool(CONFIG.get('downcast_numeric', False)):
    def downcast_df(df):
        for c in df.select_dtypes(include=[np.number]).columns:
            col = df[c]
            if pd.api.types.is_float_dtype(col):
                df[c] = pd.to_numeric(col, downcast='float')
            elif pd.api.types.is_integer_dtype(col):
                df[c] = pd.to_numeric(col, downcast='integer')
        return df
    train_fe[features] = downcast_df(train_fe[features])
    test_fe[features] = downcast_df(test_fe[features])
else:
    train_fe[features] = train_fe[features].astype('float32')
    test_fe[features] = test_fe[features].astype('float32')

aggressive_gc('post-enhanced-feature-casting')

LOGGER.info(f'Enhanced features count: {len(features)} (added {len(created_cols)} engineered)')

# Expanding CV with optional embargo between train/val
n_samples = train_fe.shape[0]
splits = list(expanding_walk_forward_splits(n_samples, n_splits=int(CONFIG.get('cv_n_splits', 5))))
LOGGER.info(f'CV folds (enhanced): {len(splits)}')

# Optionally replace with date-aware splits (purged) based on unique date_id
if bool(CONFIG.get('use_date_splits', False)) and 'date_id' in train_fe.columns:
    def walk_forward_by_dates_local(df: pd.DataFrame,
                                    date_col: str,
                                    n_splits: int,
                                    min_train_days: int,
                                    val_days: int) -> list[tuple[np.ndarray, np.ndarray]]:
        dates = df[date_col].values
        unique_dates = np.unique(dates)
        result: list[tuple[np.ndarray, np.ndarray]] = []
        if len(unique_dates) < (min_train_days + val_days + 1):
            return result
        anchors = np.linspace(min_train_days, len(unique_dates) - val_days, n_splits, dtype=int)
        for a in anchors:
            train_last_date = unique_dates[a - 1]
            val_last_date = unique_dates[min(a + val_days - 1, len(unique_dates) - 1)]
            tr_idx = np.where(dates <= train_last_date)[0]
            val_idx = np.where((dates > train_last_date) & (dates <= val_last_date))[0]
            if len(val_idx) > 0:
                result.append((tr_idx, val_idx))
        return result
    splits_date = walk_forward_by_dates_local(
        train_fe, 'date_id',
        n_splits=int(CONFIG.get('cv_n_splits', 5)),
        min_train_days=int(CONFIG.get('date_min_train_days', 252)),
        val_days=int(CONFIG.get('date_val_days', 120))
    )
    if len(splits_date):
        splits = splits_date
        LOGGER.info(f'CV folds (enhanced, date-based): {len(splits)}')
    else:
        LOGGER.info('Date-based splits not applied (insufficient unique dates); using row-based.')


def apply_embargo(tr_idx: np.ndarray, val_idx: np.ndarray, days: int) -> tuple[np.ndarray, np.ndarray]:
    if days <= 0 or len(val_idx) == 0 or len(tr_idx) == 0:
        return tr_idx, val_idx
    # Prefer date-based embargo if date_id available
    try:
        if 'date_id' in train_fe.columns:
            dates_all = train_fe['date_id'].values
            unique_dates = np.unique(dates_all)
            date_to_ord = {d: i for i, d in enumerate(unique_dates)}
            tr_ord = np.array([date_to_ord[d] for d in dates_all[tr_idx]], dtype=int)
            val_ord = np.array([date_to_ord[d] for d in dates_all[val_idx]], dtype=int)
            max_tr_ord = int(tr_ord.max())
            min_val_ord = int(val_ord.min())
            tr_mask = tr_ord <= (max_tr_ord - days)
            val_mask = val_ord >= (min_val_ord + days)
            tr_idx2 = tr_idx[tr_mask]
            val_idx2 = val_idx[val_mask]
            return tr_idx2, val_idx2
    except Exception:
        pass
    # Fallback: index-based embargo
    keep_train = max(0, len(tr_idx) - days)
    tr_idx2 = tr_idx[:keep_train]
    val_idx2 = val_idx[days:] if len(val_idx) > days else np.array([], dtype=int)
    return tr_idx2, val_idx2

# Train LightGBM with nested calibration on val per fold
lgb_params = {
    'objective': 'regression',
    'metric': 'rmse',  # use RMSE for early stopping; corr only as feval
    'boosting_type': 'gbdt',
    'learning_rate': 0.02,
    'num_leaves': 64,
    'max_depth': -1,
    'min_data_in_leaf': 50,
    'feature_fraction': 0.7,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'seed': SEED,
    'bagging_seed': SEED + 1,
    'feature_fraction_seed': SEED + 2,
    'verbosity': -1,
    'num_threads': -1,
}

# Optional Optuna tuning across multiple folds with stability penalty
if bool(CONFIG.get('enable_optuna', False)) and int(CONFIG.get('optuna_trials', 0)) > 0:
    try:
        import optuna
        LOGGER.info('[Optuna] Starting LightGBM tuning (multi-fold)...')
        use_folds = min(int(CONFIG.get('optuna_folds', 1)), len(splits))
        folds_for_tune = [splits[i] for i in range(use_folds)]
        def build_fold_data(tr_idx, val_idx):
            # For tuning, disable embargo to ensure enough validation samples
            tr_idx2, val_idx2 = apply_embargo(tr_idx, val_idx, 0)
            X_tr0 = train_fe.iloc[tr_idx2][features]
            y_tr0 = train_fe.iloc[tr_idx2][TARGET]
            X_val0 = train_fe.iloc[val_idx2][features]
            y_val0 = train_fe.iloc[val_idx2][TARGET]
            w_tr0 = make_time_decay_weights(len(X_tr0), int(CONFIG.get('time_decay_halflife_days', 0)))
            return X_tr0, y_tr0, X_val0, y_val0, w_tr0, val_idx2
        fold_data_cache = [build_fold_data(*fold) for fold in folds_for_tune]
        def objective(trial):
            params = lgb_params.copy()
            params.update({
                'metric': 'l2',  # RMSE for early stopping in tuning
                'num_leaves': trial.suggest_int('num_leaves', 31, 128),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.05, log=True),
                'feature_fraction': trial.suggest_float('feature_fraction', 0.6, 0.9),
                'bagging_fraction': trial.suggest_float('bagging_fraction', 0.6, 0.9),
                'min_data_in_leaf': trial.suggest_int('min_data_in_leaf', 50, 200),
                'lambda_l1': trial.suggest_float('lambda_l1', 0.0, 5.0),
                'lambda_l2': trial.suggest_float('lambda_l2', 0.0, 5.0),
                'min_gain_to_split': trial.suggest_float('min_gain_to_split', 0.0, 1.0),
            })
            scores = []
            for (X_tr0, y_tr0, X_val0, y_val0, w_tr0, val_idx2) in fold_data_cache:
                dtrain0 = lgb.Dataset(X_tr0, label=y_tr0, weight=w_tr0)
                dvalid0 = lgb.Dataset(X_val0, label=y_val0)
                try:
                    # Ensure RMSE for early stopping during tuning
                    params['metric'] = 'l2'
                    bst0 = lgb.train(
                        params=params,
                        train_set=dtrain0,
                        num_boost_round=1000,
                        valid_sets=[dtrain0, dvalid0],
                        valid_names=['train','valid'],
                        callbacks=[
                            lgb.early_stopping(stopping_rounds=200, verbose=False),
                        ],
                        feval=corr_feval if bool(CONFIG.get('use_corr_feval', False)) else None,
                    )
                except Exception as e:
                    if isinstance(e, Exception):
                        params.pop('device_type', None)
                        bst0 = lgb.train(
                            params=params,
                            train_set=dtrain0,
                            num_boost_round=1000,
                            valid_sets=[dtrain0, dvalid0],
                            valid_names=['train','valid'],
                            callbacks=[
                                lgb.early_stopping(stopping_rounds=200, verbose=False),
                            ],
                            feval=corr_feval if bool(CONFIG.get('use_corr_feval', False)) else None,
                        )
                # Ensure fold has sufficient validation samples
                if len(y_val0) < 10:
                    scores.append(-1e6)
                    continue

                val_pred0 = bst0.predict(X_val0, num_iteration=bst0.best_iteration)
                val_pred0 = np.nan_to_num(val_pred0, nan=0.0, posinf=0.0, neginf=0.0)
                # Score by correlation for tuning (avoid brittle calibration here)
                score = float(fast_corr(y_val0, val_pred0))

                if not np.isfinite(score):
                    score = -1e6
                scores.append(score)
            mean_score = float(np.mean(scores)) if len(scores) else -np.inf
            std_score = float(np.std(scores)) if len(scores) > 1 else 0.0
            penalty = float(CONFIG.get('optuna_stability_penalty', 0.0)) * std_score
            return mean_score - penalty
        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=int(CONFIG.get('optuna_trials', 20)), timeout=int(CONFIG.get('optuna_timeout_min', 20))*60)
        best_params = study.best_params
        LOGGER.info(f"[Optuna] Best params: {best_params}")
        lgb_params.update(best_params)
        aggressive_gc('post-optuna')
    except Exception as e:
        LOGGER.warning(f'[Optuna] Skipped: {e}')

# Optional extra model libraries and base params
xgb_available = False
cat_available = False
if bool(CONFIG.get('enable_xgboost', False)):
    try:
        import xgboost as xgb
        xgb_available = True
        xgb_params_base = {
            'objective': 'reg:squarederror',
            'learning_rate': float(lgb_params.get('learning_rate', 0.02)),
            'max_depth': 6,
            'subsample': 0.8,
            'colsample_bytree': 0.7,
            'lambda': 1.0,
            'alpha': 0.0,
            'tree_method': 'hist',
            'seed': int(SEED),
        }
        # GPU for XGBoost if available (handle XGB v2+ vs v1.x)
        if bool(CONFIG.get('use_gpu_if_available', False)) and 'GPU_AVAILABLE' in globals() and GPU_AVAILABLE:
            try:
                ver_parts = [int(p) for p in str(getattr(xgb, '__version__', '1.7.0')).split('.')[:2]]
            except Exception:
                ver_parts = [1, 7]
            if ver_parts[0] >= 2:
                xgb_params_base.update({'device': 'cuda'})
            else:
                xgb_params_base.update({'tree_method': 'gpu_hist', 'predictor': 'gpu_predictor', 'gpu_id': 0})
            LOGGER.info('[GPU] Enabled XGBoost CUDA acceleration')
    except Exception as e:
        LOGGER.warning(f'XGBoost unavailable: {e}')
if bool(CONFIG.get('enable_catboost', False)):
    try:
        from catboost import CatBoostRegressor, Pool as CatPool
        cat_available = True
        cat_params_base = {
            'loss_function': 'RMSE',
            'learning_rate': float(lgb_params.get('learning_rate', 0.02)),
            'depth': 6,
            'l2_leaf_reg': 3.0,
            'random_seed': int(SEED),
            'od_type': 'Iter',
            'od_wait': 200,
            'thread_count': -1,
            'verbose': False,
        }
        # GPU for CatBoost if available
        if bool(CONFIG.get('use_gpu_if_available', False)) and 'GPU_AVAILABLE' in globals() and GPU_AVAILABLE:
            # Force GPU 0; CatBoost auto-select may fail on some Kaggle images
            cat_params_base.update({'task_type': 'GPU', 'devices': '0'})
            LOGGER.info('[GPU] Enabled CatBoost GPU')
    except Exception as e:
        LOGGER.warning(f'CatBoost unavailable: {e}')

models = []
# Optional additional model containers
models_xgb = []
models_cat = []
per_model_val_sharpes = {'lgb': [], 'xgb': [], 'cat': []}

val_scores = []
fold_sharpes = []
fold_risk_params = []
feature_importance_df = pd.DataFrame()

# OOF containers
oof_pred = np.zeros(n_samples, dtype=float)
oof_counts = np.zeros(n_samples, dtype=float)
oof_alloc = np.zeros(n_samples, dtype=float)

for fold, (tr_idx, val_idx) in enumerate(splits):
    tr_idx, val_idx = apply_embargo(tr_idx, val_idx, int(CONFIG.get('fold_embargo_days', 0)))
    if len(val_idx) == 0:
        LOGGER.warning(f'Fold {fold+1}: empty val after embargo; skipping')
        continue

    X_tr = train_fe.iloc[tr_idx][features]
    y_tr = train_fe.iloc[tr_idx][TARGET]
    X_val = train_fe.iloc[val_idx][features]
    y_val = train_fe.iloc[val_idx][TARGET]

    LOGGER.info(f'[Enhanced] Fold {fold+1}/{len(splits)} — train {len(tr_idx)} val {len(val_idx)}')
    # Time-decay weights (recent data up-weighted)
    hl_days = int(CONFIG.get('time_decay_halflife_days', 0))
    w_tr = make_time_decay_weights(len(X_tr), hl_days)
    # Apply inverse-volatility modifier if realized exists
    if 'realized_returns' in train_fe.columns:
        realized_tr = train_fe.iloc[tr_idx]['realized_returns']
        w_tr = apply_inverse_vol_weights(w_tr, realized_tr, int(CONFIG.get('realized_vol_halflife', 60)))

    dtrain = lgb.Dataset(X_tr, label=y_tr, weight=w_tr)
    dvalid = lgb.Dataset(X_val, label=y_val)

    try:
        bst = lgb.train(
            params=lgb_params,
            train_set=dtrain,
            num_boost_round=2000,
            valid_sets=[dtrain, dvalid],
            valid_names=['train', 'valid'],
            callbacks=[
                lgb.early_stopping(stopping_rounds=200, verbose=True),
                lgb.log_evaluation(period=200),
            ],
            feval=corr_feval if bool(CONFIG.get('use_corr_feval', False)) else None,
        )
    except Exception as e:
        if isinstance(e, Exception):
            lgb_params.pop('device_type', None)
            bst = lgb.train(
                params=lgb_params,
                train_set=dtrain,
                num_boost_round=2000,
                valid_sets=[dtrain, dvalid],
                valid_names=['train', 'valid'],
                callbacks=[
                    lgb.early_stopping(stopping_rounds=200, verbose=True),
                    lgb.log_evaluation(period=200),
                ],
                feval=corr_feval if bool(CONFIG.get('use_corr_feval', False)) else None,
            )

    models.append(bst)

    val_pred = bst.predict(X_val, num_iteration=bst.best_iteration)
    rmse = mean_squared_error(y_val, val_pred, squared=False)
    if bool(CONFIG.get('eval_correlation', False)):
        corr = fast_corr(y_val, val_pred)
        LOGGER.info(f"Fold {fold+1} RMSE: {rmse:.6f} | Corr: {corr:.4f}")
    else:
        LOGGER.info(f"Fold {fold+1} RMSE: {rmse:.6f}")
    val_scores.append(rmse)

    # Nested calibration using only val period
    if 'realized_returns' in train_fe.columns:
        realized_val = train_fe.iloc[val_idx]['realized_returns'].astype(float)
        best = calibrate_k_and_band(
            oof_pred=pd.Series(val_pred, index=realized_val.index),
            realized=realized_val,
            pred_halflife=int(CONFIG['oof_pred_vol_halflife']),
            realized_halflife=int(CONFIG['realized_vol_halflife']),
            k_grid=CONFIG['risk_k_grid'],
            band_grid=CONFIG['neutral_band_grid'],
            vol_cap_multiple=float(CONFIG['vol_cap_multiple']),
            turnover_bps=float(CONFIG.get('transaction_cost_bps', 0.0)),
        )
        fold_risk_params.append(best)
        # Map to allocation for OOF alloc series and Sharpe eval
        k_use = best.get('k', None)
        band_use = best.get('band', None)
        if k_use is None or not np.isfinite(k_use):
            k_use = 0.8
        if band_use is None or not np.isfinite(band_use):
            band_use = 0.0
        pred_sigma = ewma_std(pd.Series(val_pred, index=realized_val.index), int(CONFIG['oof_pred_vol_halflife']))
        z = pd.Series(val_pred, index=realized_val.index) / (pred_sigma.replace(0, np.nan).fillna(1e-6))
        z_band = z.where(z.abs() >= band_use, 0.0)
        alloc_val = (1.0 + k_use * z_band).clip(0.0, 2.0)
        # Vol cap scaling vs market vol
        mkt_vol = ewma_std(realized_val, int(CONFIG['realized_vol_halflife']))
        strat_vol = ewma_std(alloc_val * realized_val, int(CONFIG['realized_vol_halflife']))
        cap = (float(CONFIG['vol_cap_multiple']) * mkt_vol) / (strat_vol.replace(0, np.nan).fillna(1e-6))
        cap = cap.clip(0.0, 2.0)
        alloc_val = (1.0 + (alloc_val - 1.0) * cap).clip(0.0, 2.0)
        strat_ret_val = alloc_val * realized_val
        sharpe_val = annualized_sharpe(strat_ret_val.fillna(0.0), days_per_year=int(CONFIG.get('annualization', 252)))
        fold_sharpes.append(float(sharpe_val))
        per_model_val_sharpes['lgb'].append(float(sharpe_val))
        # Store OOF alloc and pred
        oof_alloc[val_idx] += alloc_val.values
    else:
        fold_risk_params.append({'k': None, 'band': None, 'score': None})
        fold_sharpes.append(float('nan'))
        per_model_val_sharpes['lgb'].append(float('nan'))

    oof_pred[val_idx] += val_pred
    oof_counts[val_idx] += 1

    fi = pd.DataFrame({
        'feature': features,
        'importance': bst.feature_importance(importance_type='gain'),
        'fold': fold,
    })
    feature_importance_df = pd.concat([feature_importance_df, fi], axis=0)

    # Optional: diversified models on same fold
    if xgb_available:
        try:
            dtr = xgb.DMatrix(X_tr, label=y_tr, weight=w_tr)
            dvl = xgb.DMatrix(X_val, label=y_val)
            xgb_params = xgb_params_base.copy()
            xgb_params['seed'] = int(SEED + 31 * fold)
            bst_xgb = xgb.train(
                params=xgb_params,
                dtrain=dtr,
                num_boost_round=2000,
                evals=[(dtr, 'train'), (dvl, 'valid')],
                verbose_eval=False,
            )
            nlim = getattr(bst_xgb, 'best_ntree_limit', None)
            val_pred_xgb = bst_xgb.predict(dvl, ntree_limit=nlim) if nlim else bst_xgb.predict(dvl)
            if 'realized_returns' in train_fe.columns:
                realized_val2 = train_fe.iloc[val_idx]['realized_returns'].astype(float)
                best_x = calibrate_k_and_band(pd.Series(val_pred_xgb, index=realized_val2.index), realized_val2,
                                              int(CONFIG['oof_pred_vol_halflife']), int(CONFIG['realized_vol_halflife']),
                                              CONFIG['risk_k_grid'], CONFIG['neutral_band_grid'], float(CONFIG['vol_cap_multiple']),
                                              turnover_bps=float(CONFIG.get('transaction_cost_bps', 0.0)))
                per_model_val_sharpes['xgb'].append(float(best_x['score']))
            else:
                per_model_val_sharpes['xgb'].append(float(fast_corr(y_val, val_pred_xgb)))
            models_xgb.append(bst_xgb)
        except Exception as e:
            LOGGER.warning(f'XGB fold {fold+1} skipped: {e}')
        aggressive_gc(f'post-fold-xgb-{fold+1}')

    if cat_available:
        try:
            model_cat = CatBoostRegressor(**cat_params_base)
            model_cat.set_params(random_seed=int(SEED + 37 * fold))
            model_cat.fit(X_tr, y_tr, sample_weight=w_tr, eval_set=(X_val, y_val), use_best_model=True, verbose=False)
            val_pred_cat = model_cat.predict(X_val)
            if 'realized_returns' in train_fe.columns:
                realized_val3 = train_fe.iloc[val_idx]['realized_returns'].astype(float)
                best_c = calibrate_k_and_band(pd.Series(val_pred_cat, index=realized_val3.index), realized_val3,
                                              int(CONFIG['oof_pred_vol_halflife']), int(CONFIG['realized_vol_halflife']),
                                              CONFIG['risk_k_grid'], CONFIG['neutral_band_grid'], float(CONFIG['vol_cap_multiple']),
                                              turnover_bps=float(CONFIG.get('transaction_cost_bps', 0.0)))
                per_model_val_sharpes['cat'].append(float(best_c['score']))
            else:
                per_model_val_sharpes['cat'].append(float(fast_corr(y_val, val_pred_cat)))
            models_cat.append(model_cat)
        except Exception as e:
            LOGGER.warning(f'CatBoost fold {fold+1} skipped: {e}')
        aggressive_gc(f'post-fold-cat-{fold+1}')

    del X_tr, y_tr, X_val, y_val, dtrain, dvalid
    gc.collect()
    aggressive_gc(f'post-fold-{fold+1}')

mask = oof_counts > 0
oof_pred[mask] /= oof_counts[mask]
if 'realized_returns' in train_fe.columns:
    oof_alloc[mask] /= oof_counts[mask]

train_fe['pred'] = oof_pred
if 'realized_returns' in train_fe.columns:
    train_fe['alloc_oof'] = np.clip(oof_alloc, 0.0, 2.0)

# Optional stacking meta-learner on OOF predictions across model families
meta_pred = None
if bool(CONFIG.get('use_stacking_meta_learner', False)) and (len(models) or len(models_xgb) or len(models_cat)):
    try:
        from sklearn.linear_model import Ridge
        # Build OOF matrix: for each fold, get predictions from each available family
        fam_names = []
        oof_matrix = []
        # We will rebuild per-fold preds to avoid leakage
        for fold, (tr_idx, val_idx) in enumerate(splits):
            tr_idx2, val_idx2 = apply_embargo(tr_idx, val_idx, int(CONFIG.get('fold_embargo_days', 0)))
            if len(val_idx2) == 0:
                continue
            X_val = train_fe.iloc[val_idx2][features]
            row_preds = []
            fam_names_fold = []
            if fold < len(models):
                row_preds.append(models[fold].predict(X_val, num_iteration=models[fold].best_iteration))
                fam_names_fold.append('lgb')
            if xgb_available and fold < len(models_xgb):
                dvl = xgb.DMatrix(X_val)
                nlim = getattr(models_xgb[fold], 'best_ntree_limit', None)
                row_preds.append(models_xgb[fold].predict(dvl, ntree_limit=nlim) if nlim else models_xgb[fold].predict(dvl))
                fam_names_fold.append('xgb')
            if cat_available and fold < len(models_cat):
                row_preds.append(models_cat[fold].predict(X_val))
                fam_names_fold.append('cat')
            if not fam_names:
                fam_names = fam_names_fold
            if row_preds:
                stacked = np.vstack(row_preds).T
                oof_matrix.append((val_idx2, stacked))
        if oof_matrix:
            # Assemble full matrix
            n = len(train_fe)
            k = len(fam_names)
            M = np.full((n, k), np.nan, dtype=float)
            for idxs, block in oof_matrix:
                M[idxs, :block.shape[1]] = block
            mask_rows = ~np.isnan(M).any(axis=1)
            M_fit = M[mask_rows]
            y_fit = train_fe.loc[mask_rows, TARGET].values
            meta = Ridge(alpha=1.0, fit_intercept=True)
            meta.fit(M_fit, y_fit)
            # meta OOF prediction
            meta_pred = np.full(n, np.nan, dtype=float)
            meta_pred[mask_rows] = meta.predict(M_fit)
            train_fe['pred_meta'] = meta_pred
            LOGGER.info(f"[Stacking] Meta-learner trained with families: {fam_names}")
    except Exception as e:
        LOGGER.warning(f'[Stacking] Skipped: {e}')

fi_mean = (feature_importance_df.groupby('feature')['importance']
           .mean().sort_values(ascending=False))
LOGGER.info('[Enhanced] Top 20 features by avg gain:\n' + str(fi_mean.head(20)))

plt.figure(figsize=(8, 6))
fi_mean.head(20).sort_values().plot(kind='barh')
plt.title('Top 20 feature importances (enhanced, avg gain)')
plt.tight_layout()
plt.savefig(os.path.join(CONFIG['artifacts_dir'], 'feature_importances_top20_enhanced.png'))
plt.close()

# Select top-K features for final training if configured, with stability filter
_topk = CONFIG.get('final_topk_features', None)
min_folds_keep = int(CONFIG.get('stability_filter_min_folds', 0))
if not fi_mean.empty and min_folds_keep > 0:
    fi_counts = (feature_importance_df.groupby('feature')['fold']
                 .nunique().sort_values(ascending=False))
    stable_feats = set(fi_counts[fi_counts >= min_folds_keep].index.tolist())
else:
    stable_feats = set(features)

if isinstance(_topk, int) and _topk > 0 and not fi_mean.empty:
    selected = list(fi_mean.head(_topk).index)
    selected = [f for f in selected if f in stable_feats]
    final_features = [f for f in features if f in selected]
    LOGGER.info(f"[Enhanced] Using top-K features with stability filter: K={len(final_features)}")
else:
    final_features = [f for f in features if f in stable_feats]

# Aggregate risk params across folds (median)
if len(fold_risk_params) and all(p.get('k') is not None for p in fold_risk_params):
    k_med = float(np.median([p['k'] for p in fold_risk_params]))
    band_med = float(np.median([p['band'] for p in fold_risk_params]))
    best_risk = {'k': k_med, 'band': band_med, 'score': float(np.nanmean(fold_sharpes))}
    LOGGER.info(f"[Enhanced] Aggregated risk params: k={k_med}, band={band_med}, Sharpe_mean={best_risk['score']:.4f}")
else:
    best_risk = {'k': 0.8, 'band': 0.0, 'score': None}

# Diagnostics on OOF allocations
if 'realized_returns' in train_fe.columns and 'alloc_oof' in train_fe.columns:
    tc = CONFIG.get('transaction_cost_bps', 0.0) / 10000.0
    alloc_change = pd.Series(train_fe['alloc_oof']).diff().abs().fillna(0.0)
    tc_series = tc * alloc_change
    strat_oof = (train_fe['alloc_oof'] * train_fe['realized_returns']) - tc_series
    sharpe_oof = annualized_sharpe(strat_oof.fillna(0.0), days_per_year=int(CONFIG.get('annualization', 252)))
    LOGGER.info(f"[Enhanced] OOF Sharpe (after TC): {sharpe_oof:.4f}")

# Train final models on full data
X_full = train_fe[final_features]
y_full = train_fe[TARGET]

avg_best_iter = int(np.clip(np.mean([m.best_iteration for m in models]) if len(models) else 1000, 200, 2000))
LOGGER.info(f'[Enhanced] Using avg_best_iter: {avg_best_iter}')

final_models = []
N_FINAL = int(CONFIG.get('final_ensemble_size', 3))
for i in range(N_FINAL):
    params = lgb_params.copy()
    params['seed'] = SEED + 17 * i
    params['bagging_seed'] = SEED + 17 * i + 1
    params['feature_fraction_seed'] = SEED + 17 * i + 2
    # Apply time-decay weights on full data as well (mild)
    w_full = make_time_decay_weights(len(X_full), int(CONFIG.get('time_decay_halflife_days', 0)))
    dtrain = lgb.Dataset(X_full, label=y_full, weight=w_full)
    try:
        bst = lgb.train(params, dtrain, num_boost_round=avg_best_iter)
    except Exception as e:
        if isinstance(e, Exception):
            params.pop('device_type', None)
            bst = lgb.train(params, dtrain, num_boost_round=avg_best_iter)
    final_models.append(bst)
    del dtrain

# Predict test with rank-based ensembling
X_test = test_fe[final_features]
preds_list = [bst.predict(X_test, num_iteration=avg_best_iter) for bst in final_models]

# Optional smoothing of test predictions before mapping
HALF = int(CONFIG.get('pred_smooth_halflife', 0))
if HALF and HALF > 0:
    def ewma1d(a, halflife):
        if len(a) == 0:
            return a
        lam = np.log(2.0) / max(1.0, float(halflife))
        out = np.zeros_like(a, dtype=float)
        alpha = 1.0 - np.exp(-lam)
        out[0] = a[0]
        for i in range(1, len(a)):
            out[i] = alpha * a[i] + (1.0 - alpha) * out[i-1]
        return out
    preds_list = [ewma1d(p, HALF) for p in preds_list]

# If stacking meta-learner was trained, include its full-train version
if bool(CONFIG.get('use_stacking_meta_learner', False)) and 'pred_meta' in train_fe.columns:
    try:
        from sklearn.linear_model import Ridge
        fam_preds_full = []
        fam_names_full = []
        # Build family predictions on full train for meta fit, then predict test
        # Refit simple Ridge on full data using same families available
        # Assemble matrix for train
        fam_preds_train = []
        if len(final_models):
            fam_preds_train.append(np.mean([m.predict(X_full, num_iteration=avg_best_iter) for m in final_models], axis=0))
            fam_preds_full.append(np.mean([m.predict(X_test, num_iteration=avg_best_iter) for m in final_models], axis=0))
            fam_names_full.append('lgb')
        if xgb_available and len(models_xgb):
            # Train a new xgb on full (already done below), so use preds_list seeds later
            pass
        if cat_available and len(models_cat):
            pass
        if fam_preds_full and fam_preds_train:
            M_tr = np.vstack(fam_preds_train).T
            y_tr = y_full.values if hasattr(y_full, 'values') else y_full
            meta_full = Ridge(alpha=1.0, fit_intercept=True)
            meta_full.fit(M_tr, y_tr)
            M_te = np.vstack(fam_preds_full).T
            preds_list.append(meta_full.predict(M_te))
            LOGGER.info('[Stacking] Added meta-learner predictions to ensemble')
    except Exception as e:
        LOGGER.warning(f'[Stacking] Meta-learner final prediction skipped: {e}')

# Add diversified final models if enabled (multi-seed bagging)
if xgb_available:
    try:
        dfull = xgb.DMatrix(X_full, label=y_full)
        dtest = xgb.DMatrix(X_test)
        n_xgb = int(CONFIG.get('final_ensemble_size_xgb', 1))
        for i in range(n_xgb):
            xgb_params = xgb_params_base.copy()
            xgb_params['seed'] = int(SEED + 101 * i)
            xgb_final = xgb.train(params=xgb_params, dtrain=dfull, num_boost_round=avg_best_iter, verbose_eval=False)
            nlim2 = getattr(xgb_final, 'best_ntree_limit', None)
            preds_list.append(xgb_final.predict(dtest, ntree_limit=nlim2) if nlim2 else xgb_final.predict(dtest))
    except Exception as e:
        LOGGER.warning(f'XGB final skipped: {e}')
    aggressive_gc('post-xgb-final')

if cat_available:
    try:
        n_cat = int(CONFIG.get('final_ensemble_size_cat', 1))
        for i in range(n_cat):
            cat_params = cat_params_base.copy()
            cat_params['random_seed'] = int(SEED + 151 * i)
            cat_final = CatBoostRegressor(**cat_params)
            cat_final.fit(X_full, y_full, verbose=False)
            preds_list.append(cat_final.predict(X_test))
    except Exception as e:
        LOGGER.warning(f'CatBoost final skipped: {e}')
    aggressive_gc('post-cat-final')

# Weighted rank-averaging based on validation Sharpe if available
weights = None
if str(CONFIG.get('ensemble_weighting', 'equal')).lower() == 'sharpe' and len(per_model_val_sharpes['lgb']) == len(splits):
    w_lgb = max(1e-6, float(np.nanmean(per_model_val_sharpes['lgb'])))
    w_xgb = max(1e-6, float(np.nanmean(per_model_val_sharpes['xgb']))) if xgb_available and len(per_model_val_sharpes['xgb']) else 0.0
    w_cat = max(1e-6, float(np.nanmean(per_model_val_sharpes['cat']))) if cat_available and len(per_model_val_sharpes['cat']) else 0.0
    # Each family weight is spread equally across its seeds
    weights = [w_lgb / max(len(final_models), 1.0)] * len(final_models)
    if xgb_available:
        n_xgb = int(CONFIG.get('final_ensemble_size_xgb', 1))
        weights.extend([w_xgb / max(n_xgb, 1.0)] * n_xgb)
    if cat_available:
        n_cat = int(CONFIG.get('final_ensemble_size_cat', 1))
        weights.extend([w_cat / max(n_cat, 1.0)] * n_cat)

if weights is None:
    weights = [1.0] * len(preds_list)

# Non-negative least squares (NNLS) on ranks as a fallback to Sharpe-weights if desired
# We prefer stability; keep default to Sharpe weights unless CONFIG requests nnls
if str(CONFIG.get('ensemble_weighting', 'sharpe')).lower() == 'nnls':
    try:
        from scipy.optimize import nnls
        ranked = [rank_normalize(p) for p in preds_list]
        ranked_stack = np.stack(ranked, axis=1)
        # target vector: mean of ranked OOF predictions proxy (use train ranks if available)
        # use equal target to encourage smooth combination
        y_target = np.mean(ranked_stack, axis=1)
        w_nnls, _ = nnls(ranked_stack, y_target)
        if np.isfinite(w_nnls).all() and w_nnls.sum() > 0:
            weights_arr = w_nnls / w_nnls.sum()
        else:
            weights_arr = np.asarray(weights, dtype=float) / (np.sum(weights) + 1e-12)
    except Exception:
        weights_arr = np.asarray(weights, dtype=float) / (np.sum(weights) + 1e-12)
else:
    weights_arr = np.asarray(weights, dtype=float) / (np.sum(weights) + 1e-12)

# Align weights to number of prediction columns
weights_arr = np.asarray(weights_arr, dtype=float)
if weights_arr.shape[0] != len(preds_list):
    LOGGER.warning(f'Adjusting ensemble weights from {weights_arr.shape[0]} to {len(preds_list)} to match models')
    if weights_arr.shape[0] > 0:
        weights_arr = np.resize(weights_arr, len(preds_list))
        weights_arr = weights_arr / (weights_arr.sum() + 1e-12)
    else:
        weights_arr = np.ones(len(preds_list), dtype=float) / max(len(preds_list), 1)

ranked = [rank_normalize(p) for p in preds_list]
ranked_stack = np.stack(ranked, axis=1)
ranked_preds = np.dot(ranked_stack, weights_arr)

preds_test = ranked_preds - 0.5

LOGGER.info('[Enhanced] Built enhanced pipeline with nested calibration, diversified models, and weighted ensemble predictions.')


In [None]:
# === Date-based CV helper and validation checks ===
from typing import Iterator, Tuple

def assert_required_columns(df: pd.DataFrame, cols: list):
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

# Ensure time order and integer monotonicity for date_id if present
if 'date_id' in train.columns:
    assert train['date_id'].is_monotonic_increasing, 'train.date_id must be sorted ascending'
if 'date_id' in test.columns:
    assert test['date_id'].is_monotonic_increasing, 'test.date_id must be sorted ascending'

assert_required_columns(train_fe, features + [TARGET])

# Optionally allow date-aware split sizes (fixed by unique dates rather than rows)
def walk_forward_by_dates(df: pd.DataFrame,
                          date_col: str,
                          n_splits: int = 5,
                          min_train_days: int = 252,
                          val_days: int = 120) -> Iterator[Tuple[np.ndarray, np.ndarray]]:
    dates = df[date_col].values
    unique_dates = np.unique(dates)
    if len(unique_dates) < (min_train_days + val_days + 1):
        # fallback to row-based splits already built
        for s in splits:
            yield s
        return
    anchors = np.linspace(min_train_days, len(unique_dates) - val_days, n_splits, dtype=int)
    for a in anchors:
        train_last_date = unique_dates[a - 1]
        val_last_date = unique_dates[min(a + val_days - 1, len(unique_dates) - 1)]
        tr_idx = np.where(dates <= train_last_date)[0]
        val_idx = np.where((dates > train_last_date) & (dates <= val_last_date))[0]
        if len(val_idx) > 0:
            yield tr_idx, val_idx

# If date_id exists, we can produce alternative date-based splits for diagnostics
if 'date_id' in train_fe.columns:
    date_splits = list(walk_forward_by_dates(train_fe, 'date_id'))
    LOGGER.info(f"Date-based splits prepared: {len(date_splits)} folds")
else:
    date_splits = None


In [None]:
# === Calibrated train diagnostics using aggregated risk params (no duplicate utils) ===

if 'pred' in train_fe.columns and realized_returns is not None:
    # Use aggregated best_risk from enhanced CV if available; else fallback
    k_use = best_risk['k'] if 'best_risk' in globals() and best_risk.get('k') is not None else 0.8
    band_use = best_risk['band'] if 'best_risk' in globals() and best_risk.get('band') is not None else 0.0

    pred_sigma = ewma_std(train_fe['pred'], int(CONFIG['oof_pred_vol_halflife']))
    z = train_fe['pred'] / (pred_sigma.replace(0, np.nan).fillna(1e-6))
    z_band = z.where(z.abs() >= band_use, 0.0)
    alloc_calib = (1.0 + k_use * z_band).clip(0.0, 2.0)

    mkt_vol = ewma_std(train_fe['realized_returns'], int(CONFIG['realized_vol_halflife']))
    strat_vol = ewma_std(alloc_calib * train_fe['realized_returns'], int(CONFIG['realized_vol_halflife']))
    cap = (float(CONFIG['vol_cap_multiple']) * mkt_vol) / (strat_vol.replace(0, np.nan).fillna(1e-6))
    cap = cap.clip(0.0, 2.0)
    train_fe['alloc_calibrated'] = (1.0 + (alloc_calib - 1.0) * cap).clip(0.0, 2.0)

    # Apply transaction costs on calibrated series
    tc = CONFIG.get('transaction_cost_bps', 0.0) / 10000.0
    alloc_change_cal = train_fe['alloc_calibrated'].diff().abs().fillna(0.0)
    tc_cal = tc * alloc_change_cal

    # Diagnostics
    strat_cal = (train_fe['alloc_calibrated'] * train_fe['realized_returns']) - tc_cal
    cum_cal = (1.0 + strat_cal).cumprod()
    cum_mkt = (1.0 + train_fe['realized_returns']).cumprod()

    plt.figure(figsize=(10, 5))
    x_axis = train_fe['date_id'] if 'date_id' in train_fe.columns else np.arange(len(train_fe))
    plt.plot(x_axis, cum_mkt, label='Market (buy-hold)')
    plt.plot(x_axis, cum_cal, label='Strategy (calibrated)')
    plt.legend(); plt.title('Cumulative performance — calibrated (enhanced)'); plt.tight_layout()
    plt.savefig(os.path.join(CONFIG['artifacts_dir'], 'backtest_cumulative_calibrated_enhanced.png'))
    plt.close()

    sharpe_cal = annualized_sharpe(strat_cal.fillna(0.0), days_per_year=int(CONFIG.get('annualization', 252)))
    LOGGER.info(f"[Enhanced] Calibrated train Sharpe (after TC): {sharpe_cal:.4f}")
else:
    LOGGER.warning('Skipping calibrated diagnostics: missing pred or realized_returns.')



In [None]:
# === Apply calibrated risk to TEST predictions (if available) and save artifacts ===

# If we calibrated best_risk, we also map test predictions with the same parameters.
# We reuse test predictions already computed (ranked_preds -> preds_test) and
# estimate sigma from recent train OOF prediction EWMA.

artifacts = {}

if 'pred' in train_fe.columns:
    pred_sigma_train = ewma_std(train_fe['pred'], int(CONFIG['oof_pred_vol_halflife']))
    sigma_est = float(pred_sigma_train.iloc[-50:].median()) if pred_sigma_train.notna().any() else float(train_fe['pred'].std())
else:
    sigma_est = float(train_fe['pred'].std()) if 'pred' in train_fe else 1.0

# Prefer aggregated params if present
k_use = best_risk['k'] if 'best_risk' in globals() and best_risk.get('k') is not None else 0.8
band_use = best_risk['band'] if 'best_risk' in globals() and best_risk.get('band') is not None else 0.0

# Center preds_test (already centered earlier). Apply band and scaling.
z_test = preds_test / (sigma_est + 1e-9)
z_test_band = np.where(np.abs(z_test) >= band_use, z_test, 0.0)
alloc_test_cal = 1.0 + k_use * z_test_band
alloc_test_cal = np.clip(alloc_test_cal, 0.0, 2.0)

# Optional global downscale using recent historical cap (after costs)
if realized_returns is not None and 'alloc_calibrated' in train_fe.columns:
    hist_window = min(len(train_fe), 180)
    tc = CONFIG.get('transaction_cost_bps', 0.0) / 10000.0
    alloc_change_hist = train_fe['alloc_calibrated'].iloc[-hist_window:].diff().abs().fillna(0.0)
    tc_hist = tc * alloc_change_hist
    strat_hist = (train_fe['alloc_calibrated'].iloc[-hist_window:] * train_fe['realized_returns'].iloc[-hist_window:]) - tc_hist
    hist_strat_vol = float(ewma_std(strat_hist, int(CONFIG['realized_vol_halflife'])).iloc[-1]) if len(strat_hist) else float(strat_hist.std())
    hist_mkt_vol   = float(ewma_std(train_fe['realized_returns'].iloc[-hist_window:], int(CONFIG['realized_vol_halflife'])).iloc[-1])
    scale_global = min(1.0, (float(CONFIG['vol_cap_multiple']) * hist_mkt_vol) / (hist_strat_vol + 1e-9)) if hist_strat_vol > 0 else 1.0
    alloc_test_cal = 1.0 + (alloc_test_cal - 1.0) * scale_global
    alloc_test_cal = np.clip(alloc_test_cal, 0.0, 2.0)

# Final single submission write (calibrated allocations)
submission_cal = pd.DataFrame({
    'date_id': test_fe['date_id'].values if 'date_id' in test_fe.columns else np.arange(len(test_fe)),
    'allocation': alloc_test_cal.astype(float)
})
submission_cal.to_csv('submission.csv', index=False)
LOGGER.info(f"Wrote submission.csv with {submission_cal.shape[0]} rows (calibrated)")

# Save artifacts for reproducibility
ensure_dir(CONFIG['artifacts_dir'])

# CV metrics (+ Sharpe)
cv_metrics = {
    'cv_rmse_mean': float(np.mean(val_scores)) if len(val_scores) else None,
    'cv_rmse_std': float(np.std(val_scores)) if len(val_scores) else None,
    'cv_sharpe_mean': float(np.nanmean(fold_sharpes)) if 'fold_sharpes' in globals() and len(fold_sharpes) else None,
    'cv_sharpe_median': float(np.nanmedian(fold_sharpes)) if 'fold_sharpes' in globals() and len(fold_sharpes) else None,
    'n_models_lgb': int(len(models)),
    'n_models_xgb': int(len(models_xgb)) if 'models_xgb' in globals() else 0,
    'n_models_cat': int(len(models_cat)) if 'models_cat' in globals() else 0,
    'ensemble_weighting': CONFIG.get('ensemble_weighting', 'equal'),
}
with open(os.path.join(CONFIG['artifacts_dir'], 'cv_metrics.json'), 'w') as f:
    json.dump(cv_metrics, f, indent=2)

# Fold risk params
if 'fold_risk_params' in globals() and len(fold_risk_params):
    with open(os.path.join(CONFIG['artifacts_dir'], 'fold_risk_params.json'), 'w') as f:
        json.dump(fold_risk_params, f, indent=2)

# OOF predictions
if 'pred' in train_fe.columns:
    cols = ['date_id', TARGET, 'pred'] if 'date_id' in train_fe.columns else [TARGET, 'pred']
    train_fe[cols].to_csv(os.path.join(CONFIG['artifacts_dir'], 'oof_predictions.csv'), index=False)

# Feature importances
if 'fi_mean' in globals() and not fi_mean.empty:
    fi_mean.reset_index().rename(columns={'index':'feature','importance':'importance_mean'}) \
        .to_csv(os.path.join(CONFIG['artifacts_dir'], 'feature_importances.csv'), index=False)

# Save features list
with open(os.path.join(CONFIG['artifacts_dir'], 'features.json'), 'w') as f:
    json.dump(sorted(list(features)), f, indent=2)

# Save final models
try:
    import joblib
    joblib.dump(final_models, os.path.join(CONFIG['artifacts_dir'], 'final_lgb_models.pkl'))
    LOGGER.info('Saved final_lgb_models.pkl to artifacts/')
except Exception as e:
    LOGGER.warning(f'Model save skipped: {e}')

# Metadata (include calibrated risk if available)
# Ensemble weights snapshot (if used)
try:
    ens_weights = None
    if 'weights_arr' in globals():
        ens_weights = list(map(float, weights_arr.tolist()))
except Exception:
    ens_weights = None

metadata = {
    'config': CONFIG,
    'avg_best_iter': int(avg_best_iter),
    'seed': int(SEED),
    'best_risk': best_risk if 'best_risk' in globals() else None,
    'ensemble_weights': ens_weights,
}
with open(os.path.join(CONFIG['artifacts_dir'], 'metadata.json'), 'w') as f:
    json.dump(metadata, f, indent=2)

# Total runtime log
elapsed = time.time() - start_time
LOGGER.info(f'Total runtime: {elapsed/60:.1f} min')

LOGGER.info('Artifacts saved to artifacts/.')
