In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.impute import KNNImputer
from sklearn.feature_selection import SelectFromModel
from sklearn.ensemble import StackingRegressor, RandomForestRegressor, GradientBoostingRegressor,VotingRegressor
from sklearn.linear_model import RidgeCV, LassoCV
from catboost import CatBoostRegressor, CatBoostClassifier
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import OrdinalEncoder, StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.compose import make_column_transformer, ColumnTransformer
from sklearn.model_selection import cross_val_score, cross_validate, KFold,TimeSeriesSplit,PredefinedSplit
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin, ClassifierMixin, clone
from sklearn.ensemble import RandomForestRegressor
from sklearn.kernel_ridge import KernelRidge
from sklearn.neighbors import KNeighborsRegressor 
from sklearn.linear_model import ElasticNet, Lasso
from tqdm import tqdm
from sklearn.feature_selection import mutual_info_regression
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import FunctionTransformer, RobustScaler
from sklearn.datasets import make_regression
from scipy import stats
from sklearn.metrics import r2_score 
from sklearn.base import BaseEstimator, TransformerMixin
from statsmodels.tsa.seasonal import STL
from scipy.optimize import curve_fit
from scipy.signal import detrend
from sklearn.linear_model import LinearRegression
from pprint import pprint
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from catboost import CatBoostRegressor, CatBoostClassifier
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin, ClassifierMixin, clone

In [84]:
# Locations of the competition CSVs, relative to the notebook's working directory.
train_file_path = "./hull-tactical-market-prediction/train.csv"
test_file_path = "./hull-tactical-market-prediction/test.csv"

# Read both files in one pass; the names stay module-level for later cells.
train_df, test_df = (
    pd.read_csv(csv_path) for csv_path in (train_file_path, test_file_path)
)
print("Full train dataset shape is {}".format(train_df.shape))

Full train dataset shape is (9021, 98)


In [85]:
# ============================================================
# 严谨的三段数据分割函数（用于避免过拟合）
# ============================================================

def _resolve_split_sizes(
    total_rows,
    n_train_rows,
    n_validation_rows,
    n_test_rows,
    train_ratio,
    validation_ratio,
    test_ratio,
):
    """Turn explicit row counts / ratios into (n_train, n_validation, n_test).

    Explicit counts win over ratios. Whatever is not pinned explicitly is
    derived so that the three sizes always sum to ``total_rows``.
    """
    if n_test_rows is not None:
        n_test = n_test_rows
        remaining = total_rows - n_test
        if n_validation_rows is not None:
            n_validation = n_validation_rows
        elif n_train_rows is not None:
            # Honour an explicit training size: validation takes what is left.
            n_validation = remaining - n_train_rows
        else:
            n_validation = int(remaining * validation_ratio / (train_ratio + validation_ratio))
        n_train = remaining - n_validation
    elif n_validation_rows is not None:
        n_validation = n_validation_rows
        remaining = total_rows - n_validation
        if n_train_rows is not None:
            n_train = n_train_rows
        else:
            n_train = remaining - int(remaining * test_ratio / (train_ratio + test_ratio))
        n_test = remaining - n_train
    elif n_train_rows is not None:
        n_train = n_train_rows
        remaining = total_rows - n_train
        n_validation = int(remaining * validation_ratio / (validation_ratio + test_ratio))
        n_test = remaining - n_validation
    else:
        # Pure ratio mode; the test segment absorbs rounding leftovers.
        n_train = int(total_rows * train_ratio)
        n_validation = int(total_rows * validation_ratio)
        n_test = total_rows - n_train - n_validation
    return n_train, n_validation, n_test


def split_data_three_way(
    X, 
    y, 
    n_train_rows=None,
    n_validation_rows=None,
    n_test_rows=None,
    train_ratio=0.6,
    validation_ratio=0.2,
    test_ratio=0.2,
    verbose=1
):
    """
    Split X / y into three contiguous, order-preserving segments.

    Rows are taken positionally: the first ``n_train`` rows form the training
    segment, the next ``n_validation`` rows the validation segment, and the
    remainder the test segment. No shuffling is performed.

    Parameters
    ----------
    X : DataFrame or array-like
        Feature data; non-DataFrame input is wrapped in a DataFrame.
    y : Series or array-like
        Target data; non-Series input is wrapped in a Series on X's index.
    n_train_rows, n_validation_rows, n_test_rows : int or None
        Explicit segment sizes. Explicit sizes take priority over ratios.
        When all three are given, the test and validation sizes win and the
        training segment is whatever remains (a warning is printed if that
        differs from ``n_train_rows``).
    train_ratio, validation_ratio, test_ratio : float
        Relative sizes used for the segments that have no explicit size.
    verbose : int
        ``>= 1`` prints a summary of the split.

    Returns
    -------
    tuple
        ``(X_train, y_train, X_validation, y_validation, X_test, y_test,
        split_info)`` where ``split_info`` is a dict with sizes, realised
        ratios, positional boundaries and index ranges.

    Raises
    ------
    ValueError
        If X and y differ in length, if any segment has fewer than 10 rows,
        or if the index labels of two segments overlap.
    """
    # Validate lengths up front so the caller gets this message rather than a
    # pandas construction error.
    if len(X) != len(y):
        raise ValueError(f"X 和 y 长度不一致: X={len(X)}, y={len(y)}")

    # Normalise to pandas objects so positional slicing keeps index labels.
    if not isinstance(X, pd.DataFrame):
        X = pd.DataFrame(X)
    if not isinstance(y, pd.Series):
        y = pd.Series(y, index=X.index)
    
    total_rows = len(X)
    
    if verbose >= 1:
        print("="*70)
        print("三段数据分割（严谨模式 - 避免过拟合）")
        print("="*70)
        print(f"总样本数: {total_rows}")
    
    if not X.index.equals(y.index):
        if verbose >= 1:
            print(f"警告: X 和 y 索引不完全一致，尝试对齐...")
        # Keep only the labels present in both objects.
        common_index = X.index.intersection(y.index)
        X = X.loc[common_index]
        y = y.loc[common_index]
        total_rows = len(X)
        if verbose >= 1:
            print(f"对齐后样本数: {total_rows}")
    
    n_train, n_validation, n_test = _resolve_split_sizes(
        total_rows,
        n_train_rows,
        n_validation_rows,
        n_test_rows,
        train_ratio,
        validation_ratio,
        test_ratio,
    )

    # All three sizes pinned but inconsistent: tell the user which one lost.
    if n_train_rows is not None and n_train != n_train_rows and verbose >= 1:
        print(
            f"警告: n_train_rows={n_train_rows} 与 n_validation_rows/n_test_rows 不一致，"
            f"训练段实际使用 {n_train} 个样本"
        )
    
    # Sanity limits; these also reject negative sizes from oversized requests.
    if n_train < 10:
        raise ValueError(f"训练段样本数过少: {n_train}，至少需要10个样本")
    if n_validation < 10:
        raise ValueError(f"验证段样本数过少: {n_validation}，至少需要10个样本")
    if n_test < 10:
        raise ValueError(f"测试段样本数过少: {n_test}，至少需要10个样本")
    
    # Positional boundaries: [0, train_end) | [train_end, validation_end) | rest.
    train_end = n_train
    validation_end = n_train + n_validation
    
    X_train = X.iloc[:train_end].copy()
    y_train = y.iloc[:train_end].copy()
    
    X_validation = X.iloc[train_end:validation_end].copy()
    y_validation = y.iloc[train_end:validation_end].copy()
    
    X_test = X.iloc[validation_end:].copy()
    y_test = y.iloc[validation_end:].copy()
    
    assert len(X_train) == n_train, f"训练段长度不匹配: {len(X_train)} != {n_train}"
    assert len(X_validation) == n_validation, f"验证段长度不匹配: {len(X_validation)} != {n_validation}"
    assert len(X_test) == n_test, f"测试段长度不匹配: {len(X_test)} != {n_test}"
    assert len(X_train) + len(X_validation) + len(X_test) == total_rows, "总长度不匹配"
    
    # The split is positional, so chronological correctness relies on the
    # caller having sorted the data; a non-monotonic index is only a hint.
    index_is_sorted = X.index.is_monotonic_increasing
    if not index_is_sorted and verbose >= 1:
        print("警告: 索引不是单调递增，请确保数据已按时间排序")
    
    # Duplicate labels across segments would make later label-based joins ambiguous.
    train_indices = set(X_train.index)
    validation_indices = set(X_validation.index)
    test_indices = set(X_test.index)
    
    if train_indices & validation_indices:
        raise ValueError("训练段和验证段索引重叠！")
    if train_indices & test_indices:
        raise ValueError("训练段和测试段索引重叠！")
    if validation_indices & test_indices:
        raise ValueError("验证段和测试段索引重叠！")
    
    split_info = {
        'total_rows': total_rows,
        'n_train': n_train,
        'n_validation': n_validation,
        'n_test': n_test,
        'train_ratio': n_train / total_rows,
        'validation_ratio': n_validation / total_rows,
        'test_ratio': n_test / total_rows,
        'train_start_index': 0,
        'train_end_index': train_end,
        'validation_start_index': train_end,
        'validation_end_index': validation_end,
        'test_start_index': validation_end,
        'test_end_index': total_rows,
        'train_index_range': (X_train.index[0], X_train.index[-1]) if len(X_train) > 0 else None,
        'validation_index_range': (X_validation.index[0], X_validation.index[-1]) if len(X_validation) > 0 else None,
        'test_index_range': (X_test.index[0], X_test.index[-1]) if len(X_test) > 0 else None
    }
    
    if verbose >= 1:
        print(f"\n分割结果:")
        print(f"  训练段: {n_train} 个样本 ({n_train/total_rows:.2%})")
        print(f"    索引范围: {split_info['train_index_range']}")
        print(f"  验证段: {n_validation} 个样本 ({n_validation/total_rows:.2%})")
        print(f"    索引范围: {split_info['validation_index_range']}")
        print(f"  测试段: {n_test} 个样本 ({n_test/total_rows:.2%})")
        print(f"    索引范围: {split_info['test_index_range']}")
        # Only claim the ordering check passed when it actually did.
        if index_is_sorted:
            print(f"\n✓ 数据分割完成，时间顺序验证通过")
        else:
            print(f"\n✓ 数据分割完成（索引非单调递增，时间顺序未验证）")
        print("="*70)
    
    return X_train, y_train, X_validation, y_validation, X_test, y_test, split_info

print("✓ 已定义严谨的三段数据分割函数: split_data_three_way")

✓ 已定义严谨的三段数据分割函数: split_data_three_way


In [86]:
# ============================================================
# Feature-selector hyper-parameters (tunable)
# ============================================================
# Edit the values here; this dict is intended to be the shared source of
# settings wherever AutoFeatureSelectorWithLoss is instantiated.
feature_selector_params = dict(
    n_clusters=None,              # number of clusters; None lets the selector choose
    min_cluster_size=3,           # minimum number of features per cluster
    max_clusters=15,              # upper bound on the number of clusters
    loss_threshold=None,          # groups whose loss exceeds this are dropped (None = automatic)
    top_k_per_group=0.65,         # fraction (0-1) of the most important features kept per group
    clustering_method='kmeans',   # 'kmeans' or 'hierarchical'
    n_repeats=3,                  # number of permutation-test repetitions
    random_state=42,              # random seed
    verbose_clustering=None,      # clustering log verbosity; None falls back to `verbose`
)

print("✓ 特征选择器参数配置已加载")


✓ 特征选择器参数配置已加载


In [87]:
TARGET = "market_forward_excess_returns"
FEATURES = [col for col in train_df.columns if col not in [TARGET]]


print("Preparing features (X)...")
# Replace the two raw return columns with copies shifted down by one row, so
# each row only carries the previous row's value (the first row becomes NaN).
# Built as one chained expression instead of in-place drops so re-running the
# cell always starts from train_df and yields the same frame.
X = (
    train_df[FEATURES]
    .assign(
        forward_returns_lag=lambda frame: frame['forward_returns'].shift(1),
        risk_free_rate_lag=lambda frame: frame['risk_free_rate'].shift(1),
    )
    .drop(columns=['forward_returns', 'risk_free_rate'])
)


print("Preparing target (y) for model training...")
y = train_df[TARGET].copy()


#———————————————————————————————————————————————————————————————————————————————————————

# ============================================================
# Important: no train/validation/test split happens in this cell.
# The split is performed in the main execution code via split_data_three_way(),
# so that X_train, X_validation and X_test have non-overlapping indices.
# ============================================================

# Untouched copies of the prepared data (for diagnostics and validation).
X_original = X.copy()
y_original = y.copy()

print(f"\n原始数据准备完成:")
print(f"  X shape: {X.shape}")
print(f"  y shape: {y.shape}")
print(f"  X 索引范围: {X.index[0]} - {X.index[-1]}")
print(f"  y 索引范围: {y.index[0]} - {y.index[-1]}")

print("\n--- NA Counts ---")
# For X this is the number of columns containing at least one NaN.
print(f"X NAs: {X.isna().any().sum()}")
# For y report the actual number of missing values; `.any().sum()` on a Series
# collapses to 0/1 and would hide how many targets are missing.
print(f"y NAs: {y.isna().sum()}")

print("\n" + "="*70)
print("注意：数据分割将在主执行代码中使用 split_data_three_way() 完成")
print("这确保了 X_train, X_validation, X_test 的索引不会重叠")
print("="*70)

Preparing features (X)...
Preparing target (y) for model training...

原始数据准备完成:
  X shape: (9021, 97)
  y shape: (9021,)
  X 索引范围: 0 - 9020
  y 索引范围: 0 - 9020

--- NA Counts ---
X NAs: 87
y NAs: 0

注意：数据分割将在主执行代码中使用 split_data_three_way() 完成
这确保了 X_train, X_validation, X_test 的索引不会重叠


In [None]:
# CatBoost hyper-parameters
# Tuned for roughly 250-350 features (i.e. after feature selection).
cat_parameters = dict(
    iterations=800,               # maximum boosting rounds (unchanged; early stopping bounds it)
    learning_rate=0.004,          # unchanged; a small learning rate is more stable
    depth=6,                      # tree depth (raised from 5 to 6 to capture more feature interactions)
    min_data_in_leaf=20,          # minimum samples per leaf (raised from 19 to 20 for more regularisation)
    l2_leaf_reg=7.0,              # L2 regularisation (raised from 5.4 to 7.0 against overfitting)
    random_strength=5.5,          # raised from 5.2 to 5.5 for more randomness
    colsample_bylevel=0.78,       # per-level feature sampling (lowered from 0.86 to 0.78)
    early_stopping_rounds=50,     # early-stopping patience (unchanged)
    bootstrap_type='Bernoulli',   # newly added: Bernoulli sampling for regularisation
    subsample=0.85,               # newly added: row-sampling ratio for regularisation
)

# ============================================================
# 辅助类和函数
# ============================================================

class PandasPreprocessor(BaseEstimator, TransformerMixin):
    """
    Minimal DataFrame-in / DataFrame-out preprocessor: impute, then robust-scale.

    Missing values are filled with a per-column statistic learned in ``fit``;
    the filled data is then centred on its median and divided by its
    inter-quartile range. Everything is done with pandas operations so column
    names are preserved.

    Parameters
    ----------
    impute_strategy : str
        ``'mean'`` fills with the column mean; ``'median'`` or any other value
        fills with the column median.

    Attributes set by ``fit``
    -------------------------
    fill_values_ : per-column fill value (0 for columns that are entirely NaN)
    median_      : per-column median of the filled training data
    iqr_         : per-column IQR of the filled training data (0 replaced by 1)
    columns_     : list of training column names, in training order
    """
    def __init__(self, impute_strategy='median'):
        self.impute_strategy = impute_strategy
        
    def fit(self, X, y=None):
        """Learn fill values and scaling statistics from DataFrame ``X``; returns self."""
        # Anything other than 'mean' falls back to the median.
        if self.impute_strategy == 'mean':
            fill_values = X.mean()
        else:
            fill_values = X.median()
        
        # A column that is entirely NaN has no statistic; fill it with 0.
        self.fill_values_ = fill_values.fillna(0)
        
        # Scaling statistics are computed on the imputed data.
        X_filled = X.fillna(self.fill_values_)
        self.median_ = X_filled.median()
        iqr = X_filled.quantile(0.75) - X_filled.quantile(0.25)
        # Constant columns have IQR 0; use 1 to avoid division by zero.
        self.iqr_ = iqr.replace(0, 1)
        
        self.columns_ = X.columns.tolist()
        return self

    def transform(self, X):
        """
        Impute and scale ``X`` using the statistics learned in ``fit``.

        Only columns seen during ``fit`` are kept, in training order. Columns
        unseen at fit time are dropped, and training columns absent from ``X``
        are simply not part of the output.
        """
        present = [c for c in self.columns_ if c in X.columns]
        X = X[present].copy()
        
        # DataFrame.fillna with a Series fills column-by-column by label.
        X = X.fillna(self.fill_values_)
        
        # Restrict the statistics to the present columns before the arithmetic:
        # DataFrame-Series operations outer-align on labels, so using the full
        # statistics would re-introduce every missing training column as an
        # all-NaN column and could reorder the output columns.
        center = self.median_.reindex(present)
        scale = self.iqr_.reindex(present)
        
        # Robust scaling: (X - median) / IQR
        return (X - center) / scale

# ============================================================
# 1. 定义 ad_sharpe_ratio
# ============================================================

def ad_sharpe_ratio_scorer(positions, market_excess_returns, risk_free_rate):
    """
    Annualised Sharpe ratio of a position-weighted strategy.

    The strategy's total return per period is
    ``rf * (1 - position) + position * (market_excess + rf)``; the score is the
    mean of the strategy's excess return over ``rf`` divided by its population
    standard deviation, scaled by ``sqrt(252)``.

    Parameters
    ----------
    positions : array-like
        Position per period (the original author documents the range as 0 to 2).
    market_excess_returns : array-like
        Market excess return per period.
    risk_free_rate : array-like
        Risk-free rate per period.

    Returns
    -------
    float
        The annualised ratio, or ``0.0`` when any input is empty or the
        strategy's excess returns have (near-)zero standard deviation.

    Raises
    ------
    ValueError
        If any input (after truncation to the shortest length) contains NaN.
    """
    TRADING_DAYS_PER_YR = 252
    epsilon = 1e-8

    arrays = [
        np.asarray(values, dtype=float)
        for values in (positions, market_excess_returns, risk_free_rate)
    ]

    shortest = min(len(arr) for arr in arrays)
    if shortest == 0:
        return 0.0

    # Inputs of unequal length are silently cut to the shortest one.
    pos, y_excess, rf = (arr[:shortest] for arr in arrays)

    # NaN anywhere invalidates the score; fail loudly with per-input counts.
    if np.isnan(pos).any() or np.isnan(y_excess).any() or np.isnan(rf).any():
        raise ValueError(
            f"ad_sharpe_ratio_scorer 输入包含 nan 值: "
            f"positions nan count={np.isnan(pos).sum()}, "
            f"market_excess nan count={np.isnan(y_excess).sum()}, "
            f"rf nan count={np.isnan(rf).sum()}"
        )

    # Rebuild total market return, blend with cash, then subtract cash again.
    strategy_total_returns = rf * (1 - pos) + pos * (y_excess + rf)
    strategy_excess_returns = strategy_total_returns - rf

    std_excess = np.std(strategy_excess_returns)
    if std_excess < epsilon:
        return 0.0

    sharpe_daily = np.mean(strategy_excess_returns) / std_excess
    return sharpe_daily * np.sqrt(TRADING_DAYS_PER_YR)

