# 回测引擎使用教程


## 🎯 教程目标

1. 掌握 `BacktestEngine` 的正确使用方法
2. 理解参数配置与因子数据要求
3. 学会分析和可视化回测结果
4. 了解最佳实践与注意事项

## 1. 导入必要模块

In [20]:
# 基础库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# 添加路径
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath('.')))

# 重新加载模块（解决缓存问题）
import importlib

# 导入回测引擎（注意：已移除 run_backtest 函数，统一使用 BacktestEngine 类）
from backtest_engine import engine as engine_module
from backtest_engine import performance as performance_module
from data_manager import data as data_module

# 重新加载模块以确保使用最新代码
importlib.reload(engine_module)
importlib.reload(performance_module)
importlib.reload(data_module)

from backtest_engine.engine import BacktestEngine
from backtest_engine.performance import PerformanceAnalyzer
from data_manager.data import DataManager

print("✅ 回测引擎模块导入完成!")
print("📌 注意：现在统一使用 BacktestEngine 类进行回测")
print("🔄 已重新加载模块，使用最新代码")

✅ 回测引擎模块导入完成!
📌 注意：现在统一使用 BacktestEngine 类进行回测
🔄 已重新加载模块，使用最新代码


## 2. 初始化数据管理器

首先初始化 `DataManager` 用于加载股票数据。

In [21]:
# 初始化数据管理器
data_manager = DataManager()

print("✅ 数据管理器初始化完成!")
print("📊 可用数据类型: daily, balancesheet, income, cashflow")

✅ 数据管理器初始化完成!
📊 可用数据类型: daily, balancesheet, income, cashflow


## 3. 初始化回测引擎

创建 `BacktestEngine` 实例，配置回测参数。

In [22]:
# 创建回测引擎实例
engine = BacktestEngine(
    data_manager=data_manager,
    n_groups=5,                  # 分5组
    fee=0.001,                   # 0.1% 交易费用
    long_direction='low',        # 做多因子值低的组（例如：做多小市值）
    rebalance_freq='weekly',     # 周度调仓
    factor_name='factor'         # 因子列名（重要！）
)

print("🔧 回测引擎初始化完成!")
print("\n参数配置:")
print(f"  📊 分组数: {engine.n_groups}")
print(f"  💰 交易费用: {engine.fee:.3%}")
print(f"  📈 多头方向: {engine.long_direction}")
print(f"  🔄 调仓频率: {engine.rebalance_freq}")
print(f"  🏷️  因子名称: {engine.factor_name}")

🔧 回测引擎初始化完成
   多头方向: low
   调仓频率: weekly
   交易费用: 0.100%
   因子名称: factor
🔧 回测引擎初始化完成!

参数配置:
  📊 分组数: 5
  💰 交易费用: 0.100%
  📈 多头方向: low
  🔄 调仓频率: weekly
  🏷️  因子名称: factor


## 4. 执行回测的两种方式

### 方式一：使用 `prepare_data()` + `run()`（推荐用于内置因子）

引擎会自动计算因子并执行回测。

In [23]:
# 方式一示例：引擎自动准备数据（适用于内置因子，如规模因子）
def example_method_1():
    """使用 prepare_data + run 执行回测"""
    print("🚀 方式一：使用 prepare_data() + run()\n")
    
    # 步骤1：准备数据（引擎会自动计算因子）
    engine.prepare_data(
        start_date='2018-01-01',
        end_date='2018-12-31',
        stock_codes=None  # None 表示使用所有可用股票
    )
    
    # 步骤2：运行回测
    portfolio_returns = engine.run()
    
    print(f"\n✅ 回测完成!")
    print(f"  📊 收益序列: {len(portfolio_returns)} 个交易日")
    print(f"  💼 策略列: {list(portfolio_returns.columns)}")
    
    return portfolio_returns

# 注意：这种方式适用于 prepare_data 内部会调用因子计算函数的情况
print("✅ 方式一示例函数定义完成")
print("💡 适用于引擎内置的因子计算逻辑")

✅ 方式一示例函数定义完成
💡 适用于引擎内置的因子计算逻辑


### 方式二：使用自定义因子数据（推荐用于自定义因子）

如果你已经有了计算好的因子数据，可以直接设置到引擎中。

