In [1]:
from vnpy.trader.object import TickData, OrderData, TradeData
from vnpy.trader.constant import Direction, Offset, Exchange
from vnpy_optionmaster.base import OptionData, UnderlyingData, ChainData, PortfolioData
from vnpy_optionmaster.straddle_strategy import StraddleStrategy, StraddleManager

In [3]:
import sys
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import numpy as np
import pandas as pd

# 创建模拟的算法引擎和组合管理器
class MockOptionAlgoEngine:
    """模拟期权算法引擎"""
    def __init__(self):
        self.orders = {}
        self.trades = {}
        self.order_count = 0
        self.trade_count = 0
        
    def send_order(self, strategy, vt_symbol: str, direction: Direction, 
                   offset: Offset, price: float, volume: int) -> str:
        self.order_count += 1
        vt_orderid = f"order_{self.order_count}"
        
        order = OrderData(
            symbol=vt_symbol.split('.')[0],
            exchange=Exchange.CFFEX,
            orderid=str(self.order_count),
            direction=direction,
            offset=offset,
            price=price,
            volume=volume,
            gateway_name="mock"
        )
        order.vt_symbol = vt_symbol
        
        self.orders[vt_orderid] = order
        print(f"📋 委托: {vt_symbol} {direction.value} {offset.value} {volume}@{price:.4f}")
        
        # 模拟成交
        self.simulate_trade(strategy, order, vt_orderid)
        return vt_orderid
    
    def simulate_trade(self, strategy, order: OrderData, vt_orderid: str):
        """模拟成交"""
        self.trade_count += 1
        trade = TradeData(
            symbol=order.symbol,
            exchange=order.exchange,
            orderid=order.orderid,
            tradeid=f"trade_{self.trade_count}",
            direction=order.direction,
            offset=order.offset,
            price=order.price,
            volume=order.volume,
            datetime=datetime.now(),
            gateway_name=order.gateway_name
        )
        trade.vt_symbol = order.vt_symbol
        trade.size = 10000  # 合约乘数
        
        self.trades[vt_orderid] = trade
        strategy.on_trade(trade)
        print(f"✅ 成交: {trade.vt_symbol} {trade.direction.value} {trade.volume}@{trade.price:.4f}")
    
    def write_log(self, msg: str):
        print(f"🔧 Engine: {msg}")

print("✅ 算法引擎类定义完成")

✅ 算法引擎类定义完成


In [4]:
# 创建增强的组合数据管理器
class EnhancedPortfolioData(PortfolioData):
    """增强的组合数据类"""
    
    def __init__(self):
        super().__init__()
        self.straddle_strategies: Dict[str, StraddleStrategy] = {}
        self.total_premium_received = 0.0
        self.total_pnl = 0.0
        self.risk_metrics = {}
        
    def add_straddle_strategy(self, strategy: StraddleStrategy):
        """添加跨式策略到组合"""
        self.straddle_strategies[strategy.strategy_name] = strategy
        print(f"📈 添加跨式策略: {strategy.strategy_name}")
    
    def remove_straddle_strategy(self, strategy_name: str):
        """从组合中移除策略"""
        if strategy_name in self.straddle_strategies:
            del self.straddle_strategies[strategy_name]
            print(f"📉 移除跨式策略: {strategy_name}")
    
    def calculate_portfolio_metrics(self):
        """计算组合风险指标"""
        total_delta = 0.0
        total_gamma = 0.0
        total_theta = 0.0
        total_vega = 0.0
        total_pnl = 0.0
        total_premium = 0.0
        
        for strategy in self.straddle_strategies.values():
            if strategy.call_option and strategy.put_option:
                # 汇总希腊字母
                total_delta += strategy.call_option.pos_delta + strategy.put_option.pos_delta
                total_gamma += strategy.call_option.pos_gamma + strategy.put_option.pos_gamma
                total_vega += strategy.call_option.pos_vega + strategy.put_option.pos_vega
                
                # 汇总盈亏
                total_pnl += strategy.calculate_pnl()
                total_premium += strategy.total_premium
        
        self.risk_metrics = {
            'total_delta': total_delta,
            'total_gamma': total_gamma,
            'total_theta': total_theta,
            'total_vega': total_vega,
            'total_pnl': total_pnl,
            'total_premium': total_premium,
            'portfolio_count': len(self.straddle_strategies)
        }
        
        return self.risk_metrics
    
    def get_portfolio_summary(self):
        """获取组合摘要"""
        metrics = self.calculate_portfolio_metrics()
        
        summary = {
            'strategies_count': len(self.straddle_strategies),
            'active_strategies': sum(1 for s in self.straddle_strategies.values() if s.active),
            'total_premium_received': metrics['total_premium'],
            'total_pnl': metrics['total_pnl'],
            'net_delta': metrics['total_delta'],
            'net_gamma': metrics['total_gamma'],
            'net_vega': metrics['total_vega'],
            'return_rate': (metrics['total_pnl'] / metrics['total_premium'] * 100) if metrics['total_premium'] > 0 else 0
        }
        
        return summary

