# VectorBT双资产再平衡策略深度探索

本笔记本将`ImprovedVectorBTStrategy`拆分成独立的函数，让您可以分步骤地理解每个组件的内部功能和结构。

## 策略概述

这是一个基于Holt-Winters预测和磁滞回线逻辑的双资产再平衡策略：

1. **Holt-Winters信号计算** - 使用HWDP模型生成趋势预测信号
2. **磁滞回线逻辑** - 通过阈值判断避免频繁交易
3. **动态权重分配** - 根据信号在不同权重配置间切换
4. **智能再平衡** - 仅在权重变化且到达再平衡日期时执行交易
5. **VectorBT框架** - 使用专业回测框架进行性能分析

## 1. 导入库和环境设置

In [None]:
# 导入所需库
import numpy as np
import pandas as pd
import vectorbt as vbt
import matplotlib.pyplot as plt
from dffc.holt_winters._holt_winters import HWDP
from dffc.holt_winters._optimization import process_hw_opt
from dffc.fund_data import register_fund_data

# 设置vectorbt配置
print("配置vectorbt设置...")
vbt.settings.array_wrapper['freq'] = 'days'          # 时间频率为天
vbt.settings.returns['year_freq'] = '252 days'       # 年化天数（交易日）
vbt.settings.portfolio['seed'] = 42                  # 随机种子确保可重复性
vbt.settings.portfolio.stats['incl_unrealized'] = True  # 包含未实现盈亏

# 设置中文字体支持
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

print("环境设置完成！")
print(f"VectorBT版本: {vbt.__version__}")
print("VectorBT设置已配置为:")
print(f"- 频率: {vbt.settings.array_wrapper['freq']}")
print(f"- 年化频率: {vbt.settings.returns['year_freq']}")
print(f"- 随机种子: {vbt.settings.portfolio['seed']}")
print(f"- 包含未实现盈亏: {vbt.settings.portfolio.stats['incl_unrealized']}")

## 2. 数据加载和准备函数

In [None]:
def load_fund_data(fund_codes, fund_names, start_date, end_date):
    """
    加载基金数据
    
    Args:
        fund_codes: list, 基金代码列表
        fund_names: list, 基金名称列表  
        start_date: str, 开始日期
        end_date: str, 结束日期
        
    Returns:
        DataFrame: 价格数据
    """
    print("注册基金数据源...")
    register_fund_data()
    
    print(f"下载基金数据: {fund_codes}")
    print(f"日期范围: {start_date} 至 {end_date}")
    
    fund_data = vbt.FundData.download(
        fund_codes,
        names=fund_names,
        start=start_date,
        end=end_date
    )
    
    prices = fund_data.get('cumulative_value').dropna()
    
    print(f"数据加载完成！")
    print(f"数据形状: {prices.shape}")
    print(f"数据列: {list(prices.columns)}")
    print(f"日期范围: {prices.index[0]} 至 {prices.index[-1]}")
    
    return prices

# 加载实际数据
prices = load_fund_data(
    fund_codes=['007467', '004253'],
    fund_names=['HL', 'GD'], 
    start_date='2022-07-01',
    end_date='2025-07-01'
)

# 显示数据预览
print("\n前5行数据:")
print(prices.head())
print("\n后5行数据:")
print(prices.tail())

## 3. Holt-Winters信号计算函数

In [None]:
def calculate_hw_signals(prices, optimization=True):
    """
    计算Holt-Winters信号
    
    Args:
        prices: DataFrame, 价格数据
        optimization: bool, 是否优化HW参数
        
    Returns:
        DataFrame: HW信号数据
    """
    print("开始计算Holt-Winters信号...")
    
    hw_signals = pd.DataFrame(index=prices.index, columns=prices.columns)
    
    if optimization:
        print("正在优化Holt-Winters参数...")
        # 使用优化算法寻找最佳参数
        result = process_hw_opt(prices, ".", 8)
        hw_params = {}
        
        for fund_result in result:
            hw_params[fund_result['fundcode']] = {
                'alpha': fund_result['alpha'],
                'beta': fund_result['beta'], 
                'gamma': fund_result['gamma'],
                'm': fund_result['season']
            }
            print(f"基金 {fund_result['fundcode']} 优化参数:")
            print(f"  alpha={fund_result['alpha']:.3f}, beta={fund_result['beta']:.3f}")
            print(f"  gamma={fund_result['gamma']:.3f}, m={fund_result['season']}")
    else:
        print("使用默认参数...")
        # 使用默认参数
        hw_params = {col: {'alpha': 0.3, 'beta': 0.1, 'gamma': 0.1, 'm': 8} 
                    for col in prices.columns}
        
    # 为每个资产计算HW信号
    for col in prices.columns:
        params = hw_params[col]
        print(f"计算 {col} 的HWDP信号...")
        
        hwdp_result = HWDP.run(
            prices[col], 
            alpha=params['alpha'],
            beta=params['beta'], 
            gamma=params['gamma'],
            m=params['m'],
            multiplicative=True
        )
        hw_signals[col] = hwdp_result.hwdp
        
    print("Holt-Winters信号计算完成！")
    return hw_signals

