In [1]:
import numpy as np
import pandas as pd
import scipy.stats as stats
import scipy
from datetime import datetime
import statsmodels.formula.api as smf
from linearmodels import FamaMacBeth # 用于面板回归
from pandas.tseries.offsets import MonthEnd # 用于处理月末日期

# 绘图相关库
from matplotlib import style
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.font_manager import FontProperties
from pylab import mpl
import platform

# --- 绘图配置 ---
# 根据操作系统自动设置中文字体，避免乱码
system = platform.system()
if system == 'Windows':
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
elif system == 'Darwin':  # macOS
    plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
    plt.rcParams['axes.unicode_minus'] = False
else:  # Linux
    plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei']
    plt.rcParams['axes.unicode_minus'] = False

# 设置输出矢量图，使图表更清晰
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

# 设置pandas显示选项，显示所有列
pd.set_option('display.max_columns', None)

print(f"当前操作系统: {system}")

当前操作系统: Windows


In [None]:
# 1. Load the monthly stock-level trading file.
# Raw strings keep the backslash of the Windows path literal: "\B" is not a
# valid escape sequence and triggers a SyntaxWarning on Python 3.12+.
cross = pd.read_csv(r'E:\BaiduNetdiskDownload/TRD_Mnth202509.csv')

# 'Trdmnt' is parsed as the first day of its month; MonthEnd(1) then moves it
# to the last day of that same month (e.g. 1991-04 -> 1991-04-30).
cross['month'] = pd.to_datetime(cross['Trdmnt'], format='%Y-%m') + MonthEnd(1)

# Left-pad stock codes with zeros to six characters (e.g. 1 -> 000001).
cross['Stkcd'] = cross['Stkcd'].astype(str).str.zfill(6)

# Rename raw columns to the names used by the rest of the notebook.
# presumably Mretwd = monthly return with cash dividends reinvested and
# Msmvosd / Msmvttl = float / total market value — verify against the data dictionary.
cross = cross.rename(
    columns={'Mretwd': 'Return', 'Msmvosd': 'floatingvalue', 'Msmvttl': 'totalvalue'}
)

# 2. Load the monthly market file once. It supplies both the risk-free rate
# ('rfmonth', merged in step 3) and the market return ('ret', used in step 8).
rf_data = pd.read_csv(r'E:\BaiduNetdiskDownload/Marketret_mon_stock2024.csv')
rf_data['month'] = pd.to_datetime(rf_data['month'], format='%b %Y') + MonthEnd(1)

# 3. Attach the risk-free rate to every stock-month.
cross = pd.merge(cross, rf_data[['month', 'rfmonth']], on='month', how='left')

# 4. Derived variables.
cross = cross.sort_values(by=['Stkcd', 'month'])
# Running count of a stock's rows in this file. It equals "months since listing"
# only if the file starts at the listing month and has no gaps — TODO confirm.
cross['list_month'] = cross.groupby('Stkcd').cumcount() + 1
# Excess return = stock return minus the risk-free rate.
cross['ret'] = cross['Return'] - cross['rfmonth']
# Rescale market values (raw data presumably in thousands — confirm the unit).
cross['floatingvalue'] = cross['floatingvalue'] * 1000
cross['totalvalue'] = cross['totalvalue'] * 1000

# 5. Next-month excess return, the dependent variable of the predictive tests.
# Build the full stock x month grid first so that shift(-1) always refers to the
# next CALENDAR month, even when a stock has missing months.
all_months = pd.DataFrame(cross['month'].unique(), columns=['month'])
all_stocks = pd.DataFrame(cross['Stkcd'].unique(), columns=['Stkcd'])
full_index = all_stocks.merge(all_months, how='cross')  # Cartesian product

cross_full = full_index.merge(cross, on=['Stkcd', 'month'], how='left')
cross_full = cross_full.sort_values(['Stkcd', 'month'])
cross_full['next_ret'] = cross_full.groupby('Stkcd')['ret'].shift(-1)

# Right join on the full grid: `cross` now holds one row per (stock, month) pair,
# including padding rows (all-NaN data columns) for months without a record.
# The padding makes the 12-row rolling window in step 6 cover 12 calendar
# months, and it is removed in step 7 because NaN fails `Ndaytrd >= 7`.
cross = cross.merge(
    cross_full[['Stkcd', 'month', 'next_ret']], on=['Stkcd', 'month'], how='right'
)

