In [None]:
"""
基於波動率指數預測美國股市走向
根據論文：A comparison of machine learning methods for predicting
the direction of the US stock market on the basis of volatility indices

此程式碼可在 Google Colab 上運行
"""

# ============================================================================
# 第一部分：安裝必要套件與導入函式庫
# ============================================================================

# 在 Colab 中執行以下命令安裝 yfinance（如果尚未安裝）
# !pip install yfinance scikit-learn pandas numpy matplotlib

import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Lasso, LassoCV
from sklearn.ensemble import RandomForestClassifier, BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, confusion_matrix, roc_curve
import matplotlib.pyplot as plt

print("所有套件導入完成！")

# ============================================================================
# 第二部分：數據獲取與準備
# ============================================================================

def download_volatility_data(start_date='2011-01-01', end_date='2022-07-31'):
    """
    下載所有波動率指數和標普500指數數據

    Yahoo Finance 的股票代號：
    - ^VIX: CBOE Volatility Index
    - ^VIX9D: CBOE S&P 500 9-Day Volatility Index (可能無法獲取)
    - ^VIX3M: CBOE 3-Month Volatility Index
    - ^VIX6M: CBOE S&P 500 6-Month Volatility Index (可能無法獲取)
    - ^VVIX: CBOE VVIX Index
    - ^SKEW: CBOE SKEW Index
    - ^VXN: CBOE NASDAQ-100 Volatility Index
    - GVZ: CBOE Gold ETF Volatility Index
    - ^OVX: CBOE Crude Oil ETF Volatility Index
    - ^GSPC: S&P 500 Index

    注意：部分指數可能在 Yahoo Finance 上無法獲取，需要使用替代數據源
    """

    print(f"開始下載數據，時間範圍：{start_date} 至 {end_date}")

    # 定義所有需要下載的指數代號
    tickers = {
        'VIX': '^VIX',
        'VIX3M': '^VIX3M',
        'VVIX': '^VVIX',
        'SKEW': '^SKEW',
        'VXN': '^VXN',
        'GVZ': 'GVZ',
        'OVX': '^OVX',
        'SP500': '^GSPC'
    }

    # 下載數據並儲存為列表
    data_list = []
    column_names = []

    for name, ticker in tickers.items():
        try:
            print(f"下載 {name} ({ticker})...", end='')
            df = yf.download(ticker, start=start_date, end=end_date, progress=False)

            if df.empty:
                print(f" ✗ 無數據")
                continue

            # 確保獲取 Close 價格
            if 'Close' in df.columns:
                close_price = df['Close']
            elif isinstance(df, pd.Series):
                close_price = df
            else:
                # 如果是多層索引，嘗試獲取第一層
                close_price = df.iloc[:, 0] if len(df.columns) > 0 else None

            if close_price is not None and len(close_price) > 0:
                # 將下載的數據命名為簡單的名稱，避免多層索引
                close_price.name = name
                data_list.append(close_price)
                column_names.append(name)
                print(f" ✓ {len(close_price)} 筆數據")
            else:
                print(f" ✗ 無有效價格數據")

        except Exception as e:
            print(f" ✗ 下載失敗 - {str(e)}")

    # 檢查是否至少有一些數據
    if len(data_list) == 0:
        print("\n錯誤：無法下載任何數據！")
        print("可能的原因：")
        print("1. 網絡連接問題")
        print("2. Yahoo Finance API 限制")
        print("3. 股票代號不正確")
        return pd.DataFrame()

    # 合併所有數據
    print(f"\n成功下載 {len(data_list)} 個指數")
    df_combined = pd.concat(data_list, axis=1)

    # 確保所有日期對齊（取交集）
    df_combined = df_combined.dropna(how='all')

    # 處理缺失值
    print(f"原始數據形狀: {df_combined.shape}")
    print(f"日期範圍: {df_combined.index[0]} 至 {df_combined.index[-1]}")
    print(f"\n各指數缺失值統計:")
    missing_counts = df_combined.isnull().sum()
    for col, count in missing_counts.items():
        if count > 0:
            print(f"  {col}: {count} ({count/len(df_combined)*100:.1f}%)")

    # 前向填充和後向填充缺失值
    df_combined = df_combined.ffill().bfill()

    # 最終檢查
    remaining_nulls = df_combined.isnull().sum().sum()
    if remaining_nulls > 0:
        print(f"\n警告：仍有 {remaining_nulls} 個缺失值，將刪除這些行")
        df_combined = df_combined.dropna()

    print(f"最終數據形狀: {df_combined.shape}")

    return df_combined


