# (Legacy) xgboost_fixed.ipynb

XGBoost baseline 的固定窗口 CV 版本。若要纳入报告对比，建议后续也迁移为脚本化/模块化输出到 `outputs/reports/`。



In [None]:
# %% [markdown]
# # Fixed-window time-series modeling with XGBoost
#
# This notebook implements a strict fixed-window time-series cross-validation (CV):
# - Each fold trains only on the most recent `train_window` observations (e.g., 20 trading days),
# - Validates on the next `val_size` days (for early stopping),
# - And tests on the subsequent `test_size` days.
#
# Compared to expanding windows, this enforces the assumption that the target depends only on the most recent window of history.
# Feature engineering is also constrained to use at most 20-day lookback windows (no long-memory EMAs).
#
# Assumptions:
# - We predict next-day return from the information available at the close of day t.
# - If you execute at the next day open/close, ensure your operational assumptions match this data availability.


# %%
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
from IPython.display import display

import matplotlib.pyplot as plt
import seaborn as sns

# Use the seaborn whitegrid style when this matplotlib build provides it; otherwise use ggplot.
if 'seaborn-v0_8-whitegrid' in plt.style.available:
    plt.style.use('seaborn-v0_8-whitegrid')
else:
    plt.style.use('ggplot')
sns.set_palette("Set2")

import yfinance as yf

from sklearn.metrics import (
    accuracy_score, balanced_accuracy_score, roc_auc_score,
    mean_absolute_error, mean_squared_error, r2_score
)

# Assume XGBoost is available (no fallback)
from xgboost import XGBClassifier, XGBRegressor

# Single seed shared by NumPy's global RNG and the `random_state` of the XGBoost models below.
SEED = 42
np.random.seed(SEED)
# NOTE: printed unconditionally; a missing XGBoost would already have failed at the import above.
print("XGBoost available: True")


# %% [markdown]
# ## Data: 2800.HK, daily, 3 years
# Pull adjusted OHLCV data using `yfinance`. If the `period` query returns no data, fall back to an explicit start/end date range.


# %%
def fetch_prices(ticker: str = "2800.HK", period: str = "3y", interval: str = "1d") -> pd.DataFrame:
    """Download auto-adjusted OHLCV bars for `ticker` via yfinance.

    The first attempt queries by `period`. If that yields nothing, a second
    attempt queries an explicit date range ending today and spanning 3*370
    days (this fallback span is fixed and does not depend on `period`).

    Returns:
        A copy of the Open/High/Low/Close/Volume columns with the timezone
        removed from the index.

    Raises:
        RuntimeError: if both attempts return no data.
    """
    def _no_data(frame) -> bool:
        return frame is None or frame.empty

    source = yf.Ticker(ticker)
    history = source.history(period=period, interval=interval, auto_adjust=True)

    if _no_data(history):
        # Fallback: explicit start/end window instead of a `period` string.
        window_end = pd.Timestamp.today().normalize()
        window_start = window_end - pd.Timedelta(days=3*370)
        history = source.history(start=window_start, end=window_end, interval=interval, auto_adjust=True)

    if _no_data(history):
        raise RuntimeError(f"Cannot fetch data for {ticker}")

    # Drop timezone information so downstream date handling works on naive timestamps.
    history.index = pd.to_datetime(history.index).tz_localize(None)
    return history[["Open", "High", "Low", "Close", "Volume"]].copy()

# Download the raw price history once; every later cell works off `df_raw`.
ticker = "2800.HK"
df_raw = fetch_prices(ticker=ticker, period="3y", interval="1d")
# Report the date span and row count that were actually returned.
print(f"Data range: {df_raw.index.min().date()} to {df_raw.index.max().date()}")
print(f"Total samples: {len(df_raw)}")
# Bare expression: shown as the cell output when run in a notebook.
df_raw.tail()


# %% [markdown]
# ## Feature engineering (max 20-day lookback)
# - Use only past information up to and including time t (no leakage).
# - Constrain all rolling windows to ≤ 20 trading days to match the "depends only on last 20 days" assumption.
# - Labels:
#   - Regression: `next_return` = log return from t to t+1 (set `use_log_return=True`).
#   - Classification: `target_up` = 1 if `next_return` > 0 else 0.


