In [1]:
"""
趋势增强型智能网格策略 v3.1 (Bug修复版)
修复了可视化中的索引对齐问题
"""

import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
import warnings
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec

In [2]:
warnings.filterwarnings('ignore')

In [3]:
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False

In [4]:
@dataclass(slots=True)
class Trade:
    """交易记录（使用slots减少内存占用）"""
    date: pd.Timestamp
    type: str
    price: float
    shares: float
    value: float
    pnl: Optional[float] = None
    grid_level: Optional[int] = None


In [5]:
@dataclass
class StrategyConfig:
    """策略配置参数"""
    initial_capital: float = 100000.0
    base_grid_pct: float = 0.025
    grid_levels: int = 7
    max_position: float = 0.9
    stop_loss: float = 0.15
    trend_period: int = 20
    vol_period: int = 14
    cooldown_days: int = 20
    min_grid_spacing: float = 0.015
    max_grid_spacing: float = 0.06
    position_update_threshold: float = 0.05  # 网格中心更新阈值

In [6]:
class GridManager:
    """网格管理器 - 独立管理网格逻辑"""
    
    def __init__(self, center_price: float, levels: int, spacing: float):
        self.center_price = center_price
        self.levels = levels
        self.spacing = spacing
        self.grids: List[Dict] = []
        self._init_grids()
    
    def _init_grids(self):
        """初始化网格"""
        self.grids = []
        for i in range(1, self.levels + 1):
            self.grids.append({
                'level': i,
                'buy_price': self.center_price * (1 - self.spacing * i),
                'sell_price': self.center_price * (1 + self.spacing * i),
                'active': False,
                'shares': 0.0,
                'entry_price': 0.0  # 记录实际买入价格
            })
    
    def update_center(self, new_center: float, spacing: float, 
                     preserve_active: bool = False) -> List[Dict]:
        """
        更新网格中心
        
        Args:
            new_center: 新的中心价格
            spacing: 新的网格间距
            preserve_active: 是否保留已激活网格的持仓（False则清仓）
        
        Returns:
            需要平仓的网格列表（如果preserve_active=False）
        """
        closed_grids = []
        
        if preserve_active:
            # 保留已激活网格，只更新未激活的价格
            old_grids = {g['level']: g for g in self.grids if g['active']}
            self.center_price = new_center
            self.spacing = spacing
            self._init_grids()
            
            # 恢复已激活网格
            for level, old_grid in old_grids.items():
                if level <= self.levels:
                    self.grids[level-1]['active'] = True
                    self.grids[level-1]['shares'] = old_grid['shares']
                    self.grids[level-1]['entry_price'] = old_grid['entry_price']
        else:
            # 收集需要平仓的网格
            closed_grids = [g for g in self.grids if g['active']]
            self.center_price = new_center
            self.spacing = spacing
            self._init_grids()
        
        return closed_grids
    
    def get_active_grids(self) -> List[Dict]:
        """获取已激活的网格"""
        return [g for g in self.grids if g['active']]
    
    def get_inactive_buy_grids(self) -> List[Dict]:
        """获取未激活的买入网格（按价格降序，优先买低价）"""
        inactive = [g for g in self.grids if not g['active']]
        return sorted(inactive, key=lambda x: x['buy_price'], reverse=True)
    
    def check_signals(self, high: float, low: float) -> Tuple[List[Dict], List[Dict]]:
        """
        检查交易信号
        
        Returns:
            (买入信号列表, 卖出信号列表)
        """
        buy_signals = []
        sell_signals = []
        
        for grid in self.grids:
            if not grid['active'] and low <= grid['buy_price']:
                buy_signals.append(grid)
            elif grid['active'] and high >= grid['sell_price']:
                sell_signals.append(grid)
        
        return buy_signals, sell_signals

