### Demo_DQN

In [1]:
import gym
import pygame
import time

import random
import numpy as np
from collections import deque

import torch
import torch.nn as nn
import torch.optim as optim

import matplotlib.pyplot as plt
import pandas as pd



In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("使用设备：", device)

使用设备： cuda


In [3]:
# QNetWork
class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(state_dim,256),  # 隐藏层1 输入state
            nn.ReLU(),
            nn.Linear(256, 256),       
            nn.ReLU(),
            nn.Linear(256, 256),       
            nn.ReLU(),
            nn.Linear(256, 256),       
            nn.ReLU(),
            nn.Linear(256, action_dim)  # 输出层：每个动作的Q值
        )

    def forward(self, x):
        return self.model(x)

In [4]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    # one record: (s, a, r, s')
    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states      = torch.from_numpy(np.array(states)).float()
        actions     = torch.tensor(actions).long()
        rewards     = torch.tensor(rewards).float()
        next_states = torch.from_numpy(np.array(next_states)).float()
        dones       = torch.tensor(dones).float()

        return states, actions, rewards, next_states, dones


    def __len__(self):
        return len(self.buffer)


In [5]:
class DQNAgent:
    def __init__(self, 
                 state_dim, action_dim, 
                 gamma=0.99, lr=1e-3, tau=1e-2, epsilon=1.0, epsilon_decay=0.995, epsilon_min=5e-3, 
                 use_cuda=True):
        '''
        gamma=0.99 未来回报估值
        lr=1e-3 learning rate
        tau=1e-2 软更新率
        epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01 贪心率
        '''
        self.device = torch.device("cuda" if use_cuda and torch.cuda.is_available() else "cpu")
        print("Agent使用设备：", self.device)

        self.q_net = QNetwork(state_dim, action_dim).to(self.device)
        self.target_q_net = QNetwork(state_dim, action_dim).to(self.device)
        self.target_q_net.load_state_dict(self.q_net.state_dict())

        self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)

        self.gamma = gamma
        self.tau = tau
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.action_dim = action_dim

    def select_action(self, state):
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            with torch.no_grad():
                q_values = self.q_net(state)
            return q_values.argmax().item()

    def train(self, replay_buffer, batch_size):
        if len(replay_buffer) < batch_size:
            return

        states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

        # 转为 GPU tensor
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)

        # 当前 Q(s,a)
        q_values = self.q_net(states) # 给定s, 对所有可能动作的Q值 输出为 (batch_size, action_dim)
        q_a = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)

        # Q(s, a) = r(s, a, s') + gamma * Q_target(s', a')
        # 目标 Q(s',a')
        with torch.no_grad():
            next_q_values = self.target_q_net(next_states)
            max_next_q = next_q_values.max(1)[0]
            q_target = rewards + self.gamma * max_next_q * (1 - dones)

        # MSE 损失
        loss = nn.MSELoss()(q_a, q_target)

        # 反向传播
        self.optimizer.zero_grad() # 清除已有梯度
        # backward只负责计算梯度, step才是真正更新
        loss.backward()
        self.optimizer.step()

        # 软更新目标网络
        for target_param, param in zip(self.target_q_net.parameters(), self.q_net.parameters()):
            target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)

        # 衰减 ε
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

### 环境选择

In [6]:
#game = "CartPole-v1"
game = "LunarLander-v2"

In [7]:
# advance "LunarLander-v2"
env = gym.make(game)

In [8]:
state_dim = env.observation_space.shape[0]  
action_dim = env.action_space.n             

In [10]:
print(f"game: {game}, state_dim: {state_dim}, action_dim: {action_dim}\n")

game: LunarLander-v2, state_dim: 8, action_dim: 4



### 网络定义 训练

In [11]:
agent = DQNAgent(state_dim, action_dim, use_cuda=True)
replay_buffer = ReplayBuffer(capacity=50000)

Agent使用设备： cuda


In [21]:
env = gym.make(game)
num_episodes = 500
batch_size = 128
train_every = 10
reward_list = []
max_steps = 500

for episode in range(num_episodes):
    state, _ = env.reset()
    
    total_reward = 0
    action_round = 0
    done = False 
    
    while not done and action_round < max_steps:
        action = agent.select_action(state)
        action_round +=1
        
        next_state, reward, done, _, _ = env.step(action)
        replay_buffer.add(state, action, reward, next_state, done)

        state = next_state
        total_reward += reward
        
        if action_round % train_every == 0:
            agent.train(replay_buffer, batch_size)
        
    reward_list.append(total_reward)
    
    if episode % 10 == 0:
        print(f"Episode {episode}, Total Reward: {total_reward:.2f}, Epsilon: {agent.epsilon:.3f}")
        agent.epsilon = max(agent.epsilon, 0.1)

Episode 0, Total Reward: 242.56, Epsilon: 0.000


### 动画实际演示

In [26]:
# 设置渲染模式为 pygame
env = gym.make(game, render_mode="human")

# 关闭 epsilon，完全贪婪策略
agent.epsilon = 0.0

# 可视化运行一回合
num_episodes = 1
for ep in range(num_episodes):
    state, _ = env.reset()
    done = False
    total_reward = 0
    t = 0

    while not done and t < max_steps:
        action = agent.select_action(state)
        state, reward, done, _, _ = env.step(action)
        total_reward += reward
        t += 1

        # render 使用 pygame 显示界面
        env.render()
        if t % 50 == 0: 
            print(f"t = {t}, total_reward: {total_reward}")
        # time.sleep(0.001)  # 控制每帧速度

    print(f"Episode {ep}, Reward: {total_reward}")
env.close()


t = 50, total_reward: 3.6320660600030816
t = 100, total_reward: 94.00498341382443
t = 150, total_reward: 120.91730386012924
t = 200, total_reward: 167.64466571253888
Episode 0, Reward: 267.6446673539538