def calculate_features(df):
    """
    計算所有特徵變量

    包括：
    1. 已下載的波動率指數
    2. PUTCALL（用 VIX 的變化率模擬，實際需要期權數據）
    3. RVOL（實現波動率）- 過去30天標普500的滾動標準差
    """

    print("\n開始計算特徵...")

    df_features = df.copy()

    # 計算 RVOL（實現波動率）：過去30天標普500對數回報率的標準差
    if 'SP500' in df_features.columns:
        # 計算對數回報率
        log_returns = np.log(df_features['SP500'] / df_features['SP500'].shift(1))
        # 計算30天滾動標準差（年化）
        df_features['RVOL'] = log_returns.rolling(window=30).std() * np.sqrt(252)
        print("  ✓ RVOL 計算完成")
    else:
        print("  ✗ 無法計算 RVOL，缺少 SP500 數據")


    # 注意：PUTCALL 需要實際的期權成交量數據，這裡暫時不包含
    # 如果要模擬，可以使用其他代理變量

    # 刪除包含 NaN 的行
    df_features = df_features.dropna()

    # 移除 SP500 價格本身，因為它不應該直接作為預測特徵
    if 'SP500' in df_features.columns:
      df_features = df_features.drop(columns=['SP500'])

    print(f"特徵計算完成，數據形狀: {df_features.shape}")
    print(f"可用特徵: {df_features.columns.tolist()}")

    return df_features


def calculate_target(df):
    """
    計算目標變量：未來30天的對數回報率和方向

    Returns30 = ln(S_{t+30} / S_t)
    Direction = 1 if Returns30 > 0 else 0
    """

    print("\n計算目標變量...")

    df_target = df.copy()

    # 確保 SP500 存在
    if 'SP500' not in df_target.columns:
        print("  錯誤：無法計算目標變量，缺少 SP500 數據")
        return pd.DataFrame()

    # 計算未來30天的對數回報率
    # 注意：這裡計算的是未來數據，不能直接作為特徵使用
    df_target['Returns30'] = np.log(
        df_target['SP500'].shift(-30) / df_target['SP500']
    )

    # 轉換為二元分類目標（1=上漲，0=下跌）
    df_target['Direction'] = (df_target['Returns30'] > 0).astype(int)

    # 移除最後30天（沒有未來數據）
    df_target = df_target[:-30]

    print(f"  ✓ 計算完成")
    print(f"  上漲天數: {df_target['Direction'].sum()} ({df_target['Direction'].mean()*100:.2f}%)")
    print(f"  下跌天數: {len(df_target) - df_target['Direction'].sum()} ({(1-df_target['Direction'].mean())*100:.2f}%)")

    return df_target


# ============================================================================
# 第三部分：特徵選擇（使用 Lasso） - 修正為在滾動窗口內執行
# ============================================================================

# 將 Lasso 特徵選擇移動到 walk_forward_validation 函數內部


# ============================================================================
# 第四部分：滾動窗口前向驗證
# ============================================================================