# %%
def calculate_rsi_rolling(series: pd.Series, period: int = 14) -> pd.Series:
    """Compute RSI from rolling simple means of gains and losses (finite memory).

    Each value depends only on the last `period` one-step differences of
    `series`.

    Edge cases:
      - Windows with gains but no losses yield RSI = 100 (the limit of the
        formula as the average loss goes to zero) rather than NaN.
      - Windows with neither gains nor losses (flat prices) stay NaN, since the
        ratio is undefined.
      - Positions without `period` valid differences are NaN.

    Args:
        series: Price series.
        period: Number of differences in each rolling window.

    Returns:
        RSI values aligned to `series`' index.
    """
    delta = series.diff()
    avg_gain = delta.clip(lower=0).rolling(window=period, min_periods=period).mean()
    avg_loss = (-delta.clip(upper=0)).rolling(window=period, min_periods=period).mean()

    # Hide zero losses from the division so it never produces +/-inf.
    rs = avg_gain / avg_loss.mask(avg_loss == 0)
    rsi = 100 - (100 / (1 + rs))

    # A window of pure gains is maximally overbought: report 100 instead of NaN,
    # so callers that drop NaN rows do not lose strongly trending days.
    only_gains = (avg_loss == 0) & (avg_gain > 0)
    return rsi.mask(only_gains, 100.0)

def calculate_macd_sma(close: pd.Series, fast: int = 8, slow: int = 16, signal: int = 6):
    """MACD built from simple moving averages so every component has a finite lookback.

    Returns a 3-tuple of Series aligned to `close`:
      - MACD line: SMA(close, fast) - SMA(close, slow)
      - signal line: SMA(MACD line, signal)
      - histogram: MACD line - signal line

    Positions without a full window are NaN.
    """
    def _sma(values: pd.Series, span: int) -> pd.Series:
        # Full-window simple mean; shorter windows yield NaN.
        return values.rolling(window=span, min_periods=span).mean()

    macd_line = _sma(close, fast) - _sma(close, slow)
    signal_line = _sma(macd_line, signal)
    return macd_line, signal_line, macd_line - signal_line

