# 🤖 外汇强化学习交易系统完整教程

## 📚 课程概述

本教程将带您完成从零到部署的完整流程：

1. 📊 **数据获取** - 从MT5获取历史数据
2. 🏗️ **环境构建** - 创建交易环境
3. 🎓 **模型训练** - 使用PPO算法训练
4. 📈 **结果分析** - 可视化训练效果
5. 📦 **ONNX导出** - 导出为MT5可用格式
6. 🚀 **部署上线** - 部署到MT5平台

---

**作者**: AI Trading System  
**日期**: 2025-10-31  
**版本**: 1.0

## 📦 第一步：环境准备

首先安装必要的依赖包

In [None]:
# 安装必要的包（如果尚未安装）
# !pip install gymnasium stable-baselines3 torch numpy pandas onnx onnxruntime tensorboard matplotlib scikit-learn MetaTrader5 tqdm rich

# 导入所有必要的库
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
import torch
import torch.onnx
import onnx
import onnxruntime as ort
from pathlib import Path
from datetime import datetime
import logging
import json

# Emit INFO-and-above records, each prefixed with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Make matplotlib render Chinese labels: select the SimHei font and disable
# the Unicode minus sign for negative tick labels.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})

# Report the runtime the notebook is executing on.
print("✅ 所有库导入成功！")
print(f"PyTorch版本: {torch.__version__}")
print(f"CUDA可用: {torch.cuda.is_available()}")

## 📊 第二步：数据获取与处理

### 2.1 从MT5获取历史数据

这里我们从MT5获取EURUSD的历史数据，并计算所需的技术指标。

In [None]:
import MetaTrader5 as mt5

def initialize_mt5():
    """Open the connection to the MetaTrader 5 terminal.

    Returns:
        True when ``mt5.initialize()`` succeeds (the terminal version is
        printed), False otherwise.
    """
    connected = mt5.initialize()
    if connected:
        print(f"✅ MT5版本: {mt5.version()}")
        return True

    print("❌ MT5初始化失败")
    return False

def get_historical_data(symbol="EURUSD", timeframe=mt5.TIMEFRAME_H1, bars=50000):
    """Download the most recent bars of a symbol from MetaTrader 5.

    Args:
        symbol: Instrument name passed to ``mt5.copy_rates_from_pos``.
        timeframe: MT5 timeframe constant.
        bars: Number of bars requested, counted back from the current bar.

    Returns:
        A DataFrame built from the returned rates with ``time`` converted from
        epoch seconds to datetimes, or None when the request fails or returns
        no bars.
    """
    print(f"📥 获取 {symbol} {timeframe} 数据, {bars} 根K线...")

    rates = mt5.copy_rates_from_pos(symbol, timeframe, 0, bars)

    # An empty result would produce a frame without a `time` column and crash
    # on the conversion below, so it is reported like a failed request.
    if rates is None or len(rates) == 0:
        print(f"❌ 获取数据失败: {mt5.last_error()}")
        return None

    df = pd.DataFrame(rates)
    df['time'] = pd.to_datetime(df['time'], unit='s')

    print(f"✅ 获取成功: {len(df)} 根K线")
    print(f"📅 时间范围: {df['time'].min()} 到 {df['time'].max()}")

    return df

# Start from a clean slate so a failed download cannot silently reuse frames
# left over from an earlier run of this cell.
df_h1 = df_h4 = df_m15 = None

# Try to connect to MT5 and download the data.
if initialize_mt5():
    try:
        # Fetch the same symbol on three timeframes.
        df_h1 = get_historical_data("EURUSD", mt5.TIMEFRAME_H1, 50000)
        df_h4 = get_historical_data("EURUSD", mt5.TIMEFRAME_H4, 20000)
        df_m15 = get_historical_data("EURUSD", mt5.TIMEFRAME_M15, 50000)
    finally:
        # Release the terminal connection even when a download raises.
        mt5.shutdown()

    print("\n✅ 数据获取完成，MT5已关闭")
else:
    print("\n⚠️ 无法连接MT5，将使用已有数据文件")
    # Without a terminal connection, fall back to the previously saved data.
    if Path('EURUSD_processed.csv').exists():
        df_m15 = pd.read_csv('EURUSD_processed.csv')
        print(f"✅ 从文件加载数据: {len(df_m15)} 行")
    else:
        # Say so explicitly instead of leaving the notebook without any data.
        print("❌ 未找到 EURUSD_processed.csv，没有可用数据")

### 2.2 计算技术指标

计算RSI、ATR、MACD等技术指标用于训练

