In [None]:
# Step 0: install pinned packages on Google Colab (works around NumPy 2.0
# incompatibilities).
# IMPORTANT: on Colab, restart the runtime manually after this cell finishes
# (Runtime → Restart runtime) so the freshly installed versions are loaded.

import subprocess
import sys

IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    print("📍 偵測到 Google Colab 環境")
    print("🔧 安裝相容版本套件...")
    # Run pip through the kernel's own interpreter rather than `!pip`: the
    # packages are installed into the environment this notebook uses, the
    # version specifiers need no shell quoting, and a failed install raises
    # CalledProcessError instead of being silently reported as success.
    for requirement_group in (
        ["numpy==1.26.4", "pandas", "astropy", "scipy<1.13", "matplotlib", "scikit-learn"],
        ["lightkurve", "astroquery", "transitleastsquares", "wotan", "joblib"],
        ["xgboost>=2.0.0", "shap", "plotly", "pyyaml"],
    ):
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "-q", *requirement_group]
        )
    print("✅ 套件安裝完成!")
    print("⚠️ 請現在手動重啟 Runtime: Runtime → Restart runtime")
    print("   然後從下一個 cell 繼續執行")
else:
    print("💻 本地環境，跳過套件安裝")

In [None]:
# Step 0: install pinned packages on Google Colab (works around NumPy 2.0
# incompatibilities).
# IMPORTANT: on Colab, restart the runtime manually after this cell finishes
# (Runtime → Restart runtime) so the freshly installed versions are loaded.

import subprocess
import sys

IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    print("📍 偵測到 Google Colab 環境")
    print("🔧 安裝相容版本套件...")
    # Run pip through the kernel's own interpreter rather than `!pip`: the
    # packages are installed into the environment this notebook uses, the
    # version specifiers need no shell quoting, and a failed install raises
    # CalledProcessError instead of being silently reported as success.
    # xgboost is constrained to >=2.0.0 because the training cells pass the
    # `device` parameter, which XGBoost introduced in 2.0.
    for requirement_group in (
        ["numpy==1.26.4", "pandas", "astropy", "scipy<1.13", "matplotlib", "scikit-learn"],
        ["lightkurve", "astroquery", "transitleastsquares", "wotan", "xgboost>=2.0.0", "joblib"],
    ):
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "-q", *requirement_group]
        )
    print("✅ 套件安裝完成!")
    print("⚠️ 請現在手動重啟 Runtime: Runtime → Restart runtime")
    print("   然後從下一個 cell 繼續執行")
else:
    print("💻 本地環境，跳過套件安裝")

In [None]:
# Phase 3-4 新增導入
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import RobustScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import StratifiedGroupKFold

# Make the project's custom modules (Phase 3) importable.
import os
import sys

# A working directory under /content is treated as a Colab checkout; anywhere
# else, src/ is resolved relative to the current working directory.
_working_dir = os.getcwd()
_src_dir = (
    '/content/exoplanet-starter/src'
    if '/content' in _working_dir
    else os.path.join(_working_dir, '..', 'src')
)
sys.path.append(_src_dir)

from models.pipeline import create_exoplanet_pipeline
from utils.gpu_utils import get_xgboost_gpu_params, log_gpu_info

# Confirm which Phase 3-4 names are now available.
print("\n".join([
    "✅ Phase 3-4 imports loaded successfully",
    "  - Pipeline, SimpleImputer, RobustScaler",
    "  - StratifiedGroupKFold",
    "  - create_exoplanet_pipeline, get_xgboost_gpu_params",
]))

✅ Phase 3-4 imports loaded successfully
  - Pipeline, SimpleImputer, RobustScaler
  - StratifiedGroupKFold
  - create_exoplanet_pipeline, get_xgboost_gpu_params


In [None]:
# Load the calibration plotting helper from the project's src/ directory.
import sys
sys.path.append('../src')
from utils.calibration_viz import plot_calibration_curves

# Probability vectors to compare, keyed by the label used in the plot.
predictions_dict = dict(
    Uncalibrated=y_pred_uncal,
    Isotonic=y_pred_isotonic,
    Platt=y_pred_platt,
)

# Render all three curves into one figure under reports_dir.
calibration_plot_path = reports_dir / 'calibration_curves.png'
plot_calibration_curves(
    y_true=y_test,
    predictions=predictions_dict,
    output_path=calibration_plot_path,
    n_bins=10,
)

print("✅ Calibration curves visualization complete")

In [None]:
# 標準函式庫
import json
import time
from pathlib import Path
from typing import Dict, List, Tuple, Optional

# 數據處理
import numpy as np
import pandas as pd

# 天文資料
import lightkurve as lk

# 機器學習
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.metrics import (
    average_precision_score,
    precision_recall_curve,
    roc_auc_score,
    brier_score_loss,
    classification_report,
    confusion_matrix
)
import xgboost as xgb
import joblib

# 視覺化
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.gridspec import GridSpec

# Configure the plotting style.
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')

# Project-local modules.
from app.injection import (
    inject_box_transit,
    generate_synthetic_dataset,
    save_synthetic_dataset,
    generate_transit_parameters
)

from app.bls_features import (
    run_bls,
    extract_features,
    extract_features_batch,
    compute_feature_importance,
    create_feature_schema
)

# `from sklearn.xyz import ...` does not bind the name `sklearn`, so import the
# package itself before reading its version; otherwise the report below raises
# NameError unless another cell happened to import it first.
import sklearn

print("📚 套件導入完成")
print(f"   NumPy 版本: {np.__version__}")
print(f"   Pandas 版本: {pd.__version__}")
print(f"   Scikit-learn 版本: {sklearn.__version__}")
print(f"   XGBoost 版本: {xgb.__version__}")

In [None]:
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import brier_score_loss

_banner = "=" * 70
print(_banner)
print("📊 Probability Calibration Training")
print(_banner)

# Baseline: positive-class probabilities straight from the fitted model.
y_pred_uncal = xgb_model.predict_proba(X_test)[:, 1]


def _calibrate_prefit(method):
    """Calibrate the already-fitted xgb_model with `method` and score X_test.

    Returns the fitted CalibratedClassifierCV and its positive-class
    probabilities on X_test.
    """
    # NOTE(review): the calibrator is fitted on X_train/y_train; if xgb_model
    # was trained on that same data, calibration should use a held-out split
    # instead — confirm against the cell that trains xgb_model.
    calibrator = CalibratedClassifierCV(xgb_model, method=method, cv='prefit')
    calibrator.fit(X_train, y_train)
    return calibrator, calibrator.predict_proba(X_test)[:, 1]


# Method 1: isotonic regression.
print("\n1️⃣ Training Isotonic Calibration...")
calibrated_isotonic, y_pred_isotonic = _calibrate_prefit('isotonic')
print("   ✅ Isotonic calibration complete")

# Method 2: Platt scaling (sigmoid).
print("\n2️⃣ Training Platt Scaling (Sigmoid)...")
calibrated_platt, y_pred_platt = _calibrate_prefit('sigmoid')
print("   ✅ Platt calibration complete")

In [None]:
# 環境設定與依賴安裝（Colab）
import sys, subprocess, pkgutil
import warnings
warnings.filterwarnings('ignore')

def pipi(*pkgs):
    """Install the given packages quietly with pip, using the running interpreter.

    Raises subprocess.CalledProcessError if pip exits with a non-zero status.
    """
    pip_command = [sys.executable, "-m", "pip", "install", "-q"]
    pip_command.extend(pkgs)
    subprocess.check_call(pip_command)

# Install the required packages only when one of the core imports fails
# (numpy is capped below 2.0 to avoid compatibility problems).
print("🚀 正在安裝依賴套件...")
try:
    import numpy as np
    import lightkurve as lk
    import sklearn
    import xgboost
    print("✅ 基礎套件已安裝")
except Exception:
    # Deliberately broad: a binary-incompatible NumPy can surface as errors
    # other than ImportError while these packages are being imported.
    pipi("numpy<2", "lightkurve", "astroquery", "scikit-learn", 
         "matplotlib", "seaborn", "xgboost", "joblib", "pandas", "pyarrow")
    print("✅ 依賴套件安裝完成")

import os
from pathlib import Path

# Detect whether the notebook runs on Google Colab.
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    print("📍 在 Google Colab 環境執行")
    colab_repo_dir = '/content/exoplanet-starter'
    # Clone the repository only when it is not already present.
    if not os.path.exists(colab_repo_dir):
        subprocess.check_call([
            "git", "clone",
            "https://github.com/exoplanet-spaceapps/exoplanet-starter.git",
            colab_repo_dir,
        ])
    # Always enter the checkout, not only right after cloning: after a runtime
    # restart the clone already exists but the working directory has been reset.
    os.chdir(colab_repo_dir)
    repo_root = colab_repo_dir
else:
    print("💻 在本地環境執行")
    # `__file__` is not defined inside a notebook kernel, so fall back to the
    # current working directory when it is missing.
    if '__file__' in globals():
        start_dir = Path(globals()['__file__']).resolve().parent
    else:
        start_dir = Path.cwd()
    # The project root is the directory holding the `app` package imported
    # elsewhere in this notebook. Checking for it keeps the cell idempotent:
    # re-running it does not climb one more directory each time.
    repo_root_path = start_dir if (start_dir / 'app').is_dir() else start_dir.parent
    os.chdir(repo_root_path)
    repo_root = os.getcwd()

# Avoid stacking duplicate entries when the cell is executed more than once.
if repo_root not in sys.path:
    sys.path.append(repo_root)

print("\n環境設定完成！")

In [None]:
# Extract features for every sample in one batch.
print("🔍 開始批次特徵提取...")
print("   這可能需要幾分鐘時間...\n")

start_time = time.time()

# Batch feature extraction.
features_df = extract_features_batch(
    samples_df,
    compute_advanced=True,
    verbose=True
)

elapsed_time = time.time() - start_time

# Every column other than the identifier and the label is a model feature.
feature_cols = [col for col in features_df.columns if col not in ['sample_id', 'label']]

# Guard the per-sample average against an empty sample table.
n_samples = len(samples_df)
seconds_per_sample = elapsed_time / n_samples if n_samples else float('nan')

print(f"\n✅ 特徵提取完成")
print(f"   耗時: {elapsed_time:.1f} 秒")
print(f"   平均每個樣本: {seconds_per_sample:.2f} 秒")
# Count the feature columns directly instead of assuming that both
# 'sample_id' and 'label' are present in the frame.
print(f"   提取特徵數: {len(feature_cols)}")

# Show the feature list.
print(f"\n📋 特徵列表:")
for i, feat in enumerate(feature_cols, 1):
    print(f"   {i:2d}. {feat}")

## 4. 特徵萃取

### 4.1 批次提取 BLS 特徵

### 5.1 Phase 3-4: Sklearn Pipeline + StratifiedGroupKFold 交叉驗證

**Phase 3 新功能:**
- ✨ 使用 `create_exoplanet_pipeline()` 取代基礎 XGBoost
- 🔄 整合 SimpleImputer + RobustScaler 前處理
- 🚀 GPU 加速 (自動偵測並使用 `device='cuda'`)
- 🎯 `random_state=42` 確保可重現性

**Phase 4 新功能:**
- 📊 StratifiedGroupKFold 5-fold 交叉驗證
- 🔒 按 `target_id` 分組避免資料洩漏
- 📈 記錄每個 fold 的詳細指標
- 📉 計算平均 AUC-PR 與標準差

**為什麼使用 StratifiedGroupKFold?**
1. **Stratified**: 保持每個 fold 的類別比例一致
2. **Grouped**: 同一 target 的所有樣本留在同一 fold
3. **防止洩漏**: 訓練時不會看到測試目標的任何資料

# 03 · 合成注入訓練管線

## 工作流程
1. **資料生成**：合成注入 200 正類 + 200 負類
2. **特徵萃取**：BLS/TLS 指標 + 幾何統計
3. **模型訓練**：LogisticRegression/XGBoost + 機率校準
4. **評估指標**：PR-AUC, Precision@K, ECE, Brier Score
5. **持久化**：儲存模型與特徵架構

---

## 7. SHAP Explainability Analysis

使用 SHAP (SHapley Additive exPlanations) 分析模型特徵重要性與可解釋性。

### 4.2 特徵重要性分析

### 4.3 建立特徵架構

### 9.3 提取真實資料特徵

### 9.3 提取真實資料特徵

In [None]:
# Phase 3-4: train with a Pipeline + StratifiedGroupKFold.
# The metric functions are imported once here rather than on every fold.
from sklearn.metrics import (
    average_precision_score,
    roc_auc_score,
    precision_score,
    recall_score
)


def _take_rows(data, row_idx):
    """Select rows by position from a pandas object or a NumPy array.

    The data-preparation cell builds X and y with `.values` (NumPy arrays),
    which have no `.iloc`; pandas inputs are still supported.
    """
    return data.iloc[row_idx] if hasattr(data, 'iloc') else data[row_idx]


print("🚀 Phase 3: 創建 Sklearn Pipeline with Preprocessing")
print("="*60)

# Fetch the XGBoost device parameters.
gpu_params = get_xgboost_gpu_params()
print(f"XGBoost 參數: {gpu_params}")
print()

# Build the pipeline (Phase 3).
pipeline = create_exoplanet_pipeline(
    numerical_features=feature_cols,
    xgb_params=gpu_params,
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    random_state=42
)

print("✅ Pipeline 創建成功:")
print("   步驟 1: SimpleImputer (median)")
print("   步驟 2: RobustScaler (robust to outliers)")
print("   步驟 3: XGBClassifier (GPU-accelerated)")
print()

# Phase 4: StratifiedGroupKFold cross-validation.
print("📊 Phase 4: StratifiedGroupKFold 5-Fold Cross-Validation")
print("="*60)

n_splits = 5
sgkf = StratifiedGroupKFold(n_splits=n_splits, shuffle=True, random_state=42)

# Per-fold metrics and fitted pipelines.
fold_results = []
fold_models = []

