In [1]:
# 1. 环境设置
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader

# Seed every RNG the notebook relies on (NumPy drives exploration and replay
# sampling; torch drives weight init and dropout) so Restart & Run All
# reproduces the same run.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


In [None]:
pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple

In [2]:
# 2. Data preprocessing (adapted to this data file's layout)
# Load the raw 1-minute bars from the Excel workbook.
raw_data = pd.read_excel(io="data/002230.SZ_1min_data.xlsx")

# 数据清洗
def clean_data(df):
    """Normalise raw bars into a time-indexed, time-sorted frame.

    Steps: parse ``trade_time`` to datetime, drop duplicate timestamps
    (keeping the first occurrence), keep only rows with ``close > 0`` and
    ``vol > 0``, index and sort by ``trade_time``, and rename ``vol`` to
    ``volume``.

    Args:
        df: raw frame with ``trade_time``, ``close`` and ``vol`` columns.
            It is NOT modified (the original version overwrote its
            ``trade_time`` column in place).

    Returns:
        A new DataFrame indexed by ``trade_time``.
    """
    # assign() returns a new frame, so the caller's raw data stays untouched.
    parsed = df.assign(trade_time=pd.to_datetime(df['trade_time']))
    deduped = parsed.drop_duplicates(subset=['trade_time'])
    valid = deduped[(deduped['close'] > 0) & (deduped['vol'] > 0)]
    return (
        valid
        .set_index('trade_time')
        .sort_index()
        .rename(columns={'vol': 'volume'})
    )

# Parse timestamps, de-duplicate, drop invalid bars and index by time.
cleaned_data = clean_data(df=raw_data)

# 技术指标计算（增强版）
def add_technical_indicators(df):
    """Return a copy of ``df`` with technical-indicator columns added.

    Added columns:
      MA5 / MA10 / MA20  simple moving averages of ``close``
      Volatility         20-bar rolling std of 1-bar returns, times sqrt(240)
      RSI                14-bar RSI from simple rolling means of gains/losses
      MACD / Signal      EMA(12) - EMA(26) of ``close`` and its 9-span EMA
      Volume_Change      1-bar percent change of ``volume``
      Momentum           4-bar return of ``close``

    Rows that still contain NaN (the rolling-window warm-up at the start)
    are dropped.

    Args:
        df: time-ordered frame with at least ``close`` and ``volume``
            columns. It is not modified.

    Returns:
        A new DataFrame.
    """
    out = df.copy()  # never add columns to the caller's frame
    close = out['close']

    # Moving averages
    out['MA5'] = close.rolling(window=5).mean()
    out['MA10'] = close.rolling(window=10).mean()
    out['MA20'] = close.rolling(window=20).mean()

    # Volatility of 1-bar returns; sqrt(240) presumably scales per-minute
    # volatility to a 240-minute trading day (TODO confirm intent).
    out['Volatility'] = close.pct_change().rolling(20).std() * np.sqrt(240)

    # RSI
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    # A completely flat window gives 0/0 = NaN (plausible on 1-minute bars).
    # dropna() below would then delete those rows mid-series and splice
    # non-adjacent bars together, so score such windows as neutral (50).
    # Warm-up rows (gain/loss still NaN) are unaffected and still dropped.
    out['RSI'] = rsi.mask((gain == 0) & (loss == 0), 50.0)

    # MACD
    exp12 = close.ewm(span=12, adjust=False).mean()
    exp26 = close.ewm(span=26, adjust=False).mean()
    out['MACD'] = exp12 - exp26
    out['Signal'] = out['MACD'].ewm(span=9, adjust=False).mean()

    # Volume rate of change
    out['Volume_Change'] = out['volume'].pct_change()

    # Price momentum over 4 bars
    out['Momentum'] = close / close.shift(4) - 1

    return out.dropna()

feature_data = add_technical_indicators(cleaned_data)

# Model input columns, in order. The trading environment reads its price from
# column index 3 of the scaled matrix, so 'close' must stay in that slot.
selected_features = [
    'open', 'high', 'low', 'close', 'volume',
    'MA5', 'MA10', 'MA20', 'Volatility',
    'RSI', 'MACD', 'Signal', 'Volume_Change', 'Momentum',
]

