In [None]:
# 导入必要的库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

# Configure the chart style first, then the CJK font.
# seaborn's set_style() applies a full style dictionary that includes
# 'font.sans-serif', so calling it after the font assignment would overwrite
# SimHei and Chinese titles/labels would render as empty boxes.
sns.set_style('whitegrid')
plt.rcParams['font.sans-serif'] = ['SimHei']
# Draw the minus sign as an ASCII hyphen; CJK fonts such as SimHei lack the
# Unicode minus glyph.
plt.rcParams['axes.unicode_minus'] = False

# 导入回测结果
from pathlib import Path
results_path = Path('../data/backtest_results.csv')

if results_path.exists():
    # Load the real backtest output; 'datetime' is parsed into timestamps.
    results = pd.read_csv(results_path, parse_dates=['datetime'])
    print("成功加载回测结果数据")
    print(f"数据时间范围：{results['datetime'].min()} 至 {results['datetime'].max()}")
    print(f"数据列：{', '.join(results.columns)}")
else:
    # No backtest file available: build synthetic data with the same columns
    # so the remaining cells can still be demonstrated.
    print("示例：回测结果数据结构")
    # A fixed seed makes the demo reproducible under Restart Kernel -> Run All.
    demo_rng = np.random.default_rng(42)
    demo_dates = pd.date_range('2020-01-01', '2023-12-31', freq='D')
    # Derive the row count from the date range instead of hard-coding it, so
    # changing the range cannot produce mismatched column lengths.
    n_days = len(demo_dates)
    results = pd.DataFrame({
        'datetime': demo_dates,
        'portfolio_value': demo_rng.uniform(1000000, 1500000, n_days),
        'returns': demo_rng.normal(0.0002, 0.02, n_days),
        'benchmark_returns': demo_rng.normal(0.0001, 0.015, n_days),
        # integers() excludes the upper bound, matching np.random.randint.
        'positions': demo_rng.integers(1, 10, n_days),
        'cash': demo_rng.uniform(100000, 200000, n_days),
        'trades': demo_rng.integers(0, 5, n_days)
    })
    print("使用模拟数据进行演示")


In [None]:
def calculate_performance_metrics(returns, benchmark_returns=None,
                                  risk_free_rate=0.03, periods_per_year=252):
    """Compute summary performance metrics for a series of periodic returns.

    Args:
        returns (pd.Series): Strategy simple returns, one value per period.
        benchmark_returns (pd.Series, optional): Benchmark simple returns
            aligned with ``returns``. When omitted, the benchmark-relative
            metrics are returned as ``None``.
        risk_free_rate (float): Annual risk-free rate used for the Sharpe
            ratio. Defaults to 0.03 (3%).
        periods_per_year (int): Number of return periods per year used for
            annualization. Defaults to 252 trading days.

    Returns:
        dict: Keys ``cumulative_return``, ``annual_return``,
        ``annual_volatility``, ``sharpe_ratio``, ``max_drawdown``,
        ``excess_return``, ``tracking_error`` and ``information_ratio``.
        Ratios whose denominator is zero or undefined are ``nan``.

    Raises:
        ValueError: If ``returns`` is empty.
    """
    if len(returns) == 0:
        # Fail with a clear message instead of an IndexError from .iloc[-1].
        raise ValueError("returns must contain at least one observation")

    annualizer = np.sqrt(periods_per_year)

    # Growth of one unit of capital; shared by the return and drawdown metrics.
    growth = (1 + returns).cumprod()
    cum_returns = growth - 1

    # Annualized (geometric) return.
    years = len(returns) / periods_per_year
    annual_return = growth.iloc[-1] ** (1 / years) - 1

    # Annualized volatility.
    period_vol = returns.std()
    annual_volatility = period_vol * annualizer

    # Sharpe ratio on returns in excess of the per-period risk-free rate.
    # A zero (or undefined) volatility yields nan rather than +/-inf.
    excess_returns = returns - risk_free_rate / periods_per_year
    if period_vol > 0:
        sharpe_ratio = annualizer * excess_returns.mean() / period_vol
    else:
        sharpe_ratio = np.nan

    # Maximum drawdown: largest drop of the growth curve below its running peak.
    drawdowns = growth / growth.cummax() - 1
    max_drawdown = drawdowns.min()

    # Benchmark-relative metrics (only when a benchmark is supplied).
    if benchmark_returns is not None:
        cum_benchmark_returns = (1 + benchmark_returns).cumprod() - 1
        excess_return = (cum_returns - cum_benchmark_returns).iloc[-1]

        active_returns = returns - benchmark_returns
        active_vol = active_returns.std()
        tracking_error = active_vol * annualizer
        if active_vol > 0:
            information_ratio = active_returns.mean() / active_vol * annualizer
        else:
            information_ratio = np.nan
    else:
        excess_return = None
        tracking_error = None
        information_ratio = None

    return {
        'cumulative_return': cum_returns.iloc[-1],
        'annual_return': annual_return,
        'annual_volatility': annual_volatility,
        'sharpe_ratio': sharpe_ratio,
        'max_drawdown': max_drawdown,
        'excess_return': excess_return,
        'tracking_error': tracking_error,
        'information_ratio': information_ratio
    }

