# Day 2：基线回测 — SMA 交叉（backtesting.py）

目标：
- 使用 backtesting.py 在 Day1 下载的数据上实现 SMA(50,200) 交叉策略；
- 在回测中模拟手续费、滑点、最大仓位等执行约束；
- 导出回测统计与交易明细（CSV），并绘制回测结果图；
- 为 Day3 的 ML 信号留出接口（保存信号列）。

说明：建议在 WSL2 的虚拟环境中运行（与 Day1 相同 venv/conda）。

## 虚拟环境与依赖（在激活 venv 后运行）
在 WSL2 终端中，如尚未安装 backtesting.py，可运行：
```bash
pip install --upgrade pip
pip install backtesting pandas pyarrow matplotlib joblib
# 可选：若想用 backtrader 或其他框架：pip install backtrader
```
在 notebook 单元中可使用 `!pip install ...` 临时安装，但长期建议在外部激活 venv 并安装。

In [1]:
# 环境与库检查（在激活 venv 后运行）
import sys
import platform
import pandas as pd
import numpy as np
print('python', sys.version)
print('platform', platform.platform())
print('pandas', pd.__version__)
print('numpy', np.__version__)


python 3.13.5 (tags/v3.13.5:6cb20a2, Jun 11 2025, 16:15:46) [MSC v.1943 64 bit (AMD64)]
platform Windows-11-10.0.26100-SP0
pandas 2.3.3
numpy 2.3.4


## 读取 Day1 的数据（parquet/csv）
本单元会尝试从 `data/` 读取 parquest 文件。你可以修改 `symbol` 变量以读取别的文件。

In [2]:
import os
from glob import glob

# 自动寻找 data 目录下的 parquet 文件
data_dir = '../data'
parquets = glob(os.path.join(data_dir, '*.parquet'))
csvs = glob(os.path.join(data_dir, '*.csv'))
print('found parquet files:', parquets)
print('found csv files:', csvs)

# 指定 symbol（若未指定，默认使用第一个 parquet）
symbol = None
if parquets:
    path = parquets[0] if symbol is None else os.path.join(data_dir, f"{symbol}.parquet")
elif csvs:
    path = csvs[0] if symbol is None else os.path.join(data_dir, f"{symbol}.csv")
else:
    raise FileNotFoundError('No parquet or csv files found in data/. Run Day1 notebook first.')

print('Loading file:', path)
if path.endswith('.parquet'):
    df = pd.read_parquet(path)
else:
    df = pd.read_csv(path)

# 规范列名到 backtesting.py 期望的格式： 'Open','High','Low','Close','Volume'
df = df.rename(columns={
    'open':'Open', 'high':'High', 'low':'Low', 'close':'Close', 'volume':'Volume', 'date':'Date'
})

# 如果有小写列名不同，尝试更多映射
lower_cols = {c.lower(): c for c in df.columns}
if 'date' in lower_cols:
    df = df.rename(columns={lower_cols['date']:'Date'})

if 'Date' in df.columns:
    df['Date'] = pd.to_datetime(df['Date'])
    
    # backtesting.py 要求索引为 DatetimeIndex
    df = df.set_index('Date')

df = df.sort_index()
print(df.head())
print('length:', len(df))


found parquet files: ['../data\\sh601988.parquet']
found csv files: ['../data\\sh601988.csv']
Loading file: ../data\sh601988.parquet
            Open  High   Low  Close        Volume        amount  \
Date                                                              
2006-07-05  3.99  4.05  3.76   3.79  1.763902e+09  6.846112e+09   
2006-07-06  3.75  3.76  3.69   3.72  3.725565e+08  1.386101e+09   
2006-07-07  3.72  3.73  3.66   3.68  2.097986e+08  7.736735e+08   
2006-07-10  3.68  3.72  3.66   3.70  1.127236e+08  4.161987e+08   
2006-07-11  3.71  3.78  3.67   3.75  1.356450e+08  5.027508e+08   

            outstanding_share  turnover  