def walk_forward_validation(df, feature_cols, target_col='Direction',
                           train_size=2128, model_type='RandomForest'):
    """
    實施滾動窗口前向驗證

    參數:
        df: 包含特徵和目標的數據框
        feature_cols: 所有候選特徵列名列表
        target_col: 目標列名
        train_size: 訓練窗口大小（論文中為2128天，約70%）
        model_type: 'RandomForest' 或 'Bagging'

    返回:
        predictions: 預測結果
        actuals: 實際結果
        prediction_probs: 預測上漲概率
    """

    print(f"\n開始滾動窗口前向驗證...")
    print(f"  訓練窗口大小: {train_size}")
    print(f"  測試集大小: {len(df) - train_size}")
    print(f"  模型類型: {model_type}")

    predictions = []
    actuals = []
    prediction_probs = []
    selected_features_history = [] # 記錄每個窗口選擇的特徵

    # 滾動窗口
    total_iterations = len(df) - train_size

    for i in range(total_iterations):
        # 定義訓練和測試索引
        train_start = i
        train_end = i + train_size
        test_idx = train_end

        # 提取當前窗口的訓練和測試數據
        # IMPORTANT: Slice the dataframe first to avoid data leakage
        df_train_window = df.iloc[train_start:train_end].copy()
        df_test_window = df.iloc[test_idx:test_idx+1].copy()

        # --- 在每個滾動窗口內處理缺失值 ---
        # 先處理訓練窗口的缺失值
        df_train_window = df_train_window.dropna()

        # Check if training window is empty after dropping NaNs
        if df_train_window.empty:
            print(f"\n  警告 (窗口 {i+1}): 訓練窗口在處理缺失值後為空，跳過此窗口")
            continue

        # 處理測試窗口的缺失值
        df_test_window = df_test_window.dropna()

        # Check if test window is empty after dropping NaNs
        if df_test_window.empty:
            print(f"\n  警告 (窗口 {i+1}): 測試窗口在處理缺失值後為空，跳過此窗口")
            continue


        X_train_window = df_train_window[feature_cols].values
        y_train_window = df_train_window[target_col].values
        X_test_window = df_test_window[feature_cols].values
        y_test_window = df_test_window[target_col].values[0] # 測試集只有一行

        # 檢查訓練數據是否有 NaN (額外檢查)
        if np.isnan(X_train_window).any():
             print(f"\n  錯誤 (窗口 {i+1}): X_train_window 仍包含 NaN 值!")
             print("NaN counts per column in training window:")
             print(df_train_window[feature_cols].isnull().sum())
             return None, None, None, None # 返回 None 表示錯誤


        # --- 在每個滾動窗口內執行 Lasso 特徵選擇 ---
        # 使用當前窗口的訓練數據進行 Lasso 特徵選擇
        y_lasso_target = df_train_window['Returns30'].values

        # 標準化訓練數據用於 Lasso
        scaler_lasso = StandardScaler()
        X_train_lasso_scaled = scaler_lasso.fit_transform(X_train_window)

        # 使用 LassoCV 選擇最佳 alpha
        lasso_cv = LassoCV(alphas=np.logspace(-4, 2, 100), cv=5, random_state=42, n_jobs=-1)
        lasso_cv.fit(X_train_lasso_scaled, y_lasso_target)

        # 獲取非零係數的特徵索引
        selected_feature_indices = np.where(lasso_cv.coef_ != 0)[0]

        # 獲取選擇的特徵名稱
        current_selected_features = [feature_cols[j] for j in selected_feature_indices]

        # 如果沒有特徵被選中，使用所有特徵 (避免模型沒有輸入)
        if len(current_selected_features) == 0:
            print(f"\n  警告 (窗口 {i+1}): Lasso 沒有選擇任何特徵，將使用所有特徵")
            current_selected_features = feature_cols
            # 更新 selected_feature_indices 以包含所有特徵
            selected_feature_indices = np.arange(len(feature_cols))


        # 記錄選擇的特徵
        selected_features_history.append(current_selected_features)

        # 提取僅包含選定特徵的訓練和測試數據
        # 從當前窗口的數據框中根據 Lasso 選擇的索引來提取特徵
        X_train_selected = df_train_window[current_selected_features].values
        X_test_selected = df_test_window[current_selected_features].values

        # --- 標準化選定的特徵 ---
        scaler_model = StandardScaler()
        X_train_scaled = scaler_model.fit_transform(X_train_selected)
        X_test_scaled = scaler_model.transform(X_test_selected)

        # --- 訓練模型 ---
        if model_type == 'RandomForest':
            # 根據論文設定：ntree=500, mtry=sqrt(P)，P 是當前窗口的特徵數
            n_features = int(np.sqrt(X_train_scaled.shape[1]))
            # 確保 max_features 不超過當前選定的特徵數
            n_features = max(1, min(n_features, X_train_scaled.shape[1]))
            model = RandomForestClassifier(
                n_estimators=500,
                max_features=n_features,
                random_state=42,
                n_jobs=-1
            )
        elif model_type == 'Bagging':
            # 根據論文設定：ntree=500, mtry=P（使用所有特徵）
             model = BaggingClassifier(
                estimator=DecisionTreeClassifier(random_state=42), # 添加 random_state
                n_estimators=500,
                random_state=42,
                n_jobs=-1
            )
        else:
            raise ValueError(f"未知的模型類型: {model_type}")

        # 訓練模型 (使用 Direction 作為目標)
        model.fit(X_train_scaled, y_train_window)

        # 預測
        y_pred = model.predict(X_test_scaled)[0]
        y_prob = model.predict_proba(X_test_scaled)[0, 1]  # 上漲的概率

        # 保存結果
        predictions.append(y_pred)
        actuals.append(y_test_window)
        prediction_probs.append(y_prob)

        # 進度顯示
        if (i + 1) % 100 == 0 or i == 0 or i == total_iterations - 1:
            print(f"  進度: {i+1}/{total_iterations} ({(i+1)/total_iterations*100:.1f}%)")
            # 打印當前窗口選擇的特徵 (可選)
            # print(f"    窗口 {i+1} 選擇的特徵: {current_selected_features}")

    return np.array(predictions), np.array(actuals), np.array(prediction_probs), selected_features_history