In [24]:
# 方式二示例：使用自定义因子数据
def example_method_2_with_custom_factor(factor_data, start_date, end_date):
    """
    使用自定义因子数据执行回测
    
    参数:
        factor_data: DataFrame with MultiIndex (trade_date, ts_code) and column 'factor'
        start_date: 回测开始日期
        end_date: 回测结束日期
    """
    print("🚀 方式二：使用自定义因子数据\n")
    
    # 创建引擎实例
    custom_engine = BacktestEngine(
        data_manager=data_manager,
        fee=0.001,
        long_direction='low',
        rebalance_freq='weekly',
        factor_name='factor'  # 确保与 factor_data 的列名一致
    )
    
    # 直接设置因子数据
    custom_engine.factor_data = factor_data
    
    # 获取股票列表
    stock_codes = factor_data.index.get_level_values('ts_code').unique().tolist()
    
    # 加载股票价格数据
    stock_data = data_manager.load_data(
        'daily',
        start_date=start_date,
        end_date=end_date,
        stock_codes=stock_codes
    )
    
    # 计算收益率
    stock_data = stock_data.sort_values(['ts_code', 'trade_date'])
    stock_data['next_return'] = stock_data.groupby('ts_code')['close'].pct_change().shift(-1)
    
    # 合并因子和收益率数据
    factor_reset = factor_data.reset_index()
    stock_subset = stock_data[['ts_code', 'trade_date', 'next_return']].copy()
    
    custom_engine.combined_data = pd.merge(
        factor_reset,
        stock_subset,
        on=['ts_code', 'trade_date'],
        how='inner'
    )
    
    # 确保因子列存在并处理缺失值
    if custom_engine.factor_name in custom_engine.combined_data.columns:
        custom_engine.combined_data.dropna(
            subset=[custom_engine.factor_name, 'next_return'], 
            inplace=True
        )
    
    # 运行回测
    portfolio_returns = custom_engine.run()
    
    print(f"\n✅ 回测完成!")
    print(f"  📊 收益序列: {len(portfolio_returns)} 个交易日")
    print(f"  💼 策略列: {list(portfolio_returns.columns)}")
    
    return portfolio_returns, custom_engine

print("✅ 方式二示例函数定义完成")
print("💡 适用于已有计算好的因子数据的情况")
print("💡 因子数据格式要求: MultiIndex (trade_date, ts_code) + 列名 'factor'")

✅ 方式二示例函数定义完成
💡 适用于已有计算好的因子数据的情况
💡 因子数据格式要求: MultiIndex (trade_date, ts_code) + 列名 'factor'


## 5. 性能分析与 IC 计算

使用 `PerformanceAnalyzer` 进行详细的性能分析和 IC 计算。

In [25]:
def analyze_performance(engine):
    """
    使用 PerformanceAnalyzer 进行完整的性能分析
    
    参数:
        engine: BacktestEngine 实例（已执行过 run()）
    """
    print("📊 开始性能分析...\n")
    
    try:
        # 获取性能分析器
        analyzer = engine.get_performance_analysis()
        
        # 计算性能指标（会自动计算 IC）
        metrics_df = analyzer.calculate_metrics()
        
        print("📈 组合业绩指标:")
        print(metrics_df.to_string())
        
        # 显示 IC 分析结果
        if analyzer.ic_series is not None and len(analyzer.ic_series) > 0:
            ic_mean = analyzer.ic_series.mean()
            ic_std = analyzer.ic_series.std()
            icir = ic_mean / ic_std if ic_std > 0 else 0
            ic_positive_ratio = (analyzer.ic_series > 0).mean()
            
            print(f"\n🎯 IC 分析:")
            print(f"  IC 均值: {ic_mean:.4f}")
            print(f"  IC 标准差: {ic_std:.4f}")
            print(f"  ICIR: {icir:.4f}")
            print(f"  IC>0 占比: {ic_positive_ratio:.2%}")
        else:
            print("\n⚠️  IC 分析数据不可用")
        
        return analyzer
        
    except Exception as e:
        print(f"❌ 性能分析失败: {e}")
        import traceback
        traceback.print_exc()
        return None

print("✅ 性能分析函数定义完成")
print("💡 使用 engine.get_performance_analysis() 获取分析器")

✅ 性能分析函数定义完成
💡 使用 engine.get_performance_analysis() 获取分析器


## 6. 结果可视化

绘制累积收益曲线和收益分布图。