# Scale every feature into [-1, 1].
# NOTE(review): the scaler is fit on the whole history, so statistics of the
# rows later used for evaluation leak into training; consider fitting on a
# training split only. The environment also trades on these scaled values
# (which can be <= 0) as if they were prices.
scaler = MinMaxScaler(feature_range=(-1, 1))
feature_matrix = feature_data[selected_features]
scaled_data = scaler.fit_transform(feature_matrix)

# float32 matrix fed to the environment / networks.
processed_data = np.asarray(scaled_data, dtype=np.float32)

In [5]:
# 3. 修正强化学习环境类（替换之前的定义）
class EnhancedStockTradingEnv:
    """Single-asset, all-in / all-out trading environment over a feature matrix.

    Actions: 0 = hold, 1 = buy with all available cash, 2 = sell the whole
    position. Each ``step`` executes at the price of the current bar, marks
    the position to market at that price and advances one bar.

    Observation = current row of ``data`` + [shares_held / 1e4,
    balance / initial_balance, cumulative return] + [index / len(data)],
    so ``state_size == data.shape[1] + 4``.

    Args:
        data: 2-D array, one row of model features per bar.
        initial_balance: starting cash.
        transaction_cost: proportional cost charged on every buy and sell.
        prices: optional 1-D sequence of execution prices aligned with
            ``data``. Defaults to ``data[:, price_col]``. NOTE: if ``data`` is
            min-max scaled, that column is not a real price and may be zero
            or negative; pass raw prices here in that case.
        price_col: column of ``data`` used as the price when ``prices`` is None.

    Raises:
        ValueError: if ``prices`` does not have one entry per row of ``data``.
    """

    def __init__(self, data, initial_balance=100000, transaction_cost=0.0005,
                 prices=None, price_col=3):
        self.data = data
        self.index = 0
        self.initial_balance = initial_balance
        self.transaction_cost = transaction_cost

        if prices is None:
            prices = np.asarray(data)[:, price_col]
        else:
            prices = np.asarray(prices)
        if len(prices) != len(data):
            raise ValueError("prices must have one entry per row of data")
        self.prices = prices

        # Account state (re-initialised by reset()).
        self.balance = initial_balance
        self.shares_held = 0
        self.position_value = 0
        self.total_value = [initial_balance]  # equity after each step
        self.trades = []  # (side, bar index, price, shares) per filled order

        # market features + 3 position features + 1 time feature
        self.state_size = data.shape[1] + 4

    def reset(self):
        """Restore the initial account and return the first observation."""
        self.index = 0
        self.balance = self.initial_balance
        self.shares_held = 0
        self.position_value = 0
        self.total_value = [self.initial_balance]
        self.trades = []
        return self._next_state()

    def _next_state(self):
        """Build the observation for the bar at ``self.index``.

        After the final step ``index`` equals ``len(data)``; the last row is
        reused so the terminal observation can still be built.
        """
        row = min(self.index, len(self.data) - 1)
        market_state = self.data[row]

        position_state = [
            self.shares_held / 1e4,  # roughly normalised position size
            self.balance / self.initial_balance,
            (self.total_value[-1] / self.initial_balance) - 1,  # cumulative return
        ]

        # Progress through the data, in [0, 1].
        time_state = [self.index / len(self.data)]

        return np.concatenate([market_state, position_state, time_state])

    def step(self, action):
        """Execute ``action`` at the current bar's price and advance one bar.

        Returns:
            (next_state, reward, done, info) where ``info['current_price']``
            is the execution price. ``reward`` is the portfolio return over
            the step minus ``0.5 * price_change**2``, with ``price_change``
            the move from the previous bar to this one. If equity drops below
            80% of the initial balance the episode ends and 10 is subtracted.
        """
        current_price = self.prices[self.index]
        done = self.index == len(self.data) - 1

        # Price move from the previous bar into this one; computed before the
        # index advances (0 on the first bar or for a non-positive base price).
        prev_price = self.prices[self.index - 1] if self.index > 0 else current_price
        price_change = current_price / prev_price - 1 if prev_price > 0 else 0.0

        if action == 1:  # buy with all available cash
            # A non-positive price would make the share count infinite/negative.
            if current_price > 0:
                max_buy = self.balance // (current_price * (1 + self.transaction_cost))
                if max_buy > 0:
                    self.shares_held += max_buy
                    self.balance -= max_buy * current_price * (1 + self.transaction_cost)
                    self.trades.append(('buy', self.index, current_price, max_buy))

        elif action == 2:  # sell the whole position
            if self.shares_held > 0:
                self.balance += self.shares_held * current_price * (1 - self.transaction_cost)
                self.trades.append(('sell', self.index, current_price, self.shares_held))
                self.shares_held = 0

        # Mark to market at the execution price and advance one bar.
        self.index += 1
        self.position_value = self.shares_held * current_price
        new_total = self.balance + self.position_value
        self.total_value.append(new_total)

        # Risk-adjusted reward: step return minus a penalty on large moves.
        portfolio_return = (new_total / self.total_value[-2]) - 1
        reward = portfolio_return - 0.5 * (price_change ** 2)

        # Stop early once equity is below 80% of the starting balance
        # (a loss versus initial capital, not a peak-to-trough drawdown).
        if new_total < self.initial_balance * 0.8:
            done = True
            reward -= 10

        return self._next_state(), reward, done, {'current_price': current_price}