In [None]:
def calculate_rsi(data, period=14):
    """Compute a simple-moving-average RSI scaled to the range [0, 1].

    Args:
        data: Price series (pandas Series).
        period: Rolling window length used to average gains and losses.

    Returns:
        Series aligned with ``data``. The first ``period - 1`` entries are NaN
        because the window is incomplete. Later entries equal
        ``avg_gain / (avg_gain + avg_loss)``, which is algebraically the
        classic ``100 - 100 / (1 + RS)`` divided by 100. A window without any
        price movement yields the neutral value 0.5 rather than NaN.
    """
    delta = data.diff()
    # The undefined first difference is counted as "no move" in both averages.
    gain = delta.where(delta > 0, 0).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()

    total = gain + loss
    rsi = gain / total
    # A completely flat window gives 0 / 0 = NaN; a later dropna() would then
    # silently delete those bars, so report a neutral reading instead.
    # Warm-up NaNs are untouched because `NaN == 0` is False.
    return rsi.mask(total == 0, 0.5)

def calculate_atr_norm(data, period=14, lookback=100):
    """Return the Average True Range expressed as a rolling z-score.

    Args:
        data: Frame with ``high``, ``low`` and ``close`` columns.
        period: Window of the simple moving average applied to the true range.
        lookback: Window over which the ATR mean and standard deviation are
            taken for the normalisation.

    Returns:
        Series ``(ATR - rolling mean) / (rolling std + 1e-8)``; entries whose
        windows are incomplete are NaN.
    """
    previous_close = data['close'].shift()

    # True range: the largest of the bar's own range and the two distances
    # from the previous close (NaN candidates on the first row are skipped).
    true_range = pd.DataFrame({
        'bar_range': data['high'] - data['low'],
        'high_gap': (data['high'] - previous_close).abs(),
        'low_gap': (data['low'] - previous_close).abs(),
    }).max(axis=1)
    atr = true_range.rolling(window=period).mean()

    baseline = atr.rolling(window=lookback)
    # The small epsilon keeps the division finite when the ATR is constant.
    return (atr - baseline.mean()) / (baseline.std() + 1e-8)

def calculate_macd_hist_norm(data, lookback=100):
    """Return the MACD histogram expressed as a rolling z-score.

    Args:
        data: Frame with a ``close`` column.
        lookback: Window over which the histogram mean and standard deviation
            are taken for the normalisation.

    Returns:
        Series ``(hist - rolling mean) / (rolling std + 1e-8)`` where ``hist``
        is the 12/26 EMA difference minus its 9-period EMA; the first
        ``lookback - 1`` entries are NaN.
    """
    close = data['close']
    fast_ema = close.ewm(span=12, adjust=False).mean()
    slow_ema = close.ewm(span=26, adjust=False).mean()

    macd_line = fast_ema - slow_ema
    histogram = macd_line - macd_line.ewm(span=9, adjust=False).mean()

    window = histogram.rolling(window=lookback)
    # The small epsilon keeps the division finite when the histogram is flat.
    return (histogram - window.mean()) / (window.std() + 1e-8)

def calculate_trend_indicator(data, lookback=50):
    """Return the close's deviation from its simple moving average, in percent.

    Args:
        data: Frame with a ``close`` column.
        lookback: Window of the simple moving average.

    Returns:
        Series ``(close - SMA) / SMA * 100``; the first ``lookback - 1``
        entries are NaN.
    """
    close = data['close']
    moving_average = close.rolling(window=lookback).mean()
    relative_deviation = (close - moving_average) / moving_average
    return relative_deviation * 100

# Compute every indicator the trading environment consumes and attach the
# higher-timeframe (H1/H4) features to the M15 rows.
print("🔧 计算技术指标...")


def _attach_higher_timeframe(base_df, htf_df, prefix, bar_length):
    """Join RSI and trend features of a higher timeframe onto the M15 frame.

    Args:
        base_df: M15 frame with a ``time`` column.
        htf_df: Higher-timeframe frame with ``time`` and ``close`` columns.
        prefix: Column prefix; ``'H1'`` produces ``H1_RSI`` and
            ``H1_Trend_Indicator``.
        bar_length: Duration of one higher-timeframe bar (``pd.Timedelta``).

    Returns:
        A time-sorted copy of ``base_df`` with the two feature columns added.
    """
    feature_cols = [f'{prefix}_RSI', f'{prefix}_Trend_Indicator']
    features = pd.DataFrame({
        # Assumes `time` is the bar's open time (TODO confirm against the MT5
        # documentation). Close-based values only exist once the bar has
        # finished, so they are stamped with the bar's end to avoid leaking
        # future information into earlier M15 rows.
        'time': pd.to_datetime(htf_df['time']) + bar_length,
        feature_cols[0]: calculate_rsi(htf_df['close'], 14),
        feature_cols[1]: calculate_trend_indicator(htf_df, 50),
    }).sort_values('time')

    # Drop stale copies of the feature columns so the merge cannot create
    # suffixed duplicates when this cell is run more than once.
    base = base_df.drop(columns=feature_cols, errors='ignore')
    base['time'] = pd.to_datetime(base['time'])
    base = base.sort_values('time')

    # Every M15 row receives the latest higher-timeframe bar already closed
    # at that row's timestamp.
    return pd.merge_asof(base, features, on='time', direction='backward')


