In [77]:
import pandas as pd
import numpy as np
from pathlib import Path


# 0. 配置与初始化

In [78]:
root_path = Path('../')

# Input files (all under <root_path>/data)
price_path = root_path / 'data' / 'price.h5'  # wide table: date + one column per asset
turnover_path = root_path / 'data' / 'daily_turnover.pkl'  # long table: order_book_id / tradedate / today
volume_path = root_path / 'data' / 'daily_volume.pkl'  # long table: order_book_id / date / volume
csi500_path = root_path / 'data' / 'csi500_mask_monthly.pkl'
dates_path = root_path / 'data' / 'stock_calendar_2008_2024.csv'
blacklist_path = root_path / 'data' / 'BLACKLIST.pkl'
untradable_path = root_path / 'data' / 'UNTRADABLE.pkl'


# Time-window settings
LOOKBACK = 130  # CGO reference-price look-back, in weeks (130 weeks = 2.5 years)

# Data is read from `start`; results are reported from `backtest` onwards.
# CGO needs LOOKBACK weeks of history before the first backtest date, so `start`
# must precede `backtest` by at least 130 weeks (i.e. be no later than about mid-2011).
# Starting at 2008-01-01 gives roughly 313 weeks of warm-up, more than enough.
start = pd.to_datetime('2008-01-01')
backtest = pd.to_datetime('2014-01-01')
end = pd.to_datetime('2024-01-01')
idx = pd.IndexSlice

# Set to True for a quick smoke test on a short window. The warm-up is then
# shorter than LOOKBACK, and compute_ref_price uses whatever history exists.
QUICK_TEST = False
if QUICK_TEST:
    start = pd.to_datetime('2023-06-01')
    backtest = pd.to_datetime('2023-10-01')


# 1. 确定股票池 (Universe)

In [79]:
# 1.1 Universe: every stock that was a CSI500 member at any point in the backtest window
mask = pd.read_pickle(csi500_path)
# Strip the exchange suffix from the column names (e.g. '000006.XSHE' -> '000006')
mask.columns = [code.rsplit('.', 1)[0] for code in mask.columns]
mask_slice = mask.loc[backtest:end]
ever_member = mask_slice.any(axis=0)
tickers = sorted(set(mask_slice.columns[ever_member]))
print(len(tickers))

# Standard trading calendar restricted to [start, end] (both ends inclusive)
dates = pd.read_csv(dates_path)
dates['date'] = pd.to_datetime(dates['date'])
dates = dates[dates['date'].between(start, end)]

# Full (date x asset) grid; every daily series below is reindexed onto it
multi_idx = pd.MultiIndex.from_product([dates['date'], tickers], names=['date', 'asset'])

1261


In [80]:
# Merge the blacklist and the untradable list into one (date, asset) exclusion table
untradable = pd.read_pickle(untradable_path).reset_index()
blacklist = pd.read_pickle(blacklist_path).reset_index()

combined_black_untrad = pd.concat([blacklist, untradable], ignore_index=True)

# Normalise asset codes to their first 6 characters (drop the exchange suffix)
# BEFORE de-duplicating. Otherwise two spellings of the same stock (if the sources
# ever differ in suffix) survive drop_duplicates and collide after truncation.
black = combined_black_untrad.assign(asset=combined_black_untrad['asset'].str[:6])
black = black.drop_duplicates(subset=['date', 'asset'])

# Same inclusive [start, end] window as the trading calendar and the price data
black = black[black['date'].between(start, end)].reset_index(drop=True)

# 2. 读取并清洗日度数据

In [81]:
# 2.1 Daily close prices: wide (date x asset) table -> long, indexed by (date, asset)
price_short = pd.read_hdf(price_path).reset_index()
price = price_short.melt(id_vars='date', var_name='asset', value_name='price')
price['asset'] = price['asset'].str.split('.').str[0]  # drop exchange suffix
price = price.set_index(['date', 'asset']).sort_index(level=[0, 1])
price = price.loc[idx[start:end, :], :]

