DQN主要包含：

1. **两个网络（策略网络和目标网络）**：这两个网络架构一模一样，初始化时参数也相同。但在训练过程中，策略网络每步都更新；目标网络在一定步数后才更新一次（直接将策略网络的参数复制过去）。这样做是为了稳定训练，防止Q值估计的发散。

**目标函数**：DQN使用均方误差（Mean Squared Error, MSE）作为目标函数，计算当前Q值与目标Q值之间的差异。目标Q值由奖励和下一个状态的最大Q值计算得出，具体公式为：

$$
y = r + \gamma \max_{a'} Q_{\text{target}}(s', a')
$$

其中，$y$ 是目标Q值，$r$ 是即时奖励，$\gamma$ 是折扣因子，$s'$ 是下一个状态，$Q_{\text{target}}$ 是目标网络的Q值。

**损失函数**：DQN的损失函数定义为当前Q值与目标Q值之间的均方误差：

$$
L(\theta) = \mathbb{E}_{(s, a, r, s') \sim D} \left[ \left( y - Q_{\text{policy}}(s, a; \theta) \right)^2 \right]
$$

其中，$\theta$ 是策略网络的参数，$D$ 是经验回放缓冲区中的数据分布。

1. **经验回放（Experience Replay）**：DQN使用一个经验回放缓冲区来存储智能体与环境交互的历史数据（状态、动作、奖励、下一个状态）。在训练时，从这个缓冲区随机采样小批量数据进行训练，打破数据之间的相关性，提高训练效率和稳定性。 

## 网络
网络的输入是**状态**，大小是**状态的数量**。

输出的是在采取该动作后的Q值，输出大小跟**动作数**一样

要考虑**终止状态**和**非终止状态**的区别

$$
y_i = r + γ max_a' Q_target(s', a')  (非终止状态)
$$

$$
y_i = r  (终止状态)
$$

In [None]:
"""
用简单的线性网络来实现DQN的网络结构
"""
import torch.nn as nn
import torch.nn.functional as F
class MLP(nn.Module):
    def __init__(self, n_states,n_actions,hidden_dim=128):
        """ 初始化q网络，为全连接网络
        """
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(n_states, hidden_dim) # 输入层
        self.fc2 = nn.Linear(hidden_dim,hidden_dim) # 隐藏层
        self.fc3 = nn.Linear(hidden_dim, n_actions) # 输出层
        
    def forward(self, x):
        # 各层对应的激活函数
        x = F.relu(self.fc1(x)) 
        x = F.relu(self.fc2(x))
        return self.fc3(x)

In [None]:
"""
定义 经验回放池
"""

from collections import deque
import random

class ReplayBuffer(object):
    def __init__(self,capacity):
        self.capacity = capacity  # 最大容量
        self.buffer = deque(maxlen=self.capacity)

    def push(self,transition):
        """
        添加经验到经验回放池中
        """
        self.buffer.append(transition)
    
    def sample(self,batch_size:int, sequential:bool=False):
        """
        从经验回放池中采样一批经验
        """
        if batch_size > len(self.buffer):
            # 如果请求的批量大小大于当前缓冲区大小，则调整为当前回放池大小
            batch_size = len(self.buffer)
        if sequential:
            rand = random.randint(0, len(self.buffer) - batch_size) # 随机起始位置
            batch = [self.buffer[i] for i in range(rand, rand + batch_size)] # 从起始位置开始连续采样
            return zip(*batch)
        else:
            # 随机采样
            indices = random.sample(range(len(self.buffer)), batch_size)
        batch = [self.buffer[idx] for idx in indices]
        return map(list, zip(*batch))  # 转置操作，将每个元素分开返回

    def clear(self):
        ''' 清空经验回放
        '''
        self.buffer.clear()
    def __len__(self):
        ''' 返回当前存储的量
        '''
        return len(self.buffer)    

In [None]:
"""
DQN算法实现
"""

import torch
import torch.optim as optim
import math
import numpy as np

class DQN:
    def __init__(self, model,memory,cfg):
        self.n_actions = cfg.n_actions  # 动作数
        self.device = torch.device(cfg.device)
        self.gamma = cfg.gamma  # 折扣因子
        # epsilon-greedy参数
        self.sample_count = 0  # 用于epsilon的衰减计数
        self.epsilon = cfg.epsilon_start  # 初始epsilon值
        self.sample_count = 0  
        self.epsilon_start = cfg.epsilon_start
        self.epsilon_end = cfg.epsilon_end
        self.epsilon_decay = cfg.epsilon_decay   # 衰减速率
        # 训练参数
        self.batch_size = cfg.batch_size
        self.policy_net = model.to(self.device)  # 策略网络
        self.target_net = model.to(self.device)  # 目标网络

        # 复制策略网络的参数到目标网络
        for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):
            target_param.data.copy_(param.data)
        
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg.lr)
        self.memory = memory  # 经验回放池
    
    def sample_action(self,state):
        """epsilon-greedy策略选择动作"""
        self.sample_count += 1    # 采样次数加1
        # 计算当前epsilon值，随着采样次数增加逐渐减小
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
            math.exp(-1. * self.sample_count / self.epsilon_decay)
        
        if random.random() > self.epsilon:
            # 选择Q值最大的动作
            with torch.no_grad():
                state = torch.FloatTensor(state).unsqueeze(0).to(self.device)  
                q_values = self.policy_net(state)  # 策略网络的输出（Q值）
                action = q_values.max(1)[1].item() # 选择最大Q值对应的动作
        else:
            # 随机选择动作
            action = random.randrange(self.n_actions)
        
        return action

    @ torch.no_grad() # 在Evaluation时不计算梯度
    def predict_action(self,state):
        """
        eval时选择动作用，不使用epsilon-greedy，
        直接选择Q值最大的动作
        """

        state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
        q_values = self.policy_net(state)  # 策略网络的输出（Q值）
        action = q_values.max(1)[1].item() # 选择最大Q值对应的动作

        return action

    def update(self):
        """
        更新策略网络参数
        """
        if len(self.memory) < self.batch_size:
            return  # 经验回放池中样本不足一个batch时，就不更新
        
        # 采样一个batch的状态，动作，奖励，下一个状态，done标志
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(self.batch_size)

        # 转换为torch张量
        state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)
        action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)  
        reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)  
        next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)
        done_batch = torch.tensor(np.float32(done_batch), device=self.device)

        # 计算当前状态（state_batch）下，采取action_batch动作的Q值
        q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch)

        # 计算下一个状态（next_state_batch）下的最大Q值
        next_q_values = self.target_net(next_state_batch).max(1)[0].detach

        # 计算目标Q值，对于终止状态，此时done_batch[0]=1, 对应的expected_q_value等于reward
        target_q_values = reward_batch + self.gamma * next_q_values * (1 - done_batch)

        # 计算损失函数（均方误差）
        loss = torch.nn.functional.mse_loss(q_values, target_q_values.unsqueeze(1))

        # 优化策略网络
        self.optimizer.zero_grad()
        loss.backward()

        # clip,防止梯度爆炸
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()