print("✅ 增强组合数据类定义完成")

✅ 增强组合数据类定义完成


In [5]:
# 创建模拟市场数据生成器
class MarketDataSimulator:
    """市场数据模拟器"""
    
    def __init__(self, underlying_price: float = 3.000):
        self.underlying_price = underlying_price
        self.volatility = 0.25  # 25% 年化波动率
        self.time_step = 0
        
    def create_option_tick(self, symbol: str, strike: float, is_call: bool, 
                          days_to_expiry: int, volatility_shift: float = 0) -> TickData:
        """创建期权行情数据"""
        # 简化的BS定价模型
        S = self.underlying_price
        K = strike
        T = days_to_expiry / 365.0
        r = 0.03  # 无风险利率
        sigma = self.volatility + volatility_shift
        
        # 计算期权理论价格（简化版本）
        if T <= 0:
            if is_call:
                theo_price = max(S - K, 0)
            else:
                theo_price = max(K - S, 0)
        else:
            # 简化的期权定价
            moneyness = S / K
            time_value = np.sqrt(T) * sigma * S * 0.4  # 简化时间价值
            
            if is_call:
                intrinsic = max(S - K, 0)
                theo_price = intrinsic + time_value * (1 if moneyness > 1 else moneyness)
            else:
                intrinsic = max(K - S, 0)
                theo_price = intrinsic + time_value * (1 if moneyness < 1 else 1/moneyness)
        
        # 添加买卖价差
        spread = theo_price * 0.02  # 2% 价差
        bid_price = max(theo_price - spread/2, 0.001)
        ask_price = theo_price + spread/2
        
        tick = TickData(
            symbol=symbol.split('.')[0],
            exchange=Exchange.CFFEX,
            datetime=datetime.now(),
            last_price=theo_price,
            bid_price_1=bid_price,
            ask_price_1=ask_price,
            bid_volume_1=10,
            ask_volume_1=10,
            gateway_name="simulator"
        )
        tick.vt_symbol = symbol
        
        return tick
    
    def update_market(self, price_change_pct: float = 0, vol_change: float = 0):
        """更新市场状态"""
        self.underlying_price *= (1 + price_change_pct)
        self.volatility += vol_change
        self.time_step += 1
        
        print(f"📊 市场更新: 标的价格={self.underlying_price:.4f}, 波动率={self.volatility:.2%}")

market_sim = MarketDataSimulator()
print("✅ 市场数据模拟器创建完成")

✅ 市场数据模拟器创建完成