# Compute the headline metrics of the strategy against its benchmark
metrics = calculate_performance_metrics(
    results['returns'],
    results['benchmark_returns'],
)

# Report the metrics
print("策略绩效指标：")
print(f"累计收益率: {metrics['cumulative_return']:.2%}")
print(f"年化收益率: {metrics['annual_return']:.2%}")
print(f"年化波动率: {metrics['annual_volatility']:.2%}")
print(f"夏普比率: {metrics['sharpe_ratio']:.2f}")
print(f"最大回撤: {metrics['max_drawdown']:.2%}")
print(f"超额收益: {metrics['excess_return']:.2%}")
print(f"跟踪误差: {metrics['tracking_error']:.2%}")
print(f"信息比率: {metrics['information_ratio']:.2f}")

# Plot the compounded cumulative return of strategy and benchmark on one chart
fig, ax = plt.subplots(figsize=(12, 6))
cum_returns = results['returns'].add(1).cumprod().sub(1)
cum_benchmark_returns = results['benchmark_returns'].add(1).cumprod().sub(1)

ax.plot(results['datetime'], cum_returns, label='策略收益')
ax.plot(results['datetime'], cum_benchmark_returns, label='基准收益')
ax.set_title('策略累计收益对比图')
ax.set_xlabel('日期')
ax.set_ylabel('累计收益率')
ax.legend()
ax.grid(True)
plt.show()


