In [1]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import statsmodels.api as sm
from scipy.stats import spearmanr

# -----------------------------------------------------------------------------
# 0. 配置与基础数学函数
# -----------------------------------------------------------------------------
DATA_PATH = './data/'
OUTPUT_PATH = './answer2/'
if not os.path.exists(OUTPUT_PATH):
    os.makedirs(OUTPUT_PATH)

def winsorize(series, n_std=3):
    """去极值: mean +/- 3std"""
    if series.empty: return series
    m = series.mean()
    s = series.std()
    return series.clip(lower=m - n_std*s, upper=m + n_std*s)

def standardize(series):
    """Z-score标准化"""
    if series.empty or series.std() == 0: return series
    return (series - series.mean()) / series.std()

def neutralize(factor_series, mv_series, ind_series):
    """
    行业市值中性化
    """
    # 1. 数据清洗
    mv_series = pd.to_numeric(mv_series, errors='coerce')
    factor_series = pd.to_numeric(factor_series, errors='coerce')
    ind_series = pd.to_numeric(ind_series, errors='coerce')

    df = pd.concat([factor_series, mv_series, ind_series], axis=1)
    df.columns = ['y', 'mv', 'ind']
    df = df.dropna()
    df = df[df['mv'] > 0] # 剔除市值<=0
    
    if df.empty: return factor_series * np.nan
    
    # 2. 市值处理
    try:
        df['log_mv'] = np.log(df['mv'])
        df['log_mv'] = winsorize(df['log_mv'])
        df['log_mv'] = standardize(df['log_mv'])
    except Exception:
        return factor_series * np.nan
        
    df = df.replace([np.inf, -np.inf], np.nan).dropna()
    if df.empty: return factor_series * np.nan

    # 3. 行业处理
    df['ind'] = df['ind'].astype(int)
    dummies = pd.get_dummies(df['ind'], prefix='ind', dtype=int)
    target_cols = [f'ind_{i}' for i in range(1, 30)]
    valid_cols = [c for c in target_cols if c in dummies.columns]
    
    # 4. 回归
    if valid_cols:
        X = pd.concat([df['log_mv'], dummies[valid_cols]], axis=1)
    else:
        X = df[['log_mv']]
        
    X = sm.add_constant(X)
    y = df['y']
    
    try:
        X = X.astype(float)
        y = y.astype(float)
        model = sm.OLS(y, X).fit()
        return model.resid.reindex(factor_series.index)
    except Exception:
        return factor_series * np.nan

In [2]:
# -----------------------------------------------------------------------------
# 模块一：股票池筛选
# -----------------------------------------------------------------------------
def get_stock_pool(t_date, trading_days, ipo_date_series, delist_date_series, amt_df, all_columns):
    """
    筛选特定日期的股票池：
    1. 上市满252天
    2. 未退市
    3. 当日非停牌 (成交额>0)
    """
    # 1. 上市满 252 天
    current_idx = trading_days.searchsorted(t_date, side='right') - 1
    cutoff_idx = current_idx - 252
    cutoff_date = trading_days.iloc[cutoff_idx] if cutoff_idx >= 0 else pd.Timestamp.min
    valid_ipo = ipo_date_series[ipo_date_series <= cutoff_date].index

    # 2. 未退市
    not_delisted = delist_date_series[delist_date_series.isna() | (delist_date_series > t_date)].index
    
    # 3. 当日非停牌
    if t_date in amt_df.index:
        trading_stocks = amt_df.loc[t_date][amt_df.loc[t_date] > 0].index
    else:
        # 容错：如果在非交易日计算，向前找最近数据
        nearest_idx = amt_df.index.get_indexer([t_date], method='ffill')[0]
        actual_date = amt_df.index[nearest_idx]
        trading_stocks = amt_df.loc[actual_date][amt_df.loc[actual_date] > 0].index

    # 取交集
    pool = list(set(valid_ipo) & set(not_delisted) & set(trading_stocks))
    # 确保在行情列名中存在
    pool = [s for s in pool if s in all_columns]
    
    return pool