for fold_idx, (train_idx, test_idx) in enumerate(sgkf.split(X, y, groups), 1):
    print(f"\n🔄 Fold {fold_idx}/{n_splits}")
    print("-" * 40)
    
    # Split the data for this fold.
    X_train_fold, X_test_fold = _take_rows(X, train_idx), _take_rows(X, test_idx)
    y_train_fold, y_test_fold = _take_rows(y, train_idx), _take_rows(y, test_idx)
    
    # Train a fresh pipeline (preprocessing + XGBoost).
    pipeline_fold = create_exoplanet_pipeline(
        numerical_features=feature_cols,
        xgb_params=gpu_params,
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        random_state=42 + fold_idx  # a different seed for each fold
    )
    
    pipeline_fold.fit(X_train_fold, y_train_fold)
    
    # Positive-class probabilities on the held-out fold.
    y_pred_proba_fold = pipeline_fold.predict_proba(X_test_fold)[:, 1]
    
    ap_score = average_precision_score(y_test_fold, y_pred_proba_fold)
    # ROC-AUC is undefined when the held-out fold contains a single class;
    # record NaN instead of letting roc_auc_score raise.
    if len(np.unique(y_test_fold)) > 1:
        auc_score = roc_auc_score(y_test_fold, y_pred_proba_fold)
    else:
        auc_score = float('nan')
    
    # Precision / recall at a 0.5 threshold.
    y_pred_binary = (y_pred_proba_fold >= 0.5).astype(int)
    precision = precision_score(y_test_fold, y_pred_binary, zero_division=0)
    recall = recall_score(y_test_fold, y_pred_binary, zero_division=0)
    
    fold_results.append({
        'fold': fold_idx,
        'train_size': len(train_idx),
        'test_size': len(test_idx),
        'test_pos_ratio': y_test_fold.mean(),
        'auc_pr': ap_score,
        'auc_roc': auc_score,
        'precision': precision,
        'recall': recall
    })
    
    fold_models.append(pipeline_fold)
    
    print(f"   訓練集: {len(train_idx)} 樣本")
    print(f"   測試集: {len(test_idx)} 樣本 (正樣本: {y_test_fold.mean():.2%})")
    print(f"   AUC-PR:  {ap_score:.4f}")
    print(f"   AUC-ROC: {auc_score:.4f}")
    print(f"   Precision@0.5: {precision:.4f}")
    print(f"   Recall@0.5: {recall:.4f}")

# Aggregate the fold results.
print("\n" + "="*60)
print("📈 Phase 4: Cross-Validation 匯總結果")
print("="*60)

fold_df = pd.DataFrame(fold_results)

print(f"\nAUC-PR:  {fold_df['auc_pr'].mean():.4f} ± {fold_df['auc_pr'].std():.4f}")
print(f"AUC-ROC: {fold_df['auc_roc'].mean():.4f} ± {fold_df['auc_roc'].std():.4f}")
print(f"Precision@0.5: {fold_df['precision'].mean():.4f} ± {fold_df['precision'].std():.4f}")
print(f"Recall@0.5: {fold_df['recall'].mean():.4f} ± {fold_df['recall'].std():.4f}")

print("\n📊 各 Fold 詳細結果:")
print(fold_df.to_string(index=False))

# Keep the pipeline of the fold with the highest AUC-PR.
# NOTE(review): the fold is selected by its own held-out score, so that score
# is an optimistic estimate for the chosen model.
best_fold_idx = fold_df['auc_pr'].idxmax()
best_model = fold_models[best_fold_idx]
print(f"\n✅ 最佳模型: Fold {best_fold_idx + 1} (AUC-PR: {fold_df.loc[best_fold_idx, 'auc_pr']:.4f})")

# Store the best model in the models dict for later cells.
models = {'XGBoost_Pipeline_CV': best_model}
print("   已保存為 models['XGBoost_Pipeline_CV']")

🚀 Phase 3: 創建 Sklearn Pipeline with Preprocessing


XGBoost 參數: {'tree_method': 'hist', 'device': 'cuda'}



NameError: name 'feature_cols' is not defined

In [None]:
# Phase 3-4: supervised training on real data (Pipeline + StratifiedGroupKFold).
# This cell had been collapsed onto a few physical lines and was not valid
# Python; it is restored here as properly indented code.
if 'supervised_features_df' in locals() and len(supervised_features_df) > 10:
    from sklearn.metrics import average_precision_score, roc_auc_score

    print("Phase 3-4: Training supervised model (Pipeline + CV)...")
    print("="*60)

    # Prepare data
    X_supervised = supervised_features_df[feature_cols].values
    y_supervised = supervised_features_df['label'].values

    # Handle invalid values
    X_supervised = np.nan_to_num(X_supervised, nan=0.0, posinf=0.0, neginf=0.0)

    # Phase 4: Create target groups (from target_id or TIC_ID)
    if 'target_id' in supervised_features_df.columns:
        groups_supervised = supervised_features_df['target_id'].values
        print("Using target_id for grouping")
    elif 'tic_id' in supervised_features_df.columns:
        groups_supervised = supervised_features_df['tic_id'].values
        print("Using tic_id for grouping")
    elif 'sample_id' in supervised_features_df.columns:
        # One integer code per distinct sample_id. Built-in hash() on strings
        # is salted per process, and reducing it modulo a constant can merge
        # unrelated samples into one group, so factorize is used instead.
        groups_supervised = pd.factorize(
            supervised_features_df['sample_id'].astype(str)
        )[0]
        print("Generated groups from sample_id")
    else:
        groups_supervised = np.arange(len(y_supervised))
        print("No grouping column found, using individual groups")

    print("Supervised Dataset:")
    print(f"   Total samples: {len(X_supervised)}")
    print(f"   Features: {X_supervised.shape[1]}")
    print(f"   Positive ratio: {y_supervised.mean():.2%}")
    print(f"   Unique groups: {len(np.unique(groups_supervised))}")
    print()

    # Phase 3: Use Pipeline + GPU
    gpu_params_sup = get_xgboost_gpu_params()
    print(f"XGBoost GPU params: {gpu_params_sup}")
    print()

    # Phase 4: StratifiedGroupKFold cross-validation
    print("Phase 4: StratifiedGroupKFold 5-Fold Cross-Validation")
    print("-" * 60)

    n_splits_sup = 5
    sgkf_sup = StratifiedGroupKFold(n_splits=n_splits_sup, shuffle=True, random_state=42)

    fold_results_sup = []
    fold_models_sup = []

    for fold_idx, (train_idx, test_idx) in enumerate(
        sgkf_sup.split(X_supervised, y_supervised, groups_supervised), 1
    ):
        print(f"Fold {fold_idx}/{n_splits_sup}")
        print("-" * 40)

        # Split data
        X_train_fold_sup, X_test_fold_sup = X_supervised[train_idx], X_supervised[test_idx]
        y_train_fold_sup, y_test_fold_sup = y_supervised[train_idx], y_supervised[test_idx]

        # Train pipeline (Phase 3)
        pipeline_fold_sup = create_exoplanet_pipeline(
            numerical_features=feature_cols,
            xgb_params=gpu_params_sup,
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            random_state=42 + fold_idx
        )
        pipeline_fold_sup.fit(X_train_fold_sup, y_train_fold_sup)

        # Predict
        y_pred_proba_fold_sup = pipeline_fold_sup.predict_proba(X_test_fold_sup)[:, 1]

        # Calculate metrics
        ap_score_sup = average_precision_score(y_test_fold_sup, y_pred_proba_fold_sup)
        auc_score_sup = roc_auc_score(y_test_fold_sup, y_pred_proba_fold_sup)

        fold_results_sup.append({
            'fold': fold_idx,
            'train_size': len(train_idx),
            'test_size': len(test_idx),
            'test_pos_ratio': y_test_fold_sup.mean(),
            'auc_pr': ap_score_sup,
            'auc_roc': auc_score_sup
        })
        fold_models_sup.append(pipeline_fold_sup)

        print(f"   Train: {len(train_idx)} samples")
        print(f"   Test: {len(test_idx)} samples (pos: {y_test_fold_sup.mean():.2%})")
        print(f"   AUC-PR:  {ap_score_sup:.4f}")
        print(f"   AUC-ROC: {auc_score_sup:.4f}")

    # Summary results
    print("=" * 60)
    print("Phase 4: Cross-Validation Summary Results")
    print("="*60)

    fold_df_sup = pd.DataFrame(fold_results_sup)
    print(f"AUC-PR:  {fold_df_sup['auc_pr'].mean():.4f} +/- {fold_df_sup['auc_pr'].std():.4f}")
    print(f"AUC-ROC: {fold_df_sup['auc_roc'].mean():.4f} +/- {fold_df_sup['auc_roc'].std():.4f}")
    print("Detailed results per fold:")
    print(fold_df_sup.to_string(index=False))

    # Save best model (fold with the highest AUC-PR)
    best_fold_idx_sup = fold_df_sup['auc_pr'].idxmax()
    best_model_supervised = fold_models_sup[best_fold_idx_sup]
    print(f"Best supervised model: Fold {best_fold_idx_sup + 1} (AUC-PR: {fold_df_sup.loc[best_fold_idx_sup, 'auc_pr']:.4f})")

    # Save to models dict, creating it when the synthetic-data cell did not run.
    if 'models' not in globals():
        models = {}
    models['XGBoost_Supervised_Pipeline_CV'] = best_model_supervised
    metrics_supervised = fold_results_sup[best_fold_idx_sup]
    print("   Saved as models['XGBoost_Supervised_Pipeline_CV']")
else:
    print("supervised_features_df not found or insufficient samples, skipping supervised training")
    metrics_supervised = None

In [None]:
# Phase 3-4: supervised training on real data (Pipeline + StratifiedGroupKFold).
# This cell had been collapsed onto a few physical lines and was not valid
# Python; it is restored here as properly indented code.
if 'supervised_features_df' in locals() and len(supervised_features_df) > 10:
    from sklearn.metrics import average_precision_score, roc_auc_score

    print("Phase 3-4: Training supervised model (Pipeline + CV)...")
    print("="*60)

    # Prepare data
    X_supervised = supervised_features_df[feature_cols].values
    y_supervised = supervised_features_df['label'].values

    # Handle invalid values
    X_supervised = np.nan_to_num(X_supervised, nan=0.0, posinf=0.0, neginf=0.0)

    # Phase 4: Create target groups (from target_id or TIC_ID)
    if 'target_id' in supervised_features_df.columns:
        groups_supervised = supervised_features_df['target_id'].values
        print("Using target_id for grouping")
    elif 'tic_id' in supervised_features_df.columns:
        groups_supervised = supervised_features_df['tic_id'].values
        print("Using tic_id for grouping")
    elif 'sample_id' in supervised_features_df.columns:
        # One integer code per distinct sample_id. Built-in hash() on strings
        # is salted per process, and reducing it modulo a constant can merge
        # unrelated samples into one group, so factorize is used instead.
        groups_supervised = pd.factorize(
            supervised_features_df['sample_id'].astype(str)
        )[0]
        print("Generated groups from sample_id")
    else:
        groups_supervised = np.arange(len(y_supervised))
        print("No grouping column found, using individual groups")

    print("Supervised Dataset:")
    print(f"   Total samples: {len(X_supervised)}")
    print(f"   Features: {X_supervised.shape[1]}")
    print(f"   Positive ratio: {y_supervised.mean():.2%}")
    print(f"   Unique groups: {len(np.unique(groups_supervised))}")
    print()

    # Phase 3: Use Pipeline + GPU
    gpu_params_sup = get_xgboost_gpu_params()
    print(f"XGBoost GPU params: {gpu_params_sup}")
    print()

    # Phase 4: StratifiedGroupKFold cross-validation
    print("Phase 4: StratifiedGroupKFold 5-Fold Cross-Validation")
    print("-" * 60)

    n_splits_sup = 5
    sgkf_sup = StratifiedGroupKFold(n_splits=n_splits_sup, shuffle=True, random_state=42)

    fold_results_sup = []
    fold_models_sup = []

    for fold_idx, (train_idx, test_idx) in enumerate(
        sgkf_sup.split(X_supervised, y_supervised, groups_supervised), 1
    ):
        print(f"Fold {fold_idx}/{n_splits_sup}")
        print("-" * 40)

        # Split data
        X_train_fold_sup, X_test_fold_sup = X_supervised[train_idx], X_supervised[test_idx]
        y_train_fold_sup, y_test_fold_sup = y_supervised[train_idx], y_supervised[test_idx]

        # Train pipeline (Phase 3)
        pipeline_fold_sup = create_exoplanet_pipeline(
            numerical_features=feature_cols,
            xgb_params=gpu_params_sup,
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            random_state=42 + fold_idx
        )
        pipeline_fold_sup.fit(X_train_fold_sup, y_train_fold_sup)

        # Predict
        y_pred_proba_fold_sup = pipeline_fold_sup.predict_proba(X_test_fold_sup)[:, 1]

        # Calculate metrics
        ap_score_sup = average_precision_score(y_test_fold_sup, y_pred_proba_fold_sup)
        auc_score_sup = roc_auc_score(y_test_fold_sup, y_pred_proba_fold_sup)

        fold_results_sup.append({
            'fold': fold_idx,
            'train_size': len(train_idx),
            'test_size': len(test_idx),
            'test_pos_ratio': y_test_fold_sup.mean(),
            'auc_pr': ap_score_sup,
            'auc_roc': auc_score_sup
        })
        fold_models_sup.append(pipeline_fold_sup)

        print(f"   Train: {len(train_idx)} samples")
        print(f"   Test: {len(test_idx)} samples (pos: {y_test_fold_sup.mean():.2%})")
        print(f"   AUC-PR:  {ap_score_sup:.4f}")
        print(f"   AUC-ROC: {auc_score_sup:.4f}")

    # Summary results
    print("=" * 60)
    print("Phase 4: Cross-Validation Summary Results")
    print("="*60)

    fold_df_sup = pd.DataFrame(fold_results_sup)
    print(f"AUC-PR:  {fold_df_sup['auc_pr'].mean():.4f} +/- {fold_df_sup['auc_pr'].std():.4f}")
    print(f"AUC-ROC: {fold_df_sup['auc_roc'].mean():.4f} +/- {fold_df_sup['auc_roc'].std():.4f}")
    print("Detailed results per fold:")
    print(fold_df_sup.to_string(index=False))

    # Save best model (fold with the highest AUC-PR)
    best_fold_idx_sup = fold_df_sup['auc_pr'].idxmax()
    best_model_supervised = fold_models_sup[best_fold_idx_sup]
    print(f"Best supervised model: Fold {best_fold_idx_sup + 1} (AUC-PR: {fold_df_sup.loc[best_fold_idx_sup, 'auc_pr']:.4f})")

    # Save to models dict, creating it when the synthetic-data cell did not run.
    if 'models' not in globals():
        models = {}
    models['XGBoost_Supervised_Pipeline_CV'] = best_model_supervised
    metrics_supervised = fold_results_sup[best_fold_idx_sup]
    print("   Saved as models['XGBoost_Supervised_Pipeline_CV']")