In [None]:
def analyze_risk(returns):
    """Plot and print the risk profile of a return series.

    Draws a 2x2 figure (return histogram with KDE, normal Q-Q plot, 20-period
    rolling annualized volatility, drawdown curve), then prints the 95%
    VaR/CVaR, skewness, kurtosis (scipy's default Fisher definition) and the
    span of the maximum drawdown.

    Args:
        returns (pd.Series): Periodic simple returns. The index may be a
            DatetimeIndex or any unique, ordered index (e.g. a RangeIndex).
            Missing values are dropped before analysis.

    Raises:
        ValueError: If ``returns`` has no non-missing observations.
    """
    # NaNs would turn the percentile, skewness and kurtosis into nan.
    returns = returns.dropna()
    if returns.empty:
        raise ValueError("returns must contain at least one non-missing observation")

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # 1. Return distribution
    # 1.1 Histogram with kernel density estimate
    ax = axes[0, 0]
    sns.histplot(returns, kde=True, stat='density', ax=ax)
    ax.set_title('收益率分布')
    ax.set_xlabel('收益率')
    ax.set_ylabel('密度')

    # 1.2 Q-Q plot against the normal distribution
    ax = axes[0, 1]
    stats.probplot(returns, dist="norm", plot=ax)
    ax.set_title('收益率Q-Q图')

    # 2. Volatility: 20-period rolling standard deviation, annualized
    rolling_vol = returns.rolling(window=20).std() * np.sqrt(252)
    ax = axes[1, 0]
    ax.plot(rolling_vol.index, rolling_vol)
    ax.set_title('20日滚动波动率')
    ax.set_xlabel('日期')
    ax.set_ylabel('年化波动率')

    # 3. Drawdown: distance of the compounded curve below its running peak
    cum_returns = (1 + returns).cumprod()
    rolling_max = cum_returns.expanding().max()
    drawdowns = cum_returns / rolling_max - 1

    ax = axes[1, 1]
    ax.plot(drawdowns.index, drawdowns)
    ax.set_title('回撤分析')
    ax.set_xlabel('日期')
    ax.set_ylabel('回撤')

    fig.tight_layout()
    plt.show()

    # 4. Risk statistics
    # Historical VaR is the 5th percentile; CVaR is the mean of the tail at or below it.
    var_95 = np.percentile(returns, 5)
    cvar_95 = returns[returns <= var_95].mean()

    skewness = stats.skew(returns)
    kurtosis = stats.kurtosis(returns)

    print("\n风险统计指标：")
    print(f"95% VaR: {var_95:.2%}")
    print(f"95% CVaR: {cvar_95:.2%}")
    print(f"偏度: {skewness:.2f}")
    print(f"峰度: {kurtosis:.2f}")

    # 5. Maximum drawdown period
    max_drawdown_idx = drawdowns.idxmin()
    # Use label-based, inclusive .loc slices. A bare drawdowns[:label] is
    # positional on integer indexes and is empty when the trough is the first
    # row, which made .index[-1] fail.
    up_to_trough = drawdowns.loc[:max_drawdown_idx]
    # The first observation always has zero drawdown, so a peak always exists.
    max_drawdown_start = up_to_trough[up_to_trough == 0].index[-1]
    from_trough = drawdowns.loc[max_drawdown_idx:]
    recovered = from_trough[from_trough == 0]
    # If the curve never regains its peak, the drawdown runs to the last row.
    max_drawdown_end = recovered.index[0] if len(recovered) > 0 else drawdowns.index[-1]

    if isinstance(drawdowns.index, pd.DatetimeIndex):
        duration = (max_drawdown_end - max_drawdown_start).days
    else:
        # Non-datetime labels have no .days; count the observations between
        # peak and recovery instead.
        duration = (drawdowns.index.get_loc(max_drawdown_end)
                    - drawdowns.index.get_loc(max_drawdown_start))

    print("\n最大回撤期间分析：")
    print(f"开始日期: {max_drawdown_start}")
    print(f"结束日期: {max_drawdown_end}")
    print(f"持续天数: {duration}天")
    print(f"最大回撤: {drawdowns.min():.2%}")

# Analyze the risk profile of the strategy returns
analyze_risk(results['returns'])