In [6]:
# 构建完整的跨式期权策略系统
def build_straddle_system():
    """构建跨式期权策略系统"""
    
    print("🚀 开始构建跨式期权策略系统")
    
    # 1. 创建算法引擎
    algo_engine = MockOptionAlgoEngine()
    
    # 2. 创建增强组合管理器
    portfolio = EnhancedPortfolioData()
    
    # 3. 创建策略管理器
    strategy_manager = StraddleManager(algo_engine)
    
    # 4. 创建标的数据
    underlying = UnderlyingData()
    underlying.vt_symbol = "510050.SSE"
    underlying.price = market_sim.underlying_price
    underlying.theo_delta = 1.0
    
    # 5. 定义多个执行价格的跨式策略
    strike_prices = [2.950, 3.000, 3.050]  # ATM, ITM, OTM
    strategies_data = []
    
    for i, strike in enumerate(strike_prices):
        print(f"\n📋 创建执行价格 {strike} 的跨式策略")
        
        # 创建看涨期权
        call_option = OptionData()
        call_option.vt_symbol = f"C{int(strike*1000)}.CFFEX"
        call_option.strike_price = strike
        call_option.days_to_expiry = 30
        call_option.net_pos = 0
        call_option.pos_delta = 0
        call_option.pos_gamma = 0
        call_option.pos_vega = 0
        call_option.size = 10000
        call_option.tick = market_sim.create_option_tick(
            call_option.vt_symbol, strike, True, 30
        )
        
        # 创建看跌期权
        put_option = OptionData()
        put_option.vt_symbol = f"P{int(strike*1000)}.CFFEX"
        put_option.strike_price = strike
        put_option.days_to_expiry = 30
        put_option.net_pos = 0
        put_option.pos_delta = 0
        put_option.pos_gamma = 0
        put_option.pos_vega = 0
        put_option.size = 10000
        put_option.tick = market_sim.create_option_tick(
            put_option.vt_symbol, strike, False, 30
        )
        
        # 创建期权链
        chain = ChainData()
        chain.underlying = underlying
        chain.calls = {str(int(strike*1000)): call_option}
        chain.puts = {str(int(strike*1000)): put_option}
        
        # 创建策略
        strategy_name = f"Straddle_{int(strike*1000)}"
        strategy = strategy_manager.create_strategy(
            strategy_name, chain, str(int(strike*1000))
        )
        
        # 添加到组合
        portfolio.add_straddle_strategy(strategy)
        
        strategies_data.append({
            'strategy': strategy,
            'call': call_option,
            'put': put_option,
            'strike': strike
        })
        
        print(f"  看涨期权: {call_option.vt_symbol}, 价格: {call_option.tick.last_price:.4f}")
        print(f"  看跌期权: {put_option.vt_symbol}, 价格: {put_option.tick.last_price:.4f}")
    
    return {
        'algo_engine': algo_engine,
        'portfolio': portfolio,
        'strategy_manager': strategy_manager,
        'strategies_data': strategies_data,
        'underlying': underlying
    }

# 构建系统
system = build_straddle_system()
print("\n✅ 跨式期权策略系统构建完成")

🚀 开始构建跨式期权策略系统


TypeError: PortfolioData.__init__() missing 2 required positional arguments: 'name' and 'event_engine'

In [None]:
# 启动跨式策略组合
def start_straddle_portfolio(system):
    """启动跨式策略组合"""
    
    print("\n🎯 启动跨式策略组合")
    
    portfolio = system['portfolio']
    
    # 策略参数配置
    base_params = {
        "target_volume": 1,           # 每个策略1手
        "max_loss": 3000.0,          # 单策略最大亏损
        "profit_target": 1000.0,     # 单策略盈利目标
        "days_to_close": 7,          # 距离到期7天平仓
        "delta_limit": 50.0,         # Delta限制
        "gamma_limit": 30.0,         # Gamma限制
        "vega_limit": 80.0,          # Vega限制
        "hedge_active": True,        # 启用对冲
        "hedge_delta_threshold": 30.0 # Delta对冲阈值
    }
    
    # 启动所有策略
    active_count = 0
    for strategy_name, strategy in portfolio.straddle_strategies.items():
        print(f"\n📈 启动策略: {strategy_name}")
        
        # 根据策略类型调整参数
        params = base_params.copy()
        if "3000" in strategy_name:  # ATM策略
            params["target_volume"] = 2  # ATM策略加大仓位
        elif "2950" in strategy_name:  # ITM策略
            params["profit_target"] = 800.0  # 降低盈利目标
        elif "3050" in strategy_name:  # OTM策略
            params["profit_target"] = 1200.0  # 提高盈利目标
        
        success = strategy.start_strategy(params)
        if success:
            active_count += 1
            print(f"  ✅ 策略启动成功")
        else:
            print(f"  ❌ 策略启动失败")
    
    print(f"\n📊 组合状态: {active_count}/{len(portfolio.straddle_strategies)} 个策略已启动")
    
    # 显示组合摘要
    summary = portfolio.get_portfolio_summary()
    print(f"\n📋 组合摘要:")
    for key, value in summary.items():
        if isinstance(value, float):
            print(f"  {key}: {value:.2f}")
        else:
            print(f"  {key}: {value}")
    
    return active_count