In [3]:
# -----------------------------------------------------------------------------
# 模块二：计算原始因子
# -----------------------------------------------------------------------------
def calculate_raw_factors(t_date, stock_pool, common_idx, 
                          close_adj, turnover, ret_daily, 
                          X_ff3_vals, Y_ret_vals, stock_codes):
    """
    计算 t_date 时点的四个因子原始值
    包含 FF3 残差的 Numpy 加速计算逻辑
    """
    # 定位时间窗口 (过去21天，含当日)
    if t_date not in common_idx:
        loc_idx = common_idx.get_indexer([t_date], method='ffill')[0]
    else:
        loc_idx = common_idx.get_loc(t_date)
        
    # 1. 用于计算区间收益 (Return)：需要回溯 21 个间隔，即取到 t-21，总长度 22
    start_idx = max(0, loc_idx - 21)
    
    # 2. 用于计算统计量 (Mean/Std/FF3)：需要最近 21 个数据点，即取到 t-20，总长度 21
    start_idx_stat = max(0, loc_idx - 20)

    end_idx = loc_idx + 1
    
    # 准备空结果容器
    res = {}
    valid_pool = [s for s in stock_pool if s in close_adj.columns]
    
    if not valid_pool:
        return {k: pd.Series(dtype=float) for k in ['return_1m', 'turn_1m', 'std_1m', 'std_FF3factor_1m']}

    # --- 基础因子计算 ---
    slice_close = close_adj.iloc[start_idx : end_idx][valid_pool]

    # Factor 1: return_1m
    if len(slice_close) > 21:
        res['return_1m'] = slice_close.iloc[-1] / slice_close.iloc[0] - 1
    else:
        res['return_1m'] = pd.Series(np.nan, index=valid_pool)

    slice_turn = turnover.iloc[start_idx_stat : end_idx][valid_pool]
    slice_ret = ret_daily.iloc[start_idx_stat : end_idx][valid_pool]
    
    # Factor 2: turn_1m
    res['turn_1m'] = slice_turn.mean()

    # Factor 3: std_1m
    res['std_1m'] = slice_ret.std()

    # --- Factor 4: std_FF3factor_1m ---
    reg_start_idx = max(0, loc_idx - 20)
    slice_X = X_ff3_vals[reg_start_idx:end_idx, :] # (21, 4)
    slice_Y_all = Y_ret_vals[reg_start_idx:end_idx, :] # (21, N_all)
    
    # 映射股票池索引
    pool_indices = stock_codes.get_indexer(valid_pool)
    mask_found = pool_indices >= 0
    pool_indices = pool_indices[mask_found]
    valid_pool_found = np.array(valid_pool)[mask_found]
    
    f_ff3 = pd.Series(np.nan, index=valid_pool)
    
    # 仅当数据充足时计算
    if len(slice_X) >= 15 and not np.isnan(slice_X).any() and len(pool_indices) > 0:
        slice_Y_pool = slice_Y_all[:, pool_indices]
        valid_cols_mask = ~np.isnan(slice_Y_pool).any(axis=0) # 剔除含NaN的列
        
        if np.sum(valid_cols_mask) > 0:
            Y_clean = slice_Y_pool[:, valid_cols_mask]
            try:
                # 矩阵回归
                coef, _, _, _ = np.linalg.lstsq(slice_X, Y_clean, rcond=None)
                resid = Y_clean - slice_X @ coef
                resid_std = np.std(resid, axis=0, ddof=1)
                
                # 回填数据
                full_vals = np.full(len(valid_pool_found), np.nan)
                full_vals[valid_cols_mask] = resid_std
                f_ff3 = pd.Series(full_vals, index=valid_pool_found).reindex(valid_pool)
            except:
                pass
                
    res['std_FF3factor_1m'] = f_ff3
    return res