In [6]:
# 4. 改进的DQN网络结构
class EnhancedDQN(nn.Module):
    """Dueling Q-network: a shared MLP trunk feeding value and advantage heads.

    Q(s, a) = V(s) + A(s, a) - mean_a A(s, a). The mean is taken over dim 1,
    so the input is expected as (batch, input_size); the output is
    (batch, output_size) Q-values.
    """

    @staticmethod
    def _dense(in_features, out_features, dropout=None):
        """Linear -> LayerNorm -> LeakyReLU, optionally followed by Dropout."""
        layers = [
            nn.Linear(in_features, out_features),
            nn.LayerNorm(out_features),
            nn.LeakyReLU(),
        ]
        if dropout is not None:
            layers.append(nn.Dropout(dropout))
        return layers

    @staticmethod
    def _head(out_features):
        """64 -> 32 -> out_features MLP used for both dueling streams."""
        return nn.Sequential(
            nn.Linear(64, 32),
            nn.LeakyReLU(),
            nn.Linear(32, out_features),
        )

    def __init__(self, input_size, output_size):
        super().__init__()
        # Layer order and Sequential positions define the state_dict keys
        # (feature.0.weight, ..., value.2.bias); keep them stable so saved
        # checkpoints keep loading.
        self.feature = nn.Sequential(
            *self._dense(input_size, 256, dropout=0.3),
            *self._dense(256, 128, dropout=0.2),
            *self._dense(128, 64),
        )
        self.advantage = self._head(output_size)
        self.value = self._head(1)

    def forward(self, x):
        shared = self.feature(x)
        adv = self.advantage(shared)
        # Centre the advantages so V and A are identifiable.
        return self.value(shared) + adv - adv.mean(dim=1, keepdim=True)

In [9]:
# 5.1 补充经验回放缓存定义（在步骤5之前运行）
class ReplayBuffer:
    """Fixed-capacity ring buffer of (state, action, reward, next_state, done) tuples.

    Once ``capacity`` transitions are stored, each new push overwrites the
    oldest slot.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0  # slot the next push writes to

    def push(self, state, action, reward, next_state, done):
        """Store one transition, evicting the oldest once the buffer is full."""
        transition = (state, action, reward, next_state, done)
        if len(self.buffer) < self.capacity:
            self.buffer.append(transition)
        else:
            self.buffer[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns five arrays (states, actions, rewards, next_states, dones)
        with dtypes float32, int64, float32, float32, float32. Raises
        ValueError (from ``np.random.choice``) if ``batch_size`` exceeds the
        number of stored transitions.
        """
        picks = np.random.choice(len(self.buffer), batch_size, replace=False)
        batch = [self.buffer[i] for i in picks]

        def column(pos, dtype):
            return np.array([transition[pos] for transition in batch], dtype=dtype)

        return (column(0, np.float32),
                column(1, np.int64),
                column(2, np.float32),
                column(3, np.float32),
                column(4, np.float32))

    def __len__(self):
        return len(self.buffer)