Date                                     
2006-07-05       3.454547e+09  0.510603  
2006-07-06       3.454547e+09  0.107845  
2006-07-07       3.454547e+09  0.060731  
2006-07-10       3.454547e+09  0.032631  
2006-07-11       3.454547e+09  0.039266  
length: 4688


## 定义 SMA Cross 策略（backtesting.py）
我们将使用 backtesting.py 的 Strategy 类实现，包含参数化的短期/长期均线、最大持仓比例与手续费/滑点设置。

In [3]:
# 数据预处理 - 计算移动平均线
df['SMA50'] = df['Close'].rolling(window=50).mean()
df['SMA200'] = df['Close'].rolling(window=200).mean()

# 删除包含NaN的行（因为移动平均在开头会产生NaN）
df_clean = df.dropna().copy()

print('数据预处理完成，开始回测...')
print(f'数据长度: {len(df_clean)}')
print(f'数据时间范围: {df_clean.index[0]} 到 {df_clean.index[-1]}')

数据预处理完成，开始回测...
数据长度: 4489
数据时间范围: 2007-04-27 00:00:00 到 2025-11-10 00:00:00


In [4]:
from backtesting import Backtest, Strategy
from backtesting.lib import crossover
import pandas as pd

# class SmaCrossStrategy(Strategy):
#     # 可调参数
#     n1 = 50  # 短期均线
#     n2 = 200 # 长期均线
#     max_pct = 0.2  # 最大仓位占比（例如 0.2 = 20%）

#     def init(self):
#         # 使用 Strategy.I 计算指标
#         self.sma1 = self.I(pd.Series.rolling, self.data.Close, window=self.n1).mean()
#         self.sma2 = self.I(pd.Series.rolling, self.data.Close, window=self.n2).mean()

#     def next(self):
#         # 如果短期上穿长期，且未持仓或持仓为0，则买入到最大仓位
#         if crossover(self.sma1, self.sma2):
#             cash = self.equity - self.position.pl
#             size = int((self.max_pct * self.equity) / self.data.Close[-1])
#             if size > 0:
#                 # 市价买入
#                 self.buy(size=size)
#         elif crossover(self.sma2, self.sma1):
#             # 平仓
#             if self.position.size > 0:
#                 self.position.close()

#     # 可在需要时添加 on_order/on_execution 回调以记录更多执行细节
    
# print('Strategy defined')

class SmaCrossStrategy(Strategy):
    n1 = 50
    n2 = 200
    max_pct = 0.2

    def init(self):
        # 预计算的指标
        self.sma1 = self.data.SMA50
        self.sma2 = self.data.SMA200
        
        # 记录信号（可选，用于分析）
        self.signal = None

    def next(self):
        # 计算信号
        prev_sma1 = self.sma1[-2] if len(self.sma1) > 1 else self.sma1[-1]
        prev_sma2 = self.sma2[-2] if len(self.sma2) > 1 else self.sma2[-1]
        
        current_sma1 = self.sma1[-1]
        current_sma2 = self.sma2[-1]
        
        # 判断金叉死叉
        golden_cross = (prev_sma1 <= prev_sma2) and (current_sma1 > current_sma2)
        death_cross = (prev_sma1 >= prev_sma2) and (current_sma1 < current_sma2)
        
        if golden_cross and not self.position:
            # 金叉买入信号
            available_cash = self.equity - self.position.pl
            price = self.data.Close[-1]
            size = int((self.max_pct * self.equity) / price)
            
            if size > 0 and available_cash >= size * price:
                self.buy(size=size)
                self.signal = 1  # 买入信号
                
        elif death_cross and self.position and self.position.size > 0:
            # 死叉卖出信号
            self.position.close()
            self.signal = -1  # 卖出信号
        else:
            self.signal = 0  # 无信号

  from .autonotebook import tqdm as notebook_tqdm


