## PPO（Proximal Policy Optimization）近端策略优化算法
PPO是同策略算法。强化学习（RL）→ 策略梯度方法 → on-policy（在线学习），但通过重要性采样近似实现off-policy（离线学习）的数据复用。PPO 通过约束策略更新幅度，在稳定性和样本效率之间取得平衡，核心思想包括
- **剪裁目标函数**  
  通过剪裁新旧策略比值，防止策略更新幅度过大导致性能崩溃。  
  **剪裁公式**：  
  $$
  \mathcal{L}^{CLIP}(\theta) = \mathbb{E}_{s,a\sim\pi_{\theta_{\text{old}}}} \left[ \min\left( \frac{\pi_{\theta}(a|s)}{\pi_{\theta_{\text{old}}}(a|s)} A^{\pi_{\theta_{\text{old}}}}(s,a), \text{clip}\left( \frac{\pi_{\theta}(a|s)}{\pi_{\theta_{\text{old}}}(a|s)}, 1-\epsilon, 1+\epsilon \right) A^{\pi_{\theta_{\text{old}}}}(s,a) \right) \right]
  $$
  - $ \epsilon $ 为剪裁范围（如0.2）。

- **优势估计**  
  使用广义优势估计（GAE）计算优势函数，平衡偏差与方差：  
  $$
  A_t^{\text{GAE}} = \sum_{k=0}^{T-t-1} (\gamma\lambda)^k \delta_{t+k}, \quad \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)
  $$
  - $ \gamma $ 为折扣因子，$ \lambda $ 为GAE参数。

- **策略稳定性约束**  
  通过KL散度惩罚项或约束，强制新策略与旧策略的差异不超过阈值：  
  $$
  \mathcal{L}^{KL}(\theta) = \beta \cdot \mathbb{E}_{s\sim\pi_{\theta_{\text{old}}}} \left[ D_{KL}(\pi_{\theta_{\text{old}}}(\cdot|s) || \pi_{\theta}(\cdot|s)) \right]
  $$


### **算法流程**
1. **数据收集**：使用旧策略 $ \pi_{\theta_{\text{old}}} $ 采样轨迹数据。
2. **优势计算**：基于采样数据计算GAE优势 $ A_t^{\text{GAE}} $。
3. **策略更新**：最大化剪裁后的目标函数 $ \mathcal{L}^{CLIP} $，并通过KL散度约束防止策略突变。
4. **价值函数优化**：单独训练Critic网络（如用均方误差优化 $ V(s) $。

### **特点**
- **稳定性**：剪裁机制和KL约束避免策略崩溃。
- **样本效率**：通过重要性采样复用旧数据，减少对新样本的依赖。
- **实现简单**：相比TRPO（Trust Region Policy Optimization），无需复杂的二阶优化。
- **通用性**：适用于连续/离散动作空间，在游戏AI（如Dota 2）、机器人控制等领域表现优异。

### **公式总结**
- **总目标函数**：  
  $$
  \mathcal{L}(\theta) = \mathcal{L}^{CLIP}(\theta) - \mathcal{L}^{KL}(\theta)
  $$
- **优势函数**：GAE公式（见核心原理部分）。


### **应用场景**
- 复杂游戏AI（如OpenAI Five）。
- 机器人运动控制（如四足机器人平衡）。
- 自然语言处理（如文本生成优化）。

**关键公式**：剪裁目标函数、GAE优势估计、KL散度约束。

### 程序实现

代码中要注意实现的环境不同，输入和输出也不同，对应的超参数设置也不同，例如CarPole-v1的输入是4维连续向量，输出是2维，而CliffWalking-v0的输入是48维one-hot编码，输出是4维离散动作。

In [73]:
# 调用相关的包
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions.categorical import Categorical
import random
from collections import deque
import copy
import gym
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
import argparse

#### 模型定义

##### 定义演员网络和评论家网络

PPO算法的model就是actor和critic两个网络。分别用MLP来拟合。Actor输出的是一个概率分布，critic输出的是一个值。critic网络的输入维度也可以是n_states+n_actions，也即将action的信息也纳入critic网络中，这样会更好一些。这里actor拟合**策略网络**，critic拟合**价值函数**。