In [7]:
class TrendEnhancedGridStrategy:
    """
    趋势增强型智能网格策略 v3.1
    
    主要优化：
    1. 预计算所有技术指标，避免循环中重复计算
    2. 修复execute_sell的bug
    3. 分离网格管理逻辑
    4. 添加更完善的风险管理
    5. 优化内存使用和运行速度
    """
    
    def __init__(self, config: Optional[StrategyConfig] = None):
        self.config = config or StrategyConfig()
        self.reset()
    
    def reset(self):
        """重置策略状态"""
        self.cash = self.config.initial_capital
        self.position = 0.0
        self.cost_basis = 0.0
        self.grid_manager: Optional[GridManager] = None
        self.trades: List[Trade] = []
        self.values: List[Dict] = []
        self.max_value = self.config.initial_capital
        self.cooldown_counter = 0
        self.last_grid_update_idx = 0
        
        # 预计算的指标
        self._df: Optional[pd.DataFrame] = None
        self._current_idx: int = 0
    
    def prepare_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        预计算所有技术指标（向量化，大幅提升性能）
        """
        df = df.copy()
        
        # 确保数据类型正确
        df['datetime'] = pd.to_datetime(df['datetime'])
        df = df.sort_values('datetime').reset_index(drop=True)
        
        # 处理成交量
        if df['Volume'].dtype == object:
            def parse_volume(v):
                if isinstance(v, str):
                    v = v.strip()
                    if 'B' in v.upper(): 
                        return float(v.replace('B', '').replace('b', '')) * 1e9
                    elif 'M' in v.upper(): 
                        return float(v.replace('M', '').replace('m', '')) * 1e6
                    elif 'K' in v.upper():
                        return float(v.replace('K', '').replace('k', '')) * 1e3
                return float(v)
            df['Volume'] = df['Volume'].apply(parse_volume)
        
        # 计算RSI
        delta = df['Close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        df['RSI'] = 100 - (100 / (1 + rs))
        
        # 计算ATR
        high_low = df['High'] - df['Low']
        high_close = (df['High'] - df['Close'].shift()).abs()
        low_close = (df['Low'] - df['Close'].shift()).abs()
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        df['ATR'] = tr.rolling(window=self.config.vol_period).mean()
        
        # 计算均线
        df['MA5'] = df['Close'].rolling(5).mean()
        df['MA10'] = df['Close'].rolling(10).mean()
        df['MA20'] = df['Close'].rolling(self.config.trend_period).mean()
        
        # 向量化趋势计算
        conditions = [
            (df['Close'] > df['MA5']) & (df['MA5'] > df['MA10']) & (df['MA10'] > df['MA20']),
            (df['Close'] < df['MA5']) & (df['MA5'] < df['MA10']) & (df['MA10'] < df['MA20'])
        ]
        
        # 多头趋势评分
        bull_score = ((df['Close'] / df['MA20'] - 1) * 5).clip(upper=1.0)
        # 空头趋势评分
        bear_score = ((df['Close'] / df['MA20'] - 1) * 5).clip(lower=-1.0)
        # 震荡评分
        range_score = ((df['Close'] - df['MA20']) / df['MA20'] * 3).clip(-0.3, 0.3)
        
        df['Trend'] = np.select(conditions, [bull_score, bear_score], default=range_score)
        
        # RSI调整
        rsi_overbought = df['RSI'] > 75
        rsi_oversold = df['RSI'] < 25
        df.loc[rsi_overbought, 'Trend'] -= 0.3
        df.loc[rsi_oversold, 'Trend'] += 0.3
        df['Trend'] = df['Trend'].clip(-1.0, 1.0)
        
        # 计算成交量均线
        df['Volume_MA20'] = df['Volume'].rolling(20).mean()
        
        return df
    
    def get_grid_spacing(self, atr: float, price: float) -> float:
        """基于ATR计算动态网格间距"""
        if atr <= 0 or price <= 0:
            return self.config.base_grid_pct
        
        volatility = atr / price
        spacing = volatility * 2.5
        
        return max(self.config.min_grid_spacing, 
                  min(self.config.max_grid_spacing, spacing))
    
    def get_position_size(self, total_value: float, price: float,
                         is_trend_add: bool = False) -> float:
        """计算仓位大小"""
        if total_value <= 0 or price <= 0:
            return 0.0
        
        # 基础仓位 15%
        base_size = 0.15
        
        # 趋势增强
        current_trend = self._df['Trend'].iloc[self._current_idx]
        if is_trend_add and current_trend > 0:
            base_size *= (1 + current_trend * 0.5)
        
        # 波动率调整
        atr = self._df['ATR'].iloc[self._current_idx]
        if atr > 0 and price > 0:
            vol = atr / price
            base_size /= (1 + vol * 5)
        
        # 检查仓位限制
        current_pct = self.position * price / total_value
        available = self.config.max_position - current_pct
        
        # 单次最大30%，且不能超过可用额度
        return max(0.0, min(base_size, available, 0.3))
    
    def execute_buy(self, grid: Dict, price: float, date: pd.Timestamp,
                   total_value: float, is_trend_add: bool = False) -> bool:
        """执行买入"""
        size = self.get_position_size(total_value, price, is_trend_add)
        if size <= 0:
            return False
        
        # 计算投资金额（趋势好时增加投入）
        current_trend = self._df['Trend'].iloc[self._current_idx]
        trend_multiplier = 1 + max(0, current_trend) * 0.5
        invest = total_value * size * trend_multiplier
        
        # 计算可买股数
        max_shares_by_cash = self.cash / price if price > 0 else 0
        shares = min(invest / price, max_shares_by_cash) if price > 0 else 0
        
        if shares <= 0:
            return False
        
        # 更新资金和持仓
        cost = shares * price
        self.cash -= cost
        self.position += shares
        
        # 更新成本基础
        if self.position > 0:
            total_cost = self.cost_basis * (self.position - shares) + cost
            self.cost_basis = total_cost / self.position
        else:
            self.cost_basis = price
        
        # 更新网格状态
        grid['active'] = True
        grid['shares'] = shares
        grid['entry_price'] = price
        
        # 记录交易
        self.trades.append(Trade(
            date=date, type='BUY', price=price,
            shares=shares, value=cost, grid_level=grid['level']
        ))
        
        return True
    
    def execute_sell(self, grid: Dict, price: float, date: pd.Timestamp) -> Optional[Trade]:
        """
        执行卖出（修复原版bug：先保存shares再清零）
        """
        if not grid['active'] or grid['shares'] <= 0:
            return None
        
        # 先保存关键数据，避免后续修改影响
        shares = grid['shares']
        entry_price = grid['entry_price'] if grid['entry_price'] > 0 else grid['buy_price']
        
        # 计算交易金额
        value = shares * price
        self.cash += value
        
        # 计算盈亏
        cost = shares * entry_price
        pnl = value - cost
        
        # 更新持仓
        self.position -= shares
        if self.position <= 1e-10:  # 浮点数比较
            self.position = 0
            self.cost_basis = 0
        
        # 创建交易记录（在清零网格之前）
        trade = Trade(
            date=date, type='SELL', price=price,
            shares=shares, value=value, pnl=pnl, grid_level=grid['level']
        )
        self.trades.append(trade)
        
        # 最后清零网格状态
        grid['active'] = False
        grid['shares'] = 0
        grid['entry_price'] = 0
        
        return trade
    
    def execute_stop_loss(self, price: float, date: pd.Timestamp) -> Trade:
        """执行止损清仓"""
        if self.position <= 0:
            raise ValueError("持仓为空时不能止损")
        
        value = self.position * price
        self.cash += value
        
        trade = Trade(
            date=date, type='STOP_LOSS', price=price,
            shares=self.position, value=value
        )
        self.trades.append(trade)
        
        # 重置状态
        self.position = 0
        self.cost_basis = 0
        self.cooldown_counter = self.config.cooldown_days
        
        # 重置网格
        if self.grid_manager:
            for grid in self.grid_manager.grids:
                grid['active'] = False
                grid['shares'] = 0
                grid['entry_price'] = 0
        
        return trade
    
    def check_stop_loss(self, total_value: float, price: float,
                       date: pd.Timestamp) -> bool:
        """检查并执行止损"""
        self.max_value = max(self.max_value, total_value)
        
        if self.max_value <= 0:
            return False
        
        drawdown = (self.max_value - total_value) / self.max_value
        
        if drawdown > self.config.stop_loss and self.position > 0:
            self.execute_stop_loss(price, date)
            return True
        
        return False
    
    def should_update_grids(self, current_price: float) -> bool:
        """判断是否需要更新网格中心"""
        if self.grid_manager is None:
            return True
        
        last_center = self.grid_manager.center_price
        deviation = abs(current_price - last_center) / last_center
        
        # 价格偏离超过5%才更新，避免过于频繁
        return deviation > self.config.position_update_threshold
    
    def run_backtest(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        运行回测（优化版）
        """
        self.reset()
        
        # 预计算指标
        print("预计算技术指标...")
        self._df = self.prepare_indicators(df)
        
        start_idx = max(self.config.trend_period, self.config.vol_period)
        n_bars = len(self._df)
        
        if n_bars <= start_idx:
            raise ValueError(f"数据长度不足，需要至少{start_idx}条数据")
        
        # 初始化
        init_price = self._df['Close'].iloc[start_idx]
        init_atr = self._df['ATR'].iloc[start_idx]
        spacing = self.get_grid_spacing(init_atr, init_price)
        self.grid_manager = GridManager(init_price, self.config.grid_levels, spacing)
        self.last_grid_update_idx = start_idx
        
        print(f"开始回测: {self._df['datetime'].iloc[start_idx]} 至 {self._df['datetime'].iloc[-1]}")
        
        for i in range(start_idx, n_bars):
            self._current_idx = i
            
            date = self._df['datetime'].iloc[i]
            price = self._df['Close'].iloc[i]
            high = self._df['High'].iloc[i]
            low = self._df['Low'].iloc[i]
            
            # 处理冷却期
            if self.cooldown_counter > 0:
                self.cooldown_counter -= 1
                total = self.cash + self.position * price
                self.max_value = max(self.max_value, total)
                self.values.append({
                    'date': date, 'price': price, 'cash': self.cash,
                    'position': self.position, 'value': total,
                    'trend': self._df['Trend'].iloc[i]
                })
                
                # 冷却结束，重新初始化网格
                if self.cooldown_counter == 0 and self.cash > 10000:
                    self.grid_manager = GridManager(price, self.config.grid_levels,
                                                   self.get_grid_spacing(
                                                       self._df['ATR'].iloc[i], price))
                    self.last_grid_update_idx = i
                continue
            
            # 定期更新网格中心
            current_trend = self._df['Trend'].iloc[i]
            if (i - self.last_grid_update_idx >= 5 or abs(current_trend) > 0.6):
                if self.should_update_grids(price):
                    if abs(current_trend) > 0.3 and self.grid_manager:
                        # 根据趋势方向调整中心
                        old_center = self.grid_manager.center_price
                        if current_trend > 0:
                            new_center = max(old_center, price * 0.95)
                        else:
                            new_center = min(old_center, price * 1.05)
                        
                        spacing = self.get_grid_spacing(self._df['ATR'].iloc[i], new_center)
                        self.grid_manager.update_center(new_center, spacing, preserve_active=True)
                    
                    self.last_grid_update_idx = i
            
            # 计算市值
            position_value = self.position * price
            total_value = self.cash + position_value
            
            # 止损检查
            if self.check_stop_loss(total_value, price, date):
                self.values.append({
                    'date': date, 'price': price, 'cash': self.cash,
                    'position': 0, 'value': total_value,
                    'trend': current_trend
                })
                continue
            
            # 成交量过滤
            vol_ok = True
            if i >= 20:
                avg_vol = self._df['Volume_MA20'].iloc[i]
                vol_ok = self._df['Volume'].iloc[i] > avg_vol * 0.6
            
            # 网格交易
            if self.grid_manager and vol_ok and current_trend > -0.5:
                buy_signals, sell_signals = self.grid_manager.check_signals(high, low)
                
                # 先处理卖出（释放资金）
                for grid in sell_signals:
                    self.execute_sell(grid, price, date)
                
                # 再处理买入
                for grid in buy_signals:
                    self.execute_buy(grid, price, date, total_value)
            
            # 趋势加仓（每10天检查一次，避免过于频繁）
            if (current_trend > 0.7 and self.position > 0 and 
                (i - start_idx) % 10 == 0 and i >= 20):
                
                recent_high = self._df['High'].iloc[i-20:i].max()
                if price >= recent_high * 0.98:
                    current_pct = self.position * price / total_value
                    if current_pct < self.config.max_position * 0.8:
                        # 找到第一个未激活的网格用于记录
                        inactive = [g for g in self.grid_manager.grids if not g['active']]
                        if inactive:
                            self.execute_buy(inactive[0], price, date, total_value, 
                                           is_trend_add=True)
            
            # 趋势减仓
            if current_trend < -0.8 and self.position > 0:
                reduce_pct = min(0.4, abs(current_trend))
                reduce_shares = self.position * reduce_pct
                
                if reduce_shares > 0:
                    value = reduce_shares * price
                    self.cash += value
                    self.position -= reduce_shares
                    
                    self.trades.append(Trade(
                        date=date, type='TREND_REDUCE', price=price,
                        shares=reduce_shares, value=value
                    ))
            
            # 记录每日数据
            total_value = self.cash + self.position * price
            self.values.append({
                'date': date, 'price': price, 'cash': self.cash,
                'position': self.position, 'value': total_value,
                'trend': current_trend
            })
        
        results = pd.DataFrame(self.values)
        print(f"回测完成: 共{len(results)}个交易日，{len(self.trades)}笔交易")
        return results
    
    def get_performance_metrics(self, results: pd.DataFrame) -> Dict:
        """计算绩效指标"""
        if len(results) == 0:
            return {}
        
        initial = self.config.initial_capital
        final = results['value'].iloc[-1]
        total_return = (final - initial) / initial if initial > 0 else 0
        
        days = (results['date'].iloc[-1] - results['date'].iloc[0]).days
        years = days / 365.25 if days > 0 else 0
        annual_return = (1 + total_return) ** (1/years) - 1 if years > 0 else 0
        
        # 最大回撤
        cummax = results['value'].cummax()
        drawdowns = (cummax - results['value']) / cummax
        max_dd = drawdowns.max() if len(cummax) > 0 else 0
        
        # 夏普比率
        daily_ret = results['value'].pct_change().dropna()
        if len(daily_ret) > 1 and daily_ret.std() > 0:
            sharpe = ((daily_ret.mean() * 252 - 0.02) / 
                     (daily_ret.std() * np.sqrt(252)))
        else:
            sharpe = 0
        
        # 交易统计
        trades_df = pd.DataFrame([{
            'type': t.type, 'pnl': t.pnl, 'value': t.value
        } for t in self.trades])
        
        if len(trades_df) > 0:
            buy_count = len(trades_df[trades_df['type'] == 'BUY'])
            sell_count = len(trades_df[trades_df['type'] == 'SELL'])
            stop_count = len(trades_df[trades_df['type'] == 'STOP_LOSS'])
            add_count = len(trades_df[trades_df['type'] == 'TREND_ADD'])
            reduce_count = len(trades_df[trades_df['type'] == 'TREND_REDUCE'])
            
            sell_trades = trades_df[trades_df['type'] == 'SELL']
            if len(sell_trades) > 0:
                wins = len(sell_trades[sell_trades['pnl'] > 0])
                win_rate = wins / len(sell_trades) * 100
                avg_pnl = sell_trades['pnl'].mean()
                total_pnl = sell_trades['pnl'].sum()
            else:
                win_rate = avg_pnl = total_pnl = 0
        else:
            buy_count = sell_count = stop_count = add_count = reduce_count = 0
            win_rate = avg_pnl = total_pnl = 0
        
        return {
            'initial_capital': initial,
            'final_value': final,
            'total_return_pct': total_return * 100,
            'annual_return_pct': annual_return * 100,
            'max_drawdown_pct': max_dd * 100,
            'sharpe_ratio': sharpe,
            'total_trades': len(self.trades),
            'buy_trades': buy_count,
            'sell_trades': sell_count,
            'stop_loss_trades': stop_count,
            'trend_add_trades': add_count,
            'trend_reduce_trades': reduce_count,
            'win_rate_pct': win_rate,
            'avg_pnl': avg_pnl,
            'total_pnl': total_pnl,
            'calmar_ratio': annual_return / max_dd if max_dd > 0 else 0
        }