else:
    print("supervised_features_df not found or insufficient samples, skipping supervised training")
    metrics_supervised = None

In [None]:
# Phase 3-4: prepare the training data and features (with grouping).
print("📊 Phase 3-4: 準備訓練資料與 StratifiedGroupKFold")
print("="*60)

# Feature matrix and labels as NumPy arrays.
X = features_df[feature_cols].values
y = features_df['label'].values

# Replace invalid values (NaN, +/-Inf) with 0.0.
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)


def _sample_group_key(sample_id):
    """Return a stable grouping key for a sample id.

    The second underscore-separated token identifies the group
    ("sample_0001_obs1" -> group 1). Numeric tokens are normalized through
    int() so "0001" and "1" share a group; non-numeric tokens and ids without
    an underscore get their own prefixed namespace, so they can never collide
    with a numeric group and never raise.
    """
    text = str(sample_id)
    tokens = text.split('_')
    if len(tokens) < 2:
        return f"raw:{text}"
    try:
        return f"num:{int(tokens[1])}"
    except ValueError:
        return f"tok:{tokens[1]}"


# Build target groups. For the synthetic data the sample_id prefix acts as the
# group, simulating several observations of the same target.
if 'sample_id' in features_df.columns:
    # factorize yields deterministic integer codes, unlike built-in hash() on
    # strings, which is salted per process and so changes between runs.
    groups = pd.factorize(features_df['sample_id'].map(_sample_group_key))[0]
else:
    # Without sample_id every sample forms its own group.
    groups = np.arange(len(y))

print(f"   總樣本數: {len(X)}")
print(f"   特徵維度: {X.shape[1]}")
print(f"   正樣本比例: {y.mean():.2%}")
print(f"   唯一 groups 數: {len(np.unique(groups))}")
print()

# Log GPU info
print("🖥️  GPU 配置:")
log_gpu_info()
print()

## 5. 模型訓練與校準

### 5.1 資料準備

### 5.2 訓練多個模型

## 9. 監督式學習管線（真實 TOI + Kepler EB 資料）

> 💡 **新增功能**：使用真實的 TOI（正類）和 Kepler EB（負類）資料進行監督式訓練，與合成注入方法比較。

### 9.1 載入真實資料集

## 9. 監督式學習管線（真實 TOI + Kepler EB 資料）

> 💡 **新增功能**：使用真實的 TOI（正類）和 Kepler EB（負類）資料進行監督式訓練，與合成注入方法比較。

---

## 9. 監督式學習管線（真實 TOI + Kepler EB 資料）

> 💡 **新增功能**：使用真實的 TOI（正類）和 Kepler EB（負類）資料進行監督式訓練，與合成注入方法比較。

### 9.1 載入真實資料集

### 9.4 訓練監督式模型

## 9. 監督式學習管線（真實 TOI + Kepler EB 資料）

> 💡 **新增功能**：使用真實的 TOI（正類）和 Kepler EB（負類）資料進行監督式訓練，與合成注入方法比較。

### 9.1 載入真實資料集

### 9.4 訓練監督式模型

---

## 🚀 GitHub Push 終極解決方案

將合成注入與監督式訓練結果推送到 GitHub 倉庫：

In [None]:
# Create the output directory for every artifact written by this cell.
output_dir = Path("model")
output_dir.mkdir(parents=True, exist_ok=True)

print("💾 儲存模型與相關檔案...\n")

# 1. Calibrated model.
model_path = output_dir / "ranker.joblib"
joblib.dump(calibrated_model, model_path)
print(f"✅ 模型已儲存: {model_path}")

# 2. Feature scaler.
scaler_path = output_dir / "scaler.joblib"
joblib.dump(scaler, scaler_path)
print(f"✅ 標準化器已儲存: {scaler_path}")

# 3. Feature schema: generate one from feature_cols when the data/ copy is
#    missing, otherwise copy the existing file.
import shutil
feature_schema_source = Path("data/feature_schema.json")
if not feature_schema_source.exists():
    feature_schema = dict(
        features=feature_cols,
        n_features=len(feature_cols),
        created_date=time.strftime("%Y-%m-%d %H:%M:%S"),
        note="Generated from training data",
    )
    schema_path = output_dir / "feature_schema.json"
    with open(schema_path, 'w') as schema_file:
        json.dump(feature_schema, schema_file, indent=2)
    print(f"✅ 特徵架構已生成: {schema_path}")
else:
    shutil.copy(feature_schema_source, output_dir / "feature_schema.json")
    print(f"✅ 特徵架構已複製: {output_dir / 'feature_schema.json'}")

# 4. Model metadata: training context, metrics, and fixed parameter ranges.
_parameter_ranges = {
    "period_range": [0.6, 10.0],
    "depth_range": [0.0005, 0.02],
    "duration_fraction_range": [0.02, 0.1]
}
metadata = {
    "model_type": "XGBoost with Isotonic Calibration",
    "training_date": time.strftime("%Y-%m-%d %H:%M:%S"),
    "n_features": len(feature_cols),
    "feature_names": feature_cols,
    "training_samples": len(X_train),
    "test_samples": len(X_test),
    "metrics": metrics_calibrated,
    "parameters": _parameter_ranges
}

metadata_path = output_dir / "model_metadata.json"
with open(metadata_path, 'w') as metadata_file:
    # default=str stringifies values json cannot encode natively.
    json.dump(metadata, metadata_file, indent=2, default=str)
print(f"✅ 元資料已儲存: {metadata_path}")

print("\n📦 所有檔案已成功儲存至 'model/' 目錄")

In [None]:
# Final report for the synthetic-injection training pipeline.
_rule = "="*60
print(_rule)
print("📊 訓練管線執行總結")
print(_rule)

# Dataset size, class counts, and feature count.
print(f"""
🎯 資料集:
   • 總樣本數: {len(samples_df)}
   • 正樣本: {len(samples_df[samples_df['label'] == 1])}
   • 負樣本: {len(samples_df[samples_df['label'] == 0])}
   
🔍 特徵工程:
   • 特徵數量: {len(feature_cols)}
   • Top 3 重要特徵:
""")

# Three highest-ranked rows of the importance table.
_top_features = importance_df.head(3)
for _, _top_row in _top_features.iterrows():
    print(f"     - {_top_row['feature']}: {_top_row['importance']:.4f}")

# Calibrated-model metrics, findings, and output file list.
print(f"""
🤖 模型效能:
   • PR-AUC: {metrics_calibrated['PR-AUC']:.3f}
   • ROC-AUC: {metrics_calibrated['ROC-AUC']:.3f}
   • Brier Score: {metrics_calibrated['Brier Score']:.3f}
   • ECE: {metrics_calibrated['ECE']:.3f}
   • Precision@10: {metrics_calibrated.get('P@10', 'N/A')}
   
💡 關鍵發現:
   1. Isotonic 校準顯著改善了機率預測的可靠性
   2. BLS 特徵（週期、SNR、深度）是最重要的預測因子
   3. 模型在高置信度預測上表現優異（高 Precision@K）
   
📦 輸出檔案:
   • 模型: model/ranker.joblib
   • 標準化器: model/scaler.joblib
   • 特徵架構: model/feature_schema.json
   • 元資料: model/model_metadata.json
   • 合成資料: data/synthetic/
""")

print(_rule)
print("✅ 訓練管線完成！")
print(_rule)

In [None]:
# Persist the supervised model and its metadata.
# The Pipeline + CV training cell defines `best_model_supervised` and
# `models['XGBoost_Supervised_Pipeline_CV']` but not `calibrated_supervised`,
# `scaler_supervised`, `X_train_sup` or `X_test_sup`. Each of those is looked
# up defensively so this cell works with either generation of the training
# code instead of raising NameError.
_ns = globals()
_supervised_model = _ns.get('calibrated_supervised')
_supervised_model_type = "XGBoost with Isotonic Calibration (Supervised)"
if _supervised_model is None:
    _supervised_model = _ns.get('best_model_supervised')
    if _supervised_model is None and isinstance(_ns.get('models'), dict):
        _supervised_model = _ns['models'].get('XGBoost_Supervised_Pipeline_CV')
    _supervised_model_type = "XGBoost Pipeline (Supervised, best CV fold)"

if _ns.get('metrics_supervised') is not None and _supervised_model is not None:
    print("💾 儲存監督式模型...")

    # Create the output directory.
    supervised_dir = Path("model/supervised")
    supervised_dir.mkdir(parents=True, exist_ok=True)

    # Save the model; a separate scaler is saved only when one exists.
    joblib.dump(_supervised_model, supervised_dir / "ranker_supervised.joblib")
    if _ns.get('scaler_supervised') is not None:
        joblib.dump(_ns['scaler_supervised'], supervised_dir / "scaler_supervised.joblib")

    # Sample counts: prefer the explicit split arrays, otherwise fall back to
    # the sizes recorded in the fold metrics (None when neither is available).
    _fold_metrics = metrics_supervised if isinstance(metrics_supervised, dict) else {}
    _n_train = len(_ns['X_train_sup']) if 'X_train_sup' in _ns else _fold_metrics.get('train_size')
    _n_test = len(_ns['X_test_sup']) if 'X_test_sup' in _ns else _fold_metrics.get('test_size')

    # Save the metadata.
    supervised_metadata = {
        "model_type": _supervised_model_type,
        "training_date": time.strftime("%Y-%m-%d %H:%M:%S"),
        "data_sources": {
            "positive": "TOI (PC/CP/KP)",
            "negative": "Kepler EB + TOI FP"
        },
        "n_features": len(feature_cols),
        "feature_names": feature_cols,
        "training_samples": _n_train,
        "test_samples": _n_test,
        "metrics": metrics_supervised
    }

    with open(supervised_dir / "model_metadata.json", 'w') as f:
        # default=str stringifies values json cannot encode natively.
        json.dump(supervised_metadata, f, indent=2, default=str)

    print(f"✅ 監督式模型已儲存至: {supervised_dir}")

    # Save the method comparison table when it exists.
    if _ns.get('comparison_df') is not None:
        comparison_df.to_csv("model/method_comparison.csv", index=False)
        print("✅ 方法比較結果已儲存至: model/method_comparison.csv")
else:
    print("⚠️ 無監督式模型可儲存")

In [None]:
# Final report covering both the synthetic-injection and supervised branches.


def _metric_text(metrics, *keys):
    """Format the first metric found under any of `keys` to 3 decimals.

    The supervised CV cell stores 'auc_pr' / 'auc_roc' while older code used
    'PR-AUC' / 'ROC-AUC' / 'Brier Score' / 'ECE'; both spellings are accepted.
    Returns 'N/A' when none of the keys is present.
    """
    for key in keys:
        value = metrics.get(key) if isinstance(metrics, dict) else None
        if value is not None:
            return f"{value:.3f}"
    return "N/A"


def _count_supervised_source(source_name):
    """Count supervised rows whose 'source' column equals `source_name`.

    Returns 0 when the frame is missing, empty, or has no 'source' column.
    """
    frame = globals().get('supervised_features_df')
    if frame is None or len(frame) == 0 or 'source' not in frame.columns:
        return 0
    return len(frame[frame['source'] == source_name])


_supervised_total = (
    len(supervised_features_df) if 'supervised_features_df' in globals() else 0
)

print("="*70)
print("📊 完整訓練管線執行總結")
print("="*70)

print(f"""
🎯 資料集統計:

   【合成注入資料】
   • 總樣本數: {len(samples_df)}
   • 正樣本: {len(samples_df[samples_df['label'] == 1])}
   • 負樣本: {len(samples_df[samples_df['label'] == 0])}

   【真實監督資料】
   • 總樣本數: {_supervised_total}
   • TOI 正樣本: {_count_supervised_source('TOI')}
   • Kepler EB 負樣本: {_count_supervised_source('Kepler_EB')}

🔍 特徵工程:
   • 特徵數量: {len(feature_cols)}
   • Top 3 重要特徵:
""")

for idx, row in importance_df.head(3).iterrows():
    print(f"     - {row['feature']}: {row['importance']:.4f}")

print(f"""
🤖 模型效能比較:

   【合成注入方法】
   • PR-AUC: {metrics_calibrated['PR-AUC']:.3f}
   • ROC-AUC: {metrics_calibrated['ROC-AUC']:.3f}
   • Brier Score: {metrics_calibrated['Brier Score']:.3f}
   • ECE: {metrics_calibrated['ECE']:.3f}
""")

# Supervised metrics are optional; missing entries print as N/A rather than
# raising KeyError.
if globals().get('metrics_supervised') is not None:
    print(f"""
   【監督式學習】
   • PR-AUC: {_metric_text(metrics_supervised, 'PR-AUC', 'auc_pr')}
   • ROC-AUC: {_metric_text(metrics_supervised, 'ROC-AUC', 'auc_roc')}
   • Brier Score: {_metric_text(metrics_supervised, 'Brier Score', 'brier')}
   • ECE: {_metric_text(metrics_supervised, 'ECE', 'ece')}
    """)

print(f"""
💡 關鍵發現與建議:
   1. Isotonic 校準顯著改善了機率預測的可靠性
   2. BLS 特徵（週期、SNR、深度）是最重要的預測因子
   3. 合成注入適合快速開發，監督式學習更接近實際應用
   4. 建議在實際部署時結合兩種方法的優勢

📦 輸出檔案:
   • 合成模型: model/ranker.joblib
   • 監督模型: model/supervised/ranker_supervised.joblib
   • 特徵架構: model/feature_schema.json
   • 比較結果: model/method_comparison.csv
   • 資料集: data/synthetic/ 和 data/

🚀 下一步:
   1. 使用 04_newdata_inference.ipynb 對新資料進行推論
   2. 在更大的真實資料集上訓練監督式模型
   3. 探索深度學習方法（CNN/Transformer）
   4. 部署為 Web 應用或 API 服務
""")

