In [1]:
import os
import time
import pandas as pd
import numpy as np
import tushare as ts
from itertools import combinations
from statsmodels.tsa.stattools import adfuller, coint
import statsmodels.api as sm
from tqdm import tqdm
from collections import defaultdict
from math import ceil

In [2]:
# 参数设置
TS_TOKEN = os.getenv("TUSHARE_TOKEN")
print(TS_TOKEN)
INDEX_CODE = "000906.SH"  # 中证800=000906.SH
START_DATE = "20200101"   # 覆盖形成期
END_DATE   = "20250930"
SLEEP_SEC  = 0.4   # 2000积分账号的可用频率是200/min
MAX_RETRY  = 5

332e447a961d7b9722b284cb7262ee3e4d7a72efc06fd06ee849ad21


In [3]:
# ============== 2) 初始化 Tushare =======================
ts.set_token(TS_TOKEN)
pro = ts.pro_api()

In [4]:
# 获取中证800的成分股代码
df = pro.index_weight(index_code=INDEX_CODE, start_date=END_DATE)
print(df.head())
stock_codes = df['con_code'].unique().tolist()  # 自动去重
print(stock_codes[:5],len(stock_codes))

  index_code   con_code trade_date  weight
0  000906.SH  300750.SZ   20250930   3.132
1  000906.SH  600519.SH   20250930   2.675
2  000906.SH  601318.SH   20250930   1.749
3  000906.SH  600036.SH   20250930   1.475
4  000906.SH  601899.SH   20250930   1.430
['300750.SZ', '600519.SH', '601318.SH', '600036.SH', '601899.SH'] 800


In [5]:
# 获取股票基本信息
stock_basic = pro.stock_basic(fields='ts_code,symbol,name,area,industry,list_date')
print(stock_basic.head())

     ts_code  symbol   name area industry list_date
0  000001.SZ  000001   平安银行   深圳       银行  19910403
1  000002.SZ  000002    万科A   深圳     全国地产  19910129
2  000004.SZ  000004  *ST国华   深圳     软件服务  19910114
3  000006.SZ  000006   深振业A   深圳     区域地产  19920427
4  000007.SZ  000007    全新好   深圳     其他商业  19920413


In [6]:
# 合并数据, 将code和industry对应起来
merged_df = stock_basic[stock_basic['ts_code'].isin(stock_codes)]
print(merged_df.head(),'\n',len(merged_df))

# 只保留需要的2列(code和industry)
result_df = merged_df[['ts_code', 'industry']].copy()
print(result_df.head(),'\n',len(merged_df))

      ts_code  symbol  name area industry list_date
0   000001.SZ  000001  平安银行   深圳       银行  19910403
1   000002.SZ  000002   万科A   深圳     全国地产  19910129
6   000009.SZ  000009  中国宝安   深圳     电气设备  19910625
15  000021.SZ  000021   深科技   深圳      元器件  19940202
18  000027.SZ  000027  深圳能源   深圳     火力发电  19930903 
 800
      ts_code industry
0   000001.SZ       银行
1   000002.SZ     全国地产
6   000009.SZ     电气设备
15  000021.SZ      元器件
18  000027.SZ     火力发电 
 800


In [None]:
# 循环获取 CSI800 收盘价
all_data = []

for i, code in enumerate(stock_codes, 1):
    # 用 pro_bar 才能拿到前复权
    df = ts.pro_bar(ts_code=code, adj='qfq',
                    start_date=START_DATE, end_date=END_DATE,
                    fields='ts_code,trade_date,close')
    if df is not None and not df.empty:
        all_data.append(df)
    # print(f"{i}/{len(stock_codes)} {code} → {0 if df is None else len(df)} 行")
    time.sleep(0.4)

# 合并长表
daily_prices = pd.concat(all_data, ignore_index=True)
daily_prices['trade_date'] = pd.to_datetime(daily_prices['trade_date'])

# 宽表：索引=trade_date（变成“日期”列），列=ts_code，值=close
wide = (daily_prices
        .pivot(index='trade_date', columns='ts_code', values='close')
        .sort_index())
wide.index.name = '日期'

# 保存（缺失留空）
wide.to_csv('csi800_qfq_close.csv', encoding='utf-8-sig', na_rep='')