# 6. Trading days accumulated over the trailing 12 rows of each stock
# (used below to drop stocks that were suspended too often).
cross['Cumsum_tradingday'] = cross.groupby('Stkcd')['Ndaytrd'].transform(
    lambda days: days.rolling(window=12, min_periods=1).sum()
)

# 7. Sample filters (all row-wise, so they are combined into a single mask).
in_sample_window = (cross['month'] >= '1995-01-31') & (cross['month'] <= '2024-12-31')
# At least 7 trading days in the month.
enough_trading_days = cross['Ndaytrd'] >= 7
# NOTE(review): this was described as "monthly closing price >= 5 yuan", but in
# the CSMAR TRD_Mnth layout `Clsdt` is the closing DAY of the month and the
# closing price is `Mclsprc`. As written, this keeps months whose last trading
# day falls on or after the 5th — confirm which filter is intended.
clsdt_at_least_5 = cross['Clsdt'] >= 5
# More than 6 rows since the stock first appears (screens out recent IPOs).
seasoned = cross['list_month'] > 6
# At least 100 trading days over the trailing 12-month window.
rarely_suspended = cross['Cumsum_tradingday'] >= 100
# Market types 1, 4 and 16 (presumably SH A-share, SZ A-share and ChiNext
# under CSMAR coding — verify).
main_boards = cross['Markettype'].isin([1, 4, 16])

cross = cross[
    in_sample_window
    & enough_trading_days
    & clsdt_at_least_5
    & seasoned
    & rarely_suspended
    & main_boards
]

# 8. Attach the market return (MKT). `rf_data` already holds the market file
# with month-end dates, so it is reused instead of reading the CSV again.
Market_ret = rf_data.rename(columns={'ret': 'MKT'})
cross = pd.merge(cross, Market_ret[['month', 'MKT']], on='month', how='left')

print("数据预处理完成。")

In [None]:
# Load the EP (earnings-to-price) data.
# EP is the reciprocal of PE: high EP = low PE (value), low EP = high PE (growth).
# Raw string keeps the backslash of the Windows path literal: "\B" is not a
# valid escape sequence and triggers a SyntaxWarning on Python 3.12+.
EP = pd.read_csv(r'E:\BaiduNetdiskDownload/EP_individual_mon2024.csv')
EP['Stkcd'] = EP['Stkcd'].astype(str).str.zfill(6)

# Decode the fractional-year 'month' column into a calendar month.
# The formula assumes the encoding year + (month - 1) / 12, so a fraction of
# .00 maps to January and .25 maps to April.
# NOTE(review): if the file instead encodes year + month / 12 (so .25 = March),
# every EP observation is shifted by one month — confirm against the data source.
EP['year'] = EP['month'].astype(int)
EP['month_decimal'] = EP['month'] - EP['year']
EP['month_num'] = (EP['month_decimal'] * 12).round().astype(int) + 1

# Rounding can produce month 13; carry it into January of the following year.
# The year must be bumped first, while month_num still exceeds 12.
EP.loc[EP['month_num'] > 12, 'year'] += 1
EP.loc[EP['month_num'] > 12, 'month_num'] -= 12

# First day of the decoded month, moved to that month's last day.
EP['month'] = pd.to_datetime(
    EP['year'].astype(str) + '-' + EP['month_num'].astype(str) + '-01'
) + MonthEnd(1)
EP = EP[['Stkcd', 'month', 'ep', 'ep_recent']]

# Attach the EP factor to the main panel.
cross = pd.merge(cross, EP, on=['Stkcd', 'month'], how='left')

# --- Size screen ---
# Keep only stocks whose total market value is strictly above that month's 30th
# percentile, to limit the influence of small, illiquid stocks.
fenweishu_guimo = pd.DataFrame(cross.groupby(['month'])['totalvalue'].quantile(0.3))
fenweishu_guimo.columns = ['fenweishu_guimo']

cross_new = pd.merge(cross, fenweishu_guimo, on='month', how='left')
cross_new = cross_new[cross_new['totalvalue'] > cross_new['fenweishu_guimo']]

# --- Portfolio sort ---
# Monthly EP decile breakpoints (10%, 20%, ..., 90%), one column per breakpoint.
fenweishu = pd.DataFrame(
    cross_new.groupby(['month'])['ep'].quantile([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9])
)
fenweishu = fenweishu.reset_index().pivot_table(index='month', columns='level_1', values='ep')
breakpoint_cols = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine']
fenweishu.columns = breakpoint_cols