In [26]:
def plot_backtest_results(portfolio_returns, strategy_name='Long_Only'):
    """
    绘制回测结果图表
    
    参数:
        portfolio_returns: DataFrame，包含策略收益序列
        strategy_name: 要绘制的策略名称（'Long_Only' 或 'Long_Short'）
    """
    # 设置中文字体
    plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
    plt.rcParams['axes.unicode_minus'] = False
    
    # 提取指定策略的收益
    if isinstance(portfolio_returns, pd.DataFrame):
        if strategy_name not in portfolio_returns.columns:
            strategy_name = portfolio_returns.columns[0]
        returns = portfolio_returns[strategy_name]
    else:
        returns = portfolio_returns
    
    # 计算累积收益
    cumulative_returns = (1 + returns).cumprod()
    
    # 创建图表
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    # 1. 累积收益曲线
    axes[0, 0].plot(cumulative_returns.index, cumulative_returns.values,
                    linewidth=2, color='steelblue', label=f'{strategy_name}')
    axes[0, 0].set_title('📈 累积收益曲线', fontsize=14, fontweight='bold')
    axes[0, 0].set_ylabel('累积净值', fontsize=12)
    axes[0, 0].grid(True, alpha=0.3)
    axes[0, 0].legend()
    
    # 添加统计信息
    final_return = cumulative_returns.iloc[-1] - 1
    axes[0, 0].text(0.02, 0.98,
                    f'总收益: {final_return:.2%}\n最高: {cumulative_returns.max():.3f}\n最低: {cumulative_returns.min():.3f}',
                    transform=axes[0, 0].transAxes,
                    verticalalignment='top',
                    bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
    
    # 2. 日收益率分布
    axes[0, 1].hist(returns, bins=50, alpha=0.7, color='skyblue', edgecolor='black')
    axes[0, 1].set_title('📊 日收益率分布', fontsize=14, fontweight='bold')
    axes[0, 1].set_xlabel('日收益率', fontsize=12)
    axes[0, 1].set_ylabel('频次', fontsize=12)
    axes[0, 1].grid(True, alpha=0.3)
    
    mean_ret = returns.mean()
    std_ret = returns.std()
    axes[0, 1].axvline(mean_ret, color='red', linestyle='--', label=f'均值: {mean_ret:.4f}')
    axes[0, 1].axvline(mean_ret + std_ret, color='orange', linestyle='--', alpha=0.7)
    axes[0, 1].axvline(mean_ret - std_ret, color='orange', linestyle='--', alpha=0.7)
    axes[0, 1].legend()
    
    # 3. 回撤曲线
    running_max = cumulative_returns.cummax()
    drawdown = (cumulative_returns - running_max) / running_max
    axes[1, 0].fill_between(drawdown.index, 0, drawdown.values,
                            color='red', alpha=0.3, label='回撤')
    axes[1, 0].set_title('📉 回撤曲线', fontsize=14, fontweight='bold')
    axes[1, 0].set_ylabel('回撤', fontsize=12)
    axes[1, 0].grid(True, alpha=0.3)
    axes[1, 0].legend()
    
    max_dd = drawdown.min()
    axes[1, 0].text(0.02, 0.02,
                    f'最大回撤: {max_dd:.2%}',
                    transform=axes[1, 0].transAxes,
                    bbox=dict(boxstyle='round', facecolor='lightcoral', alpha=0.8))
    
    # 4. 滚动夏普比率（60日）
    rolling_sharpe = (returns.rolling(60).mean() / returns.rolling(60).std()) * np.sqrt(252)
    axes[1, 1].plot(rolling_sharpe.index, rolling_sharpe.values,
                   linewidth=1.5, color='green', alpha=0.7)
    axes[1, 1].set_title('📊 60日滚动夏普比率', fontsize=14, fontweight='bold')
    axes[1, 1].set_ylabel('夏普比率', fontsize=12)
    axes[1, 1].axhline(0, color='black', linestyle='--', alpha=0.3)
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print("✅ 图表绘制完成!")

print("✅ 可视化函数定义完成")
print("💡 默认绘制 'Long_Only' 策略，可通过 strategy_name 参数指定")

✅ 可视化函数定义完成
💡 默认绘制 'Long_Only' 策略，可通过 strategy_name 参数指定


## 7. 完整示例：规模因子回测

以下是一个完整的端到端示例。

In [27]:
# 完整示例：规模因子回测
def complete_example():
    """完整的规模因子回测流程演示"""
    print("=" * 60)
    print("🚀 完整示例：规模因子回测流程")
    print("=" * 60)
    
    try:
        # 步骤1：创建回测引擎
        print("\n📌 步骤1: 创建回测引擎")
        demo_engine = BacktestEngine(
            data_manager=data_manager,
            n_groups=5,
            fee=0.001,
            long_direction='low',  # 做多小市值
            rebalance_freq='weekly',
            factor_name='factor'
        )
        
        # 步骤2：准备数据（引擎内部会调用 calculate_size_factor）
        print("\n📌 步骤2: 准备数据")
        demo_engine.prepare_data(
            start_date='2018-01-01',
            end_date='2018-03-31',
            stock_codes=None
        )
        
        # 步骤3：运行回测
        print("\n📌 步骤3: 运行回测")
        portfolio_returns = demo_engine.run()
        
        # 步骤4：性能分析
        print("\n📌 步骤4: 性能分析")
        analyzer = analyze_performance(demo_engine)
        
        # 步骤5：可视化
        print("\n📌 步骤5: 可视化结果")
        plot_backtest_results(portfolio_returns, 'Long_Only')
        
        print("\n" + "=" * 60)
        print("✅ 完整示例执行成功!")
        print("=" * 60)
        
        return portfolio_returns, demo_engine
        
    except Exception as e:
        print(f"\n❌ 示例执行失败: {e}")
        import traceback
        traceback.print_exc()
        return None, None

# 运行完整示例（取消注释下面一行来执行）
# returns, eng = complete_example()

print("✅ 完整示例函数定义完成")
print("💡 取消注释最后一行来运行完整示例")

✅ 完整示例函数定义完成
💡 取消注释最后一行来运行完整示例


## 8. 最佳实践与注意事项

### ✅ 最佳实践

1. **明确指定因子列名**
   - 使用 `factor_name` 参数显式指定因子列名
   - 推荐统一使用 `'factor'` 作为列名

2. **选择合适的调仓频率**
   - `daily`: 每日调仓，计算量大，适合研究
   - `weekly`: 周度调仓，**推荐**，平衡收益和实用性
   - `monthly`: 月度调仓，降低交易成本

3. **正确设置多头方向**
   - `long_direction='high'`: 做多因子值高的组（正向因子）
   - `long_direction='low'`: 做多因子值低的组（负向因子，如小市值效应）

4. **合理设置交易费用**
   - 考虑实际交易成本（佣金、印花税、滑点等）
   - 典型范围：0.1% - 0.3%

5. **使用 PerformanceAnalyzer 进行全面分析**
   - 不仅关注收益率，也要评估风险指标
   - 重视 IC 分析，评估因子预测能力

### ⚠️ 注意事项

1. **因子数据格式**
   - 必须是 DataFrame with MultiIndex (trade_date, ts_code)
   - 因子列名必须与 `factor_name` 参数一致
   - 示例：
     ```python
     factor_data.index.names  # ['trade_date', 'ts_code']
     factor_data.columns      # ['factor']
     ```

2. **避免前视偏差**
   - 确保因子计算只使用历史数据
   - 规模因子示例：使用 `ann_date` 对齐股本数据

3. **数据质量检查**
   - 检查缺失值和异常值
   - 验证股票代码格式一致性
   - 确保日期格式正确

4. **交易成本处理**
   - 新版引擎仅在调仓日扣除成本（更真实）
   - 根据换手率估算成本

### 📚 进一步学习

- **因子开发**: 参考 `factor_library/fundamental/size_factor.py`
- **完整流程**: 参考 `factor_library/factor_backtest_template.ipynb`
- **数据准备**: 参考 `data_manager/data_example.ipynb`

---

**🎉 教程完成！**

现在你已经掌握了新版 `BacktestEngine` 的使用方法。

In [28]:
# 快速参考卡片
print("=" * 60)
print("📚 BacktestEngine 快速参考")
print("=" * 60)

reference = """
1️⃣ 导入模块:
   from backtest_engine.engine import BacktestEngine
   from data_manager.data import DataManager

2️⃣ 初始化:
   data_manager = DataManager()
   engine = BacktestEngine(
       data_manager=data_manager,
       n_groups=5,
       fee=0.001,
       long_direction='low',
       rebalance_freq='weekly',
       factor_name='factor'  # 重要！
   )

3️⃣ 执行回测:
   # 方式一：自动准备数据
   engine.prepare_data(start_date, end_date, stock_codes)
   returns = engine.run()
   
   # 方式二：使用自定义因子
   engine.factor_data = your_factor_data
   # ... 准备 combined_data ...
   returns = engine.run()

4️⃣ 性能分析:
   analyzer = engine.get_performance_analysis()
   metrics = analyzer.calculate_metrics()

5️⃣ 可视化:
   plot_backtest_results(returns, 'Long_Only')
"""

print(reference)
print("=" * 60)
print("✅ 教程结束！祝您回测顺利！")
print("=" * 60)

📚 BacktestEngine 快速参考

1️⃣ 导入模块:
   from backtest_engine.engine import BacktestEngine
   from data_manager.data import DataManager

2️⃣ 初始化:
   data_manager = DataManager()
   engine = BacktestEngine(
       data_manager=data_manager,
       n_groups=5,
       fee=0.001,
       long_direction='low',
       rebalance_freq='weekly',
       factor_name='factor'  # 重要！
   )

3️⃣ 执行回测:
   # 方式一：自动准备数据
   engine.prepare_data(start_date, end_date, stock_codes)
   returns = engine.run()
   
   # 方式二：使用自定义因子
   engine.factor_data = your_factor_data
   # ... 准备 combined_data ...
   returns = engine.run()

4️⃣ 性能分析:
   analyzer = engine.get_performance_analysis()
   metrics = analyzer.calculate_metrics()

5️⃣ 可视化:
   plot_backtest_results(returns, 'Long_Only')

✅ 教程结束！祝您回测顺利！
