# 导入依赖库

In [None]:
import os
import pandas as pd
import numpy as np
from utils import get_data_dict, compute_ic, get_ret_mat

# 超参数

In [2]:
start_date = '2024-06-01'  # start date of the data used for factor computation
update = True  # forwarded as `update=` to the compute_factor calls below; True recomputes instead of reading the CSV cache

# 工具函数

In [None]:
def _format_params_suffix(params: dict) -> str:
    """
    Format params into a filename suffix. Single param uses its value; multiple params sorted by key.
    """
    if not params:
        return ''
    items = sorted(params.items())
    parts = []
    for _, value in items:
        parts.append(str(value))
    return '_' + '_'.join(parts)

In [None]:
def compute_factor(data_dict, factor_func, update=False, cache_dir="data/factor", **params):
    """
    Compute a factor for each DataFrame in data_dict and cache the result as CSV.

    The cache file is `<cache_dir>/<factor_func.__name__><suffix>.csv`, where the
    suffix is built from `params` by `_format_params_suffix`.

    Parameters:
    - data_dict: dict of {code: pd.DataFrame}; each df must contain whatever
      columns factor_func reads
    - factor_func: callable(df, **params) -> pd.Series or pd.DataFrame of factor values
    - update: bool, if True recompute even if a cache file exists
    - cache_dir: directory to store cached factor CSVs (created if missing)
    - params: additional keyword parameters passed to factor_func

    Returns:
    - df_factor: pd.DataFrame. When freshly computed, the per-code factor outputs
      concatenated along the columns (keyed by code) plus an extra 'ic' column
      holding the result of compute_ic(df_factor, ret_mat). When served from the
      cache, the CSV content as read back (first column as a parsed date index).
    """
    # Ensure cache directory exists
    os.makedirs(cache_dir, exist_ok=True)

    # Build cache filename
    func_name = factor_func.__name__
    suffix = _format_params_suffix(params)
    cache_path = os.path.join(cache_dir, f"{func_name}{suffix}.csv")

    # Serve from cache when allowed; nothing else needs to be computed in that case.
    if not update and os.path.exists(cache_path):
        return pd.read_csv(cache_path, index_col=0, parse_dates=True)

    # The return matrix is only needed for the IC, so build it on the compute path only.
    # It is still built before the factor loop, as in the original ordering.
    ret_mat = get_ret_mat(data_dict)

    raw_factor = {code: factor_func(df, **params) for code, df in data_dict.items()}
    df_factor = pd.concat(raw_factor, axis=1)

    # The IC is stored alongside the factor values as an additional column.
    df_factor['ic'] = compute_ic(df_factor, ret_mat)

    # Cache to CSV
    df_factor.to_csv(cache_path)

    return df_factor

# 获取历史行情

In [None]:
# Historical market data starting at `start_date`; passed as `data_dict` to compute_factor below
# (presumably {code: DataFrame}, as compute_factor's docstring expects — confirm against utils.get_data_dict)
data_dict = get_data_dict(start=start_date)

# 因子计算