# 计算HW信号
hw_signals = calculate_hw_signals(prices, optimization=True)

# 显示信号统计
print("\nHW信号统计:")
print(hw_signals.describe())

# 计算信号差值
delta_hdp = hw_signals.iloc[:, 0] - hw_signals.iloc[:, 1]
print(f"\nHDP差值统计:")
print(f"均值: {delta_hdp.mean():.4f}")
print(f"标准差: {delta_hdp.std():.4f}")
print(f"最大值: {delta_hdp.max():.4f}")
print(f"最小值: {delta_hdp.min():.4f}")

In [None]:
delta_hdp.to_csv("./d_hwdp.csv")

## 4. 磁滞回线逻辑和目标权重生成

In [None]:
def generate_target_weights(hw_signals, prices, 
                           default_weights=[0.5, 0.5],
                           up_weights=[0.2, 0.8], 
                           down_weights=[0.8, 0.2],
                           threshold=0.6):
    """
    使用磁滞回线逻辑生成目标权重序列
    
    Args:
        hw_signals: DataFrame, HW信号数据
        prices: DataFrame, 价格数据（用于获取索引和列名）
        default_weights: list, 默认权重 [资产1, 资产2]
        up_weights: list, 上升趋势权重 [资产1, 资产2]  
        down_weights: list, 下降趋势权重 [资产1, 资产2]
        threshold: float, 磁滞回线阈值
        
    Returns:
        DataFrame: 目标权重序列
        pd.Series: 交易信号序列
    """
    print("生成目标权重序列...")
    print(f"权重配置:")
    print(f"  默认权重: {default_weights}")
    print(f"  上升权重: {up_weights}")  
    print(f"  下降权重: {down_weights}")
    print(f"  阈值: {threshold}")
    
    # 计算HDP差值 (第一个资产 - 第二个资产)
    delta_hdp = hw_signals.iloc[:, 0] - hw_signals.iloc[:, 1]
    
    # 初始化信号序列
    signals = pd.Series(index=prices.index, dtype=int)
    signals.iloc[0] = 0  # 初始信号为中性
    
    # 磁滞回线状态记忆
    memory_switch = True  # True表示可以向上切换，False表示可以向下切换
    signal_changes = []  # 记录信号变化
    
    print("执行磁滞回线逻辑...")
    
    for i in range(1, len(delta_hdp)):
        prev_switch = memory_switch
        
        if prev_switch and delta_hdp.iloc[i] > threshold:
            # 可以向上切换且信号超过上阈值
            signals.iloc[i] = 1  # 上升信号
            memory_switch = False  # 现在只能向下切换
            if signals.iloc[i-1] != 1:
                signal_changes.append((delta_hdp.index[i], '上升信号', delta_hdp.iloc[i]))
                
        elif not prev_switch and delta_hdp.iloc[i] < -threshold:
            # 可以向下切换且信号低于下阈值
            signals.iloc[i] = -1  # 下降信号
            memory_switch = True   # 现在只能向上切换
            if signals.iloc[i-1] != -1:
                signal_changes.append((delta_hdp.index[i], '下降信号', delta_hdp.iloc[i]))
                
        else:
            # 保持前一个信号
            signals.iloc[i] = signals.iloc[i-1]
    
    print(f"信号变化次数: {len(signal_changes)}")
    for date, signal_type, value in signal_changes[:5]:  # 显示前5次变化
        print(f"  {date.date()}: {signal_type}, HDP差值={value:.4f}")
    
    # 根据信号生成目标权重
    target_weights = pd.DataFrame(index=prices.index, columns=prices.columns)
    
    for i in range(len(signals)):
        if signals.iloc[i] == 1:
            target_weights.iloc[i] = up_weights    # 上升趋势权重
        elif signals.iloc[i] == -1:
            target_weights.iloc[i] = down_weights  # 下降趋势权重
        else:
            if i == 0:
                target_weights.iloc[i] = default_weights  # 初始默认权重
            else:
                target_weights.iloc[i] = target_weights.iloc[i-1]  # 保持前一权重
    
    # 统计权重分布
    weight_stats = {}
    for weight_type, weights in [('默认', default_weights), ('上升', up_weights), ('下降', down_weights)]:
        count = 0
        for i in range(len(target_weights)):
            if np.allclose(target_weights.iloc[i].values, weights):
                count += 1
        weight_stats[weight_type] = count
    
    print("\\n权重分布统计:")
    for weight_type, count in weight_stats.items():
        percentage = count / len(target_weights) * 100
        print(f"  {weight_type}权重: {count}天 ({percentage:.1f}%)")
    
    return target_weights, signals