In [4]:
# -----------------------------------------------------------------------------
# 模块三：因子预处理
# -----------------------------------------------------------------------------
def preprocess_factors(raw_factors_dict, t_date, stock_pool, 
                       share_float, close_raw, industry_code, all_stocks):
    """
    对一组原始因子进行预处理：去极值 -> 中性化 -> 标准化
    """
    # 准备中性化数据 (FloatMV, Ind)
    if t_date in share_float.index and t_date in close_raw.index:
        float_mv = share_float.loc[t_date] * close_raw.loc[t_date]
    else:
        float_mv = pd.Series(np.nan, index=all_stocks)
        
    if t_date in industry_code.index:
        ind = industry_code.loc[t_date]
    else:
        ind = pd.Series(np.nan, index=all_stocks)
        
    proc_res = {}
    
    for name, s_raw in raw_factors_dict.items():
        # 1. 仅对 Pool 内数据处理
        s_valid = s_raw.loc[stock_pool].dropna()
        
        if s_valid.empty:
            proc_res[name] = s_raw.reindex(all_stocks) # 全NaN
            continue
            
        # 2. 去极值
        s_win = winsorize(s_valid)
        
        # 3. 中性化
        s_neu = neutralize(s_win, float_mv.loc[s_valid.index], ind.loc[s_valid.index])
        
        # 4. 标准化
        s_std = standardize(s_neu)
        
        # 5. 回填至全A股列表 (非Pool为NaN)
        proc_res[name] = s_std.reindex(all_stocks)
        
    return proc_res

In [5]:
# -----------------------------------------------------------------------------
# 模块四：因子检验
# -----------------------------------------------------------------------------
def run_factor_test(factor_name, df_factor, close_adj, test_months, output_path):
    """
    执行 RankIC 测试和分层回测，并绘图保存
    """
    df_factor.index = pd.to_datetime(test_months)
    ic_list = []
    layer_rets = []
    test_dates = []
    
    for i in range(len(test_months) - 1):
        t_current = test_months[i]
        t_next = test_months[i+1]
        
        if t_next not in close_adj.index or t_current not in close_adj.index:
            continue
            
        # 计算下期收益
        ret_next = close_adj.loc[t_next] / close_adj.loc[t_current] - 1
        
        try:
            factor_val = df_factor.loc[t_current]
        except KeyError:
            continue
            
        # 对齐数据
        df_test = pd.concat([factor_val, ret_next], axis=1).dropna()
        df_test.columns = ['factor', 'ret']
        if df_test.empty: continue
            
        # 1. RankIC
        ic = spearmanr(df_test['factor'], df_test['ret'])[0]
        ic_list.append(ic)
        
        # 2. 分层测试
        try:
            df_test['group'] = pd.qcut(df_test['factor'], 5, labels=False)
            g_ret = df_test.groupby('group')['ret'].mean()
            layer_rets.append({
                'Layer1': g_ret.get(4, 0), # Top
                'Layer2': g_ret.get(3, 0),
                'Layer3': g_ret.get(2, 0),
                'Layer4': g_ret.get(1, 0),
                'Layer5': g_ret.get(0, 0)  # Bottom
            })
        except ValueError:
            layer_rets.append({'Layer1':0,'Layer2':0,'Layer3':0,'Layer4':0,'Layer5':0})
            
        test_dates.append(t_current)
        
    # 保存结果
    df_result = pd.concat([
        pd.DataFrame({'RankIC': ic_list}, index=test_dates),
        pd.DataFrame(layer_rets, index=test_dates)
    ], axis=1)
    
    df_result.to_csv(os.path.join(output_path, f'factor_{factor_name}_test.csv'))
    print(f"Factor {factor_name} test done. Saved to CSV.")
    
    # 绘图
    try:
        # IC
        fig, ax = plt.subplots(figsize=(8, 5))
        df_result['RankIC'].cumsum().plot(ax=ax, title=f'Cumulative RankIC - {factor_name}')
        ax.grid(True)
        fig.savefig(os.path.join(output_path, f'{factor_name}_cumsum_ic.png'))
        plt.close(fig)
        
        # Layers
        fig, ax = plt.subplots(figsize=(8, 5))
        pd.DataFrame(layer_rets, index=test_dates).cumsum().plot(ax=ax, title=f'Returns by Layer - {factor_name}')
        ax.grid(True)
        fig.savefig(os.path.join(output_path, f'{factor_name}_cumsum_layers.png'))
        plt.close(fig)
    except Exception as e:
        print(f"Plotting failed: {e}")

