In [None]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
import matplotlib.pyplot as plt
from scipy.stats import chi2, f
plt.style.use('seaborn')
pd.set_option('display.max_columns', None)

def calculate_log_returns(df, price_col='close'):
    """计算对数收益率并过滤异常值"""
    df['return'] = np.log(df[price_col]) - np.log(df[price_col].shift(1))
    df = df[df['return'].between(-0.1, 0.1)].copy()
    return df.dropna()


# 第一部分：单资产CAPM检验

def single_asset_test(stock_path, market_path, rf_col='rf'):
    """单资产时间序列检验"""
    # 读取数据
    stock = pd.read_csv(stock_path, parse_dates=['date'])
    market = pd.read_csv(market_path, parse_dates=['date'])
    
    # 计算收益率
    stock = calculate_log_returns(stock)
    market = calculate_log_returns(market, 'close')
    
    # 合并数据集
    merged = pd.merge(stock[['date', 'return', rf_col]], 
                     market[['date', 'return']], 
                     on='date', suffixes=('_stock', '_market'))
    
    # CAPM回归
    X = sm.add_constant(merged['return_market'] - merged[rf_col])
    y = merged['return_stock'] - merged[rf_col]
    model = sm.OLS(y, X)
    results = model.fit()
    
    return results


# 第二部分：多资产CAPM检验

def multi_asset_test(stocks_data, market_data, rf_col='rf'):
    """多资产联合检验"""
    # 准备数据矩阵
    merged = market_data[['date', 'return', rf_col]].copy()
    merged['market_excess'] = merged['return'] - merged[rf_col]
    
    for i, stock_df in enumerate(stocks_data, 1):
        stock_df = calculate_log_returns(stock_df)
        merged = pd.merge(merged, stock_df[['date', 'return']], 
                         on='date', how='inner', 
                         suffixes=('', f'_stock{i}'))
    
    # 构建收益矩阵
    excess_returns = merged.filter(regex='stock').sub(merged[rf_col], axis=0)
    market_excess = merged['market_excess']
    
    # 多变量回归
    X = sm.add_constant(market_excess)
    models = [sm.OLS(y, X).fit() for y in excess_returns.values.T]
    
    # 联合假设检验
    alphas = [model.params[0] for model in models]
    cov_matrix = np.cov([model.resid for model in models])
    
    # Wald检验
    wald_stat = alphas @ np.linalg.inv(cov_matrix) @ alphas
    p_wald = 1 - chi2.cdf(wald_stat, len(alphas))
    
    return {
        'alphas': alphas,
        'wald_stat': wald_stat,
        'p_value': p_wald
    }

# 第三部分：惯性/反转效应检验

def momentum_effect_test(data, N_values, M_values, n_groups=5):
    """动量效应检验"""
    results = {}
    
    for N in N_values:
        for M in M_values:
            # 计算过去N期收益
            data[f'past_{N}'] = data.groupby('code')['return'].transform(
                lambda x: x.rolling(N).apply(lambda x: np.prod(1+x)-1))
            
            # 分组构建组合
            data['group'] = data.groupby('date')[f'past_{N}'].transform(
                lambda x: pd.qcut(x, n_groups, labels=False))
            
            # 计算未来M期收益
            future_ret = data.groupby(['date', 'group'])['return'].transform(
                lambda x: x.rolling(M).apply(lambda x: np.prod(1+x)-1))
            
            # 统计组合收益
            portfolio_ret = future_ret.groupby(data['group']).mean()
            results[(N, M)] = portfolio_ret
    
    return results

# ==============================
# 主程序执行
# ==============================
if __name__ == "__main__":
    # 单资产检验示例
    stock_results = []
    for stock_code in ['708', '20', '547', '982']:
        res = single_asset_test(f'{stock_code}_data.csv', 'market_data.csv')
        stock_results.append(res.summary())
    
    # 多资产检验
    stock_dfs = [pd.read_csv(f'{code}_data.csv') for code in ['708', '20', '547', '982']]
    market_data = pd.read_csv('market_data.csv')
    multi_result = multi_asset_test(stock_dfs, market_data)
    
    # 惯性效应检验
    all_data = pd.concat([pd.read_csv(f'{code}_data.csv') for code in ['708', '20', '547', '982']])
    momentum_results = momentum_effect_test(all_data, 
                                           N_values=[1, 3, 6, 12],
                                           M_values=[1, 3, 6, 12])
    
    # 结果可视化
    fig, ax = plt.subplots(2, 2, figsize=(15, 10))
    for idx, (key, rets) in enumerate(momentum_results.items()):
        row, col = divmod(idx, 2)
        rets.plot.bar(ax=ax[row, col])
        ax[row, col].set_title(f'N={key[0]}, M={key[1]}')
    plt.tight_layout()
    plt.show()