In [None]:
def analyze_trading(results_df):
    """Plot and print trading-activity characteristics of a backtest.

    Reads the 'trades', 'datetime', 'positions', 'cash' and 'portfolio_value'
    columns, draws a 2x2 figure (distribution of daily trade counts, trade
    counts over time, position counts over time, cash ratio over time) and
    prints summary trading statistics plus a simplified turnover estimate.

    Args:
        results_df (pd.DataFrame): Backtest results, one row per day.
    """
    dates = results_df['datetime']
    daily_trades = results_df['trades']
    positions = results_df['positions']
    cash_ratio = results_df['cash'] / results_df['portfolio_value']

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    (ax_hist, ax_trades), (ax_positions, ax_cash) = axes

    # 1.1 Distribution of the daily trade count, days with trades only
    sns.histplot(daily_trades[daily_trades > 0], bins=20, ax=ax_hist)
    ax_hist.set_title('每日交易次数分布')
    ax_hist.set_xlabel('交易次数')
    ax_hist.set_ylabel('频率')

    # 1.2 Daily trade count over time
    ax_trades.plot(dates, daily_trades)
    ax_trades.set_title('每日交易次数时间序列')
    ax_trades.set_xlabel('日期')
    ax_trades.set_ylabel('交易次数')

    # 2.1 Number of positions over time
    ax_positions.plot(dates, positions)
    ax_positions.set_title('持仓数量时间序列')
    ax_positions.set_xlabel('日期')
    ax_positions.set_ylabel('持仓数量')

    # 2.2 Cash as a fraction of portfolio value over time
    ax_cash.plot(dates, cash_ratio)
    ax_cash.set_title('现金比例时间序列')
    ax_cash.set_xlabel('日期')
    ax_cash.set_ylabel('现金比例')

    fig.tight_layout()
    plt.show()

    # 3. Trading statistics
    total_days = len(results_df)
    trading_days = int((daily_trades > 0).sum())
    avg_positions = positions.mean()
    avg_cash_ratio = cash_ratio.mean()

    print("\n交易统计：")
    print(f"总交易日数: {total_days}天")
    print(f"有交易的天数: {trading_days}天")
    print(f"交易频率: {trading_days/total_days:.2%}")
    print(f"平均每日交易次数: {daily_trades.mean():.2f}")
    print(f"平均持仓数量: {avg_positions:.2f}")
    print(f"平均现金比例: {avg_cash_ratio:.2%}")

    # 4. Turnover, approximated from day-over-day changes in the position count
    position_changes = positions.diff().abs()
    if avg_positions > 0:
        turnover_ratio = position_changes.sum() / (2 * total_days * avg_positions)
    else:
        turnover_ratio = 0

    print("\n周转率分析：")
    print(f"年化换手率: {turnover_ratio * 252:.2%}")

# Analyze the trading characteristics of the backtest
analyze_trading(results)