In [6]:
# -----------------------------------------------------------------------------
# 模块五：FF3因子日序列计算
# -----------------------------------------------------------------------------
def calculate_ff3_daily(close_raw, share_total, pb_lf, ret_daily):
    """
    根据原始行情计算 SMB 和 HML 的日频序列 (Numpy加速版)
    SMB: 市值后30%收益率 - 前30%收益率
    HML: BP前30%收益率 - 后30%收益率
    """
    print("Calculating FF3 Factors ...")
    
    # 1. 对齐数据
    # 找到所有数据的共同索引和列
    common_idx = close_raw.index.intersection(share_total.index).intersection(pb_lf.index).intersection(ret_daily.index)
    common_cols = close_raw.columns.intersection(share_total.columns).intersection(pb_lf.columns).intersection(ret_daily.columns)
    
    # 转换为 Numpy 矩阵以加速
    mat_mv = (close_raw.loc[common_idx, common_cols] * share_total.loc[common_idx, common_cols]).values
    mat_bp = (1 / pb_lf.loc[common_idx, common_cols]).values # BP = 1 / PB
    mat_ret = ret_daily.loc[common_idx, common_cols].values
    
    n_days = len(common_idx)
    smb_arr = np.full(n_days, np.nan)
    hml_arr = np.full(n_days, np.nan)
    
    # 设置计算起点 (假设前几个月数据不稳定，从有数据处开始，这里简单全循环)
    # 为了提速，可跳过早期空值过多的时期
    mat_mv = (close_raw * share_total).shift(1).values 
    mat_bp = (1 / pb_lf).shift(1).values
    mat_ret = ret_daily.values

    for i in range(n_days):
        mv_t = mat_mv[i, :]
        bp_t = mat_bp[i, :]
        ret_t = mat_ret[i, :]
        
        # 有效性掩码：三个指标都必须非NaN
        valid_mask = (~np.isnan(mv_t)) & (~np.isnan(bp_t)) & (~np.isnan(ret_t))
        
        # 只要有效股票数够多才计算（例如大于50只）
        if np.sum(valid_mask) < 50:
            continue
            
        v_mv = mv_t[valid_mask]
        v_bp = bp_t[valid_mask]
        v_ret = ret_t[valid_mask]
        
        # --- 计算 SMB (按市值排序) ---
        q30_mv = np.percentile(v_mv, 30)
        q70_mv = np.percentile(v_mv, 70)
        
        small_mask = v_mv <= q30_mv
        big_mask = v_mv >= q70_mv
        
        if np.any(small_mask) and np.any(big_mask):
            # 算术平均
            smb_val = np.mean(v_ret[small_mask]) - np.mean(v_ret[big_mask])
            smb_arr[i] = smb_val
            
        # --- 计算 HML (按BP排序) ---
        # 题目定义：HML = 前30%(High BP) - 后30%(Low BP)
        q30_bp = np.percentile(v_bp, 30)
        q70_bp = np.percentile(v_bp, 70)
        
        high_bp_mask = v_bp >= q70_bp # Top 30%
        low_bp_mask = v_bp <= q30_bp  # Bottom 30%
        
        if np.any(high_bp_mask) and np.any(low_bp_mask):
            hml_val = np.mean(v_ret[high_bp_mask]) - np.mean(v_ret[low_bp_mask])
            hml_arr[i] = hml_val
            
    smb_daily = pd.Series(smb_arr, index=common_idx)
    hml_daily = pd.Series(hml_arr, index=common_idx)
    
    print("FF3 calculation done.")
    return smb_daily, hml_daily