# 现在可以继续执行之前的步骤5代码

In [10]:
# 5. Training configuration
# Observation size must match EnhancedStockTradingEnv.state_size:
# market features + 3 position features + 1 time feature.
state_size = processed_data.shape[1] + 4
action_size = 3  # 0 = hold, 1 = buy, 2 = sell

# Online (policy) and target Q-networks, starting from identical weights.
policy_net = EnhancedDQN(state_size, action_size).to(device)
target_net = EnhancedDQN(state_size, action_size).to(device)
target_net.load_state_dict(policy_net.state_dict())
# The target net only produces bootstrapped targets and is never optimised;
# eval() disables its Dropout layers so those targets are not randomly
# perturbed on every update. load_state_dict() later keeps this mode.
target_net.eval()

# Optimiser; the scheduler lowers the LR when the monitored score plateaus.
optimizer = optim.AdamW(policy_net.parameters(), lr=0.0005, weight_decay=1e-5)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', factor=0.5, patience=10)

# Experience replay
replay_buffer = ReplayBuffer(50000)

# Hyper-parameters
batch_size = 512
gamma = 0.97          # discount factor
epsilon_start = 1.0   # epsilon-greedy schedule (multiplied by epsilon_decay)
epsilon_end = 0.01
epsilon_decay = 0.997
target_update = 15    # episodes between target-network syncs

In [None]:
# 6. Training loop: Double DQN with experience replay and epsilon-greedy exploration
env = EnhancedStockTradingEnv(processed_data, transaction_cost=0.0005)
episodes = 800
best_score = -np.inf
epsilon = epsilon_start

# Per-episode metrics for the learning curves at the end of this cell.
training_log = {
    'episode': [],
    'total_return': [],
    'max_drawdown': [],
    'sharpe_ratio': [],
    'epsilon': []
}
huber_loss = nn.SmoothL1Loss()  # stateless, so one instance serves every update

for episode in range(episodes):
    state = env.reset()
    total_reward = 0
    done = False

    while not done:
        # Epsilon-greedy; random actions are biased towards holding (p=0.4).
        if np.random.rand() < epsilon:
            action = np.random.choice([0, 1, 2], p=[0.4, 0.3, 0.3])
        else:
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
            with torch.no_grad():
                q_values = policy_net(state_tensor)
            action = q_values.argmax().item()

        next_state, reward, done, _ = env.step(action)
        total_reward += reward

        replay_buffer.push(state, action, reward, next_state, done)
        state = next_state

        # Learn from a random minibatch once the buffer holds > 2 batches.
        if len(replay_buffer) > batch_size * 2:
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

            states = torch.FloatTensor(states).to(device)
            next_states = torch.FloatTensor(next_states).to(device)
            actions = torch.LongTensor(actions).to(device)
            rewards = torch.FloatTensor(rewards).to(device)
            dones = torch.FloatTensor(dones).to(device)

            # Double DQN target: the policy net picks the next action, the
            # target net evaluates it; terminal transitions are not bootstrapped.
            with torch.no_grad():
                next_actions = policy_net(next_states).argmax(1)
                next_q = target_net(next_states).gather(1, next_actions.unsqueeze(1))
                target_q = rewards + gamma * next_q.squeeze() * (1 - dones)

            # Q-values of the actions actually taken, Huber loss to the target.
            current_q = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze()
            loss = huber_loss(current_q, target_q)

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1.0)
            optimizer.step()

    # Sync the target network every `target_update` episodes.
    if episode % target_update == 0:
        target_net.load_state_dict(policy_net.state_dict())

    # Lower the LR when the episode reward stops improving.
    scheduler.step(total_reward)

    # Episode metrics from the equity curve.
    portfolio = np.asarray(env.total_value, dtype=np.float64)
    returns = np.diff(portfolio) / portfolio[:-1]
    sharpe = np.mean(returns) / (np.std(returns) + 1e-9) * np.sqrt(240)
    # Maximum peak-to-trough decline over the whole episode.
    running_peak = np.maximum.accumulate(portfolio)
    drawdown = np.max((running_peak - portfolio) / running_peak)

    training_log['episode'].append(episode+1)
    training_log['total_return'].append((portfolio[-1]/env.initial_balance-1)*100)
    training_log['max_drawdown'].append(drawdown*100)
    training_log['sharpe_ratio'].append(sharpe)
    training_log['epsilon'].append(epsilon)

    # Keep the weights of the highest-reward episode.
    # NOTE(review): early episodes are mostly random (high epsilon), so this
    # "best" episode may reflect exploration luck rather than policy quality.
    if total_reward > best_score:
        best_score = total_reward
        torch.save(policy_net.state_dict(), 'best_model.pth')

    epsilon = max(epsilon_end, epsilon * epsilon_decay)

    if (episode+1) % 50 == 0:
        print(f"Episode {episode+1}/{episodes}")
        print(f"Total Return: {training_log['total_return'][-1]:.2f}%")
        print(f"Sharpe Ratio: {training_log['sharpe_ratio'][-1]:.2f}")
        print(f"Max Drawdown: {training_log['max_drawdown'][-1]:.2f}%")
        print(f"Epsilon: {epsilon:.3f}\n")