# 可选：把行业映射单独保存一份（1 行 / 股票）
(result_df[['ts_code','industry']]
 .drop_duplicates('ts_code')
 .sort_values('ts_code')
 .to_csv('csi800_industry_map.csv', index=False, encoding='utf-8-sig'))

In [8]:
# ========== 1) 读入数据 ==========
PRICES_CSV = "csi800_qfq_close.csv"      # 你的文件名
MAP_CSV    = "csi800_industry_map.csv"   # 你的文件名

# 读价格宽表：第一列是日期
prices = pd.read_csv(PRICES_CSV, parse_dates=['日期'])
prices = prices.sort_values('日期').set_index('日期')
# 形成期 / 测试期
TRAIN_START, TRAIN_END = "2020-01-01", "2023-12-30"
TEST_START,  TEST_END  = "2024-01-01", "2025-09-30"
# 仅用形成期做协整与半衰期
prices_train = prices.loc[TRAIN_START:TRAIN_END].copy()

# 读行业映射（去重，只保留需要的两列）
imap = (pd.read_csv(MAP_CSV, dtype={'ts_code': str})
          .drop_duplicates(subset=['ts_code'])
          .loc[:, ['ts_code', 'industry']])

# 只保留“行业表”中出现的代码列，且这些代码确实在价格表里有列
valid_cols = [c for c in imap['ts_code'].unique().tolist() if c in prices.columns]
prices = prices[valid_cols].copy()
imap   = imap[imap['ts_code'].isin(valid_cols)].copy()

# ========== 2) 参数 ==========
USE_LOG  = True     # 建议对价格取对数做协整（更常见）
MIN_OBS  = 150      # 两只股票重叠样本数的下限
OUTPUT_CSV = "coint_results.csv"

# ========== 3) 工具函数 ==========
def eg_ols_adf(y: pd.Series, x: pd.Series):
    """
    Engle-Granger 二步法：
    第一步：OLS 回归 y = a + b*x + u
    第二步：对残差 u 做 ADF 单位根检验。
    返回：alpha, beta, adf_stat, adf_pvalue, adf_crit (dict)
    注意：ADF 的 p 值和临界值是“常规 ADF”的，而 EG 真正的
         p 值应使用 MacKinnon 表（statsmodels.coint 已内置）。
    """
    # OLS
    X = sm.add_constant(x.values, has_constant='add')  # [1, x]
    model = sm.OLS(y.values, X, missing='drop')
    res = model.fit()
    alpha = res.params[0]
    beta  = res.params[1]
    resid = y - (alpha + beta * x)

    # 对残差做 ADF
    adf_stat, adf_pvalue, usedlag, nobs, crit_vals, icbest = adfuller(
        resid.values, maxlag=None, regression='c', autolag='AIC'
    )
    return alpha, beta, adf_stat, adf_pvalue, crit_vals, resid

def sm_coint(y: pd.Series, x: pd.Series):
    """
    statsmodels 原生协整检验（MacKinnon p 值更标准）
    返回：t_stat, p_value, crit_vals (dict)
    """
    t_stat, p_value, crit_vals = coint(y.values, x.values, trend='c')  # y~x
    return t_stat, p_value, {'1%': crit_vals[0], '5%': crit_vals[1], '10%': crit_vals[2]}

# ========== 4) 按行业两两配对 & 协整 ==========
rows = []
grouped = imap.groupby('industry', dropna=False)