print("="*70)
print("✅ 訓練管線（含監督式分支）完成！")
print("="*70)

In [None]:
# Persist the supervised model and its metadata.
# The Pipeline + CV training cell defines `best_model_supervised` and
# `models['XGBoost_Supervised_Pipeline_CV']` but not `calibrated_supervised`,
# `scaler_supervised`, `X_train_sup` or `X_test_sup`. Each of those is looked
# up defensively so this cell works with either generation of the training
# code instead of raising NameError.
_ns = globals()
_supervised_model = _ns.get('calibrated_supervised')
_supervised_model_type = "XGBoost with Isotonic Calibration (Supervised)"
if _supervised_model is None:
    _supervised_model = _ns.get('best_model_supervised')
    if _supervised_model is None and isinstance(_ns.get('models'), dict):
        _supervised_model = _ns['models'].get('XGBoost_Supervised_Pipeline_CV')
    _supervised_model_type = "XGBoost Pipeline (Supervised, best CV fold)"

if _ns.get('metrics_supervised') is not None and _supervised_model is not None:
    print("💾 儲存監督式模型...")

    # Create the output directory.
    supervised_dir = Path("model/supervised")
    supervised_dir.mkdir(parents=True, exist_ok=True)

    # Save the model; a separate scaler is saved only when one exists.
    joblib.dump(_supervised_model, supervised_dir / "ranker_supervised.joblib")
    if _ns.get('scaler_supervised') is not None:
        joblib.dump(_ns['scaler_supervised'], supervised_dir / "scaler_supervised.joblib")

    # Sample counts: prefer the explicit split arrays, otherwise fall back to
    # the sizes recorded in the fold metrics (None when neither is available).
    _fold_metrics = metrics_supervised if isinstance(metrics_supervised, dict) else {}
    _n_train = len(_ns['X_train_sup']) if 'X_train_sup' in _ns else _fold_metrics.get('train_size')
    _n_test = len(_ns['X_test_sup']) if 'X_test_sup' in _ns else _fold_metrics.get('test_size')

    # Save the metadata.
    supervised_metadata = {
        "model_type": _supervised_model_type,
        "training_date": time.strftime("%Y-%m-%d %H:%M:%S"),
        "data_sources": {
            "positive": "TOI (PC/CP/KP)",
            "negative": "Kepler EB + TOI FP"
        },
        "n_features": len(feature_cols),
        "feature_names": feature_cols,
        "training_samples": _n_train,
        "test_samples": _n_test,
        "metrics": metrics_supervised
    }

    with open(supervised_dir / "model_metadata.json", 'w') as f:
        # default=str stringifies values json cannot encode natively.
        json.dump(supervised_metadata, f, indent=2, default=str)

    print(f"✅ 監督式模型已儲存至: {supervised_dir}")

    # Save the method comparison table when it exists.
    if _ns.get('comparison_df') is not None:
        comparison_df.to_csv("model/method_comparison.csv", index=False)
        print("✅ 方法比較結果已儲存至: model/method_comparison.csv")
else:
    print("⚠️ 無監督式模型可儲存")

In [None]:
# Final report covering both the synthetic-injection and supervised branches.


def _metric_text(metrics, *keys):
    """Format the first metric found under any of `keys` to 3 decimals.

    The supervised CV cell stores 'auc_pr' / 'auc_roc' while older code used
    'PR-AUC' / 'ROC-AUC' / 'Brier Score' / 'ECE'; both spellings are accepted.
    Returns 'N/A' when none of the keys is present.
    """
    for key in keys:
        value = metrics.get(key) if isinstance(metrics, dict) else None
        if value is not None:
            return f"{value:.3f}"
    return "N/A"


def _count_supervised_source(source_name):
    """Count supervised rows whose 'source' column equals `source_name`.

    Returns 0 when the frame is missing, empty, or has no 'source' column.
    """
    frame = globals().get('supervised_features_df')
    if frame is None or len(frame) == 0 or 'source' not in frame.columns:
        return 0
    return len(frame[frame['source'] == source_name])


_supervised_total = (
    len(supervised_features_df) if 'supervised_features_df' in globals() else 0
)

print("="*70)
print("📊 完整訓練管線執行總結")
print("="*70)

print(f"""
🎯 資料集統計:

   【合成注入資料】
   • 總樣本數: {len(samples_df)}
   • 正樣本: {len(samples_df[samples_df['label'] == 1])}
   • 負樣本: {len(samples_df[samples_df['label'] == 0])}

   【真實監督資料】
   • 總樣本數: {_supervised_total}
   • TOI 正樣本: {_count_supervised_source('TOI')}
   • Kepler EB 負樣本: {_count_supervised_source('Kepler_EB')}

🔍 特徵工程:
   • 特徵數量: {len(feature_cols)}
   • Top 3 重要特徵:
""")

for idx, row in importance_df.head(3).iterrows():
    print(f"     - {row['feature']}: {row['importance']:.4f}")

print(f"""
🤖 模型效能比較:

   【合成注入方法】
   • PR-AUC: {metrics_calibrated['PR-AUC']:.3f}
   • ROC-AUC: {metrics_calibrated['ROC-AUC']:.3f}
   • Brier Score: {metrics_calibrated['Brier Score']:.3f}
   • ECE: {metrics_calibrated['ECE']:.3f}
""")

# Supervised metrics are optional; missing entries print as N/A rather than
# raising KeyError.
if globals().get('metrics_supervised') is not None:
    print(f"""
   【監督式學習】
   • PR-AUC: {_metric_text(metrics_supervised, 'PR-AUC', 'auc_pr')}
   • ROC-AUC: {_metric_text(metrics_supervised, 'ROC-AUC', 'auc_roc')}
   • Brier Score: {_metric_text(metrics_supervised, 'Brier Score', 'brier')}
   • ECE: {_metric_text(metrics_supervised, 'ECE', 'ece')}
    """)

print(f"""
💡 關鍵發現與建議:
   1. Isotonic 校準顯著改善了機率預測的可靠性
   2. BLS 特徵（週期、SNR、深度）是最重要的預測因子
   3. 合成注入適合快速開發，監督式學習更接近實際應用
   4. 建議在實際部署時結合兩種方法的優勢

📦 輸出檔案:
   • 合成模型: model/ranker.joblib
   • 監督模型: model/supervised/ranker_supervised.joblib
   • 特徵架構: model/feature_schema.json
   • 比較結果: model/method_comparison.csv
   • 資料集: data/synthetic/ 和 data/

🚀 下一步:
   1. 使用 04_newdata_inference.ipynb 對新資料進行推論
   2. 在更大的真實資料集上訓練監督式模型
   3. 探索深度學習方法（CNN/Transformer）
   4. 部署為 Web 應用或 API 服務
""")

print("="*70)
print("✅ 訓練管線（含監督式分支）完成！")
print("="*70)

In [None]:
# Recap of the Phase 7-8 artifacts (SHAP explainability + calibration).
_divider = "=" * 70
print("\n" + _divider)
print("📋 Phase 7-8 Summary")
print(_divider)

# Phase 7: SHAP outputs.
print("\n✅ SHAP Explainability (Phase 7):")
print(f"   • Summary plot: {shap_plot_path}")
_top_feature = feature_importance.iloc[0]['feature']
print(f"   • Top feature: {_top_feature}")
print(f"   • Features analyzed: {len(features)}")

# Phase 8: calibration outputs. best_method is a (name, brier) pair; the
# improvement is its relative Brier reduction versus the uncalibrated model.
print("\n✅ Probability Calibration (Phase 8):")
print(f"   • Best method: {best_method[0]}")
_brier_gain_pct = (brier_uncal - best_method[1])/brier_uncal*100
print(f"   • Brier improvement: {_brier_gain_pct:.2f}%")
print(f"   • Calibration curves: {calibration_plot_path}")
print(f"   • Model card: {model_card_path}")

print("\n" + _divider)

In [None]:
# Rank the calibration variants by Brier score (lower is better).
print("\n" + "=" * 70)
print("📈 Calibration Method Comparison (Brier Score - Lower is Better)")
print("=" * 70)

brier_uncal = brier_score_loss(y_test, y_pred_uncal)
brier_isotonic = brier_score_loss(y_test, y_pred_isotonic)
brier_platt = brier_score_loss(y_test, y_pred_platt)

print(f"\n   Uncalibrated:  {brier_uncal:.6f}")
# The calibrated variants also report their relative change vs. the baseline;
# the label column is padded to the same 15 characters as "Uncalibrated:  ".
for method_label, method_brier in (('Isotonic', brier_isotonic), ('Platt', brier_platt)):
    relative_gain = (brier_uncal - method_brier)/brier_uncal*100
    print(f"   {method_label + ':':<15}{method_brier:.6f} (Δ {relative_gain:+.2f}%)")

# Pick the (name, score) pair with the lowest score; ties keep the earliest entry.
brier_by_method = [
    ('Uncalibrated', brier_uncal),
    ('Isotonic', brier_isotonic),
    ('Platt', brier_platt),
]
best_method = min(brier_by_method, key=lambda entry: entry[1])
print(f"\n   🏆 Best method: {best_method[0]} (Brier: {best_method[1]:.6f})")

# Plain-Python copy of the scores for later serialisation.
calibration_metrics = dict(
    brier_uncalibrated=float(brier_uncal),
    brier_isotonic=float(brier_isotonic),
    brier_platt=float(brier_platt),
    best_method=best_method[0],
)

In [None]:
# Rank features by their mean absolute SHAP value and log the top 15.
mean_abs_shap = np.mean(np.abs(shap_values), axis=0)
feature_importance = (
    pd.DataFrame({'feature': features, 'mean_abs_shap': mean_abs_shap})
    .sort_values(by='mean_abs_shap', ascending=False)
)

print("\n📊 Top 15 Features by SHAP Importance:")
top_15_table = feature_importance.head(15).to_string(index=False)
print(top_15_table)

# Keep a list-of-dicts copy of the full ranking for later cells.
shap_importance = feature_importance.to_dict(orient='records')

In [None]:
# Draw the SHAP summary plot (top 15 features) and write it to reports/.
plt.figure(figsize=(12, 8))
summary_plot_options = dict(feature_names=features, max_display=15, show=False)
shap.summary_plot(shap_values, X_test_sample, **summary_plot_options)

# show=False above leaves the figure open so it can be saved before display.
reports_dir = Path('reports')
reports_dir.mkdir(exist_ok=True)
shap_plot_path = reports_dir.joinpath('shap_summary.png')
plt.tight_layout()
plt.savefig(shap_plot_path, dpi=150, bbox_inches='tight')
plt.show()

print(f"✅ SHAP summary plot saved to: {shap_plot_path}")

In [None]:
# Build a SHAP explainer for the trained XGBoost model.
banner_rule = "=" * 70
for banner_line in (banner_rule, "🔍 SHAP Feature Importance Analysis", banner_rule):
    print(banner_line)

# TreeExplainer is the tree-model explainer provided by shap.
explainer = shap.TreeExplainer(xgb_model)

# Only the first 500 test rows are explained to keep the run time reasonable.
X_test_sample = X_test[0:500]
shap_values = explainer.shap_values(X_test_sample)

n_explained = len(X_test_sample)
print(f"✅ SHAP values computed for {n_explained} test samples")
print(f"   Shape: {shap_values.shape}")

In [None]:
# Plot reliability (calibration) curves for the raw and the calibrated model.
fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# Observed positive fraction vs. mean predicted probability, 10 bins each.
fraction_pos_uncal, mean_pred_uncal = calibration_curve(
    y_test, prob_uncalibrated, n_bins=10
)
fraction_pos_cal, mean_pred_cal = calibration_curve(
    y_test, prob_calibrated, n_bins=10
)

# One panel per model: (axis, x values, y values, legend label, colour, title).
reliability_panels = (
    (axes[0], mean_pred_uncal, fraction_pos_uncal, '未校準', 'red', '未校準模型可靠度曲線'),
    (axes[1], mean_pred_cal, fraction_pos_cal, '已校準', 'green', '已校準模型可靠度曲線'),
)
for panel_ax, mean_pred, fraction_pos, curve_label, curve_color, panel_title in reliability_panels:
    panel_ax.plot(mean_pred, fraction_pos, 'o-', label=curve_label, color=curve_color)
    # Dashed diagonal = perfectly calibrated reference.
    panel_ax.plot([0, 1], [0, 1], 'k--', label='完美校準')
    panel_ax.set_xlabel('平均預測機率')
    panel_ax.set_ylabel('實際正樣本比例')
    panel_ax.set_title(panel_title, fontsize=12, fontweight='bold')
    panel_ax.legend(loc='best')
    panel_ax.grid(True, alpha=0.3)