# 生成目标权重
target_weights, signals = generate_target_weights(
    hw_signals, prices,
    default_weights=[0.5, 0.5],
    up_weights=[0.2, 0.8],
    down_weights=[0.8, 0.2], 
    threshold=0.6
)

print("\\n目标权重示例:")
print(target_weights.head(10))

In [None]:
target_weights.to_csv("./temp.csv")

## 5. 再平衡时间表创建

In [None]:
def create_rebalance_schedule(prices, rebalance_freq='M'):
    """
    创建再平衡时间表
    
    Args:
        prices: DataFrame, 价格数据
        rebalance_freq: str, 再平衡频率 ('D', 'W', 'M', 'Q')
        
    Returns:
        pd.Series: 再平衡掩码（True表示该日期可以再平衡）
    """
    print(f"创建再平衡时间表，频率: {rebalance_freq}")
    
    if rebalance_freq == 'D':
        # 每日再平衡
        rb_mask = pd.Series(True, index=prices.index)
        desc = "每日"
    elif rebalance_freq == 'W':
        # 每周再平衡（周期开始日）  
        rb_mask = ~prices.index.to_period('W').duplicated()
        desc = "每周"
    elif rebalance_freq == 'M':
        # 每月再平衡（月初）
        rb_mask = ~prices.index.to_period('M').duplicated()
        desc = "每月"
    elif rebalance_freq == 'Q':
        # 每季度再平衡（季度开始）
        rb_mask = ~prices.index.to_period('Q').duplicated()
        desc = "每季度"
    else:
        raise ValueError("rebalance_freq must be one of 'D', 'W', 'M', 'Q'")
    
    rb_count = rb_mask.sum()
    print(f"{desc}再平衡，共 {rb_count} 个再平衡机会")
    
    # 显示前几个再平衡日期
    rb_dates = prices.index[rb_mask]
    print(f"前10个再平衡日期: {rb_dates[:10].date.tolist()}")
    
    return rb_mask

def create_actual_rebalance_mask(target_weights, rebalance_schedule):
    """
    创建实际再平衡掩码（结合权重变化和再平衡时机）
    
    Args:
        target_weights: DataFrame, 目标权重
        rebalance_schedule: pd.Series, 再平衡时间表
        
    Returns:
        pd.Series: 实际再平衡掩码
    """
    print("创建实际再平衡掩码...")
    
    actual_rebalances = pd.Series(False, index=target_weights.index)
    
    # 第一天一定要初始化
    actual_rebalances.iloc[0] = True
    current_target = target_weights.iloc[0].copy()
    
    rebalance_count = 0
    
    for i in range(1, len(target_weights)):
        if rebalance_schedule.iloc[i]:
            # 这是一个再平衡机会日
            new_target = target_weights.iloc[i]
            if not new_target.equals(current_target):
                # 目标权重发生了变化，需要再平衡
                actual_rebalances.iloc[i] = True
                current_target = new_target.copy()
                rebalance_count += 1
    
    print(f"实际再平衡次数: {actual_rebalances.sum()}")
    print(f"权重变化触发的再平衡: {rebalance_count}")
    
    # 显示实际再平衡日期
    actual_rb_dates = target_weights.index[actual_rebalances]
    print(f"前10个实际再平衡日期: {actual_rb_dates[:10].date.tolist()}")
    
    return actual_rebalances