def build_features_and_labels(df: pd.DataFrame,
                              horizon: int = 1,
                              use_log_return: bool = True,
                              max_lookback: int = 20) -> Tuple[pd.DataFrame, List[str]]:
    """Assemble the modelling table (features plus labels) from a price frame.

    Features at row t are derived from `Close` (and `Volume`, when present) at
    or before t, plus calendar fields of the index. Labels look forward:
    `next_return` is the return from t to t+horizon and `target_up` is 1 when
    that return is strictly positive.

    Args:
        df: Frame with a `Close` column, optionally `Volume`, and a datetime index.
        horizon: Number of rows ahead used for the label.
        use_log_return: If True the label is a log return, otherwise a simple return.
        max_lookback: Caps the Bollinger window at `min(20, max_lookback)`.
            All other windows in this function are fixed constants.

    Returns:
        `(data, feature_cols)`: `data` holds the feature columns followed by
        `next_return` and `target_up`, with every row containing a NaN removed
        (infinite feature values are treated as NaN first); `feature_cols`
        lists the feature column names in column order.
    """
    label_cols = ("next_return", "target_up")

    px = df["Close"].copy()
    volume = df["Volume"].copy() if "Volume" in df.columns else None
    feats = pd.DataFrame(index=df.index)

    # Trailing simple returns ending at t.
    for span in (1, 2, 5, 10):
        feats[f"ret_{span}d"] = px.pct_change(span)

    # Realized volatility of daily returns over short windows.
    for span in (5, 10, 20):
        feats[f"vol_{span}d"] = feats["ret_1d"].rolling(span, min_periods=span).std()

    # Simple moving averages; only their ratios enter the feature set.
    sma = {span: px.rolling(span, min_periods=span).mean() for span in (5, 10, 20)}
    for span in (5, 10, 20):
        feats[f"close_to_sma{span}"] = px / sma[span] - 1
    feats["sma5_sma10"] = sma[5] / sma[10] - 1
    feats["sma10_sma20"] = sma[10] / sma[20] - 1

    # Oscillators built from finite windows.
    feats["rsi_14"] = calculate_rsi_rolling(px, 14)
    macd_parts = calculate_macd_sma(px, fast=8, slow=16, signal=6)
    for col, part in zip(("macd", "macd_signal", "macd_hist"), macd_parts):
        feats[col] = part

    # Relative distance from the close to the upper/lower Bollinger bands.
    band_window = min(20, max_lookback)
    band_std = px.rolling(band_window, min_periods=band_window).std()
    band_mid = px.rolling(band_window, min_periods=band_window).mean()
    feats["bb_upper"] = (band_mid + 2 * band_std - px) / px
    feats["bb_lower"] = (px - (band_mid - 2 * band_std)) / px

    # Volume relative to its 20-row mean, and its one-step change.
    if volume is not None:
        feats["volume_ratio"] = volume / volume.rolling(20, min_periods=20).mean()
        feats["volume_change"] = volume.pct_change()

    # Calendar fields taken from the index.
    feats["day_of_week"] = df.index.dayofweek
    feats["month"] = df.index.month
    feats["quarter"] = df.index.quarter

    # Daily return shifted back by each lag.
    for lag in (1, 2, 3, 5, 10, 15, 20):
        feats[f"ret_lag_{lag}"] = feats["ret_1d"].shift(lag)

    # Forward-looking label: return from t to t+horizon.
    if use_log_return:
        next_ret = np.log(px.shift(-horizon) / px)
    else:
        next_ret = px.pct_change(horizon).shift(-horizon)

    # Treat +/-inf as missing, attach labels, then keep only complete rows.
    # The final `horizon` rows have no label and are removed here as well.
    data = feats.replace([np.inf, -np.inf], np.nan)
    data["next_return"] = next_ret
    data["target_up"] = (next_ret > 0).astype(int)
    data = data.dropna().copy()

    feature_cols = [name for name in data.columns if name not in label_cols]
    return data, feature_cols

# Build the modelling table: one-step-ahead log-return labels, Bollinger window capped at 20.
feat_df, feature_cols = build_features_and_labels(df_raw, horizon=1, use_log_return=True, max_lookback=20)
# Row count after incomplete rows were dropped, and number of feature columns.
print(f"Samples after cleaning: {len(feat_df)}, Features: {len(feature_cols)}")
# Bare expression: shown as the cell output when run in a notebook.
feat_df.tail()


# %% [markdown]
# ## Fixed-window time-series CV (rolling window)
# - For each fold:
#   - Train on the most recent `train_window` samples ending at `test_start - val_size`.
#   - Validate on the next `val_size` samples ending at `test_start` (for early stopping).
#   - Test on the next `test_size` samples.
# - Evaluate both classification (direction) and regression (next_return).


# %%
def train_models(X_train: np.ndarray, y_cls_train: np.ndarray, y_reg_train: np.ndarray,
                 X_val: Optional[np.ndarray] = None,
                 y_cls_val: Optional[np.ndarray] = None,
                 y_reg_val: Optional[np.ndarray] = None):
    """Fit one XGBoost classifier and one XGBoost regressor on the same features.

    Both models use identical hyperparameters apart from the evaluation metric
    (`logloss` for the classifier, `rmse` for the regressor). Early stopping
    (30 rounds) is enabled for both models only when `X_val` and `y_cls_val`
    are given and `X_val` is non-empty; the validation set is then passed as
    `eval_set` at fit time.

    Returns:
        `(clf, reg)`: the fitted classifier and regressor, in that order.
    """
    early_stop = not (X_val is None or y_cls_val is None) and len(X_val) > 0

    # Hyperparameters common to both estimators.
    shared = dict(
        n_estimators=500,
        learning_rate=0.03,
        max_depth=4,
        min_child_weight=3,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_lambda=1.0,
        reg_alpha=0.5,
        random_state=SEED,
        verbosity=0,
        n_jobs=-1,
        tree_method="hist",
    )
    if early_stop:
        shared["early_stopping_rounds"] = 30

    # Classifier: direction of the next return.
    clf = XGBClassifier(eval_metric="logloss", **shared)
    clf_fit_extra = {"eval_set": [(X_val, y_cls_val)]} if early_stop else {}
    clf.fit(X_train, y_cls_train, verbose=False, **clf_fit_extra)

    # Regressor: magnitude of the next return.
    reg = XGBRegressor(eval_metric="rmse", **shared)
    reg_fit_extra = {"eval_set": [(X_val, y_reg_val)]} if early_stop else {}
    reg.fit(X_train, y_reg_train, verbose=False, **reg_fit_extra)

    return clf, reg

