# Alpha Factor Research: MOM_21x1 (IC / Quantile Backtest / t-Test)

This notebook runs in the Ricequant (米筐) research environment and uses the `rqdatac` data API.

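## Factor Definition (MOM_21x1)
**Momentum**: the close-to-close return from trading day t-21 to trading day t-1. The **most recent trading day is skipped** to dampen overnight and microstructure noise, so the window covers 20 daily returns.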
$$ \text{MOM\_21x1}(i,t) = \frac{P_{i,t-1}}{P_{i,t-21}} - 1 $$
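The definition can be checked on a toy price panel. The sketch below is only an illustration: the helper name `mom_21x1` and the synthetic prices are assumptions, not part of the notebook's pipeline.

```python
import numpy as np
import pandas as pd

def mom_21x1(close: pd.DataFrame) -> pd.DataFrame:
    """MOM_21x1(i, t) = P[i, t-1] / P[i, t-21] - 1 on a date x stock close panel."""
    return close.shift(1) / close.shift(21) - 1

# Synthetic panel: stock "A" compounds +1% per day, stock "B" is flat.
dates = pd.bdate_range("2024-01-01", periods=30)
close = pd.DataFrame(
    {"A": 100 * 1.01 ** np.arange(30), "B": np.full(30, 50.0)},
    index=dates,
)

factor = mom_21x1(close)
# The first 21 rows are NaN because P[t-21] does not exist yet.
print(factor.iloc[-1])
```

For stock "A" the last value equals 1.01^20 - 1, which confirms that the window spans 20 daily returns and ignores the latest close.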

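## Research Setup
- Universe: CSI 300 (`000300.XSHG`).
- Frequency: weekly (signal formed on Friday; positions entered on the next trading day, normally Monday, and held for 5 trading days).
- Outlier handling: MAD winsorization; standardization: z-score.
- Neutralization: Shenwan level-1 industry plus log market cap (can be switched off).
- Transaction cost: turnover × 5 bp per side (illustrative).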
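To make the winsorize-then-standardize step concrete, here is a small worked sketch on made-up numbers. It assumes a 5-MAD band with the usual 1.4826 scaling constant; the input values are hypothetical.

```python
import pandas as pd

# Hypothetical cross-section with one extreme value.
s = pd.Series([-0.02, -0.01, 0.0, 0.01, 0.02, 0.90])

med = s.median()                      # 0.005
mad = (s - med).abs().median()        # 0.015
band = 5 * 1.4826 * mad               # half-width of the allowed range
lo, hi = med - band, med + band

clipped = s.clip(lo, hi)              # only the 0.90 outlier is pulled in
z = (clipped - clipped.mean()) / clipped.std(ddof=1)
print(clipped.round(6).tolist())
print(z.round(3).tolist())
```

Only the outlier moves (to about 0.1162); the other five values are untouched, and the z-scored result has zero mean and unit sample standard deviation.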

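## Outputs
- **IC test**: mean, standard deviation, t-statistic, ICIR, and hit rate, plus a time-series plot and a histogram.
- **Quantile backtest**: NAV curves for 5 quantile buckets, Q5-Q1 long-short performance, and a t-test.
- **Discussion**: what the factor means, how to read the results, and ideas for improvement.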
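For reference, the IC statistics listed above can be written out as follows. This is a sketch of the standard definitions, with notation introduced here for illustration: $f_{i,t}$ is the processed factor value, $r_{i,t+1}$ the next-period return, and $T$ the number of rebalance dates.

$$ \mathrm{IC}_t = \rho_{\text{Spearman}}\big(f_{\cdot,t},\, r_{\cdot,t+1}\big), \qquad \overline{\mathrm{IC}} = \frac{1}{T}\sum_{t=1}^{T}\mathrm{IC}_t, \qquad \sigma_{\mathrm{IC}} = \sqrt{\frac{1}{T-1}\sum_{t=1}^{T}\big(\mathrm{IC}_t - \overline{\mathrm{IC}}\big)^2} $$

$$ t_{\mathrm{IC}} = \frac{\overline{\mathrm{IC}}}{\sigma_{\mathrm{IC}}/\sqrt{T}}, \qquad \mathrm{ICIR} = \frac{\overline{\mathrm{IC}}}{\sigma_{\mathrm{IC}}}, \qquad \text{hit rate} = \frac{1}{T}\sum_{t=1}^{T}\mathbf{1}\{\mathrm{IC}_t > 0\} $$

Note that $t_{\mathrm{IC}} = \mathrm{ICIR}\cdot\sqrt{T}$, so the two numbers carry the same information once the sample length is fixed.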

In [ ]:
# Environment and dependencies
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from datetime import datetime

plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei']
plt.rcParams['axes.unicode_minus'] = False

RQ_OK = True
try:
    from rqdatac import (
        init, get_trading_dates, index_components, get_price,
        is_suspended, is_st_stock, industry, get_factor
    )
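    try:
        init()
    except Exception as e:
        # Imports worked but the connection did not; later rqdatac calls would fail, so fall back.
        print('rqdatac.init() failed:', e)
        RQ_OK = False
except Exception as e:
    print('rqdatac not found; running locally with placeholder logic:', e)
    RQ_OK = False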

## Parameters (edit directly)

In [ ]:
universe_index = '000300.XSHG'  # CSI 300
start_date = '2017-01-01'
end_date = datetime.today().strftime('%Y-%m-%d')
rebalance_weekday = 4          # 0=Mon, ..., 4=Fri
holding_days = 5               # hold for 5 trading days
n_quantiles = 5                # 5 quantile buckets
cost_bp = 5                    # one-way cost, 5 bp
neutralize_industry = True     # neutralize on industry + market cap
industry_level = 'sw_l1'
winsor_mad = 5
np.random.seed(42)
print('Parameters:', universe_index, start_date, end_date)

## Utility Functions

In [ ]:
def mad_winsorize(s, n=winsor_mad):
    med = s.median()
    mad = (s - med).abs().median()
    if mad == 0 or np.isnan(mad):
        return s
    hi = med + n*1.4826*mad
    lo = med - n*1.4826*mad
    return s.clip(lo, hi)

def zscore(s):
    """Cross-sectional z-score; only demeans when the std is zero or undefined."""
    std = s.std(ddof=1)
    return (s - s.mean()) / (std if std and not np.isnan(std) else 1)

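def get_universe(date):
    """Index constituents on `date`, excluding ST and suspended stocks."""
    if not RQ_OK:
        return []
    pool = index_components(universe_index, date)
    try:
        st = is_st_stock(pool, date, date)
        sus = is_suspended(pool, date, date)
        # Assumption: each call returns a date x stock boolean frame; keep the row for `date`.
        st = st.iloc[-1] if isinstance(st, pd.DataFrame) else st
        sus = sus.iloc[-1] if isinstance(sus, pd.DataFrame) else sus
        drop = set(st[st.astype(bool)].index) | set(sus[sus.astype(bool)].index)
        return [s for s in pool if s not in drop]
    except Exception:
        return pool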

def get_industry_expo(stocks, date):
    if not RQ_OK or len(stocks)==0:
        return pd.DataFrame(index=stocks)
    df = industry(stocks, date=date, level=industry_level)
    X = pd.get_dummies(df[industry_level])
    X.index = df.index
    return X

def get_log_mktcap(stocks, date):
    if not RQ_OK or len(stocks)==0:
        return pd.Series(index=stocks, dtype=float)
    try:
        mcap = get_factor(stocks, 'circulating_market_cap', date).iloc[:,0]
    except Exception:
        # Fallback placeholder only: close price times 1e8 is a crude stand-in, not a real market cap.
        px = get_price(stocks, start_date=date, end_date=date, fields='close')
        cl = px.xs('close', axis=1, level=1).iloc[0]
        mcap = cl.fillna(cl.median())*1e8
    lncap = np.log(mcap.replace({0: np.nan}))
    return lncap.replace([np.inf, -np.inf], np.nan)

def neutralize_series(factor_s, stocks, date):
    if not neutralize_industry or len(stocks)==0:
        return factor_s
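    X = get_industry_expo(stocks, date).astype(float)   # dummies may be bool; lstsq needs floats
    lncap = get_log_mktcap(stocks, date).reindex(X.index)
    X['ln_mktcap'] = lncap.fillna(lncap.median())       # impute missing size with the median, not 0
    X = X.fillna(0)
    y = factor_s.reindex(X.index).astype(float)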
    mask = y.notna()
    if mask.sum() < 10:
        return factor_s
    X1 = X[mask].values
    y1 = y[mask].values
    beta = np.linalg.lstsq(np.c_[np.ones(len(X1)), X1], y1, rcond=None)[0]
    y_hat = np.c_[np.ones(len(X1)), X1] @ beta
    resid = pd.Series(y1 - y_hat, index=X[mask].index)
    return resid.reindex(factor_s.index)

def compute_factor(stocks, date):
    if not RQ_OK or len(stocks)==0:
        return pd.Series(index=stocks, dtype=float)
    # 22 trading days ending at `date` (t): dates[0] is t-21 and dates[-2] is t-1.
    dates = get_trading_dates(end_date=date, count=22)
    if len(dates) < 22:
        return pd.Series(index=stocks, dtype=float)
    start, end = dates[0], dates[-2]
    px = get_price(stocks, start_date=start, end_date=end, fields='close')
    cl = px.xs('close', axis=1, level=1)
    r = cl.iloc[-1]/cl.iloc[0] - 1
    return r

def next_period_return(stocks, start_date, holding_days=5):
    """Open-to-close return over the `holding_days` trading days that follow `start_date`."""
    if not RQ_OK or len(stocks)==0:
        return pd.Series(index=stocks, dtype=float)
    # dates[0] is the formation day itself; the holding period is dates[1]..dates[-1].
    dates = get_trading_dates(start_date=start_date, count=holding_days+1)
    if len(dates) < holding_days+1:
        return pd.Series(index=stocks, dtype=float)
    px = get_price(stocks, start_date=dates[1], end_date=dates[-1], fields=['open','close'])
    op = px.xs('open', axis=1, level=1).iloc[0]
    cl = px.xs('close', axis=1, level=1).iloc[-1]
    return cl/op - 1

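def apply_cost(prev_w, new_w, cost_bp=5):
    """Cost of moving from prev_w to new_w: total absolute weight change times cost_bp."""
    idx = prev_w.index.union(new_w.index)   # include names that are sold out completely
    w0 = prev_w.reindex(idx).fillna(0)
    w1 = new_w.reindex(idx).fillna(0)
    turnover = (w1 - w0).abs().sum()
    return turnover * (cost_bp/1e4)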
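The neutralization step regresses the factor on industry dummies and log market cap and keeps the residual. The sketch below illustrates what that achieves on synthetic data; the industry labels, coefficients, and sample size are all made up for the demonstration.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 200
stocks = [f"S{i:03d}" for i in range(n)]
ind = pd.Series(rng.choice(["bank", "tech", "food"], size=n), index=stocks)
ln_cap = pd.Series(rng.normal(23.0, 1.0, size=n), index=stocks)

# Raw factor contaminated by an industry tilt and a size tilt.
raw = (
    ind.map({"bank": 0.5, "tech": -0.3, "food": 0.0})
    + 0.2 * (ln_cap - ln_cap.mean())
    + pd.Series(rng.normal(0.0, 1.0, size=n), index=stocks)
)

# Design matrix: intercept + float industry dummies + log market cap.
X = pd.get_dummies(ind, dtype=float)
X["ln_mktcap"] = ln_cap
A = np.c_[np.ones(n), X.values]

# lstsq tolerates the intercept/dummy collinearity; residuals are still the OLS residuals.
beta, *_ = np.linalg.lstsq(A, raw.values, rcond=None)
resid = pd.Series(raw.values - A @ beta, index=stocks)

print(resid.groupby(ind).mean().abs().max())        # ~0: no industry tilt left
print(abs(np.corrcoef(resid, ln_cap)[0, 1]))        # ~0: no linear size exposure left
```

Requesting `dtype=float` from `get_dummies` matters here: recent pandas versions return boolean dummies, and mixing them with a float column produces an object array that `lstsq` cannot handle.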

## Build the Rebalance Schedule

In [ ]:
if RQ_OK:
    all_days = get_trading_dates(start_date=start_date, end_date=end_date)
else:
    all_days = pd.date_range(start_date, end_date, freq='B')
rebalance_days = [d for d in all_days if pd.Timestamp(d).weekday()==rebalance_weekday]
len(rebalance_days)
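The timing convention (form on Friday, enter on the next trading day, hold 5 trading days) can be sketched without any data vendor. In this illustration a plain business-day calendar stands in for the exchange calendar, and `holding_window` is a hypothetical helper, not a function used elsewhere in the notebook.

```python
import pandas as pd

# Stand-in for an exchange calendar (no holidays): March 2024 business days.
days = pd.bdate_range("2024-03-01", "2024-03-29")
fridays = [d for d in days if d.weekday() == 4]

def holding_window(form_date, calendar, holding_days=5):
    """First and last trading day of the holding period that starts after form_date."""
    pos = calendar.get_loc(form_date)
    window = calendar[pos + 1 : pos + 1 + holding_days]
    # Return None when the calendar ends before the holding period is complete.
    return (window[0], window[-1]) if len(window) == holding_days else None

for f in fridays:
    print(f.date(), "->", holding_window(f, days))
```

The last Friday in the sample has no complete holding window, which is why the final rebalance date typically yields no forward return.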

## Compute the Cross-Sectional Factor and Process It (Winsorize / Neutralize / Standardize)

In [ ]:
factor_panel = {}
for d in rebalance_days:
    stk = get_universe(d) if RQ_OK else []
    f = compute_factor(stk, d)
    f = mad_winsorize(f)
    f = neutralize_series(f, stk, d)
    f = zscore(f)
    factor_panel[pd.Timestamp(d)] = f
factor_df = pd.DataFrame(factor_panel).T.sort_index()
factor_df.head()

## Compute Next-Period Returns (for the IC Test and Quantile Backtest)

In [ ]:
fwd_panel = {}
for d in factor_df.index:
    stk = factor_df.columns[factor_df.loc[d].notna()]
    ret = next_period_return(list(stk), d, holding_days=holding_days)
    fwd_panel[pd.Timestamp(d)] = ret
forward_ret = pd.DataFrame(fwd_panel).T
forward_ret.head()

## IC Test (Spearman) and t-Test

In [ ]:
ics = []
for d in factor_df.index:
    s = factor_df.loc[d].dropna()
    r = forward_ret.loc[d].reindex(s.index).dropna()
    s = s.reindex(r.index)
    if len(s) > 5:
        ics.append((d, stats.spearmanr(s, r).correlation))
ic_series = pd.Series({d:ic for d,ic in ics}).sort_index()

ic_mean = ic_series.mean()
ic_std  = ic_series.std(ddof=1)
ic_t    = ic_mean / (ic_std/np.sqrt(len(ic_series))) if ic_std>0 else np.nan
ic_ir   = ic_mean / ic_std if ic_std>0 else np.nan
ic_pos  = (ic_series>0).mean()

print('IC mean:', round(ic_mean,4), 'IC std:', round(ic_std,4), 'IC t:', round(ic_t,2), 'ICIR:', round(ic_ir,2), 'Hit rate:', round(ic_pos,2))

plt.figure(figsize=(10,4))
ic_series.plot(title='IC time series')
plt.show()

plt.figure(figsize=(5,4))
ic_series.hist(bins=30)
plt.title('IC distribution')
plt.show()
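The hand-computed IC t-statistic is a one-sample t-test of the IC series against zero, so it can be cross-checked with `scipy.stats.ttest_1samp`, which also returns a p-value. The IC series below is randomly generated for illustration and is not a backtest result.

```python
import numpy as np
import pandas as pd
from scipy import stats

rng = np.random.default_rng(1)
# Hypothetical weekly IC series: small positive mean, noisy.
ic = pd.Series(rng.normal(0.02, 0.10, size=300))

ic_mean = ic.mean()
ic_std = ic.std(ddof=1)
t_manual = ic_mean / (ic_std / np.sqrt(len(ic)))
icir = ic_mean / ic_std

res = stats.ttest_1samp(ic, popmean=0.0)
print("manual t:", round(t_manual, 4))
print("scipy  t:", round(res.statistic, 4), "p-value:", round(res.pvalue, 4))
```

The two t-values agree, and both equal ICIR × sqrt(T). The test treats the weekly ICs as independent draws, which is a reasonable working assumption only because the holding periods do not overlap.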

## Quantile Backtest (with Simplified Transaction Costs)

In [ ]:
qs = [f'Q{i}' for i in range(1, n_quantiles+1)]
quant_nav = pd.DataFrame(index=factor_df.index, columns=qs, dtype=float)
long_short = pd.Series(index=factor_df.index, dtype=float)
prev_w = {q: pd.Series(dtype=float) for q in qs}

for d in factor_df.index:
    s = factor_df.loc[d].dropna()
    if len(s) < n_quantiles:
        continue  # too few names to form the buckets
    r = forward_ret.loc[d].reindex(s.index)
    rk = s.rank(method='first')
    cats = pd.qcut(rk, n_quantiles, labels=qs)
    df = pd.DataFrame({'ret': r, 'q': cats}).dropna()
    if df.empty:
        continue
    qgross, qcost = {}, {}
    for q in qs:
        members = df.index[df['q']==q]
        if len(members)==0:
            qgross[q], qcost[q] = 0.0, 0.0
            continue
        w = pd.Series(1/len(members), index=members)   # equal weight within the bucket
        qgross[q] = df.loc[members, 'ret'].mean()
        qcost[q] = apply_cost(prev_w[q], w, cost_bp=cost_bp)
        prev_w[q] = w
    for q in qs:
        quant_nav.loc[d, q] = qgross[q] - qcost[q]
    # Both legs pay costs: subtract the short leg's cost instead of netting it off.
    long_short.loc[d] = (qgross[qs[-1]] - qgross[qs[0]]) - (qcost[qs[-1]] + qcost[qs[0]])

quant_nav = (1 + quant_nav.fillna(0)).cumprod()
ls_nav = (1 + long_short.fillna(0)).cumprod()

plt.figure(figsize=(10,5))
for q in qs:
    plt.plot(quant_nav.index, quant_nav[q], label=q)
plt.plot(ls_nav.index, ls_nav.values, label='Long-Short (Q5-Q1)', linestyle='--')
plt.title('Quantile NAV curves (net of costs)')
plt.legend()
plt.show()

ann = 252/holding_days
ls_ret = long_short.dropna()
ann_mean = ls_ret.mean()*ann
ann_std  = ls_ret.std(ddof=1)*np.sqrt(ann)
sharpe   = ann_mean/ann_std if ann_std>0 else np.nan
t_stat   = ls_ret.mean() / (ls_ret.std(ddof=1)/np.sqrt(len(ls_ret))) if ls_ret.std(ddof=1)>0 else np.nan

print('Long-short annualized return (approx.):', round(ann_mean,4), 'annualized vol (approx.):', round(ann_std,4), 'Sharpe (approx.):', round(sharpe,2))
print('Long-short t-statistic:', round(t_stat,2))
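Two accounting details in a cost-aware quantile backtest are easy to get wrong: turnover has to include names that leave the portfolio, and a long-short return has to pay costs on both legs. The sketch below shows one way to handle both; the weights, returns, and the `turnover` helper are hypothetical.

```python
import pandas as pd

def turnover(prev_w: pd.Series, new_w: pd.Series) -> float:
    """Sum of absolute weight changes over the union of old and new holdings."""
    idx = prev_w.index.union(new_w.index)
    w0 = prev_w.reindex(idx).fillna(0.0)
    w1 = new_w.reindex(idx).fillna(0.0)
    return float((w1 - w0).abs().sum())

prev_w = pd.Series({"A": 0.5, "B": 0.5})
new_w = pd.Series({"B": 0.5, "C": 0.5})

to = turnover(prev_w, new_w)      # sell 0.5 of A, buy 0.5 of C
cost = to * 5 / 1e4               # 5 bp per unit of weight traded

# Long-short net return: gross spread minus the costs of BOTH legs.
gross_long, gross_short = 0.012, 0.004
net_ls = (gross_long - gross_short) - (cost + cost)
print(to, cost, round(net_ls, 6))
```

Aligning only on the new portfolio's index would miss the sale of "A" and report a turnover of 0.5 instead of 1.0.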

## Summary Table

In [ ]:
summary = pd.DataFrame({
    'IC_mean':[ic_mean], 'IC_std':[ic_std], 'IC_t':[ic_t], 'ICIR':[ic_ir], 'IC_pos_ratio':[ic_pos],
    'LS_ann_return(approx)':[ann_mean], 'LS_ann_vol(approx)':[ann_std], 'LS_sharpe(approx)':[sharpe], 'LS_t':[t_stat],
    'Periods':[len(ic_series)]
})
summary

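## Discussion: Factor Meaning, Backtest Results, and Ideas for Improvement
### Economic Meaning of the Factor
- **Trend continuation**: stocks that performed well over the past month may stay strong in the short term, and vice versa.
- **Skipping the most recent day**: reduces noise from overnight gaps and short-term reversal.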

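### Reading the Backtest (How to Interpret the Results Above)
- If the **mean IC is positive** and the ICIR is high, the factor has stable positive predictive power; a hit rate above 50% is a further plus.
- In the quantile NAV plot, the **Q5 curve should lie above Q1** and the **Q5-Q1 long-short curve** should trend upward; the result is more reliable when the t-statistic is significant (|t| > 2).
- If the effect is weak, common causes include the sample period, transaction costs, and extreme market regimes; the improvements below may help.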

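### Possible Improvements
1. **Window combination**: blend with momentum over longer (63/126-day) or shorter (5/10-day) windows, linearly or non-linearly.
2. **Price-volume confirmation**: weight momentum by changes in traded value or by turnover rate to filter out false trends.
3. **Volatility adjustment**: scale the factor or the weights by volatility (vol targeting).
4. **Stronger neutralization**: use a finer industry level; add controls for Beta, valuation, quality, and other exposures.
5. **Execution refinements**: filter for price-limit hits and liquidity; use a finer cost model (market impact + commissions + slippage).
6. **Robustness**: split into bull, bear, and range-bound regimes; cross-validate on other index universes; assess rolling-window stability.
7. **Multi-factor integration**: combine with value, quality, low-volatility, and similar factors, and resolve conflicting signals (for example, by picking the best names within each industry).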
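As an illustration of the first idea, the sketch below blends skip-one-day momentum over several windows by averaging their cross-sectional z-scores. The equal weights, the window set, and the simulated prices are assumptions made for the example, not a recommendation.

```python
import numpy as np
import pandas as pd

def skip_momentum(close, lookback, skip=1):
    """Return from t-lookback to t-skip on a date x stock close panel."""
    return close.shift(skip) / close.shift(lookback) - 1

def cs_zscore(df):
    """Cross-sectional z-score: standardize each row (date) across stocks."""
    return df.sub(df.mean(axis=1), axis=0).div(df.std(axis=1, ddof=1), axis=0)

# Simulated geometric random-walk prices for 8 hypothetical stocks.
rng = np.random.default_rng(7)
dates = pd.bdate_range("2023-01-02", periods=160)
rets = rng.normal(0.0003, 0.02, size=(160, 8))
close = pd.DataFrame(
    100 * np.exp(np.cumsum(rets, axis=0)),
    index=dates,
    columns=[f"S{i}" for i in range(8)],
)

windows = [21, 63, 126]   # illustrative choice, equal weights
combo = sum(cs_zscore(skip_momentum(close, w)) for w in windows) / len(windows)
print(combo.iloc[-1].round(3))
```

Standardizing each window before averaging keeps the longest window, whose raw returns have the largest dispersion, from dominating the blend. The composite is only defined once the longest window has enough history.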