In [8]:
class StrategyVisualizer:
    """策略可视化（修复索引对齐问题）"""
    
    def __init__(self, config: StrategyConfig):
        self.config = config
        self.colors = {
            'strategy': '#1f77b4',
            'buyhold': '#ff7f0e',
            'profit': '#2ca02c',
            'loss': '#d62728',
            'buy': '#00aa00',
            'sell': '#00ff00',
            'stop': '#ff0000',
            'add': '#0000ff',
            'reduce': '#ff8800'
        }
    
    def create_comprehensive_report(self, df: pd.DataFrame, 
                                   results: pd.DataFrame,
                                   trades: List[Trade],
                                   output_path: str):
        """生成综合分析报告（修复版）"""
        trades_df = pd.DataFrame([{
            'date': t.date, 'type': t.type, 'price': t.price,
            'shares': t.shares, 'value': t.value, 'pnl': t.pnl
        } for t in trades])
        
        fig = plt.figure(figsize=(20, 24))
        gs = GridSpec(6, 2, height_ratios=[3, 2, 2, 2, 2, 2], 
                     hspace=0.35, wspace=0.25)
        
        # 1. 主图：净值对比
        ax1 = fig.add_subplot(gs[0, :])
        ax1.plot(results['date'], results['value'], 
                label='Trend Grid Strategy', linewidth=2.5, 
                color=self.colors['strategy'], alpha=0.9)
        
        # 修复：买入持有基准计算 - 使用数值索引而非布尔掩码
        start_date = results['date'].iloc[0]
        end_date = results['date'].iloc[-1]
        
        # 找到对应的起始和结束索引
        start_mask = df['datetime'] <= start_date
        if start_mask.any():
            start_idx = start_mask.idxmax()  # 第一个满足条件的索引
        else:
            start_idx = 0
            
        # 使用 .loc 获取从start_idx开始到结束的数据
        df_subset = df.loc[start_idx:].copy()
        df_subset = df_subset[df_subset['datetime'] <= end_date]  # 过滤结束日期
        
        if len(df_subset) > 0:
            initial_price = df_subset['Close'].iloc[0]
            buyhold_values = df_subset['Close'] / initial_price * self.config.initial_capital
            
            ax1.plot(df_subset['datetime'].values, buyhold_values.values, 
                    label='Buy & Hold', linewidth=2, 
                    color=self.colors['buyhold'], alpha=0.7, linestyle='--')
            
            buyhold_final = buyhold_values.iloc[-1]
        else:
            buyhold_final = self.config.initial_capital
        
        ax1.axhline(y=self.config.initial_capital, color='gray', 
                   linestyle=':', alpha=0.5, linewidth=1)
        ax1.fill_between(results['date'], self.config.initial_capital, results['value'],
                        where=(results['value'] >= self.config.initial_capital), 
                        alpha=0.2, color=self.colors['profit'])
        ax1.fill_between(results['date'], self.config.initial_capital, results['value'],
                        where=(results['value'] < self.config.initial_capital), 
                        alpha=0.2, color=self.colors['loss'])
        
        # 添加收益标注
        final_value = results['value'].iloc[-1]
        strategy_return = (final_value / self.config.initial_capital - 1) * 100
        buyhold_return = (buyhold_final / self.config.initial_capital - 1) * 100
        
        ax1.set_title(f'Trend-Enhanced Smart Grid Strategy Performance\n' + 
                     f'Strategy: {strategy_return:.1f}% vs Buy&Hold: {buyhold_return:.1f}%',
                     fontsize=16, fontweight='bold', pad=20)
        ax1.set_ylabel('Portfolio Value (CNY)', fontsize=12)
        ax1.legend(loc='upper left', fontsize=11)
        ax1.grid(True, alpha=0.3, linestyle='--')
        
        # 添加注释
        ax1.annotate(f'Strategy\n{final_value:,.0f}\n({strategy_return:.1f}%)', 
                    xy=(results['date'].iloc[-1], final_value), 
                    xytext=(-120, 20), textcoords='offset points',
                    bbox=dict(boxstyle='round,pad=0.5', facecolor=self.colors['strategy'], alpha=0.8),
                    fontsize=10, color='white', fontweight='bold')
        
        if len(df_subset) > 0:
            ax1.annotate(f'Buy&Hold\n{buyhold_final:,.0f}\n({buyhold_return:.1f}%)', 
                        xy=(df_subset['datetime'].iloc[-1], buyhold_final), 
                        xytext=(-120, -60), textcoords='offset points',
                        bbox=dict(boxstyle='round,pad=0.5', facecolor=self.colors['buyhold'], alpha=0.8),
                        fontsize=10, color='white', fontweight='bold')
        
        # 2. 价格与交易点
        ax2 = fig.add_subplot(gs[1, :])
        ax2.plot(df['datetime'], df['Close'], label='Stock Price', 
                color='black', linewidth=1.5, alpha=0.8)
        
        if len(trades_df) > 0:
            for trade_type, marker, color_key in [
                ('BUY', '^', 'buy'), ('SELL', 'v', 'sell'),
                ('STOP_LOSS', 'x', 'stop'), ('TREND_ADD', '*', 'add'),
                ('TREND_REDUCE', 'D', 'reduce')
            ]:
                subset = trades_df[trades_df['type'] == trade_type]
                if len(subset) > 0:
                    ax2.scatter(subset['date'], subset['price'], 
                               marker=marker, s=80, color=self.colors[color_key],
                               alpha=0.8, label=f'{trade_type} ({len(subset)})', zorder=5)
        
        ax2.set_title('Stock Price & Trading Signals', fontsize=14, fontweight='bold')
        ax2.set_ylabel('Price (CNY)', fontsize=11)
        ax2.legend(loc='upper left', ncol=5, fontsize=9)
        ax2.grid(True, alpha=0.3)
        
        # 3. 持仓比例
        ax3 = fig.add_subplot(gs[2, 0])
        position_pct = results['position'] * results['price'] / results['value'] * 100
        ax3.fill_between(results['date'], 0, position_pct, alpha=0.6, color='steelblue')
        ax3.plot(results['date'], position_pct, color='navy', linewidth=1)
        ax3.axhline(y=self.config.max_position*100, color='darkred', 
                   linestyle='--', alpha=0.3)
        ax3.set_ylabel('Position Ratio (%)', fontsize=11)
        ax3.set_title('Position Allocation', fontsize=13, fontweight='bold')
        ax3.grid(True, alpha=0.3)
        ax3.set_ylim(0, 100)
        
        # 4. 现金余额
        ax4 = fig.add_subplot(gs[2, 1])
        ax4.fill_between(results['date'], 0, results['cash'], 
                        alpha=0.5, color='green')
        ax4.plot(results['date'], results['cash'], 
                color='darkgreen', linewidth=1.5)
        ax4.set_ylabel('Cash (CNY)', fontsize=11)
        ax4.set_title('Cash Reserve', fontsize=13, fontweight='bold')
        ax4.grid(True, alpha=0.3)
        
        # 5. 趋势强度
        ax5 = fig.add_subplot(gs[3, 0])
        ax5.plot(results['date'], results['trend'], 
                color='purple', linewidth=1.5)
        ax5.axhline(y=0.5, color='green', linestyle='--', alpha=0.5)
        ax5.axhline(y=-0.5, color='red', linestyle='--', alpha=0.5)
        ax5.fill_between(results['date'], -1, 1, 
                        where=(results['trend'] > 0.5), alpha=0.2, color='green')
        ax5.fill_between(results['date'], -1, 1, 
                        where=(results['trend'] < -0.5), alpha=0.2, color='red')
        ax5.set_ylabel('Trend Strength', fontsize=11)
        ax5.set_title('Trend Indicator (-1 to 1)', fontsize=13, fontweight='bold')
        ax5.grid(True, alpha=0.3)
        ax5.set_ylim(-1.2, 1.2)
        
        # 6. RSI
        ax6 = fig.add_subplot(gs[3, 1])
        ax6.plot(df['datetime'], df['RSI'], color='purple', linewidth=1)
        ax6.axhline(y=70, color='red', linestyle='--', alpha=0.5)
        ax6.axhline(y=30, color='green', linestyle='--', alpha=0.5)
        ax6.fill_between(df['datetime'], 30, 70, alpha=0.1, color='gray')
        ax6.set_ylabel('RSI', fontsize=11)
        ax6.set_title('RSI Indicator (14-day)', fontsize=13, fontweight='bold')
        ax6.grid(True, alpha=0.3)
        ax6.set_ylim(0, 100)
        
        # 7. 回撤
        ax7 = fig.add_subplot(gs[4, 0])
        cummax = results['value'].cummax()
        drawdown = (cummax - results['value']) / cummax * 100
        ax7.fill_between(results['date'], 0, drawdown, 
                        alpha=0.5, color='red')
        ax7.plot(results['date'], drawdown, 
                color='darkred', linewidth=1.5)
        
        max_dd = drawdown.max()
        max_dd_idx = drawdown.idxmax()
        ax7.scatter(results['date'].iloc[max_dd_idx], max_dd, 
                   s=100, color='darkred', zorder=5)
        ax7.annotate(f'Max DD: {max_dd:.1f}%', 
                    xy=(results['date'].iloc[max_dd_idx], max_dd),
                    xytext=(10, 10), textcoords='offset points',
                    bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.8),
                    fontsize=10, fontweight='bold')
        ax7.set_ylabel('Drawdown (%)', fontsize=11)
        ax7.set_title('Drawdown Analysis', fontsize=13, fontweight='bold')
        ax7.grid(True, alpha=0.3)
        
        # 8. 成交量
        ax8 = fig.add_subplot(gs[4, 1])
        # 计算涨跌颜色
        price_change = df['Close'].diff().fillna(0)
        colors_vol = ['red' if c >= 0 else 'green' for c in price_change]
        ax8.bar(df['datetime'], df['Volume']/1e6, 
               color=colors_vol, alpha=0.6, width=1)
        ax8.set_ylabel('Volume (M)', fontsize=11)
        ax8.set_title('Trading Volume', fontsize=13, fontweight='bold')
        ax8.grid(True, alpha=0.3)
        
        # 9. 盈亏分布
        ax9 = fig.add_subplot(gs[5, 0])
        sell_trades = trades_df[trades_df['type'] == 'SELL']
        if len(sell_trades) > 0 and 'pnl' in sell_trades.columns:
            pnls = sell_trades['pnl'].dropna()
            if len(pnls) > 0:
                colors_pnl = ['green' if p > 0 else 'red' for p in pnls]
                ax9.bar(range(len(pnls)), pnls, color=colors_pnl, alpha=0.7)
                ax9.axhline(y=0, color='black', linestyle='-', linewidth=1)
                
                win_rate = (pnls > 0).sum() / len(pnls) * 100
                ax9.set_xlabel('Trade Number', fontsize=11)
                ax9.set_ylabel('PnL (CNY)', fontsize=11)
                ax9.set_title(f'PnL Distribution (Win Rate: {win_rate:.1f}%)', 
                            fontsize=13, fontweight='bold')
                ax9.grid(True, alpha=0.3, axis='y')
        
        # 10. 交易统计
        ax10 = fig.add_subplot(gs[5, 1])
        trade_types = ['BUY', 'SELL', 'STOP_LOSS', 'TREND_ADD', 'TREND_REDUCE']
        trade_counts = [len(trades_df[trades_df['type']==t]) for t in trade_types]
        colors_type = ['green', 'lime', 'red', 'blue', 'orange']
        bars = ax10.bar([t.replace('_', '\n') for t in trade_types], 
                       trade_counts, color=colors_type, alpha=0.7)
        ax10.set_ylabel('Count', fontsize=11)
        ax10.set_title('Trade Statistics', fontsize=13, fontweight='bold')
        
        for bar, count in zip(bars, trade_counts):
            if count > 0:
                ax10.text(bar.get_x() + bar.get_width()/2, 
                         bar.get_height() + max(trade_counts)*0.01, 
                         str(count), ha='center', va='bottom', 
                         fontsize=10, fontweight='bold')
        ax10.grid(True, alpha=0.3, axis='y')
        
        plt.tight_layout()
        
        output_file = Path(output_path) / 'comprehensive_analysis.png'
        plt.savefig(output_file, dpi=200, bbox_inches='tight', 
                   facecolor='white', edgecolor='none')
        plt.close()
        
        print(f"✅ 综合分析图表已保存: {output_file}")
        return output_file