# 测试不同再平衡频率
for freq, desc in [('D', '每日'), ('W', '每周'), ('M', '每月'), ('Q', '每季度')]:
    print(f"\\n=== {desc}再平衡测试 ===")
    rebalance_schedule = create_rebalance_schedule(prices, freq)
    actual_rebalances = create_actual_rebalance_mask(target_weights, rebalance_schedule)

# 使用月度再平衡作为最终选择
print("\\n=== 选择月度再平衡作为最终策略 ===")
final_rebalance_schedule = create_rebalance_schedule(prices, 'M')
final_actual_rebalances = create_actual_rebalance_mask(target_weights, final_rebalance_schedule)

## 6. 回测数据结构准备

In [None]:
def prepare_backtest_data(prices, target_weights, rebalance_mask, adjust_factor=0.2):
    """
    准备vectorbt回测所需的数据结构
    
    Args:
        prices: DataFrame, 价格数据
        target_weights: DataFrame, 目标权重
        rebalance_mask: pd.Series, 再平衡掩码
        adjust_factor: float, 权重调整因子（渐进调整）
        
    Returns:
        tuple: (MultiIndex价格数据, 订单矩阵)
    """
    print("准备回测数据结构...")
    
    # 1. 创建MultiIndex结构（vectorbt要求）
    num_tests = 1
    _prices = prices.vbt.tile(num_tests, keys=pd.Index(np.arange(num_tests), name='symbol_group'))
    print(f"价格数据结构: {_prices.shape}")
    print(f"MultiIndex列: {_prices.columns}")
    
    # 2. 创建订单矩阵
    orders = np.full_like(_prices, np.nan)  # 初始化为NaN（无订单）
    
    # 3. 第一天初始化
    orders[0, :] = target_weights.iloc[0].values
    print(f"初始权重: {target_weights.iloc[0].values}")
    
    # 4. 在再平衡日期设置目标权重（应用调整因子）
    rebalance_count = 0
    
    for i, should_rebalance in enumerate(rebalance_mask):
        if should_rebalance and i > 0:
            current_target = target_weights.iloc[i].values
            prev_target = target_weights.iloc[i-1].values
            
            # 应用调整因子进行渐进调整
            weight_diff = current_target - prev_target
            adjusted_target = prev_target + weight_diff * adjust_factor
            
            orders[i, :] = adjusted_target
            rebalance_count += 1
            
            if rebalance_count <= 5:  # 显示前5次再平衡
                print(f"再平衡 {rebalance_count}: {prices.index[i].date()}")
                print(f"  前权重: {prev_target}")
                print(f"  目标权重: {current_target}")
                print(f"  调整后权重: {adjusted_target}")
    
    print(f"\\n总再平衡次数: {rebalance_count}")
    
    # 5. 统计订单矩阵
    non_nan_orders = ~np.isnan(orders).all(axis=1)
    print(f"有效订单日期数: {non_nan_orders.sum()}")
    
    return _prices, orders

def show_order_analysis(orders, prices, rebalance_mask):
    """
    分析订单矩阵
    
    Args:
        orders: numpy.array, 订单矩阵
        prices: DataFrame, 价格数据
        rebalance_mask: pd.Series, 再平衡掩码
    """
    print("\\n=== 订单分析 ===")
    
    # 找到所有有订单的日期
    has_orders = ~np.isnan(orders).all(axis=1)
    order_dates = prices.index[has_orders]
    
    print(f"总订单天数: {len(order_dates)}")
    print(f"订单占比: {len(order_dates)/len(prices)*100:.2f}%")
    
    # 按月统计订单
    order_months = pd.to_datetime(order_dates).to_period('M').value_counts().sort_index()
    print(f"\\n按月订单分布:")
    for month, count in order_months.head().items():
        print(f"  {month}: {count}个订单")
    
    # 检查订单和再平衡的对应关系
    rebalance_dates = prices.index[rebalance_mask]
    orders_match_rebalance = set(order_dates) == set(rebalance_dates)
    print(f"\\n订单日期与再平衡日期匹配: {orders_match_rebalance}")
    
    return order_dates

# 准备回测数据
_prices, orders = prepare_backtest_data(prices, target_weights, final_actual_rebalances, adjust_factor=0.2)

# 分析订单
order_dates = show_order_analysis(orders, prices, final_actual_rebalances)