# Learning curves
fig, axes = plt.subplots(2, 2, figsize=(12, 6))
curves = [
    ('total_return', 'Total Return (%)'),
    ('sharpe_ratio', 'Sharpe Ratio'),
    ('max_drawdown', 'Max Drawdown (%)'),
    ('epsilon', 'Epsilon Decay'),
]
for ax, (key, title) in zip(axes.flat, curves):
    ax.plot(training_log['episode'], training_log[key])
    ax.set_title(title)
    ax.set_xlabel('Episode')
fig.tight_layout()
plt.show()

In [None]:
# 7. 模型测试与交易分析
def _run_greedy_backtest(env, model, time_index):
    """Roll ``model`` greedily through ``env`` once; return (portfolio_values, trade_records)."""
    state = env.reset()
    done = False
    portfolio_values = []
    trade_records = []

    while not done:
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        with torch.no_grad():
            action = model(state_tensor).argmax().item()

        bar = env.index              # row on which this order executes
        fills_before = len(env.trades)
        next_state, _, done, _ = env.step(action)
        portfolio_values.append(env.total_value[-1])

        # Record only orders that were actually filled: a buy without cash or
        # a sell without a position leaves env.trades unchanged.
        if len(env.trades) > fills_before:
            side, _, price, shares = env.trades[-1]
            trade_records.append({
                'time': time_index[bar],
                'action': 'Buy' if side == 'buy' else 'Sell',
                'price': price,
                'shares': shares,
                'portfolio_value': portfolio_values[-1],
                'step': bar,
            })

        state = next_state

    return portfolio_values, trade_records


def _backtest_metrics(env):
    """Return (sharpe, max_drawdown_pct, win_rate) for the episode just run in ``env``."""
    equity = np.asarray(env.total_value, dtype=np.float64)
    step_returns = np.diff(equity) / equity[:-1]
    sharpe = np.mean(step_returns) / (np.std(step_returns) + 1e-9) * np.sqrt(240)

    peak = np.maximum.accumulate(equity)
    max_drawdown = np.max((peak - equity) / peak) * 100

    # Win rate over completed buy -> sell round trips. total_value[k] is the
    # equity before the step on bar k, total_value[k + 1] the equity after it.
    wins = round_trips = 0
    entry_equity = None
    for side, bar, _, _ in env.trades:
        if side == 'buy' and entry_equity is None:
            entry_equity = equity[bar]
        elif side == 'sell' and entry_equity is not None:
            round_trips += 1
            wins += int(equity[bar + 1] > entry_equity)
            entry_equity = None
    win_rate = wins / round_trips if round_trips else 0.0

    return sharpe, max_drawdown, win_rate