# 启动组合
active_strategies = start_straddle_portfolio(system)

In [None]:
# 市场场景模拟和策略表现分析
def simulate_market_scenarios(system):
    """模拟不同市场场景"""
    
    print("\n🎭 市场场景模拟")
    
    portfolio = system['portfolio']
    strategies_data = system['strategies_data']
    
    scenarios = [
        {"name": "平稳市场", "price_change": 0.00, "vol_change": -0.05, "days_passed": 5},
        {"name": "小幅上涨", "price_change": 0.02, "vol_change": 0.00, "days_passed": 10},
        {"name": "大幅下跌", "price_change": -0.08, "vol_change": 0.10, "days_passed": 15},
        {"name": "高波动", "price_change": 0.00, "vol_change": 0.15, "days_passed": 20},
        {"name": "临近到期", "price_change": 0.01, "vol_change": -0.10, "days_passed": 25}
    ]
    
    results = []
    
    for scenario in scenarios:
        print(f"\n📊 场景: {scenario['name']}")
        print(f"   价格变化: {scenario['price_change']:+.1%}")
        print(f"   波动率变化: {scenario['vol_change']:+.1%}")
        print(f"   经过天数: {scenario['days_passed']}")
        
        # 更新市场状态
        market_sim.update_market(
            scenario['price_change'],
            scenario['vol_change']
        )
        
        # 更新所有期权的行情和时间
        for strategy_data in strategies_data:
            strike = strategy_data['strike']
            days_remaining = max(30 - scenario['days_passed'], 1)
            
            # 更新看涨期权
            call = strategy_data['call']
            call.days_to_expiry = days_remaining
            call.tick = market_sim.create_option_tick(
                call.vt_symbol, strike, True, days_remaining
            )
            
            # 更新看跌期权
            put = strategy_data['put']
            put.days_to_expiry = days_remaining
            put.tick = market_sim.create_option_tick(
                put.vt_symbol, strike, False, days_remaining
            )
            
            # 模拟持仓变化
            if call.net_pos == 0:  # 首次设置持仓
                call.net_pos = -strategy_data['strategy'].target_volume
                put.net_pos = -strategy_data['strategy'].target_volume
            
            # 触发策略更新
            strategy_data['strategy'].on_tick(call.tick)
            strategy_data['strategy'].on_tick(put.tick)
        
        # 计算组合表现
        metrics = portfolio.calculate_portfolio_metrics()
        
        scenario_result = {
            'scenario': scenario['name'],
            'underlying_price': market_sim.underlying_price,
            'total_pnl': metrics['total_pnl'],
            'total_premium': metrics['total_premium'],
            'net_delta': metrics['total_delta'],
            'net_gamma': metrics['total_gamma'],
            'net_vega': metrics['total_vega'],
            'return_rate': (metrics['total_pnl'] / metrics['total_premium'] * 100) if metrics['total_premium'] > 0 else 0
        }
        
        results.append(scenario_result)
        
        print(f"   标的价格: {scenario_result['underlying_price']:.4f}")
        print(f"   组合盈亏: {scenario_result['total_pnl']:.2f}")
        print(f"   收益率: {scenario_result['return_rate']:.2f}%")
        print(f"   净Delta: {scenario_result['net_delta']:.2f}")
    
    return results