## 运行回测并导出结果
我们会配置初始资金、手续费（按比例）、滑点（绝对值或点数）等。
回测完成后将保存统计到 `reports/`，并把交易明细导出为 CSV。

In [9]:
# import os
# os.makedirs('../reports', exist_ok=True)

# from backtesting import Backtest

# # 配置参数
# cash = 100000  # 初始资金
# commission = 0.0003  # 手续费比例（示例）
# slippage = 0.01  # 滑点（每股价格偏差，示例）

# # backtesting.py 的 slippage 参数可以通过 'slippage' 或自定义委托模型实现。
# bt = Backtest(
#     df,
#     SmaCrossStrategy,
#     cash=cash,
#     commission=commission,
#     exclusive_orders=True,
#     trade_on_close=False,
#     hedging=False,
# )

# stats = bt.run()
# print(stats)

# # 保存统计到 CSV/JSON
# stats_df = pd.DataFrame(stats)
# stats_path = os.path.join('reports', 'sma_backtest_stats.csv')
# try:
#     # backtesting.run() 返回的是字典/Series-like，有时直接转换会有问题；这里把关键字段提取为 DataFrame
#     pd.Series(stats).to_frame('value').to_csv(stats_path)
#     print('Saved stats to', stats_path)
# except Exception as e:
#     print('Failed to save stats:', e)

# # 导出交易明细
# trades = bt.get_trades()
# trades_df = trades.to_df()
# trades_path = os.path.join('reports', 'sma_backtest_trades.csv')
# trades_df.to_csv(trades_path, index=False)
# print('Saved trades to', trades_path)

# # 画图并保存
# fig = bt.plot(filename=os.path.join('reports', 'sma_backtest.html'))
# print('Plotted backtest (interactive HTML saved in reports)')
import os
os.makedirs('../reports', exist_ok=True)

from backtesting import Backtest

# 配置参数
cash = 100000  # 初始资金
commission = 0.0003  # 手续费比例（示例）
slippage = 0.01  # 滑点（每股价格偏差，示例）

# 使用已经预处理好的数据（包含SMA指标）
bt = Backtest(
    df_clean,  # 使用预处理后的数据，不是原始数据
    SmaCrossStrategy,
    cash=cash,
    commission=commission,
    exclusive_orders=True,
    trade_on_close=False,
    hedging=False,
)

stats = bt.run()
print(stats)

# 保存统计到 CSV/JSON
stats_path = os.path.join('../reports', 'sma_backtest_stats.csv')
try:
    # backtesting.run() 返回的是字典/Series-like
    pd.Series(stats).to_frame('value').to_csv(stats_path)
    print('Saved stats to', stats_path)
except Exception as e:
    print('Failed to save stats:', e)

# 导出交易明细 - 正确的方法
try:
    # 方法1: 从stats中获取
    if hasattr(stats, '_trades') and stats._trades:
        trades_df = pd.DataFrame([{
            'EntryBar': trade.entry_bar,
            'ExitBar': trade.exit_bar,
            'EntryPrice': trade.entry_price,
            'ExitPrice': trade.exit_price,
            'PnL': trade.pl,
            'ReturnPct': trade.pl_pct,
            'Size': trade.size,
            'EntryTime': trade.entry_time,
            'ExitTime': trade.exit_time
        } for trade in stats._trades])
    else:
        # 方法2: 尝试其他属性名
        trades_df = pd.DataFrame()
        
    if not trades_df.empty:
        trades_path = os.path.join('../reports', 'sma_backtest_trades.csv')
        trades_df.to_csv(trades_path, index=False)
        print('Saved trades to', trades_path)
        print(f'交易数量: {len(trades_df)}')
    else:
        print('No trades found in the backtest results')
        
except Exception as e:
    print(f'Error extracting trades: {e}')

# 画图并保存
try:
    fig = bt.plot(filename=os.path.join('../reports', 'sma_backtest.html'))
    print('Plotted backtest (interactive HTML saved in reports)')