# ============================================================================
# 第五部分：性能評估
# ============================================================================

def evaluate_model(y_true, y_pred, y_prob):
    """
    評估模型性能

    計算三個關鍵指標：
    1. Accuracy (ACC)
    2. AUC
    3. F-measure (F1-score)
    """

    print("\n" + "="*60)
    print("模型性能評估")
    print("="*60)

    # 計算指標
    acc = accuracy_score(y_true, y_pred)
    auc = roc_auc_score(y_true, y_prob)
    f1 = f1_score(y_true, y_pred)

    print(f"\n準確率 (Accuracy):     {acc:.4f} ({acc*100:.2f}%)")
    print(f"AUC:                   {auc:.4f}")
    print(f"F-measure (F1-score):  {f1:.4f}")

    # 混淆矩陣
    cm = confusion_matrix(y_true, y_pred)
    print(f"\n混淆矩陣:")
    print(f"                預測下跌  預測上漲")
    print(f"實際下跌:      {cm[0,0]:>6}    {cm[0,1]:>6}")
    print(f"實際上漲:      {cm[1,0]:>6}    {cm[1,1]:>6}")

    # 其他統計
    tn, fp, fn, tp = cm.ravel()
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0

    print(f"\n精確率 (Precision):    {precision:.4f}")
    print(f"召回率 (Recall):       {recall:.4f}")

    return acc, auc, f1