plt.suptitle('機率校準效果比較', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

# Reading guide printed under the figure.
reading_guide = (
    "\n💡 說明:",
    "   • 理想的可靠度曲線應該接近對角線",
    "   • 曲線在對角線上方表示模型過度保守",
    "   • 曲線在對角線下方表示模型過度自信",
    "   • Isotonic 校準有效改善了模型的機率預測",
)
for guide_line in reading_guide:
    print(guide_line)

In [None]:
# Calibrate the predicted probabilities of the chosen model.
print("\n🎯 進行機率校準...")

# XGBoost is used as the base model.
base_model = models['XGBoost']

# Isotonic calibration with 3-fold cross-validation on the training split.
print("   使用 Isotonic Regression 校準...")
calibrated_model = CalibratedClassifierCV(base_model, method='isotonic', cv=3)
calibrated_model.fit(X_train, y_train)

# Positive-class probabilities on the test split, before and after calibration.
prob_uncalibrated, prob_calibrated = (
    fitted_model.predict_proba(X_test)[:, 1]
    for fitted_model in (base_model, calibrated_model)
)

print("✅ 校準完成")

## 6. 模型評估

### 6.1 計算評估指標

In [None]:
def calculate_metrics(y_true, y_prob, model_name="Model", n_bins=10, k_values=(10, 20, 50)):
    """
    Compute ranking and calibration metrics for binary predictions.

    Args:
        y_true: Ground-truth binary labels (array-like).
        y_prob: Predicted positive-class probabilities (array-like, same length).
        model_name: Label stored under the 'Model' key of the result.
        n_bins: Number of equal-width probability bins used for ECE.
        k_values: Cut-offs for Precision@K; a K larger than the number of
            samples is skipped, so its 'P@K' key is absent from the result.

    Returns:
        dict with 'Model', 'PR-AUC', 'ROC-AUC', 'Brier Score', 'ECE' and one
        'P@K' entry per usable K.
    """
    # Positional/boolean indexing below must not depend on a pandas index,
    # so work on plain arrays.
    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob, dtype=float)

    # PR-AUC
    pr_auc = average_precision_score(y_true, y_prob)

    # ROC-AUC
    roc_auc = roc_auc_score(y_true, y_prob)

    # Brier Score
    brier = brier_score_loss(y_true, y_prob)

    # ECE (Expected Calibration Error)
    bin_boundaries = np.linspace(0, 1, n_bins + 1)
    # np.digitize assigns p == 1.0 to index n_bins (one past the last bin);
    # clip so those samples are counted in the last bin instead of dropped.
    bin_indices = np.clip(np.digitize(y_prob, bin_boundaries) - 1, 0, n_bins - 1)

    ece = 0.0
    n_samples = len(y_true)
    for i in range(n_bins):
        mask = bin_indices == i
        bin_count = np.sum(mask)
        if bin_count > 0:
            bin_acc = np.mean(y_true[mask])    # observed positive rate in the bin
            bin_conf = np.mean(y_prob[mask])   # mean predicted probability in the bin
            ece += (bin_count / n_samples) * np.abs(bin_acc - bin_conf)

    # Precision@K: fraction of true positives among the K highest-scored samples.
    precision_at_k = {}
    sorted_indices = np.argsort(y_prob)[::-1]

    for k in k_values:
        if k <= n_samples:
            top_k_true = y_true[sorted_indices[:k]]
            precision_at_k[f'P@{k}'] = np.mean(top_k_true)

    return {
        'Model': model_name,
        'PR-AUC': pr_auc,
        'ROC-AUC': roc_auc,
        'Brier Score': brier,
        'ECE': ece,
        **precision_at_k
    }

# Evaluate the raw and the calibrated model on the test split.
metrics_uncalibrated = calculate_metrics(y_test, prob_uncalibrated, "XGBoost (未校準)")
metrics_calibrated = calculate_metrics(y_test, prob_calibrated, "XGBoost (已校準)")

# Side-by-side table, one row per model.
metrics_df = pd.DataFrame([metrics_uncalibrated, metrics_calibrated])
print("\n📊 模型評估指標:")
print(metrics_df.to_string(index=False))

# Relative reduction of each calibration metric achieved by calibration.
print("\n📈 校準改善:")
for metric_key in ('ECE', 'Brier Score'):
    value_before = metrics_uncalibrated[metric_key]
    value_after = metrics_calibrated[metric_key]
    print(f"   {metric_key} 改善: {(value_before - value_after)/value_before*100:.1f}%")

In [None]:
# Plot the precision-recall curves and a Precision@K comparison.
fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# Left panel: PR curves of the raw and the calibrated model.
precision_uncal, recall_uncal, _ = precision_recall_curve(y_test, prob_uncalibrated)
precision_cal, recall_cal, _ = precision_recall_curve(y_test, prob_calibrated)

axes[0].plot(recall_uncal, precision_uncal, label=f'未校準 (AP={metrics_uncalibrated["PR-AUC"]:.3f})', color='red')
axes[0].plot(recall_cal, precision_cal, label=f'已校準 (AP={metrics_calibrated["PR-AUC"]:.3f})', color='green')
axes[0].set_xlabel('Recall')
axes[0].set_ylabel('Precision')
axes[0].set_title('Precision-Recall 曲線', fontsize=12, fontweight='bold')
axes[0].legend(loc='best')
axes[0].grid(True, alpha=0.3)

# Right panel: Precision@K bars.
k_values = [10, 20, 50]
# A 'P@K' key may be missing from the metric dicts, so plot only the K values
# present in BOTH of them; otherwise the bar heights and x positions would
# have different lengths and ax.bar would raise.
plotted_k = [
    k for k in k_values
    if f'P@{k}' in metrics_uncalibrated and f'P@{k}' in metrics_calibrated
]
precision_at_k_uncal = [metrics_uncalibrated[f'P@{k}'] for k in plotted_k]
precision_at_k_cal = [metrics_calibrated[f'P@{k}'] for k in plotted_k]

x = np.arange(len(plotted_k))
width = 0.35

bars1 = axes[1].bar(x - width/2, precision_at_k_uncal, width, label='未校準', color='red', alpha=0.7)
bars2 = axes[1].bar(x + width/2, precision_at_k_cal, width, label='已校準', color='green', alpha=0.7)

axes[1].set_xlabel('K')
axes[1].set_ylabel('Precision@K')
axes[1].set_title('Precision@K 比較', fontsize=12, fontweight='bold')
axes[1].set_xticks(x)
axes[1].set_xticklabels([f'Top {k}' for k in plotted_k])
axes[1].legend()
axes[1].grid(True, alpha=0.3, axis='y')

# Write each bar's value just above it.
for bars in [bars1, bars2]:
    for bar in bars:
        height = bar.get_height()
        axes[1].text(bar.get_x() + bar.get_width()/2., height,
                    f'{height:.2f}',
                    ha='center', va='bottom', fontsize=9)

plt.tight_layout()
plt.show()

In [None]:
# Compare the two training approaches: synthetic injection vs. supervised learning.
print("🔬 方法比較：合成注入 vs 監督式學習")
print("="*60)

# The comparison needs metrics_supervised to exist and be non-None.
if 'metrics_supervised' in locals() and metrics_supervised is not None:
    # Two-row comparison table, relabelled with the method names.
    comparison_df = pd.DataFrame([
        metrics_calibrated,  # synthetic-injection model
        metrics_supervised   # supervised model
    ])
    comparison_df['Model'] = ['合成注入', '監督式']

    print("\n📊 效能指標對比:")
    print(comparison_df.to_string(index=False))

    # One bar chart per metric on a 2x3 grid.
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))

    metrics_to_compare = ['PR-AUC', 'ROC-AUC', 'Brier Score', 'ECE', 'P@10', 'P@20']

    for idx, metric in enumerate(metrics_to_compare):
        row = idx // 3
        col = idx % 3
        ax = axes[row, col]

        # Metrics absent from both result dicts leave their panel empty.
        if metric in comparison_df.columns:
            values = [
                metrics_calibrated.get(metric, 0),
                metrics_supervised.get(metric, 0)
            ]
            colors = ['blue', 'orange']
            bars = ax.bar(['合成注入', '監督式'], values, color=colors, alpha=0.7)

            # Write each bar's value just above it.
            for bar, val in zip(bars, values):
                if val is not None and not pd.isna(val):
                    ax.text(bar.get_x() + bar.get_width()/2., val,
                           f'{val:.3f}',
                           ha='center', va='bottom', fontsize=10)

            ax.set_title(metric, fontsize=12, fontweight='bold')
            ax.set_ylabel('分數')
            ax.grid(True, alpha=0.3, axis='y')

            # Fixed y-ranges so the two bars are visually comparable.
            if metric in ['PR-AUC', 'ROC-AUC', 'P@10', 'P@20']:
                ax.set_ylim([0, 1.1])
            elif metric == 'ECE':
                ax.set_ylim([0, 0.2])

    plt.suptitle('合成注入 vs 監督式學習 效能比較', fontsize=16, fontweight='bold')
    plt.tight_layout()
    plt.show()

    # Strengths and weaknesses of each approach.
    print("\n💡 分析總結:")
    print("="*60)

    # Relative differences in percent; positive means the supervised model is better.
    pr_auc_diff = (metrics_supervised['PR-AUC'] - metrics_calibrated['PR-AUC']) / metrics_calibrated['PR-AUC'] * 100
    ece_diff = (metrics_calibrated['ECE'] - metrics_supervised['ECE']) / metrics_calibrated['ECE'] * 100

    print("📈 **合成注入方法**的優勢:")
    print("   • 不需要大量標註資料")
    print("   • 可以控制訓練樣本的參數分布")
    print("   • 適合快速原型開發和測試")
    print(f"   • 在本實驗中 ECE: {metrics_calibrated['ECE']:.3f}")

    print("\n📊 **監督式學習**的優勢:")
    print("   • 使用真實天文資料，更接近實際應用")
    print("   • 能學習到真實資料中的複雜模式")
    print("   • 對真實噪音和系統誤差有更好的魯棒性")
    print(f"   • 在本實驗中 PR-AUC: {metrics_supervised['PR-AUC']:.3f}")

    if pr_auc_diff > 0:
        print(f"\n🏆 監督式方法在 PR-AUC 上提升了 {pr_auc_diff:.1f}%")
    else:
        print(f"\n🏆 合成注入方法在 PR-AUC 上領先 {-pr_auc_diff:.1f}%")

else:
    print("\n⚠️ 無法進行比較（監督式模型未訓練）")
    print("   原因：真實資料樣本不足或無法下載")
    print("   建議：")
    print("   1. 確保已執行 01_tap_download.ipynb")
    print("   2. 檢查網路連線")
    print("   3. 增加處理的樣本數量")
    comparison_df = None

# Closing separator. "\n=" * 60 would repeat the newline as well and print
# sixty one-character lines, so the newline is added once, outside the repeat.
print("\n" + "="*60)

In [None]:
# Compare the two training approaches: synthetic injection vs. supervised learning.
# NOTE(review): this cell is a verbatim duplicate of the previous one; consider removing one.
print("🔬 方法比較：合成注入 vs 監督式學習")
print("="*60)

# The comparison needs metrics_supervised to exist and be non-None.
if 'metrics_supervised' in locals() and metrics_supervised is not None:
    # Two-row comparison table, relabelled with the method names.
    comparison_df = pd.DataFrame([
        metrics_calibrated,  # synthetic-injection model
        metrics_supervised   # supervised model
    ])
    comparison_df['Model'] = ['合成注入', '監督式']

    print("\n📊 效能指標對比:")
    print(comparison_df.to_string(index=False))

    # One bar chart per metric on a 2x3 grid.
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))

    metrics_to_compare = ['PR-AUC', 'ROC-AUC', 'Brier Score', 'ECE', 'P@10', 'P@20']

    for idx, metric in enumerate(metrics_to_compare):
        row = idx // 3
        col = idx % 3
        ax = axes[row, col]

        # Metrics absent from both result dicts leave their panel empty.
        if metric in comparison_df.columns:
            values = [
                metrics_calibrated.get(metric, 0),
                metrics_supervised.get(metric, 0)
            ]
            colors = ['blue', 'orange']
            bars = ax.bar(['合成注入', '監督式'], values, color=colors, alpha=0.7)

            # Write each bar's value just above it.
            for bar, val in zip(bars, values):
                if val is not None and not pd.isna(val):
                    ax.text(bar.get_x() + bar.get_width()/2., val,
                           f'{val:.3f}',
                           ha='center', va='bottom', fontsize=10)

            ax.set_title(metric, fontsize=12, fontweight='bold')
            ax.set_ylabel('分數')
            ax.grid(True, alpha=0.3, axis='y')

            # Fixed y-ranges so the two bars are visually comparable.
            if metric in ['PR-AUC', 'ROC-AUC', 'P@10', 'P@20']:
                ax.set_ylim([0, 1.1])
            elif metric == 'ECE':
                ax.set_ylim([0, 0.2])

    plt.suptitle('合成注入 vs 監督式學習 效能比較', fontsize=16, fontweight='bold')
    plt.tight_layout()
    plt.show()

    # Strengths and weaknesses of each approach.
    print("\n💡 分析總結:")
    print("="*60)

    # Relative differences in percent; positive means the supervised model is better.
    pr_auc_diff = (metrics_supervised['PR-AUC'] - metrics_calibrated['PR-AUC']) / metrics_calibrated['PR-AUC'] * 100
    ece_diff = (metrics_calibrated['ECE'] - metrics_supervised['ECE']) / metrics_calibrated['ECE'] * 100

    print("📈 **合成注入方法**的優勢:")
    print("   • 不需要大量標註資料")
    print("   • 可以控制訓練樣本的參數分布")
    print("   • 適合快速原型開發和測試")
    print(f"   • 在本實驗中 ECE: {metrics_calibrated['ECE']:.3f}")

    print("\n📊 **監督式學習**的優勢:")
    print("   • 使用真實天文資料，更接近實際應用")
    print("   • 能學習到真實資料中的複雜模式")
    print("   • 對真實噪音和系統誤差有更好的魯棒性")
    print(f"   • 在本實驗中 PR-AUC: {metrics_supervised['PR-AUC']:.3f}")

    if pr_auc_diff > 0:
        print(f"\n🏆 監督式方法在 PR-AUC 上提升了 {pr_auc_diff:.1f}%")
    else:
        print(f"\n🏆 合成注入方法在 PR-AUC 上領先 {-pr_auc_diff:.1f}%")

else:
    print("\n⚠️ 無法進行比較（監督式模型未訓練）")
    print("   原因：真實資料樣本不足或無法下載")
    print("   建議：")
    print("   1. 確保已執行 01_tap_download.ipynb")
    print("   2. 檢查網路連線")
    print("   3. 增加處理的樣本數量")
    comparison_df = None

# Closing separator. "\n=" * 60 would repeat the newline as well and print
# sixty one-character lines, so the newline is added once, outside the repeat.
print("\n" + "="*60)

In [None]:
# 📂 Data Loading (Colab-compatible)
print("📂 載入真實資料集...")

import sys
from pathlib import Path

# Put the directory holding data_loader_colab.py on sys.path: the current
# directory when the file is there, otherwise ../notebooks.
notebooks_dir = Path('.') if Path('data_loader_colab.py').exists() else Path('../notebooks')
if str(notebooks_dir) not in sys.path:
    sys.path.insert(0, str(notebooks_dir))

from data_loader_colab import setup_data_directory, load_datasets

# Resolve the data directory and load whatever datasets are available.
data_dir, IN_COLAB = setup_data_directory()
datasets = load_datasets(data_dir)

# Start from empty frames so the summary at the end works even if nothing loads.
toi_positive = pd.DataFrame()
eb_negative = pd.DataFrame()
toi_fp = pd.DataFrame()