except Exception as e:
    print(f'Error plotting: {e}')

# 简单诊断与性能摘要
print("\n=== 回测结果摘要 ===")
print(f"最终净值: {stats['Equity Final [$]']:.2f}")
print(f"总回报率: {stats['Return [%]']:.2f}%")
print(f"年化回报率: {stats['Return (Ann.) [%]']:.2f}%")
if 'Sharpe Ratio' in stats and not np.isnan(stats['Sharpe Ratio']):
    print(f"夏普比率: {stats['Sharpe Ratio']:.2f}")
print(f"最大回撤: {stats['Max. Drawdown [%]']:.2f}%")
print(f"总交易次数: {stats['# Trades']}")


                                                       

Start                     2007-04-27 00:00:00
End                       2025-11-10 00:00:00
Duration                   6772 days 00:00:00
Exposure Time [%]                         0.0
Equity Final [$]                     100000.0
Equity Peak [$]                      100000.0
Return [%]                                0.0
Buy & Hold Return [%]                 0.71174
Return (Ann.) [%]                         0.0
Volatility (Ann.) [%]                     0.0
CAGR [%]                                  0.0
Sharpe Ratio                              NaN
Sortino Ratio                             NaN
Calmar Ratio                              NaN
Alpha [%]                                 0.0
Beta                                      0.0
Max. Drawdown [%]                        -0.0
Avg. Drawdown [%]                         NaN
Max. Drawdown Duration                    NaN
Avg. Drawdown Duration                    NaN
# Trades                                    0
Win Rate [%]                      

## 简单诊断与性能摘要
计算并显示交易次数、胜率、平均盈亏、最大回撤、年化收益等简单指标（部分指标已在 stats 中，但我们也直接计算一些常见值）。

In [10]:
# trades_df 来自上面导出的交易明细（若为空，请确认策略有交易）
if trades_df.empty:
    print('No trades generated by the strategy. Consider adjusting parameters or checking data length.')
else:
    trades_df['PnL'] = trades_df['PnL']
    total_trades = len(trades_df)
    wins = (trades_df['PnL'] > 0).sum()
    win_rate = wins / total_trades if total_trades>0 else float('nan')
    avg_pnl = trades_df['PnL'].mean()
    print(f'total_trades={total_trades}, wins={wins}, win_rate={win_rate:.2%}, avg_pnl={avg_pnl:.4f}')
    print('Top 5 trades by PnL:')
    display(trades_df.sort_values('PnL', ascending=False).head())


NameError: name 'trades_df' is not defined

## 可选：把策略信号保存为列以便 Day3 使用（例如 ML 层复查）
我们把均线信号添加到原始价格数据并保存为 parquet，Day3 可以直接读取并做特征叠加。

In [11]:
# 计算信号列并保存
df_signals = df.copy()
df_signals['sma_short'] = df_signals['Close'].rolling(50).mean()
df_signals['sma_long'] = df_signals['Close'].rolling(200).mean()
df_signals['sma_cross_signal'] = (df_signals['sma_short'] > df_signals['sma_long']).astype(int)
signals_path = os.path.join('data', 'signals_sma_50_200.parquet')
df_signals.reset_index().to_parquet(signals_path, index=False)
print('Saved signals to', signals_path)


OSError: Cannot save file into a non-existent directory: 'data'

## 下一步建议
- 若回测生成过多交易或表现不稳定：尝试调整 `n1/n2`、`max_pct`，或增加最小持仓时间等规则（避免频繁交易）。
- 若希望更接近实盘：在 Day4 模拟 T+1、涨跌停与最小下单单位（手数）等规则；若使用券商 API，也需实现订单执行层并考虑滑点/拒单情况。
- Day3 将引入特征工程与 ML 信号：你可以直接在 Day3 使用本单元保存的 signals parquet 文件作为基线特征。