# 运行场景模拟
scenario_results = simulate_market_scenarios(system)

In [None]:
# 策略表现分析和风险报告
def generate_performance_report(system, scenario_results):
    """生成策略表现报告"""
    
    print("\n📈 策略表现分析报告")
    print("=" * 60)
    
    portfolio = system['portfolio']
    
    # 1. 组合整体表现
    print("\n📊 组合整体表现:")
    final_metrics = portfolio.calculate_portfolio_metrics()
    
    print(f"策略数量: {final_metrics['portfolio_count']}")
    print(f"总权利金收入: {final_metrics['total_premium']:.2f}")
    print(f"当前总盈亏: {final_metrics['total_pnl']:.2f}")
    print(f"整体收益率: {(final_metrics['total_pnl']/final_metrics['total_premium']*100):.2f}%")
    
    # 2. 风险指标
    print(f"\n⚖️ 风险指标:")
    print(f"净Delta敞口: {final_metrics['total_delta']:.2f}")
    print(f"净Gamma敞口: {final_metrics['total_gamma']:.2f}")
    print(f"净Vega敞口: {final_metrics['total_vega']:.2f}")
    
    # 3. 单策略表现
    print(f"\n📋 各策略表现:")
    for strategy_name, strategy in portfolio.straddle_strategies.items():
        status = strategy.get_status()
        print(f"\n策略: {strategy_name}")
        print(f"  执行价格: {status['strike_index']}")
        print(f"  权利金收入: {status['total_premium']:.2f}")
        print(f"  当前盈亏: {status['current_pnl']:.2f}")
        if status['total_premium'] > 0:
            return_rate = (status['current_pnl'] / status['total_premium']) * 100
            print(f"  收益率: {return_rate:.2f}%")
        print(f"  距离到期: {status['days_to_expiry']} 天")
    
    # 4. 场景分析摘要
    print(f"\n🎭 场景分析摘要:")
    best_scenario = max(scenario_results, key=lambda x: x['return_rate'])
    worst_scenario = min(scenario_results, key=lambda x: x['return_rate'])
    
    print(f"最佳场景: {best_scenario['scenario']} (收益率: {best_scenario['return_rate']:.2f}%)")
    print(f"最差场景: {worst_scenario['scenario']} (收益率: {worst_scenario['return_rate']:.2f}%)")
    
    # 5. 风险提示
    print(f"\n⚠️ 风险提示:")
    risk_warnings = []
    
    if abs(final_metrics['total_delta']) > 100:
        risk_warnings.append(f"Delta敞口过大: {final_metrics['total_delta']:.2f}")
    
    if final_metrics['total_pnl'] < -5000:
        risk_warnings.append(f"组合亏损较大: {final_metrics['total_pnl']:.2f}")
    
    min_expiry = min([s.call_option.days_to_expiry for s in portfolio.straddle_strategies.values() 
                     if s.call_option])
    if min_expiry <= 5:
        risk_warnings.append(f"部分期权即将到期: {min_expiry} 天")
    
    if risk_warnings:
        for warning in risk_warnings:
            print(f"  ⚠️  {warning}")
    else:
        print("  ✅ 当前风险水平可控")
    
    # 6. 操作建议
    print(f"\n💡 操作建议:")
    suggestions = []
    
    if final_metrics['total_pnl'] > 2000:
        suggestions.append("考虑部分获利了结")
    
    if abs(final_metrics['total_delta']) > 50:
        suggestions.append("考虑进行Delta对冲")
    
    if final_metrics['total_pnl'] < -3000:
        suggestions.append("考虑止损或调整策略")
    
    if min_expiry <= 7:
        suggestions.append("准备到期平仓操作")
    
    if suggestions:
        for suggestion in suggestions:
            print(f"  💡 {suggestion}")
    else:
        print("  ✅ 继续持有当前策略")