In [None]:
def perform_attribution_analysis(returns, benchmark_returns):
    """Attribute strategy returns to market exposure and alpha.

    Fits the single-factor model ``R = alpha + beta * Rm + eps`` by OLS over
    the full sample, draws a 2x2 figure (60-period rolling beta, cumulative
    residuals, cumulative alpha/market contributions, residual Q-Q plot) and
    prints the regression estimates, the return decomposition and the
    residual-based tracking error and information ratio.

    Args:
        returns (pd.Series): Strategy simple returns.
        benchmark_returns (pd.Series): Benchmark simple returns aligned with
            ``returns``.

    Raises:
        ImportError: If statsmodels is not installed.
    """
    # statsmodels is an optional dependency. Importing it here keeps the
    # function independent of a global `sm` bound by another cell, while still
    # surfacing ImportError to the caller.
    import statsmodels.api as sm

    # 1. Rolling beta
    def calculate_rolling_beta(returns, benchmark_returns, window=60):
        # beta = Cov(R, Rm) / Var(Rm), both over the same rolling window
        covariance = returns.rolling(window=window).cov(benchmark_returns)
        benchmark_variance = benchmark_returns.rolling(window=window).var()
        return covariance / benchmark_variance

    rolling_beta = calculate_rolling_beta(returns, benchmark_returns)

    # 2. Full-sample single-factor regression
    X = sm.add_constant(benchmark_returns)
    ols_fit = sm.OLS(returns, X).fit()

    # params is a label-indexed Series (constant first, then the benchmark);
    # use .iloc because positional access through [] is deprecated for
    # label-indexed Series.
    alpha = ols_fit.params.iloc[0]
    beta = ols_fit.params.iloc[1]
    r_squared = ols_fit.rsquared
    residuals = ols_fit.resid

    # Split each period's return into a market part and everything else.
    market_contribution = beta * benchmark_returns
    alpha_contribution = returns - market_contribution

    # 3. Charts
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # 3.1 Rolling beta
    ax = axes[0, 0]
    ax.plot(rolling_beta.index, rolling_beta)
    ax.set_title('60日滚动Beta')
    ax.set_xlabel('日期')
    ax.set_ylabel('Beta')

    # 3.2 Cumulative regression residuals
    ax = axes[0, 1]
    ax.plot(residuals.index, residuals.cumsum())
    ax.set_title('累计残差收益（Alpha）')
    ax.set_xlabel('日期')
    ax.set_ylabel('累计残差收益')

    # 3.3 Cumulative alpha vs. market contribution
    ax = axes[1, 0]
    ax.plot(returns.index, alpha_contribution.cumsum(), label='Alpha贡献')
    ax.plot(returns.index, market_contribution.cumsum(), label='市场贡献')
    ax.set_title('收益分解')
    ax.set_xlabel('日期')
    ax.set_ylabel('累计贡献')
    ax.legend()

    # 3.4 Q-Q plot of the residuals against the normal distribution
    ax = axes[1, 1]
    stats.probplot(residuals, dist="norm", plot=ax)
    ax.set_title('残差Q-Q图')

    fig.tight_layout()
    plt.show()

    # 4. Regression estimates (alpha annualized with 252 periods)
    print("\n归因分析结果：")
    print(f"Alpha（年化）: {alpha * 252:.2%}")
    print(f"Beta: {beta:.2f}")
    print(f"R方: {r_squared:.2%}")

    # 5. Compounded return decomposition
    total_return = (1 + returns).prod() - 1
    market_return = (1 + market_contribution).prod() - 1
    alpha_return = (1 + alpha_contribution).prod() - 1

    print("\n收益分解：")
    print(f"总收益: {total_return:.2%}")
    print(f"市场贡献: {market_return:.2%}")
    print(f"Alpha贡献: {alpha_return:.2%}")

    # 6. Information ratio from the annualized alpha and residual volatility.
    # A zero tracking error (perfect fit) yields nan instead of +/-inf.
    tracking_error = residuals.std() * np.sqrt(252)
    information_ratio = (alpha * 252) / tracking_error if tracking_error > 0 else np.nan

    print("\n风险调整指标：")
    print(f"跟踪误差: {tracking_error:.2%}")
    print(f"信息比率: {information_ratio:.2f}")

# Run the attribution analysis.
# statsmodels is an optional dependency: it is imported inside the try block so
# that a missing package prints an installation hint instead of a traceback.
try:
    import statsmodels.api as sm
    perform_attribution_analysis(results['returns'], results['benchmark_returns'])
except ImportError:
    print("注意：需要安装statsmodels包来进行完整的归因分析。")
    print("可以使用命令：pip install statsmodels")