In [None]:
# 回撤, 偏度, 峰度
# 10. ROC (Rate of Change)
def roc(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """
    Rate of change of the close price over `window` rows.

    Returns (close - close shifted by `window`) / (close shifted by `window`);
    the first `window` rows are NaN.
    """
    close = df['close']
    base = close.shift(window)
    return (close - base) / base

# 11. 振幅 × 换手滚动均值
def amp_turn(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """
    Rolling mean over `window` rows of (amplitude factor × turnover rate).

    NOTE(review): `amplitude(df)` is called with its default window, so the
    amplitude term is already a rolling mean that does not follow `window`;
    confirm whether the raw '振幅' column was intended instead.
    """
    product = amplitude(df) * df['换手率']
    return product.rolling(window).mean()

# 12. 高低价差与成交额之积
def hlvol(df: pd.DataFrame) -> pd.Series:
    """Product of the high-low price range and the '成交额' column, row by row."""
    price_range = df['high'] - df['low']
    return price_range * df['成交额']

# 13. 收盘价相对于 ATR 的标准化偏离
def stddev_from_atr(df: pd.DataFrame, window: int = 14) -> pd.Series:
    """
    Deviation of the close from its `window`-row moving average, divided by atr(df, window).

    NOTE(review): the `atr` defined in this file is normalised by the typical
    price, so the result is a price difference divided by a ratio — confirm
    this scaling is intended.
    """
    close = df['close']
    deviation = close - close.rolling(window).mean()
    return deviation / atr(df, window)

# ------- Example: batch configuration of factors -------

# Maps a factor name to its function ('func') and the look-back windows to evaluate ('windows').
# NOTE(review): nothing in the visible code iterates over this dict — confirm whether it is still used.
factors_to_compute = {
    'roc': {'func': roc, 'windows': [5, 10, 20]},
    'amp_turn': {'func': amp_turn, 'windows': [5, 10, 20]},
    'stddev_from_atr': {'func': stddev_from_atr, 'windows': [14, 20]}
}


In [None]:
# 动量因子：过去 window 个交易日的收益率
def momentum(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """Momentum factor: percentage change of the close price over `window` rows."""
    close = df['close']
    return close.pct_change(periods=window)

# Compute and cache the momentum factor for each look-back window; `update` decides whether the cache is reused.
for window in [1, 3, 5, 10, 15, 20, 40, 60, 120]:
    compute_factor(data_dict, momentum, window=window, update=update)

In [None]:
# 波动率因子：过去 window 个交易日的收益率标准差
def volatility(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """Volatility factor: rolling standard deviation of the '涨跌幅' column over `window` rows."""
    changes = df['涨跌幅']
    return changes.rolling(window=window).std()

# Compute and cache the volatility factor for each look-back window.
for window in [3, 5, 10, 15, 20, 40, 60, 120]:
    compute_factor(data_dict, volatility, window=window, update=update)

In [None]:
# 振幅因子：过去 window 个交易日的振幅均值
def amplitude(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """Amplitude factor: rolling mean of the '振幅' column over `window` rows."""
    daily_amplitude = df['振幅']
    return daily_amplitude.rolling(window=window).mean()

# Compute and cache the amplitude factor for each look-back window.
for window in [1, 3, 5, 10, 15, 20, 40, 60, 120]:
    compute_factor(data_dict, amplitude, window=window, update=update)

In [None]:
# 换手率因子：过去 window 个交易日的换手率均值
def turnover(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """Turnover factor: rolling mean of the '换手率' column over `window` rows."""
    turnover_rate = df['换手率']
    return turnover_rate.rolling(window=window).mean()

# Compute and cache the turnover factor for each look-back window.
for window in [3, 5, 10, 15, 20, 40, 60, 120]:
    compute_factor(data_dict, turnover, window=window, update=update)

In [None]:
# 相对换手率
def turnover_rel(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """Relative turnover: the '换手率' value divided by its rolling mean over `window` rows."""
    turnover_rate = df['换手率']
    baseline = turnover_rate.rolling(window).mean()
    return turnover_rate / baseline

# Compute and cache the relative-turnover factor for each look-back window.
for window in [3, 5, 10, 15, 20, 40, 60, 120]:
    compute_factor(data_dict, turnover_rel, window=window, update=update)

In [None]:
# 成交量分位数：今日成交量在过去 window 日中的分位数
def volume_pct(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """
    Volume percentile: percentile rank of the latest 'volume' value within
    each rolling window of `window` rows.
    """
    def _rank_of_latest(values) -> float:
        # Percentile rank of the window's last element among the window's values.
        return pd.Series(values).rank(pct=True).iloc[-1]

    return df['volume'].rolling(window).apply(_rank_of_latest)

# Compute and cache the volume-percentile factor for each look-back window.
for window in [3, 5, 10, 15, 20, 40, 60, 120]:
    compute_factor(data_dict, volume_pct, window=window, update=update)

In [None]:
# 量比 
def volume_ratio(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """Volume ratio: the 'volume' value divided by its rolling mean over `window` rows."""
    volume = df['volume']
    average_volume = volume.rolling(window).mean()
    return volume / average_volume

# Compute and cache the volume-ratio factor for each look-back window.
for window in [3, 5, 10, 15, 20, 40, 60, 120]:
    compute_factor(data_dict, volume_ratio, window=window, update=update)

In [None]:
# 平均真实波幅（ATR）因子, 用当日典型价格（Typical Price）归一化
def atr(df: pd.DataFrame, window: int = 14) -> pd.Series:
    """
    Average true range normalised by the typical price.

    The true range of a row is the largest of: high - low, |high - previous close|,
    |low - previous close|. It is divided by the typical price
    (high + low + close) / 3 and averaged over `window` rows; min_periods=1 means
    early rows average over however many rows are available.
    """
    high, low, close = df['high'], df['low'], df['close']
    prev_close = close.shift(1)

    candidates = [
        high - low,
        (high - prev_close).abs(),
        (low - prev_close).abs(),
    ]
    # Row-wise max skips NaN, so the first row (no previous close) falls back to high - low.
    true_range = pd.concat(candidates, axis=1).max(axis=1)

    typical_price = (high + low + close) / 3
    relative_range = true_range / typical_price
    return relative_range.rolling(window, min_periods=1).mean()

# Compute and cache the normalised ATR factor for each look-back window.
for window in [7, 14, 21, 42, 63, 119]:
    compute_factor(data_dict, atr, window=window, update=update)

In [None]:
# 布林带宽度因子：上下轨差占中轨比例
def boll(df: pd.DataFrame, window: int = 20, k: float = 2) -> pd.Series:
    """
    Bollinger band width: (upper band - lower band) / middle band.

    Parameters:
    - df: DataFrame with a 'close' column
    - window: number of rows for the rolling mean and standard deviation
    - k: band half-width in standard deviations; may be fractional (e.g. 1.5)

    Returns:
    - pd.Series of band widths; rows without a full window are NaN
    """
    rolling_close = df['close'].rolling(window)
    mid = rolling_close.mean()
    band = k * rolling_close.std()
    upper = mid + band
    lower = mid - band
    return (upper - lower) / mid
            
# Compute and cache the Bollinger width for every (window, k) combination.
# The cache suffix lists values sorted by parameter name, i.e. k first, then window.
for window in [5, 10, 20, 40, 60, 120]:
    for k in [1.5, 2, 2.5]:
        compute_factor(data_dict, boll, window=window, k=k, update=update)

In [None]:
# 移动平均价差因子：短期均线高出长期均线的比例
def ma_spread(df: pd.DataFrame, short: int = 5, long: int = 20) -> pd.Series:
    """
    Moving-average spread: how far the `short`-row mean of the close sits above
    the `long`-row mean, as a fraction of the `long`-row mean.
    """
    close = df['close']
    fast_ma = close.rolling(window=short).mean()
    slow_ma = close.rolling(window=long).mean()
    spread = fast_ma - slow_ma
    return spread / slow_ma

# Compute and cache the MA spread for every (short, long) pair.
# The cache suffix lists values sorted by parameter name, i.e. long first, then short.
for short in [3, 5, 10]:
    for long in [20, 40, 60]:
        compute_factor(data_dict, ma_spread, short=short, long=long, update=update)

In [None]:
# 相对强弱指数（RSI）因子
def rsi(df: pd.DataFrame, window: int = 14) -> pd.Series:
    """
    Relative strength index of the close price, using simple rolling means of
    gains and losses over `window` rows. Values lie between 0 and 100.
    """
    delta = df['close'].diff()
    avg_gain = delta.clip(lower=0).rolling(window).mean()
    avg_loss = (-delta.clip(upper=0)).rolling(window).mean()
    # The small constant keeps the ratio finite when the window has no losses.
    strength = avg_gain / (avg_loss + 1e-9)
    return 100 - 100 / (1 + strength)

# Compute and cache the RSI factor for each look-back window.
for window in [7, 14, 21, 42, 63, 119]:
    compute_factor(data_dict, rsi, window=window, update=update)

In [None]:
# 能量潮（OBV）因子
def obv(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """
    Windowed on-balance volume: rolling sum of signed volume over `window` rows.

    A row's volume counts as positive when the close rose, negative when it
    fell, and is missing (NaN) when the close is unchanged or there is no
    previous row. min_periods=1 lets the sum use whatever non-missing rows the
    window holds; a window with none yields NaN.

    Parameters:
    - df: DataFrame with 'close' and 'volume' columns
    - window: number of rows in the rolling sum

    Returns:
    - pd.Series of windowed OBV values
    """
    delta = df['close'].diff()
    # Vectorised sign: +1 / -1 for up / down moves; zero moves are masked to NaN
    # (NaN deltas stay NaN because NaN != 0 keeps them and sign(NaN) is NaN).
    direction = np.sign(delta).where(delta != 0)
    signed_vol = direction * df['volume']
    return signed_vol.rolling(window=window, min_periods=1).sum()

# Compute and cache the windowed OBV factor for each look-back window.
for window in [5, 10, 20, 40, 60, 120]:
    compute_factor(data_dict, obv, window=window, update=update)

In [None]:
# 价量趋势指标 PVT
def pvt(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """
    Price-volume trend: rolling sum over `window` rows of '涨跌幅' × 'volume'.
    min_periods=1 lets early rows sum over however many rows are available.
    """
    weighted_change = df['涨跌幅'] * df['volume']
    rolling_window = weighted_change.rolling(window=window, min_periods=1)
    return rolling_window.sum()

# Compute and cache the PVT factor for each look-back window.
# This loop must pass `pvt`; passing `obv` here would only recompute the OBV caches.
for window in [5, 10, 20, 40, 60, 120]:
    compute_factor(data_dict, pvt, window=window, update=update)

In [None]:
# macd 因子
def macd(df: pd.DataFrame,
         fast: int = 12,
         slow: int = 26,
         signal: int = 9) -> pd.Series:
    """
    MACD histogram of the close price: MACD line minus its signal line.

    Parameters:
    - df: DataFrame with a 'close' column
    - fast: span of the fast EMA
    - slow: span of the slow EMA
    - signal: span of the EMA applied to the MACD line

    Returns:
    - pd.Series (not a DataFrame) holding macd_line - signal_line per row
    """
    price = df['close']

    # Fast and slow exponential moving averages (recursive form, adjust=False)
    ema_fast = price.ewm(span=fast, adjust=False).mean()
    ema_slow = price.ewm(span=slow, adjust=False).mean()

    # MACD line
    macd_line = ema_fast - ema_slow

    # Signal line
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()

    return macd_line - signal_line

# Compute and cache MACD with its default parameters (no extra params, so the cache file has no suffix);
# the returned frame is discarded.
_ = compute_factor(data_dict, macd, update=update)

  return spearmanr(a, b)[0]