In [None]:
"""
训练代码
"""

def train(cfg, env, agent):
    print("开始训练...")
    rewards = []  # 用于记录每个episode的累计奖励
    steps = []    # 用于记录每个episode的步数

    for episode in range(cfg.num_episodes):
        ep_reward = 0  # 记录当前episode的累计奖励
        ep_steps = 0  # 记录当前episode的步数
        state = env.reset()  # 重置环境，获取初始状态

        for _ in range(cfg.max_steps_per_episode):
            action = agent.sample_action(state)  # 选择动作
            next_state, reward, done, _ = env.step(action)  # 执行动作，获取即时奖励和下一个状态
            agent.memory.push(state, action, reward, next_state, done)  # 存储经验到回放池
            agent.update()  # 更新策略网络

            state = next_state  # 转移到下一个状态
            ep_reward += reward  # 累积奖励
            ep_steps += 1  # 步数加1

            if done:
                break

        if (episode + 1) % cfg.target_update_freq == 0:
            # 每隔一定episode更新目标网络
            for target_param, param in zip(agent.target_net.parameters(), agent.policy_net.parameters()):
                target_param.data.copy_(param.data)

        rewards.append(ep_reward)
        steps.append(ep_steps)

        if (episode + 1) % 10 == 0:
            # 每10个episode打印一次信息
            print(f"回合：{i_ep+1}/{cfg['train_eps']}，奖励：{ep_reward:.2f}，Epislon：{agent.epsilon:.3f}")

    print("训练结束！")
    env.close()
    return {"rewards": rewards, "steps": steps}  # 返回训练结果

def test(cfg,env,agent):
    print("开始测试...")
    rewards = []  # 用于记录每个episode的累计奖励
    steps = []    # 用于记录每个episode的步数

    for episode in range(cfg.test_episodes):
        ep_reward = 0  # 记录当前episode的累计奖励
        ep_steps = 0  # 记录当前episode的步数
        state = env.reset()  # 重置环境，获取初始状态

        for _ in range(cfg.max_steps_per_episode):
            action = agent.predict_action(state)  # 这里动作用predict直接选Q值最大而不是sample
            next_state, reward, done, _ = env.step(action)  # 执行动作，获取即时奖励和下一个状态

            state = next_state  # 转移到下一个状态
            ep_reward += reward  # 累积奖励
            ep_steps += 1  # 步数加1

            if done:
                break

        rewards.append(ep_reward)
        steps.append(ep_steps)

        print(f"回合：{episode+1}/{cfg.test_episodes}，奖励：{ep_reward:.2f}")

    print("测试结束！")
    env.close()
    return {"rewards": rewards, "steps": steps}  # 返回测试结果