In [None]:
# -----------------------------------------------------------------------------
# 主程序：执行全流程
# -----------------------------------------------------------------------------

# 1. 基础配置与加载
print("Loading data...")
# 辅助函数：加载并转置，同时处理日期索引
def load_market_data(name):
    path = os.path.join(DATA_PATH, name)
    print(f"Reading {name}...")
    df = pd.read_csv(path, index_col=0).T
    df.index = pd.to_datetime(df.index) # 关键：确保索引是Timestamp对象
    return df

# 加载非行情数据
day_df = pd.read_csv(os.path.join(DATA_PATH, 'day.csv'))
month_df = pd.read_csv(os.path.join(DATA_PATH, 'month.csv'))
ipo_date = pd.read_csv(os.path.join(DATA_PATH, 'IPO_date_info.csv'), index_col=0)
delist_date = pd.read_csv(os.path.join(DATA_PATH, 'delist_date_info.csv'), index_col=0)

# 加载日频行情 (注意文件名后缀 _day.csv)
close_adj = load_market_data('close_adj_day.csv')
close_raw = load_market_data('close_day.csv')
amt = load_market_data('amt_day.csv')
turnover = load_market_data('turn_day.csv')
share_total = load_market_data('share_totala_day.csv')
share_float = load_market_data('float_a_shares_day.csv')
industry_code = load_market_data('cs_indus_code_day.csv')
pb_lf = load_market_data('pb_lf_day.csv')
csi_all = load_market_data('csiall_day.csv')

# 计算日收益率 (使用 fill_method=None 避免新版 pandas 警告)
ret_daily = close_adj.pct_change(fill_method=None)
# 假设 csi_all 第一列是指数收盘价
mkt_daily_ret = csi_all.iloc[:, 0].pct_change(fill_method=None)

# 准备日期序列
trading_days = pd.to_datetime(day_df['date']).sort_values().reset_index(drop=True)
month_dates = pd.to_datetime(month_df['date'])
# 筛选回测区间: 2010-12 到 2024-11
test_months = month_dates[(month_dates >= '2010-12-01') & (month_dates <= '2024-11-30')].tolist()
ipo_series = pd.to_datetime(ipo_date['IPO_date'])
delist_series = pd.to_datetime(delist_date['delist_date'])

# 2. 全局计算 FF3 因子
# 这一步计算全市场的 SMB 和 HML，用于后续个股回归
smb_daily, hml_daily = calculate_ff3_daily(close_raw, share_total, pb_lf, ret_daily)

# 构建回归所需的因子矩阵 X (Constant, Mkt, SMB, HML)
common_idx = close_adj.index
# 确保所有因子在时间轴上对齐
df_ff3 = pd.DataFrame({
    'Mkt': mkt_daily_ret,
    'SMB': smb_daily,
    'HML': hml_daily
}).reindex(common_idx)

# 构建 Numpy 矩阵供后续切片回归使用
# 第一列加常数项 (Intercept)
X_ff3_vals = np.hstack([np.ones((len(df_ff3), 1)), df_ff3.values]) 
Y_ret_vals = ret_daily.reindex(common_idx).values
all_stocks = close_adj.columns

# 3. 循环计算与处理
print("Starting Monthly Loop...")
# 定义容器，key 名需与 calculate_raw_factors 返回的一致
target_factors = ['return_1m', 'turn_1m', 'std_1m', 'std_FF3factor_1m']
factors_raw_list = {k: [] for k in target_factors}
factors_proc_list = {k: [] for k in target_factors}