def eval_fold(clf, reg, X_test: np.ndarray, y_cls_test: np.ndarray, y_reg_test: np.ndarray) -> Dict[str, float]:
    """Score a fitted classifier/regressor pair on one test window.

    Class predictions threshold the positive-class probability at 0.5. AUC is
    reported as NaN when `roc_auc_score` raises. `reg_dir_acc` is the share of
    samples where the predicted and actual returns have the same `np.sign`.

    Returns:
        Dict with keys `cls_acc`, `cls_bacc`, `cls_auc`, `reg_mae`,
        `reg_rmse`, `reg_r2`, `reg_dir_acc`.
    """
    up_prob = clf.predict_proba(X_test)[:, 1]
    cls_hat = (up_prob >= 0.5).astype(int)
    reg_hat = reg.predict(X_test)

    try:
        auc_value = roc_auc_score(y_cls_test, up_prob)
    except Exception:
        auc_value = np.nan

    return {
        "cls_acc": accuracy_score(y_cls_test, cls_hat),
        "cls_bacc": balanced_accuracy_score(y_cls_test, cls_hat),
        "cls_auc": auc_value,
        "reg_mae": mean_absolute_error(y_reg_test, reg_hat),
        "reg_rmse": np.sqrt(mean_squared_error(y_reg_test, reg_hat)),
        "reg_r2": r2_score(y_reg_test, reg_hat),
        "reg_dir_acc": (np.sign(y_reg_test) == np.sign(reg_hat)).mean(),
    }

def fixed_window_cv(feat_df: pd.DataFrame, feature_cols: List[str],
                    test_size: int = 14,
                    train_window: int = 20,
                    val_size: int = 5,
                    step_size: Optional[int] = None) -> pd.DataFrame:
    """Fixed-window CV: roll a constant-length training window forward through time.

    For each fold (positions are row offsets into `feat_df`):
      train = [test_start - val_size - train_window, test_start - val_size)
      val   = [test_start - val_size, test_start)
      test  = [test_start, test_start + test_size)

    The first `test_start` is `train_window + val_size`, and it advances by
    `step_size` while a full test window still fits.

    Args:
        feat_df: Table containing `feature_cols`, `target_up` and `next_return`.
        feature_cols: Names of the feature columns.
        test_size: Rows per test window.
        train_window: Rows per training window.
        val_size: Rows per validation window; 0 disables the validation set.
        step_size: Shift between consecutive folds; defaults to `test_size`
            (non-overlapping test windows).

    Returns:
        One row per fold with the metrics from `eval_fold` plus `fold`,
        `train_size`, `val_size`, `test_size`, `test_start` and `test_end`
        (index labels of the first and last test row). The frame is empty,
        with no columns, when no fold fits.

    Raises:
        ValueError: if `test_size`, `train_window` or `step_size` is not
            positive, or `val_size` is negative.
    """
    if step_size is None:
        step_size = test_size  # non-overlapping test windows by default

    # Reject sizes that would make `range` run backwards or produce empty or
    # out-of-range slices further down.
    if test_size <= 0 or train_window <= 0 or step_size <= 0:
        raise ValueError("test_size, train_window and step_size must be positive")
    if val_size < 0:
        raise ValueError("val_size must be non-negative")

    X_all = feat_df[feature_cols].astype(float).values
    y_cls_all = feat_df["target_up"].astype(int).values
    y_reg_all = feat_df["next_return"].astype(float).values

    n_rows = len(feat_df)
    first_test_start = train_window + val_size
    metrics = []

    fold_starts = range(first_test_start, n_rows - test_size + 1, step_size)
    for fold_id, test_start in enumerate(fold_starts):
        val_start = test_start - val_size
        # Never negative: test_start >= train_window + val_size.
        train_start = val_start - train_window
        test_end = test_start + test_size

        train_rows = slice(train_start, val_start)
        test_rows = slice(test_start, test_end)

        if val_size > 0:
            val_rows = slice(val_start, test_start)
            X_val, y_cls_val, y_reg_val = X_all[val_rows], y_cls_all[val_rows], y_reg_all[val_rows]
        else:
            X_val = y_cls_val = y_reg_val = None

        X_train = X_all[train_rows]
        X_test = X_all[test_rows]

        clf, reg = train_models(X_train, y_cls_all[train_rows], y_reg_all[train_rows],
                                X_val, y_cls_val, y_reg_val)
        fold_metrics = eval_fold(clf, reg, X_test, y_cls_all[test_rows], y_reg_all[test_rows])

        # Bookkeeping used later to re-create the fold (e.g. for plotting).
        fold_metrics["fold"] = fold_id
        fold_metrics["train_size"] = len(X_train)
        fold_metrics["val_size"] = len(X_val) if X_val is not None else 0
        fold_metrics["test_size"] = len(X_test)
        fold_metrics["test_start"] = feat_df.index[test_start]
        fold_metrics["test_end"] = feat_df.index[test_end - 1]
        metrics.append(fold_metrics)

    return pd.DataFrame(metrics)