In [74]:
class Actor(nn.Module):
    def __init__(self, n_states, n_actions, hidden_dim=256):
        super(Actor, self).__init__()

        self.actor = nn.Sequential(
            nn.Linear(n_states, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_actions),
            nn.Softmax(dim=1)
        )
    def forward(self, state):
        probs = self.actor(state)
        #dist = Categorical(probs)
        return probs
    
class Critic(nn.Module):
    def __init__(self, n_states, hidden_dim=256):
        super(Critic, self).__init__()

        self.critic = nn.Sequential(
            nn.Linear(n_states, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
    def forward(self, state):
        value = self.critic(state)
        return value

#### 定义经验回放

**重要性采样**是从评论员采样到的数据进行重要性采样来更新演员模型。需要一个经验回放的队列，这个队列中存储的是状态、动作、奖励、下一个状态、是否结束。

In [75]:
class ReplayBufferQue:
    '''DQN的经验回放池，每次采样batch_size个样本
    '''
    def __init__(self, capacity: int) -> None:
        '''初始化经验回放池
        Args:
            capacity (int): 经验回放池的最大容量
        '''
        self.capacity = capacity
        # 使用deque实现经验回放池，设置最大长度为capacity
        self.buffer = deque(maxlen=self.capacity)
    
    def push(self, transitions):
        '''将一个经验样本添加到回放池中
        Args:
            transitions (tuple): 包含(state, action, log_p, reward, done)的经验样本
        '''
        # 将经验样本添加到buffer中，如果buffer已满，最旧的样本将被移除
        self.buffer.append(transitions)
    
    def sample(self, batch_size: int, sequential: bool = False):
        '''从经验回放池中随机采样一个batch的样本
        Args:
            batch_size (int): 采样的样本数量
            sequential (bool): 是否按照顺序采样，默认为False
        Returns:
            batch (tuple): 包含(state, action, log_p, reward, done)的batch样本
        '''
        if batch_size > len(self.buffer):
            batch_size = len(self.buffer)
        if sequential:
            # 如果需要按照顺序采样，则从buffer中随机选择batch_size个索引
            rand = random.randint(0, len(self.buffer) - batch_size)
            # 根据索引从buffer中取出对应的样本
            batch = [self.buffer[idx] for idx in range(rand, rand + batch_size)]
            return zip(*batch)
        else:
            # 如果不需要按照顺序采样，则直接从buffer中随机选择batch_size个样本
            batch = random.sample(self.buffer, batch_size)
            return zip(*batch) # 解包，返回各部分批量数据的元组
    
    def clear(self):
        '''清空经验回放池
        '''
        self.buffer.clear()
    
    def __len__(self):
        '''返回回放池中样本的数量
        '''
        return len(self.buffer)

class PGReplay(ReplayBufferQue):
    '''PG的经验回放池，每次采样所有样本，因此只需要继承ReplayBufferQue，重写sample方法即可
    '''
    def __init__(self):
        self.buffer = deque()
    
    def sample(self):
        '''从经验回放池中采样所有样本
        Returns:
            batch (tuple): 包含(state, action, log_p, reward, done)的batch样本
        '''
        batch = list(self.buffer)
        return zip(*batch)


#### 定义PPO智能体

In [None]:
class PPOAgent:
    def __init__(self, cfg) -> None:
        self.gamma = cfg.gamma # 折扣因子
        self.device = cfg.device # 设备
        self.actor = Actor(n_states=cfg.n_states, n_actions=cfg.n_actions, hidden_dim=cfg.actor_hidden_dim).to(self.device)
        self.critic = Critic(n_states=cfg.n_states, hidden_dim=cfg.critic_hidden_dim).to(self.device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=cfg.actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=cfg.critic_lr)
        self.memory = PGReplay() # 记忆回放
        self.k_epochs = cfg.k_epochs # 更新策略网络的轮数
        self.eps_clip = cfg.eps_clip # PPO的截断参数
        self.entropy_coef = cfg.entropy_coef # 熵系数
        self.sample_count = 0 # 记录采样次数
        self.update_freq = cfg.update_freq # 更新策略网络的频率

    def sample_action(self, state):
        '''根据当前状态采样动作
        Args:
            state: 当前环境状态，可以是连续的向量或离散的值。  
        Returns:
            action: 采样的动作，基于当前策略。
        '''
        # 每调用一次sample_action，样本计数增加1
        self.sample_count += 1
        # 输入时连续的向量，需要将其转换为tensor，如果是离散输入的话，需要转换为one-hot向量
        state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
        # 使用actor网络计算动作概率分布
        probs = self.actor(state)
        # 创建一个分类分布对象，便于之后采样动作
        #print(probs)
        dist = Categorical(probs)  
        # 从概率分布中采样得到具体动作
        action = dist.sample() 
        # 计算所选动作的对数概率，并从计算图中分离，防止梯度更新时对其产生影响
        self.log_probs = dist.log_prob(action).detach()
        # 将动作从tensor转换为numpy数组，并返回其第一个元素，作为实际执行的动作
        return action.detach().cpu().numpy().item()
        
    @torch.no_grad()
    def predict_action(self, state):
        state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
        probs = self.actor(state)
        dist = Categorical(probs)
        action = dist.sample()
        return action.detach().cpu().numpy().item()
        
    def update(self,):
        '''PPO更新策略网络
        每进行一次交互更新一次模型
        '''
        # 每n次更新一次策略网络
        if self.sample_count % self.update_freq != 0:
            return
        #print("update policy network")
        old_states, old_actions, old_log_probs, old_rewards, old_dones = self.memory.sample()
        # 转换数据为tensor
        old_states = torch.tensor(old_states, device=self.device, dtype=torch.float32)
        old_actions = torch.tensor(old_actions, device=self.device, dtype=torch.long)
        old_log_probs = torch.tensor(old_log_probs, device=self.device, dtype=torch.float32)

        # 状态奖励的monte carlo估计
        returns = [] # 存储每个状态的累计奖励
        discounted_sum = 0 # 折扣累计和
        # 逆序遍历旧的奖励和完成标志，计算每个状态的折扣累计奖励
        for reward, done in zip(reversed(old_rewards), reversed(old_dones)):
            # 如果遇到完成标志（done为True），则重置折扣累计和为0
            # 这是因为在一个episode结束时，累积奖励应当重新计算
            if done:
                discounted_sum = 0
            # 根据当前奖励和折扣因子更新折扣累计和
            # 这一步是monte carlo方法中计算回报的关键步骤
            discounted_sum = reward + self.gamma * discounted_sum
            # 将计算得到的折扣累计和插入到returns列表的最前面
            # 以保持与原始状态序列的顺序一致
            returns.insert(0, discounted_sum)
            
        # 归一化奖励
        returns = torch.tensor(returns, device=self.device, dtype=torch.float32)
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)

        # 每次更新更新k_epochs次
        for _ in range(self.k_epochs):
            # 计算广义优势估计GAE = Q(s,a) - V(s)
            values = self.critic(old_states) # detach防止通过critic反传
            advantages = returns - values.detach()

            # 获取旧策略动作概率
            probs = self.actor(old_states)
            dist = Categorical(probs)
            # 获取新策略的动作概率
            new_probs = dist.log_prob(old_actions)

            # 计算比值(pi_theta / pi_theta_old)
            ratio = (new_probs - old_log_probs).exp()

            # 计算替代损失
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            # 计算actor损失：剪裁机制+熵正则化（鼓励策略探索性，防止过早收敛到次优解）
            actor_loss = -torch.min(surr1, surr2).mean() + self.entropy_coef * dist.entropy().mean()
            # 计算critic损失
            critic_loss = (returns - values).pow(2).mean()
                
            # 更新策略网络
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()
        self.memory.clear()
        


#### 定义训练

In [77]:
def train(cfg, env, agent):
    '''训练
    '''
    print("开始训练！")
    print(f"环境: {cfg.env_name}, 算法: {cfg.algo_name}, 设备: {cfg.device}")

    rewards = [] # 记录所有回合的奖励
    steps = [] # 记录所有回合的步数
    best_ep_reward = 0 # 记录最优episode的奖励
    output_agent = None # 输出agent

    # 训练循环
    for i_ep in range(cfg.train_eps):
        ep_rewards = 0 # 记录一回合内的奖励
        ep_step = 0 # 记录一回合内的步数
        state = env.reset() # 重置环境, 获取初始状态
        for _ in range(cfg.max_steps): # 每个episode最大步数
            ep_step += 1
            action = agent.sample_action(state) # 选择动作
            next_state, reward, done, _ = env.step(action) # 更新环境，返回transition
            agent.memory.push((state, action, agent.log_probs, reward, done)) # 保存transition到经验池中
            state = next_state
            agent.update() # 更新agent
            ep_rewards += reward
            if done:
                break
        
        # 一个epoch之后考虑是否进行评估
        if (i_ep+1) % cfg.eval_per_episode == 0:
            sum_eval_reward = 0

            # 评估eval_eps个episode
            for _ in range(cfg.eval_eps):
                eval_ep_reward = 0
                state = env.reset()
                # 玩一回合
                for _ in range(cfg.max_steps):
                    action = agent.predict_action(state) # 选择动作
                    next_state, reward, done, _ = env.step(action) # 更新环境，返回transition
                    state = next_state
                    eval_ep_reward += reward
                    if done:
                        break
                sum_eval_reward += eval_ep_reward
            mean_eval_reward = sum_eval_reward / cfg.eval_eps
            if mean_eval_reward >= best_ep_reward: # 记录最优episode的奖励
                best_ep_reward = mean_eval_reward
                output_agent = copy.deepcopy(agent) # 保存最优agent
                print(f"回合: {i_ep+1}/{cfg.train_eps}, 奖励: {ep_rewards:.2f}, 评估奖励: {mean_eval_reward:.2f}, 最大奖励: {best_ep_reward:.2f}")
            else:
                print(f"回合: {i_ep+1}/{cfg.train_eps}, 奖励: {ep_rewards:.2f}, 评估奖励: {mean_eval_reward:.2f}, 最大奖励: {best_ep_reward:.2f}")
        
        # 一次训练结束
        steps.append(ep_step) # 记录步数
        rewards.append(ep_rewards)
    
    # 训练结束，返回agent和奖励列表
    print("训练完成！")
    env.close()
    return output_agent, {"rewards": rewards}

def test(cfg, env, agent):
    '''测试
    '''
    print("开始测试！")
    print(f"环境: {cfg.env_name}, 算法: {cfg.algo_name}, 设备: {cfg.device}")
    rewards = []
    steps = []
    for i_ep in range(cfg.test_eps):
        ep_rewards = 0
        ep_step = 0
        state = env.reset()
        for _ in range(cfg.max_steps):
            ep_step += 1
            action = agent.predict_action(state) # 选择动作
            next_state, reward, done, _ = env.step(action) # 更新环境，返回transition
            state = next_state # 更新状态
            ep_rewards += reward
            if done:
                break
        steps.append(ep_step)
        rewards.append(ep_rewards)
        print(f"回合: {i_ep+1}/{cfg.test_eps}, 奖励: {ep_rewards:.1f}, 步数: {ep_step}")
        env.close()
        return {"rewards": rewards}


#### 定义环境

In [78]:
def all_seed(env, seed = 1):
    '''万能的seed函数
    '''
    if seed == 0:
        return
    env.seed(seed) # config for env
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed) # config for CPU
    torch.cuda.manual_seed(seed) # config for GPU
    os.environ["PYTHONHASHSEED"] = str(seed) # config for Python scripts
    # config for cudnn
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.enabled = False