# 生成报告
generate_performance_report(system, scenario_results)

In [None]:
# 创建策略监控和管理界面
def create_strategy_dashboard(system):
    """创建策略监控面板"""
    
    print("\n🖥️ 策略监控面板")
    print("=" * 80)
    
    portfolio = system['portfolio']
    
    # 实时监控函数
    def monitor_strategies():
        """实时监控策略状态"""
        
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n⏰ 监控时间: {current_time}")
        
        # 组合概况
        summary = portfolio.get_portfolio_summary()
        print(f"\n📊 组合概况:")
        print(f"  总策略数: {summary['strategies_count']}")
        print(f"  活跃策略: {summary['active_strategies']}")
        print(f"  总收益率: {summary['return_rate']:.2f}%")
        print(f"  净Delta: {summary['net_delta']:.2f}")
        
        # 策略明细
        print(f"\n📋 策略明细:")
        print("-" * 80)
        print(f"{'策略名称':<15} {'状态':<6} {'盈亏':<10} {'收益率':<8} {'Delta':<8} {'到期':<6}")
        print("-" * 80)
        
        for strategy_name, strategy in portfolio.straddle_strategies.items():
            status = strategy.get_status()
            
            active_status = "✅" if status['active'] else "❌"
            pnl = status['current_pnl']
            
            if status['total_premium'] > 0:
                return_rate = (pnl / status['total_premium']) * 100
            else:
                return_rate = 0.0
            
            total_delta = 0
            if strategy.call_option and strategy.put_option:
                total_delta = strategy.call_option.pos_delta + strategy.put_option.pos_delta
            
            print(f"{strategy_name:<15} {active_status:<6} {pnl:<10.2f} {return_rate:<8.2f}% {total_delta:<8.2f} {status['days_to_expiry']:<6}")
        
        print("-" * 80)
        
        # 风险警报
        alerts = []
        metrics = portfolio.calculate_portfolio_metrics()
        
        if abs(metrics['total_delta']) > 100:
            alerts.append(f"🚨 Delta敞口警告: {metrics['total_delta']:.2f}")
        
        if metrics['total_pnl'] < -5000:
            alerts.append(f"🚨 亏损警告: {metrics['total_pnl']:.2f}")
        
        min_expiry = min([s.get_status()['days_to_expiry'] for s in portfolio.straddle_strategies.values()])
        if min_expiry <= 5:
            alerts.append(f"🚨 到期警告: {min_expiry} 天")
        
        if alerts:
            print(f"\n🚨 风险警报:")
            for alert in alerts:
                print(f"  {alert}")
        else:
            print(f"\n✅ 无风险警报")
    
    # 策略控制函数
    def strategy_controls():
        """策略控制面板"""
        
        print(f"\n🎛️ 策略控制:")
        print(f"  1. 全部停止策略")
        print(f"  2. 全部平仓")
        print(f"  3. Delta对冲")
        print(f"  4. 风险检查")
        
        # 模拟控制操作
        control_actions = {
            "stop_all": "停止所有策略",
            "close_all": "平仓所有策略", 
            "hedge_delta": "执行Delta对冲",
            "risk_check": "执行风险检查"
        }
        
        for action, description in control_actions.items():
            print(f"  📋 可执行操作: {description}")
    
    # 执行监控
    monitor_strategies()
    strategy_controls()
    
    return {
        'monitor': monitor_strategies,
        'controls': strategy_controls
    }

# 创建监控面板
dashboard = create_strategy_dashboard(system)

print(f"\n🎯 跨式期权策略系统完整演示完成!")
print(f"✅ 系统包含:")
print(f"  📈 {len(system['portfolio'].straddle_strategies)} 个跨式策略")
print(f"  🏛️ 1 个增强组合管理器") 
print(f"  🤖 1 个算法交易引擎")
print(f"  📊 完整的风险监控系统")
print(f"  🎛️ 实时监控和控制面板")