In [82]:
# 2.2 Daily turnover (stored in percent): long table re-keyed to (date, asset)
turnover = (
    pd.read_pickle(turnover_path)
    .reset_index()
    .rename(columns={'tradedate': 'date', 'order_book_id': 'asset', 'today': 'turnover'})
)
turnover['asset'] = turnover['asset'].str.split('.').str[0]  # drop exchange suffix
turnover = turnover.set_index(['date', 'asset']).sort_index(level=[0, 1])
turnover = turnover.loc[idx[start:end, :], :]
turnover['turnover'] = turnover['turnover'] / 100  # percent -> fraction

In [83]:
# 2.3 Daily share volume: long table re-keyed to (date, asset)
# Only 'order_book_id' needs renaming; 'date' and 'volume' already carry their target names.
volume = pd.read_pickle(volume_path).reset_index().rename(columns={'order_book_id': 'asset'})
volume['asset'] = volume['asset'].str.split('.').str[0]  # drop exchange suffix
volume = volume.set_index(['date', 'asset']).sort_index(level=[0, 1])
volume = volume.loc[idx[start:end, :], :]

In [84]:
# 2.4 Put price / turnover / volume side by side on the full (date x asset) grid
combined = pd.concat([price, turnover, volume], axis=1).reindex(multi_idx)
combined.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,price,turnover,volume
date,asset,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2008-01-02,6,145.56726,0.016013,3500865.0
2008-01-02,8,31.72596,0.029684,1535118.0
2008-01-02,9,45.57838,0.015371,8910692.0
2008-01-02,12,186.354,0.012009,4640052.0
2008-01-02,21,142.41744,0.028263,10391764.0


In [85]:
# 2.5 Clean the daily panel
# (1) Last date with a real (non-NaN) price for each asset in the raw price data
valid_end_dates = price['price'].dropna().reset_index().groupby('asset')['date'].max()

# (2) Forward-fill prices within each asset to carry them across suspensions
combined['price'] = combined['price'].groupby(level='asset').ffill()

# (3) Undo that forward-fill after each asset's last real trading date, so no
#     "zombie" price survives a delisting. Assets absent from valid_end_dates map
#     to NaT; comparisons against NaT are False, so those rows are left untouched.
asset_codes = pd.Series(combined.index.get_level_values('asset'))
row_dates = pd.Series(combined.index.get_level_values('date'))
zombie_mask = (row_dates > asset_codes.map(valid_end_dates)).to_numpy()
combined.loc[zombie_mask, 'price'] = np.nan

# (4) Apply the blacklist / untradable (date, asset) pairs built earlier in `black`
black_idx = pd.MultiIndex.from_frame(black[['date', 'asset']])
# Only touch pairs that actually exist in the panel
target_idx = combined.index.intersection(black_idx)
combined.loc[target_idx, 'price'] = np.nan

# (5) Without a price, volume and turnover are not usable either
combined.loc[combined['price'].isna(), ['turnover', 'volume']] = np.nan

# 3. 日频转周频 (Resampling)

In [86]:
# 3.1 Implied float shares = volume / turnover (zero turnover treated as missing)
turnover_safe = combined['turnover'].replace(0, np.nan)
implied_shares = combined['volume'] / turnover_safe
# Float shares still exist on suspended days, so carry the last value forward per asset.
# This gives a share count even for a week whose Friday is suspended.
combined['shares_outstanding'] = implied_shares.groupby(level='asset').ffill()
combined.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,price,turnover,volume,shares_outstanding
date,asset,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2008-01-02,6,145.56726,0.016013,3500865.0,218626400.0
2008-01-02,8,,,,
2008-01-02,9,45.57838,0.015371,8910692.0,579708000.0
2008-01-02,12,186.354,0.012009,4640052.0,386381200.0
2008-01-02,21,142.41744,0.028263,10391764.0,367680900.0