## 7. 执行VectorBT组合回测

In [None]:
def run_vectorbt_backtest(_prices, orders, initial_cash=100000, fees=0.001):
    """
    使用VectorBT执行组合回测
    
    Args:
        _prices: DataFrame, MultiIndex价格数据
        orders: numpy.array, 订单矩阵
        initial_cash: float, 初始资金
        fees: float, 交易费率
        
    Returns:
        vbt.Portfolio: 回测结果组合对象
    """
    print("=== 开始VectorBT回测 ===")
    print(f"初始资金: {initial_cash:,}")
    print(f"交易费率: {fees*100:.2f}%")
    print(f"订单类型: TargetPercent (目标百分比)")
    
    # 使用vectorbt执行回测
    portfolio = vbt.Portfolio.from_orders(
        close=_prices,                    # 价格数据
        size=orders,                      # 订单大小（权重）
        size_type='TargetPercent',        # 订单类型：目标百分比
        group_by='symbol_group',          # 按组分组
        cash_sharing=True,                # 组内共享现金
        call_seq='auto',                  # 调用序列：自动（先卖后买）
        fees=fees,                        # 交易费用
        init_cash=initial_cash,           # 初始现金
        freq='1D',                        # 频率：日
        min_size=1,                       # 最小订单大小
        size_granularity=1                # 订单粒度
    )
    
    print("回测完成！")
    print(f"回测期间: {portfolio.wrapper.index[0].date()} 至 {portfolio.wrapper.index[-1].date()}")
    print(f"回测天数: {len(portfolio.wrapper.index)}")
    
    return portfolio

# 执行回测
portfolio = run_vectorbt_backtest(_prices, orders, initial_cash=100000, fees=0.001)

# 快速查看基本信息
print("\\n=== 基本回测信息 ===")
print(f"最终组合价值: {portfolio.value().iloc[-1]:,.2f}")
print(f"总收益: {portfolio.total_return()*100:.2f}%")
print(f"年化收益: {portfolio.annualized_return()*100:.2f}%")
print(f"夏普比率: {portfolio.sharpe_ratio():.2f}")
print(f"最大回撤: {portfolio.max_drawdown()*100:.2f}%")

## 8. 策略表现深度分析

In [None]:
def analyze_strategy_performance(portfolio, prices):
    """
    深度分析策略表现
    
    Args:
        portfolio: vbt.Portfolio, 组合对象
        prices: DataFrame, 原始价格数据
        
    Returns:
        dict: 分析结果字典
    """
    print("=== 策略表现深度分析 ===")
    
    # 1. 整体统计数据
    stats = portfolio.stats()
    print("\\n1. 整体统计:")
    print(stats)
    
    # 2. 风险收益指标
    print("\\n2. 关键风险收益指标:")
    total_return = portfolio.total_return() * 100
    annual_return = portfolio.annualized_return() * 100
    volatility = portfolio.annualized_volatility() * 100
    sharpe = portfolio.sharpe_ratio()
    max_dd = portfolio.max_drawdown() * 100
    calmar = portfolio.calmar_ratio()
    
    metrics = {
        '总收益率': f"{total_return:.2f}%",
        '年化收益率': f"{annual_return:.2f}%", 
        '年化波动率': f"{volatility:.2f}%",
        '夏普比率': f"{sharpe:.2f}",
        '最大回撤': f"{max_dd:.2f}%",
        '卡玛比率': f"{calmar:.2f}"
    }
    
    for metric, value in metrics.items():
        print(f"  {metric}: {value}")
    
    # 3. 个股表现分析
    print("\\n3. 个股表现:")
    try:
        individual_returns = portfolio.total_return(group_by=False) * 100
        for i, (asset, ret) in enumerate(individual_returns.items()):
            print(f"  {prices.columns[i]}: {ret:.2f}%")
    except:
        print("  无法获取个股表现数据")
    
    # 4. 交易分析
    print("\\n4. 交易分析:")
    try:
        orders_count = portfolio.orders.count()
        total_trades = orders_count.sum() if hasattr(orders_count, 'sum') else orders_count
        
        fees_paid = portfolio.orders.fees.sum()
        total_fees = fees_paid.sum() if hasattr(fees_paid, 'sum') else fees_paid
        
        print(f"  总交易次数: {total_trades}")
        print(f"  总交易费用: {total_fees:.2f}")
        print(f"  平均每笔费用: {total_fees/max(1, total_trades):.2f}")
        
        # 费用占收益比例
        portfolio_value = portfolio.value()
        total_profit = portfolio_value.iloc[-1] - portfolio_value.iloc[0]
        fee_ratio = total_fees / total_profit * 100 if total_profit > 0 else 0
        print(f"  费用占利润比例: {fee_ratio:.2f}%")
        
    except Exception as e:
        print(f"  交易分析错误: {e}")
    
    # 5. 时间序列分析
    print("\\n5. 时间序列分析:")
    portfolio_value = portfolio.value()
    returns = portfolio_value.pct_change().dropna()
    
    print(f"  日均收益率: {returns.mean()*100:.4f}%")
    print(f"  收益率标准差: {returns.std()*100:.4f}%") 
    print(f"  正收益天数: {(returns > 0).sum()}")
    print(f"  负收益天数: {(returns < 0).sum()}")
    print(f"  胜率: {(returns > 0).mean()*100:.1f}%")
    
    # 6. 与基准比较
    print("\\n6. 与等权重基准比较:")
    benchmark_returns = prices.pct_change().mean(axis=1).fillna(0)
    benchmark_cumret = (1 + benchmark_returns).cumprod()
    benchmark_total_return = (benchmark_cumret.iloc[-1] - 1) * 100
    
    print(f"  基准总收益: {benchmark_total_return:.2f}%")
    print(f"  超额收益: {total_return - benchmark_total_return:.2f}%")
    print(f"  信息比率: {(annual_return - benchmark_total_return*252/len(benchmark_returns))/volatility:.2f}")
    
    return {
        'total_return': total_return,
        'annual_return': annual_return,
        'volatility': volatility,
        'sharpe_ratio': sharpe,
        'max_drawdown': max_dd,
        'benchmark_return': benchmark_total_return,
        'excess_return': total_return - benchmark_total_return
    }

