In [1]:
import sys
sys.path.append('/public/src')
from factor_evaluation_server import FactorEvaluation,DataService # type: ignore
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

In [2]:
ds=DataService()
df=ds['ETHUSDT_15m_2020_2025']['2021-10-01':]

In [3]:
evaluator=FactorEvaluation(df=df,future_return_periods=10)

# 定义因子！

In [4]:
def factor(df, base_window=94, vol_sensitivity=0.3, ridge_alpha=1.0):
    """
    改进版因子计算函数
    
    主要改进点：
    1. 动态窗口机制 - 根据市场波动率调整窗口大小
    2. 连续重心计算 - 消除日切导致的跳变
    3. 正则化回归 - 增强数值稳定性
    4. 因子中性化 - 消除市场状态影响
    5. 异常值处理 - 提高因子鲁棒性
    """
    # 创建副本避免修改原始数据
    df = df.copy()
    
    # ==================================================================
    # 步骤1: 计算基础指标
    # ==================================================================
    df['range'] = (df['high'] - df['low']) / df['open']
    
    # 使用EWMA提高稳定性
    short_ewm = df['range'].ewm(span=5, adjust=False).std()
    long_ewm = df['range'].ewm(span=20, adjust=False).std()
    
    # 避免除零错误
    long_ewm = long_ewm.replace(0, 1e-5)
    df['fractal'] = short_ewm / long_ewm
    
    df['is_up'] = (df['close'] > df['open']).astype(float)
    df['is_down'] = (df['close'] < df['open']).astype(float)
    
    # ==================================================================
    # 步骤2: 连续重心计算 (消除日切影响)
    # ==================================================================
    def continuous_center(direction_col, fractal_col, volume_col, min_points=10):
        """
        连续重心计算函数
        
        参数:
            direction_col: 方向序列 (1表示符合方向)
            fractal_col: fractal值序列
            volume_col: 成交量序列
            min_points: 最小有效点数
            
        返回:
            重心序列
        """
        # 计算权重
        weights = direction_col * fractal_col * volume_col
        
        # 累计权重和
        cum_weights = weights.expanding(min_periods=min_points).sum()
        
        # 累计加权索引
        index_series = pd.Series(np.arange(len(weights)), index=weights.index)
        cum_weighted_idx = (index_series * weights).expanding(min_periods=min_points).sum()
        
        # 计算重心
        center = cum_weighted_idx / cum_weights
        
        # 前min_points个点设为NaN
        center.iloc[:min_points] = np.nan
        
        return center
    
    # 计算连续重心
    fractal_vol = df['fractal'] * df['volume']
    df['G_u'] = continuous_center(df['is_up'], df['fractal'], df['volume'])
    df['G_d'] = continuous_center(df['is_down'], df['fractal'], df['volume'])
    
    # ==================================================================
    # 步骤3: 动态窗口计算
    # ==================================================================
    def calculate_dynamic_window(volatility, base_window=94, sensitivity=0.3):
        """
        根据波动率动态调整窗口大小
        
        参数:
            volatility: 波动率序列
            base_window: 基础窗口大小
            sensitivity: 波动率敏感度
            
        返回:
            动态窗口大小序列
        """
        # 计算波动率比率 (当前波动率/长期平均波动率)
        long_term_vol = volatility.ewm(span=504, adjust=False).mean()  # 约1周数据
        vol_ratio = volatility / long_term_vol.replace(0, 1e-5)
        
        # 计算动态窗口
        dynamic_window = base_window * (1 + sensitivity * (vol_ratio - 1))
        
        # 应用边界限制
        dynamic_window = np.clip(dynamic_window, base_window*0.6, base_window*1.8)
        
        return dynamic_window.astype(int)
    
    # 计算波动率 (使用ATR方法)
    atr_period = 14
    high_low = df['high'] - df['low']
    high_close = np.abs(df['high'] - df['close'].shift())
    low_close = np.abs(df['low'] - df['close'].shift())
    true_range = np.maximum(high_low, np.maximum(high_close, low_close))
    volatility = true_range.rolling(atr_period).mean()
    
    # 计算动态窗口
    df['window_size'] = calculate_dynamic_window(
        volatility, base_window=base_window, sensitivity=vol_sensitivity
    )
    
    # ==================================================================
    # 步骤4: 稳健滚动回归
    # ==================================================================
    def robust_rolling_regression(X, y, window_sizes, alpha=1.0):
        """
        带正则化的滚动回归
        
        参数:
            X: 特征矩阵 (n_samples, n_features)
            y: 目标向量 (n_samples,)
            window_sizes: 每个点的窗口大小
            alpha: 正则化强度
            
        返回:
            预测值序列
        """
        predictions = np.full(len(X), np.nan)
        residuals = np.full(len(X), np.nan)
        
        # 创建模型
        model = Ridge(alpha=alpha, fit_intercept=False, random_state=42)
        
        for i in range(len(X)):
            if i < max(window_sizes) or np.isnan(X[i]).any() or np.isnan(y[i]):
                continue
                
            # 获取当前窗口大小
            w_size = window_sizes[i]
            start_idx = max(0, i - w_size)
            
            # 提取窗口数据
            X_window = X[start_idx:i]
            y_window = y[start_idx:i]
            
            # 过滤NaN值
            valid_mask = ~np.isnan(X_window).any(axis=1) & ~np.isnan(y_window)
            valid_count = valid_mask.sum()
            
            # 至少需要10个有效点
            if valid_count < max(10, w_size//5):
                continue
                
            # 拟合模型
            try:
                model.fit(X_window[valid_mask], y_window[valid_mask])
                pred = model.predict([X[i]])[0]
                predictions[i] = pred
                residuals[i] = y[i] - pred
            except Exception as e:
                continue
                
        return predictions, residuals
    
    # 准备回归数据
    X = df[['G_u', 'fractal']].values
    y = df['G_d'].values
    
    # 执行滚动回归
    _, residuals = robust_rolling_regression(
        X, y, df['window_size'].values, alpha=ridge_alpha
    )
    
    # ==================================================================
    # 步骤5: 因子计算与后处理
    # ==================================================================
    # 计算原始因子值
    factor_series = pd.Series(-residuals * df['fractal'], index=df.index)
    
    # 因子中性化 (去除市场状态影响)
    def neutralize_factor(factor, features):
        """因子中性化处理"""
        valid_mask = ~np.isnan(factor) & ~np.isnan(features).any(axis=1)
        
        if valid_mask.sum() < 100:
            return factor
        
        # 使用分位数转换处理非线性关系
        from sklearn.preprocessing import QuantileTransformer
        qt = QuantileTransformer(output_distribution='normal', random_state=42)
        features_transformed = qt.fit_transform(features[valid_mask])
        
        # 岭回归中性化
        model = Ridge(alpha=1.0)
        model.fit(features_transformed, factor[valid_mask])
        neutralized = factor.copy()
        neutralized[valid_mask] = factor[valid_mask] - model.predict(features_transformed)
        
        return neutralized
    
    # 创建中性化特征 (市场状态代理)
    features = pd.DataFrame({
        'volatility': volatility,
        'volume': df['volume'].ewm(span=24).mean(),
        'buy_ratio': df['taker_buy_volume'] / df['volume'].replace(0, 1e-5),
        'range': df['range']
    })
    
    # 应用中性化
    factor_neutral = neutralize_factor(factor_series, features)
    
    # 异常值处理 (Winsorization)
    def winsorize(series, lower=0.01, upper=0.99):
        """缩尾处理异常值"""
        q_low = series.quantile(lower)
        q_high = series.quantile(upper)
        return series.clip(lower=q_low, upper=q_high)
    
    factor_final = winsorize(factor_neutral)
    
    # 前向填充NaN值
    factor_final = factor_final.ffill().fillna(0)
    
    return factor_final

# 测试因子表现

In [5]:
evaluator.set_factor(
    factor_func=lambda df: factor(df),
    factor_name='factor'
)

result=evaluator.run_full_evaluation(run_stationarity_test=True)

ValueError: 因子函数执行失败: Cannot convert non-finite values (NA or inf) to integer

In [None]:
result['information_ratio']['group_correlations']
# 这表示不同分组之间的相关性，通常用于评估因子在不同市场状态下的表现一致性。
# 如果相关性较高，说明因子在不同市场状态下表现一致；如果相关性较低，说明因子在不同市场状态下表现差异较大。

[-0.01690796479513809,
 0.017082174178146198,
 0.0015363590771937847,
 0.0023839418253158388,
 -0.010124699828869344,
 0.02355724951420949,
 0.0016114718532066448,
 0.009478070331710262,
 0.0028196744056216494,
 0.04323663360510031]

In [None]:
print(result)

{'stationarity_test': {'adf_statistic': -50.41940074401179, 'p_value': 0.0, 'critical_values': {'1%': -3.430399934164982, '5%': -2.861562070176103, '10%': -2.566781747172434}, 'is_stationary': True, 'alpha': 0.05}, 'correlation_analysis': {'IC': 0.0277662954904948, 'Rank_IC': 0.00960788248911194}, 'information_ratio': {'IR': 0.45824880084263897, 'group_correlations': [-0.01690796479513809, 0.017082174178146198, 0.0015363590771937847, 0.0023839418253158388, -0.010124699828869344, 0.02355724951420949, 0.0016114718532066448, 0.009478070331710262, 0.0028196744056216494, 0.04323663360510031], 'n_groups': 10}, 'group_analysis': {'group_stats':        val_min  val_max  val_mean  return_mean  return_std  count
group                                                            
0     -29.6643  -7.8114  -10.8188      -0.0003      0.0134   6552
1      -7.8113  -5.4966   -6.5002      -0.0005      0.0127   6552
2      -5.4966  -4.2037   -4.8063      -0.0003      0.0123   6552
3      -4.2037  -3.2444 