In [9]:
def print_performance_report(metrics: Dict):
    """打印格式化绩效报告"""
    print("\n" + "="*70)
    print(" " * 20 + "策略绩效报告 v3.1")
    print("="*70)
    
    print(f"{'初始资金:':<20} {metrics['initial_capital']:>15,.0f} CNY")
    print(f"{'最终价值:':<20} {metrics['final_value']:>15,.0f} CNY")
    print(f"{'总收益率:':<20} {metrics['total_return_pct']:>15.2f} %")
    print(f"{'年化收益:':<20} {metrics['annual_return_pct']:>15.2f} %")
    print(f"{'最大回撤:':<20} {metrics['max_drawdown_pct']:>15.2f} %")
    print(f"{'夏普比率:':<20} {metrics['sharpe_ratio']:>15.2f}")
    print(f"{'卡玛比率:':<20} {metrics['calmar_ratio']:>15.2f}")
    print("-"*70)
    print(f"{'总交易次数:':<20} {metrics['total_trades']:>15}")
    print(f"{'买入次数:':<20} {metrics['buy_trades']:>15}")
    print(f"{'卖出次数:':<20} {metrics['sell_trades']:>15}")
    print(f"{'止损次数:':<20} {metrics['stop_loss_trades']:>15}")
    print(f"{'趋势加仓:':<20} {metrics['trend_add_trades']:>15}")
    print(f"{'趋势减仓:':<20} {metrics['trend_reduce_trades']:>15}")
    print("-"*70)
    print(f"{'胜率:':<20} {metrics['win_rate_pct']:>15.1f} %")
    print(f"{'平均盈亏:':<20} {metrics['avg_pnl']:>15,.0f} CNY")
    print(f"{'总盈亏:':<20} {metrics['total_pnl']:>15,.0f} CNY")
    print("="*70)