In [87]:
# 3.2 Generic daily -> weekly aggregation helper
def to_weekly(df, value_col, how):
    """
    Aggregate a (date, asset)-indexed daily frame to weekly (W-FRI) frequency.

    Parameters
    ----------
    df : DataFrame
        Daily data with index levels 'date' and 'asset' and a column `value_col`.
    value_col : str
        Column to aggregate.
    how : str
        Aggregation name passed to `.agg` for `value_col` (e.g. 'sum', 'last').

    Returns
    -------
    DataFrame
        Indexed by (asset, date), sorted. Each week's label is the latest date
        actually present in that week, not the calendar Friday. This matters
        when the Friday is a holiday.
    """
    flat = df.reset_index()
    friday_bins = pd.Grouper(key='date', freq='W-FRI')

    # Aggregate the value and, alongside it, the real last date seen in each bin
    per_week = flat.groupby(['asset', friday_bins]).agg({value_col: how, 'date': 'max'})

    # Discard the generated Friday label and use the real last date as the index level
    relabelled = per_week.droplevel('date').set_index('date', append=True)
    return relabelled.sort_index(level=[0, 1])

# 3.3 Weekly aggregates: total volume, float shares and price at week end.
# shares_outstanding was already forward-filled per asset in step 3.1, so it is not re-filled here.
# NOTE: pandas' groupby 'last' returns the last non-NaN value in each week.
weekly_volume = to_weekly(combined[['volume']], 'volume', 'sum')
weekly_shares = to_weekly(combined[['shares_outstanding']], 'shares_outstanding', 'last')
weekly_price = to_weekly(combined[['price']], 'price', 'last')

# 3.4 Weekly turnover = weekly volume / float shares at the end of the week
combined = pd.concat([weekly_price, weekly_volume, weekly_shares], axis=1)
combined['turnover'] = combined['volume'] / combined['shares_outstanding']
combined = combined.reorder_levels(['date', 'asset']).sort_index(level=[0, 1])
combined = combined[['price', 'turnover']]
combined.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,price,turnover
date,asset,Unnamed: 2_level_1,Unnamed: 3_level_1
2008-01-04,6,151.6212,0.061551
2008-01-04,8,,
2008-01-04,9,44.92196,0.04408
2008-01-04,12,209.65896,0.09046
2008-01-04,21,143.13672,0.063998


# 4. 计算 CGO (Capital Gains Overhang)

In [88]:
def _reference_price(v_window, p_window, min_survival):
    """Survival-weighted average of past prices; NaN if the total weight is ~0.

    Both windows are ordered newest first. Week i gets weight
    V_i * prod_{j newer than i} (1 - V_j), i.e. the part of week i's turnover
    that has not been turned over again since.
    """
    surv = 1.0   # fraction of the float not yet turned over again
    num = 0.0    # sum of weight * price
    denom = 0.0  # sum of weights

    for v_i, p_i in zip(v_window, p_window):
        if np.isnan(p_i):
            # Missing price with volume: dirty data -> skip the price but still decay survival.
            # Missing price without volume: suspension -> skip entirely, no decay.
            if v_i > 0:
                surv *= (1.0 - v_i)
            continue

        weight = v_i * surv
        num += weight * p_i
        denom += weight
        surv *= (1.0 - v_i)

        # Early exit: older weeks carry negligible weight
        if surv < min_survival:
            break

    return num / denom if denom > 1e-10 else np.nan