print("开始按行业进行两两协整检验...")
for industry, sub in grouped:
    codes = sub['ts_code'].unique().tolist()

    # 行业内股票不足两只就跳过
    if len(codes) < 2:
        continue

    # 枚举行业内所有两两配对
    for a, b in tqdm(list(combinations(codes, 2)), desc=f"[{industry}] pairs", leave=False):
        s = prices_train[[a, b]].dropna()
        if len(s) < MIN_OBS:
            continue

        # 可选：对数价格
        if USE_LOG:
            s = np.log(s)

        # 分别取两个序列（注意对齐后已无缺失）
        y = s[a]
        x = s[b]

        try:
            # EG 二步法（回归 + 残差ADF）
            alpha, beta, adf_stat, adf_p, adf_crit, resid = eg_ols_adf(y, x)

            # 原生协整检验（推荐看这个的 p 值）
            t_stat, p_val, crit = sm_coint(y, x)

            rows.append({
                'industry': industry,
                'y': a, 'x': b,
                'n_obs': int(len(s)),
                'alpha': alpha,
                'beta': beta,                      # 对冲比率（b）
                'eg_adf_stat': adf_stat,          # 残差ADF统计量（常规ADF）
                'eg_adf_pvalue': adf_p,           # 残差ADF p 值（仅作参考）
                'eg_adf_cv_1%': adf_crit.get('1%'),
                'eg_adf_cv_5%': adf_crit.get('5%'),
                'eg_adf_cv_10%': adf_crit.get('10%'),
                'coint_tstat': t_stat,            # 协整 t 值（越小越显著）
                'coint_pvalue': p_val,            # MacKinnon p 值（推荐依据）
                'coint_cv_1%': crit['1%'],
                'coint_cv_5%': crit['5%'],
                'coint_cv_10%': crit['10%'],
                'resid_std': float(np.std(resid, ddof=1)),
            })
        except Exception as e:
            # 某些极端样本可能失败，直接跳过
            # 你也可以 print(e) 来查看
            continue

# ========== 5) 保存结果 ==========
if rows:
    out = pd.DataFrame(rows)
    # 常见过滤：p 值显著的对（例如 5%）
    out = out.sort_values(['industry', 'coint_pvalue', 'y', 'x']).reset_index(drop=True)
    out.to_csv(OUTPUT_CSV, index=False, encoding='utf-8-sig')
    print(f"✅ 完成！共 {len(out)} 对，已保存：{OUTPUT_CSV}")
else:
    print("⚠️ 没有得到任何有效的配对结果。请检查 MIN_OBS、文件列名或数据缺失情况。")

开始按行业进行两两协整检验...


                                                                                                                                                                

✅ 完成！共 7577 对，已保存：coint_results.csv




In [9]:
# ========== 0) 参数 ==========
PRICES_CSV     = "csi800_qfq_close.csv"
COINT_CSV      = "coint_results.csv"
OUTPUT_CSV     = "coint_results_with_halflife.csv"

USE_LOG        = True     # 上一步若用对数，这里也保持一致
MIN_OBS_HL     = 100      # 计算半衰期所需的最少重叠样本
ONLY_COINTEGRATED = False # 若只给“显著协整”的配对算HL，设 True
PVAL_THRESH    = 0.05     # 协整显著性阈值

# ========== 1) 读数据 ==========
prices = pd.read_csv(PRICES_CSV, parse_dates=['日期']).sort_values('日期').set_index('日期')
coint_df = pd.read_csv(COINT_CSV)

# 与协整保持一致的形成期/测试期
TRAIN_START, TRAIN_END = "2020-01-01", "2023-12-30"
TEST_START,  TEST_END  = "2024-01-01", "2025-09-30"

prices_train = prices.loc[TRAIN_START:TRAIN_END].copy()

# 可选：只保留显著协整的对，减少计算量
if ONLY_COINTEGRATED and 'coint_pvalue' in coint_df.columns:
    coint_df = coint_df[coint_df['coint_pvalue'] < PVAL_THRESH].copy()

# ========== 2) 工具函数：计算半衰期 ==========
def compute_halflife_from_resid(z: pd.Series):
    """
    输入：残差序列 z_t（建议已对齐、无缺失）
    步骤：去均值 -> 回归 Δz_t = c + κ z_{t-1}
    令 ρ = 1 + κ；若 0<ρ<1，则 HL = -ln(2)/ln(ρ)，否则 NaN
    返回：hl_days, rho, kappa, n_used
    """
    z = z.dropna()
    if len(z) < MIN_OBS_HL:
        return np.nan, np.nan, np.nan, int(len(z))

    # 去均值（可减少截距的影响）
    z = z - z.mean()

    # 组 Δz 与 z_{t-1}
    dz = z.diff()
    z_lag = z.shift(1)
    df_hl = pd.concat([dz, z_lag], axis=1).dropna()
    df_hl.columns = ['dz', 'z_lag1']

    if len(df_hl) < MIN_OBS_HL:
        return np.nan, np.nan, np.nan, int(len(df_hl))

    X = sm.add_constant(df_hl['z_lag1'].values, has_constant='add')
    y = df_hl['dz'].values

    try:
        res = sm.OLS(y, X).fit()
        kappa = float(res.params[1])   # Δz_t = c + κ z_{t-1}
        rho = 1.0 + kappa
        if 0.0 < rho < 1.0 and np.isfinite(rho):
            hl = -np.log(2.0) / np.log(rho)
        else:
            hl = np.nan
        return float(hl), float(rho), float(kappa), int(len(df_hl))
    except Exception:
        return np.nan, np.nan, np.nan, int(len(df_hl))