if 'df_m15' in locals() and df_m15 is not None:
    df_m15['M15_RSI'] = calculate_rsi(df_m15['close'], 14)
    df_m15['M15_ATR_norm'] = calculate_atr_norm(df_m15, 14, 100)
    df_m15['M15_MACD_hist_norm'] = calculate_macd_hist_norm(df_m15, 100)

    # The environment requires H1_RSI / H1_Trend_Indicator / H4_RSI /
    # H4_Trend_Indicator inside the M15 frame, so the higher-timeframe
    # indicators are merged in under exactly those names.
    if 'df_h1' in locals() and df_h1 is not None:
        df_m15 = _attach_higher_timeframe(df_m15, df_h1, 'H1', pd.Timedelta(hours=1))

    if 'df_h4' in locals() and df_h4 is not None:
        df_m15 = _attach_higher_timeframe(df_m15, df_h4, 'H4', pd.Timedelta(hours=4))

    # When no higher-timeframe data was downloaded the columns must already
    # be present (e.g. loaded from the saved CSV); warn early if they are not.
    absent_cols = [
        col for col in ('H1_RSI', 'H1_Trend_Indicator', 'H4_RSI', 'H4_Trend_Indicator')
        if col not in df_m15.columns
    ]
    if absent_cols:
        print(f"⚠️ 缺少高周期指标列: {absent_cols}")

    # Remove the indicator warm-up rows that still contain NaN.
    df_clean = df_m15.dropna()

    print(f"✅ 技术指标计算完成")
    print(f"📊 清洗后数据: {len(df_clean)} 行")
    print(f"\n数据预览:")
    print(df_clean[['close', 'M15_RSI', 'M15_ATR_norm', 'M15_MACD_hist_norm']].head())

    # Persist the processed data for the training cells.
    df_clean.to_csv('EURUSD_processed.csv', index=False)
    print("\n💾 数据已保存到 EURUSD_processed.csv")

## 🏗️ 第三步：构建交易环境

创建符合Gymnasium标准的交易环境