# Put each month's breakpoints next to every stock of that month.
portfolio = pd.merge(cross_new, fenweishu, on='month')

# Label each stock with its EP decile: P1 = lowest EP (high PE or losses,
# growth) ... Pmax = highest EP (low PE, value). np.select returns the label of
# the FIRST breakpoint the stock does not exceed; stocks above all nine
# breakpoints, and rows with missing EP (dropped just below), get 'Pmax'.
decile_labels = [f'P{rank}' for rank in range(1, 10)]
portfolio['sort'] = np.select(
    [portfolio['ep'] <= portfolio[col] for col in breakpoint_cols],
    decile_labels,
    default='Pmax',
)

# Drop rows lacking a weight, a next-month return or an EP value.
portfolio = portfolio.dropna(subset=['floatingvalue', 'next_ret', 'ep'])
print("投资组合分组完成。")

In [None]:
# 1. Value-weighted next-month return of every (month, decile) portfolio,
#    using float market value as the weight.
portfolio_value = (
    portfolio.groupby(['month', 'sort'])
    .apply(
        lambda grp: np.average(grp['next_ret'], weights=grp['floatingvalue']),
        include_groups=False,
    )
    .rename('p')
    .reset_index()
)
# 'month' is the formation month and already a month-end date, so MonthEnd(1)
# rolls it to the NEXT month-end. Each return is thereby stamped with the month
# in which `next_ret` is realized, which is what aligns it with MKT below.
portfolio_value['month'] = portfolio_value['month'] + MonthEnd(1)

# 2. Wide format: one row per month, one column per decile (P1 ... Pmax).
portfolio_value = portfolio_value.pivot_table(index='month', columns='sort', values='p')

# 3. Long-short strategy: long Pmax (value), short P1 (growth).
portfolio_value['My_portfolio'] = portfolio_value['Pmax'] - portfolio_value['P1']

# Restrict to the analysis window (realized returns from 2000-02 to 2024-12).
portfolio_value = portfolio_value.loc['2000-02':'2024-12'].copy()

# 4. Data for plotting and evaluation: strategy legs plus the market return.
# The month index is turned into a column explicitly so the merge key does not
# depend on pandas resolving an index level by name.
MYPOR = portfolio_value[['P1', 'Pmax', 'My_portfolio']].dropna()
MYPOR = (
    MYPOR.reset_index()
    .merge(Market_ret[['month', 'MKT']], on='month', how='left')
    .set_index('month')
)

# 5. Net-value curves (cumulative product of 1 + monthly return).
MYPOR['price_portfolio'] = (1 + MYPOR['My_portfolio']).cumprod()  # long-short strategy
MYPOR['price_p1'] = (1 + MYPOR['P1']).cumprod()                   # lowest-EP leg
MYPOR['price_pmax'] = (1 + MYPOR['Pmax']).cumprod()               # highest-EP leg
MYPOR['price_market'] = (1 + MYPOR['MKT']).cumprod()              # market benchmark

# 6. Annualized Sharpe ratio. No risk-free rate is subtracted: the strategy is
#    the difference of two excess-return series.
sharpe_ratio = MYPOR['My_portfolio'].mean() / MYPOR['My_portfolio'].std() * np.sqrt(12)
print(f"策略夏普比率 (Sharpe Ratio): {sharpe_ratio:.4f}")

# 7. Maximum drawdown.
cumulative_return = (1 + MYPOR['My_portfolio']).cumprod()
# The net value starts at 1 before the first month, so the running peak is
# floored at 1; otherwise a loss from inception would not count as drawdown.
rolling_max = cumulative_return.cummax().clip(lower=1.0)
drawdown = cumulative_return / rolling_max - 1
max_drawdown = drawdown.min()

# Locate the drawdown interval: trough date, then the preceding peak date.
max_drawdown_end = drawdown.idxmin()
value_until_trough = cumulative_return.loc[:max_drawdown_end]
if value_until_trough.max() >= 1.0:
    max_drawdown_start = value_until_trough.idxmax()
else:
    # The peak is the initial net value of 1, i.e. the decline began in the
    # first month of the sample.
    max_drawdown_start = value_until_trough.index[0]

print(f"策略最大回撤 (Max Drawdown): {max_drawdown:.4%}")
print(f"最大回撤区间: {max_drawdown_start.date()} 至 {max_drawdown_end.date()}")