# ========== 3) 逐对计算半衰期并并回 ==========
hl_rows = []

# 仅保留价格表里确实存在的代码
valid_cols = set(prices.columns)
total_pairs = len(coint_df)

for _, row in tqdm(coint_df.iterrows(), total=total_pairs, desc="Half-life"):
    y_code = row['y']
    x_code = row['x']
    alpha  = row.get('alpha', np.nan)
    beta   = row.get('beta', np.nan)

    # 若价格表缺列，跳过
    if y_code not in valid_cols or x_code not in valid_cols:
        hl_rows.append({'y': y_code, 'x': x_code,
                        'hl_days': np.nan, 'rho': np.nan, 'kappa': np.nan, 'n_hl': 0})
        continue

    s = prices_train[[y_code, x_code]].dropna()
    if USE_LOG:
        s = np.log(s)

    # 若 alpha/beta 缺失，稳妥起见再回归一次（但通常你的表里已有）
    if pd.isna(alpha) or pd.isna(beta):
        # 回归 y ~ a + b x
        X = sm.add_constant(s[x_code].values, has_constant='add')
        res = sm.OLS(s[y_code].values, X).fit()
        alpha = float(res.params[0]); beta = float(res.params[1])

    # 用（alpha, beta）重建残差
    z = s[y_code] - (alpha + beta * s[x_code])

    hl, rho, kappa, n_used = compute_halflife_from_resid(z)

    hl_rows.append({
        'y': y_code, 'x': x_code,
        'hl_days': hl,      # 半衰期（交易日数）
        'rho': rho,         # AR(1) 系数
        'kappa': kappa,     # Δz_t = c + κ z_{t-1} 的斜率
        'n_hl': n_used      # 用于计算HL的样本点数
    })

hl_df = pd.DataFrame(hl_rows)

# 并回原表
merged = coint_df.merge(hl_df, on=['y','x'], how='left')

# 常见排序：按行业、p值和半衰期
sort_cols = [c for c in ['industry','coint_pvalue','hl_days','y','x'] if c in merged.columns]
merged = merged.sort_values(sort_cols).reset_index(drop=True)

# 保存
merged.to_csv(OUTPUT_CSV, index=False, encoding='utf-8-sig')
print(f"✅ 已保存：{OUTPUT_CSV}  （共 {len(merged)} 行）")

Half-life: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 7577/7577 [00:06<00:00, 1129.09it/s]


✅ 已保存：coint_results_with_halflife.csv  （共 7577 行）


In [10]:
# 回测参数: 成本&滑点（单位：bps）
# 交易所手续费
EXCHANGE_BPS_PER_SIDE = 0.3
# 监管费 0.002%/边 ≈ 0.2 bps
REGULATORY_BPS_PER_SIDE = 0.2
# 过户费 0.001%/边 ≈ 0.1 bps
TRANSFER_BPS_PER_SIDE = 0.1
# 券商佣金：机构常用 0.02%/边=2 bps；零售可改 3 bps
BROKER_BPS_PER_SIDE = 2.0

# ——显性费用（除印花税）合计（单边）——
FEES_BPS_PER_SIDE = (EXCHANGE_BPS_PER_SIDE
                     + REGULATORY_BPS_PER_SIDE
                     + TRANSFER_BPS_PER_SIDE
                     + BROKER_BPS_PER_SIDE)

# 印花税（仅卖出侧）：0.05% = 5 bps
STAMP_SELL_BPS = 5.0

# ——滑点（按“回合/roundtrip”口径设置，再折半到单边）——
# CSI500 建议 20–40 bps/回合；这里取 20 bps/回合
SLIPPAGE_ROUNDTRIP_BPS = 20.0
SLIPPAGE_BPS_PER_SIDE = SLIPPAGE_ROUNDTRIP_BPS / 2.0  # 15 bps/边