In [None]:
class ForexTradingEnv(gym.Env):
    """
    Gymnasium environment simulating single-position forex trading.

    Observation space: 10-dimensional float32 vector (7 market indicators read
    from the data frame + 3 position features).
    Action space: 4 discrete actions (0 Hold, 1 Open Long, 2 Open Short, 3 Close).

    An action that does not apply to the current position (opening while a
    position is open, closing while flat) leaves the position unchanged.
    """

    metadata = {'render_modes': ['human']}

    def __init__(
        self,
        df: pd.DataFrame,
        initial_balance: float = 10000.0,
        leverage: int = 20,
        trade_size_lots: float = 0.1,
        spread_cost_pips: float = 2.0,
        max_position_hold_steps: int = 480,
        max_drawdown_pct: float = 0.50
    ):
        """Store the configuration, validate the data and define the spaces.

        Args:
            df: Market data; must contain the columns listed in
                ``required_columns`` below. A copy is kept.
            initial_balance: Account balance at the start of every episode.
            leverage: Stored on the instance; not used by the simulation.
            trade_size_lots: Position size in lots; one pip is valued at
                ``10 * trade_size_lots``.
            spread_cost_pips: Cost in pips charged when a position is closed.
            max_position_hold_steps: Scale used to normalise the holding-time
                feature of the observation (positions are not force-closed).
            max_drawdown_pct: Fractional drawdown from the equity peak that
                terminates the episode.

        Raises:
            ValueError: If a required column is missing or the data is too
                short to run a single step after the warm-up.
        """
        super().__init__()

        # Account parameters.
        self.initial_balance = initial_balance
        self.leverage = leverage
        self.lot_size_standard = 100000
        self.trade_size_lots = trade_size_lots

        # Trading costs.
        self.spread_cost_pips = spread_cost_pips
        self.pip_value = 10 * trade_size_lots

        # Risk control.
        self.max_position_hold_steps = max_position_hold_steps
        self.max_drawdown_pct = max_drawdown_pct

        # Data handling: episodes start at `warmup_steps` and are truncated
        # once the last row is reached.
        self.df = df.copy()
        self.max_steps = len(df) - 1
        self.warmup_steps = 100

        # Validate the data before anything indexes into it.
        required_columns = [
            'close', 'M15_RSI', 'M15_ATR_norm', 'M15_MACD_hist_norm',
            'H1_RSI', 'H1_Trend_Indicator', 'H4_RSI', 'H4_Trend_Indicator'
        ]
        missing_cols = [col for col in required_columns if col not in df.columns]
        if missing_cols:
            raise ValueError(f"DataFrame缺少必需列: {missing_cols}")

        # reset() reads row `warmup_steps` and the first step() reads the row
        # after it; fail here with a clear message instead of an IndexError.
        min_rows = self.warmup_steps + 2
        if len(df) < min_rows:
            raise ValueError(
                f"数据长度不足: 至少需要 {min_rows} 行, 实际只有 {len(df)} 行"
            )

        # Observation space (10 dimensions); observations are clipped to it.
        self.observation_space = spaces.Box(
            low=np.array([0, -10, -10, 0, -10, 0, -10, 0, -20, 0], dtype=np.float32),
            high=np.array([1, 10, 10, 1, 10, 1, 10, 2, 20, 1], dtype=np.float32),
            shape=(10,),
            dtype=np.float32
        )

        # Action space: 0 Hold, 1 Open Long, 2 Open Short, 3 Close.
        self.action_space = spaces.Discrete(4)

        # Initialise the per-episode state.
        self._reset_state()

        print(f"✅ ForexTradingEnv初始化成功")
        print(f"   数据长度: {len(df)}")
        print(f"   初始余额: ${initial_balance}")
        print(f"   状态空间: {self.observation_space.shape}")
        print(f"   动作空间: {self.action_space.n}")

    def _reset_state(self):
        """Reset account, position and step counters to their initial values."""
        self.balance = self.initial_balance
        self.equity = self.initial_balance
        self.peak_balance = self.initial_balance

        # position_state: 0 flat, 1 long, 2 short.
        self.position_state = 0
        self.entry_price = 0.0
        self.position_pnl = 0.0
        self.steps_since_trade = 0

        self.current_step = self.warmup_steps
        self.total_trades = 0
        self.winning_trades = 0

    def _position_pips(self, price):
        """Return the profit in pips of the open position at ``price``.

        State 1 (long) gains when the price rises; any other state is treated
        as short. One pip is 1/10000 of the quote.
        """
        if self.position_state == 1:
            return (price - self.entry_price) * 10000
        return (self.entry_price - price) * 10000

    def _open_position(self, state, price):
        """Open a position in ``state`` at ``price`` and return the entry reward."""
        self.position_state = state
        self.entry_price = price
        self.steps_since_trade = 0
        self.total_trades += 1
        return -0.1  # fixed penalty for opening a trade

    def _get_observation(self):
        """Build the clipped float32 observation for the current row."""
        row = self.df.iloc[self.current_step]

        obs = np.array([
            row['M15_RSI'],
            row['M15_ATR_norm'],
            row['M15_MACD_hist_norm'],
            row['H1_RSI'],
            row['H1_Trend_Indicator'],
            row['H4_RSI'],
            row['H4_Trend_Indicator'],
            float(self.position_state),
            self.position_pnl / 100.0,
            min(self.steps_since_trade / self.max_position_hold_steps, 1.0)
        ], dtype=np.float32)

        return np.clip(obs, self.observation_space.low, self.observation_space.high)

    def _get_info(self):
        """Return the diagnostic info dict reported with every observation."""
        return {
            'balance': self.balance,
            'equity': self.equity,
            'position': self.position_state,
            'total_trades': self.total_trades,
            'winning_trades': self.winning_trades,
            'profit_pct': (self.balance - self.initial_balance) / self.initial_balance * 100
        }

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``."""
        super().reset(seed=seed)
        self._reset_state()
        return self._get_observation(), self._get_info()

    def step(self, action):
        """Apply one trading action at the current row and advance one row.

        Args:
            action: 0 Hold, 1 Open Long, 2 Open Short, 3 Close.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` is set when the drawdown from the equity peak
            exceeds ``max_drawdown_pct`` (the reward is then -100);
            ``truncated`` is set when the last data row has been reached.
        """
        current_price = self.df.iloc[self.current_step]['close']
        reward = 0.0
        is_flat = self.position_state == 0

        if action == 1 and is_flat:  # open long
            reward = self._open_position(1, current_price)

        elif action == 2 and is_flat:  # open short
            reward = self._open_position(2, current_price)

        elif action == 3 and not is_flat:  # close the open position
            pips = self._position_pips(current_price)

            # The spread is charged once, when the trade is closed.
            realized_pnl = pips * self.pip_value - self.spread_cost_pips * self.pip_value
            self.balance += realized_pnl
            reward = realized_pnl / 10.0  # scaled reward

            if realized_pnl > 0:
                self.winning_trades += 1

            self.position_state = 0
            self.entry_price = 0.0
            self.position_pnl = 0.0

        # Mark the open position to market.
        if self.position_state != 0:
            self.position_pnl = self._position_pips(current_price) * self.pip_value
            self.steps_since_trade += 1

        # Update equity and its running peak.
        self.equity = self.balance + self.position_pnl
        self.peak_balance = max(self.peak_balance, self.equity)

        # Advance and evaluate the end-of-episode conditions.
        self.current_step += 1
        terminated = False
        truncated = self.current_step >= self.max_steps

        # Maximum drawdown check; overrides whatever reward was earned above.
        drawdown = (self.peak_balance - self.equity) / self.peak_balance
        if drawdown > self.max_drawdown_pct:
            terminated = True
            reward = -100.0

        return self._get_observation(), reward, terminated, truncated, self._get_info()

print("✅ ForexTradingEnv 类定义完成")

## 🎓 第四步：模型训练

使用PPO算法训练交易策略

In [None]:
# --- Training schedule ---
TOTAL_TIMESTEPS = 500000  # total number of training timesteps
EVAL_FREQ = 10000         # used as EvalCallback's eval_freq
SAVE_FREQ = 50000         # used as CheckpointCallback's save_freq
N_EVAL_EPISODES = 5       # episodes per evaluation

# --- Environment parameters ---
INITIAL_BALANCE = 10000.0
TRADE_SIZE_LOTS = 0.1

# --- PPO hyper-parameters ---
LEARNING_RATE = 3e-4
N_STEPS = 2048
BATCH_SIZE = 64
N_EPOCHS = 10

# Echo the key settings so every run documents its own configuration.
print(
    "🎯 训练配置:",
    f"   总训练步数: {TOTAL_TIMESTEPS:,}",
    f"   学习率: {LEARNING_RATE}",
    f"   批大小: {BATCH_SIZE}",
    f"   初始余额: ${INITIAL_BALANCE}",
    sep="\n",
)

In [None]:
# Load the processed data written by the indicator cell.
df = pd.read_csv('EURUSD_processed.csv')
print(f"📊 加载数据: {len(df)} 行")

# Split in row order: the first 80% of the rows train the model and the
# remaining 20% are held out for evaluation.
split_idx = int(len(df) * 0.8)
train_df, test_df = (
    part.reset_index(drop=True)
    for part in (df.iloc[:split_idx], df.iloc[split_idx:])
)

print(f"训练集: {len(train_df)} 行")
print(f"测试集: {len(test_df)} 行")

# Create the output directories; parents are listed before their children.
for output_dir in ('./logs', './models', './models/checkpoints', './models/best_model'):
    Path(output_dir).mkdir(exist_ok=True)

In [None]:
# Create the training and evaluation environments.
print("\n🏗️ 创建环境...")


def _make_vec_env(data, monitor_path):
    """Build a one-environment DummyVecEnv whose env is wrapped in a Monitor.

    The factory creates the environment itself rather than closing over a
    global name that is rebound afterwards, so it does not depend on
    DummyVecEnv calling it before that rebinding happens.
    """
    def _factory():
        env = ForexTradingEnv(
            data,
            initial_balance=INITIAL_BALANCE,
            trade_size_lots=TRADE_SIZE_LOTS,
        )
        return Monitor(env, monitor_path)

    return DummyVecEnv([_factory])


train_env = _make_vec_env(train_df, './logs/train_monitor.csv')
eval_env = _make_vec_env(test_df, './logs/eval_monitor.csv')

print("✅ 环境创建完成")

In [None]:
# Build the PPO agent.
print("\n🤖 创建PPO模型...")

# Use the GPU when PyTorch reports one, otherwise the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"使用设备: {device}")

# Separate two-layer, 256-unit networks for the policy (pi) and value (vf) heads.
network_layout = {'pi': [256, 256], 'vf': [256, 256]}

model = PPO(
    "MlpPolicy",
    train_env,
    learning_rate=LEARNING_RATE,
    n_steps=N_STEPS,
    batch_size=BATCH_SIZE,
    n_epochs=N_EPOCHS,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2,
    ent_coef=0.01,
    vf_coef=0.5,
    max_grad_norm=0.5,
    policy_kwargs={'net_arch': network_layout},
    tensorboard_log='./logs',
    device=device,
    verbose=1,
)

print("✅ 模型创建完成")

In [None]:
# Evaluation callback: runs N_EVAL_EPISODES deterministic episodes on the
# evaluation environment every EVAL_FREQ calls, with './models/best_model'
# as the best-model save path and './logs' as the log path.
eval_settings = {
    'best_model_save_path': './models/best_model',
    'log_path': './logs',
    'eval_freq': EVAL_FREQ,
    'n_eval_episodes': N_EVAL_EPISODES,
    'deterministic': True,
    'render': False,
    'verbose': 1,
}
eval_callback = EvalCallback(eval_env, **eval_settings)

# Checkpoint callback: saves the model every SAVE_FREQ calls under
# './models/checkpoints' with the 'ppo_forex' file-name prefix.
checkpoint_callback = CheckpointCallback(
    save_freq=SAVE_FREQ,
    save_path='./models/checkpoints',
    name_prefix='ppo_forex',
    verbose=1,
)

print("✅ 回调配置完成")

In [None]:
# Run the training loop.
print("\n" + "="*60)
print("🚀 开始训练...")
print("="*60)

start_time = datetime.now()

try:
    model.learn(
        total_timesteps=TOTAL_TIMESTEPS,
        callback=[eval_callback, checkpoint_callback],
        progress_bar=True
    )

    # Save the final model.
    model.save('./models/ppo_forex_final')
    print("\n✅ 训练完成！模型已保存")

except KeyboardInterrupt:
    # A manual interrupt still keeps what has been learned so far.
    print("\n⚠️ 训练被中断")
    model.save('./models/ppo_forex_interrupted')
    print("模型已保存")

finally:
    # Report the elapsed time and release the environments on every exit
    # path, including errors other than a manual interrupt.
    end_time = datetime.now()
    duration = end_time - start_time

    print(f"\n⏱️ 训练时长: {duration}")

    train_env.close()
    eval_env.close()

## 📈 第五步：训练结果分析

可视化训练过程和效果

In [None]:
# Load the Monitor logs; the first line of each file is skipped so the next
# line is used as the CSV header.
train_log = pd.read_csv('./logs/train_monitor.csv', skiprows=1)
eval_log = pd.read_csv('./logs/eval_monitor.csv', skiprows=1)

# Column 'r' holds the per-episode values summarised below.
train_rewards = train_log['r']
eval_rewards = eval_log['r']

print("📊 训练统计:")
print(f"   训练回合数: {len(train_log)}")
print(f"   平均奖励: {train_rewards.mean():.2f}")
print(f"   最佳奖励: {train_rewards.max():.2f}")

print(f"\n📊 测试统计:")
print(f"   测试回合数: {len(eval_log)}")
print(f"   平均奖励: {eval_rewards.mean():.2f}")
print(f"   最佳奖励: {eval_rewards.max():.2f}")

In [None]:
# Build a 2x2 figure: training rewards, evaluation rewards, reward
# distributions and the evaluation progress over training steps.
def _draw_reward_curve(ax, rewards, title):
    """Plot raw rewards, their 5-episode moving average and a zero line."""
    ax.plot(rewards, alpha=0.3, label='原始')
    ax.plot(rewards.rolling(5).mean(), linewidth=2, label='移动平均')
    ax.axhline(y=0, color='r', linestyle='--', alpha=0.5)
    ax.set_xlabel('回合')
    ax.set_ylabel('奖励')
    ax.set_title(title)
    ax.legend()
    ax.grid(True, alpha=0.3)


fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(ax1, ax2), (ax3, ax4) = axes

# Top row: reward curves for the training and evaluation monitors.
_draw_reward_curve(ax1, train_log['r'], '训练集奖励曲线')
_draw_reward_curve(ax2, eval_log['r'], '测试集奖励曲线')

# Bottom left: overlaid, density-normalised reward histograms.
for rewards, label in ((train_log['r'], '训练集'), (eval_log['r'], '测试集')):
    ax3.hist(rewards, bins=30, alpha=0.5, label=label, density=True)
ax3.axvline(x=0, color='r', linestyle='--', alpha=0.5)
ax3.set_xlabel('奖励')
ax3.set_ylabel('频率')
ax3.set_title('奖励分布对比')
ax3.legend()
ax3.grid(True, alpha=0.3)

# Bottom right: mean and spread of each evaluation run, if the evaluation
# results file exists; otherwise the panel stays empty.
evaluations_file = Path('./logs/evaluations.npz')
if evaluations_file.exists():
    evals = np.load('./logs/evaluations.npz')
    if 'results' in evals:
        results = evals['results']
        timesteps = evals['timesteps']
        means = [np.mean(r) for r in results]
        stds = [np.std(r) for r in results]

        ax4.errorbar(timesteps, means, yerr=stds, marker='o', capsize=5)
        ax4.axhline(y=0, color='r', linestyle='--', alpha=0.5)
        ax4.set_xlabel('训练步数')
        ax4.set_ylabel('平均奖励')
        ax4.set_title('评估性能进展')
        ax4.grid(True, alpha=0.3)

        # Report the evaluation with the highest mean reward.
        best_idx = np.argmax(means)
        print(f"\n🏆 最佳模型: 第 {timesteps[best_idx]} 步, 平均奖励: {means[best_idx]:.2f}")

plt.tight_layout()
plt.savefig('./logs/training_analysis.png', dpi=150, bbox_inches='tight')
plt.show()

print("\n✅ 分析图表已保存: ./logs/training_analysis.png")

## 📦 第六步：导出ONNX模型

将训练好的模型导出为MT5可用的ONNX格式

In [None]:
class PolicyWrapper(torch.nn.Module):
    """Wrap a policy so that a forward pass returns action probabilities.

    Only the actor path of the wrapped policy is used: feature extraction,
    the actor branch of the MLP extractor and the action head.
    """

    def __init__(self, policy_net):
        super().__init__()
        self.policy_net = policy_net

    def forward(self, obs):
        """Map a batch of observations to softmax-normalised action probabilities."""
        net = self.policy_net
        actor_latent = net.mlp_extractor.forward_actor(net.extract_features(obs))
        logits = net.action_net(actor_latent)
        # Normalise over the last (action) dimension.
        return torch.nn.functional.softmax(logits, dim=-1)

print("✅ PolicyWrapper 定义完成")

In [None]:
# Export the trained policy to ONNX.
print("\n📦 导出ONNX模型...")

model_path = './models/best_model/best_model.zip'
onnx_path = './models/forex_policy.onnx'

# Load the best saved model on the CPU and put its policy in eval mode.
model = PPO.load(model_path, device='cpu')
policy = model.policy
policy.eval()
policy.to('cpu')

# Wrap the policy so the exported graph outputs action probabilities.
wrapped_policy = PolicyWrapper(policy)
wrapped_policy.eval()

# A dummy observation (batch of 1, 10 features) is passed to the exporter.
dummy_input = torch.randn(1, 10, dtype=torch.float32)

print(f"输入形状: {dummy_input.shape}")

# No dynamic axes are declared for the exported graph.
export_options = {
    'input_names': ['observation'],
    'output_names': ['action_probs'],
    'dynamic_axes': {},
    'opset_version': 11,
    'do_constant_folding': True,
    'export_params': True,
}
torch.onnx.export(wrapped_policy, dummy_input, onnx_path, **export_options)

print(f"✅ ONNX模型已导出: {onnx_path}")

In [None]:
# Validate the exported ONNX file.
print("\n🔍 验证ONNX模型...")

# Structural check of the serialized graph.
onnx_model = onnx.load(onnx_path)
onnx.checker.check_model(onnx_model)
print("✅ 模型结构有效")

# Smoke-test inference with ONNX Runtime on one random observation.
ort_session = ort.InferenceSession(onnx_path)
test_input = np.random.randn(1, 10).astype(np.float32)
input_name = ort_session.get_inputs()[0].name
ort_inputs = {input_name: test_input}
ort_outputs = ort_session.run(None, ort_inputs)

action_probs = ort_outputs[0]
print(f"✅ 测试推理成功")
print(f"   输入形状: {test_input.shape}")
print(f"   输出形状: {action_probs.shape}")
print(f"   输出示例: {action_probs[0]}")
# The wrapper ends in a softmax, so the printed sum is expected to be close to 1.
print(f"   概率和: {action_probs[0].sum():.4f}")

In [None]:
# Describe the exported model's interface in a JSON file saved next to it.
action_names = {
    '0': 'Hold',
    '1': 'Open Long',
    '2': 'Open Short',
    '3': 'Close',
}

spec = {
    'model_file': 'forex_policy.onnx',
    'input_name': 'observation',
    'input_shape': [1, 10],
    'input_type': 'float32',
    'output_name': 'action_probs',
    'output_shape': [1, 4],
    'output_type': 'float32',
    'actions': action_names,
    'opset_version': 11,
    'created_at': datetime.now().isoformat(),
}

# ensure_ascii=False writes non-ASCII characters as-is instead of \u escapes.
spec_path = './models/forex_policy_spec.json'
with open(spec_path, 'w', encoding='utf-8') as f:
    json.dump(spec, f, indent=2, ensure_ascii=False)

print("\n✅ 模型规格已保存: ./models/forex_policy_spec.json")

In [None]:
# Record reference input/output pairs computed with the ONNX Runtime session.
print("\n🧪 生成测试用例...")


def _build_test_case(case_id):
    """Run one random observation through the ONNX session and record the result."""
    sample = np.random.randn(1, 10).astype(np.float32)
    outputs = ort_session.run(None, {ort_session.get_inputs()[0].name: sample})
    probabilities = outputs[0]
    return {
        'case_id': case_id,
        'input': sample.tolist(),
        'expected_output': probabilities.tolist(),
        'expected_action': int(np.argmax(probabilities)),
    }


# Case ids are 1-based.
test_cases = [_build_test_case(index + 1) for index in range(10)]

with open('./models/forex_policy_test_cases.json', 'w', encoding='utf-8') as f:
    json.dump(test_cases, f, indent=2, ensure_ascii=False)

print(f"✅ 已生成 {len(test_cases)} 个测试用例")
print("   保存位置: ./models/forex_policy_test_cases.json")

## 🚀 第七步：部署到MT5

将ONNX模型部署到MetaTrader 5

In [None]:
import shutil
import os

# Copy the exported model into an MT5 terminal's MQL5/Files directory.
# APPDATA is read with .get() so a machine without that variable reaches the
# manual-copy hint below instead of failing with a KeyError.
terminal_root = os.path.join(
    os.environ.get('APPDATA', ''),
    'MetaQuotes',
    'Terminal'
)
mt5_files_path = os.path.join(
    terminal_root,
    'BB16F565FAAA6B23A20C26C49416FF05',
    'MQL5',
    'Files'
)

# The terminal folder name above is specific to one installation
# (NOTE(review): presumably it differs per machine — confirm). When it is
# absent, fall back to the first other terminal folder that has MQL5/Files.
if not os.path.exists(mt5_files_path) and os.path.isdir(terminal_root):
    for terminal_id in sorted(os.listdir(terminal_root)):
        candidate = os.path.join(terminal_root, terminal_id, 'MQL5', 'Files')
        if os.path.isdir(candidate):
            mt5_files_path = candidate
            break

if os.path.exists(mt5_files_path):
    # Copy the ONNX model.
    shutil.copy2(onnx_path, mt5_files_path)
    print(f"✅ 模型已复制到: {mt5_files_path}")
    print(f"   文件: forex_policy.onnx")

    # Confirm the file arrived and report its size in KB.
    deployed_file = os.path.join(mt5_files_path, 'forex_policy.onnx')
    if os.path.exists(deployed_file):
        size = os.path.getsize(deployed_file) / 1024
        print(f"   大小: {size:.2f} KB")
else:
    print(f"⚠️ MT5目录不存在: {mt5_files_path}")
    print("   请手动复制 forex_policy.onnx 到 MT5/Files/ 目录")

## 📋 部署总结

### ✅ 完成的工作

1. **数据准备** ✅
   - 从MT5获取历史数据
   - 计算技术指标
   - 数据清洗和保存

2. **环境构建** ✅
   - 创建Gymnasium交易环境
   - 定义状态空间和动作空间
   - 实现奖励函数

3. **模型训练** ✅
   - 使用PPO算法训练
   - 保存最佳模型
   - 训练过程监控

4. **结果分析** ✅
   - 可视化训练曲线
   - 分析模型性能
   - 保存分析图表

5. **ONNX导出** ✅
   - 导出ONNX模型
   - 验证模型有效性
   - 生成测试用例

6. **模型部署** ✅
   - 复制到MT5目录
   - 生成部署文档

### 📁 生成的文件

- `EURUSD_processed.csv` - 处理后的数据
- `models/best_model/best_model.zip` - 最佳模型
- `models/forex_policy.onnx` - ONNX模型
- `models/forex_policy_spec.json` - 模型规格
- `models/forex_policy_test_cases.json` - 测试用例
- `logs/training_analysis.png` - 训练分析图

### 🎯 下一步操作

1. 打开 MetaTrader 5
2. 将 `ForexRLTrader` EA 应用到 EURUSD M15 图表
3. 先在策略测试器中回测
4. 然后在模拟账户测试 1-2 周
5. 充分测试后再考虑实盘

### ⚠️ 重要提醒

- ✅ 永远先用模拟账户测试
- ✅ 历史表现不代表未来结果
- ✅ 仅使用您能承受损失的资金
- ✅ 密切监控EA运行状态

---

**教程完成！** 🎉