def env_agent_config(cfg):
    env = gym.make(cfg.env_name) # 创建环境
    all_seed(env, seed=cfg.seed)
    n_states = env.observation_space.shape[0]
    n_actions = env.action_space.n
    print(f"状态空间维度:{n_states}, 动作空间维度:{n_actions}")

    # 更新n_states和n_actions到cfg参数中
    setattr(cfg, 'n_states', n_states)
    setattr(cfg, 'n_actions', n_actions)
    if torch.cuda.is_available():
        setattr(cfg, 'device', "cuda")
    agent = PPOAgent(cfg)
    return env, agent


#### 设置超参数

In [79]:
def get_config():
    parser = argparse.ArgumentParser()
    parser.add_argument('--device', type=str, default="cpu", help='设备')
    parser.add_argument('--algo_name', type=str, default='PPO', help='算法名称')
    parser.add_argument('--env_name', type=str, default='CartPole-v0', help='环境名称')
    parser.add_argument('--render', type=bool, default=False, help='是否显示环境')

    parser.add_argument('--actor_lr', type=float, default=3e-4, help='学习率')
    parser.add_argument('--critic_lr', type=float, default=3e-4, help='学习率')
    parser.add_argument('--batch_size', type=int, default=64, help='batch_size')
    parser.add_argument('--train_eps', type=int, default=10000, help='训练次数')
    parser.add_argument('--test_eps', type=int, default=20, help='测试次数')
    parser.add_argument('--eval_eps', type=int, default=5, help='评估次数')
    parser.add_argument('--eval_per_episode', type=int, default=10, help='评估频率')

    parser.add_argument('--seed', type=int, default=8, help='随机种子')
    parser.add_argument('--if_load_ckpt', type=bool, default=False, help='是否加载模型')
    parser.add_argument('--ckpt_path', type=str, default="ckpt/", help='模型保存路径')
    parser.add_argument('--ckpt_name', type=str, default="ppo.pth", help='模型保存名称')
    
    parser.add_argument('--gamma', type=float, default=0.99, help='折扣因子')
    parser.add_argument('--n_states', type=int, default=4, help='状态个数')
    parser.add_argument('--n_actions', type=int, default=2, help='动作个数')
    parser.add_argument('--actor_hidden_dim', type=int, default=256, help='actor隐藏层维度')
    parser.add_argument('--critic_hidden_dim', type=int, default=256, help='critic隐藏层维度')
    parser.add_argument('--max_steps', type=int, default=200, help='最大步数')
    parser.add_argument('--eps_clip', type=float, default=0.2, help='截断阈值')
    parser.add_argument('--k_epochs', type=int, default=4, help='PPO更新策略网络的次数')
    parser.add_argument('--entropy_coef', type=float, default=0.01, help='熵系数')
    parser.add_argument('--update_freq', type=int, default=100, help='更新频率')
    

    return parser.parse_args([])