def compute_ref_price(group, lookback=LOOKBACK, min_survival=0.01, max_turnover=0.999999):
    """Compute the turnover-weighted reference price and CGO for one asset.

    Parameters
    ----------
    group : DataFrame
        One asset's weekly rows with 'price' and 'turnover' columns, expected
        in ascending date order.
    lookback : int
        Maximum number of past weeks used for each reference price. With less
        history (t < lookback), all available earlier weeks are used.
    min_survival : float
        Stop walking back once the surviving fraction drops below this value.
    max_turnover : float
        Weekly turnover is clipped to [0, max_turnover]. NaN turnover is treated
        as 0, i.e. no trading and no effect on the cost distribution.

    Returns
    -------
    DataFrame
        A copy of `group` with 'ref_price', 'price_lag1' and 'cgo' columns added.
        ref_price[t] uses only weeks [t - lookback, t). The first row is always NaN
        because it has no history. cgo = (price[t-1] - ref_price[t]) / price[t-1],
        which is NaN whenever either side is NaN.
    """
    price_arr = group['price'].to_numpy()
    V = np.clip(np.nan_to_num(group['turnover'].to_numpy(), nan=0.0), 0.0, max_turnover)

    ref_price = np.full(len(group), np.nan)
    for t in range(1, len(group)):
        start_idx = max(0, t - lookback)
        # Newest-first views of the history strictly before t
        ref_price[t] = _reference_price(
            V[start_idx:t][::-1],
            price_arr[start_idx:t][::-1],
            min_survival,
        )

    group = group.copy()
    group['ref_price'] = ref_price
    group['price_lag1'] = group['price'].shift(1)
    group['cgo'] = (group['price_lag1'] - group['ref_price']) / group['price_lag1']
    return group

# Run compute_ref_price per asset. `combined` is sorted by (date, asset), and groupby
# keeps row order inside each group, so every group arrives in date order.
weekly_cgo = combined.groupby('asset', group_keys=False).apply(compute_ref_price)
weekly_cgo = weekly_cgo.drop(columns=['price_lag1'])  # helper column only

# 5. 结果截取与检查

In [89]:
# Keep only the backtest window (earlier history served as CGO warm-up), then persist it
weekly_cgo = weekly_cgo.loc[idx[backtest:end, :], :]
weekly_out_path = root_path / 'data' / 'weekly_cgo.pkl'
weekly_cgo.to_pickle(weekly_out_path)

In [90]:
# Preview the weekly CGO panel for the backtest window (pandas truncates large frames on display)
weekly_cgo

Unnamed: 0_level_0,Unnamed: 1_level_0,price,turnover,ref_price,cgo
date,asset,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2014-01-03,000006,145.58467,0.024450,154.269609,-0.024984
2014-01-03,000008,45.32280,0.025881,49.163013,-0.082312
2014-01-03,000009,40.92795,0.045670,44.327245,-0.076000
2014-01-03,000012,127.44582,0.035269,143.620415,-0.128320
2014-01-03,000021,71.39440,0.016725,73.950424,-0.055644
...,...,...,...,...,...
2023-12-29,688777,66.80055,0.053619,73.908569,-0.090773
2023-12-29,688778,56.24916,0.049870,64.883019,-0.201379
2023-12-29,688779,7.49710,0.076873,10.312308,-0.426306
2023-12-29,688819,29.31600,0.074604,34.554888,-0.182941


In [91]:
# Number of missing values per column in the weekly panel
weekly_cgo.isnull().sum()

price        118079
turnover      78776
ref_price     84552
cgo          118323
dtype: int64

# 6. 月频（由周频聚合）

In [93]:
# ===== 6. Monthly frequency (aggregated from weekly) =====
# Sorting by (asset, date) makes the last row of each (asset, month) the month's last week.
# Weeks are labelled with their last trading date, so a week that straddles a month
# boundary belongs to the month in which it ends.
wk = weekly_cgo.reset_index().sort_values(['asset', 'date'])
wk['month'] = wk['date'].dt.to_period('M')

month_keys = ['asset', 'month']

# (1) Month-end snapshot: the last weekly row keeps date / price / ref_price / cgo
monthly_last = wk.groupby(month_keys, as_index=False).tail(1)

# (2) Monthly turnover = sum of that month's weekly turnovers.
#     min_count=1 keeps a month with no turnover data at NaN instead of 0.
monthly_turnover_sum = (
    wk.groupby(month_keys)['turnover'].sum(min_count=1).rename('turnover').reset_index()
)

# (3) Snapshot columns + summed turnover (replacing the last week's turnover)
monthly_cgo = (
    monthly_last.drop(columns=['turnover'])
    .merge(monthly_turnover_sum, on=month_keys, how='left')
    .sort_values(['date', 'asset'])
    .reset_index(drop=True)
)

monthly_cgo