# 执行深度分析
analysis_results = analyze_strategy_performance(portfolio, prices)

## 9. 可视化分析和图表

In [None]:
def create_comprehensive_plots(prices, hw_signals, target_weights, portfolio, rebalance_mask, signals):
    """
    创建综合可视化图表
    
    Args:
        prices: DataFrame, 价格数据
        hw_signals: DataFrame, HW信号
        target_weights: DataFrame, 目标权重
        portfolio: vbt.Portfolio, 组合对象
        rebalance_mask: pd.Series, 再平衡掩码
        signals: pd.Series, 交易信号
    """
    # 创建子图
    fig, axes = plt.subplots(5, 1, figsize=(16, 20))
    
    # 1. 资产价格走势
    axes[0].plot(prices.index, prices.iloc[:, 0], label=prices.columns[0], linewidth=2, color='blue')
    axes[0].plot(prices.index, prices.iloc[:, 1], label=prices.columns[1], linewidth=2, color='orange')
    axes[0].set_title('资产价格走势', fontsize=14, fontweight='bold')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    axes[0].set_ylabel('累计净值')
    
    # 2. HW信号和阈值
    delta_hdp = hw_signals.iloc[:, 0] - hw_signals.iloc[:, 1]
    axes[1].plot(delta_hdp.index, delta_hdp, label='HDP差值', linewidth=2, color='green')
    axes[1].axhline(y=0.6, color='r', linestyle='--', label='上阈值 (0.6)', alpha=0.8)
    axes[1].axhline(y=-0.6, color='r', linestyle='--', label='下阈值 (-0.6)', alpha=0.8)
    axes[1].axhline(y=0, color='k', linestyle='-', alpha=0.3)
    
    # 标记信号变化点
    signal_changes = signals.diff() != 0
    signal_change_dates = signals.index[signal_changes]
    for date in signal_change_dates:
        axes[1].axvline(x=date, color='purple', linestyle=':', alpha=0.6)
    
    axes[1].set_title('Holt-Winters信号差值与交易信号', fontsize=14, fontweight='bold')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    axes[1].set_ylabel('HDP差值')
    
    # 3. 目标权重变化
    axes[2].plot(target_weights.index, target_weights.iloc[:, 0], 
                 label=f'{prices.columns[0]}权重', linewidth=2, color='blue')
    axes[2].plot(target_weights.index, target_weights.iloc[:, 1], 
                 label=f'{prices.columns[1]}权重', linewidth=2, color='orange')
    
    # 标记再平衡日期
    rb_dates = target_weights.index[rebalance_mask]
    for i, rb_date in enumerate(rb_dates):
        if i < 5:  # 只标记前几个，避免图表过乱
            axes[2].axvline(x=rb_date, color='gray', linestyle=':', alpha=0.7)
    
    axes[2].set_title('目标权重分配变化', fontsize=14, fontweight='bold')
    axes[2].set_ylim(0, 1)
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    axes[2].set_ylabel('权重')
    
    # 4. 实际权重分配（堆叠面积图）
    try:
        asset_values = portfolio.asset_value(group_by=False)
        total_value = portfolio.value()
        actual_weights = asset_values.div(total_value, axis=0)
        
        axes[3].fill_between(actual_weights.index, 0, actual_weights.iloc[:, 0], 
                            label=prices.columns[0], alpha=0.7, color='blue')
        axes[3].fill_between(actual_weights.index, actual_weights.iloc[:, 0], 1,
                            label=prices.columns[1], alpha=0.7, color='orange')
        
        axes[3].set_title('实际权重分配变化', fontsize=14, fontweight='bold')
        axes[3].set_ylim(0, 1)
        axes[3].legend()
        axes[3].grid(True, alpha=0.3)
        axes[3].set_ylabel('实际权重')
    except:
        axes[3].text(0.5, 0.5, '无法显示实际权重数据', ha='center', va='center', transform=axes[3].transAxes)
        axes[3].set_title('实际权重分配变化（数据不可用）', fontsize=14)
    
    # 5. 累计收益对比
    portfolio_value = portfolio.value()
    portfolio_returns = portfolio_value / portfolio_value.iloc[0]
    
    # 计算等权重基准
    benchmark_returns = prices.pct_change().mean(axis=1).fillna(0)
    benchmark_cumret = (1 + benchmark_returns).cumprod()
    
    # 计算个股表现
    asset1_return = prices.iloc[:, 0] / prices.iloc[0, 0]
    asset2_return = prices.iloc[:, 1] / prices.iloc[0, 1]
    
    axes[4].plot(portfolio_returns.index, portfolio_returns, 
                 label='再平衡策略', linewidth=3, color='red')
    axes[4].plot(benchmark_cumret.index, benchmark_cumret, 
                 label='等权重基准', linewidth=2, color='gray')
    axes[4].plot(asset1_return.index, asset1_return, 
                 label=prices.columns[0], linewidth=1, alpha=0.7, color='blue')
    axes[4].plot(asset2_return.index, asset2_return, 
                 label=prices.columns[1], linewidth=1, alpha=0.7, color='orange')
    
    axes[4].set_title('累计收益对比', fontsize=14, fontweight='bold')
    axes[4].legend()
    axes[4].grid(True, alpha=0.3)
    axes[4].set_ylabel('累计收益')
    axes[4].set_xlabel('日期')
    
    plt.tight_layout()
    plt.show()
    
    # 打印最终收益对比
    print("\\n=== 收益对比总结 ===")
    strategy_return = (portfolio_returns.iloc[-1] - 1) * 100
    benchmark_return = (benchmark_cumret.iloc[-1] - 1) * 100
    asset1_ret = (asset1_return.iloc[-1] - 1) * 100
    asset2_ret = (asset2_return.iloc[-1] - 1) * 100
    
    print(f"再平衡策略: {strategy_return:.2f}%")
    print(f"等权重基准: {benchmark_return:.2f}%") 
    print(f"{prices.columns[0]}单独持有: {asset1_ret:.2f}%")
    print(f"{prices.columns[1]}单独持有: {asset2_ret:.2f}%")
    print(f"策略超额收益: {strategy_return - benchmark_return:.2f}%")