# Preferred input: the combined, already-labelled dataset.
if 'supervised_dataset' in datasets:
    supervised_df = datasets['supervised_dataset']
    print(f"✅ Loaded supervised_dataset: {len(supervised_df)} rows")

    if 'label' in supervised_df.columns:
        # Positives: label == 1.
        toi_positive = supervised_df[supervised_df['label'] == 1].copy()
        toi_positive['source'] = 'TOI'
        print(f"✅ TOI 正樣本: {len(toi_positive)} 個")

        # Negatives: label == 0 (false positives and eclipsing binaries).
        negative_samples = supervised_df[supervised_df['label'] == 0].copy()

        if 'source' in negative_samples.columns:
            # Classify negatives by their source string. The EB pattern takes
            # priority so a row matching both patterns is counted only once.
            eb_mask = negative_samples['source'].str.contains('EB|Kepler', case=False, na=False)
            fp_mask = negative_samples['source'].str.contains('FP|False', case=False, na=False) & ~eb_mask
            eb_negative = negative_samples[eb_mask].copy()
            toi_fp = negative_samples[fp_mask].copy()

            # Rows matching neither pattern are not used; say so instead of
            # dropping them silently.
            n_unmatched = int((~(eb_mask | fp_mask)).sum())
            if n_unmatched > 0:
                print(f"⚠️ {n_unmatched} 個負樣本的 source 無法歸類為 EB 或 FP，未納入負樣本")
        else:
            # No source column: split by position (first half / second half),
            # not randomly.
            split_idx = len(negative_samples) // 2
            eb_negative = negative_samples.iloc[:split_idx].copy()
            toi_fp = negative_samples.iloc[split_idx:].copy()

        eb_negative['source'] = 'Kepler_EB'
        toi_fp['source'] = 'TOI_FP'

        print(f"✅ Kepler EB 負樣本: {len(eb_negative)} 個")
        print(f"✅ TOI 假陽性負樣本: {len(toi_fp)} 個")
    else:
        # Without labels the frame cannot be split; make the reason visible.
        print("⚠️ supervised_dataset 缺少 'label' 欄位，無法切分正負樣本")
else:
    # Fallback: assemble the sets from the individual files.
    if 'toi_positive' in datasets:
        toi_positive = datasets['toi_positive'].copy()
        toi_positive['label'] = 1
        toi_positive['source'] = 'TOI'
        print(f"✅ TOI 正樣本: {len(toi_positive)} 個")

    # NOTE(review): 'koi_false_positives' is tagged 'TOI_FP' and 'toi_negative'
    # is tagged 'Kepler_EB' - confirm this mapping is intended.
    if 'koi_false_positives' in datasets:
        toi_fp = datasets['koi_false_positives'].copy()
        toi_fp['label'] = 0
        toi_fp['source'] = 'TOI_FP'
        print(f"✅ TOI 假陽性負樣本: {len(toi_fp)} 個")

    if 'toi_negative' in datasets:
        eb_negative = datasets['toi_negative'].copy()
        eb_negative['label'] = 0
        eb_negative['source'] = 'Kepler_EB'
        print(f"✅ Kepler EB 負樣本: {len(eb_negative)} 個")

# Tell the user what to do when nothing at all could be loaded.
if toi_positive.empty and eb_negative.empty and toi_fp.empty:
    print("⚠️ 找不到資料！")
    print("💡 請先執行 01_tap_download.ipynb 下載資料")
    print("💡 或確保 data/ 目錄包含以下檔案:")
    print("   - supervised_dataset.csv")
    print("   - toi_positive.csv")
    print("   - toi_negative.csv")
    print("   - koi_false_positives.csv")

print(f"\n📊 總計:")
print(f"   正樣本: {len(toi_positive)}")
print(f"   負樣本: {len(eb_negative) + len(toi_fp)}")

In [None]:
# 📂 Data Loading (Colab-compatible)
# NOTE(review): this cell is a verbatim duplicate of the previous one; consider removing one.
print("📂 載入真實資料集...")

import sys
from pathlib import Path

# Put the directory holding data_loader_colab.py on sys.path: the current
# directory when the file is there, otherwise ../notebooks.
notebooks_dir = Path('.') if Path('data_loader_colab.py').exists() else Path('../notebooks')
if str(notebooks_dir) not in sys.path:
    sys.path.insert(0, str(notebooks_dir))

from data_loader_colab import setup_data_directory, load_datasets

# Resolve the data directory and load whatever datasets are available.
data_dir, IN_COLAB = setup_data_directory()
datasets = load_datasets(data_dir)

# Start from empty frames so the summary at the end works even if nothing loads.
toi_positive = pd.DataFrame()
eb_negative = pd.DataFrame()
toi_fp = pd.DataFrame()

# Preferred input: the combined, already-labelled dataset.
if 'supervised_dataset' in datasets:
    supervised_df = datasets['supervised_dataset']
    print(f"✅ Loaded supervised_dataset: {len(supervised_df)} rows")

    if 'label' in supervised_df.columns:
        # Positives: label == 1.
        toi_positive = supervised_df[supervised_df['label'] == 1].copy()
        toi_positive['source'] = 'TOI'
        print(f"✅ TOI 正樣本: {len(toi_positive)} 個")

        # Negatives: label == 0 (false positives and eclipsing binaries).
        negative_samples = supervised_df[supervised_df['label'] == 0].copy()

        if 'source' in negative_samples.columns:
            # Classify negatives by their source string. The EB pattern takes
            # priority so a row matching both patterns is counted only once.
            eb_mask = negative_samples['source'].str.contains('EB|Kepler', case=False, na=False)
            fp_mask = negative_samples['source'].str.contains('FP|False', case=False, na=False) & ~eb_mask
            eb_negative = negative_samples[eb_mask].copy()
            toi_fp = negative_samples[fp_mask].copy()

            # Rows matching neither pattern are not used; say so instead of
            # dropping them silently.
            n_unmatched = int((~(eb_mask | fp_mask)).sum())
            if n_unmatched > 0:
                print(f"⚠️ {n_unmatched} 個負樣本的 source 無法歸類為 EB 或 FP，未納入負樣本")
        else:
            # No source column: split by position (first half / second half),
            # not randomly.
            split_idx = len(negative_samples) // 2
            eb_negative = negative_samples.iloc[:split_idx].copy()
            toi_fp = negative_samples.iloc[split_idx:].copy()

        eb_negative['source'] = 'Kepler_EB'
        toi_fp['source'] = 'TOI_FP'

        print(f"✅ Kepler EB 負樣本: {len(eb_negative)} 個")
        print(f"✅ TOI 假陽性負樣本: {len(toi_fp)} 個")
    else:
        # Without labels the frame cannot be split; make the reason visible.
        print("⚠️ supervised_dataset 缺少 'label' 欄位，無法切分正負樣本")
else:
    # Fallback: assemble the sets from the individual files.
    if 'toi_positive' in datasets:
        toi_positive = datasets['toi_positive'].copy()
        toi_positive['label'] = 1
        toi_positive['source'] = 'TOI'
        print(f"✅ TOI 正樣本: {len(toi_positive)} 個")

    # NOTE(review): 'koi_false_positives' is tagged 'TOI_FP' and 'toi_negative'
    # is tagged 'Kepler_EB' - confirm this mapping is intended.
    if 'koi_false_positives' in datasets:
        toi_fp = datasets['koi_false_positives'].copy()
        toi_fp['label'] = 0
        toi_fp['source'] = 'TOI_FP'
        print(f"✅ TOI 假陽性負樣本: {len(toi_fp)} 個")

    if 'toi_negative' in datasets:
        eb_negative = datasets['toi_negative'].copy()
        eb_negative['label'] = 0
        eb_negative['source'] = 'Kepler_EB'
        print(f"✅ Kepler EB 負樣本: {len(eb_negative)} 個")

# Tell the user what to do when nothing at all could be loaded.
if toi_positive.empty and eb_negative.empty and toi_fp.empty:
    print("⚠️ 找不到資料！")
    print("💡 請先執行 01_tap_download.ipynb 下載資料")
    print("💡 或確保 data/ 目錄包含以下檔案:")
    print("   - supervised_dataset.csv")
    print("   - toi_positive.csv")
    print("   - toi_negative.csv")
    print("   - koi_false_positives.csv")

print(f"\n📊 總計:")
print(f"   正樣本: {len(toi_positive)}")
print(f"   負樣本: {len(eb_negative) + len(toi_fp)}")

In [None]:
# Visualise the parameter distributions of the positive (label == 1) samples.
positive_labels = labels_df[labels_df['label'] == 1]

fig, axes = plt.subplots(2, 2, figsize=(12, 10))

# One histogram per parameter:
# (axis, values, extra hist styling, x-axis label, panel title).
# depth is multiplied by 1e6 and duration by 24 to match the ppm / hour labels.
histogram_panels = (
    (axes[0, 0], positive_labels['period'], {}, '週期 (天)', '週期分布'),
    (axes[0, 1], positive_labels['depth'] * 1e6, {'color': 'orange'}, '深度 (ppm)', '凌日深度分布'),
    (axes[1, 0], positive_labels['duration'] * 24, {'color': 'green'}, '持續時間 (小時)', '凌日持續時間分布'),
    (axes[1, 1], positive_labels['snr_estimate'], {'color': 'red'}, 'SNR 估計', '信噪比分布'),
)
for panel_ax, panel_values, extra_style, x_label, panel_title in histogram_panels:
    panel_ax.hist(panel_values, bins=30, edgecolor='black', alpha=0.7, **extra_style)
    panel_ax.set_xlabel(x_label)
    panel_ax.set_ylabel('數量')
    panel_ax.set_title(panel_title)
    panel_ax.grid(True, alpha=0.3)

plt.suptitle('合成凌日參數分布', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

# Numeric summary of the same four columns.
summary_columns = ['period', 'depth', 'duration', 'snr_estimate']
print("\n📊 參數統計摘要：")
print(positive_labels[summary_columns].describe())

In [None]:
# Train the three candidate classifiers and keep them in `models`, keyed by name.
models = {}
print("🚀 開始訓練模型...\n")

# 1. Logistic Regression (fitted on the standardised features)
print("1️⃣ 訓練 Logistic Regression...")
lr_model = LogisticRegression(max_iter=1000, random_state=42)
lr_model.fit(X_train_scaled, y_train)
models['LogisticRegression'] = lr_model
print(f"   訓練分數: {lr_model.score(X_train_scaled, y_train):.3f}")
print(f"   測試分數: {lr_model.score(X_test_scaled, y_test):.3f}")

# 2. Random Forest (tree model: uses the unscaled features)
print("\n2️⃣ 訓練 Random Forest...")
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)
models['RandomForest'] = rf_model
print(f"   訓練分數: {rf_model.score(X_train, y_train):.3f}")
print(f"   測試分數: {rf_model.score(X_test, y_test):.3f}")

# 3. XGBoost (unscaled features)
# `use_label_encoder` is intentionally not passed: the notebook installs
# xgboost>=2.0.0, where it is no longer a recognised estimator parameter and
# only triggers an "unused parameter" warning.
print("\n3️⃣ 訓練 XGBoost...")
xgb_model = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=5,
    learning_rate=0.1,
    random_state=42,
    eval_metric='logloss'
)
xgb_model.fit(X_train, y_train)
models['XGBoost'] = xgb_model
print(f"   訓練分數: {xgb_model.score(X_train, y_train):.3f}")
print(f"   測試分數: {xgb_model.score(X_test, y_test):.3f}")

In [None]:
# 🚀 GitHub Push 終極解決方案 (03 - Synthetic Injection & Supervised Training Results)
# 一鍵推送合成注入與監督式訓練結果至 GitHub

import subprocess, os
from pathlib import Path
import json