In [None]:
"""
定义环境
"""

import gym
import os
def all_seed(env,seed = 1):
    ''' 万能的seed函数
    '''
    env.seed(seed) # env config
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed) # config for CPU
    torch.cuda.manual_seed(seed) # config for GPU
    os.environ['PYTHONHASHSEED'] = str(seed) # config for python scripts
    # config for cudnn
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.enabled = False
def env_agent_config(cfg):
    env = gym.make(cfg['env_name']) # 创建环境
    if cfg['seed'] !=0:
        all_seed(env,seed=cfg['seed'])
    n_states = env.observation_space.shape[0]
    n_actions = env.action_space.n
    print(f"状态空间维度：{n_states}，动作空间维度：{n_actions}")
    cfg.update({"n_states":n_states,"n_actions":n_actions}) # 更新n_states和n_actions到cfg参数中
    model = MLP(n_states, n_actions, hidden_dim = cfg['hidden_dim']) # 创建模型
    memory = ReplayBuffer(cfg['memory_capacity'])
    agent = DQN(model,memory,cfg)
    return env,agent

In [None]:
"""
训练参数
"""

import argparse
import matplotlib.pyplot as plt
import seaborn as sns
def get_args():
    """ 超参数
    """
    parser = argparse.ArgumentParser(description="hyperparameters")      
    parser.add_argument('--algo_name',default='DQN',type=str,help="name of algorithm")
    parser.add_argument('--env_name',default='CartPole-v0',type=str,help="name of environment")
    parser.add_argument('--train_eps',default=200,type=int,help="episodes of training")
    parser.add_argument('--test_eps',default=20,type=int,help="episodes of testing")
    parser.add_argument('--ep_max_steps',default = 100000,type=int,help="steps per episode, much larger value can simulate infinite steps")
    parser.add_argument('--gamma',default=0.95,type=float,help="discounted factor")
    parser.add_argument('--epsilon_start',default=0.95,type=float,help="initial value of epsilon")
    parser.add_argument('--epsilon_end',default=0.01,type=float,help="final value of epsilon")
    parser.add_argument('--epsilon_decay',default=500,type=int,help="decay rate of epsilon, the higher value, the slower decay")
    parser.add_argument('--lr',default=0.0001,type=float,help="learning rate")
    parser.add_argument('--memory_capacity',default=100000,type=int,help="memory capacity")
    parser.add_argument('--batch_size',default=64,type=int)
    parser.add_argument('--target_update',default=4,type=int)
    parser.add_argument('--hidden_dim',default=256,type=int)
    parser.add_argument('--device',default='cpu',type=str,help="cpu or cuda") 
    parser.add_argument('--seed',default=10,type=int,help="seed")   
    args = parser.parse_args([])
    args = {**vars(args)}  # 转换成字典类型    
    ## 打印超参数
    print("超参数")
    print(''.join(['=']*80))
    tplt = "{:^20}\t{:^20}\t{:^20}"
    print(tplt.format("Name", "Value", "Type"))
    for k,v in args.items():
        print(tplt.format(k,v,str(type(v))))   
    print(''.join(['=']*80))      
    return args
def smooth(data, weight=0.9):  
    '''用于平滑曲线，类似于Tensorboard中的smooth曲线
    '''
    last = data[0] 
    smoothed = []
    for point in data:
        smoothed_val = last * weight + (1 - weight) * point  # 计算平滑值
        smoothed.append(smoothed_val)                    
        last = smoothed_val                                
    return smoothed

def plot_rewards(rewards,cfg, tag='train'):
    ''' 画图
    '''
    sns.set()
    plt.figure()  # 创建一个图形实例，方便同时多画几个图
    plt.title(f"{tag}ing curve on {cfg['device']} of {cfg['algo_name']} for {cfg['env_name']}")
    plt.xlabel('epsiodes')
    plt.plot(rewards, label='rewards')
    plt.plot(smooth(rewards), label='smoothed')
    plt.legend()
    plt.show()

In [None]:
"""
训练
"""

# 获取参数
cfg = get_args() 
# 训练
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)
 
plot_rewards(res_dic['rewards'], cfg, tag="train")  
# 测试
res_dic = test(cfg, env, agent)
plot_rewards(res_dic['rewards'], cfg, tag="test")  # 画出结果