Unnamed: 0,date,asset,price,ref_price,cgo,month,turnover
0,2014-01-30,000006,134.50423,151.621274,-0.082665,2014-01,0.185758
1,2014-01-30,000008,43.39848,48.299206,-0.100087,2014-01,0.161736
2,2014-01-30,000009,41.91201,43.672486,-0.055522,2014-01,0.213278
3,2014-01-30,000012,126.17454,137.959281,-0.079801,2014-01,0.327042
4,2014-01-30,000021,89.51140,74.851397,0.082632,2014-01,0.226100
...,...,...,...,...,...,...,...
151315,2023-12-29,688777,66.80055,73.908569,-0.090773,2023-12,0.310317
151316,2023-12-29,688778,56.24916,64.883019,-0.201379,2023-12,0.183250
151317,2023-12-29,688779,7.49710,10.312308,-0.426306,2023-12,0.392223
151318,2023-12-29,688819,29.31600,34.554888,-0.182941,2023-12,0.358491


In [94]:
# Restrict to the backtest window, then count missing values per column
in_window = monthly_cgo['date'] >= backtest
monthly_cgo = monthly_cgo.loc[in_window]
monthly_cgo.isnull().sum()

date             0
asset            0
price        27426
ref_price    19650
cgo          27486
month            0
turnover     18286
dtype: int64

In [95]:
# Persist the monthly panel. to_csv writes the integer row index as the first, unnamed column.
monthly_csv_path = root_path / 'data' / 'monthly_cgo.csv'
monthly_cgo.to_csv(monthly_csv_path)

# 7. 检验：CSI500 成分股覆盖

In [96]:
# Long-format CSI500 membership flags over the backtest window
# (future_stack=True avoids the pandas 2.1+ FutureWarning)
stacked = mask.loc[backtest:end].stack(future_stack=True)
masked1 = stacked.rename('in_csi500').rename_axis(index=['date', 'asset']).reset_index()

# Align on calendar month: mask dates can be calendar month-ends, while the monthly
# CGO dates are trading days
masked1['month'] = masked1['date'].dt.to_period('M')

masked1.head()

Unnamed: 0,date,asset,in_csi500,month
0,2014-01-31,5,False,2014-01
1,2014-01-31,6,True,2014-01
2,2014-01-31,8,False,2014-01
3,2014-01-31,9,False,2014-01
4,2014-01-31,12,False,2014-01


In [97]:
# Keep only rows whose (asset, month) was a CSI500 constituent in that month.
# Rows with no membership record get NaN from the left merge and are dropped by eq(True).
merged_membership = monthly_cgo.merge(
    masked1[['asset', 'month', 'in_csi500']],
    on=['asset', 'month'],
    how='left',
)
is_member = merged_membership['in_csi500'].eq(True)
monthly_cgo_true = (
    merged_membership[is_member]
    .drop(columns=['in_csi500'])
    .reset_index(drop=True)
)
monthly_cgo_true.head()

Unnamed: 0,date,asset,price,ref_price,cgo,month,turnover
0,2014-01-30,6,134.50423,151.621274,-0.082665,2014-01,0.185758
1,2014-01-30,21,89.5114,74.851397,0.082632,2014-01,0.2261
2,2014-01-30,28,201.94872,141.584955,0.293832,2014-01,0.137941
3,2014-01-30,30,12.30939,13.128609,-0.066552,2014-01,0.276862
4,2014-01-30,31,48.29065,54.824986,-0.148251,2014-01,0.047347


In [98]:
# Count constituents per month. Group by `month`, not `date`: `date` is the last weekly
# date of each month and is not a common month-end key.
number_by_month = monthly_cgo_true['asset'].groupby(monthly_cgo_true['month']).count()
print(number_by_month.head(12))
print(number_by_month.value_counts().head())

month
2014-01    500
2014-02    500
2014-03    500
2014-04    500
2014-05    500
2014-06    500
2014-07    500
2014-08    500
2014-09    500
2014-10    500
2014-11    500
2014-12    500
Freq: M, Name: asset, dtype: int64
asset
500    120
Name: count, dtype: int64