def generate_hft_positions(
    y_pred_series: pd.Series, 
    y_true_lag1_series: pd.Series, 
    span_N: int = 60, 
    sensitivity_k: float = 1,
    allow_short: bool = False, # whether short positions are allowed
    target_volatility: float = 0.20 # accepted for API compatibility; not used in the body
) -> pd.Series:
    """
    Map return predictions to positions via volatility-normalised tanh.

    Each prediction is divided by an exponentially weighted standard deviation
    of ``y_true_lag1_series``, multiplied by ``sensitivity_k``, squashed with
    ``tanh`` and scaled by 2.

    Parameters
    ----------
    y_pred_series : Series or array-like
        Predicted values; non-Series input is wrapped in a Series.
    y_true_lag1_series : Series or array-like
        Series used for the volatility estimate (lagged realised values,
        judging by the name).
    span_N : int
        EWM span; at least ``max(5, span_N // 10)`` observations are required
        before the EWM estimate is used.
    sensitivity_k : float
        Multiplier applied to the normalised prediction before ``tanh``.
    allow_short : bool
        If True positions lie in [-2, 2]; otherwise negative signals are
        floored at 0 and positions lie in [0, 2].
    target_volatility : float
        Currently unused.

    Returns
    -------
    Series
        Positions on the outer-joined index of both inputs; entries that
        cannot be computed are 0.0.
    """
    preds = y_pred_series if isinstance(y_pred_series, pd.Series) else pd.Series(y_pred_series)
    lagged = y_true_lag1_series if isinstance(y_true_lag1_series, pd.Series) else pd.Series(y_true_lag1_series)
    preds, lagged = preds.align(lagged, join='outer')

    # Volatility estimate: EWM standard deviation with a warm-up requirement.
    warmup = max(5, span_N // 10)
    ewm_vol = lagged.ewm(span=span_N, min_periods=warmup).std()

    # During warm-up fall back to the full-sample std (or a tiny constant when
    # even that is undefined), and floor the estimate to avoid dividing by ~0.
    overall_std = lagged.std()
    fallback_vol = 1e-5 if np.isnan(overall_std) else overall_std
    ewm_vol = ewm_vol.fillna(fallback_vol).clip(lower=1e-6)

    # Prediction strength relative to recent volatility, squashed into (-1, 1).
    signal = np.tanh((preds / ewm_vol) * sensitivity_k)

    # Long-only mode zeroes out negative signals.
    exposure = signal if allow_short else np.maximum(0, signal)
    positions = (exposure * 2).fillna(0.0)

    if not isinstance(positions, pd.Series):
        positions = pd.Series(positions, index=preds.index)

    return positions

print("✓ 已定义 ad_sharpe_ratio_scorer 和 generate_hft_positions")

✓ 已定义 ad_sharpe_ratio_scorer 和 generate_hft_positions


In [89]:
# ============================================================
# 2. 定义自动选择因子模型（具备损失函数来选择因子）
# ============================================================

import warnings
from scipy.cluster.hierarchy import linkage, fcluster
from scipy.spatial.distance import pdist, squareform
warnings.filterwarnings('ignore')

class AutoFeatureSelectorWithLoss(BaseEstimator, TransformerMixin):
    """
    自动因子筛选模型 - 使用损失函数来选择因子
    
    核心功能：
    1. 因子聚类（将相似因子分组）
    2. 使用自定义损失函数评估每个因子组的重要性
    3. 动态筛选：删除损失函数值高的组，保留损失函数值低的组
    4. 每次训练都重新筛选
    """
    
    def __init__(
        self,
        base_model,
        loss_function=None,  # custom loss used to score factor importance
        n_clusters=None,
        min_cluster_size=3,
        max_clusters=15,
        loss_threshold=None,  # groups whose loss exceeds this are meant to be dropped
        top_k_per_group=0.65,  # lowered from 0.80 to 0.65: keep the top 65% of features per group
        clustering_method='kmeans',
        n_repeats=5,
        random_state=42,
        verbose=1,
        verbose_clustering=None  # clustering log level; None means "use verbose"
    ):
        """
        Store the configuration and reset the fitted-state attributes.

        Parameters (as described by the original author):
        base_model: model used for training and evaluation.
        loss_function: callable taking (y_true, y_pred, positions,
            market_excess, rf) and returning a loss; None selects the built-in
            default loss.
        loss_threshold: loss cut-off for dropping groups; None means automatic.
        verbose_clustering: verbosity of clustering logs. None falls back to
            ``verbose``; True/False force the logs on/off.
        """
        # Model and loss configuration.
        self.base_model, self.loss_function = base_model, loss_function

        # Clustering configuration.
        self.n_clusters, self.min_cluster_size, self.max_clusters = (
            n_clusters,
            min_cluster_size,
            max_clusters,
        )

        # Selection configuration.
        self.loss_threshold, self.top_k_per_group = loss_threshold, top_k_per_group
        self.clustering_method, self.n_repeats = clustering_method, n_repeats
        self.random_state, self.verbose = random_state, verbose

        # Clustering logs inherit the general verbosity unless set explicitly.
        if verbose_clustering is None:
            verbose_clustering = verbose
        self.verbose_clustering = verbose_clustering
        
        # Result placeholders; all start out as None.
        self.feature_groups_ = None
        self.group_loss_ = None
        self.selected_features_ = None
        self.feature_to_group_ = None
        self.clustering_model_ = None
    
    def _should_log_clustering(self, level=1):
        """Return whether clustering messages of the given ``level`` should be printed."""
        configured_level = self.verbose_clustering
        return configured_level >= level
    
    def _default_loss_function(self, y_true, y_pred, positions, market_excess, rf):
        """
        Default loss: the negated score from ``ad_sharpe_ratio_scorer``.

        A loss is something to minimise, so a higher Sharpe ratio maps to a
        lower loss. In a permutation test the quantity
        ``permuted_loss - baseline_loss`` is then positive when shuffling a
        factor hurts performance, i.e. when the factor matters.

        Example: baseline Sharpe 2.0 -> loss -2.0; permuted Sharpe 1.0 ->
        loss -1.0; increase = -1.0 - (-2.0) = 1.0 > 0.

        ``y_true`` and ``y_pred`` are accepted for signature compatibility
        with custom losses but are not used here.
        """
        return -ad_sharpe_ratio_scorer(positions, market_excess, rf)
    
    def _compute_feature_correlation(self, X):
        """
        Build a feature-to-feature distance matrix from absolute correlations.

        Parameters
        ----------
        X : DataFrame or array-like of shape (n_samples, n_features)
            Missing values are imputed before correlating: DataFrame input is
            forward-filled and then mean-filled; other input is mean-filled only.

        Returns
        -------
        (distance_matrix, corr_matrix)
            ``corr_matrix`` is the DataFrame of absolute correlations with
            undefined entries set to 0. ``distance_matrix`` is the ndarray
            ``1 - corr_matrix``, clipped to [0, 1], symmetrised, with a zero
            diagonal.
        """
        if self._should_log_clustering(2):
            print("  计算因子相关性矩阵...")
        
        is_frame = isinstance(X, pd.DataFrame)
        frame = X if is_frame else pd.DataFrame(X)
        
        n_missing = frame.isna().sum().sum()
        if n_missing > 0:
            if is_frame:
                if self._should_log_clustering(2):
                    print(f"  警告: 发现 {n_missing} 个 NaN 值，使用前向填充和均值填充")
                # `.ffill()` replaces the deprecated `fillna(method='ffill')`;
                # leading NaNs that ffill cannot reach get the column mean.
                frame = frame.ffill().fillna(frame.mean())
            else:
                if self._should_log_clustering(2):
                    print(f"  警告: 发现 {n_missing} 个 NaN 值，使用均值填充")
                frame = frame.fillna(frame.mean())
        
        # Correlations that are still undefined become 0 (maximal distance).
        corr_matrix = frame.corr().abs().fillna(0)
        
        # Distance = 1 - |correlation|: strongly correlated features are close.
        distance_matrix = 1 - corr_matrix.values
        
        # Make the matrix a valid distance matrix:
        # 1. no NaN / inf
        distance_matrix = np.nan_to_num(distance_matrix, nan=1.0, posinf=1.0, neginf=1.0)
        # 2. bounded in [0, 1]
        distance_matrix = np.clip(distance_matrix, 0, 1)
        # 3. exactly symmetric
        distance_matrix = (distance_matrix + distance_matrix.T) / 2
        # 4. zero self-distance
        np.fill_diagonal(distance_matrix, 0)
        
        if self._should_log_clustering(2):
            print(f"  距离矩阵统计: min={distance_matrix.min():.4f}, max={distance_matrix.max():.4f}, "
                  f"mean={distance_matrix.mean():.4f}, has_nan={np.isnan(distance_matrix).any()}")
        
        return distance_matrix, corr_matrix
    
    def _find_optimal_clusters(self, X, distance_matrix):
        """
        Pick the number of feature clusters with the best silhouette score.

        Candidate values run from ``max(2, n_features // 20)`` to
        ``min(self.max_clusters, n_features // self.min_cluster_size, 15)``.
        If that range is empty, or the distance matrix contains NaN/inf, the
        lower bound is returned without scoring.

        For ``clustering_method == 'kmeans'`` the *features* are clustered
        (each feature is one point, i.e. the transposed data), which is the
        same representation ``_cluster_features`` uses for the final
        clustering; scoring a clustering of the samples instead would select k
        for a different problem. Any other method uses Ward linkage on the
        precomputed distance matrix.

        Parameters
        ----------
        X : DataFrame or array-like of shape (n_samples, n_features)
        distance_matrix : ndarray of shape (n_features, n_features)

        Returns
        -------
        int
            The selected number of clusters.
        """
        if self._should_log_clustering(1):
            print("  自动选择最优聚类数量...")
        
        n_features = X.shape[1]
        max_k = min(self.max_clusters, n_features // self.min_cluster_size, 15)
        min_k = max(2, n_features // 20)
        
        if max_k < min_k:
            return min_k
        
        # An invalid distance matrix makes silhouette scores meaningless.
        if np.isnan(distance_matrix).any() or np.isinf(distance_matrix).any():
            if self._should_log_clustering(1):
                print("  警告: 距离矩阵包含 NaN 或 Inf，跳过轮廓系数计算，使用默认聚类数量")
            return min_k
        
        best_k = min_k
        best_score = -1
        use_kmeans = self.clustering_method == 'kmeans'
        candidates = range(min_k, max_k + 1)
        
        # Work that does not depend on k is done once, outside the loop.
        try:
            if use_kmeans:
                from sklearn.decomposition import PCA
                frame = X if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
                # Forward-fill, then column mean, then 0 for all-NaN columns.
                frame = frame.ffill().fillna(frame.mean()).fillna(0)
                # Transpose: rows are features, columns are samples.
                feature_matrix = frame.T.values
                n_points, n_dims = feature_matrix.shape
            else:
                linkage_matrix = linkage(squareform(distance_matrix), method='ward')
        except Exception as e:
            if self._should_log_clustering(2):
                print(f"  警告: 聚类准备失败: {str(e)}")
            candidates = ()
        
        for k in candidates:
            # Best-effort search: a failing k is logged (level 2) and skipped.
            try:
                if use_kmeans:
                    # Reduce dimensionality before k-means when it helps.
                    n_components = max(1, min(k * 2, n_points - 1, n_dims - 1))
                    if n_dims > n_components:
                        pca = PCA(n_components=n_components, random_state=self.random_state)
                        reduced = pca.fit_transform(feature_matrix)
                    else:
                        reduced = feature_matrix
                    
                    kmeans = KMeans(n_clusters=k, random_state=self.random_state, n_init=10)
                    labels = kmeans.fit_predict(reduced)
                    
                    if len(np.unique(labels)) <= 1 or n_points <= k:
                        continue
                    sil_score = silhouette_score(reduced, labels, metric='euclidean')
                else:
                    labels = fcluster(linkage_matrix, k, criterion='maxclust') - 1
                    if len(np.unique(labels)) <= 1:
                        continue
                    sil_score = silhouette_score(distance_matrix, labels, metric='precomputed')
                
                if not np.isnan(sil_score) and sil_score > best_score:
                    best_score = sil_score
                    best_k = k
            except Exception as e:
                if self._should_log_clustering(2):
                    print(f"  警告: k={k} 时聚类失败: {str(e)}")
                continue
        
        if self._should_log_clustering(1):
            print(f"  最优聚类数量: {best_k} (轮廓系数: {best_score:.4f})")
        
        return best_k
    
    def _cluster_features(self, X):
        """
        Group the features (columns) of ``X`` into clusters.

        With ``clustering_method == 'kmeans'`` each feature is treated as one
        point (the data is transposed), optionally PCA-reduced, and clustered
        with KMeans; the fitted KMeans is stored in ``self.clustering_model_``.
        Any other method uses Ward linkage on the correlation-based distance
        matrix. The number of clusters is ``self.n_clusters`` if set,
        otherwise it is chosen by ``_find_optimal_clusters``.

        Parameters
        ----------
        X : DataFrame or array-like of shape (n_samples, n_features)
            Non-DataFrame input gets synthetic names ``feature_<i>``.

        Returns
        -------
        (groups, feature_to_group)
            ``groups`` maps cluster label -> list of feature names and only
            contains clusters with at least ``self.min_cluster_size`` members.
            ``feature_to_group`` maps every feature name to its cluster label,
            including features whose cluster was filtered out of ``groups``.
        """
        if self._should_log_clustering(1):
            print("  对因子进行聚类...")
        
        if isinstance(X, pd.DataFrame):
            feature_names = X.columns.tolist()
        else:
            feature_names = [f'feature_{i}' for i in range(X.shape[1])]
        n_features = len(feature_names)
        
        # Too few features to cluster: put all of them into a single group.
        if n_features <= self.min_cluster_size:
            feature_groups = {0: feature_names}
            feature_to_group = {name: 0 for name in feature_names}
            if self._should_log_clustering(1):
                print(f"  特征数量太少 ({n_features})，跳过聚类，所有特征归为一个组")
            return feature_groups, feature_to_group
        
        distance_matrix, _ = self._compute_feature_correlation(X)
        
        if self.n_clusters is None:
            n_clusters = self._find_optimal_clusters(X, distance_matrix)
        else:
            n_clusters = self.n_clusters
        
        # Never ask for more clusters than there are features.
        n_clusters = min(n_clusters, n_features)
        
        if self.clustering_method == 'kmeans':
            from sklearn.decomposition import PCA
            
            frame = X if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
            # PCA and KMeans reject NaN, so impute the same way the sibling
            # methods do: forward-fill, column mean, then 0 for all-NaN columns.
            if frame.isna().values.any():
                frame = frame.ffill().fillna(frame.mean()).fillna(0)
            
            # Transpose so that each feature is one point to be clustered.
            X_for_clustering = frame.T.values
            
            n_components = min(n_clusters * 2, n_features - 1, 50, X.shape[0] - 1)
            if n_components < 1:
                n_components = 1
            
            # Reduce only when there are more sample dimensions than components.
            if X_for_clustering.shape[1] > n_components:
                pca = PCA(n_components=n_components, random_state=self.random_state)
                X_pca = pca.fit_transform(X_for_clustering)
            else:
                X_pca = X_for_clustering
            
            kmeans = KMeans(n_clusters=n_clusters, random_state=self.random_state, n_init=10)
            labels = kmeans.fit_predict(X_pca)
            self.clustering_model_ = kmeans
        else:
            # Hierarchical clustering on the precomputed distance matrix.
            linkage_matrix = linkage(squareform(distance_matrix), method='ward')
            labels = fcluster(linkage_matrix, n_clusters, criterion='maxclust') - 1
        
        # Both paths yield exactly one label per feature.
        feature_groups = {}
        feature_to_group = {}
        for name, label in zip(feature_names, labels):
            feature_groups.setdefault(label, []).append(name)
            feature_to_group[name] = label
        
        # Clusters below the minimum size are not returned as groups.
        filtered_groups = {
            group_id: members
            for group_id, members in feature_groups.items()
            if len(members) >= self.min_cluster_size
        }
        
        if self._should_log_clustering(1):
            print(f"  聚类完成: {len(filtered_groups)} 个有效组 (总因子数: {len(feature_names)})")
        
        return filtered_groups, feature_to_group
    
    def _compute_group_loss(self, model, X, y, feature_groups, market_excess, rf):
        """
        使用损失函数计算每个因子组的重要性
        
        方法：对每个组的因子进行随机置换，使用已训练模型直接预测（推理阶段置换）
        损失增加越大，因子组越重要
        
        性能优化：不重新训练模型，直接使用已训练模型预测置换后的数据
        """
        if self.verbose >= 1:
            print("  使用损失函数计算因子组重要性（推理阶段置换，不重新训练）...")
        
        # 确保 market_excess 和 rf 与 X 的索引/长度对齐
        n_samples = len(X) if hasattr(X, '__len__') else (len(y) if hasattr(y, '__len__') else len(market_excess) if hasattr(market_excess, '__len__') else len(rf))
        
        # 处理 market_excess：确保长度匹配和索引对齐
        if isinstance(market_excess, pd.Series):
            if isinstance(X, pd.DataFrame):
                market_excess = market_excess.reindex(X.index, fill_value=np.nan).values
            else:
                market_excess = market_excess.values[:n_samples] if len(market_excess) >= n_samples else np.pad(market_excess.values, (0, n_samples - len(market_excess)), constant_values=np.nan)
        elif hasattr(market_excess, '__len__'):
            if len(market_excess) != n_samples:
                if len(market_excess) > n_samples:
                    market_excess = market_excess[:n_samples]
                else:
                    market_excess = np.pad(np.asarray(market_excess), (0, n_samples - len(market_excess)), constant_values=np.nan)
            market_excess = np.asarray(market_excess, dtype=float)
        else:
            market_excess = np.full(n_samples, np.nan)
        
        # 处理 rf：确保长度匹配和索引对齐
        if isinstance(rf, pd.Series):
            if isinstance(X, pd.DataFrame):
                rf = rf.reindex(X.index, fill_value=np.nan).values
            else:
                rf = rf.values[:n_samples] if len(rf) >= n_samples else np.pad(rf.values, (0, n_samples - len(rf)), constant_values=np.nan)
        elif hasattr(rf, '__len__'):
            if len(rf) != n_samples:
                if len(rf) > n_samples:
                    rf = rf[:n_samples]
                else:
                    rf = np.pad(np.asarray(rf), (0, n_samples - len(rf)), constant_values=np.nan)
            rf = np.asarray(rf, dtype=float)
        else:
            rf = np.full(n_samples, np.nan)
        
        # 检查是否有 nan 值（不应该有，如果有则报错）
        if np.isnan(market_excess).any():
            nan_count = np.isnan(market_excess).sum()
            nan_indices = np.where(np.isnan(market_excess))[0][:10]  # 只显示前10个
            raise ValueError(
                f"market_excess 包含 {nan_count} 个 nan 值（索引示例: {nan_indices.tolist()}）。"
                f"这不应该发生，请检查数据预处理流程。"
                f"X.shape={X.shape if hasattr(X, 'shape') else 'N/A'}, "
                f"market_excess.shape={market_excess.shape}, "
                f"X.index={X.index[:5].tolist() if isinstance(X, pd.DataFrame) else 'N/A'}..."
            )
        
        if np.isnan(rf).any():
            nan_count = np.isnan(rf).sum()
            nan_indices = np.where(np.isnan(rf))[0][:10]  # 只显示前10个
            raise ValueError(
                f"rf 包含 {nan_count} 个 nan 值（索引示例: {nan_indices.tolist()}）。"
                f"这不应该发生，请检查数据预处理流程。"
                f"X.shape={X.shape if hasattr(X, 'shape') else 'N/A'}, "
                f"rf.shape={rf.shape}, "
                f"X.index={X.index[:5].tolist() if isinstance(X, pd.DataFrame) else 'N/A'}..."
            )
        
        # 使用默认损失函数或自定义损失函数
        loss_func = self.loss_function if self.loss_function is not None else self._default_loss_function
        
        # 检测损失函数类型（使用函数名称检测，避免未定义函数的引用错误）
        loss_func_name = getattr(loss_func, '__name__', '')
        is_direction_loss = loss_func_name == 'direction_loss_function'
        is_metalabel_loss = loss_func_name == 'metalabel_loss_function'
        is_opportunity_loss = loss_func_name == 'opportunity_loss_function'
        
        # 使用已训练的模型（不再重新训练）
        # 注意：model 应该已经在外部训练好了
        y_pred_baseline = model.predict(X)
        
        # 生成仓位（根据损失函数类型决定）
        y_pred_series = pd.Series(y_pred_baseline, index=X.index if isinstance(X, pd.DataFrame) else range(len(y_pred_baseline)))
        
        if is_direction_loss or is_opportunity_loss or is_metalabel_loss:
            # 对于并行模型的损失函数，损失函数内部会自己处理 y_pred
            # positions 参数不会被使用，生成一个占位符
            positions_baseline = np.zeros(len(y_pred_series))
        else:
            # 对于其他模型（如默认损失函数），y_pred 是收益率，需要通过 generate_hft_positions 生成仓位
            forward_returns_lag = X['forward_returns_lag'] if 'forward_returns_lag' in X.columns else pd.Series(np.zeros(len(y)), index=y_pred_series.index)
            positions_baseline = generate_hft_positions(y_pred_series, forward_returns_lag).values
        
        # 计算基线损失
        baseline_loss = loss_func(y, y_pred_baseline, positions_baseline, market_excess, rf)
        
        # 检查基线损失是否为 nan 或 inf（不应该发生，如果有则报错）
        if np.isnan(baseline_loss) or np.isinf(baseline_loss):
            raise ValueError(
                f"基线损失计算异常 (baseline_loss={baseline_loss})。"
                f"这不应该发生，请检查："
                f"1. market_excess 和 rf 是否正确传递（无 nan）"
                f"2. positions_baseline 是否正确生成（无 nan）"
                f"3. 损失函数计算是否正确"
                f"market_excess nan count={np.isnan(market_excess).sum() if hasattr(market_excess, '__len__') else 'N/A'}, "
                f"rf nan count={np.isnan(rf).sum() if hasattr(rf, '__len__') else 'N/A'}, "
                f"positions_baseline nan count={np.isnan(positions_baseline).sum() if hasattr(positions_baseline, '__len__') else 'N/A'}"
            )
        
        group_loss = {}
        np.random.seed(self.random_state)
        
        # 对每个组进行损失函数测试
        for group_id, features in tqdm(feature_groups.items(), desc="  测试各组", disable=self.verbose < 1):
            valid_features = [f for f in features if f in X.columns]
            if len(valid_features) == 0:
                group_loss[group_id] = -np.inf  # 不重要组，损失增加为负
                continue
            
            loss_scores = []
            
            for repeat in range(self.n_repeats):
                X_permuted = X.copy()
                
                # 随机打乱当前组的所有因子
                for feat in valid_features:
                    X_permuted[feat] = np.random.permutation(X_permuted[feat].values)
                
                # 【性能优化】直接使用已训练模型预测，不重新训练
                y_pred_permuted = model.predict(X_permuted)
                
                # 生成仓位（根据损失函数类型决定）
                y_pred_series_perm = pd.Series(y_pred_permuted, index=X_permuted.index if isinstance(X_permuted, pd.DataFrame) else range(len(y_pred_permuted)))
                
                if is_direction_loss or is_opportunity_loss or is_metalabel_loss:
                    # 对于并行模型的损失函数，损失函数内部会自己处理 y_pred
                    # positions 参数不会被使用，生成一个占位符
                    positions_permuted = np.zeros(len(y_pred_series_perm))
                else:
                    # 对于其他模型，需要通过 generate_hft_positions 生成仓位
                    forward_returns_lag_perm = X_permuted['forward_returns_lag'] if 'forward_returns_lag' in X_permuted.columns else pd.Series(np.zeros(len(y)), index=y_pred_series_perm.index)
                    positions_permuted = generate_hft_positions(y_pred_series_perm, forward_returns_lag_perm).values
                
                # 计算损失
                permuted_loss = loss_func(y, y_pred_permuted, positions_permuted, market_excess, rf)
                
                # 检查 permuted_loss 是否为 nan 或 inf（不应该发生，如果有则报错）
                if np.isnan(permuted_loss) or np.isinf(permuted_loss):
                    raise ValueError(
                        f"组 {group_id} 第 {repeat+1} 次重复的损失计算异常 (permuted_loss={permuted_loss})。"
                        f"这不应该发生，请检查 positions_permuted 是否正确生成（无 nan）"
                    )
                
                # 损失增加 = 置换后损失 - 基线损失
                # 如果损失函数是负夏普率（越小越好）：
                #   基准：Sharpe=2.0 -> Loss=-2.0
                #   置换后：Sharpe=1.0 -> Loss=-1.0
                #   loss_increase = -1.0 - (-2.0) = 1.0 > 0，表示损失增加，因子重要
                loss_increase = permuted_loss - baseline_loss
                
                # 检查 loss_increase 是否为 nan 或 inf（不应该发生）
                if np.isnan(loss_increase) or np.isinf(loss_increase):
                    raise ValueError(
                        f"组 {group_id} 第 {repeat+1} 次重复的损失增加异常 (loss_increase={loss_increase})。"
                        f"baseline_loss={baseline_loss}, permuted_loss={permuted_loss}"
                    )
                
                loss_scores.append(loss_increase)
            
            # 平均损失增加
            if len(loss_scores) == 0:
                raise ValueError(f"组 {group_id} 的所有损失计算都失败，loss_scores 为空")
            
            avg_loss_increase = np.mean(loss_scores)
            
            # 最终检查（不应该发生）
            if np.isnan(avg_loss_increase) or np.isinf(avg_loss_increase):
                raise ValueError(
                    f"组 {group_id} 的平均损失增加异常 (avg_loss_increase={avg_loss_increase})。"
                    f"loss_scores={loss_scores[:5]}... (显示前5个)"
                )
            
            group_loss[group_id] = avg_loss_increase
        
        if self.verbose >= 1:
            print(f"  组损失计算完成")
            sorted_groups = sorted(group_loss.items(), key=lambda x: x[1], reverse=True)
            print(f"  Top 5 重要组 (损失增加越大越重要):")
            for group_id, loss_inc in sorted_groups[:5]:
                n_features = len(feature_groups[group_id])
                print(f"    组 {group_id} ({n_features}个因子): 损失增加 = {loss_inc:.6f}")
        
        return group_loss
    
    def _select_features_from_groups(self, feature_groups, group_loss, X, y, trained_model=None):
        """
        从重要组中选择因子
        
        改进：使用全局模型的特征重要性，而不是单独训练组内模型
        理由：有些因子是"辅助因子"，单独训练时重要性低，但在全量模型中配合其他因子非常有效
        """
        if self.verbose >= 1:
            print("  从重要组中选择因子（使用全局模型特征重要性）...")
        
        # 确定损失阈值
        if self.loss_threshold is None:
            # 自动选择：保留损失增加大于中位数的组
            loss_values = list(group_loss.values())
            if len(loss_values) > 0:
                # 检查是否有 nan 或 inf（不应该发生）
                invalid_values = [v for v in loss_values if np.isnan(v) or np.isinf(v)]
                if len(invalid_values) > 0:
                    raise ValueError(
                        f"发现 {len(invalid_values)} 个无效的损失值（nan 或 inf）。"
                        f"这不应该发生，请检查组损失计算流程。"
                        f"无效值示例: {invalid_values[:5]}"
                    )
                threshold = np.median(loss_values)
            else:
                threshold = 0.0
        else:
            threshold = self.loss_threshold
        
        # 筛选重要组（损失增加大于阈值的组）
        important_groups = {
            group_id: features 
            for group_id, features in feature_groups.items()
            if group_loss[group_id] > threshold
        }
        
        if self.verbose >= 1:
            print(f"  筛选出 {len(important_groups)} 个重要组 (阈值: {threshold:.6f})")
        
        # 如果提供了已训练的模型，使用全局模型的特征重要性
        if trained_model is not None:
            # 获取全局模型的特征重要性
            if hasattr(trained_model, 'get_feature_importance'):
                global_feat_importance = pd.Series(
                    trained_model.get_feature_importance(),
                    index=X.columns
                )
            elif hasattr(trained_model, 'feature_importances_'):
                global_feat_importance = pd.Series(
                    trained_model.feature_importances_,
                    index=X.columns
                )
            else:
                global_feat_importance = None
        else:
            # 如果没有提供已训练模型，训练一个全局模型获取特征重要性
            if self.verbose >= 1:
                print("  训练全局模型以获取特征重要性...")
            global_model = clone(self.base_model)
            global_model.fit(X, y)
            
            if hasattr(global_model, 'get_feature_importance'):
                global_feat_importance = pd.Series(
                    global_model.get_feature_importance(),
                    index=X.columns
                )
            elif hasattr(global_model, 'feature_importances_'):
                global_feat_importance = pd.Series(
                    global_model.feature_importances_,
                    index=X.columns
                )
            else:
                global_feat_importance = None
        
        # 从重要组中选择因子
        selected_features = []
        total_before_selection = 0
        total_after_selection = 0
        
        for group_id, features in important_groups.items():
            total_before_selection += len(features)
            
            if global_feat_importance is not None:
                # 只使用在 X.columns 和 global_feat_importance 中都存在的特征
                valid_features = [
                    f for f in features 
                    if f in X.columns and f in global_feat_importance.index
                ]
                
                if len(valid_features) > 0:
                    # 使用全局模型的特征重要性选择组内因子
                    group_feat_importance = global_feat_importance[valid_features]
                    
                    # 基于实际可用特征数量计算选择数量
                    n_select = max(1, int(len(valid_features) * self.top_k_per_group))
                    
                    # 选择 top_k 个最重要的因子
                    top_features = group_feat_importance.nlargest(n_select).index.tolist()
                    selected_features.extend(top_features)
                    total_after_selection += len(top_features)
                    
                    if self.verbose >= 1:
                        print(f"    组 {group_id}: {len(valid_features)} 个有效因子 -> 选择 {len(top_features)} 个 (top_k={self.top_k_per_group:.2f})")
                else:
                    if self.verbose >= 1:
                        print(f"    组 {group_id}: 没有有效特征，跳过")
            else:
                # 如果没有特征重要性，保留组内所有因子（但只保留在 X.columns 中的）
                valid_features = [f for f in features if f in X.columns]
                selected_features.extend(valid_features)
                total_after_selection += len(valid_features)
                if self.verbose >= 1:
                    print(f"    组 {group_id}: 无法获取特征重要性，保留所有 {len(valid_features)} 个有效因子")
        
        self.selected_features_ = list(set(selected_features))  # 去重
        
        if self.verbose >= 1:
            print(f"  组内因子选择统计:")
            print(f"    重要组总因子数: {total_before_selection}")
            print(f"    选择后因子数: {total_after_selection}")
            print(f"    去重后因子数: {len(self.selected_features_)}")
            print(f"    最终选择 {len(self.selected_features_)} 个因子 (top_k_per_group={self.top_k_per_group:.2f})")
        
        return self.selected_features_
    
    def fit(self, X, y, market_excess=None, rf=None):
        """
        Cluster the features, score every cluster and keep the best features.

        Parameters
        ----------
        X : pandas.DataFrame
            Feature data (``X.columns`` is read, so column labels are needed).
        y : array-like
            Target variable.
        market_excess : array-like, optional
            Market excess returns handed to the loss function. When omitted,
            the ``market_forward_excess_returns`` column of ``X`` is used if
            present, otherwise zeros of length ``len(y)``.
        rf : array-like, optional
            Risk-free rate handed to the loss function. When omitted, the
            ``risk_free_rate`` column of ``X`` is used if present, otherwise
            zeros of length ``len(y)``.

        Returns
        -------
        self
        """
        # Fill in the loss-function inputs that the caller did not supply.
        if market_excess is None:
            market_excess = (
                X['market_forward_excess_returns'].values
                if 'market_forward_excess_returns' in X.columns
                else np.zeros(len(y))
            )
        if rf is None:
            rf = (
                X['risk_free_rate'].values
                if 'risk_free_rate' in X.columns
                else np.zeros(len(y))
            )

        # Step 1: group the features.
        groups, feature_to_group = self._cluster_features(X)
        self.feature_groups_ = groups
        self.feature_to_group_ = feature_to_group

        # Step 2: fit the global model exactly once; it is reused for both the
        # group losses and the feature importances.
        global_model = clone(self.base_model)
        global_model.fit(X, y)

        # Step 3: loss increase per group, computed with the fitted model.
        self.group_loss_ = self._compute_group_loss(
            global_model, X, y, self.feature_groups_, market_excess, rf
        )

        # Step 4: choose features inside the important groups.
        self.selected_features_ = self._select_features_from_groups(
            self.feature_groups_,
            self.group_loss_,
            X,
            y,
            trained_model=global_model,
        )

        return self
    
    def transform(self, X):
        """
        Return the selected feature columns of ``X``.

        Parameters
        ----------
        X : pandas.DataFrame
            Data containing (a superset or subset of) the selected features.
            Selected features missing from ``X`` are silently skipped.

        Returns
        -------
        pandas.DataFrame
            A copy of the selected columns, in selection order, with the
            index of ``X`` unchanged.

        Raises
        ------
        ValueError
            If the selector has not been fitted.
        TypeError
            If ``X`` is not a DataFrame. Features are located by column name,
            which a bare array cannot provide.
        """
        # getattr: an instance that never ran fit() may not have the attribute at all.
        selected = getattr(self, 'selected_features_', None)
        if selected is None:
            raise ValueError("模型尚未拟合，请先调用 fit()")

        # The former ndarray branch read X.columns and therefore could never
        # work; fail with an explicit message instead of an AttributeError.
        if not isinstance(X, pd.DataFrame):
            raise TypeError(
                f"transform() 仅支持 pandas.DataFrame 输入（需要列名来定位已选因子），"
                f"收到的类型为 {type(X).__name__}"
            )

        # Keep only the selected features that actually exist in X.
        available_features = [f for f in selected if f in X.columns]
        return X[available_features].copy()
    
    def fit_transform(self, X, y, market_excess=None, rf=None):
        """Run ``fit`` on ``X``/``y`` and return ``transform(X)`` of the fitted selector."""
        fitted_selector = self.fit(X, y, market_excess, rf)
        return fitted_selector.transform(X)

# Cell footer: confirm in the notebook output that AutoFeatureSelectorWithLoss is now defined.
print("✓ 已定义 AutoFeatureSelectorWithLoss (使用损失函数选择因子)")


✓ 已定义 AutoFeatureSelectorWithLoss (使用损失函数选择因子)


In [90]:
# ============================================================
# Parallel model architecture (with feature selection)
# ============================================================
# 1. OpportunityPipeline: regresses the opportunity size |HFT position| (squared-error objective)
# 2. DirectionClassifierPipeline: classifies the direction {0 = short, 1 = long} (Logloss)
# 3. Each model runs its own feature selection (AutoFeatureSelectorWithLoss)
# 4. Final position = direction (±1) × opportunity
# ============================================================

# ============================================================
# 1. 机会模型的损失函数（用于特征选择）
# ============================================================

def opportunity_loss_function(y_true, y_pred, positions, market_excess, rf):
    """
    Feature-selection loss for the opportunity model: negative adjusted Sharpe ratio.

    The predicted opportunity size is turned into a position
    ``clip(|y_pred|, 0, 2)``, which is held only where ``sign(y_true) >= 0``
    and set to 0 elsewhere. The resulting positions are scored with
    ``ad_sharpe_ratio_scorer`` and the score is negated so that lower is better.

    Parameters
    ----------
    y_true : pandas.Series or array-like
        Reference values; only their sign is used.
    y_pred : pandas.Series or array-like
        Predicted opportunity sizes.
    positions : any
        Unused; present so the function matches the common loss signature.
    market_excess, rf : array-like
        Passed through unchanged to ``ad_sharpe_ratio_scorer``.

    Returns
    -------
    The negated value returned by ``ad_sharpe_ratio_scorer``.
    """
    # NOTE(review): OpportunityPipeline.fit passes the non-negative target
    # |ideal position| as y_true, in which case the sign mask below never
    # filters anything — confirm whether the realized market direction was meant.
    reference = y_true.values if isinstance(y_true, pd.Series) else y_true
    reference_sign = np.sign(reference)

    predicted_size = y_pred.values if isinstance(y_pred, pd.Series) else np.array(y_pred)

    # Magnitude of the prediction, limited to the [0, 2] position range.
    sized_positions = np.clip(np.abs(predicted_size), 0, 2)
    held_positions = np.where(reference_sign >= 0, sized_positions, 0)

    return -ad_sharpe_ratio_scorer(held_positions, market_excess, rf)


def direction_loss_function(y_true, y_pred, positions, market_excess, rf):
    """
    Feature-selection loss for the direction model: negative adjusted Sharpe ratio.

    The predictions are mapped to a long/flat position (1.0 or 0.0), the
    positions are scored with ``ad_sharpe_ratio_scorer`` and the score is
    negated so that lower is better.

    How predictions are interpreted:
    * if every value lies in [0, 1] they are treated as probabilities or
      0/1 labels and a value > 0.5 means "long";
    * otherwise they are treated as signed scores and a value > 0 means "long".

    Parameters
    ----------
    y_true : any
        Unused. The loss depends only on the predicted direction and the
        market inputs, not on the true labels.
    y_pred : pandas.Series or array-like
        Predicted probabilities, labels or signed scores (must be non-empty,
        since its min and max are taken).
    positions : any
        Unused; present so the function matches the common loss signature.
    market_excess, rf : array-like
        Passed through unchanged to ``ad_sharpe_ratio_scorer``.

    Returns
    -------
    The negated value returned by ``ad_sharpe_ratio_scorer``.
    """
    y_pred_values = y_pred.values if isinstance(y_pred, pd.Series) else np.array(y_pred)

    # Values confined to [0, 1] are read as probabilities / 0-1 labels.
    looks_like_probability = np.max(y_pred_values) <= 1 and np.min(y_pred_values) >= 0
    long_cutoff = 0.5 if looks_like_probability else 0.0

    # Long (1.0) strictly above the cutoff, flat (0.0) otherwise; NaN compares
    # False and therefore ends up flat.
    test_positions = np.where(y_pred_values > long_cutoff, 1.0, 0.0)

    return -ad_sharpe_ratio_scorer(test_positions, market_excess, rf)


# Confirm in the notebook output that both feature-selection loss functions are defined.
print("✓ 已定义 opportunity_loss_function（机会模型特征选择损失函数）")
print("✓ 已定义 direction_loss_function（方向模型特征选择损失函数）")


# ============================================================
# 2. OpportunityPipeline - 预测机会大小（回归）
# ============================================================

class OpportunityPipeline(BaseEstimator, RegressorMixin):
    """
    Opportunity model pipeline: regresses the magnitude of the ideal HFT position.

    Target: ``|generate_hft_positions(y, forward_returns_lag, ...)|``.
    Model: the supplied regressor, or a CatBoostRegressor with RMSE loss.
    Feature selection (optional): AutoFeatureSelectorWithLoss driven by
    ``opportunity_loss_function``.

    Parameters
    ----------
    feature_creator : callable, optional
        Wrapped in a FunctionTransformer as the first feature-engineering step.
    feature_selector : estimator, optional
        Ready-made selector; a clone of it is fitted. Takes precedence over
        ``feature_selector_params``.
    feature_selector_params : dict, optional
        Extra keyword arguments for a default AutoFeatureSelectorWithLoss.
        When both selector arguments are None, selection is skipped.
    model : estimator, optional
        Regressor whose ``fit`` accepts ``eval_set`` (and ``sample_weight``).
    verbose : int
        Progress is printed when ``verbose >= 1``.
    """

    def __init__(
        self,
        feature_creator=None,
        feature_selector=None,
        feature_selector_params=None,
        model=None,
        verbose=1
    ):
        self.feature_creator = feature_creator
        self.feature_selector = feature_selector
        self.feature_selector_params = feature_selector_params
        # Explicit None check: `model or default` would evaluate the truth
        # value of the estimator, which is wrong (or raises) for estimators
        # that define __len__.
        self.model = model if model is not None else CatBoostRegressor(
            iterations=800, learning_rate=0.004, depth=6,
            min_data_in_leaf=20, l2_leaf_reg=7.0, random_strength=5.5,
            colsample_bylevel=0.78, early_stopping_rounds=50,
            bootstrap_type='Bernoulli', subsample=0.85,
            loss_function='RMSE', verbose=0, random_seed=42
        )
        self.verbose = verbose
        self.is_fitted_ = False
        self.feature_pipeline_ = None
        self.feature_selector_ = None
        self.feature_names_ = None

    def _create_feature_pipeline(self):
        """Build the feature pipeline: optional feature creator, then median-imputing preprocessor."""
        steps = []
        if self.feature_creator is not None:
            steps.append(('feature_creator', FunctionTransformer(self.feature_creator)))
        steps.append(('preprocessor', PandasPreprocessor(impute_strategy='median')))
        return Pipeline(steps)

    def _get_or_create_feature_selector(self):
        """
        Return the selector to fit, or None when selection is not configured.

        A clone of ``feature_selector`` wins; otherwise a default
        AutoFeatureSelectorWithLoss is built from ``feature_selector_params``.
        """
        if self.feature_selector is not None:
            return clone(self.feature_selector)
        if self.feature_selector_params is not None:
            selector_base_model = CatBoostRegressor(
                iterations=800, learning_rate=0.004, depth=6, verbose=0, random_seed=42
            )
            return AutoFeatureSelectorWithLoss(
                base_model=selector_base_model,
                loss_function=opportunity_loss_function,
                **self.feature_selector_params
            )
        return None

    def fit(self, X, y, market_excess=None, rf=None, sample_weight=None):
        """
        Fit feature engineering, optional feature selection and the regressor.

        Parameters
        ----------
        X : pandas.DataFrame
            Raw features (``X.index`` and ``X.columns`` are used).
        y : pandas.Series or array-like
            Series from which the ideal positions are generated.
        market_excess : pandas.Series or array-like, optional
            Defaults to ``y``; forwarded to the feature selector.
        rf : pandas.Series or array-like, optional
            Defaults to ``X['risk_free_rate_lag']`` (NaN -> 0) or zeros.
        sample_weight : pandas.Series or array-like, optional
            Per-row weights; only the leading training part is used.

        Returns
        -------
        self
        """
        if self.verbose >= 1:
            print("="*70)
            print("OpportunityPipeline - 开始拟合（机会模型，MSE）")
            print("="*70)

        y_series = pd.Series(y, index=X.index) if not isinstance(y, pd.Series) else y
        if market_excess is None:
            market_excess = y_series
        if rf is None:
            rf = X['risk_free_rate_lag'].fillna(0) if 'risk_free_rate_lag' in X.columns else pd.Series(0, index=X.index)

        # Lagged forward returns: taken from X when available, otherwise y shifted by one row.
        if 'forward_returns_lag' in X.columns:
            forward_returns_lag = X['forward_returns_lag']
        else:
            forward_returns_lag = y_series.shift(1).fillna(0)

        # Regression target: magnitude of the ideal position (direction is handled elsewhere).
        ideal_positions = generate_hft_positions(y_series, forward_returns_lag, span_N=60, sensitivity_k=1, allow_short=True)
        opportunity_target = np.abs(ideal_positions)

        if self.verbose >= 1:
            print(f"机会目标统计: min={opportunity_target.min():.4f}, max={opportunity_target.max():.4f}, mean={opportunity_target.mean():.4f}")
            print("\n[步骤1/3] 特征工程...")

        self.feature_pipeline_ = self._create_feature_pipeline()
        X_transformed = self.feature_pipeline_.fit_transform(X, opportunity_target)
        if isinstance(X_transformed, pd.DataFrame):
            # Re-attach the original index so the label-based alignment below works.
            X_transformed.index = X.index

        if self.verbose >= 1:
            print(f"特征工程后特征数量: {X_transformed.shape[1]}")

        self.feature_selector_ = self._get_or_create_feature_selector()
        if self.feature_selector_ is not None:
            if self.verbose >= 1:
                print("\n[步骤2/3] 特征选择（使用 opportunity_loss_function）...")
            me_aligned = market_excess.loc[X_transformed.index] if isinstance(market_excess, pd.Series) else market_excess
            rf_aligned = rf.loc[X_transformed.index] if isinstance(rf, pd.Series) else rf
            X_transformed = self.feature_selector_.fit_transform(X_transformed, opportunity_target, market_excess=me_aligned, rf=rf_aligned)
            if self.verbose >= 1:
                print(f"特征选择后特征数量: {X_transformed.shape[1]}")
        else:
            if self.verbose >= 1:
                print("\n[步骤2/3] 跳过特征选择（未配置）")

        self.feature_names_ = X_transformed.columns.tolist() if isinstance(X_transformed, pd.DataFrame) else None

        if self.verbose >= 1:
            print("\n[步骤3/3] 训练机会模型（MSE）...")

        # Chronological split (no shuffling): the last 15% of rows is the evaluation set.
        from sklearn.model_selection import train_test_split
        X_train, X_val, y_train, y_val = train_test_split(X_transformed, opportunity_target, test_size=0.15, shuffle=False)
        if sample_weight is not None:
            # The training part is the leading slice, so the weights are sliced the same way.
            sw_train = sample_weight[:len(X_train)] if not isinstance(sample_weight, pd.Series) else sample_weight.iloc[:len(X_train)]
            self.model.fit(X_train, y_train, eval_set=(X_val, y_val), sample_weight=sw_train)
        else:
            self.model.fit(X_train, y_train, eval_set=(X_val, y_val))

        self.is_fitted_ = True
        if self.verbose >= 1:
            print("OpportunityPipeline - 拟合完成")
        return self

    def predict(self, X):
        """
        Predict the opportunity size, clipped to [0, 2].

        Returns a pandas.Series named ``opportunity_prediction`` indexed like
        ``X`` when ``X`` has an index, otherwise a numpy array.

        Raises
        ------
        ValueError
            If the pipeline has not been fitted.
        """
        if not self.is_fitted_:
            raise ValueError("模型尚未拟合，请先调用 fit()")
        original_index = X.index if hasattr(X, 'index') else None

        X_transformed = self.feature_pipeline_.transform(X)
        # Use the guarded index: touching X.index directly would fail for
        # inputs without an index, although that case is handled below.
        if isinstance(X_transformed, pd.DataFrame) and original_index is not None:
            X_transformed.index = original_index
        if self.feature_selector_ is not None:
            X_transformed = self.feature_selector_.transform(X_transformed)
        predictions = self.model.predict(X_transformed)
        predictions = np.clip(predictions, 0, 2)
        if original_index is not None:
            predictions = pd.Series(predictions, index=original_index, name='opportunity_prediction')
        return predictions


# ============================================================
# 3. DirectionClassifierPipeline - 预测方向（二分类）
# ============================================================

class DirectionClassifierPipeline(BaseEstimator, ClassifierMixin):
    """
    Direction model pipeline: binary classification of the market direction.

    Training label: ``1`` where ``y > 0`` (long), ``0`` otherwise.
    Model: the supplied classifier, or a CatBoostClassifier with Logloss.
    Feature selection (optional): AutoFeatureSelectorWithLoss driven by
    ``direction_loss_function``.

    Note that ``predict`` reports the direction as ``+1`` / ``-1``, not as the
    0/1 training labels.

    Parameters
    ----------
    feature_creator : callable, optional
        Wrapped in a FunctionTransformer as the first feature-engineering step.
    feature_selector : estimator, optional
        Ready-made selector; a clone of it is fitted. Takes precedence over
        ``feature_selector_params``.
    feature_selector_params : dict, optional
        Extra keyword arguments for a default AutoFeatureSelectorWithLoss.
        When both selector arguments are None, selection is skipped.
    model : estimator, optional
        Classifier whose ``fit`` accepts ``eval_set`` (and ``sample_weight``).
    verbose : int
        Progress is printed when ``verbose >= 1``.
    """

    def __init__(self, feature_creator=None, feature_selector=None, feature_selector_params=None, model=None, verbose=1):
        self.feature_creator = feature_creator
        self.feature_selector = feature_selector
        self.feature_selector_params = feature_selector_params
        # Explicit None check: `model or default` would evaluate the truth
        # value of the estimator, which is wrong (or raises) for estimators
        # that define __len__.
        self.model = model if model is not None else CatBoostClassifier(
            iterations=800, learning_rate=0.004, depth=6,
            min_data_in_leaf=20, l2_leaf_reg=7.0, random_strength=5.5,
            colsample_bylevel=0.78, early_stopping_rounds=50,
            bootstrap_type='Bernoulli', subsample=0.85,
            loss_function='Logloss', verbose=0, random_seed=42
        )
        self.verbose = verbose
        self.is_fitted_ = False
        self.feature_pipeline_ = None
        self.feature_selector_ = None
        self.feature_names_ = None

    def _create_feature_pipeline(self):
        """Build the feature pipeline: optional feature creator, then median-imputing preprocessor."""
        steps = []
        if self.feature_creator is not None:
            steps.append(('feature_creator', FunctionTransformer(self.feature_creator)))
        steps.append(('preprocessor', PandasPreprocessor(impute_strategy='median')))
        return Pipeline(steps)

    def _get_or_create_feature_selector(self):
        """
        Return the selector to fit, or None when selection is not configured.

        A clone of ``feature_selector`` wins; otherwise a default
        AutoFeatureSelectorWithLoss is built from ``feature_selector_params``.
        """
        if self.feature_selector is not None:
            return clone(self.feature_selector)
        if self.feature_selector_params is not None:
            selector_base_model = CatBoostClassifier(iterations=800, learning_rate=0.004, depth=6, verbose=0, random_seed=42)
            return AutoFeatureSelectorWithLoss(
                base_model=selector_base_model,
                loss_function=direction_loss_function,
                **self.feature_selector_params
            )
        return None

    def fit(self, X, y, market_excess=None, rf=None, sample_weight=None):
        """
        Fit feature engineering, optional feature selection and the classifier.

        Parameters
        ----------
        X : pandas.DataFrame
            Raw features (``X.index`` and ``X.columns`` are used).
        y : pandas.Series or array-like
            Values whose sign defines the direction label.
        market_excess : pandas.Series or array-like, optional
            Defaults to ``y``; forwarded to the feature selector.
        rf : pandas.Series or array-like, optional
            Defaults to ``X['risk_free_rate_lag']`` (NaN -> 0) or zeros.
        sample_weight : pandas.Series or array-like, optional
            Per-row weights; only the leading training part is used.

        Returns
        -------
        self
        """
        if self.verbose >= 1:
            print("="*70)
            print("DirectionClassifierPipeline - 开始拟合（方向分类，Logloss）")
            print("="*70)

        y_series = pd.Series(y, index=X.index) if not isinstance(y, pd.Series) else y
        if market_excess is None:
            market_excess = y_series
        if rf is None:
            rf = X['risk_free_rate_lag'].fillna(0) if 'risk_free_rate_lag' in X.columns else pd.Series(0, index=X.index)

        # 1 = strictly positive value (long), 0 = everything else.
        direction_labels = (y_series > 0).astype(int)

        if self.verbose >= 1:
            n_long = (direction_labels == 1).sum()
            n_short = (direction_labels == 0).sum()
            print(f"方向标签分布: 多(1)={n_long} ({n_long/len(direction_labels):.2%}), 空(0)={n_short} ({n_short/len(direction_labels):.2%})")
            print("\n[步骤1/3] 特征工程...")

        self.feature_pipeline_ = self._create_feature_pipeline()
        X_transformed = self.feature_pipeline_.fit_transform(X, direction_labels)
        if isinstance(X_transformed, pd.DataFrame):
            # Re-attach the original index so the label-based alignment below works.
            X_transformed.index = X.index

        if self.verbose >= 1:
            print(f"特征工程后特征数量: {X_transformed.shape[1]}")

        self.feature_selector_ = self._get_or_create_feature_selector()
        if self.feature_selector_ is not None:
            if self.verbose >= 1:
                print("\n[步骤2/3] 特征选择（使用 direction_loss_function）...")
            me_aligned = market_excess.loc[X_transformed.index] if isinstance(market_excess, pd.Series) else market_excess
            rf_aligned = rf.loc[X_transformed.index] if isinstance(rf, pd.Series) else rf
            X_transformed = self.feature_selector_.fit_transform(X_transformed, direction_labels, market_excess=me_aligned, rf=rf_aligned)
            if self.verbose >= 1:
                print(f"特征选择后特征数量: {X_transformed.shape[1]}")
        else:
            if self.verbose >= 1:
                print("\n[步骤2/3] 跳过特征选择（未配置）")

        self.feature_names_ = X_transformed.columns.tolist() if isinstance(X_transformed, pd.DataFrame) else None

        if self.verbose >= 1:
            print("\n[步骤3/3] 训练方向分类模型（Logloss）...")

        # Chronological split (no shuffling): the last 15% of rows is the evaluation set.
        from sklearn.model_selection import train_test_split
        X_train, X_val, y_train, y_val = train_test_split(X_transformed, direction_labels, test_size=0.15, shuffle=False)
        if sample_weight is not None:
            # The training part is the leading slice, so the weights are sliced the same way.
            sw_train = sample_weight[:len(X_train)] if not isinstance(sample_weight, pd.Series) else sample_weight.iloc[:len(X_train)]
            self.model.fit(X_train, y_train, eval_set=(X_val, y_val), sample_weight=sw_train)
        else:
            self.model.fit(X_train, y_train, eval_set=(X_val, y_val))

        self.is_fitted_ = True
        if self.verbose >= 1:
            print("DirectionClassifierPipeline - 拟合完成")
        return self

    def predict_proba(self, X):
        """
        Return the wrapped model's ``predict_proba`` output for ``X``.

        Raises
        ------
        ValueError
            If the pipeline has not been fitted.
        """
        if not self.is_fitted_:
            raise ValueError("模型尚未拟合，请先调用 fit()")
        original_index = X.index if hasattr(X, 'index') else None
        X_transformed = self.feature_pipeline_.transform(X)
        # Use the guarded index instead of touching X.index unconditionally.
        if isinstance(X_transformed, pd.DataFrame) and original_index is not None:
            X_transformed.index = original_index
        if self.feature_selector_ is not None:
            X_transformed = self.feature_selector_.transform(X_transformed)
        proba = self.model.predict_proba(X_transformed)
        return proba

    def predict(self, X):
        """
        Predict the direction as +1 (column 1 of ``predict_proba`` > 0.5) or -1.

        Returns a pandas.Series named ``direction_prediction`` indexed like
        ``X`` when ``X`` has an index, otherwise a numpy array.
        """
        proba = self.predict_proba(X)
        direction = np.where(proba[:, 1] > 0.5, 1, -1)
        original_index = X.index if hasattr(X, 'index') else None
        if original_index is not None:
            direction = pd.Series(direction, index=original_index, name='direction_prediction')
        return direction


# ============================================================
# 4. ParallelEnsemblePipeline - 并行集成模型
# ============================================================

class ParallelEnsemblePipeline(BaseEstimator, RegressorMixin):
    """
    Parallel ensemble of an opportunity model and a direction model.

    The final position is ``direction (+1 / -1) * opportunity ([0, 2])``.
    Sub-pipelines that are not supplied are created from the remaining
    constructor arguments.

    Parameters
    ----------
    opportunity_pipeline, direction_pipeline : estimator, optional
        Ready-made sub-pipelines; used as given when provided.
    opportunity_model, direction_model : estimator, optional
        Models for the default sub-pipelines.
    feature_creator : callable, optional
        Shared feature creator for the default sub-pipelines.
    feature_selector_params : dict, optional
        Shared selector parameters for the default sub-pipelines.
    verbose : int
        Progress is printed when ``verbose >= 1``.
    """

    def __init__(self, opportunity_pipeline=None, direction_pipeline=None, opportunity_model=None,
                 direction_model=None, feature_creator=None, feature_selector_params=None, verbose=1):
        self.feature_creator = feature_creator
        self.feature_selector_params = feature_selector_params
        self.opportunity_model = opportunity_model
        self.direction_model = direction_model
        self.verbose = verbose

        # Settings shared by both default sub-pipelines.
        shared_kwargs = dict(
            feature_creator=feature_creator,
            feature_selector_params=feature_selector_params,
            verbose=verbose,
        )

        self.opportunity_pipeline = (
            opportunity_pipeline
            if opportunity_pipeline is not None
            else OpportunityPipeline(model=opportunity_model, **shared_kwargs)
        )
        self.direction_pipeline = (
            direction_pipeline
            if direction_pipeline is not None
            else DirectionClassifierPipeline(model=direction_model, **shared_kwargs)
        )

        self.is_fitted_ = False

    def fit(self, X, y, market_excess=None, rf=None, sample_weight=None):
        """
        Fit the opportunity pipeline, then the direction pipeline, on the same data.

        ``market_excess`` defaults to ``y``; ``rf`` defaults to
        ``X['risk_free_rate_lag']`` (NaN -> 0) or a zero Series. Both, together
        with ``sample_weight``, are forwarded to each sub-pipeline.

        Returns
        -------
        self
        """
        chatty = self.verbose >= 1
        rule = "=" * 70

        def announce(title, blank_line_before=True):
            # Three-line banner; optionally preceded by an empty line.
            if not chatty:
                return
            print(("\n" + rule) if blank_line_before else rule)
            print(title)
            print(rule)

        announce("ParallelEnsemblePipeline - 开始拟合（并行双模型）", blank_line_before=False)
        if chatty:
            print(f"样本数: {len(X)}")

        if market_excess is None:
            market_excess = y
        if rf is None:
            if 'risk_free_rate_lag' in X.columns:
                rf = X['risk_free_rate_lag'].fillna(0)
            else:
                rf = pd.Series(0, index=X.index)

        shared_fit_kwargs = dict(market_excess=market_excess, rf=rf, sample_weight=sample_weight)

        announce("[模型1/2] 训练机会模型（Regressor，MSE）")
        self.opportunity_pipeline.fit(X, y, **shared_fit_kwargs)

        announce("[模型2/2] 训练方向模型（Classifier，Logloss）")
        self.direction_pipeline.fit(X, y, **shared_fit_kwargs)

        self.is_fitted_ = True
        announce("ParallelEnsemblePipeline - 拟合完成")
        return self

    def predict(self, X, allow_short=False):
        """
        Return the final position ``direction * opportunity``.

        With ``allow_short=False`` negative positions are clipped to 0.
        The result is a pandas.Series when ``X`` has an index.

        Raises
        ------
        ValueError
            If the ensemble has not been fitted.
        """
        if not self.is_fitted_:
            raise ValueError("模型尚未拟合，请先调用 fit()")

        original_index = X.index if hasattr(X, 'index') else None
        opportunity = self.opportunity_pipeline.predict(X)
        direction = self.direction_pipeline.predict(X)

        both_are_series = isinstance(opportunity, pd.Series) and isinstance(direction, pd.Series)
        if both_are_series:
            final_position = direction * opportunity
        else:
            final_position = np.array(direction) * np.array(opportunity)

        if not allow_short:
            # Long-only: drop the negative side.
            final_position = (
                final_position.clip(lower=0)
                if isinstance(final_position, pd.Series)
                else np.clip(final_position, 0, 2)
            )

        needs_wrapping = original_index is not None and not isinstance(final_position, pd.Series)
        if needs_wrapping:
            final_position = pd.Series(final_position, index=original_index, name='final_position')
        return final_position

    def predict_components(self, X):
        """
        Return the individual predictions as a dict with the keys
        ``opportunity``, ``direction`` and ``direction_proba``.

        Raises
        ------
        ValueError
            If the ensemble has not been fitted.
        """
        if not self.is_fitted_:
            raise ValueError("模型尚未拟合，请先调用 fit()")

        components = {}
        components['opportunity'] = self.opportunity_pipeline.predict(X)
        components['direction'] = self.direction_pipeline.predict(X)
        components['direction_proba'] = self.direction_pipeline.predict_proba(X)
        return components


# ============================================================
# 5. move_forward_two_way_test - 两段滑动窗口测试
# ============================================================

def move_forward_two_way_test(X, y, pipeline, train_size=3000, test_size=63, start_index=0,
                               expanding_window=False, market_excess=None, rf=None, allow_short=False, verbose=1):
    """Walk-forward test: fit on a training segment, predict the following test segment, advance.

    Each round clones ``pipeline``, fits the clone on rows
    ``[train_start:train_end)`` and predicts rows ``[train_end:test_end)``.
    All boundaries then move forward by ``test_size``; with
    ``expanding_window=True`` the training start stays fixed so the training
    segment grows. Rounds continue while a full test segment fits, so trailing
    rows that do not fill a complete test segment keep NaN predictions.

    Parameters
    ----------
    X, y : positional-sliceable feature table and target (``.iloc`` is used).
        Predictions are written by ``X``'s index labels into Series indexed
        like ``y`` -- assumes both share the same index; TODO confirm.
    pipeline : unfitted estimator exposing ``fit(X, y, market_excess=, rf=)``,
        ``predict(X, allow_short=)`` and ``predict_components(X)``.
    train_size, test_size : int, rows per training / test segment (both >= 1).
    start_index : int, position of the first training row (>= 0).
    expanding_window : bool, grow the training segment instead of sliding it.
    market_excess, rf : optional pandas Series. Only Series are sliced and
        forwarded to ``fit``; any other type is passed to ``fit`` as None.
    allow_short : bool, forwarded to ``pipeline.predict``.
    verbose : int, print progress and a final summary when >= 1.

    Returns
    -------
    dict with keys ``'predictions'``, ``'direction'``, ``'opportunity'``,
    ``'direction_proba'`` (Series indexed like ``y``, NaN where no prediction
    was made) and ``'round_stats'`` (list of per-round range dicts).

    Raises
    ------
    ValueError
        If a window argument is out of range or there are fewer than
        ``start_index + train_size + test_size`` rows.
    """
    # A non-positive test_size would never advance the window (endless loop),
    # so reject impossible window settings before doing any work.
    if test_size < 1:
        raise ValueError(f"test_size 必须 >= 1，当前为 {test_size}")
    if train_size < 1:
        raise ValueError(f"train_size 必须 >= 1，当前为 {train_size}")
    if start_index < 0:
        raise ValueError(f"start_index 必须 >= 0，当前为 {start_index}")

    n_samples = len(X)
    train_start = start_index
    train_end = start_index + train_size
    test_end = train_end + test_size

    min_required = train_size + test_size + start_index
    if min_required > n_samples:
        raise ValueError(f"数据量不足！需要至少 {min_required} 个样本，但只有 {n_samples} 个。")

    # Imported after validation so argument errors surface first.
    from sklearn.base import clone

    # Out-of-fold containers: final position plus the individual components.
    oof_predictions = pd.Series(np.nan, index=y.index, dtype=float, name="prediction")
    oof_direction = pd.Series(np.nan, index=y.index, dtype=float, name="direction")
    oof_opportunity = pd.Series(np.nan, index=y.index, dtype=float, name="opportunity")
    oof_direction_proba = pd.Series(np.nan, index=y.index, dtype=float, name="direction_proba")

    round_stats = []
    round_num = 0

    if verbose >= 1:
        print("="*80)
        print("两段滑动窗口测试 (Move Forward Two-Way Test)")
        print("="*80)
        print(f"总样本数: {n_samples}, 训练窗口: {train_size}, 测试窗口: {test_size}")
        print(f"窗口模式: {'扩展窗口' if expanding_window else '滑动窗口'}")
        remaining_after_first = n_samples - (start_index + train_size + test_size)
        estimated_rounds = 1 + max(0, remaining_after_first // test_size)
        print(f"预计轮数: ~{estimated_rounds}")
        print("="*80)

    while test_end <= n_samples:
        round_num += 1
        if verbose >= 1:
            print(f"\n{'='*80}")
            print(f"第 {round_num} 轮")
            print(f"{'='*80}")
            print(f"  训练段: [{train_start}:{train_end}) = {train_end - train_start} 样本")
            print(f"  测试段: [{train_end}:{test_end}) = {test_end - train_end} 样本")

        X_train = X.iloc[train_start:train_end].copy()
        y_train = y.iloc[train_start:train_end].copy()
        X_test = X.iloc[train_end:test_end].copy()
        y_test = y.iloc[train_end:test_end].copy()

        # Only pandas Series can be sliced positionally here; anything else
        # is not forwarded to fit().
        me_train = market_excess.iloc[train_start:train_end] if isinstance(market_excess, pd.Series) else None
        rf_train = rf.iloc[train_start:train_end] if isinstance(rf, pd.Series) else None

        if verbose >= 1:
            print(f"\n  [步骤1/2] 并行训练模型...")
        # Fresh clone every round so no fitted state leaks between windows.
        pipeline_clone = clone(pipeline)
        pipeline_clone.fit(X_train, y_train, market_excess=me_train, rf=rf_train)

        if verbose >= 1:
            print(f"\n  [步骤2/2] 在测试段上预测...")
        test_predictions = pipeline_clone.predict(X_test, allow_short=allow_short)

        # Per-model outputs for the same test rows.
        components = pipeline_clone.predict_components(X_test)

        if isinstance(test_predictions, pd.Series):
            oof_predictions.loc[X_test.index] = test_predictions.values
        else:
            oof_predictions.loc[X_test.index] = test_predictions

        # Strip pandas wrappers so assignment is positional, not label-aligned.
        direction_vals = components['direction'].values if isinstance(components['direction'], pd.Series) else components['direction']
        opportunity_vals = components['opportunity'].values if isinstance(components['opportunity'], pd.Series) else components['opportunity']
        # For a 2-D probability matrix keep column 1; a 1-D array is stored as is.
        direction_proba_vals = components['direction_proba'][:, 1] if components['direction_proba'].ndim > 1 else components['direction_proba']

        oof_direction.loc[X_test.index] = direction_vals
        oof_opportunity.loc[X_test.index] = opportunity_vals
        oof_direction_proba.loc[X_test.index] = direction_proba_vals

        round_stats.append({
            'round': round_num,
            'train_range': (train_start, train_end),
            'test_range': (train_end, test_end),
            'test_indices': X_test.index.tolist()
        })

        if verbose >= 1:
            pred_mean = test_predictions.mean() if hasattr(test_predictions, 'mean') else np.mean(test_predictions)
            print(f"\n  预测均值: {pred_mean:.4f}, 真实均值: {y_test.mean():.4f}")

        # Advance: an expanding window keeps train_start fixed.
        if expanding_window:
            train_end += test_size
        else:
            train_start += test_size
            train_end += test_size
        test_end += test_size

    valid_predictions = oof_predictions.dropna()
    if verbose >= 1:
        print("\n" + "="*80)
        print("两段滑动窗口测试完成！")
        print("="*80)
        print(f"总轮数: {round_num}, 有效预测数: {len(valid_predictions)}")
        if len(valid_predictions) > 0:
            y_test_all = y.loc[valid_predictions.index]
            corr = np.corrcoef(valid_predictions.values, y_test_all.values)[0, 1]
            print(f"整体IC: {corr:.4f}")
            # Label-based lookup needs a pandas object; skip the Sharpe line
            # for plain arrays instead of crashing before results are returned.
            if hasattr(market_excess, 'loc') and hasattr(rf, 'loc'):
                me_test = market_excess.loc[valid_predictions.index]
                rf_test = rf.loc[valid_predictions.index]
                sharpe = ad_sharpe_ratio_scorer(valid_predictions.values, me_test, rf_test)
                print(f"整体夏普率: {sharpe:.4f}")
        print("="*80)

    # Everything the caller needs for further analysis.
    result = {
        'predictions': oof_predictions,
        'direction': oof_direction,
        'opportunity': oof_opportunity,
        'direction_proba': oof_direction_proba,
        'round_stats': round_stats
    }
    return result


# Confirm which definitions this cell registered (one line per definition).
print(
    "✓ 已定义 OpportunityPipeline（机会模型，MSE，带特征选择）",
    "✓ 已定义 DirectionClassifierPipeline（方向分类模型，Logloss，带特征选择）",
    "✓ 已定义 ParallelEnsemblePipeline（并行集成模型）",
    "✓ 已定义 move_forward_two_way_test（两段滑动窗口测试）",
    sep="\n",
)




✓ 已定义 opportunity_loss_function（机会模型特征选择损失函数）
✓ 已定义 direction_loss_function（方向模型特征选择损失函数）
✓ 已定义 OpportunityPipeline（机会模型，MSE，带特征选择）
✓ 已定义 DirectionClassifierPipeline（方向分类模型，Logloss，带特征选择）
✓ 已定义 ParallelEnsemblePipeline（并行集成模型）
✓ 已定义 move_forward_two_way_test（两段滑动窗口测试）


In [91]:
# ============================================================
# 5. IC / PSR / DSR 测试（函数名沿用 ic_psr_csr_test；实际计算的是 Deflated Sharpe Ratio，而非 CSR）
# ============================================================

def _rolling_window_ic(pred, true, window_size):
    """Pearson IC of every full window of ``window_size`` consecutive observations.

    The last window ends at the final observation, so
    ``len(pred) - window_size + 1`` values are returned (an empty array when
    there are fewer than ``window_size`` observations).
    """
    return np.array([
        np.corrcoef(pred[end - window_size:end], true[end - window_size:end])[0, 1]
        for end in range(window_size, len(pred) + 1)
    ])


def ic_psr_csr_test(
    y_pred, 
    y_true, 
    positions=None,
    market_excess=None,
    rf=None,
    window_size=30,
    n_trials=10
):
    """Print and return IC, PSR, DSR and decile diagnostics for a prediction series.

    Despite the name, no "CSR" is computed: the sections are (1) information
    coefficient, (2) Probabilistic Sharpe Ratio, (3) Deflated Sharpe Ratio and
    (4) a decile analysis of ``y_true`` grouped by ``y_pred`` quantile.

    Parameters
    ----------
    y_pred, y_true : array-like of predictions and realised values.
    positions, market_excess, rf : array-like, optional
        Position, market excess return and risk-free rate. Sections 2 and 3
        run only when all three are given; the strategy excess return is
        ``positions * market_excess``.
    window_size : int, number of observations per rolling-IC window.
    n_trials : int, number of trials assumed for the DSR threshold; with
        ``n_trials <= 1`` the threshold is 0.

    Returns
    -------
    dict with keys ``ic_pearson``, ``ic_spearman``, ``ic_p_value``, ``ic_ir``,
    ``rolling_ic``, ``sharpe_ratio``, ``psr``, ``dsr_prob``, ``dsr_pvalue``,
    ``decile_df``, ``spread`` and ``monotonicity``. The Sharpe/PSR/DSR entries
    are None when sections 2-3 are skipped; ``spread``/``monotonicity`` are
    None when fewer than two decile groups exist.
    """
    print("="*70)
    print("IC PSR CSR 综合测试")
    print("="*70)
    
    y_pred = np.asarray(y_pred)
    y_true = np.asarray(y_true)
    
    # ------------------------------------------------------------
    # 1. Information coefficient (IC)
    # ------------------------------------------------------------
    print("\n" + "="*70)
    print("1. 信息系数 (IC) 分析")
    print("="*70)
    
    # Full-sample IC.
    ic_pearson = np.corrcoef(y_pred, y_true)[0, 1]
    ic_spearman = stats.spearmanr(y_pred, y_true)[0]
    
    # Two-sided t-test of the Pearson IC. The survival function keeps
    # precision for very small p-values where ``1 - cdf`` rounds to zero.
    n = len(y_pred)
    ic_t_stat = ic_pearson * np.sqrt(n - 2) / np.sqrt(1 - ic_pearson**2 + 1e-8)
    ic_p_value = 2 * stats.t.sf(abs(ic_t_stat), n - 2)
    
    print(f"整体 IC (Pearson):  {ic_pearson:.4f}")
    print(f"整体 IC (Spearman): {ic_spearman:.4f}")
    print(f"IC t-statistic: {ic_t_stat:.4f}")
    print(f"IC p-value: {ic_p_value:.6f}")
    
    if ic_p_value < 0.05:
        print("✓ IC 在 5% 水平上显著（模型具备预测能力）")
    else:
        print("✗ IC 不显著")
    
    # Rolling IC over every full window, including the one that ends at the
    # last observation.
    rolling_ic = _rolling_window_ic(y_pred, y_true, window_size)
    if len(rolling_ic) > 0:
        ic_mean = np.nanmean(rolling_ic)
        ic_std = np.nanstd(rolling_ic)
        positive_share = (rolling_ic > 0).mean()
    else:
        # Too few observations for a single window: report NaN quietly.
        ic_mean = ic_std = positive_share = np.nan
    ic_ir = ic_mean / (ic_std + 1e-8)  # IC information ratio
    
    print(f"\n--- 滚动IC分析（{window_size}天窗口）---")
    print(f"滚动IC均值: {ic_mean:.4f}")
    print(f"滚动IC标准差: {ic_std:.4f}")
    print(f"滚动IC > 0 的比例: {positive_share:.2%}")
    print(f"IC比率 (IC_IR): {ic_ir:.4f}")
    if ic_ir > 0.5:
        print("  ✓ IC稳定性良好（IC_IR > 0.5）")
    else:
        print("  ⚠ IC稳定性一般")
    
    # ------------------------------------------------------------
    # 2. Probabilistic Sharpe Ratio (PSR)
    # ------------------------------------------------------------
    print("\n" + "="*70)
    print("2. PSR (概率夏普率) 分析")
    print("="*70)
    
    if positions is not None and market_excess is not None and rf is not None:
        # Strategy return: hold rf on the uninvested part, market on the rest.
        pos = np.asarray(positions)
        y_excess = np.asarray(market_excess)
        rf_arr = np.asarray(rf)
        
        y_total = y_excess + rf_arr
        strategy_returns = rf_arr * (1 - pos) + pos * y_total
        strategy_excess_returns = strategy_returns - rf_arr
        
        # Annualised Sharpe ratio (252 periods per year).
        mean_ret = np.mean(strategy_excess_returns)
        std_ret = np.std(strategy_excess_returns)
        sharpe_ratio = mean_ret / (std_ret + 1e-8) * np.sqrt(252)
        
        # Higher moments feed the variance of the Sharpe estimator.
        n_returns = len(strategy_excess_returns)
        skewness = stats.skew(strategy_excess_returns)
        excess_kurtosis = stats.kurtosis(strategy_excess_returns)
        kurtosis = excess_kurtosis + 3
        
        # Variance of the per-period Sharpe ratio; the annualised value is
        # scaled back by sqrt(252) inside the formula.
        var_sr = (1 + (kurtosis - 1) / 4 * sharpe_ratio**2 / 252 - skewness * sharpe_ratio / np.sqrt(252)) / (n_returns - 1)
        std_sr = np.sqrt(var_sr) if var_sr > 0 else 1e-8
        
        # PSR = P(SR > 0), evaluated on the per-period Sharpe ratio.
        z_score = (sharpe_ratio / np.sqrt(252) - 0) / std_sr
        psr = 1 - stats.norm.cdf(-z_score)
        
        print(f"传统夏普比率 (年化): {sharpe_ratio:.4f}")
        print(f"PSR (P(SR > 0)): {psr:.4f} ({psr*100:.2f}%)")
        
        if psr > 0.90:
            print("  ✓ PSR > 0.90: 策略夏普比率显著大于0")
        elif psr > 0.75:
            print("  ⚠ PSR > 0.75: 策略夏普比率可能大于0")
        else:
            print("  ✗ PSR < 0.75: 策略夏普比率可能不显著")
        
        # ------------------------------------------------------------
        # 3. Deflated Sharpe Ratio (DSR)
        # ------------------------------------------------------------
        print("\n" + "="*70)
        print("3. DSR (Deflated Sharpe Ratio) 分析")
        print("="*70)
        
        # Expected maximum Sharpe ratio attainable by luck over n_trials.
        gamma = 0.5772156649  # Euler-Mascheroni constant
        if n_trials > 1:
            z_n = stats.norm.ppf(1 - 1.0 / n_trials)
            z_ne = stats.norm.ppf(1 - 1.0 / (n_trials * np.e))
            expected_max_sr = std_sr * ((1 - gamma) * z_n + gamma * z_ne) * np.sqrt(252)
        else:
            expected_max_sr = 0
        
        # Probability that the observed Sharpe exceeds that luck threshold.
        z_score_dsr = (sharpe_ratio - expected_max_sr) / (std_sr * np.sqrt(252))
        dsr_prob = stats.norm.cdf(z_score_dsr)
        dsr_pvalue = 1 - dsr_prob
        
        print(f"设定尝试次数 (N): {n_trials} 次")
        print(f"实际夏普比率 (年化): {sharpe_ratio:.4f}")
        print(f"运气能产生的最大SR (年化): {expected_max_sr:.4f} (阈值)")
        print(f"DSR 概率值: {dsr_prob:.4f} ({dsr_prob*100:.2f}%)")
        print(f"DSR p-value: {dsr_pvalue:.6f}")
        
        if dsr_prob > 0.95:
            print("  ✅ 结果: 非常显著 (DSR > 95%)")
        elif dsr_prob > 0.90:
            print("  ☑️ 结果: 显著 (DSR > 90%)")
        elif dsr_prob > 0.50:
            print("  ⚠ 结果: 不显著 (DSR < 90%)")
        else:
            print("  ❌ 结果: 失败 (DSR < 50%)")
    else:
        print("⚠ 缺少仓位或市场数据，跳过PSR和DSR计算")
        sharpe_ratio = None
        psr = None
        dsr_prob = None
        dsr_pvalue = None
    
    # ------------------------------------------------------------
    # 4. Decile analysis
    # ------------------------------------------------------------
    print("\n" + "="*70)
    print("4. 分层回测分析（Decile Analysis）")
    print("="*70)
    
    # Bucket predictions into up to 10 quantile groups; duplicate bin edges
    # are dropped, so fewer groups may result.
    pred_quantiles = pd.qcut(y_pred, q=10, labels=False, duplicates='drop')
    
    decile_results = []
    for decile in range(10):
        mask = pred_quantiles == decile
        if mask.sum() > 0:
            decile_returns = y_true[mask]
            mean_return = decile_returns.mean()
            sharpe_decile = mean_return / (decile_returns.std() + 1e-8) * np.sqrt(252)
            decile_results.append({
                'Decile': decile + 1,
                'Mean_Return': mean_return,
                'Sharpe': sharpe_decile,
                'N_Samples': mask.sum()
            })
    
    decile_df = pd.DataFrame(decile_results)
    print(decile_df.to_string(index=False))
    
    # Top-minus-bottom spread and rank monotonicity across groups.
    spread = None
    monotonicity = None
    if len(decile_df) >= 2:
        spread = decile_df['Mean_Return'].iloc[-1] - decile_df['Mean_Return'].iloc[0]
        spread_annualized = spread * 252 * 100
        monotonicity = stats.spearmanr(decile_df['Decile'], decile_df['Mean_Return'])[0]
        
        print(f"\n收益差 (最高分位 - 最低分位): {spread:.6f} ({spread_annualized:.2f}% 年化)")
        print(f"单调性 (Spearman): {monotonicity:.4f}")
        
        if abs(monotonicity) > 0.7:
            print("  ✓ 预测信号单调性良好（|ρ| > 0.7）")
        else:
            print("  ⚠ 预测信号单调性一般")
    
    results = {
        'ic_pearson': ic_pearson,
        'ic_spearman': ic_spearman,
        'ic_p_value': ic_p_value,
        'ic_ir': ic_ir,
        'rolling_ic': rolling_ic,
        'sharpe_ratio': sharpe_ratio,
        'psr': psr,
        'dsr_prob': dsr_prob,
        'dsr_pvalue': dsr_pvalue,
        'decile_df': decile_df,
        'spread': spread,
        'monotonicity': monotonicity
    }
    
    return results

def _regime_ic_report(pred, true, window_size, total_len):
    """Print IC statistics for one regime's subsample and return them as a dict.

    ``pred``/``true`` are the observations already filtered to the regime, so
    rolling windows span consecutive rows of that subsample rather than
    consecutive calendar periods. ``total_len`` is the size of the unfiltered
    sample and is used only for ``sample_ratio``.
    """
    n_obs = len(pred)

    # Full-subsample IC.
    ic_pearson = np.corrcoef(pred, true)[0, 1]
    ic_spearman = stats.spearmanr(pred, true)[0]

    # Two-sided t-test; the survival function keeps precision in the tail.
    ic_t_stat = ic_pearson * np.sqrt(n_obs - 2) / np.sqrt(1 - ic_pearson**2 + 1e-8)
    ic_p_value = 2 * stats.t.sf(abs(ic_t_stat), n_obs - 2)

    # Rolling IC over every full window, including the one that ends at the
    # last observation of the subsample.
    rolling_ic = np.array([
        np.corrcoef(pred[end - window_size:end], true[end - window_size:end])[0, 1]
        for end in range(window_size, n_obs + 1)
    ])
    has_rolling = len(rolling_ic) > 0
    ic_mean = np.nanmean(rolling_ic) if has_rolling else np.nan
    ic_std = np.nanstd(rolling_ic) if has_rolling else np.nan
    ic_ir = ic_mean / (ic_std + 1e-8) if not np.isnan(ic_std) else np.nan

    print(f"样本数: {n_obs}")
    print(f"整体 IC (Pearson):  {ic_pearson:.4f}")
    print(f"整体 IC (Spearman): {ic_spearman:.4f}")
    print(f"IC t-statistic: {ic_t_stat:.4f}")
    print(f"IC p-value: {ic_p_value:.6f}")
    if ic_p_value < 0.05:
        print("  ✓ IC 在 5% 水平上显著")
    else:
        print("  ✗ IC 不显著")

    if has_rolling:
        print(f"\n滚动IC分析（{window_size}天窗口）:")
        print(f"  滚动IC均值: {ic_mean:.4f}")
        print(f"  滚动IC标准差: {ic_std:.4f}")
        print(f"  滚动IC > 0 的比例: {(rolling_ic > 0).mean():.2%}")
        print(f"  IC比率 (IC_IR): {ic_ir:.4f}")

    return {
        'ic_pearson': ic_pearson,
        'ic_spearman': ic_spearman,
        'ic_p_value': ic_p_value,
        'ic_ir': ic_ir,
        'rolling_ic': rolling_ic,
        'n_samples': n_obs,
        'sample_ratio': n_obs / total_len
    }


def ic_test_by_market_regime(
    y_pred, 
    y_true, 
    market_returns,
    threshold_bull=0.02,  # bull regime: market return > threshold_bull
    threshold_bear=-0.02,  # bear regime: market return < threshold_bear
    window_size=30
):
    """Report the information coefficient separately for bull, bear and mixed regimes.

    Each observation is assigned to a regime by its own ``market_returns``
    value: bull when above ``threshold_bull``, bear when below
    ``threshold_bear``, mixed otherwise. The three inputs are truncated to the
    shortest common length before classification.

    Parameters
    ----------
    y_pred, y_true : array-like of predictions and realised values.
    market_returns : array-like used only to classify the regime.
    threshold_bull, threshold_bear : float regime cut-offs.
    window_size : int, observations per rolling-IC window.

    Returns
    -------
    dict with keys ``'bull'``, ``'bear'`` and ``'mixed'``. Each value is None
    when the regime has 10 or fewer observations (the test needs more than
    10), otherwise a dict with ``ic_pearson``, ``ic_spearman``,
    ``ic_p_value``, ``ic_ir``, ``rolling_ic``, ``n_samples`` and
    ``sample_ratio``.
    """
    print("="*70)
    print("按市场状态分组 IC 测试")
    print("="*70)
    
    y_pred = np.asarray(y_pred)
    y_true = np.asarray(y_true)
    market_returns = np.asarray(market_returns)
    
    # Align lengths by truncating to the shortest input.
    min_len = min(len(y_pred), len(y_true), len(market_returns))
    y_pred = y_pred[:min_len]
    y_true = y_true[:min_len]
    market_returns = market_returns[:min_len]
    
    # Regime membership per observation.
    bull_mask = market_returns > threshold_bull
    bear_mask = market_returns < threshold_bear
    mixed_mask = ~(bull_mask | bear_mask)
    
    results = {}
    
    print(f"\n市场状态划分标准:")
    print(f"  牛市: 市场收益率 > {threshold_bull:.2%}")
    print(f"  熊市: 市场收益率 < {threshold_bear:.2%}")
    print(f"  混合市场: {threshold_bear:.2%} <= 市场收益率 <= {threshold_bull:.2%}")
    
    # (result key, display label, membership mask), in report order.
    regimes = (
        ('bull', '牛市', bull_mask),
        ('bear', '熊市', bear_mask),
        ('mixed', '混合市场', mixed_mask),
    )
    
    # Sections 1-3: one IC report per regime.
    for section_no, (key, label, mask) in enumerate(regimes, start=1):
        print("\n" + "="*70)
        print(f"{section_no}. {label} IC 测试")
        print("="*70)
        
        if mask.sum() > 10:  # needs more than 10 observations
            results[key] = _regime_ic_report(y_pred[mask], y_true[mask], window_size, min_len)
        else:
            print(f"样本数不足（{mask.sum()} 个），跳过{label}IC测试")
            results[key] = None
    
    # Section 4: side-by-side comparison of the regimes that were tested.
    print("\n" + "="*70)
    print("4. 市场状态 IC 对比汇总")
    print("="*70)
    
    summary_data = [
        {
            '市场状态': label,
            'IC (Pearson)': results[key]['ic_pearson'],
            'IC (Spearman)': results[key]['ic_spearman'],
            'IC_IR': results[key]['ic_ir'],
            '样本数': results[key]['n_samples'],
            '样本占比': f"{results[key]['sample_ratio']:.2%}"
        }
        for key, label, _ in regimes
        if results[key] is not None
    ]
    
    if len(summary_data) > 0:
        summary_df = pd.DataFrame(summary_data)
        print(summary_df.to_string(index=False))
    
    return results

# Confirm which definitions this cell registered (one line per definition).
print(
    "✓ 已定义 ic_psr_csr_test",
    "✓ 已定义 ic_test_by_market_regime",
    sep="\n",
)


✓ 已定义 ic_psr_csr_test
✓ 已定义 ic_test_by_market_regime


In [92]:
# ============================================================
# 修复后的 jittering_test 函数（支持 EnsemblePipeline）
# ============================================================
# 请用此函数替换 Cell 7 中的 jittering_test 函数

def jittering_test(
    test_pipeline, 
    X_test, 
    y_test,
    N_TEST_SAMPLES=100,
    NOISE_LEVELS=[0.01, 0.05, 0.10, 0.20],
    N_JITTERS=50,
    market_excess=None,
    rf=None
):
    """
    Jittering测试：评估模型对输入噪声的稳定性 - 支持 EnsemblePipeline
    
    参数:
    test_pipeline: 训练好的 Pipeline 实例（EnsemblePipeline 或 FactorSelectionPipeline）
    X_test: 测试特征
    y_test: 测试目标
    N_TEST_SAMPLES: 测试样本数量
    NOISE_LEVELS: 噪声水平列表
    N_JITTERS: 每个噪声水平的重复次数
    market_excess: 市场超额回报（用于计算 sharpe ratio，可选）
    rf: 无风险利率（用于计算 sharpe ratio，可选）
    """
    if not test_pipeline.is_fitted_:
        raise ValueError("Pipeline 尚未拟合，请先调用 fit()")
    
    # 检测 pipeline 类型（支持新的 ParallelEnsemblePipeline）
    is_ensemble_pipeline = isinstance(test_pipeline, ParallelEnsemblePipeline)
    
    print("="*70)
    print("Jittering Test - 模型稳定性测试")
    if is_ensemble_pipeline:
        print("Pipeline 类型: ParallelEnsemblePipeline (并行双模型)")
    else:
        print("Pipeline 类型: 其他 Pipeline")
    print("="*70)
    
    # 随机选择测试样本
    test_indices = np.random.choice(len(X_test), min(N_TEST_SAMPLES, len(X_test)), replace=False)
    X_test_sample = X_test.iloc[test_indices].copy()
    y_test_sample = y_test.iloc[test_indices].copy()
    
    # 准备市场数据（用于计算 sharpe ratio）
    if market_excess is None:
        # 尝试从 X_test_sample 中获取
        if 'market_forward_excess_returns' in X_test_sample.columns:
            market_excess = X_test_sample['market_forward_excess_returns'].values
        else:
            market_excess = y_test_sample.values if hasattr(y_test_sample, 'values') else y_test_sample
    else:
        market_excess = market_excess[test_indices] if hasattr(market_excess, '__getitem__') else market_excess
    
    if rf is None:
        # 尝试从 X_test_sample 中获取
        if 'risk_free_rate_lag' in X_test_sample.columns:
            rf = X_test_sample['risk_free_rate_lag'].values
        elif 'risk_free_rate' in X_test_sample.columns:
            rf = X_test_sample['risk_free_rate'].values
        else:
            rf = np.zeros(len(y_test_sample))
    else:
        rf = rf[test_indices] if hasattr(rf, '__getitem__') else rf
    
    # 原始预测（使用 pipeline 的 predict 方法，内部自动处理特征工程和因子选择）
    y_pred_original = test_pipeline.predict(X_test_sample)
    
    # 计算原始 sharpe ratio
    if is_ensemble_pipeline:
        # EnsemblePipeline.predict() 已经返回最终仓位 [0, 2]，直接使用
        positions_original = pd.Series(y_pred_original, index=X_test_sample.index) if not isinstance(y_pred_original, pd.Series) else y_pred_original
    else:
        # FactorSelectionPipeline.predict() 返回收益率预测，需要转换为仓位
        forward_returns_lag = X_test_sample['forward_returns_lag'].values if 'forward_returns_lag' in X_test_sample.columns else np.zeros(len(y_test_sample))
        positions_original = generate_hft_positions(
            pd.Series(y_pred_original, index=X_test_sample.index),
            pd.Series(forward_returns_lag, index=X_test_sample.index)
        )
    
    sharpe_original = ad_sharpe_ratio_scorer(
        positions_original.values,
        market_excess,
        rf
    )
    
    print(f"\n测试样本数: {len(X_test_sample)}")
    print(f"噪声水平: {NOISE_LEVELS}")
    print(f"每个噪声水平的重复次数: {N_JITTERS}")
    print(f"原始 Adjusted Sharpe Ratio: {sharpe_original:.4f}")
    
    results_dict = {}
    
    # 对每个噪声水平进行测试
    for noise_level in NOISE_LEVELS:
        print(f"\n--- 噪声水平: {noise_level} ---")
        predictions_list = []
        sharpe_ratios = []
        
        for jitter in range(N_JITTERS):
            # 在原始特征上添加噪声（对数值特征）
            X_test_jittered = X_test_sample.copy()
            for col in X_test_jittered.columns:
                if X_test_jittered[col].dtype in [np.float64, np.float32, np.int64, np.int32]:
                    noise = np.random.normal(0, noise_level * X_test_jittered[col].std(), len(X_test_jittered))
                    X_test_jittered[col] = X_test_jittered[col] + noise
            
            # 使用 pipeline 的 predict() 方法（自动处理特征工程、因子选择、预测）
            y_pred_jittered = test_pipeline.predict(X_test_jittered)
            
            # 计算仓位和 sharpe ratio
            if is_ensemble_pipeline:
                # EnsemblePipeline.predict() 返回最终仓位 [0, 2]
                positions_jittered = pd.Series(y_pred_jittered, index=X_test_sample.index) if not isinstance(y_pred_jittered, pd.Series) else y_pred_jittered
            else:
                # FactorSelectionPipeline.predict() 返回收益率预测，需要转换为仓位
                forward_returns_lag = X_test_sample['forward_returns_lag'].values if 'forward_returns_lag' in X_test_sample.columns else np.zeros(len(y_test_sample))
                positions_jittered = generate_hft_positions(
                    pd.Series(y_pred_jittered, index=X_test_sample.index),
                    pd.Series(forward_returns_lag, index=X_test_sample.index)
                )
            
            # 保存预测值（用于统计）
            predictions_list.append(y_pred_jittered)
            
            # 计算 sharpe ratio
            sharpe_jittered = ad_sharpe_ratio_scorer(
                positions_jittered.values,
                market_excess,
                rf
            )
            sharpe_ratios.append(sharpe_jittered)
        
        # 计算统计量
        predictions_array = np.array(predictions_list)
        pred_mean = predictions_array.mean(axis=0)
        pred_std = predictions_array.std(axis=0)
        pred_cv = pred_std / (np.abs(pred_mean) + 1e-8)  # 变异系数
        
        # 计算 sharpe ratio 统计量
        sharpe_ratios = np.array(sharpe_ratios)
        sharpe_mean = np.mean(sharpe_ratios)
        sharpe_std = np.std(sharpe_ratios)
        sharpe_min = np.min(sharpe_ratios)
        sharpe_max = np.max(sharpe_ratios)
        sharpe_range = sharpe_max - sharpe_min
        sharpe_cv = sharpe_std / (np.abs(sharpe_mean) + 1e-8)  # sharpe ratio 变异系数
        
        results_dict[noise_level] = {
            'predictions': predictions_array,
            'mean': pred_mean,
            'std': pred_std,
            'cv': pred_cv,
            'mean_std': pred_std.mean(),
            'mean_cv': pred_cv.mean(),
            'max_std': pred_std.max(),
            'sharpe_ratios': sharpe_ratios,
            'sharpe_mean': sharpe_mean,
            'sharpe_std': sharpe_std,
            'sharpe_min': sharpe_min,
            'sharpe_max': sharpe_max,
            'sharpe_range': sharpe_range,
            'sharpe_cv': sharpe_cv
        }
        
        print(f"    平均预测标准差: {results_dict[noise_level]['mean_std']:.6f}")
        print(f"    平均变异系数: {results_dict[noise_level]['mean_cv']:.4f}")
        print(f"    最大预测标准差: {results_dict[noise_level]['max_std']:.6f}")
        print(f"    Sharpe Ratio 均值: {sharpe_mean:.4f}")
        print(f"    Sharpe Ratio 标准差: {sharpe_std:.4f}")
        print(f"    Sharpe Ratio 范围: [{sharpe_min:.4f}, {sharpe_max:.4f}]")
        print(f"    Sharpe Ratio 波动范围: {sharpe_range:.4f}")
        print(f"    Sharpe Ratio 变异系数: {sharpe_cv:.4f}")
    
    # 打印汇总表
    print("\n" + "="*70)
    print("Jittering Test 汇总")
    print("="*70)
    print(f"{'噪声水平':>10}  {'平均标准差':>15}  {'平均CV':>15}  {'最大标准差':>15}")
    print("-"*70)
    for noise_level in NOISE_LEVELS:
        pred_mean = results_dict[noise_level]['mean']
        print(f"{noise_level:>10.2f}  "
              f"{results_dict[noise_level]['mean_std']:>15.6f}  "
              f"{results_dict[noise_level]['mean_cv']:>15.4f}  "
              f"{results_dict[noise_level]['max_std']:>15.6f}")
    
    # 打印 Sharpe Ratio 汇总表
    print("\n" + "="*70)
    print("Sharpe Ratio 波动分析")
    print("="*70)
    print(f"{'噪声水平':>10}  {'Sharpe均值':>15}  {'Sharpe标准差':>15}  {'Sharpe范围':>20}  {'Sharpe CV':>15}")
    print("-"*90)
    print(f"{'原始':>10}  {sharpe_original:>15.4f}  {'-':>15}  {'-':>20}  {'-':>15}")
    for noise_level in NOISE_LEVELS:
        sharpe_range_str = f"[{results_dict[noise_level]['sharpe_min']:.4f}, {results_dict[noise_level]['sharpe_max']:.4f}]"
        print(f"{noise_level:>10.2f}  "
              f"{results_dict[noise_level]['sharpe_mean']:>15.4f}  "
              f"{results_dict[noise_level]['sharpe_std']:>15.4f}  "
              f"{sharpe_range_str:>20}  "
              f"{results_dict[noise_level]['sharpe_cv']:>15.4f}")
    
    # 可视化
    fig, axes = plt.subplots(3, 2, figsize=(14, 15))
    
    # 子图1: 预测值分布（箱线图）
    ax1 = axes[0, 0]
    box_data = []
    labels = []
    for noise_level in NOISE_LEVELS:
        sample_predictions = results_dict[noise_level]['predictions'][:, :10].flatten()
        box_data.append(sample_predictions)
        labels.append(f'{noise_level}')
    ax1.boxplot(box_data, labels=labels)
    ax1.set_xlabel('噪声水平')
    ax1.set_ylabel('预测值')
    ax1.set_title('不同噪声水平下的预测值分布')
    ax1.grid(True, alpha=0.3)
    
    # 子图2: 标准差 vs 噪声水平
    ax2 = axes[0, 1]
    mean_stds = [results_dict[nl]['mean_std'] for nl in NOISE_LEVELS]
    max_stds = [results_dict[nl]['max_std'] for nl in NOISE_LEVELS]
    ax2.plot(NOISE_LEVELS, mean_stds, 'o-', label='平均标准差', linewidth=2)
    ax2.plot(NOISE_LEVELS, max_stds, 's-', label='最大标准差', linewidth=2)
    ax2.set_xlabel('噪声水平')
    ax2.set_ylabel('预测标准差')
    ax2.set_title('预测稳定性 vs 噪声水平')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    # 子图3: 变异系数分布
    ax3 = axes[1, 0]
    for noise_level in NOISE_LEVELS:
        cv_values = results_dict[noise_level]['cv']
        ax3.hist(cv_values, bins=20, alpha=0.5, label=f'{noise_level}', density=True)
    ax3.set_xlabel('变异系数')
    ax3.set_ylabel('密度')
    ax3.set_title('变异系数分布')
    ax3.legend()
    ax3.grid(True, alpha=0.3)
    
    # 子图4: 预测值对比（原始 vs 加噪声）
    ax4 = axes[1, 1]
    ax4.scatter(y_pred_original[:20], results_dict[NOISE_LEVELS[0]]['mean'][:20], 
               alpha=0.6, label=f'噪声={NOISE_LEVELS[0]}')
    ax4.scatter(y_pred_original[:20], results_dict[NOISE_LEVELS[-1]]['mean'][:20], 
               alpha=0.6, label=f'噪声={NOISE_LEVELS[-1]}')
    min_val = min(y_pred_original.min(), min([results_dict[nl]['mean'].min() for nl in NOISE_LEVELS]))
    max_val = max(y_pred_original.max(), max([results_dict[nl]['mean'].max() for nl in NOISE_LEVELS]))
    ax4.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2, label='y=x')
    ax4.set_xlabel('原始预测值')
    ax4.set_ylabel('加噪声后预测值')
    ax4.set_title('预测值稳定性')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    # 子图5: Sharpe Ratio 分布（箱线图）
    ax5 = axes[2, 0]
    sharpe_box_data = []
    sharpe_labels = []
    # 添加原始 sharpe ratio
    sharpe_box_data.append([sharpe_original])
    sharpe_labels.append('原始')
    for noise_level in NOISE_LEVELS:
        sharpe_box_data.append(results_dict[noise_level]['sharpe_ratios'])
        sharpe_labels.append(f'{noise_level}')
    ax5.boxplot(sharpe_box_data, labels=sharpe_labels)
    ax5.set_xlabel('噪声水平')
    ax5.set_ylabel('Adjusted Sharpe Ratio')
    ax5.set_title('不同噪声水平下的 Sharpe Ratio 分布')
    ax5.axhline(y=sharpe_original, color='r', linestyle='--', linewidth=2, label='原始 Sharpe')
    ax5.legend()
    ax5.grid(True, alpha=0.3)
    
    # 子图6: Sharpe Ratio 波动 vs 噪声水平
    ax6 = axes[2, 1]
    sharpe_means = [sharpe_original] + [results_dict[nl]['sharpe_mean'] for nl in NOISE_LEVELS]
    sharpe_stds = [0] + [results_dict[nl]['sharpe_std'] for nl in NOISE_LEVELS]
    sharpe_mins = [sharpe_original] + [results_dict[nl]['sharpe_min'] for nl in NOISE_LEVELS]
    sharpe_maxs = [sharpe_original] + [results_dict[nl]['sharpe_max'] for nl in NOISE_LEVELS]
    noise_levels_plot = [0] + list(NOISE_LEVELS)
    
    ax6.plot(noise_levels_plot, sharpe_means, 'o-', label='Sharpe 均值', linewidth=2, markersize=8)
    ax6.fill_between(noise_levels_plot, 
                     [m - s for m, s in zip(sharpe_means, sharpe_stds)],
                     [m + s for m, s in zip(sharpe_means, sharpe_stds)],
                     alpha=0.3, label='±1 标准差')
    ax6.plot(noise_levels_plot, sharpe_mins, 's--', label='Sharpe 最小值', linewidth=1.5, markersize=6)
    ax6.plot(noise_levels_plot, sharpe_maxs, '^--', label='Sharpe 最大值', linewidth=1.5, markersize=6)
    ax6.set_xlabel('噪声水平')
    ax6.set_ylabel('Adjusted Sharpe Ratio')
    ax6.set_title('Sharpe Ratio 波动 vs 噪声水平')
    ax6.legend()
    ax6.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('jittering_test.png', dpi=150, bbox_inches='tight')
    print(f"\n图表已保存到: jittering_test.png")
    plt.close(fig)  # 关闭图形，避免在 Jupyter notebook 中重复显示
    
    # 在返回结果中添加原始 sharpe ratio
    results_dict['_original_sharpe'] = sharpe_original
    
    return results_dict

# Cell-level confirmation: report that the fixed jittering_test (the one that
# supports EnsemblePipeline) is now defined, and remind the reader to replace
# the older version that lives in Cell 7 with it.
print("✓ 已定义修复后的 jittering_test（支持 EnsemblePipeline）")
print("⚠ 请用此函数替换 Cell 7 中的旧版本")


✓ 已定义修复后的 jittering_test（支持 EnsemblePipeline）
⚠ 请用此函数替换 Cell 7 中的旧版本


In [93]:
# ============================================================
# Important update: meta-model label generation and final position aggregation
# ============================================================
#
# This cell performs no computation; it only prints a change note for the reader.
#
# Changes already made:
# 1. Meta-model label generation: direction correctness is now judged using
#    the ideal positions produced by HFT
#    - ideal position of the direction model vs. ideal position of the true returns
#    - 1 means same sign (direction correct), 0 means opposite sign (direction wrong)
#
# 2. Final position aggregation: use meta * direction directly
#    - the generate_hft_positions function is no longer used
#    - the direction model's prediction is already on the position scale, so no
#      further conversion is needed
#
# Places that still need a manual update:
# - the generate_ensemble_positions call inside EnsemblePipeline.predict()
#   change the forward_returns_lag argument to None (or remove the argument)

print("="*70)
print("重要更新说明")
print("="*70)
print("""
已完成的修改：

1. Meta模型标签生成逻辑（MetaLabelPipeline.fit()）:
   ✓ 现在使用HFT生成的理想仓位来判断方向正确性
   ✓ 方向模型的理想仓位 vs 真实收益率的理想仓位
   ✓ 1代表同号（方向正确），0代表异号（方向错误）
   ✓ 逻辑与方向模型一致

2. 最终仓位聚合函数（generate_ensemble_positions()）:
   ✓ 直接使用 meta * 方向
   ✓ 不再使用 generate_hft_positions 函数
   ✓ 因为方向模型的预测已经是仓位scale，不需要再次转换

需要手动更新的地方：
- EnsemblePipeline.predict() 方法（约第2911-2921行）
  将以下代码：
  
  if 'forward_returns_lag' not in X.columns:
      raise ValueError("X 必须包含 'forward_returns_lag' 列")
  forward_returns_lag = X['forward_returns_lag']
  final_positions = generate_ensemble_positions(
      direction_predictions,
      confidence_predictions,
      forward_returns_lag,
      allow_short=allow_short
  )
  
  改为：
  
  final_positions = generate_ensemble_positions(
      direction_predictions,
      confidence_predictions,
      forward_returns_lag=None,  # 不再需要
      allow_short=allow_short
  )
""")

# Final confirmation line so the output shows the note was printed in full.
print("\n✓ 更新说明已显示")

重要更新说明

已完成的修改：

1. Meta模型标签生成逻辑（MetaLabelPipeline.fit()）:
   ✓ 现在使用HFT生成的理想仓位来判断方向正确性
   ✓ 方向模型的理想仓位 vs 真实收益率的理想仓位
   ✓ 1代表同号（方向正确），0代表异号（方向错误）
   ✓ 逻辑与方向模型一致

2. 最终仓位聚合函数（generate_ensemble_positions()）:
   ✓ 直接使用 meta * 方向
   ✓ 不再使用 generate_hft_positions 函数
   ✓ 因为方向模型的预测已经是仓位scale，不需要再次转换

需要手动更新的地方：
- EnsemblePipeline.predict() 方法（约第2911-2921行）
  将以下代码：
  
  if 'forward_returns_lag' not in X.columns:
      raise ValueError("X 必须包含 'forward_returns_lag' 列")
  forward_returns_lag = X['forward_returns_lag']
  final_positions = generate_ensemble_positions(
      direction_predictions,
      confidence_predictions,
      forward_returns_lag,
      allow_short=allow_short
  )
  
  改为：
  
  final_positions = generate_ensemble_positions(
      direction_predictions,
      confidence_predictions,
      forward_returns_lag=None,  # 不再需要
      allow_short=allow_short
  )


✓ 更新说明已显示


In [94]:
# ============================================================
# 6. 拟合图（可视化结果）
# ============================================================

def plot_fitting_results(
    results_df,
    oof_predictions,
    ic_results=None,
    save_path='fitting_results.png'
):
    """
    Draw a 3x3 summary figure of out-of-fold predictions and save it to disk.

    Panels: (1) predicted vs. actual scatter with a regression line,
    (2) predicted and actual series, (3) rolling IC series, (4) decile returns,
    (5) prediction histogram, (6) actual-return histogram, (7) residuals,
    (8) strategy vs. benchmark cumulative returns, (9) rolling IC histogram.
    Panels 3, 4 and 9 stay empty unless ``ic_results`` carries the matching
    key; panel 8 stays empty unless ``results_df`` has a 'positions' column.

    Parameters:
    results_df: DataFrame that must contain 'market_forward_excess_returns';
        'positions', 'risk_free_rate' and 'forward_returns' are used when present.
    oof_predictions: Series of out-of-fold predictions. It is aligned with
        ``results_df`` on the intersection of their indexes.
    ic_results: optional mapping; the keys 'rolling_ic' (array-like) and
        'decile_df' (DataFrame with 'Decile' and 'Mean_Return') are read.
    save_path: path the figure is written to.

    Returns:
    The matplotlib Figure. It has already been closed with ``plt.close`` so the
    notebook does not render it a second time.

    Raises:
    ValueError: if no row has both a non-NaN prediction and a non-NaN target.

    Side effects:
    Updates the global ``plt.rcParams`` font settings, prints progress messages
    and writes ``save_path``.
    """
    # Font fallbacks able to render CJK glyphs; keep the minus sign as ASCII.
    plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
    plt.rcParams['axes.unicode_minus'] = False

    print("="*70)
    print("生成拟合结果图")
    print("="*70)

    # Align predictions and results on their shared index labels.
    common_indices = oof_predictions.index.intersection(results_df.index)
    pred_aligned = oof_predictions.loc[common_indices]
    frame_aligned = results_df.loc[common_indices]
    target_aligned = frame_aligned['market_forward_excess_returns']

    # Keep only rows where BOTH prediction and target are present: a NaN in
    # either one makes np.polyfit / np.corrcoef / hist fail or return NaN.
    valid_mask = pred_aligned.notna().to_numpy() & target_aligned.notna().to_numpy()
    if not valid_mask.any():
        raise ValueError("没有可用于绘图的有效样本（预测值与真实值需同时非 NaN）")

    y_pred = pred_aligned.to_numpy(dtype=float)[valid_mask]
    y_true = target_aligned.to_numpy(dtype=float)[valid_mask]
    frame_valid = frame_aligned.loc[valid_mask]
    time_axis = range(len(y_pred))

    # Normalise the optional rolling IC once so lists and Series both work.
    rolling_ic = None
    if ic_results is not None and 'rolling_ic' in ic_results:
        rolling_ic = np.asarray(ic_results['rolling_ic'], dtype=float)
    finite_ic = rolling_ic[~np.isnan(rolling_ic)] if rolling_ic is not None else None
    mean_ic = finite_ic.mean() if finite_ic is not None and finite_ic.size else np.nan

    fig = plt.figure(figsize=(18, 12))

    # Panel 1: predicted vs. actual scatter.
    ax1 = fig.add_subplot(3, 3, 1)
    ax1.scatter(y_pred, y_true, alpha=0.5, s=10)
    # A degree-1 fit needs at least two distinct x values; otherwise skip it.
    if len(y_pred) >= 2 and y_pred.max() > y_pred.min():
        fit = np.poly1d(np.polyfit(y_pred, y_true, 1))
        x_line = np.array([y_pred.min(), y_pred.max()])
        ax1.plot(x_line, fit(x_line), "r--", alpha=0.8, linewidth=2,
                 label=f'Regression Line (IC={np.corrcoef(y_pred, y_true)[0,1]:.4f})')
        ax1.legend()
    ax1.set_xlabel('Predicted')
    ax1.set_ylabel('Actual Returns')
    ax1.set_title('Predicted vs Actual Returns')
    ax1.grid(True, alpha=0.3)

    # Panel 2: both series against their position in the valid sample.
    ax2 = fig.add_subplot(3, 3, 2)
    ax2.plot(time_axis, y_pred, alpha=0.6, label='Predicted', linewidth=1)
    ax2.plot(time_axis, y_true, alpha=0.6, label='Actual', linewidth=1)
    ax2.set_xlabel('Time Index')
    ax2.set_ylabel('Returns')
    ax2.set_title('Predicted vs Actual Time Series')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Panel 3: rolling IC series.
    ax3 = fig.add_subplot(3, 3, 3)
    if rolling_ic is not None:
        ax3.plot(range(len(rolling_ic)), rolling_ic, alpha=0.7, linewidth=1)
        ax3.axhline(y=0, color='r', linestyle='--', linewidth=1)
        ax3.axhline(y=mean_ic, color='g', linestyle='--', linewidth=1,
                    label=f'Mean: {mean_ic:.4f}')
        ax3.set_xlabel('Time Point')
        ax3.set_ylabel('Rolling IC (30 days)')
        ax3.set_title('Rolling IC Time Series')
        ax3.legend()
        ax3.grid(True, alpha=0.3)

    # Panel 4: mean return per prediction decile, scaled by 252 * 100.
    ax4 = fig.add_subplot(3, 3, 4)
    if ic_results is not None and 'decile_df' in ic_results:
        decile_df = ic_results['decile_df']
        ax4.bar(decile_df['Decile'], decile_df['Mean_Return'] * 252 * 100,
                color='steelblue', alpha=0.7)
        ax4.axhline(y=0, color='r', linestyle='--', linewidth=1)
        ax4.set_xlabel('Prediction Quantile')
        ax4.set_ylabel('Annualized Avg Returns (%)')
        ax4.set_title('Decile Analysis - Avg Returns by Quantile')
        ax4.grid(True, alpha=0.3, axis='y')

    # Panel 5: distribution of predictions.
    ax5 = fig.add_subplot(3, 3, 5)
    ax5.hist(y_pred, bins=50, alpha=0.6, color='steelblue', label='Predicted', density=True)
    ax5.axvline(y_pred.mean(), color='r', linestyle='--', linewidth=2,
                label=f'Mean: {y_pred.mean():.4f}')
    ax5.set_xlabel('Predicted')
    ax5.set_ylabel('Density')
    ax5.set_title('Predicted Distribution')
    ax5.legend()
    ax5.grid(True, alpha=0.3)

    # Panel 6: distribution of actual returns.
    ax6 = fig.add_subplot(3, 3, 6)
    ax6.hist(y_true, bins=50, alpha=0.6, color='coral', label='Actual Returns', density=True)
    ax6.axvline(y_true.mean(), color='r', linestyle='--', linewidth=2,
                label=f'Mean: {y_true.mean():.4f}')
    ax6.set_xlabel('Actual Returns')
    ax6.set_ylabel('Density')
    ax6.set_title('Actual Returns Distribution')
    ax6.legend()
    ax6.grid(True, alpha=0.3)

    # Panel 7: residuals against predictions.
    ax7 = fig.add_subplot(3, 3, 7)
    ax7.scatter(y_pred, y_true - y_pred, alpha=0.5, s=10)
    ax7.axhline(y=0, color='r', linestyle='--', linewidth=2)
    ax7.set_xlabel('Predicted')
    ax7.set_ylabel('Residuals (Actual - Predicted)')
    ax7.set_title('Residual Analysis')
    ax7.grid(True, alpha=0.3)

    # Panel 8: cumulative strategy vs. benchmark returns (needs positions).
    ax8 = fig.add_subplot(3, 3, 8)
    if 'positions' in frame_valid.columns:
        pos = frame_valid['positions'].values
        if 'risk_free_rate' in frame_valid.columns:
            rf = frame_valid['risk_free_rate'].values
        else:
            rf = np.zeros(len(pos))

        if 'forward_returns' in frame_valid.columns:
            forward_returns = frame_valid['forward_returns'].values
        else:
            # No raw returns available: rebuild them from excess return + rf.
            forward_returns = y_true + rf

        # Un-invested weight (1 - pos) earns rf, invested weight earns the market.
        strategy_returns = rf * (1 - pos) + pos * forward_returns
        strategy_cumulative = (1 + strategy_returns).cumprod()
        benchmark_cumulative = (1 + forward_returns).cumprod()

        ax8.plot(range(len(strategy_cumulative)), strategy_cumulative,
                 label='Strategy', linewidth=2)
        ax8.plot(range(len(benchmark_cumulative)), benchmark_cumulative,
                 label='Benchmark', linewidth=2, linestyle='--')
        ax8.set_xlabel('Time Index')
        ax8.set_ylabel('Cumulative Returns')
        ax8.set_title('Strategy vs Benchmark Cumulative Returns')
        ax8.legend()
        ax8.grid(True, alpha=0.3)

    # Panel 9: distribution of the rolling IC values.
    ax9 = fig.add_subplot(3, 3, 9)
    if rolling_ic is not None:
        if finite_ic.size:
            ax9.hist(finite_ic, bins=30, alpha=0.6, color='green', density=True)
        ax9.axvline(mean_ic, color='r', linestyle='--', linewidth=2,
                    label=f'Mean: {mean_ic:.4f}')
        ax9.axvline(0, color='k', linestyle='-', linewidth=1)
        ax9.set_xlabel('IC Value')
        ax9.set_ylabel('Density')
        ax9.set_title('Rolling IC Distribution')
        ax9.legend()
        ax9.grid(True, alpha=0.3)

    fig.tight_layout()
    fig.savefig(save_path, dpi=150, bbox_inches='tight')
    print(f"\n图表已保存到: {save_path}")

    # Close the figure so Jupyter does not display it a second time.
    plt.close(fig)

    return fig

# Cell-level confirmation that plot_fitting_results has been defined.
print("✓ 已定义 plot_fitting_results")


✓ 已定义 plot_fitting_results


In [95]:
# ============================================================
# 数据流诊断工具 - 用于检查IC计算前的数据质量
# ============================================================

def _summarize_values(values):
    """Return min/max/mean/std/median of a 1-D float array (all NaN when it is empty)."""
    if values.size == 0:
        return dict.fromkeys(('min', 'max', 'mean', 'std', 'median'), np.nan)
    return {
        'min': np.min(values),
        'max': np.max(values),
        'mean': np.mean(values),
        'std': np.std(values),
        'median': np.median(values)
    }


def diagnose_ic_data_flow(y_pred, y_true, market_returns=None, name="数据",
                          threshold_bull=0.02, threshold_bear=-0.02):
    """
    Print a data-quality report for a prediction/target pair before IC is computed.

    Checks, in order: matching length and at least two samples, NaN values,
    Inf values, zero variance, summary statistics and, when ``market_returns``
    is given, NaNs in it plus the same NaN/variance checks inside the bull and
    bear subsets.

    Parameters:
    y_pred: predictions (array-like, converted to a float array).
    y_true: targets (array-like, converted to a float array).
    market_returns: optional market return series used to split the samples
        into bull / bear / mixed regimes.
    name: label printed in the report header.
    threshold_bull: returns strictly above this value count as bull market.
    threshold_bear: returns strictly below this value count as bear market.

    Returns:
    diagnosis: dict with
        'is_valid' - False if a length, NaN, Inf or zero-variance check failed;
        'issues'   - list of human-readable problem descriptions;
        'stats'    - {'y_pred': {...}, 'y_true': {...}} with min/max/mean/std/median
                     (NaN for an empty input).
    """
    print("="*70)
    print(f"数据流诊断: {name}")
    print("="*70)

    # Force a float dtype so np.isnan / np.isinf also work for object-dtype input.
    y_pred = np.asarray(y_pred, dtype=float)
    y_true = np.asarray(y_true, dtype=float)
    series = (("y_pred", y_pred), ("y_true", y_true))

    diagnosis = {
        'is_valid': True,
        'issues': [],
        'stats': {}
    }

    # 1. Length checks
    print(f"\n1. 长度检查:")
    print(f"   y_pred长度: {len(y_pred)}")
    print(f"   y_true长度: {len(y_true)}")

    same_length = len(y_pred) == len(y_true)
    if not same_length:
        diagnosis['is_valid'] = False
        diagnosis['issues'].append(f"长度不一致: y_pred={len(y_pred)}, y_true={len(y_true)}")
        print(f"   ✗ 错误: 长度不一致！")
    else:
        print(f"   ✓ 长度一致")

    if len(y_pred) < 2:
        diagnosis['is_valid'] = False
        diagnosis['issues'].append(f"数据长度不足: 需要至少2个样本，实际只有{len(y_pred)}个")
        print(f"   ✗ 错误: 数据长度不足（需要至少2个样本）")
    else:
        print(f"   ✓ 长度足够")

    # 2. NaN checks
    print(f"\n2. NaN检查:")
    nan_counts = {label: np.isnan(values).sum() for label, values in series}
    for label, _ in series:
        print(f"   {label} NaN数量: {nan_counts[label]}")

    for label, values in series:
        if nan_counts[label] > 0:
            # Report at most the first ten offending positions.
            nan_positions = np.where(np.isnan(values))[0][:10].tolist()
            diagnosis['is_valid'] = False
            diagnosis['issues'].append(
                f"{label}包含{nan_counts[label]}个NaN值（位置示例: {nan_positions}）")
            print(f"   ✗ {label}包含NaN值！位置示例: {nan_positions}")
        else:
            print(f"   ✓ {label}无NaN")

    # 3. Inf checks
    print(f"\n3. Inf检查:")
    inf_pred = np.isinf(y_pred).sum()
    inf_true = np.isinf(y_true).sum()
    print(f"   y_pred Inf数量: {inf_pred}")
    print(f"   y_true Inf数量: {inf_true}")

    if inf_pred > 0 or inf_true > 0:
        diagnosis['is_valid'] = False
        diagnosis['issues'].append(f"数据包含Inf值: y_pred={inf_pred}, y_true={inf_true}")
        print(f"   ✗ 数据包含Inf值！")
    else:
        print(f"   ✓ 无Inf值")

    # 4. Variance checks (an empty array has no variance: report NaN, no warning)
    print(f"\n4. 方差检查:")
    variances = {label: (np.var(values) if values.size else np.nan)
                 for label, values in series}
    for label, _ in series:
        print(f"   {label}方差: {variances[label]:.10f}")

    for label, values in series:
        variance = variances[label]
        if variance == 0:
            message = f"{label}方差为0（所有值相同: {values[0] if len(values) > 0 else 'N/A'}）"
            diagnosis['is_valid'] = False
            diagnosis['issues'].append(message)
            print(f"   ✗ {message}")
        elif np.isnan(variance):
            # NaN variance comes from empty / NaN / Inf data, which the checks
            # above already recorded, so only flag it instead of claiming success.
            print(f"   ⚠ {label}方差无法计算（数据为空或包含NaN/Inf）")
        else:
            print(f"   ✓ {label}有方差")

    # 5. Summary statistics
    print(f"\n5. 统计信息:")
    stats_pred = _summarize_values(y_pred)
    stats_true = _summarize_values(y_true)
    diagnosis['stats'] = {'y_pred': stats_pred, 'y_true': stats_true}

    print(f"   y_pred: min={stats_pred['min']:.6f}, max={stats_pred['max']:.6f}, mean={stats_pred['mean']:.6f}, std={stats_pred['std']:.6f}, median={stats_pred['median']:.6f}")
    print(f"   y_true: min={stats_true['min']:.6f}, max={stats_true['max']:.6f}, mean={stats_true['mean']:.6f}, std={stats_true['std']:.6f}, median={stats_true['median']:.6f}")

    # 6. Market returns (optional)
    if market_returns is not None:
        print(f"\n6. 市场收益率检查:")
        market_returns = np.asarray(market_returns, dtype=float)
        nan_market = np.isnan(market_returns).sum()
        print(f"   市场收益率长度: {len(market_returns)}")
        print(f"   NaN数量: {nan_market}")

        if nan_market > 0:
            diagnosis['issues'].append(f"市场收益率包含{nan_market}个NaN值")
            print(f"   ⚠ 市场收益率包含NaN值")
        else:
            print(f"   ✓ 市场收益率无NaN")

        # The regime masks index BOTH y_pred and y_true, so all three lengths
        # must agree; otherwise the boolean indexing would raise IndexError.
        if same_length and len(market_returns) == len(y_pred):
            bull_mask = market_returns > threshold_bull
            bear_mask = market_returns < threshold_bear
            # NaN market returns fail both comparisons and land in "mixed".
            mixed_mask = ~(bull_mask | bear_mask)

            print(f"   市场状态分布:")
            print(f"     牛市 (> {threshold_bull:.2%}): {bull_mask.sum()} 个样本")
            print(f"     熊市 (< {threshold_bear:.2%}): {bear_mask.sum()} 个样本")
            print(f"     混合市场: {mixed_mask.sum()} 个样本")

            # Same NaN / zero-variance checks inside each non-empty regime.
            for regime_label, mask in (("牛市", bull_mask), ("熊市", bear_mask)):
                if mask.sum() == 0:
                    continue
                pred_subset = y_pred[mask]
                true_subset = y_true[mask]
                nan_sub_pred = np.isnan(pred_subset).sum()
                nan_sub_true = np.isnan(true_subset).sum()
                var_sub_pred = np.var(pred_subset)
                var_sub_true = np.var(true_subset)
                print(f"     {regime_label}数据质量: y_pred NaN={nan_sub_pred}, y_true NaN={nan_sub_true}, y_pred方差={var_sub_pred:.6f}, y_true方差={var_sub_true:.6f}")
                if nan_sub_pred > 0 or nan_sub_true > 0 or var_sub_pred == 0 or var_sub_true == 0:
                    diagnosis['issues'].append(f"{regime_label}数据有问题: NaN或方差为0")
        else:
            print(f"   ⚠ 市场收益率长度与预测值/真实值不一致，跳过市场状态分组检查")

    # Summary
    print(f"\n" + "="*70)
    if diagnosis['is_valid'] and len(diagnosis['issues']) == 0:
        print(f"✓ 数据质量检查通过，可以计算IC")
    else:
        print(f"✗ 数据质量检查失败，发现问题:")
        for issue in diagnosis['issues']:
            print(f"  - {issue}")
    print("="*70)

    return diagnosis

# Cell-level confirmation that the diagnose_ic_data_flow helper has been defined.
print("✓ 已定义数据流诊断工具: diagnose_ic_data_flow")

✓ 已定义数据流诊断工具: diagnose_ic_data_flow


In [96]:
# ============================================================
# 改进版本的 ic_test_by_market_regime - 包含完整数据验证
# ============================================================

def _safe_ic(pred_subset, true_subset, name="", with_spearman=True):
    """Return (Pearson IC, Spearman IC); NaN for any value that cannot be computed.

    Prints the reason and returns (nan, nan) when either array contains NaN or
    has zero variance. With ``with_spearman=False`` the Spearman slot is NaN.
    """
    nan_pred = np.isnan(pred_subset).sum()
    nan_true = np.isnan(true_subset).sum()
    if nan_pred > 0 or nan_true > 0:
        print(f"  ✗ {name}数据包含NaN: y_pred={nan_pred}, y_true={nan_true}")
        return np.nan, np.nan

    if np.var(pred_subset) == 0:
        print(f"  ✗ {name}y_pred方差为0（所有值相同: {pred_subset[0] if len(pred_subset) > 0 else 'N/A'}）")
        return np.nan, np.nan
    if np.var(true_subset) == 0:
        print(f"  ✗ {name}y_true方差为0（所有值相同: {true_subset[0] if len(true_subset) > 0 else 'N/A'}）")
        return np.nan, np.nan

    try:
        ic_pearson = np.corrcoef(pred_subset, true_subset)[0, 1]
    except Exception as e:
        print(f"  ✗ 计算Pearson IC时出错: {e}")
        ic_pearson = np.nan

    ic_spearman = np.nan
    if with_spearman:
        try:
            spearman_result = stats.spearmanr(pred_subset, true_subset)
            # spearmanr returns a (statistic, pvalue) pair; keep the statistic.
            ic_spearman = spearman_result[0] if hasattr(spearman_result, '__len__') else spearman_result
        except Exception as e:
            print(f"  ✗ 计算Spearman IC时出错: {e}")
            ic_spearman = np.nan

    return ic_pearson, ic_spearman


def _regime_ic_report(pred, true, label, section_no, total_len, window_size, min_samples=10):
    """Print the IC report for one market regime and return its result dict.

    Returns None (after printing a notice) when the regime has ``min_samples``
    samples or fewer.
    """
    print("\n" + "="*70)
    print(f"{section_no}. {label} IC 测试")
    print("="*70)

    n_samples = len(pred)
    if n_samples <= min_samples:
        print(f"样本数不足（{n_samples} 个），跳过{label}IC测试")
        return None

    print(f"样本数: {n_samples}")
    ic_pearson, ic_spearman = _safe_ic(pred, true, label)

    # Significance of the Pearson IC via the t-test with n - 2 degrees of freedom.
    if np.isnan(ic_pearson):
        ic_t_stat = np.nan
        ic_p_value = np.nan
    else:
        dof = n_samples - 2
        # 1e-8 keeps the denominator positive when |IC| is exactly 1.
        ic_t_stat = ic_pearson * np.sqrt(dof) / np.sqrt(1 - ic_pearson**2 + 1e-8)
        # sf() is 1 - cdf() without the cancellation error for tiny p-values.
        ic_p_value = 2 * stats.t.sf(abs(ic_t_stat), dof)

    # Rolling Pearson IC over the `window_size` samples preceding each position i.
    rolling_ic = np.array([
        _safe_ic(pred[i - window_size:i], true[i - window_size:i], "", with_spearman=False)[0]
        for i in range(window_size, n_samples)
    ])
    finite_ic = rolling_ic[~np.isnan(rolling_ic)]
    ic_mean = finite_ic.mean() if finite_ic.size else np.nan
    ic_std = finite_ic.std() if finite_ic.size else np.nan
    ic_ir = ic_mean / (ic_std + 1e-8) if not np.isnan(ic_std) else np.nan

    print(f"整体 IC (Pearson):  {ic_pearson:.4f}")
    print(f"整体 IC (Spearman): {ic_spearman:.4f}")
    print(f"IC t-statistic: {ic_t_stat:.4f}")
    print(f"IC p-value: {ic_p_value:.6f}")
    if not np.isnan(ic_p_value) and ic_p_value < 0.05:
        print("  ✓ IC 在 5% 水平上显著")
    else:
        print("  ✗ IC 不显著")

    if len(rolling_ic) > 0:
        print(f"\n滚动IC分析（{window_size}天窗口）:")
        print(f"  滚动IC均值: {ic_mean:.4f}")
        print(f"  滚动IC标准差: {ic_std:.4f}")
        # NaN windows compare as False, i.e. they count as "not positive".
        print(f"  滚动IC > 0 的比例: {(rolling_ic > 0).mean():.2%}")
        print(f"  IC比率 (IC_IR): {ic_ir:.4f}")

    return {
        'ic_pearson': ic_pearson,
        'ic_spearman': ic_spearman,
        'ic_p_value': ic_p_value,
        'ic_ir': ic_ir,
        'rolling_ic': rolling_ic,
        'n_samples': n_samples,
        'sample_ratio': n_samples / total_len
    }


def ic_test_by_market_regime_improved(
    y_pred, 
    y_true, 
    market_returns,
    threshold_bull=0.02,
    threshold_bear=-0.02,
    window_size=30
):
    """
    Test the IC separately for bull, bear and mixed market regimes.

    The three inputs are truncated to their common length, passed through
    ``diagnose_ic_data_flow`` for a printed data-quality report, and then split
    by ``market_returns``. For every regime with more than 10 samples the
    overall Pearson/Spearman IC, its t-test p-value and a rolling Pearson IC
    are printed and returned.

    Parameters:
    y_pred: predictions (array-like).
    y_true: targets (array-like).
    market_returns: market return series used to decide the regime.
    threshold_bull: returns strictly above this value are bull market.
    threshold_bear: returns strictly below this value are bear market.
    window_size: number of samples in each rolling-IC window.

    Returns:
    results: dict with the keys 'bull', 'bear' and 'mixed'. Each value is None
        when the regime has too few samples, otherwise a dict with
        'ic_pearson', 'ic_spearman', 'ic_p_value', 'ic_ir', 'rolling_ic',
        'n_samples' and 'sample_ratio' (share of all samples in the regime).
    """
    print("="*70)
    print("按市场状态分组 IC 测试（改进版 - 含数据验证）")
    print("="*70)

    # Float dtype so the NaN checks also work for object-dtype input.
    y_pred = np.asarray(y_pred, dtype=float)
    y_true = np.asarray(y_true, dtype=float)
    market_returns = np.asarray(market_returns, dtype=float)

    # Truncate all three series to the shortest one so the masks line up.
    min_len = min(len(y_pred), len(y_true), len(market_returns))
    y_pred = y_pred[:min_len]
    y_true = y_true[:min_len]
    market_returns = market_returns[:min_len]

    # Overall data-quality report first. NOTE: the regime counts printed by the
    # diagnosis use that function's own thresholds, not the ones passed here.
    diagnosis = diagnose_ic_data_flow(y_pred, y_true, market_returns, name="整体数据")

    if not diagnosis['is_valid']:
        print("\n⚠ 警告: 整体数据存在问题，但继续尝试按市场状态分组计算...")

    bull_mask = market_returns > threshold_bull
    bear_mask = market_returns < threshold_bear
    # Everything else, including NaN market returns, is treated as mixed.
    mixed_mask = ~(bull_mask | bear_mask)

    print(f"\n市场状态划分标准:")
    print(f"  牛市: 市场收益率 > {threshold_bull:.2%}")
    print(f"  熊市: 市场收益率 < {threshold_bear:.2%}")
    print(f"  混合市场: {threshold_bear:.2%} <= 市场收益率 <= {threshold_bull:.2%}")

    regimes = (
        ('bull', "牛市", bull_mask),
        ('bear', "熊市", bear_mask),
        ('mixed', "混合市场", mixed_mask),
    )
    results = {}
    for section_no, (key, label, mask) in enumerate(regimes, start=1):
        results[key] = _regime_ic_report(
            y_pred[mask], y_true[mask], label, section_no, min_len, window_size
        )

    return results

# Cell-level confirmation that ic_test_by_market_regime_improved has been defined.
print("✓ 已定义改进版本的 ic_test_by_market_regime_improved（包含完整数据验证）")

✓ 已定义改进版本的 ic_test_by_market_regime_improved（包含完整数据验证）


In [97]:
# ============================================================
# feature_creator
# ============================================================
def create_drop_features_classic(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build the "classic" feature set.

    Adds lag / rolling statistics for a fixed list of base columns, a handful
    of hand-written interaction features and three regime-awareness features,
    then removes the bookkeeping columns.

    Parameters
    ----------
    df : pd.DataFrame
        Input frame. Every source column is optional: a feature is only
        created when all the columns it depends on are present. The input
        frame is not modified.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` with the generated columns appended and ``date_id`` /
        ``risk_free_rate_lag`` removed (when present).

    Notes
    -----
    * Reads the module-level flag ``log``. When it is truthy a one-off summary
      is printed and the flag is reset to ``False``. A missing flag is treated
      as ``False`` so the function also works before the flag is defined.
    * Rolling statistics use ``min_periods=1``, so the first rows are computed
      from shorter windows (the rolling std of a single observation is NaN).
    * The ``M17`` threshold is the median of the whole frame passed in.
    """
    global log

    df_out = df.copy()

    # Base configuration for the generated lag / rolling features.
    TOP_FEATURES = ['forward_returns_lag', 'M4', 'M17', 'S5', 'S2', 'E19', 'E11']
    LAG_PERIODS = [1, 5, 20]
    ROLLING_WINDOWS = [5, 20, 60]
    COLS_TO_DROP = ['date_id', 'risk_free_rate_lag']

    # --- 1. Lag and rolling features, collected in a dict and merged once ---
    features_dict = {}
    for col in TOP_FEATURES:
        if col not in df_out.columns:
            continue
        source = df_out[col]
        for lag in LAG_PERIODS:
            features_dict[f'{col}_lag_{lag}'] = source.shift(lag)
        for window in ROLLING_WINDOWS:
            roller = source.rolling(window=window, min_periods=1)
            features_dict[f'{col}_roll_mean_{window}'] = roller.mean()
            features_dict[f'{col}_roll_std_{window}'] = roller.std()

    # Skip the merge when none of the base columns exist: concatenating an
    # index-less empty frame is pointless and can alter the index dtype.
    if features_dict:
        df_out = pd.concat([df_out, pd.DataFrame(features_dict)], axis=1)

    # --- 2. Hand-written interaction features ---
    if 'I2' in df_out.columns and 'M17' in df_out.columns:
        m17_thr = df_out["M17"].median()
        # Positive part of I2, kept only where M17 is below its median.
        df_out["feat_I2_poshinge_x_M17low"] = np.maximum(df_out["I2"], 0.0) * (df_out["M17"] < m17_thr).astype(float)

    if 'S12' in df_out.columns and 'V7' in df_out.columns:
        # The column name (including its "minu" spelling) is kept as-is because
        # downstream code may refer to it.
        df_out["feat_S12_minu_V7"] = df_out['S12'] - df_out['V7']

    if 'M4' in df_out.columns and 'P7' in df_out.columns:
        df_out["feat_M4_minus_P7"] = df_out["M4"] - df_out["P7"]

    if 'M17' in df_out.columns and 'P5' in df_out.columns:
        df_out["feat_M17_minus_P5"] = df_out["M17"] - df_out["P5"]

    if 'M4' in df_out.columns and 'E19' in df_out.columns:
        df_out['feat_M4_x_E19'] = df_out['M4'] * df_out['E19']

    if 'P7' in df_out.columns and 'V7' in df_out.columns:
        df_out['feat_P7_x_V7'] = df_out['P7'] * df_out['V7']

    if 'S5' in df_out.columns and 'E11' in df_out.columns:
        df_out['feat_S5_x_E11'] = df_out['S5'] * df_out['E11']

    if 'feat_M4_minus_P7' in df_out.columns and 'E19' in df_out.columns:
        df_out['feat_M4_P7_x_E19'] = df_out['feat_M4_minus_P7'] * df_out['E19']

    # --- 3. Regime-awareness features ---

    # Feature A: macro trend = S5 minus its 60-row rolling mean.
    # NOTE(review): the original notes assume S5 behaves like a price / index
    # level; swap in a real close-price column if that assumption is wrong.
    if 'S5' in df_out.columns and 'S5_roll_mean_60' in df_out.columns:
        df_out['feat_Macro_Trend'] = df_out['S5'] - df_out['S5_roll_mean_60']
    elif 'M4' in df_out.columns:
        # Fallback: use M4 directly when S5 is unavailable.
        df_out['feat_Macro_Trend'] = df_out['M4']
    else:
        # Last resort: constant column so downstream code always finds it.
        df_out['feat_Macro_Trend'] = 0

    # Feature B: volatility regime = 60-row rolling std of S2.
    if 'S2_roll_std_60' in df_out.columns:
        df_out['feat_Vol_Regime'] = df_out['S2_roll_std_60']

    # Feature C: lagged return scaled by the macro trend, so the same lagged
    # return can carry a different meaning depending on the trend sign.
    if 'forward_returns_lag' in df_out.columns and 'feat_Macro_Trend' in df_out.columns:
        df_out['feat_Mom_x_Trend'] = df_out['forward_returns_lag'] * df_out['feat_Macro_Trend']

    NEW_REGIME_COLS = ['feat_Macro_Trend', 'feat_Vol_Regime', 'feat_Mom_x_Trend']

    # Remove bookkeeping columns that must not reach the model.
    df_out = df_out.drop(columns=[col for col in COLS_TO_DROP if col in df_out.columns])

    # One-shot logging; a missing flag counts as "logging off".
    if globals().get('log', False):
        n_more = max(len(df_out.columns) - 10, 0)
        pprint(f"Feature X (Top 10): {df_out.columns.tolist()[:10]} ... and {n_more} more")
        print(f"Feature Creator Output Shape: {df_out.shape}")
        print(f"New Regime Features Included: {[c for c in NEW_REGIME_COLS if c in df_out.columns]}")
        log = False

    return df_out

# Wrap the classic feature builder in a FunctionTransformer so it can be used as a transformer step.
# validate=False: the function receives the input object as given (needed because it works on DataFrame column names).
f_creator_classic = FunctionTransformer(create_drop_features_classic, validate=False)

In [99]:
from statsmodels.tsa.stattools import adfuller
import pandas as pd
import numpy as np

def identify_cols_to_detrend(df, p_value_threshold=0.05):
    """
    Split the feature columns of ``df`` into non-stationary and stationary
    groups with the Augmented Dickey-Fuller (ADF) test.

    Parameters
    ----------
    df : DataFrame
        Data to screen. Target / bookkeeping columns listed below are ignored.
    p_value_threshold : float
        A column whose ADF p-value is above this threshold is reported as
        non-stationary; otherwise it is reported as stationary.

    Returns
    -------
    (list, list)
        ``(non_stationary_cols, stationary_cols)`` in column order. Columns
        with fewer than 50 non-NaN values, and columns on which the test
        raises, appear in neither list.
    """
    # Columns that are not features and must not be tested.
    excluded = ('date_id', 'forward_returns', 'market_forward_excess_returns',
                'risk_free_rate', 'prediction', 'positions')
    candidates = [name for name in df.columns if name not in excluded]

    print(f"正在对 {len(candidates)} 个特征进行平稳性检验 (ADF Test)...")

    needs_detrend = []       # p-value above threshold
    already_stationary = []  # p-value at or below threshold

    for name in candidates:
        observed = df[name].dropna()

        # Only test columns with at least 50 usable observations.
        if len(observed) >= 50:
            # The test runs on at most the first 3000 non-NaN values to bound the cost.
            head_values = observed.iloc[:3000].values

            try:
                p_val = adfuller(head_values)[1]
                # High p-value: the unit-root null is not rejected, so treat as non-stationary.
                bucket = needs_detrend if p_val > p_value_threshold else already_stationary
                bucket.append(name)
            except Exception as err:
                # Best effort: report the column (e.g. constant input) and move on.
                print(f"无法计算 {name}: {err}")

    return needs_detrend, already_stationary

# ================= Usage =================
# Screen every feature column of X_full with the ADF test.
# NOTE(review): X_full is assigned in the main-execution cell of this notebook; run that cell first.
cols_to_fix, cols_ok = identify_cols_to_detrend(X_full)

print(f"\n需要做 d=0.4 的特征 ({len(cols_to_fix)}个):")
print(cols_to_fix[:20], "..." if len(cols_to_fix)>20 else "")

print(f"\n本身已平稳的特征 ({len(cols_ok)}个):")
print(cols_ok[:20], "..." if len(cols_ok)>20 else "")

正在对 96 个特征进行平稳性检验 (ADF Test)...
无法计算 M16: Invalid input, x is constant
无法计算 V11: Invalid input, x is constant
无法计算 V12: Invalid input, x is constant
无法计算 V8: Invalid input, x is constant

需要做 d=0.4 的特征 (45个):
['E1', 'E10', 'E11', 'E12', 'E15', 'E17', 'E18', 'E2', 'E20', 'E3', 'E5', 'E6', 'E8', 'E9', 'I1', 'I2', 'I3', 'I4', 'I5', 'I6'] ...

本身已平稳的特征 (47个):
['D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'D9', 'E13', 'E14', 'E16', 'E19', 'E4', 'E7', 'I7', 'M1', 'M15', 'M2', 'M3'] ...


In [None]:
# ============================================================
# Delta 清洗：分数差分（Fractional Differencing）
# 用于处理非平稳时间序列，同时保留长期记忆
# ============================================================

def get_weights_ffd(d: float, threshold: float = 1e-5, max_size: int = 500) -> np.ndarray:
    """
    Weights of the fixed-width-window fractional difference (FFD) operator.

    The weights follow the recursion ``w_0 = 1``,
    ``w_k = -w_{k-1} * (d - k + 1) / k`` and are generated until either the
    next weight is smaller than ``threshold`` in absolute value or
    ``max_size`` weights have been produced.

    Parameters
    ----------
    d : float
        Differencing order.
    threshold : float
        Weights with absolute value below this are cut off.
    max_size : int
        Upper bound on the number of weights.

    Returns
    -------
    np.ndarray
        Column vector of shape ``(n_weights, 1)`` ordered from the oldest lag
        to the current observation (the last entry is always 1.0).
    """
    coeffs = [1.0]
    lag = 1
    while lag < max_size:
        candidate = -coeffs[-1] * (d - lag + 1) / lag
        if abs(candidate) < threshold:
            break
        coeffs.append(candidate)
        lag += 1
    # Reverse so the weight for the most recent observation comes last.
    oldest_first = list(reversed(coeffs))
    return np.array(oldest_first).reshape(-1, 1)


def frac_diff_ffd(series: pd.Series, d: float = 0.4, threshold: float = 1e-5) -> pd.Series:
    """
    Fixed-width-window fractional differencing (FFD) of a single series.

    Each output value is the dot product of the FFD weights with the trailing
    window of the same length that ends at that position.

    Parameters
    ----------
    series : pd.Series
        Numeric input series.
    d : float
        Differencing order passed to ``get_weights_ffd``.
    threshold : float
        Weight cut-off passed to ``get_weights_ffd``.

    Returns
    -------
    pd.Series
        Float series on the same index as ``series``. Positions without a full
        window of history, and positions whose window contains a NaN, are NaN.
        If the series is shorter than the weight window, every value is NaN.
    """
    # Flatten the (width, 1) column vector so each window reduces to a scalar.
    weights = get_weights_ffd(d, threshold).ravel()
    width = len(weights)
    n_obs = len(series)

    result = np.full(n_obs, np.nan)

    # Not enough history for even one full window.
    if n_obs < width:
        return pd.Series(result, index=series.index)

    values = series.to_numpy(dtype=float)
    # 'valid' correlation yields sum_j values[k + j] * weights[j] for every full
    # window; a NaN inside a window makes only that window's result NaN.
    result[width - 1:] = np.correlate(values, weights, mode="valid")

    return pd.Series(result, index=series.index)


def delta_clean(df: pd.DataFrame, d: float = 0.4, verbose: bool = True) -> pd.DataFrame:
    """
    "Delta cleaning": replace selected feature columns with their
    fractionally differenced version.

    A column is selected when its name is a one-letter prefix followed by
    digits up to the first underscore (e.g. ``P7`` or ``E11_lag_1``):

    * prefixes ``P`` and ``E`` form the "must clean" group,
    * prefix ``I`` forms the "should clean" group.

    (The original notes describe these as valuation, macro-economic and
    interest-rate features respectively.) Both groups are processed the same
    way; the split only affects the printed summary.

    Parameters
    ----------
    df : pd.DataFrame
        Input frame; it is copied, not modified.
    d : float
        Fractional differencing order passed to ``frac_diff_ffd``.
    verbose : bool
        Print a summary of the processed columns.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` with the selected columns replaced.
    """
    required_prefixes = ('P', 'E')
    optional_prefixes = ('I',)

    def _is_indexed_feature(name, prefixes):
        # Prefix letter, then digits only up to the first underscore.
        return any(
            name.startswith(tag) and len(name) > 1 and name[1:].split('_')[0].isdigit()
            for tag in prefixes
        )

    cleaned_df = df.copy()

    required_cols = [c for c in cleaned_df.columns if _is_indexed_feature(c, required_prefixes)]
    optional_cols = [c for c in cleaned_df.columns if _is_indexed_feature(c, optional_prefixes)]
    targets = required_cols + optional_cols

    rule = "=" * 70
    if verbose:
        print(rule)
        print(f"Delta 清洗（分数差分 d={d}）")
        print(rule)
        print(f"必须处理的特征 (P/E): {len(required_cols)} 个")
        print(f"建议处理的特征 (I): {len(optional_cols)} 个")
        print(f"总计: {len(targets)} 个特征")

    n_cleaned = 0
    for name in targets:
        if name not in cleaned_df.columns:
            continue
        # Overwrite the column with its differenced counterpart.
        cleaned_df[name] = frac_diff_ffd(cleaned_df[name].copy(), d=d)
        n_cleaned += 1

    if verbose:
        print(f"\n已清洗特征数: {n_cleaned}")
        # List (at most ten of) the processed columns per group.
        if required_cols:
            print(f"\nP/E 类特征 (已处理): {required_cols[:10]}{'...' if len(required_cols) > 10 else ''}")
        if optional_cols:
            print(f"I 类特征 (已处理): {optional_cols[:10]}{'...' if len(optional_cols) > 10 else ''}")
        print(rule)

    return cleaned_df


# Confirmation messages shown once this cell has run: lists the three helpers defined above.
print("✓ 已定义 delta_clean 函数（分数差分清洗）")
print("  - get_weights_ffd: 计算 FFD 权重")
print("  - frac_diff_ffd: 固定窗口分数差分")
print("  - delta_clean: 主清洗函数，处理 P/E/I 类特征")

In [None]:
# Module-level one-shot logging flag: the feature creators print their summary while it is
# truthy and then set it back to False, so the summary appears only on the first call.
log = True

def create_drop_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Extended feature engineering: lag / rolling statistics, interaction
    features and regime-awareness features.

    Parameters
    ----------
    df : pd.DataFrame
        Input frame. Every source column is optional: a feature is only
        created when the columns it depends on are present. The input frame is
        not modified.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` with all generated columns appended (in generation
        order) and ``date_id`` / ``risk_free_rate_lag`` removed.

    Notes
    -----
    * Reads the module-level flag ``log``. When it is truthy a one-off summary
      is printed and the flag is reset to ``False``; a missing flag is treated
      as ``False``. The summary statistics are only computed when they are
      actually printed.
    * Rolling statistics use ``min_periods=1`` unless stated otherwise.
    * Ratio features add ``epsilon`` to the denominator; this only protects
      against an exact zero, not against values close to ``-epsilon``.
    * The ``M17`` threshold is the median of the whole frame passed in.
    """
    global log

    df_out = df.copy()

    # Base columns that receive lag / rolling features, grouped by name prefix.
    # The group labels follow the original author's notes.
    TOP_FEATURES = [
        # Momentum
        'M4', 'M17', 'M1', 'M2', 'M3',
        # Sentiment
        'S2', 'S5', 'S1', 'S3', 'S12',
        # Economic
        'E11', 'E19', 'E1', 'E2', 'E3',
        # Price
        'P1', 'P5', 'P7', 'P9',
        # Volatility
        'V1', 'V3', 'V5', 'V7',
        # Liquidity
        'I1', 'I2', 'I3',
        # Direction
        'D1', 'D2',
        # Lagged return
        'forward_returns_lag'
    ]
    LAG_PERIODS = [1, 3, 5, 10, 20]
    ROLLING_WINDOWS = [5, 10, 20, 60]
    # Key columns that additionally get rolling max / min.
    EXTREMA_FEATURES = ['forward_returns_lag', 'M4', 'S5', 'V7', 'I2']
    EXTREMA_WINDOWS = [20, 60]
    COLS_TO_DROP = ['date_id', 'risk_free_rate_lag']
    epsilon = 1e-6  # guards ratio denominators against an exact zero

    def present(candidates):
        # Candidates that exist in the input, in candidate order.
        return [name for name in candidates if name in df_out.columns]

    def neighbour_pairs(names):
        # Pair each of the first three names with the (up to) two names after it.
        for pos, left in enumerate(names[:3]):
            for right in names[pos + 1:pos + 3]:
                yield left, right

    # All new columns are collected here and merged in one concat at the end;
    # insertion order defines the output column order.
    features_dict = {}

    # ------------------------------------------------------------------
    # 1. Lag and rolling statistics
    # ------------------------------------------------------------------
    for col in present(TOP_FEATURES):
        source = df_out[col]
        for lag in LAG_PERIODS:
            features_dict[f'{col}_lag_{lag}'] = source.shift(lag)
        for window in ROLLING_WINDOWS:
            roller = source.rolling(window=window, min_periods=1)
            features_dict[f'{col}_roll_mean_{window}'] = roller.mean()
            features_dict[f'{col}_roll_std_{window}'] = roller.std()
        if col in EXTREMA_FEATURES:
            for window in EXTREMA_WINDOWS:
                roller = source.rolling(window=window, min_periods=1)
                features_dict[f'{col}_roll_max_{window}'] = roller.max()
                features_dict[f'{col}_roll_min_{window}'] = roller.min()

    # ------------------------------------------------------------------
    # 2. Within-group interactions
    # ------------------------------------------------------------------
    m_features = present(['M1', 'M2', 'M3', 'M4', 'M17'])
    for left, right in neighbour_pairs(m_features):
        # The M group gets both the product and the ratio.
        features_dict[f'feat_{left}_x_{right}'] = df_out[left] * df_out[right]
        features_dict[f'feat_{left}_div_{right}'] = df_out[left] / (df_out[right] + epsilon)

    s_features = present(['S1', 'S2', 'S3', 'S5', 'S12'])
    e_features = present(['E1', 'E2', 'E3', 'E11', 'E19'])
    for group in (s_features, e_features):
        for left, right in neighbour_pairs(group):
            features_dict[f'feat_{left}_x_{right}'] = df_out[left] * df_out[right]

    # ------------------------------------------------------------------
    # 3. Cross-group interactions (first available column of each group)
    # ------------------------------------------------------------------
    v_features = present(['V1', 'V3', 'V5', 'V7'])
    i_features = present(['I1', 'I2', 'I3'])
    p_features = present(['P1', 'P5', 'P7', 'P9'])
    if v_features:
        vol = df_out[v_features[0]]
        if i_features:
            liq = df_out[i_features[0]]
            features_dict['feat_V_x_I'] = vol * liq
            features_dict['feat_V_div_I'] = vol / (liq + epsilon)
        if m_features:
            mom = df_out[m_features[0]]
            features_dict['feat_M_div_V'] = mom / (vol + epsilon)
            features_dict['feat_M_x_V'] = mom * vol
        if s_features:
            features_dict['feat_S_x_V'] = df_out[s_features[0]] * vol
        if p_features:
            features_dict['feat_P_x_V'] = df_out[p_features[0]] * vol

    # ------------------------------------------------------------------
    # 4. Hand-written interactions carried over from the classic feature set
    # ------------------------------------------------------------------
    if 'I2' in df_out.columns and 'M17' in df_out.columns:
        m17_thr = df_out["M17"].median()
        # Positive part of I2, kept only where M17 is below its median.
        features_dict["feat_I2_poshinge_x_M17low"] = np.maximum(df_out["I2"], 0.0) * (df_out["M17"] < m17_thr).astype(float)

    if 'S12' in df_out.columns and 'V7' in df_out.columns:
        features_dict["feat_S12_minus_V7"] = df_out['S12'] - df_out['V7']

    if 'M4' in df_out.columns and 'P7' in df_out.columns:
        features_dict["feat_M4_minus_P7"] = df_out["M4"] - df_out["P7"]

    if 'M4' in df_out.columns and 'E19' in df_out.columns:
        features_dict['feat_M4_x_E19'] = df_out['M4'] * df_out['E19']

    if 'P7' in df_out.columns and 'V7' in df_out.columns:
        features_dict['feat_P7_x_V7'] = df_out['P7'] * df_out['V7']

    if 'S5' in df_out.columns and 'E11' in df_out.columns:
        features_dict['feat_S5_x_E11'] = df_out['S5'] * df_out['E11']

    # ------------------------------------------------------------------
    # 5. Regime-awareness features
    # ------------------------------------------------------------------
    regime_features = []

    # 5.1 Macro trend: value minus its 60-row rolling mean (S5 preferred, M4 fallback).
    if 'S5' in df_out.columns:
        s5_mean_60 = features_dict.get('S5_roll_mean_60')
        if s5_mean_60 is None:
            # Only reached if S5 / window 60 are removed from the configuration above.
            s5_mean_60 = df_out['S5'].rolling(60, min_periods=1).mean()
        features_dict['feat_Macro_Trend'] = df_out['S5'] - s5_mean_60
    elif 'M4' in df_out.columns:
        m4_mean_60 = features_dict.get('M4_roll_mean_60')
        if m4_mean_60 is None:
            features_dict['feat_Macro_Trend'] = df_out['M4']
        else:
            features_dict['feat_Macro_Trend'] = df_out['M4'] - m4_mean_60
    else:
        # Constant fallback so the column always exists.
        features_dict['feat_Macro_Trend'] = 0
    regime_features.append('feat_Macro_Trend')

    # 5.2 Volatility regime: 60-row rolling std of S2, else raw V7, else constant.
    if 'S2_roll_std_60' in features_dict:
        features_dict['feat_Vol_Regime'] = features_dict['S2_roll_std_60']
    elif 'S2' in df_out.columns:
        features_dict['feat_Vol_Regime'] = df_out['S2'].rolling(60, min_periods=1).std()
    elif 'V7' in df_out.columns:
        features_dict['feat_Vol_Regime'] = df_out['V7']
    else:
        features_dict['feat_Vol_Regime'] = 0
    regime_features.append('feat_Vol_Regime')

    if 'forward_returns_lag' in df_out.columns:
        lagged_ret = df_out['forward_returns_lag']

        # 5.3 Lagged return scaled by the macro trend (always defined above).
        features_dict['feat_Mom_x_Trend'] = lagged_ret * features_dict['feat_Macro_Trend']
        regime_features.append('feat_Mom_x_Trend')

        # 5.4 Length of the current run of negative lagged returns
        # (0 on rows where the lagged return is not negative).
        is_down = (lagged_ret < 0).astype(int)
        run_id = (is_down != is_down.shift()).cumsum()
        features_dict['feat_Bear_Days'] = is_down.groupby(run_id).cumsum()
        regime_features.append('feat_Bear_Days')

    # 5.5 Volatility spike: short-window over long-window dispersion.
    if 'S2' in df_out.columns:
        vol_current = df_out['S2'].rolling(5, min_periods=1).std()
        vol_historical = df_out['S2'].rolling(60, min_periods=1).std()
        features_dict['feat_Vol_Spike'] = vol_current / (vol_historical + epsilon)
        regime_features.append('feat_Vol_Spike')
    elif 'V7' in df_out.columns:
        vol_current = df_out['V7'].rolling(5, min_periods=1).mean()
        vol_historical = df_out['V7'].rolling(60, min_periods=1).mean()
        features_dict['feat_Vol_Spike'] = vol_current / (vol_historical + epsilon)
        regime_features.append('feat_Vol_Spike')

    # 5.6 Relative strength: share of the previous 59 values that the latest M4
    # value exceeds. min_periods equals the window, so the callback only ever
    # sees complete, NaN-free windows of 60 values.
    if 'M4' in df_out.columns:
        def share_below_latest(window_values):
            return (window_values[-1] > window_values[:-1]).sum() / (len(window_values) - 1)
        features_dict['feat_M4_percentile_60'] = (
            df_out['M4'].rolling(60, min_periods=60).apply(share_below_latest, raw=True)
        )
        regime_features.append('feat_M4_percentile_60')

    # 5.7 RSI-style oscillator over 14 rows of lagged returns.
    if 'forward_returns_lag' in df_out.columns:
        returns = df_out['forward_returns_lag']
        gains = returns.where(returns > 0, 0)
        losses = -returns.where(returns < 0, 0)
        avg_gain = gains.rolling(14, min_periods=1).mean()
        avg_loss = losses.rolling(14, min_periods=1).mean()
        rs = avg_gain / (avg_loss + epsilon)
        features_dict['feat_RSI'] = 100 - (100 / (1 + rs))
        regime_features.append('feat_RSI')

    NEW_REGIME_COLS = regime_features

    # Merge everything in one go; index= aligns the new columns with the input
    # and broadcasts the scalar fallbacks.
    features_df = pd.DataFrame(features_dict, index=df_out.index)
    df_out = pd.concat([df_out, features_df], axis=1)

    # Remove bookkeeping columns that must not reach the model.
    df_out = df_out.drop(columns=[col for col in COLS_TO_DROP if col in df_out.columns])

    # One-shot summary; a missing flag counts as "logging off".
    if globals().get('log', False):
        derived_prefixes = tuple(
            [f'{f}_lag_' for f in TOP_FEATURES] + [f'{f}_roll_' for f in TOP_FEATURES]
        )
        n_base_features = len([f for f in TOP_FEATURES if f in df_out.columns])
        n_derived_features = len([col for col in df_out.columns if col.startswith(derived_prefixes)])
        # Interaction features: "feat_" columns that are not regime features.
        n_interaction_features = len(
            [col for col in df_out.columns if col.startswith('feat_') and col not in NEW_REGIME_COLS]
        )
        n_regime_features = len([col for col in NEW_REGIME_COLS if col in df_out.columns])
        total_features = len(df_out.columns)

        print(f"\n{'='*70}")
        print("特征工程统计")
        print(f"{'='*70}")
        print(f"基础特征数: {n_base_features} (从 {len(TOP_FEATURES)} 个候选特征中选择)")
        print(f"衍生特征数: {n_derived_features} (Lag + Rolling 统计)")
        print(f"交互特征数: {n_interaction_features} (类别内和类别间交互)")
        print(f"环境感知特征数: {n_regime_features} (市场状态指标)")
        print(f"总特征数: {total_features}")
        print(f"特征类别覆盖: M(动量), S(情绪), E(经济), P(价格), V(波动率), I(流动性), D(方向)")
        print(f"Feature X (Top 10): {df_out.columns.tolist()[:10]} ... and {max(total_features - 10, 0)} more")
        print(f"Feature Creator Output Shape: {df_out.shape}")
        if NEW_REGIME_COLS:
            print(f"环境感知特征: {[c for c in NEW_REGIME_COLS if c in df_out.columns]}")
        print(f"{'='*70}\n")
        log = False

    return df_out

In [98]:
# ============================================================
# Main execution - parallel models (with feature selection)
# ============================================================

# Re-arm the one-shot logging flag so the feature creator prints its summary again
log = True

# Prepare data
print("="*70)
print("准备数据...")
print("="*70)

# Work on copies of X / y (both must already exist in the kernel)
X_full = X.copy()
y_full = y.copy()

# Delta cleaning: fractional differencing (d=0.4) of the P/E/I-prefixed feature columns
X_full = delta_clean(X_full, d=0.4, verbose=True)

print(f"\nX shape: {X_full.shape}")
print(f"y shape: {y_full.shape}")

# Market data for the backtest
# NOTE(review): market_excess is the same object as y_full (no copy) - confirm this is intended
market_excess = y_full
# Risk-free rate: lagged column with NaN replaced by 0, or an all-zero series when the column is absent
rf = X_full['risk_free_rate_lag'].fillna(0) if 'risk_free_rate_lag' in X_full.columns else pd.Series(0, index=X_full.index)

# ============================================================
# Feature-selector parameters (one configuration passed to the pipeline;
# per the original note, each of the two models runs its own selection)
# ============================================================

parallel_feature_selector_params = {
    'n_clusters': None,
    'min_cluster_size': 3,
    'max_clusters': 15,
    'loss_threshold': None,
    'top_k_per_group': 0.65,
    'clustering_method': 'kmeans',
    'n_repeats': 3,
    'random_state': 42,
    'verbose': 1
}

# Echo the configuration so it is recorded in the cell output
print("\n特征选择器参数:")
for k, v in parallel_feature_selector_params.items():
    print(f"  {k}: {v}")

# ============================================================
# Configure the parallel models
# ============================================================

print("\n" + "="*70)
print("配置并行模型")
print("="*70)

# Opportunity model (regressor, RMSE loss)
opportunity_model = CatBoostRegressor(
    iterations=800,
    learning_rate=0.004,
    depth=6,
    min_data_in_leaf=20,
    l2_leaf_reg=7.0,
    random_strength=5.5,
    colsample_bylevel=0.78,
    early_stopping_rounds=50,
    bootstrap_type='Bernoulli',
    subsample=0.85,
    loss_function='RMSE',
    verbose=0,
    random_seed=42
)

# Direction model (classifier, Logloss); same hyper-parameters as the regressor apart from the loss
direction_model = CatBoostClassifier(
    iterations=800,
    learning_rate=0.004,
    depth=6,
    min_data_in_leaf=20,
    l2_leaf_reg=7.0,
    random_strength=5.5,
    colsample_bylevel=0.78,
    early_stopping_rounds=50,
    bootstrap_type='Bernoulli',
    subsample=0.85,
    loss_function='Logloss',
    verbose=0,
    random_seed=42
)

print("机会模型（Regressor）: RMSE (MSE)")
print("方向模型（Classifier）: Logloss (Cross Entropy)")

# ============================================================
# Build the parallel ensemble pipeline
# (ParallelEnsemblePipeline is defined elsewhere in this notebook)
# ============================================================

pipeline = ParallelEnsemblePipeline(
    opportunity_model=opportunity_model,
    direction_model=direction_model,
    feature_creator=create_drop_features,
    feature_selector_params=parallel_feature_selector_params,
    verbose=1
)

print("\n✓ 并行集成 Pipeline 创建完成")

# ============================================================
# Run the two-segment sliding-window test
# ============================================================

print("\n" + "="*70)
print("开始两段滑动窗口测试")
print("="*70)

# Test parameters
TRAIN_SIZE = 5000
TEST_SIZE = 2000
START_INDEX = 13
EXPANDING_WINDOW = False
ALLOW_SHORT = True

print(f"训练窗口: {TRAIN_SIZE}")
print(f"测试窗口: {TEST_SIZE}")
print(f"窗口模式: {'扩展窗口' if EXPANDING_WINDOW else '滑动窗口'}")
print(f"允许做空: {ALLOW_SHORT}")

# Run the test (move_forward_two_way_test is defined elsewhere in this notebook)
test_result = move_forward_two_way_test(
    X_full, y_full,
    pipeline=pipeline,
    train_size=TRAIN_SIZE,
    test_size=TEST_SIZE,
    start_index=START_INDEX,
    expanding_window=EXPANDING_WINDOW,
    market_excess=market_excess,
    rf=rf,
    allow_short=ALLOW_SHORT,
    verbose=1
)

# Unpack the result dictionary
oof_predictions = test_result['predictions']
oof_direction = test_result['direction']
oof_opportunity = test_result['opportunity']
oof_direction_proba = test_result['direction_proba']
round_stats = test_result['round_stats']

# ============================================================
# Result analysis
# ============================================================

print("\n" + "="*70)
print("结果分析")
print("="*70)

# Keep only the rows that actually received a prediction
valid_predictions = oof_predictions.dropna()
print(f"有效预测数: {len(valid_predictions)}")

if len(valid_predictions) > 0:
    results_df = train_df.loc[valid_predictions.index].copy()
    results_df['prediction'] = valid_predictions
    results_df['positions'] = valid_predictions
    
    y_test_all = y_full.loc[valid_predictions.index]
    ic = np.corrcoef(valid_predictions.values, y_test_all.values)[0, 1]
    print(f"整体 IC: {ic:.4f}")
    
    me_test = market_excess.loc[valid_predictions.index]
    rf_test = rf.loc[valid_predictions.index]
    sharpe = ad_sharpe_ratio_scorer(valid_predictions.values, me_test, rf_test)
    print(f"整体夏普率: {sharpe:.4f}")
    
    print(f"\n仓位统计:")
    print(f"  min: {valid_predictions.min():.4f}")
    print(f"  max: {valid_predictions.max():.4f}")
    print(f"  mean: {valid_predictions.mean():.4f}")
    
    # ============================================================
    # 方向模型评估 - 准确率
    # ============================================================
    print("\n" + "="*70)
    print("方向模型评估")
    print("="*70)
    
    valid_idx = valid_predictions.index
    valid_direction = oof_direction.loc[valid_idx].dropna()
    valid_direction_proba = oof_direction_proba.loc[valid_idx].dropna()
    
    # 真实方向：y > 0 为多，y <= 0 为空
    true_direction = (y_full.loc[valid_direction.index] > 0).astype(int) * 2 - 1  # 转换为 ±1
    pred_direction = valid_direction.values
    
    # 准确率和 F1 分数计算
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
    
    # 转换为 0/1 标签用于 sklearn 计算
    true_labels = (true_direction.values + 1) // 2  # -1→0, 1→1
    pred_labels = (pred_direction + 1) // 2  # -1→0, 1→1
    
    accuracy = accuracy_score(true_labels, pred_labels)
    f1 = f1_score(true_labels, pred_labels, average='binary')  # 以多头(1)为正类
    f1_macro = f1_score(true_labels, pred_labels, average='macro')  # 宏平均
    precision = precision_score(true_labels, pred_labels, average='binary')
    recall = recall_score(true_labels, pred_labels, average='binary')
    
    # 多空分别准确率
    long_mask = true_direction.values == 1
    short_mask = true_direction.values == -1
    
    long_accuracy = (pred_direction[long_mask] == 1).sum() / long_mask.sum() if long_mask.sum() > 0 else 0
    short_accuracy = (pred_direction[short_mask] == -1).sum() / short_mask.sum() if short_mask.sum() > 0 else 0
    
    print(f"总体方向准确率: {accuracy:.4f}")
    print(f"F1 分数: {f1:.4f} (多头为正类)")
    print(f"F1 分数 (宏平均): {f1_macro:.4f}")
    print(f"Precision: {precision:.4f}, Recall: {recall:.4f}")
    print(f"  做多准确率: {long_accuracy:.4f} (真实多时预测多)")
    print(f"  做空准确率: {short_accuracy:.4f} (真实空时预测空)")
    print(f"  真实多头样本数: {long_mask.sum()}")
    print(f"  真实空头样本数: {short_mask.sum()}")
    
    # 平均预测概率
    avg_proba_when_correct = valid_direction_proba.values[(pred_direction == true_direction.values)].mean()
    avg_proba_when_wrong = valid_direction_proba.values[(pred_direction != true_direction.values)].mean()
    print(f"\n预测概率分析:")
    print(f"  预测正确时平均概率: {avg_proba_when_correct:.4f}")
    print(f"  预测错误时平均概率: {avg_proba_when_wrong:.4f}")
    
    # ============================================================
    # 机会模型评估 - R²
    # ============================================================
    print("\n" + "="*70)
    print("机会模型评估")
    print("="*70)
    
    valid_opportunity = oof_opportunity.loc[valid_idx].dropna()
    
    # 真实机会：|generate_hft_positions(...)|
    from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
    
    # 需要计算真实的机会目标（使用与训练时相同的参数）
    y_test_opp = y_full.loc[valid_opportunity.index]
    forward_returns_lag_test = X_full.loc[valid_opportunity.index, 'forward_returns_lag'] if 'forward_returns_lag' in X_full.columns else pd.Series(0, index=valid_opportunity.index)
    
    # 使用与训练时相同的参数：span_N=60, sensitivity_k=1, allow_short=True
    true_opportunity = np.abs(generate_hft_positions(
        y_test_opp, forward_returns_lag_test, 
        span_N=60, sensitivity_k=1, allow_short=True
    ).values)
    pred_opportunity = np.abs(valid_opportunity.values)  # 确保是绝对值
    
    r2 = r2_score(true_opportunity, pred_opportunity)
    rmse = np.sqrt(mean_squared_error(true_opportunity, pred_opportunity))
    mae = mean_absolute_error(true_opportunity, pred_opportunity)
    corr = np.corrcoef(true_opportunity, pred_opportunity)[0, 1]
    
    print(f"R² 分数: {r2:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"相关系数: {corr:.4f}")
    print(f"\n真实机会统计: min={true_opportunity.min():.4f}, max={true_opportunity.max():.4f}, mean={true_opportunity.mean():.4f}")
    print(f"预测机会统计: min={pred_opportunity.min():.4f}, max={pred_opportunity.max():.4f}, mean={pred_opportunity.mean():.4f}")
    
    # ============================================================
    # Per-round analysis (one report per test window)
    # ============================================================
    print("\n" + "="*70)
    print("按时间段分析")
    print("="*70)
    
    for stat in round_stats:
        round_num = stat['round']
        test_indices = stat['test_indices']
        
        # Keep only this round's test labels that have a valid prediction
        round_idx = [i for i in test_indices if i in valid_predictions.index]
        if len(round_idx) == 0:
            continue
            
        round_pred = valid_predictions.loc[round_idx]
        round_true = y_full.loc[round_idx]
        round_direction_pred = oof_direction.loc[round_idx]
        # True direction encoded as -1 / +1.
        # NOTE(review): this assumes oof_direction uses the same -1/+1 encoding;
        # the training log reports direction labels as 1/0 — confirm.
        round_direction_true = (round_true > 0).astype(int) * 2 - 1
        # Use magnitudes, as the overall opportunity evaluation does with
        # np.abs(valid_opportunity.values); otherwise the per-round R² and
        # correlation are not comparable with the overall figures.
        round_opportunity_pred = oof_opportunity.loc[round_idx].abs()
        
        frl_round = X_full.loc[round_idx, 'forward_returns_lag'] if 'forward_returns_lag' in X_full.columns else pd.Series(0, index=round_idx)
        # True opportunity is the magnitude of the generated position, computed
        # on the full round so the input sequence is not altered by the NaN mask.
        round_opportunity_true = np.abs(generate_hft_positions(
            round_true, frl_round, span_N=60, sensitivity_k=1, allow_short=True
        ).values)
        
        # Metrics
        round_ic = np.corrcoef(round_pred.values, round_true.values)[0, 1]
        round_dir_acc = (round_direction_pred.values == round_direction_true.values).mean()

        # Score only rows that have an opportunity prediction: the overall
        # evaluation drops NaN predictions, and r2_score raises on NaN input.
        opp_has_pred = round_opportunity_pred.notna().values
        opp_true_scored = round_opportunity_true[opp_has_pred]
        opp_pred_scored = round_opportunity_pred.values[opp_has_pred]
        if opp_has_pred.sum() > 1:
            round_opp_r2 = r2_score(opp_true_scored, opp_pred_scored)
            round_opp_corr = np.corrcoef(opp_true_scored, opp_pred_scored)[0, 1]
        else:
            # Neither metric is defined with fewer than two scored samples
            round_opp_r2 = np.nan
            round_opp_corr = np.nan
        
        me_round = market_excess.loc[round_idx]
        rf_round = rf.loc[round_idx]
        round_sharpe = ad_sharpe_ratio_scorer(round_pred.values, me_round, rf_round)
        
        print(f"\n第 {round_num} 轮 (样本数: {len(round_idx)}):")
        print(f"  IC: {round_ic:.4f}, 夏普率: {round_sharpe:.4f}")
        print(f"  方向准确率: {round_dir_acc:.4f}")
        print(f"  机会 R²: {round_opp_r2:.4f}, 机会相关系数: {round_opp_corr:.4f}")
    
    
    # ------------------------------------------------------------
    # Data-flow diagnostics
    # ------------------------------------------------------------
    print("\n")
    # The diagnostic and the regime IC test below receive the same three
    # arrays, so they are collected once and unpacked into both calls.
    shared_ic_inputs = {
        'y_pred': valid_predictions.values,
        'y_true': y_test_all.values,
        'market_returns': me_test.values,
    }
    diagnosis = diagnose_ic_data_flow(name="并行模型预测", **shared_ic_inputs)

    # ------------------------------------------------------------
    # IC test grouped by market regime
    # ------------------------------------------------------------
    print("\n")
    ic_results = ic_test_by_market_regime_improved(
        threshold_bull=0.02,
        threshold_bear=-0.02,
        window_size=30,
        **shared_ic_inputs,
    )

    # ------------------------------------------------------------
    # Fitting-results figure, written to fitting_results.png
    # ------------------------------------------------------------
    print("\n")
    plot_fitting_results(
        results_df=results_df,
        oof_predictions=oof_predictions,
        ic_results=ic_results,
        save_path='fitting_results.png',
    )

# Top-level completion message; it sits outside the indented analysis block above.
print("\n✓ 并行模型测试完成")

准备数据...
X shape: (9021, 97)
y shape: (9021,)

特征选择器参数:
  n_clusters: None
  min_cluster_size: 3
  max_clusters: 15
  loss_threshold: None
  top_k_per_group: 0.65
  clustering_method: kmeans
  n_repeats: 3
  random_state: 42
  verbose: 1

配置并行模型
机会模型（Regressor）: RMSE (MSE)
方向模型（Classifier）: Logloss (Cross Entropy)

✓ 并行集成 Pipeline 创建完成

开始两段滑动窗口测试
训练窗口: 5000
测试窗口: 2000
窗口模式: 滑动窗口
允许做空: True
两段滑动窗口测试 (Move Forward Two-Way Test)
总样本数: 9021, 训练窗口: 5000, 测试窗口: 2000
窗口模式: 滑动窗口
预计轮数: ~2

第 1 轮
  训练段: [13:5013) = 5000 样本
  测试段: [5013:7013) = 2000 样本

  [步骤1/2] 并行训练模型...
ParallelEnsemblePipeline - 开始拟合（并行双模型）
样本数: 5000

[模型1/2] 训练机会模型（Regressor，MSE）
OpportunityPipeline - 开始拟合（机会模型，MSE）
机会目标统计: min=0.0003, max=2.0000, mean=1.0660

[步骤1/3] 特征工程...

特征工程统计
基础特征数: 29 (从 29 个候选特征中选择)
衍生特征数: 397 (Lag + Rolling 统计)
交互特征数: 36 (类别内和类别间交互)
环境感知特征数: 7 (市场状态指标)
总特征数: 535
特征类别覆盖: M(动量), S(情绪), E(经济), P(价格), V(波动率), I(流动性), D(方向)
Feature X (Top 10): ['D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'D9', 'E1'

  测试各组: 100%|██████████| 5/5 [00:00<00:00,  8.60it/s]


  组损失计算完成
  Top 5 重要组 (损失增加越大越重要):
    组 0 (395个因子): 损失增加 = 0.023031
    组 24 (90个因子): 损失增加 = 0.003580
    组 23 (3个因子): 损失增加 = 0.000042
    组 13 (11个因子): 损失增加 = -0.001278
    组 18 (14个因子): 损失增加 = -0.001385
  从重要组中选择因子（使用全局模型特征重要性）...
  筛选出 2 个重要组 (阈值: 0.000042)
    组 0: 395 个有效因子 -> 选择 256 个 (top_k=0.65)
    组 24: 90 个有效因子 -> 选择 58 个 (top_k=0.65)
  组内因子选择统计:
    重要组总因子数: 485
    选择后因子数: 314
    去重后因子数: 314
    最终选择 314 个因子 (top_k_per_group=0.65)
特征选择后特征数量: 314

[步骤3/3] 训练机会模型（MSE）...
OpportunityPipeline - 拟合完成

[模型2/2] 训练方向模型（Classifier，Logloss）
DirectionClassifierPipeline - 开始拟合（方向分类，Logloss）
方向标签分布: 多(1)=2538 (50.76%), 空(0)=2462 (49.24%)

[步骤1/3] 特征工程...
特征工程后特征数量: 535

[步骤2/3] 特征选择（使用 direction_loss_function）...
  对因子进行聚类...
  自动选择最优聚类数量...
  聚类完成: 5 个有效组 (总因子数: 535)
  使用损失函数计算因子组重要性（推理阶段置换，不重新训练）...


  测试各组: 100%|██████████| 5/5 [00:00<00:00,  7.91it/s]


  组损失计算完成
  Top 5 重要组 (损失增加越大越重要):
    组 0 (395个因子): 损失增加 = 3.625794
    组 24 (90个因子): 损失增加 = 1.379327
    组 18 (14个因子): 损失增加 = 0.446120
    组 13 (11个因子): 损失增加 = 0.171139
    组 23 (3个因子): 损失增加 = 0.079079
  从重要组中选择因子（使用全局模型特征重要性）...
  筛选出 2 个重要组 (阈值: 0.446120)
    组 0: 395 个有效因子 -> 选择 256 个 (top_k=0.65)
    组 24: 90 个有效因子 -> 选择 58 个 (top_k=0.65)
  组内因子选择统计:
    重要组总因子数: 485
    选择后因子数: 314
    去重后因子数: 314
    最终选择 314 个因子 (top_k_per_group=0.65)
特征选择后特征数量: 314

[步骤3/3] 训练方向分类模型（Logloss）...
DirectionClassifierPipeline - 拟合完成

ParallelEnsemblePipeline - 拟合完成

  [步骤2/2] 在测试段上预测...

  预测均值: 0.7174, 真实均值: 0.0002

第 2 轮
  训练段: [2013:7013) = 5000 样本
  测试段: [7013:9013) = 2000 样本

  [步骤1/2] 并行训练模型...
ParallelEnsemblePipeline - 开始拟合（并行双模型）
样本数: 5000

[模型1/2] 训练机会模型（Regressor，MSE）
OpportunityPipeline - 开始拟合（机会模型，MSE）
机会目标统计: min=0.0009, max=2.0000, mean=1.0508

[步骤1/3] 特征工程...
特征工程后特征数量: 535

[步骤2/3] 特征选择（使用 opportunity_loss_function）...
  对因子进行聚类...
  自动选择最优聚类数量...
  聚类完成: 4 个有效组 (总因子数: 535)
  使用损

  测试各组: 100%|██████████| 4/4 [00:00<00:00,  9.74it/s]


  组损失计算完成
  Top 5 重要组 (损失增加越大越重要):
    组 0 (323个因子): 损失增加 = 0.045179
    组 16 (12个因子): 损失增加 = -0.000379
    组 20 (130个因子): 损失增加 = -0.000530
    组 24 (47个因子): 损失增加 = -0.002420
  从重要组中选择因子（使用全局模型特征重要性）...
  筛选出 2 个重要组 (阈值: -0.000454)
    组 0: 323 个有效因子 -> 选择 209 个 (top_k=0.65)
    组 16: 12 个有效因子 -> 选择 7 个 (top_k=0.65)
  组内因子选择统计:
    重要组总因子数: 335
    选择后因子数: 216
    去重后因子数: 216
    最终选择 216 个因子 (top_k_per_group=0.65)
特征选择后特征数量: 216

[步骤3/3] 训练机会模型（MSE）...
OpportunityPipeline - 拟合完成

[模型2/2] 训练方向模型（Classifier，Logloss）
DirectionClassifierPipeline - 开始拟合（方向分类，Logloss）
方向标签分布: 多(1)=2586 (51.72%), 空(0)=2414 (48.28%)

[步骤1/3] 特征工程...
特征工程后特征数量: 535

[步骤2/3] 特征选择（使用 direction_loss_function）...
  对因子进行聚类...
  自动选择最优聚类数量...
  聚类完成: 4 个有效组 (总因子数: 535)
  使用损失函数计算因子组重要性（推理阶段置换，不重新训练）...


  测试各组: 100%|██████████| 4/4 [00:00<00:00,  8.08it/s]


  组损失计算完成
  Top 5 重要组 (损失增加越大越重要):
    组 0 (323个因子): 损失增加 = 3.209955
    组 20 (130个因子): 损失增加 = 1.644813
    组 24 (47个因子): 损失增加 = 0.617838
    组 16 (12个因子): 损失增加 = 0.152585
  从重要组中选择因子（使用全局模型特征重要性）...
  筛选出 2 个重要组 (阈值: 1.131326)
    组 0: 323 个有效因子 -> 选择 209 个 (top_k=0.65)
    组 20: 130 个有效因子 -> 选择 84 个 (top_k=0.65)
  组内因子选择统计:
    重要组总因子数: 453
    选择后因子数: 293
    去重后因子数: 293
    最终选择 293 个因子 (top_k_per_group=0.65)
特征选择后特征数量: 293

[步骤3/3] 训练方向分类模型（Logloss）...
DirectionClassifierPipeline - 拟合完成

ParallelEnsemblePipeline - 拟合完成

  [步骤2/2] 在测试段上预测...

  预测均值: 0.9890, 真实均值: 0.0002

两段滑动窗口测试完成！
总轮数: 2, 有效预测数: 4000
整体IC: 0.0081
整体夏普率: 0.3434

结果分析
有效预测数: 4000
整体 IC: 0.0081
整体夏普率: 0.3434

仓位统计:
  min: -1.1851
  max: 1.2643
  mean: 0.8532

方向模型评估
总体方向准确率: 0.5255
F1 分数: 0.6674 (多头为正类)
F1 分数 (宏平均): 0.4200
Precision: 0.5301, Recall: 0.9007
  做多准确率: 0.9007 (真实多时预测多)
  做空准确率: 0.1050 (真实空时预测空)
  真实多头样本数: 2114
  真实空头样本数: 1886

预测概率分析:
  预测正确时平均概率: 0.5047
  预测错误时平均概率: 0.5044

机会模型评估
R² 分数: 0.0513
RMSE: 