def ultimate_push_to_github_03(token=None):
    """
    Commit the notebook's outputs and push them to GitHub in one call.

    Steps: detect Colab vs. local, obtain an access token, make sure a Git
    repository with an ``origin`` remote exists, enable Git LFS tracking,
    stage and commit everything, then push to ``main`` through a
    token-authenticated HTTPS URL.

    Args:
        token: GitHub personal access token. When falsy, the user is prompted
            for one; the prompt does not echo the input.

    Returns:
        bool: True when the push succeeded or there was nothing to commit,
        False on any failure.
    """
    import getpass  # local import: only this helper needs it

    def _redact(text):
        """Mask the access token in text that is about to be printed."""
        if not text or not token:
            return text
        return str(text).replace(token, '***')

    print("🚀 合成注入與監督式訓練結果 GitHub 推送開始...")
    print("=" * 60)

    # Step 1: environment detection
    try:
        from google.colab import drive  # noqa: F401 - imported only to detect Colab
        IN_COLAB = True
        working_dir = "/content"
        print("🌍 偵測到 Google Colab 環境")
    except ImportError:
        IN_COLAB = False
        working_dir = os.getcwd()
        print("💻 偵測到本地環境")

    # Step 2: token input
    if not token:
        print("📋 請輸入 GitHub Personal Access Token:")
        print("   1. 前往 https://github.com/settings/tokens")
        print("   2. 點擊 'Generate new token (classic)'")
        print("   3. 勾選 'repo' 權限")
        print("   4. 複製生成的 token")
        # getpass keeps the secret out of the saved notebook output.
        token = getpass.getpass("🔐 貼上你的 token (ghp_...): ").strip()
        # Classic tokens start with 'ghp_', fine-grained ones with 'github_pat_'.
        if not token.startswith(('ghp_', 'github_pat_')):
            print("❌ Token 格式錯誤，應該以 'ghp_' 或 'github_pat_' 開頭")
            return False

    # Step 3: Git repository initialisation and configuration
    print("\n📋 步驟 1/4: Git 倉庫設定...")

    try:
        # NOTE(review): in Colab this switches to /content itself, not to a
        # cloned project directory below it - confirm this is the repository
        # that should be pushed.
        if IN_COLAB:
            os.chdir(working_dir)

        # Are we already inside a Git repository?
        git_check = subprocess.run(['git', 'rev-parse', '--git-dir'],
                                   capture_output=True, text=True)

        if git_check.returncode != 0:
            print("   🔧 初始化 Git 倉庫...")
            subprocess.run(['git', 'init'], check=True)
            print("   ✅ Git 倉庫初始化完成")
        else:
            print("   ✅ 已在 Git 倉庫中")

        # Set a commit identity (best effort; failure is not fatal).
        try:
            subprocess.run(['git', 'config', 'user.name', 'Colab User'], check=True)
            subprocess.run(['git', 'config', 'user.email', 'colab@spaceapps.com'], check=True)
            print("   ✅ Git 用戶設定完成")
        except Exception:
            print("   ⚠️ Git 用戶設定跳過")

        # Make sure an 'origin' remote exists; add the default one if missing.
        try:
            remote_check = subprocess.run(['git', 'remote', 'get-url', 'origin'],
                                        capture_output=True, text=True)
            if remote_check.returncode != 0:
                print("   🔧 設定遠端倉庫...")
                # Default repository URL (users should change it to their own).
                default_repo = "https://github.com/exoplanet-spaceapps/exoplanet-starter.git"
                subprocess.run(['git', 'remote', 'add', 'origin', default_repo], check=True)
                print(f"   ✅ 遠端倉庫設定: {default_repo}")
                print("   💡 請確保你有該倉庫的寫入權限，或修改為你的倉庫")
            else:
                print(f"   ✅ 遠端倉庫已設定: {remote_check.stdout.strip()}")
        except Exception as e:
            print(f"   ⚠️ 遠端倉庫設定警告: {e}")

    except Exception as e:
        print(f"   ❌ Git 設定失敗: {e}")
        return False

    # Step 4: Git LFS setup (every failure here is downgraded to a warning)
    print("\n📋 步驟 2/4: Git LFS 設定...")

    try:
        # Install Git LFS (Colab only).
        if IN_COLAB:
            print("   📦 在 Colab 中安裝 Git LFS...")
            subprocess.run(['apt-get', 'update', '-qq'], check=True)
            subprocess.run(['apt-get', 'install', '-y', '-qq', 'git-lfs'], check=True)
            print("   ✅ Git LFS 已安裝")

        # Initialise LFS.
        try:
            subprocess.run(['git', 'lfs', 'install'], check=True)
            print("   ✅ Git LFS 初始化完成")
        except Exception:
            print("   ⚠️ Git LFS 初始化跳過（可能已設定）")

        # Track data/model file types with LFS; each pattern fails independently.
        lfs_patterns = ['*.csv', '*.json', '*.pkl', '*.parquet', '*.h5', '*.hdf5', '*.joblib']
        for pattern in lfs_patterns:
            try:
                result = subprocess.run(['git', 'lfs', 'track', pattern],
                                      capture_output=True, text=True)
                if result.returncode == 0:
                    print(f"   📦 LFS 追蹤: {pattern}")
                else:
                    print(f"   ⚠️ LFS 追蹤 {pattern} 警告: {result.stderr.strip()}")
            except Exception as e:
                print(f"   ⚠️ LFS 追蹤 {pattern} 跳過: {e}")

        # Stage .gitattributes; a missing git binary or file is not fatal here.
        try:
            subprocess.run(['git', 'add', '.gitattributes'], check=False)
        except OSError:
            pass

    except Exception as e:
        print(f"   ⚠️ Git LFS 設定警告: {e}")
        print("   💡 繼續執行，但大檔案可能無法正確追蹤")

    # Step 5: stage files and commit
    print("\n📋 步驟 3/4: 添加檔案與提交...")

    try:
        # Report the important directories; in Colab create data/ and model/.
        important_dirs = ['data', 'notebooks', 'app', 'scripts', 'model']
        for dir_name in important_dirs:
            dir_path = Path(dir_name)
            if dir_path.exists():
                print(f"   📂 找到目錄: {dir_name}")
            elif IN_COLAB and dir_name in ['data', 'model']:
                dir_path.mkdir(parents=True, exist_ok=True)
                print(f"   📂 創建目錄: {dir_name}")

        # Stage everything.
        subprocess.run(['git', 'add', '.'], check=True)
        print("   ✅ 檔案添加完成")

        # Nothing to commit counts as success.
        status_result = subprocess.run(['git', 'status', '--porcelain'],
                                      capture_output=True, text=True, check=True)

        if not status_result.stdout.strip():
            print("   ✅ 沒有新的變更需要提交")
            return True

        # Create the commit.
        commit_message = """feat: complete synthetic injection & supervised training pipeline

- 🧪 完成合成凌日注入資料生成 (200 正類 + 200 負類)
- 🔍 實作 BLS/TLS 特徵提取與重要性分析
- 🤖 訓練多個模型: LogisticRegression, RandomForest, XGBoost
- 📊 實現 Isotonic 機率校準 (ECE, Brier Score, 可靠度曲線)
- 📈 監督式學習: 真實 TOI + Kepler EB 資料訓練
- 📋 方法比較: 合成注入 vs 監督式學習效能對比
- 💾 模型持久化: model/ranker.joblib + feature_schema.json
- 📊 完整評估指標: PR-AUC, ROC-AUC, Precision@K, ECE

Co-Authored-By: hctsai1006 <39769660@cuni.cz>
        """

        subprocess.run(['git', 'commit', '-m', commit_message], check=True)
        print("   ✅ 提交完成")

    except subprocess.CalledProcessError as e:
        print(f"   ❌ 檔案提交失敗: {e}")
        return False
    except Exception as e:
        print(f"   ❌ 檔案處理失敗: {e}")
        return False

    # Step 6: push to GitHub
    print("\n📋 步驟 4/4: 推送到 GitHub...")

    try:
        # Read the remote URL and build a token-authenticated variant of it.
        remote_result = subprocess.run(['git', 'remote', 'get-url', 'origin'],
                                      capture_output=True, text=True, check=True)
        remote_url = remote_result.stdout.strip()

        github_prefix = 'https://github.com/'
        if remote_url.startswith(github_prefix):
            # Strip only a trailing '.git'; removing every '.git' occurrence
            # would corrupt names such as 'user/site.github.io'.
            repo_path = remote_url[len(github_prefix):]
            if repo_path.endswith('.git'):
                repo_path = repo_path[:-len('.git')]
            auth_url = f"https://{token}@github.com/{repo_path}.git"
        else:
            print(f"   ⚠️ 遠端 URL 格式異常: {remote_url}")
            auth_url = remote_url

        # Push the local 'main' branch.
        push_result = subprocess.run([
            'git', 'push', auth_url, 'main'
        ], capture_output=True, text=True, timeout=300)

        if push_result.returncode == 0:
            print("   ✅ 推送成功！")
            # git output can quote the remote URL, so mask the token first.
            print(f"   📡 推送輸出: {_redact(push_result.stdout)[:200]}...")
            return True

        print(f"   ❌ 推送失敗: {_redact(push_result.stderr)}")

        # Fallback: push whatever HEAD is to the remote 'main'.
        alt_push = subprocess.run([
            'git', 'push', auth_url, 'HEAD:main'
        ], capture_output=True, text=True, timeout=300)
        if alt_push.returncode == 0:
            print("   ✅ 備用推送成功！")
            return True

        print(f"   ❌ 備用推送失敗: {_redact(alt_push.stderr)}")
        return False

    except subprocess.TimeoutExpired:
        print("   ❌ 推送超時，請檢查網路連接")
        return False
    except Exception as e:
        # Exception text can include the command line (and thus the token URL).
        print(f"   ❌ 推送失敗: {_redact(str(e))}")
        return False

    finally:
        # Runs after success and failure alike, hence the neutral wording.
        print("\n" + "=" * 60)
        print("📋 合成注入與監督式訓練結果推送流程結束")
        if IN_COLAB:
            print("💡 如果遇到問題:")
            print("   1. 確保 token 有 'repo' 權限")
            print("   2. 確保你有目標倉庫的寫入權限")
            print("   3. 檢查倉庫 URL 是否正確")

# Usage hints only: the push helper is not invoked here, the token is supplied at call time.
push_usage_hints = (
    "🔐 準備推送合成注入與監督式訓練結果...",
    "💡 執行方式: ultimate_push_to_github_03(token='你的GitHub_token')",
    "📝 或直接執行下方 cell 並在提示時輸入 token",
)
for hint_line in push_usage_hints:
    print(hint_line)



In [None]:
# Build the feature schema and persist it as JSON.
schema_output_path = "data/feature_schema.json"
feature_schema = create_feature_schema(feature_cols, output_path=schema_output_path)

# Echo the key fields of the schema that was just written.
print("📝 特徵架構已建立")
n_schema_features = feature_schema['n_features']
print(f"   特徵數量: {n_schema_features}")
schema_version = feature_schema['version']
print(f"   版本: {schema_version}")
print(f"   儲存位置: {schema_output_path}")

In [None]:
# Assemble the design matrix and label vector.
# NaN and +/-inf feature values are replaced by 0.0 before splitting.
X = np.nan_to_num(features_df[feature_cols].values, nan=0.0, posinf=0.0, neginf=0.0)
y = features_df['label'].values

# 70/30 stratified split with a fixed seed.
split_options = dict(test_size=0.3, random_state=42, stratify=y)
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

# Standardise using statistics of the training split only.
scaler = StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

print("📊 資料集統計:")
for split_name, split_X in (('訓練集', X_train), ('測試集', X_test)):
    print(f"   {split_name}: {len(split_X)} 樣本")
for split_tag, split_y in (('訓練', y_train), ('測試', y_test)):
    print(f"   正樣本比例 ({split_tag}): {split_y.mean():.2%}")

In [None]:
# Extract BLS-based features from the real light curves.
if 'supervised_samples_df' in locals() and len(supervised_samples_df) > 0:
    print("🔍 提取真實資料特徵...")

    supervised_features = []
    n_supervised_samples = len(supervised_samples_df)

    # Progress is counted with enumerate: iterrows() yields index *labels*,
    # which are only usable as positions for a default RangeIndex.
    for position, (_row_label, row) in enumerate(supervised_samples_df.iterrows(), start=1):
        # Run BLS on this light curve.
        bls_result = run_bls(row['time'], row['flux'])

        # Extract the features and attach the bookkeeping columns.
        # Named `sample_features` so the notebook-level `features` name, which
        # other cells use as the list of feature names, is not overwritten.
        sample_features = extract_features(row['time'], row['flux'], bls_result, compute_advanced=True)
        sample_features['sample_id'] = row['sample_id']
        sample_features['label'] = row['label']
        sample_features['source'] = row['source']
        sample_features['true_period'] = row['period']

        supervised_features.append(sample_features)

        if position % 10 == 0:
            print(f"   處理進度: {position}/{n_supervised_samples}")

    supervised_features_df = pd.DataFrame(supervised_features)
    print(f"\n✅ 特徵提取完成: {len(supervised_features_df)} 個樣本")

    # Summary statistics of the feature columns (bookkeeping columns excluded).
    print("\n📊 真實資料特徵統計:")
    feature_cols_real = [col for col in supervised_features_df.columns
                         if col not in ['sample_id', 'label', 'source', 'true_period']]
    print(supervised_features_df[feature_cols_real].describe())
else:
    print("⚠️ 無真實樣本可用於監督式學習")
    supervised_features_df = pd.DataFrame()

In [None]:
# Extract BLS-based features from the real light curves.
# NOTE(review): this cell is a verbatim duplicate of the previous one; consider removing one.
if 'supervised_samples_df' in locals() and len(supervised_samples_df) > 0:
    print("🔍 提取真實資料特徵...")

    supervised_features = []
    n_supervised_samples = len(supervised_samples_df)

    # Progress is counted with enumerate: iterrows() yields index *labels*,
    # which are only usable as positions for a default RangeIndex.
    for position, (_row_label, row) in enumerate(supervised_samples_df.iterrows(), start=1):
        # Run BLS on this light curve.
        bls_result = run_bls(row['time'], row['flux'])

        # Extract the features and attach the bookkeeping columns.
        # Named `sample_features` so the notebook-level `features` name, which
        # other cells use as the list of feature names, is not overwritten.
        sample_features = extract_features(row['time'], row['flux'], bls_result, compute_advanced=True)
        sample_features['sample_id'] = row['sample_id']
        sample_features['label'] = row['label']
        sample_features['source'] = row['source']
        sample_features['true_period'] = row['period']

        supervised_features.append(sample_features)

        if position % 10 == 0:
            print(f"   處理進度: {position}/{n_supervised_samples}")

    supervised_features_df = pd.DataFrame(supervised_features)
    print(f"\n✅ 特徵提取完成: {len(supervised_features_df)} 個樣本")

    # Summary statistics of the feature columns (bookkeeping columns excluded).
    print("\n📊 真實資料特徵統計:")
    feature_cols_real = [col for col in supervised_features_df.columns
                         if col not in ['sample_id', 'label', 'source', 'true_period']]
    print(supervised_features_df[feature_cols_real].describe())
else:
    print("⚠️ 無真實樣本可用於監督式學習")
    supervised_features_df = pd.DataFrame()

In [None]:
# Compute feature importance and plot the ten highest-ranked features.
print("🎯 計算特徵重要性...")

# NOTE(review): the whole features_df (label column included) is handed to the
# helper - presumably it selects the feature columns itself; confirm.
importance_df = compute_feature_importance(
    features_df,
    features_df['label'].values,
    method="random_forest"
)

# Horizontal bar chart of the first ten rows of importance_df.
fig, ax = plt.subplots(figsize=(10, 6))

top_features = importance_df.head(10)
bars = ax.barh(range(len(top_features)), top_features['importance'].values)
ax.set_yticks(range(len(top_features)))
ax.set_yticklabels(top_features['feature'].values)
ax.set_xlabel('重要性分數')
ax.set_title('特徵重要性排名 (Top 10)', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3, axis='x')

# Write each bar's value at its right end.
for bar, val in zip(bars, top_features['importance'].values):
    ax.text(val, bar.get_y() + bar.get_height()/2, f'{val:.3f}',
            ha='left', va='center', fontsize=9)

plt.tight_layout()
plt.show()

print("\n🏆 Top 5 最重要特徵:")
# Number the rows by their position in the ranking; the DataFrame index may
# hold other labels, so it is not used for numbering.
for rank, (_row_label, row) in enumerate(importance_df.head(5).iterrows(), start=1):
    print(f"   {rank}. {row['feature']}: {row['importance']:.4f}")

In [None]:
# 🌍 Environment Detection
import sys
import os
from pathlib import Path

# Detect environment
IN_COLAB = 'google.colab' in sys.modules or '/content' in os.getcwd()