def smooth(data, weight=0.9):  
    '''用于平滑曲线，类似于Tensorboard中的smooth曲线
    '''
    last = data[0] 
    smoothed = []
    for point in data:
        smoothed_val = last * weight + (1 - weight) * point  # 计算平滑值
        smoothed.append(smoothed_val)                    
        last = smoothed_val                                
    return smoothed

def plot_rewards(rewards,cfg, tag='train'):
    ''' 画图
    '''
    sns.set()
    plt.figure()  # 创建一个图形实例，方便同时多画几个图
    plt.title(f"{tag}ing curve on {cfg.device} of {cfg.algo_name} for {cfg.env_name}")
    plt.xlabel('epsiodes')
    plt.plot(rewards, label='rewards')
    plt.plot(smooth(rewards), label='smoothed')
    plt.legend()


#### 开始训练

In [80]:
# 获取参数
cfg = get_config()

# 训练
env, agent = env_agent_config(cfg)
best_agent, res_dic = train(cfg, env, agent)
plot_rewards(res_dic)

# 测试
res_dic = test(cfg, env, best_agent)
plot_rewards(res_dic)

状态空间维度:4, 动作空间维度:2
开始训练！
环境: CartPole-v0, 算法: PPO, 设备: cpu
update policy network
update policy network
回合: 10/10000, 奖励: 21.00, 评估奖励: 21.20, 最大奖励: 21.20
update policy network
update policy network
回合: 20/10000, 奖励: 20.00, 评估奖励: 24.80, 最大奖励: 24.80
update policy network
update policy network
update policy network
回合: 30/10000, 奖励: 15.00, 评估奖励: 19.80, 最大奖励: 24.80
update policy network
update policy network
update policy network
回合: 40/10000, 奖励: 17.00, 评估奖励: 28.80, 最大奖励: 28.80
update policy network
update policy network
update policy network
回合: 50/10000, 奖励: 42.00, 评估奖励: 31.40, 最大奖励: 31.40
update policy network
update policy network
update policy network
update policy network
update policy network
回合: 60/10000, 奖励: 52.00, 评估奖励: 37.80, 最大奖励: 37.80
update policy network
update policy network
update policy network
update policy network
update policy network
回合: 70/10000, 奖励: 62.00, 评估奖励: 49.40, 最大奖励: 49.40
update policy network
update policy network
update policy network
update policy netwo

KeyboardInterrupt: 