In [None]:
def perform_robustness_tests(returns, benchmark_returns):
    """Run robustness checks on a strategy's returns.

    Computes performance by market regime (benchmark-return terciles), a
    transaction-cost sensitivity table, a 60-period rolling Sharpe ratio and a
    first-half/second-half sub-sample comparison. Draws them in one 2x2 figure
    and prints the three result tables.

    Args:
        returns (pd.Series): Strategy simple returns.
        benchmark_returns (pd.Series): Benchmark simple returns aligned with
            ``returns``.
    """
    # 1. Performance by market regime: benchmark returns split into terciles
    market_conditions = pd.qcut(benchmark_returns,
                                q=3,
                                labels=['熊市', '震荡市', '牛市'])

    # observed=False is stated explicitly: the grouping key is categorical and
    # pandas warns when the argument is left to its default.
    performance_by_market = pd.DataFrame({
        '策略收益率': returns,
        '基准收益率': benchmark_returns,
        '市场环境': market_conditions
    }).groupby('市场环境', observed=False).agg({
        '策略收益率': ['mean', 'std'],
        '基准收益率': ['mean', 'std']
    })

    # 2. Transaction-cost sensitivity, 0 to 50 bp
    cost_levels = [0, 0.001, 0.002, 0.003, 0.004, 0.005]
    cost_impact = []

    for cost in cost_levels:
        # NOTE(review): the per-period cost is the cost rate scaled by the mean
        # absolute return, not by actual turnover; treat this as a rough proxy
        # and confirm whether trade data should drive it.
        turnover_cost = abs(returns).mean() * cost
        net_return = returns - turnover_cost

        annual_return = (1 + net_return).prod() ** (252/len(net_return)) - 1
        sharpe = np.sqrt(252) * net_return.mean() / net_return.std()

        cost_impact.append({
            '交易成本(bp)': cost * 10000,
            '年化收益率': annual_return,
            '夏普比率': sharpe
        })

    cost_impact_df = pd.DataFrame(cost_impact)

    # 3. Rolling-window Sharpe ratio (no risk-free adjustment)
    window_size = 60
    rolling_sharpe = (np.sqrt(252) * returns.rolling(window=window_size).mean()
                      / returns.rolling(window=window_size).std())

    # 4. Sub-sample analysis: first half vs. second half of the sample
    mid_point = len(returns) // 2
    periods = ['前半样本', '后半样本']
    sub_sample_returns = [returns.iloc[:mid_point], returns.iloc[mid_point:]]
    sub_sample_stats = []

    for period, ret in zip(periods, sub_sample_returns):
        annual_return = (1 + ret).prod() ** (252/len(ret)) - 1
        sharpe = np.sqrt(252) * ret.mean() / ret.std()
        # Compounded drawdown relative to the running peak, so the figure is a
        # true percentage loss rather than a difference of summed returns.
        growth = (1 + ret).cumprod()
        max_drawdown = (growth / growth.cummax() - 1).min()

        sub_sample_stats.append({
            '样本期': period,
            '年化收益率': annual_return,
            '夏普比率': sharpe,
            '最大回撤': max_drawdown
        })

    sub_sample_df = pd.DataFrame(sub_sample_stats)

    # 5. Charts
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # 5.1 Mean daily strategy return per regime, with std as error bars
    ax = axes[0, 0]
    strategy_by_market = performance_by_market['策略收益率']
    strategy_by_market['mean'].plot(kind='bar', yerr=strategy_by_market['std'], ax=ax)
    ax.set_title('不同市场环境下的策略表现')
    ax.set_xlabel('市场环境')
    ax.set_ylabel('日均收益率')

    # 5.2 Annualized return as a function of transaction cost
    ax = axes[0, 1]
    ax.plot(cost_impact_df['交易成本(bp)'], cost_impact_df['年化收益率'])
    ax.set_title('交易成本敏感性分析')
    ax.set_xlabel('交易成本(bp)')
    ax.set_ylabel('年化收益率')

    # 5.3 Rolling Sharpe ratio
    ax = axes[1, 0]
    ax.plot(rolling_sharpe.index, rolling_sharpe)
    ax.set_title('60日滚动夏普比率')
    ax.set_xlabel('日期')
    ax.set_ylabel('夏普比率')

    # 5.4 Sub-sample comparison. ax= is required here: DataFrame.plot opens a
    # new figure when no axes are given, which would leave this panel empty.
    ax = axes[1, 1]
    sub_sample_df.set_index('样本期')[['年化收益率', '夏普比率', '最大回撤']].plot(kind='bar', ax=ax)
    ax.set_title('子样本分析')
    ax.set_xlabel('样本期')
    ax.set_ylabel('指标值')

    fig.tight_layout()
    plt.show()

    # 6. Detailed result tables
    print("\n不同市场环境下的表现：")
    print(performance_by_market)

    print("\n交易成本敏感性分析：")
    print(cost_impact_df)

    print("\n子样本分析：")
    print(sub_sample_df)

# Run the robustness tests on the strategy and benchmark returns
perform_robustness_tests(results['returns'], results['benchmark_returns'])