def _plot_backtest(times, prices, portfolio_values, trade_records):
    """Three stacked panels: close price, portfolio value, and buy/sell markers."""
    fig, (ax_price, ax_value, ax_trades) = plt.subplots(3, 1, figsize=(14, 8))

    ax_price.plot(times, prices, label='Price')
    ax_price.set_title('Price Movement')
    ax_price.legend()

    ax_value.plot(times, portfolio_values, label='Portfolio')
    ax_value.set_title(f'Portfolio Value (Final: {portfolio_values[-1]:.2f})')
    ax_value.legend()

    # Markers sit on the price of the bar each order executed on.
    buy_steps = [t['step'] for t in trade_records if t['action'] == 'Buy']
    sell_steps = [t['step'] for t in trade_records if t['action'] == 'Sell']
    ax_trades.plot(times, prices, color='lightgray')
    ax_trades.scatter(times[buy_steps], prices[buy_steps], color='g', label='Buy')
    ax_trades.scatter(times[sell_steps], prices[sell_steps], color='r', label='Sell')
    ax_trades.set_title('Trade Signals')
    ax_trades.legend()

    fig.tight_layout()
    plt.show()


def enhanced_test_model(env, model_path, scaler, features, time_index=None):
    """Backtest a saved policy greedily on ``env``, plot it and print a report.

    Args:
        env: EnhancedStockTradingEnv whose ``data`` rows were produced by
            ``scaler``; it is reset and run to the end of the episode.
        model_path: path of an EnhancedDQN state_dict.
        scaler: fitted scaler used to map ``env.data`` back to raw values for
            the price plot.
        features: column names of ``env.data`` in order; must contain
            'close' (ValueError otherwise).
        time_index: optional per-row timestamps for ``env.data``; defaults to
            ``feature_data.index``.

    Returns:
        One dict per filled order with keys 'time', 'action' ('Buy'/'Sell'),
        'price', 'shares', 'portfolio_value' and 'step' (row index of the bar).

    The printed drawdown, Sharpe ratio and win rate are computed from this
    backtest. The win rate is the share of completed buy -> sell round trips
    whose equity after the sell exceeds the equity before the opening buy.
    """
    if time_index is None:
        time_index = feature_data.index

    model = EnhancedDQN(env.state_size, action_size).to(device)
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()  # disable dropout for deterministic greedy actions

    portfolio_values, trade_records = _run_greedy_backtest(env, model, time_index)

    # Recover raw close prices for the bars that were traded through.
    n_steps = len(portfolio_values)
    close_col = list(features).index('close')
    prices = scaler.inverse_transform(np.asarray(env.data)[:n_steps])[:, close_col]
    _plot_backtest(time_index[:n_steps], prices, portfolio_values, trade_records)

    initial_value = env.initial_balance
    final_value = portfolio_values[-1]
    total_return = (final_value / initial_value - 1) * 100
    num_trades = len(trade_records)
    sharpe, max_drawdown, win_rate = _backtest_metrics(env)

    print(f"\n{' Backtest Report ':=^40}")
    print(f"初始资金: {initial_value:.2f}")
    print(f"最终资金: {final_value:.2f}")
    print(f"总收益率: {total_return:.2f}%")
    print(f"交易次数: {num_trades}")
    print(f"胜率: {win_rate*100:.2f}%")
    print(f"最大回撤: {max_drawdown:.2f}%")
    print(f"夏普比率: {sharpe:.2f}")

    return trade_records

# Run the backtest.
# NOTE(review): test_env wraps the same processed_data the agent was trained
# on, so this is an in-sample evaluation rather than an out-of-sample test.
test_env = EnhancedStockTradingEnv(processed_data)
trade_log = enhanced_test_model(
    env=test_env,
    model_path='best_model.pth',
    scaler=scaler,
    features=selected_features,
)