# 便于阅读：总的“单边成本”（= 显性费用单边 + 滑点单边），不含印花税
TOTAL_BPS_PER_SIDE_EX_STAMP = FEES_BPS_PER_SIDE + SLIPPAGE_BPS_PER_SIDE
print(TOTAL_BPS_PER_SIDE_EX_STAMP)


12.6


In [11]:
# 回测
# ========== 参数 ==========
PRICES_CSV  = "csi800_qfq_close.csv"
COINT_CSV   = "coint_results_with_halflife.csv"

# 数据处理
USE_LOG       = True        # True: 用对数价/对数收益(推荐)；False: 用普通收益
MIN_OBS_ROLL  = 20          # z分数滚动窗口下限
MAX_OBS_ROLL  = 120         # z分数滚动窗口上限

# 配对筛选
PVAL_THRESH   = 0.01        # 协整显著性阈值
HL_MIN, HL_MAX = 5, 40      # 半衰期范围（交易日）
N_OBS_MIN     = 150         # EG估计有效样本（若 coint 表有 n_obs 列才会用到）
MAX_PAIRS     = 30          # 同时持有的配对上限
PER_INDUSTRY_CAP = 5        # 每行业最多几对（若有 industry 列）
ENFORCE_UNIQUE_STOCK = True # 是否“一票一对”（避免重复持股）

# 信号规则（更宽入场、更紧离场，降低噪声触发）
Z_ENTRY       = 2.8         # 2.0 -> 2.8
Z_EXIT        = 0.6         # 0.8 -> 0.6
TIME_STOP_MULT = 1.5
SEED          = 42

# 持有与冷却
MIN_HOLD_DAYS    = 5        # 最少持有天数，避免日内翻来覆去
REENTRY_COOLDOWN = 3        # 平仓后冷却 3 天再允许入场
NO_REBAL_BUFFER  = 0.01     # 再平衡缓冲阈值：单腿 |Δ权重| < 1% 不调仓

# 成本（基点，单边；按券级精确换手 |Δ权重| 计费）
COMM_BPS      = 10          # 手续费/税
SLIP_BPS      = 10          # 滑点
TOTAL_BPS     = COMM_BPS + SLIP_BPS

np.random.seed(SEED)

# ========== 工具函数 ==========
def safe_clip_window(hl, mn=MIN_OBS_ROLL, mx=MAX_OBS_ROLL):
    if not np.isfinite(hl):
        return mx
    return int(np.clip(round(5.0 * hl), mn, mx))  # 3.0 -> 5.0

def zscore_series(y: pd.Series, x: pd.Series, a: float, b: float, hl_days: float):
    """残差 & z 分数；滚动窗 L≈3*hl，夹在 [MIN_OBS_ROLL, MAX_OBS_ROLL]"""
    L = safe_clip_window(hl_days)
    spread = y - (a + b * x)
    m = spread.rolling(L, min_periods=L).mean()
    s = spread.rolling(L, min_periods=L).std(ddof=1)
    z = (spread - m) / s
    return spread, z, L

def pair_leg_weights(beta: float, side: int, per_pair_gross: float):
    """β 中性：y : -βx；side=+1 做多价差（多y空βx），side=-1 做空价差（空y多βx）"""
    k = per_pair_gross / (1.0 + abs(beta))
    wy = side * k
    wx = -side * k * beta
    return wy, wx

def greedy_select_pairs(df, max_pairs, per_industry_cap, enforce_unique_stock):
    """按 score 贪心选对，带行业上限 & 一票一对约束"""
    from collections import defaultdict
    chosen, used = [], set()
    cap_cnt = defaultdict(int)
    for _, r in df.sort_values('score', ascending=False).iterrows():
        y, x = r['y'], r['x']
        ind = r['industry'] if 'industry' in r else 'ALL'
        if enforce_unique_stock and (y in used or x in used):
            continue
        if cap_cnt[ind] >= per_industry_cap:
            continue
        chosen.append(r)
        used.add(y); used.add(x); cap_cnt[ind] += 1
        if len(chosen) >= max_pairs:
            break
    return pd.DataFrame(chosen).reset_index(drop=True)