# Run fixed-window CV. This call overrides the function defaults for train_window and val_size:
# train_window=30, val_size=10, test_size=14, step_size=14 (non-overlapping test windows).
cv_results = fixed_window_cv(feat_df, feature_cols,
                             test_size=14,
                             train_window=30,
                             val_size=10,
                             step_size=14)

# Number of folds produced, followed by each fold's window bookkeeping (shown as the cell output).
print(f"Folds: {len(cv_results)}")
cv_results[["fold","test_start","test_end","train_size","val_size","test_size"]]


# %%
def summarize_metrics(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    """Return mean, std, min and max (one row each) for the columns in `cols`."""
    stats = ['mean', 'std', 'min', 'max']
    selected = df[cols]
    return selected.agg(stats)

# Show the first per-fold rows, then aggregate statistics for each task.
print("\n=== Per-fold metrics (head) ===")
display(cv_results.head())

print("\n=== Summary (Classification) ===")
display(summarize_metrics(cv_results, ["cls_acc", "cls_bacc", "cls_auc"]))

print("\n=== Summary (Regression on next_return) ===")
display(summarize_metrics(cv_results, ["reg_mae", "reg_rmse", "reg_r2", "reg_dir_acc"]))

# Naive baselines over the full sample (all rows of feat_df, not only the test windows).
# Zero returns are counted with the non-positive class in the direction baseline.
naive_mae = feat_df["next_return"].abs().mean()              # Predict 0 for regression
naive_dir_acc = max((feat_df["next_return"] > 0).mean(),     # Predict majority sign for direction
                    (feat_df["next_return"] <= 0).mean())

print("\n=== Naive baselines over full sample ===")
print(f"Regression MAE baseline (predict 0): {naive_mae:.6f}")
print(f"Direction baseline (majority sign): {naive_dir_acc:.3f}")


# %% [markdown]
# ## Plot predictions for one fold
# - Subplot 1: `next_return` (actual vs predicted) within the test window.
# - Subplot 2: Reconstructed price paths (actual vs predicted) starting from the previous close.
# - Set `assume_log_return=True` if you built labels using log returns (default here).


# %%
def pick_fold_index(cv_df: pd.DataFrame, mode: str = 'last') -> int:
    """Return the `fold` id of the fold selected by `mode`.

    Modes:
      - 'last': the final row of `cv_df`.
      - 'best_auc': the row with the highest `cls_auc`.
      - 'best_dir': the row with the highest `reg_dir_acc`.

    Raises:
        ValueError: if `cv_df` has no rows or `mode` is not one of the above.
    """
    if not len(cv_df):
        raise ValueError("cv_results is empty. Run fixed-window CV first.")

    if mode == 'last':
        return int(cv_df['fold'].iloc[-1])

    # Metric to maximise for each "best_*" mode.
    metric_for_mode = {'best_auc': 'cls_auc', 'best_dir': 'reg_dir_acc'}
    if mode not in metric_for_mode:
        raise ValueError("mode must be 'last' | 'best_auc' | 'best_dir'")

    best_label = cv_df[metric_for_mode[mode]].idxmax()
    return int(cv_df.loc[best_label, 'fold'])

def reconstruct_price_path(start_price: float, returns: np.ndarray, assume_log_return: bool = True) -> np.ndarray:
    """Compound a sequence of per-step returns into a price path.

    With `assume_log_return=True` the returns are summed and exponentiated;
    otherwise they are treated as simple returns and `1 + r` is multiplied
    cumulatively. The result has one price per return and does not include
    `start_price` itself.
    """
    growth = np.exp(np.cumsum(returns)) if assume_log_return else np.cumprod(1.0 + returns)
    return start_price * growth

def plot_fold_predictions_fixed(feat_df: pd.DataFrame, feature_cols: List[str], df_raw: pd.DataFrame,
                                cv_results: pd.DataFrame,
                                fold_to_plot: str = 'last', assume_log_return: bool = True):
    """Refit one CV fold and plot its test-window predictions.

    The fold is selected from `cv_results` via `pick_fold_index`; its train,
    validation and test slices are rebuilt from the stored `test_start`,
    `test_end`, `train_size` and `val_size`, and the models are trained again
    with `train_models`.

    Panels:
      1. Actual vs predicted `next_return` for each test row.
      2. Price paths obtained by compounding the actual and the predicted
         returns from the close of the first test row. Because the label at
         row t is the return from t to the next row, each compounded price is
         drawn at the date of the row that follows it in `df_raw`.

    Args:
        feat_df: Feature/label table used for CV.
        feature_cols: Feature column names.
        df_raw: Raw price frame with a `Close` column whose index contains
            the dates of `feat_df`.
        cv_results: Output of `fixed_window_cv`.
        fold_to_plot: 'last' | 'best_auc' | 'best_dir'.
        assume_log_return: Must match how `next_return` was built.

    The price panel is hidden, with a printed reason, when the dates cannot be
    aligned. AUC is printed as nan when it is undefined for the test window.
    """
    # 1) Choose fold
    fold_idx = pick_fold_index(cv_results, mode=fold_to_plot)
    row = cv_results[cv_results['fold'] == fold_idx].iloc[0]

    # 2) Map dates back to positional indices
    all_dates = feat_df.index
    test_start = all_dates.get_loc(row['test_start'])
    test_end = all_dates.get_loc(row['test_end']) + 1  # right-open

    # Reconstruct train/val positions from sizes stored in cv_results
    train_size = int(row['train_size'])
    val_size = int(row['val_size'])
    train_end = test_start
    val_start = train_end - val_size
    train_start = val_start - train_size

    X_all = feat_df[feature_cols].astype(float).values
    y_cls_all = feat_df["target_up"].astype(int).values
    y_reg_all = feat_df["next_return"].astype(float).values

    X_train = X_all[train_start: val_start]
    y_cls_train = y_cls_all[train_start: val_start]
    y_reg_train = y_reg_all[train_start: val_start]

    X_val = X_all[val_start: train_end] if val_size > 0 else None
    y_cls_val = y_cls_all[val_start: train_end] if val_size > 0 else None
    y_reg_val = y_reg_all[val_start: train_end] if val_size > 0 else None

    X_test = X_all[test_start: test_end]
    y_cls_test = y_cls_all[test_start: test_end]
    y_reg_test = y_reg_all[test_start: test_end]
    test_dates = all_dates[test_start: test_end]

    # 3) Train and predict
    clf, reg = train_models(X_train, y_cls_train, y_reg_train, X_val, y_cls_val, y_reg_val)
    proba = clf.predict_proba(X_test)[:, 1]
    y_pred_cls = (proba >= 0.5).astype(int)
    y_pred_reg = reg.predict(X_test)

    # 4) Plot returns and reconstructed price paths
    fig, axes = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
    ax1, ax2 = axes

    ax1.plot(test_dates, y_reg_test, label='Actual next_return', color='tab:blue')
    ax1.plot(test_dates, y_pred_reg, label='Predicted next_return', color='tab:orange', alpha=0.8)
    ax1.axhline(0, color='gray', linestyle='--', alpha=0.5)
    ax1.set_title(f'Fold {fold_idx}: next_return - Actual vs Predicted')
    ax1.set_ylabel('Return')
    ax1.legend(loc='best')
    ax1.grid(True, alpha=0.3)

    # Price paths. The label at row t is the return earned AFTER the close of t,
    # so the path is anchored at the close of the first test row (not the row
    # before it) and each compounded price belongs to the following date.
    # NOTE(review): assumes labels were built with horizon=1 and that the test
    # rows are consecutive in df_raw; if rows were dropped inside the window,
    # the compounded "actual" path skips those days' returns.
    try:
        raw_dates = df_raw.index
        raw_pos = raw_dates.get_indexer(test_dates)
        target_pos = raw_pos + 1
        alignable = (
            len(raw_pos) > 0
            and raw_pos.min() >= 0
            and target_pos.max() < len(raw_dates)
        )
        if alignable:
            start_close = float(df_raw['Close'].iloc[raw_pos[0]])
            actual_price_path = reconstruct_price_path(start_close, y_reg_test, assume_log_return)
            pred_price_path = reconstruct_price_path(start_close, y_pred_reg, assume_log_return)
            price_index = [test_dates[0]] + list(raw_dates[target_pos])
            ax2.plot(price_index, [start_close] + list(actual_price_path), label='Actual price (reconstructed)', color='tab:green')
            ax2.plot(price_index, [start_close] + list(pred_price_path), label='Predicted price (reconstructed)', color='tab:red', alpha=0.8)
            ax2.set_title(f'Fold {fold_idx}: Price path (from previous close)')
            ax2.set_ylabel('Price')
            ax2.legend(loc='best')
            ax2.grid(True, alpha=0.3)
        else:
            print("Price path panel skipped: test dates could not be aligned with df_raw.")
            ax2.set_visible(False)
    except Exception as exc:
        # Best effort: the return panel is still useful without the price panel,
        # but report why the second panel is missing instead of hiding it silently.
        print(f"Price path panel skipped: {exc!r}")
        ax2.set_visible(False)

    plt.xlabel('Date')
    plt.tight_layout()
    plt.show()

    # 5) Print fold metrics
    dir_acc = (np.sign(y_reg_test) == np.sign(y_pred_reg)).mean()
    # AUC is undefined when the test window holds a single class; report nan
    # (as eval_fold does) rather than failing after the figure was drawn.
    try:
        auc = roc_auc_score(y_cls_test, proba)
    except ValueError:
        auc = np.nan
    print(f"Fold {fold_idx} metrics:")
    print(f"  Classification - Acc: {accuracy_score(y_cls_test, y_pred_cls):.3f}, "
          f"BalancedAcc: {balanced_accuracy_score(y_cls_test, y_pred_cls):.3f}, "
          f"AUC: {auc:.3f}")
    print(f"  Regression - MAE: {mean_absolute_error(y_reg_test, y_pred_reg):.6f}, "
          f"R2: {r2_score(y_reg_test, y_pred_reg):.6f}, "
          f"DirAcc: {dir_acc:.3f}")

# Plot last fold by default; use 'best_auc' or 'best_dir' to choose alternative folds.
# The call retrains the models for the selected fold before plotting.
plot_fold_predictions_fixed(
    feat_df=feat_df,
    feature_cols=feature_cols,
    df_raw=df_raw,
    cv_results=cv_results,
    fold_to_plot='last',   # 'last' | 'best_auc' | 'best_dir'
    assume_log_return=True # Set False if you used simple returns
)


# %% [markdown]
# ## Out-of-fold (OOF) classification using fixed-window CV
# Generate OOF probabilities for the classification task across all folds and compute global metrics.


# %%
def oof_fixed_window(feat_df: pd.DataFrame, feature_cols: List[str],
                     test_size: int = 7, train_window: int = 20, val_size: int = 5, step_size: Optional[int] = None):
    """Collect out-of-fold direction predictions from fixed-window CV.

    Folds are laid out as (row offsets into `feat_df`):
      train = [test_start - val_size - train_window, test_start - val_size)
      val   = [test_start - val_size, test_start)
      test  = [test_start, test_start + test_size)
    starting at `test_start = train_window + val_size` and advancing by
    `step_size` (default: `test_size`).

    Returns:
        `(y_true, y_proba, y_pred)`: concatenated over folds in time order,
        holding the true `target_up` labels, the predicted positive-class
        probabilities and the 0.5-thresholded predictions.

    Raises:
        ValueError: if `test_size`, `train_window` or `step_size` is not
            positive, if `val_size` is negative, or if `feat_df` is too short
            to form a single fold.
    """
    if step_size is None:
        step_size = test_size

    # Reject sizes that would make `range` run backwards or produce empty or
    # out-of-range slices further down.
    if test_size <= 0 or train_window <= 0 or step_size <= 0:
        raise ValueError("test_size, train_window and step_size must be positive")
    if val_size < 0:
        raise ValueError("val_size must be non-negative")

    X_all = feat_df[feature_cols].astype(float).values
    y_cls_all = feat_df["target_up"].astype(int).values
    n_rows = len(feat_df)

    oof_true, oof_proba, oof_pred = [], [], []

    first_test_start = train_window + val_size
    for test_start in range(first_test_start, n_rows - test_size + 1, step_size):
        val_start = test_start - val_size
        # Never negative: test_start >= train_window + val_size.
        train_start = val_start - train_window
        test_end = test_start + test_size

        X_train = X_all[train_start: val_start]
        y_train_cls = y_cls_all[train_start: val_start]
        X_val = X_all[val_start: test_start] if val_size > 0 else None
        y_val_cls = y_cls_all[val_start: test_start] if val_size > 0 else None

        # train_models always fits both models. Only the classifier is needed
        # here, so the class labels double as placeholder regression targets
        # and the fitted regressor is discarded.
        clf, _ = train_models(X_train, y_train_cls, y_train_cls, X_val, y_val_cls, y_val_cls)
        proba = clf.predict_proba(X_all[test_start: test_end])[:, 1]

        oof_true.append(y_cls_all[test_start: test_end])
        oof_proba.append(proba)
        oof_pred.append((proba >= 0.5).astype(int))

    if not oof_true:
        # np.concatenate([]) would fail with an unrelated-looking message.
        raise ValueError(
            f"Not enough samples for a single fold: have {n_rows}, "
            f"need at least {first_test_start + test_size}."
        )

    y_true = np.concatenate(oof_true)
    y_proba = np.concatenate(oof_proba)
    y_pred = np.concatenate(oof_pred)
    return y_true, y_proba, y_pred

# Out-of-fold run with shorter test windows (7 rows) than the CV run above; step equals test size.
y_true, y_proba, y_pred = oof_fixed_window(
    feat_df, feature_cols,
    test_size=7,
    train_window=20,
    val_size=5,
    step_size=7
)

# Global metrics computed once over all concatenated out-of-fold predictions.
acc = accuracy_score(y_true, y_pred)
bacc = balanced_accuracy_score(y_true, y_pred)
# AUC falls back to NaN if roc_auc_score raises.
try:
    auc = roc_auc_score(y_true, y_proba)
except Exception:
    auc = np.nan

print(f"OOF Accuracy={acc:.3f}, BalancedAcc={bacc:.3f}, AUC={auc:.3f}, N={len(y_true)}")

# Approximate 95% CI for accuracy (normal approximation): phat +/- z * sqrt(phat * (1 - phat) / n)
# NOTE(review): `sqrt` imported on the next line is unused; the computation below calls np.sqrt.
from math import sqrt
n = len(y_true)
phat = acc
z = 1.96
half = z*np.sqrt(phat*(1-phat)/n)
print(f"Acc 95% CI (approx): [{phat - half:.3f}, {phat + half:.3f}]")