for t_date in test_months:
    # date_str = t_date.strftime('%Y-%m-%d')
    # print(f"Processing {date_str}...")
    
    # A. 股票池筛选
    pool = get_stock_pool(t_date, trading_days, ipo_series, delist_series, amt, all_stocks)
    
    # B. 计算原始因子
    raw_res = calculate_raw_factors(t_date, pool, common_idx, 
                                    close_adj, turnover, ret_daily, 
                                    X_ff3_vals, Y_ret_vals, all_stocks)
    
    # C. 因子预处理
    proc_res = preprocess_factors(raw_res, t_date, pool, 
                                  share_float, close_raw, industry_code, all_stocks)
    
    # D. 收集结果
    for k in target_factors:
        # 原始值 (Reindex 确保包含所有股票，非Pool为NaN)
        r_s = raw_res[k].reindex(all_stocks)
        r_s.name = t_date
        factors_raw_list[k].append(r_s)
        
        # 预处理值
        p_s = proc_res[k]
        p_s.name = t_date
        factors_proc_list[k].append(p_s)

# -----------------------------------------------------------------------------
# 4. 保存因子文件
# -----------------------------------------------------------------------------
print("Saving Factor Files...")
if not os.path.exists(OUTPUT_PATH):
    os.makedirs(OUTPUT_PATH)

# 建立内部因子名到输出文件名的映射字典
# Key: 代码中计算用的名称
# Value: 最终保存文件的名称
name_mapping = {
    'return_1m': 'return_1m',
    'turn_1m': 'turn_1m',
    'std_1m': 'std_1m',
    'std_FF3factor_1m': 'std_FF3_1m'  # 这里将 std_FF3factor_1m 映射为 std_FF3_1m
}

for internal_name in target_factors:
    # 获取对应的数据
    df_raw = pd.DataFrame(factors_raw_list[internal_name])
    df_proc = pd.DataFrame(factors_proc_list[internal_name])
    
    # 强制设置索引为日期对象
    df_raw.index = pd.to_datetime(test_months)
    df_proc.index = pd.to_datetime(test_months)
    
    # 获取输出用的文件名标识
    output_name = name_mapping.get(internal_name, internal_name)
    
    # 按照作业要求的目录结构命名文件
    # 例如：factor_std_FF3_1m_raw.csv
    df_raw.to_csv(os.path.join(OUTPUT_PATH, f'factor_{output_name}_raw.csv'))
    df_proc.to_csv(os.path.join(OUTPUT_PATH, f'factor_{output_name}_processed.csv'))

# -----------------------------------------------------------------------------
# 5. 因子检验
# -----------------------------------------------------------------------------
print("Running Factor Tests...")
for internal_name in target_factors:
    df_proc = pd.DataFrame(factors_proc_list[internal_name])
    df_proc.index = pd.to_datetime(test_months)
    
    # 获取输出用的文件名标识
    output_name = name_mapping.get(internal_name, internal_name)
    
    # 调用检验函数时，传入 output_name
    run_factor_test(output_name, df_proc, close_adj, test_months, OUTPUT_PATH)

print("All Done! Results saved to:", OUTPUT_PATH)

Loading data...
Reading close_adj_day.csv...
Reading close_day.csv...
Reading amt_day.csv...
Reading turn_day.csv...
Reading share_totala_day.csv...
Reading float_a_shares_day.csv...
Reading cs_indus_code_day.csv...
Reading pb_lf_day.csv...
Reading csiall_day.csv...
Calculating FF3 Factors ...
FF3 calculation done.
Starting Monthly Loop...
Saving Factor Files...
Running Factor Tests...
Factor return_1m test done. Saved to CSV.
Factor turn_1m test done. Saved to CSV.
Factor std_1m test done. Saved to CSV.
Factor std_FF3_1m test done. Saved to CSV.
All Done! Results saved to: ./answer2/