In [10]:
def main():
    """主函数"""
    # 配置
    DATA_PATH = 'C:/Users/1/Desktop/python量化/603993历史数据(2020-2025).csv'
    OUTPUT_PATH = 'C:/Users/1/Desktop'
    
    print("="*70)
    print("趋势增强型智能网格策略 v3.1 - 开始运行")
    print("="*70)
    
    # 1. 加载数据
    print("\n1. 加载数据...")
    try:
        df = pd.read_csv(DATA_PATH)
        print(f"   原始数据: {len(df)} 行")
    except Exception as e:
        print(f"   ❌ 数据加载失败: {e}")
        return
    
    # 2. 初始化策略
    print("\n2. 初始化策略...")
    config = StrategyConfig(
        initial_capital=100000,
        base_grid_pct=0.025,
        grid_levels=7,
        max_position=0.9,
        stop_loss=0.15
    )
    
    strategy = TrendEnhancedGridStrategy(config)
    
    # 3. 运行回测
    print("\n3. 运行回测...")
    try:
        results = strategy.run_backtest(df)
    except Exception as e:
        print(f"   ❌ 回测失败: {e}")
        import traceback
        traceback.print_exc()
        return
    
    # 4. 计算绩效
    print("\n4. 计算绩效指标...")
    metrics = strategy.get_performance_metrics(results)
    print_performance_report(metrics)
    
    # 5. 生成图表
    print("\n5. 生成可视化图表...")
    try:
        visualizer = StrategyVisualizer(config)  # 传递config
        visualizer.create_comprehensive_report(
            strategy._df, results, strategy.trades, OUTPUT_PATH
        )
    except Exception as e:
        print(f"   ⚠️ 图表生成失败: {e}")
        import traceback
        traceback.print_exc()
    
    print("\n" + "="*70)
    print("✅ 回测完成！")
    print(f"输出目录: {OUTPUT_PATH}")
    print("="*70)


In [11]:
if __name__ == "__main__":
    main()

趋势增强型智能网格策略 v3.1 - 开始运行

1. 加载数据...
   原始数据: 1473 行

2. 初始化策略...

3. 运行回测...
预计算技术指标...
开始回测: 2020-02-25 00:00:00 至 2026-02-13 00:00:00
回测完成: 共1453个交易日，69笔交易

4. 计算绩效指标...

                    策略绩效报告 v3.1
初始资金:                        100,000 CNY
最终价值:                        152,863 CNY
总收益率:                          52.86 %
年化收益:                           7.37 %
最大回撤:                          13.25 %
夏普比率:                           0.59
卡玛比率:                           0.56
----------------------------------------------------------------------
总交易次数:                            69
买入次数:                             34
卖出次数:                             33
止损次数:                              0
趋势加仓:                              0
趋势减仓:                              2
----------------------------------------------------------------------
胜率:                             60.6 %
平均盈亏:                          1,005 CNY
总盈亏:                          33,161 CNY

5. 生成可视化图表...
✅ 综合分析图表已保存: C:\Users\