def compute_yearly_metrics(perf: pd.DataFrame, years=(2023, 2024, 2025)):
    """
    基于每日“算术净收益” perf['ret'] 统计年度指标。
    - period_return：当年几何累计收益
    - annualized_return：几何年化（默认）；如需算术年化见注释
    - annualized_vol：按 252 年化
    - sharpe：年化收益 / 年化波动
    - 其余同前
    """
    out = []
    ann_factor = 252
    for y in years:
        sub = perf.loc[perf.index.year == y].copy()
        if sub.empty:
            continue

        ndays = sub.shape[0]
        # 当年几何累计
        period_return = (1.0 + sub['ret']).prod() - 1.0

        # ---- 选一：几何年化（推荐）----
        annualized_return = (1.0 + period_return)**(ann_factor / ndays) - 1.0

        # # ---- 选二：算术年化（与全周期做法一致）----
        # annualized_return = sub['ret'].mean() * ann_factor

        # 年化波动 & 夏普
        annualized_vol = sub['ret'].std(ddof=1) * np.sqrt(ann_factor)
        sharpe = annualized_return / annualized_vol if annualized_vol > 0 else np.nan

        # 当年内最大回撤（当年起点净值重置为1）
        nav_year = (1.0 + sub['ret']).cumprod()
        max_drawdown = (nav_year / nav_year.cummax() - 1.0).min()

        out.append({
            'year': y,
            'trading_days': int(ndays),
            'period_return': period_return,
            'annualized_return': annualized_return,
            'annualized_vol': annualized_vol,
            'sharpe': sharpe,
            'max_drawdown': max_drawdown,
            'win_rate': float((sub['ret'] > 0).mean()),
            'avg_active_pairs': float(sub['n_active'].mean()),
            'avg_turn_bps': float(sub['turn_bps'].mean()),
        })
    return pd.DataFrame(out)


# ========== 读数据 & 预处理 ==========
prices_all = (pd.read_csv(PRICES_CSV, parse_dates=['日期'])
                .sort_values('日期').set_index('日期'))
if USE_LOG:
    prices_all = np.log(prices_all)

# 测试期（仅用于回测）
TEST_START, TEST_END = "2024-01-01", "2025-09-30"
prices = prices_all.loc[TEST_START:TEST_END].copy()

pairs = pd.read_csv(COINT_CSV)
need = {'y','x','alpha','beta','coint_pvalue','hl_days'}
if not need.issubset(pairs.columns):
    raise SystemExit(f"协整表缺列：{need - set(pairs.columns)}")

pairs = pairs.dropna(subset=['y','x','alpha','beta','coint_pvalue','hl_days']).copy()
pairs = pairs[(pairs['coint_pvalue'] < PVAL_THRESH) &
              (pairs['hl_days'] >= HL_MIN) &
              (pairs['hl_days'] <= HL_MAX)].copy()
if 'n_obs' in pairs.columns:
    pairs = pairs[pairs['n_obs'] >= N_OBS_MIN].copy()

# 保证代码在价格表里
valid_cols = set(prices.columns)
pairs = pairs[pairs['y'].isin(valid_cols) & pairs['x'].isin(valid_cols)].copy()
if pairs.empty:
    raise SystemExit("筛选后配对为空，请放宽阈值或检查列名。")

# 打分：p 越小、hl 越短越好
pairs['score'] = -np.log10(pairs['coint_pvalue']) / (pairs['hl_days'] + 1.0)
# 行业限额（若有）
if 'industry' in pairs.columns:
    pairs = (pairs.sort_values(['industry','score'], ascending=[True, False])
                  .groupby('industry').head(PER_INDUSTRY_CAP).reset_index(drop=True))
# 贪心选对 & 输出
pairs = greedy_select_pairs(pairs, MAX_PAIRS, PER_INDUSTRY_CAP, ENFORCE_UNIQUE_STOCK)
pairs.to_csv("selected_pairs.csv", index=False, encoding="utf-8-sig")
print(f"✅ 已保存：selected_pairs.csv（{len(pairs)} 对）")

# ========== 预计算 z 分数 ==========
dates = prices.index
zz = {}  # (y,x) -> dict(z=Series, time_stop=int, beta=float)
for _, r in pairs.iterrows():
    y, x, a, b, hl = r['y'], r['x'], float(r['alpha']), float(r['beta']), float(r['hl_days'])
    spread, z, L = zscore_series(prices[y], prices[x], a, b, hl)
    zz[(y, x)] = dict(z=z.reindex(dates), time_stop=int(ceil(max(hl, 1.0) * TIME_STOP_MULT)), beta=b)