def plot_results(y_true, y_pred, y_prob, dates=None):
    """
    視覺化結果
    """

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # 1. 預測 vs 實際（時間序列）
    ax1 = axes[0, 0]
    x = range(len(y_true))
    ax1.plot(x, y_true, 'g-', alpha=0.5, label='實際方向', linewidth=1)
    ax1.plot(x, y_pred, 'r--', alpha=0.7, label='預測方向', linewidth=1)
    ax1.set_title('預測 vs 實際方向（時間序列）', fontsize=12, fontweight='bold')
    ax1.set_xlabel('時間索引')
    ax1.set_ylabel('方向 (0=下跌, 1=上漲)')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # 2. ROC 曲線
    ax2 = axes[0, 1]
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    auc_score = roc_auc_score(y_true, y_prob)
    ax2.plot(fpr, tpr, 'b-', linewidth=2, label=f'ROC 曲線 (AUC = {auc_score:.4f})')
    ax2.plot([0, 1], [0, 1], 'k--', label='隨機猜測')
    ax2.set_title('ROC 曲線', fontsize=12, fontweight='bold')
    ax2.set_xlabel('False Positive Rate')
    ax2.set_ylabel('True Positive Rate')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # 3. 預測概率分布
    ax3 = axes[1, 0]
    ax3.hist(y_prob[y_true == 0], bins=30, alpha=0.5, color='red', label='實際下跌')
    ax3.hist(y_prob[y_true == 1], bins=30, alpha=0.5, color='green', label='實際上漲')
    ax3.set_title('預測概率分布', fontsize=12, fontweight='bold')
    ax3.set_xlabel('預測上漲概率')
    ax3.set_ylabel('頻率')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # 4. 混淆矩陣熱圖
    ax4 = axes[1, 1]
    cm = confusion_matrix(y_true, y_pred)
    im = ax4.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    ax4.figure.colorbar(im, ax=ax4)
    ax4.set_title('混淆矩陣', fontsize=12, fontweight='bold')
    tick_marks = np.arange(2)
    ax4.set_xticks(tick_marks)
    ax4.set_yticks(tick_marks)
    ax4.set_xticklabels(['下跌', '上漲'])
    ax4.set_yticklabels(['下跌', '上漲'])
    ax4.set_ylabel('實際標籤')
    ax4.set_xlabel('預測標籤')

    # 在格子中添加數值
    thresh = cm.max() / 2.
    for i in range(2):
        for j in range(2):
            ax4.text(j, i, format(cm[i, j], 'd'),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black",
                    fontsize=16, fontweight='bold')

    plt.tight_layout()
    plt.savefig('model_results.png', dpi=300, bbox_inches='tight')
    print("\n結果圖表已保存為 'model_results.png'")
    plt.show()


# ============================================================================
# 主程式：整合所有步驟
# ============================================================================