In [None]:
# --- 1. Time-series OLS: does the strategy earn alpha relative to the market? ---
# Model: My_portfolio ~ constant + MKT, with HAC standard errors (6 lags) to
# allow for heteroskedasticity and autocorrelation.
model_port = smf.ols('My_portfolio ~ MKT', data=MYPOR).fit(
    cov_type='HAC',
    cov_kwds={'maxlags': 6},
)
print("\n=== OLS Regression Results (CAPM Alpha) ===")
print(model_port.summary())

# --- 2. Fama-MacBeth regression ---
# Tests whether EP still relates to the next-month return once log size is
# controlled for. The panel is indexed by (stock, month).
cross_reg = (
    cross_new.loc[cross_new['month'] >= '2000-01']
    .set_index(['Stkcd', 'month'])
    .assign(log_totalvalue=lambda frame: np.log(frame['totalvalue']))
)

# Model: next_ret ~ intercept + log_totalvalue + ep, estimated on complete rows only.
fm_model = FamaMacBeth.from_formula(
    'next_ret ~ 1 + log_totalvalue + ep',
    data=cross_reg.dropna(subset=['next_ret', 'log_totalvalue', 'ep']),
)
# Kernel (HAC) covariance estimator with a bandwidth of 6.
res = fm_model.fit(cov_type='kernel', debiased=False, bandwidth=6)
print("\n=== Fama-MacBeth Regression Results ===")
print(res.summary)

# --- 3. Plot: net-value curves of the strategy, its two legs and the market ---
plt.figure(figsize=(10, 5))
for column, line_format, legend_label in [
    ('price_portfolio', '.-r', 'Price of My Portfolio (Value - Growth)'),
    ('price_market', '.-b', 'Price of Market'),
    ('price_p1', '.-g', 'Price of Lowest EP (Growth)'),
    ('price_pmax', '.-c', 'Price of Highest EP (Value)'),
]:
    plt.plot(MYPOR[column], line_format, label=legend_label, linewidth=1)

plt.title("China's Stock Market: Value Strategy Performance")
plt.xlabel('Month')
plt.ylabel('Cumulative Return (Net Value)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
def _summarize_returns(returns):
    """Summarize a series of monthly strategy returns.

    Missing values are dropped first, so the sample size, the mean and the
    t-test all refer to the same observations.

    Parameters
    ----------
    returns : pandas.Series
        Monthly returns of the strategy; may contain NaN.

    Returns
    -------
    dict
        Sample size, annualized mean return in percent (monthly mean * 12 * 100),
        and the t-statistic and p-value of a one-sample t-test against zero.
    """
    valid = returns.dropna()
    t_stat, p_val = stats.ttest_1samp(valid, 0)
    return {
        '样本数': len(valid),
        '年化超额收益(%)': valid.mean() * 12 * 100,
        't统计量': t_stat,
        'p值': p_val,
    }


# --- Sub-period test ---
time_periods = [
    ('2000-02-29', '2007-12-31', '2000-2007'),
    ('2008-01-31', '2015-12-31', '2008-2015'),
    ('2016-01-31', '2024-12-31', '2016-2024')
]

period_results = []
for start_date, end_date, period_name in time_periods:
    # Strategy returns inside the period. NaNs are removed up front so that the
    # sample size and the win rate use the same observations as the t-test.
    subset = portfolio_value.loc[start_date:end_date, 'My_portfolio'].dropna()
    if subset.empty:
        continue

    period_results.append({
        '时期': period_name,
        **_summarize_returns(subset),
        # Share of months with a strictly positive strategy return.
        '胜率': (subset > 0).sum() / len(subset),
    })

print("\n=== 分时段稳健性检验 ===")
print(pd.DataFrame(period_results))

# --- Market-state test (bull vs bear) ---
# A month is labelled bull when the market return exceeds the median market
# return of the merged sample, and bear otherwise.
portfolio_with_mkt = pd.merge(portfolio_value, Market_ret[['month', 'MKT']], left_index=True, right_on='month')
mkt_median = portfolio_with_mkt['MKT'].median()

portfolio_with_mkt['State'] = np.where(portfolio_with_mkt['MKT'] > mkt_median, '牛市', '熊市')

state_results = []
for state in ['牛市', '熊市']:
    subset = portfolio_with_mkt.loc[portfolio_with_mkt['State'] == state, 'My_portfolio']
    state_results.append({'市场状态': state, **_summarize_returns(subset)})

print("\n=== 牛熊市稳健性检验 ===")
print(f"市场收益率中位数: {mkt_median:.4%}")
print(pd.DataFrame(state_results))