# ========== 定义收益序列 ==========
if USE_LOG:
    # 对数收益：log(P_t) - log(P_{t-1})
    base_ret = prices.diff().fillna(0.0)
else:
    # 普通收益：P_t / P_{t-1} - 1
    base_ret = prices.pct_change().fillna(0.0)

# ========== 回测主循环（T-1 信号，T 执行；精确换手+成本；β中性） ==========
active = {}      # {(y,x): {'side': +1/-1, 'days': int}}
pos_prev = {}    # 上日配对级持仓 {(y,x): (wy, wx)}
records = []
cooldown_until = {}  # {(y,x): 允许再次入场的最早t索引}

for t in range(1, len(dates)):  # 从第二天起（有 T-1）
    dt_prev, dt = dates[t-1], dates[t]

    # 1) 已有持仓：离场检查（z < z_exit 或 时间止损）
    to_close = []
    for k, st in active.items():
        z_yday = zz[k]['z'].get(dt_prev, np.nan)
        st['days'] += 1
        can_close = (st['days'] >= MIN_HOLD_DAYS) and np.isfinite(z_yday) and (abs(z_yday) < Z_EXIT)
        time_stop = (st['days'] >= zz[k]['time_stop'])
        if can_close or time_stop:
            to_close.append(k)
    for k in to_close:
        del active[k]
        # 退出后设置冷却，不允许立刻反向再入
        cooldown_until[k] = t + REENTRY_COOLDOWN
    

    # 2) 新入场（仅未持仓）
    for k, info in zz.items():
        if k in active:
            continue
        if (k in cooldown_until) and (t < cooldown_until[k]):
            continue  # 冷却中，跳过
        z_yday = info['z'].get(dt_prev, np.nan)
        if not np.isfinite(z_yday):
            continue
        if z_yday > Z_ENTRY:
            active[k] = dict(side=-1, days=0)
        elif z_yday < -Z_ENTRY:
            active[k] = dict(side=+1, days=0)

    # 3) 资金分配：等权到每个激活配对（总名义=1）
    n_active = len(active)
    if n_active == 0:
        records.append(dict(date=dt, n_active=0, turn_bps=0.0, ret=0.0, nav=np.nan))
        pos_prev = {}
        continue
    per_pair_gross = 1.0 / n_active

    # 4) 生成今日目标仓位（配对级→券级）
    new_pos_pairs = {}
    w_new = defaultdict(float)
    for k, st in active.items():
        y, x = k
        beta = zz[k]['beta']
        wy, wx = pair_leg_weights(beta, st['side'], per_pair_gross)
    
        # ——No-trade buffer：单腿 |Δ权重| < NO_REBAL_BUFFER 则不调——
        wy_prev, wx_prev = pos_prev.get(k, (0.0, 0.0))
        if abs(wy - wy_prev) < NO_REBAL_BUFFER:
            wy = wy_prev
        if abs(wx - wx_prev) < NO_REBAL_BUFFER:
            wx = wx_prev
    
        new_pos_pairs[k] = (wy, wx)
        w_new[y] += wy
        w_new[x] += wx
    
        w_old = defaultdict(float)
        for (y, x), (wy, wx) in (pos_prev or {}).items():
            w_old[y] += wy
            w_old[x] += wx

    # 5) 当日毛收益
    day_ret_raw = 0.0
    for code, w in w_new.items():
        r = base_ret.at[dt, code] if code in base_ret.columns else 0.0
        day_ret_raw += w * r

    # 6) 成本（精确换手）
    # ——计算当日精确换手：按券级权重变化Δw（买为正、卖为负）——
    buy_turn = 0.0
    sell_turn = 0.0

    all_codes = set(w_new) | set(w_old)
    for code in all_codes:
        delta = w_new.get(code, 0.0) - w_old.get(code, 0.0)
        if delta > 0:
            buy_turn += delta      # 买入名义
        elif delta < 0:
            sell_turn += (-delta)  # 卖出名义

    # ——成本构成——
    # 1) 显性费用（不含印花税）按单边计：买 + 卖 都要交
    fees_cost = (TOTAL_BPS_PER_SIDE_EX_STAMP / 1e4) * (buy_turn + sell_turn)

    # 2) 卖出印花税：仅对“卖出名义”收取
    stamp_cost = (STAMP_SELL_BPS / 1e4) * sell_turn

    # 3) 合计成本（单位与收益同维度，为“算术收益”）
    cost = fees_cost + stamp_cost

    # 7) 将对数毛收益转为算术净收益（若 USE_LOG=True）
    if USE_LOG:
        gross_arith = np.exp(day_ret_raw) - 1.0
        net_ret = gross_arith - cost
    else:
        net_ret = day_ret_raw - cost

    turn = buy_turn + sell_turn
    records.append(dict(
        date=dt,
        n_active=n_active,
        buy_turn_bps=buy_turn * 1e4,
        sell_turn_bps=sell_turn * 1e4,
        turn_bps=turn * 1e4,                # 兼容旧列
        fees_cost_bps=fees_cost * 1e4,      # 手续费/佣金/监管/过户（双边）
        stamp_cost_bps=stamp_cost * 1e4,    # 卖出印花税
        total_cost_bps=(fees_cost + stamp_cost) * 1e4,
        ret=net_ret
    ))
    pos_prev = new_pos_pairs