# 创建综合可视化
create_comprehensive_plots(prices, hw_signals, target_weights, portfolio, final_actual_rebalances, signals)

## 10. 策略总结和关键洞察

In [None]:
def summarize_strategy_insights(analysis_results):
    """
    总结策略关键洞察
    
    Args:
        analysis_results: dict, 分析结果
    """
    print("=== 🎯 VectorBT双资产再平衡策略总结 ===")
    print()
    
    # 1. 策略核心机制
    print("📋 **策略核心机制:**")
    print("  1️⃣ Holt-Winters模型预测趋势 → 生成HWDP信号")
    print("  2️⃣ 磁滞回线逻辑 → 避免频繁交易噪音") 
    print("  3️⃣ 动态权重分配 → [0.2,0.8] ↔ [0.8,0.2]")
    print("  4️⃣ 智能再平衡 → 仅在权重变化时执行")
    print("  5️⃣ VectorBT框架 → 专业回测和分析")
    print()
    
    # 2. 关键参数设置
    print("⚙️ **关键参数设置:**")
    print("  • 磁滞阈值: ±0.6 (避免频繁切换)")
    print("  • 权重配置: 上升[0.2,0.8] vs 下降[0.8,0.2]")
    print("  • 调整因子: 0.2 (渐进式权重调整)")
    print("  • 再平衡频率: 月度 (平衡成本与效果)")
    print("  • 交易费用: 0.1% (现实交易成本)")
    print()
    
    # 3. 策略表现亮点
    print("🏆 **策略表现亮点:**")
    print(f"  📈 总收益率: {analysis_results['total_return']:.2f}%")
    print(f"  📊 年化收益: {analysis_results['annual_return']:.2f}%")
    print(f"  ⚡ 夏普比率: {analysis_results['sharpe_ratio']:.2f}")
    print(f"  🛡️ 最大回撤: {analysis_results['max_drawdown']:.2f}%")
    print(f"  🎯 超额收益: {analysis_results['excess_return']:.2f}%")
    print()
    
    # 4. 策略优势
    print("✅ **策略优势:**")
    advantages = [
        "自适应权重分配 - 根据市场趋势动态调整",
        "磁滞机制防噪 - 减少虚假信号造成的过度交易", 
        "专业回测框架 - VectorBT确保结果可靠性",
        "成本控制良好 - 月度再平衡平衡收益与费用",
        "风险管理有效 - 分散投资降低单一资产风险"
    ]
    for i, advantage in enumerate(advantages, 1):
        print(f"  {i}. {advantage}")
    print()
    
    # 5. 潜在改进方向
    print("🔧 **潜在改进方向:**")
    improvements = [
        "参数优化 - 使用遗传算法或贝叶斯优化寻找最优参数组合",
        "多资产扩展 - 支持3个以上资产的组合配置",
        "风险预算 - 基于波动率的动态权重分配",
        "市场状态识别 - 结合VIX等指标判断市场环境", 
        "机器学习增强 - 使用深度学习改进信号预测"
    ]
    for i, improvement in enumerate(improvements, 1):
        print(f"  {i}. {improvement}")
    print()
    
    # 6. 使用建议
    print("💡 **使用建议:**")
    suggestions = [
        "适合中长期投资 - 策略基于趋势预测，短期可能有滞后",
        "定期参数校准 - 市场环境变化时重新优化参数",
        "风险监控 - 设置止损机制防范极端市场情况",
        "资金规模考虑 - 大资金需考虑流动性和冲击成本",
        "税务规划 - 合理安排交易时机优化税后收益"
    ]
    for i, suggestion in enumerate(suggestions, 1):
        print(f"  {i}. {suggestion}")
    print()
    
    print("🎉 **恭喜！您已成功掌握VectorBT双资产再平衡策略的核心技术！**")
    print("💻 可以在此基础上继续探索更多量化投资策略。")

# 生成策略总结
summarize_strategy_insights(analysis_results)

## 🔍 探索完成！

您现在已经完全理解了`ImprovedVectorBTStrategy`的内部工作机制：

### 📚 学习收获

1. **模块化设计** - 每个功能被拆分为独立函数，便于理解和修改
2. **VectorBT框架** - 掌握了专业量化回测工具的使用方法  
3. **信号生成** - 理解Holt-Winters模型在量化策略中的应用
4. **风险控制** - 学会使用磁滞回线避免过度交易
5. **性能评估** - 全面的回测分析和可视化技巧

### 🚀 下一步建议

- 尝试修改参数，观察对策略表现的影响
- 扩展到更多资产的组合管理
- 探索其他技术指标与Holt-Winters的结合
- 研究不同市场环境下的策略适应性

### 💻 代码复用

所有函数都可以独立使用，您可以：
- 单独调用数据加载函数处理其他基金
- 修改权重生成逻辑实现不同策略
- 调整再平衡频率适应不同投资风格
- 增强可视化功能添加更多分析维度

**Happy Coding! 🎉**