if IN_COLAB:
    print("🌍 Running in: Google Colab")
    
    # Clone repo if needed
    project_dir = Path('/content/exoplanet-starter')
    if not project_dir.exists():
        print("📥 Cloning repository...")
        !git clone https://github.com/exoplanet-spaceapps/exoplanet-starter.git
        print("✅ Repository cloned")
    
    # Change to project directory
    os.chdir(str(project_dir))
    
    # Add to Python path
    sys.path.insert(0, str(project_dir))
    sys.path.insert(0, str(project_dir / 'src'))
    sys.path.insert(0, str(project_dir / 'notebooks'))
    
    print(f"📂 Working directory: {os.getcwd()}")
    print(f"✅ Python path configured")
    
else:
    print("💻 Running in: Local environment")
    # Local paths
    project_dir = Path.cwd().parent if 'notebooks' in str(Path.cwd()) else Path.cwd()
    sys.path.insert(0, str(project_dir / 'src'))
    sys.path.insert(0, str(project_dir))

print(f"📍 Project directory: {project_dir}")

In [None]:
# Download a real light curve to use as the baseline for transit injection;
# fall back to a simulated flat light curve when the download fails.
print("📡 下載基礎光曲線...")

try:
    # TIC 25155310 (TOI-431) is the baseline target
    target = "TIC 25155310"
    search_result = lk.search_lightcurve(target, mission="TESS", author="SPOC")
    if len(search_result) == 0:
        # Fail with a readable reason instead of an opaque indexing error
        raise ValueError(f"no SPOC light curve found for {target}")
    lc = search_result[0].download()

    # Drop NaN cadences, then detrend
    lc_clean = lc.remove_nans()
    lc_flat = lc_clean.flatten(window_length=401)

    base_time = lc_flat.time.value
    base_flux = lc_flat.flux.value

    print(f"✅ 成功下載 {target}")

except Exception as e:
    print(f"⚠️ 無法下載真實光曲線: {e}")
    print("   使用模擬光曲線...")

    # Simulated light curve: 20000 samples over 27 days, unit flux plus
    # Gaussian noise (sigma = 1e-4). A dedicated seeded generator keeps the
    # fallback reproducible without touching NumPy's global random state.
    fallback_rng = np.random.default_rng(42)
    base_time = np.linspace(0, 27, 20000)
    base_flux = np.ones(20000) + fallback_rng.normal(0, 0.0001, 20000)

    print(f"✅ 生成模擬光曲線")

# Summary shared by both paths
print(f"   資料點數: {len(base_time)}")
print(f"   時間跨度: {base_time[-1] - base_time[0]:.1f} 天")

In [None]:
# Build the synthetic dataset, report the class balance, then persist it.
for banner_line in (
    "\n🔨 生成合成資料集...",
    "   參數範圍：",
    "   • 週期: 0.6 - 10.0 天",
    "   • 深度: 0.0005 - 0.02 (500 - 20000 ppm)",
    "   • 持續時間: 週期的 2% - 10%",
):
    print(banner_line)

# 200 samples per class, injected on top of the baseline light curve
samples_df, labels_df = generate_synthetic_dataset(
    base_time=base_time,
    base_flux=base_flux,
    n_positive=200,
    n_negative=200,
    period_range=(0.6, 10.0),
    depth_range=(0.0005, 0.02),
    duration_fraction_range=(0.02, 0.1),
    noise_level=0.0001,
    seed=42,
)

# Class balance: label 1 is reported as "with transit", label 0 as "without"
print(f"\n✅ 生成 {len(samples_df)} 個樣本")
print(f"   正樣本（有凌日）: {int((samples_df['label'] == 1).sum())}")
print(f"   負樣本（無凌日）: {int((samples_df['label'] == 0).sum())}")

# Write the dataset to data/synthetic in parquet format
dataset_paths = save_synthetic_dataset(
    samples_df,
    labels_df,
    output_dir="data/synthetic",
    format="parquet",
)

print(f"\n💾 資料集已儲存至:")
for key in dataset_paths:
    print(f"   {key}: {dataset_paths[key]}")

In [None]:
def download_and_process_lightcurve(target_id, mission="TESS", window_length=401, min_points=100):
    """
    Download and detrend the light curve of a single target.

    Parameters:
    -----------
    target_id : str
        Target identifier (TIC or KIC), e.g. "TIC 25155310".
    mission : str
        Mission name ("TESS" or "Kepler"). "TESS" searches with author "SPOC";
        any other value searches with author "Kepler".
    window_length : int
        Window length forwarded to the light curve's ``flatten`` call.
    min_points : int
        Minimum number of cadences that must remain after NaN removal.

    Returns:
    --------
    tuple : (time, flux) of the flattened light curve, or (None, None) when no
        light curve is found, too few points remain, or any step raises.
        Failures caused by an exception are reported through ``warnings.warn``.
    """
    import warnings

    author = "SPOC" if mission == "TESS" else "Kepler"
    try:
        # Search for light curves of this target
        search_result = lk.search_lightcurve(target_id, mission=mission, author=author)
        if len(search_result) == 0:
            return None, None

        # Only the first search result is downloaded
        lc = search_result[0].download()

        # Drop NaN cadences; give up when too little data is left
        lc_clean = lc.remove_nans()
        if len(lc_clean) < min_points:
            return None, None

        lc_flat = lc_clean.flatten(window_length=window_length)

        return lc_flat.time.value, lc_flat.flux.value

    except Exception as e:
        # Best-effort by design: one bad target must not abort the batch, but
        # the reason is surfaced instead of being silently discarded.
        warnings.warn(
            f"{target_id}: light curve download/processing failed "
            f"({type(e).__name__}: {e})"
        )
        return None, None

# Demo: download and process a subset of the real light curves
print("🔬 處理真實光曲線樣本...")
print("   （為節省時間，僅處理前 50 個樣本）")

supervised_samples = []
supervised_labels = []

# TOI positives (at most 25); zero when `toi_positive` was never defined
if 'toi_positive' in locals():
    n_toi_samples = min(25, len(toi_positive))
else:
    n_toi_samples = 0

if n_toi_samples > 0:
    print(f"\n處理 {n_toi_samples} 個 TOI 正樣本...")
    for _, toi_row in toi_positive.head(n_toi_samples).iterrows():
        tic_id = f"TIC {int(toi_row['tid'])}"
        time_data, flux_data = download_and_process_lightcurve(tic_id, "TESS")

        if time_data is None:
            print(f"   ✗ {tic_id} (無法下載)")
            continue

        # NOTE(review): sample_id formats the raw cell value, so it may read
        # e.g. "toi_123.0" if iterrows() yields a float — confirm the dtype.
        toi_sample = {
            'sample_id': f"toi_{toi_row['tid']}",
            'time': time_data,
            'flux': flux_data,
            'label': 1,
            'period': toi_row.get('pl_orbper', np.nan),
            'source': 'TOI'
        }
        supervised_samples.append(toi_sample)
        print(f"   ✓ {tic_id}")

# Kepler EB negatives (at most 25); zero when `eb_negative` was never defined
if 'eb_negative' in locals():
    n_eb_samples = min(25, len(eb_negative))
else:
    n_eb_samples = 0

if n_eb_samples > 0:
    print(f"\n處理 {n_eb_samples} 個 Kepler EB 負樣本...")
    for _, eb_row in eb_negative.head(n_eb_samples).iterrows():
        kic_id = f"KIC {int(eb_row['KIC'])}"
        time_data, flux_data = download_and_process_lightcurve(kic_id, "Kepler")

        if time_data is None:
            print(f"   ✗ {kic_id} (無法下載)")
            continue

        # NOTE(review): same caveat as above — the id may carry a ".0" suffix.
        eb_sample = {
            'sample_id': f"eb_{eb_row['KIC']}",
            'time': time_data,
            'flux': flux_data,
            'label': 0,
            'period': eb_row.get('period', np.nan),
            'source': 'Kepler_EB'
        }
        supervised_samples.append(eb_sample)
        print(f"   ✓ {kic_id}")

# One row per successfully downloaded target; empty frame when none succeeded
if supervised_samples:
    supervised_samples_df = pd.DataFrame(supervised_samples)
else:
    supervised_samples_df = pd.DataFrame()

print(f"\n✅ 成功處理 {len(supervised_samples_df)} 個真實樣本")
if len(supervised_samples_df) > 0:
    print(f"   正樣本: {int((supervised_samples_df['label'] == 1).sum())}")
    print(f"   負樣本: {int((supervised_samples_df['label'] == 0).sum())}")

In [None]:
def download_and_process_lightcurve(target_id, mission="TESS", window_length=401, min_points=100):
    """
    Download and detrend the light curve of a single target.

    Parameters:
    -----------
    target_id : str
        Target identifier (TIC or KIC), e.g. "TIC 25155310".
    mission : str
        Mission name ("TESS" or "Kepler"). "TESS" searches with author "SPOC";
        any other value searches with author "Kepler".
    window_length : int
        Window length forwarded to the light curve's ``flatten`` call.
    min_points : int
        Minimum number of cadences that must remain after NaN removal.

    Returns:
    --------
    tuple : (time, flux) of the flattened light curve, or (None, None) when no
        light curve is found, too few points remain, or any step raises.
        Failures caused by an exception are reported through ``warnings.warn``.
    """
    import warnings

    author = "SPOC" if mission == "TESS" else "Kepler"
    try:
        # Search for light curves of this target
        search_result = lk.search_lightcurve(target_id, mission=mission, author=author)
        if len(search_result) == 0:
            return None, None

        # Only the first search result is downloaded
        lc = search_result[0].download()

        # Drop NaN cadences; give up when too little data is left
        lc_clean = lc.remove_nans()
        if len(lc_clean) < min_points:
            return None, None

        lc_flat = lc_clean.flatten(window_length=window_length)

        return lc_flat.time.value, lc_flat.flux.value

    except Exception as e:
        # Best-effort by design: one bad target must not abort the batch, but
        # the reason is surfaced instead of being silently discarded.
        warnings.warn(
            f"{target_id}: light curve download/processing failed "
            f"({type(e).__name__}: {e})"
        )
        return None, None

# Demo: download and process a subset of the real light curves
print("🔬 處理真實光曲線樣本...")
print("   （為節省時間，僅處理前 50 個樣本）")

supervised_samples = []
supervised_labels = []

# TOI positives (at most 25); zero when `toi_positive` was never defined
if 'toi_positive' in locals():
    n_toi_samples = min(25, len(toi_positive))
else:
    n_toi_samples = 0

if n_toi_samples > 0:
    print(f"\n處理 {n_toi_samples} 個 TOI 正樣本...")
    for _, toi_row in toi_positive.head(n_toi_samples).iterrows():
        tic_id = f"TIC {int(toi_row['tid'])}"
        time_data, flux_data = download_and_process_lightcurve(tic_id, "TESS")

        if time_data is None:
            print(f"   ✗ {tic_id} (無法下載)")
            continue

        # NOTE(review): sample_id formats the raw cell value, so it may read
        # e.g. "toi_123.0" if iterrows() yields a float — confirm the dtype.
        toi_sample = {
            'sample_id': f"toi_{toi_row['tid']}",
            'time': time_data,
            'flux': flux_data,
            'label': 1,
            'period': toi_row.get('pl_orbper', np.nan),
            'source': 'TOI'
        }
        supervised_samples.append(toi_sample)
        print(f"   ✓ {tic_id}")

# Kepler EB negatives (at most 25); zero when `eb_negative` was never defined
if 'eb_negative' in locals():
    n_eb_samples = min(25, len(eb_negative))
else:
    n_eb_samples = 0

if n_eb_samples > 0:
    print(f"\n處理 {n_eb_samples} 個 Kepler EB 負樣本...")
    for _, eb_row in eb_negative.head(n_eb_samples).iterrows():
        kic_id = f"KIC {int(eb_row['KIC'])}"
        time_data, flux_data = download_and_process_lightcurve(kic_id, "Kepler")

        if time_data is None:
            print(f"   ✗ {kic_id} (無法下載)")
            continue

        # NOTE(review): same caveat as above — the id may carry a ".0" suffix.
        eb_sample = {
            'sample_id': f"eb_{eb_row['KIC']}",
            'time': time_data,
            'flux': flux_data,
            'label': 0,
            'period': eb_row.get('period', np.nan),
            'source': 'Kepler_EB'
        }
        supervised_samples.append(eb_sample)
        print(f"   ✓ {kic_id}")

# One row per successfully downloaded target; empty frame when none succeeded
if supervised_samples:
    supervised_samples_df = pd.DataFrame(supervised_samples)
else:
    supervised_samples_df = pd.DataFrame()

print(f"\n✅ 成功處理 {len(supervised_samples_df)} 個真實樣本")
if len(supervised_samples_df) > 0:
    print(f"   正樣本: {int((supervised_samples_df['label'] == 1).sum())}")
    print(f"   負樣本: {int((supervised_samples_df['label'] == 0).sum())}")

In [None]:
# 🚀 Run the GitHub push (03 - synthetic injection and supervised training)
# Uncomment the next line to perform the push:
# ultimate_push_to_github_03()

# Completion notice plus a reminder of how to trigger the push manually
print(
    "📋 合成注入與監督式訓練管線完成！",
    "💡 請在需要推送結果時執行上面的 ultimate_push_to_github_03() 函數",
    sep="\n",
)

<span style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">An Exception was encountered at '<a href="#papermill-error-cell">In [2]</a>'.</span>

<span id="papermill-error-cell" style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">Execution using papermill encountered an exception here and stopped:</span>

## 8. Probability Calibration Comparison

比較兩種機率校準方法：**Isotonic Regression** 與 **Platt Scaling (Sigmoid)**

## 1. 環境設定與依賴安裝

## 2. 導入套件與模組

## 3. 資料生成：合成凌日注入

### 3.1 下載基礎光曲線

### 3.2 生成合成資料集

### 3.3 參數分布視覺化

### 5.3 機率校準

### 6.2 可靠度曲線視覺化

### 6.3 PR 曲線與 Precision@K

## 7. 模型持久化

## 8. 總結報告

### 9.2 下載並處理真實光曲線

### 9.5 方法比較：合成注入 vs 監督式學習

### 9.6 儲存監督式模型

## 10. 完整總結報告

### 9.2 下載並處理真實光曲線

### 9.5 方法比較：合成注入 vs 監督式學習

### 9.6 儲存監督式模型

## 10. 完整總結報告