# ========== 汇总与导出 ==========
perf = pd.DataFrame(records).set_index('date')
perf['ret'] = perf['ret'].fillna(0.0)
perf['nav'] = (1.0 + perf['ret']).cumprod()
perf[['n_active','turn_bps','ret','nav']].to_csv("portfolio_daily.csv", encoding="utf-8-sig")
print("✅ 已保存：portfolio_daily.csv")

# 全周期摘要（仅供参考）
ann_factor = 252
ann_ret = perf['ret'].mean() * ann_factor
ann_vol = perf['ret'].std(ddof=1) * np.sqrt(ann_factor)
sharpe = ann_ret / ann_vol if ann_vol > 0 else np.nan
max_dd = (perf['nav'] / perf['nav'].cummax() - 1.0).min()
print("\n======= 全周期摘要 =======")
print(f"年化收益: {ann_ret:.2%}")
print(f"年化波动: {ann_vol:.2%}")
print(f"夏普比  : {sharpe:.2f}")
print(f"最大回撤: {max_dd:.2%}")
print(f"平均持仓对数/日: {perf['n_active'].mean():.1f}")

# ========== 年度分段指标（2023/2024/2025 YTD） ==========
years = (2023, 2024, 2025)
yearly = compute_yearly_metrics(perf, years)
yearly.to_csv("yearly_metrics.csv", index=False, encoding="utf-8-sig")
print("\n======= 年度分段（2023 / 2024 / 2025YTD） =======")
if not yearly.empty:
    for _, r in yearly.iterrows():
        print(f"\n{int(r['year'])}: 交易日={int(r['trading_days'])}")
        print(f"累计收益: {r['period_return']:.2%}")
        print(f"年化收益: {r['annualized_return']:.2%}")
        print(f"年化波动: {r['annualized_vol']:.2%}")
        print(f"夏普比  : {r['sharpe']:.2f}")
        print(f"最大回撤: {r['max_drawdown']:.2%}")
        print(f"胜率    : {r['win_rate']:.2%}")
        print(f"平均持仓对数: {r['avg_active_pairs']:.1f}")
        print(f"平均换手  : {r['avg_turn_bps']:.1f} bps")
else:
    print("（没有匹配到相应年份的数据）")
print("✅ 已保存：yearly_metrics.csv")

✅ 已保存：selected_pairs.csv（30 对）
✅ 已保存：portfolio_daily.csv

年化收益: 15.65%
年化波动: 19.36%
夏普比  : 0.81
最大回撤: -19.53%
平均持仓对数/日: 2.8


2024: 交易日=241
累计收益: 16.86%
年化收益: 17.69%
年化波动: 18.17%
夏普比  : 0.97
最大回撤: -10.41%
胜率    : 36.93%
平均持仓对数: 2.6
平均换手  : 1680.2 bps

2025: 交易日=183
累计收益: 7.89%
年化收益: 11.02%
年化波动: 20.88%
夏普比  : 0.53
最大回撤: -19.53%
胜率    : 45.90%
平均持仓对数: 3.2
平均换手  : 2046.5 bps
✅ 已保存：yearly_metrics.csv