def main():
    """
    主程式：執行完整的預測流程
    """

    print("="*60)
    print("基於波動率指數的美國股市方向預測")
    print("="*60)

    # 步驟1：下載數據
    print("\n" + "="*60)
    print("步驟 1: 數據獲取")
    print("="*60)

    # 將結束日期延長30天，以便計算最後的目標變量
    end_date_extended = (datetime.strptime('2022-07-31', '%Y-%m-%d') + timedelta(days=30)).strftime('%Y-%m-%d')
    df_raw = download_volatility_data(start_date='2011-01-01', end_date=end_date_extended)

    if df_raw.empty:
        print("錯誤：無法獲取數據，請檢查網絡連接或數據源")
        return None # Return None on error


    # 步驟2：計算目標變量 (需要 SP500 價格) - 在特徵工程前執行
    print("\n" + "="*60)
    print("步驟 2: 計算目標變量")
    print("="*60)
    # 使用原始數據的拷貝來計算目標，確保 SP500 列存在
    df_with_target = calculate_target(df_raw.copy())

    if df_with_target.empty:
        print("錯誤：無法計算目標變量，請檢查 SP500 數據")
        return None # Return None on error

    # 步驟3：計算特徵 (移除 SP500 價格本身) - 在計算目標變量後執行
    print("\n" + "="*60)
    print("步驟 3: 特徵工程")
    print("="*60)
    # 使用原始數據的拷貝來計算特徵，calculate_features 會移除 SP500
    df_features = calculate_features(df_raw.copy())

    # 將特徵和目標合併，並處理日期對齊和缺失值
    # 注意：這裡合併的 df_features 已經移除了 SP500
    df_full = pd.merge(df_features, df_with_target[['Returns30', 'Direction']],
                       left_index=True, right_index=True, how='inner')

    # 處理合併後的缺失值（主要是因為 RVOL 需要 30 天滾動窗口）
    df_full = df_full.dropna()


    print(f"\n最終數據集形狀: {df_full.shape}")
    print(f"日期範圍: {df_full.index[0]} 至 {df_full.index[-1]}")

    # 準備所有候選特徵列名
    feature_cols = [col for col in df_full.columns if col not in ['Returns30', 'Direction']]
    print(f"\n可用特徵 ({len(feature_cols)}): {feature_cols}")

    # 步驟4：模型訓練與驗證（隨機森林）
    # 特徵選擇將在 walk_forward_validation 內部執行
    print("\n" + "="*60)
    print("步驟 4: 模型訓練與驗證 - 隨機森林")
    print("="*60)

    # 訓練窗口大小（論文中為2128天，約70%）
    # 確保 train_size 不超過數據總長度減去測試集大小 (1)
    train_size = min(2128, len(df_full) - 1)
    if train_size < 30: # 確保訓練集至少能計算 RVOL
        print("錯誤：數據集太小，無法執行滾動窗口驗證。")
        return None # Return None on error

    y_pred_rf, y_true_rf, y_prob_rf, selected_features_rf = walk_forward_validation(
        df_full,
        feature_cols=feature_cols, # 傳入所有候選特徵
        target_col='Direction',
        train_size=train_size,
        model_type='RandomForest'
    )

    # Check if walk_forward_validation returned None due to error
    if y_pred_rf is None:
        return None

    # 評估隨機森林
    acc_rf, auc_rf, f1_rf = evaluate_model(y_true_rf, y_pred_rf, y_prob_rf)

    # 步驟5：模型訓練與驗證（Bagging）
    print("\n" + "="*60)
    print("步驟 5: 模型訓練與驗證 - Bagging")
    print("="*60)

    y_pred_bag, y_true_bag, y_prob_bag, selected_features_bag = walk_forward_validation(
        df_full,
        feature_cols=feature_cols, # 傳入所有候選特徵
        target_col='Direction',
        train_size=train_size,
        model_type='Bagging'
    )

    # Check if walk_forward_validation returned None due to error
    if y_pred_bag is None:
        return None

    # 評估 Bagging
    acc_bag, auc_bag, f1_bag = evaluate_model(y_true_bag, y_pred_bag, y_prob_bag)

    # 步驟6：比較結果
    print("\n" + "="*60)
    print("最終結果比較")
    print("="*60)

    comparison = pd.DataFrame({
        '模型': ['隨機森林', 'Bagging'],
        '準確率 (ACC)': [acc_rf, acc_bag],
        'AUC': [auc_rf, auc_bag],
        'F-measure': [f1_rf, f1_bag]
    })

    print("\n" + comparison.to_string(index=False))

    # 選擇最佳模型進行視覺化
    if acc_rf >= acc_bag:
        print("\n最佳模型：隨機森林")
        plot_results(y_true_rf, y_pred_rf, y_prob_rf)
    else:
        print("\n最佳模型：Bagging")
        plot_results(y_true_bag, y_pred_bag, y_prob_bag)

    print("\n" + "="*60)
    print("程式執行完成！")
    print("="*60)

    return df_full, feature_cols, selected_features_rf, selected_features_bag, comparison


# ============================================================================
# 執行主程式
# ============================================================================

if __name__ == "__main__":
    # 執行主程式
    main_results = main()

    # Check if main() returned None due to an error
    if main_results is not None:
        df_final, all_features, rf_selected_features, bag_selected_features, results = main_results

        print("\n提示：您可以進一步調整參數或進行更多分析")
        print("例如：")
        print("- 調整訓練窗口大小")
        print("- 嘗試不同的超參數")
        print("- 加入更多特徵")
        print("- 使用不同的時間範圍")
        print("\n各窗口選擇的特徵記錄在 rf_selected_features 和 bag_selected_features 中。")
    else:
        print("\n主程式執行失敗，請檢查錯誤訊息。")