# Actor-Critic (AC), A2C, and A3C 算法教程

1.  **Actor-Critic (AC)**: 基础版本，结合了基于策略（Actor）和基于价值（Critic）的方法。
2.  **Advantage Actor-Critic (A2C)**: AC 的一个改进版本，使用优势函数（Advantage Function）来减少梯度估计的方差，从而使训练更稳定。
3.  **Asynchronous Advantage Actor-Critic (A3C)**: A2C 的一个异步版本，使用多线程并行地与环境交互和更新模型，极大地提高了训练效率和效果。

我们将从定义共享的模型架构开始，然后逐步实现这三种算法的训练逻辑。

---

## 1. 导入必要的库并设置环境

首先，我们需要导入所有必需的库。我们将使用 `torch` 来构建神经网络，使用 `gym` 作为我们的强化学习环境（以经典的 `CartPole-v1` 为例），并使用 `multiprocessing` 来实现 A3C 的异步特性。

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import time

print(f"PyTorch version: {torch.__version__}")
print(f"Gymnasium version: {gym.__version__}")

PyTorch version: 2.7.1+cu128
Gymnasium version: 1.1.1


## 2. Actor-Critic 模型架构

AC、A2C 和 A3C 都使用相同的核心网络结构。这个网络被称为 Actor-Critic 网络，因为它有两个输出头：

-   **Actor (策略头)**: 输出一个动作的概率分布，用于决定在当前状态下应该采取哪个动作。
-   **Critic (价值头)**: 输出当前状态的价值估计 (V-value)，用于评估当前状态的好坏。

让 Actor 和 Critic 共享网络的前几层可以使模型学习到对两者都有用的状态表示，从而提高学习效率和稳定性。

In [3]:
# Actor-Critic 网络
class ActorCritic(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(ActorCritic, self).__init__()
        # 定义共享的神经网络层
        self.fc1 = nn.Linear(input_shape, 128)
        self.fc2 = nn.Linear(128, 128)
        
        # Actor-specific layer (策略输出)
        self.actor = nn.Linear(128, n_actions)
        
        # Critic-specific layer (价值输出)
        self.critic = nn.Linear(128, 1)

    def forward(self, x):
        """前向传播，返回动作概率和状态价值"""
        # 通过共享层
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        
        # Actor 输出: 动作的概率分布
        actor_output = F.softmax(self.actor(x), dim=-1)
        
        # Critic 输出: 状态的价值
        critic_output = self.critic(x)
        
        return actor_output, critic_output

## 3. Actor-Critic (AC) 算法

AC 算法是基础。它的核心思想是：

-   **Critic** 学习如何评估状态的价值，它通过时序差分 (TD) 误差来更新。具体来说，它试图让 `V(s)` 逼近 `r + γ * V(s')`。
-   **Actor** 根据 Critic 的评估来调整自己的策略。如果 Critic 认为某个动作导致了比预期更好的结果（即 `Q` 值较高），Actor 就会增加选择该动作的概率。

在基础 AC 中，我们直接使用 `Q` 值来更新 Actor。`Q` 值可以通过 `r + γ * V(s')` 来估计。

### 3.1 AC 训练过程 (Update Function)

下面的 `update` 函数实现了 AC 的单步更新逻辑。注意这里的 `q_value` 是如何计算的，以及 Actor 和 Critic 的损失函数是如何定义的。

In [4]:
def update_ac(actor_critic_model, optimizer, state, action, reward, next_state, done, gamma=0.99):
    # 将输入数据转换为 PyTorch Tensors
    state = torch.FloatTensor(state).unsqueeze(0)
    next_state = torch.FloatTensor(next_state).unsqueeze(0)
    action = torch.LongTensor([action])
    reward = torch.FloatTensor([reward])
    done = torch.FloatTensor([int(done)])

    # 使用 Critic 评估 next_state 的价值
    _, next_state_value = actor_critic_model(next_state)
    # 使用 Critic 评估 state 的价值
    _, state_value = actor_critic_model(state)
    
    # 计算 Q 值 (TD Target)
    # Q(s, a) = r + γ * V(s')
    q_value = reward + gamma * next_state_value * (1 - done)

    # 计算 Actor Loss (策略损失)
    # 目标是最大化 log(π(a|s)) * Q(s,a)
    # 在 PyTorch 中，优化器是最小化 loss，所以我们取负号
    probs, _ = actor_critic_model(state) # 获取动作的概率分布
    log_prob = torch.log(probs.squeeze(0)[action]) # 获取当前动作的对数概率
    actor_loss = -(log_prob * q_value.detach()).mean() # detach Q-value to prevent gradients from flowing into the critic

    # 计算 Critic Loss (价值损失)
    # Critic 的目标是让 V(s) 尽可能接近 Q(s,a)
    critic_loss = F.mse_loss(state_value, q_value.detach()) # 使用均方误差
    
    # 总损失 = Actor 损失 + Critic 损失
    loss = actor_loss + critic_loss

    # 更新网络参数
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

### 3.2 AC 采样与训练循环

AC 是一种在线 (on-policy) 策略，它每执行一步（采样）就立即进行一次更新。下面的循环展示了智能体如何与环境交互并调用 `update_ac` 函数。

In [5]:
def train_ac():
    env = gym.make('CartPole-v1')
    input_shape = env.observation_space.shape[0]
    n_actions = env.action_space.n
    
    ac_model = ActorCritic(input_shape, n_actions)
    optimizer = optim.Adam(ac_model.parameters(), lr=0.001)
    
    print("--- Starting AC Training ---")
    for i in range(200):  # 训练200个episode
        state, _ = env.reset()
        done = False
        total_reward = 0

        while not done:
            # 1. 采样动作
            state_tensor = torch.FloatTensor(state)
            probs, _ = ac_model(state_tensor)
            action = probs.multinomial(num_samples=1).item()
            
            # 2. 与环境交互
            next_state, reward, done, _, info = env.step(action)
            
            # 3. 更新网络
            update_ac(ac_model, optimizer, state, action, reward, next_state, done)
            
            state = next_state
            total_reward += reward
        
        if i % 20 == 0:
            print(f"Episode {i}, Total Reward: {total_reward}")
    print("--- AC Training Finished ---")
    env.close()

# train_ac() # 取消注释以运行AC训练

## 4. Advantage Actor-Critic (A2C) 算法

A2C 是对 AC 的一个关键改进。AC 使用 `Q` 值来指导策略更新，但 `Q` 值的绝对大小波动很大，这会导致梯度方差很大，训练不稳定。

A2C 引入了 **优势函数 (Advantage Function)** 来解决这个问题。

$$ A(s, a) = Q(s, a) - V(s) $$

优势函数衡量的是，在状态 `s` 下采取动作 `a` 比平均情况（即 `V(s)`）好多少。用优势函数替代 `Q` 值来更新 Actor，可以显著降低梯度方差，因为我们只关心相对好坏，而不是绝对价值。

由于 `Q(s, a)` 通常被估计为 `r + γ * V(s')`，所以优势函数可以写成：

$$ A(s, a) \approx (r + \gamma V(s')) - V(s) $$

这个值就是 TD 误差 (TD Error)。

### 4.1 A2C 训练过程 (Update Function)

A2C 的 `update` 函数与 AC 非常相似，主要区别在于：

1.  我们计算 `advantage` 而不是 `q_value`。
2.  Actor loss 使用 `advantage` 进行加权。
3.  Critic loss 现在直接最小化 `advantage` 的平方，这等价于让 `V(s)` 逼近 `r + γ * V(s')`。

In [6]:
def update_a2c(actor_critic_model, optimizer, state, action, reward, next_state, done, gamma=0.99):
    # 将输入数据转换为 PyTorch Tensors
    state = torch.FloatTensor(state).unsqueeze(0)
    next_state = torch.FloatTensor(next_state).unsqueeze(0)
    action = torch.LongTensor([action])
    reward = torch.FloatTensor([reward])
    done = torch.FloatTensor([int(done)])

    # 使用 Critic 评估 state 和 next_state 的价值
    _, next_state_value = actor_critic_model(next_state)
    _, state_value = actor_critic_model(state)

    # 计算 Advantage (优势)
    # A(s, a) = r + γ * V(s') - V(s)
    advantage = reward + gamma * next_state_value * (1 - done) - state_value

    # 计算 Actor Loss (策略损失)
    # 使用 advantage 替代 Q-value
    probs, _ = actor_critic_model(state)
    log_prob = torch.log(probs.squeeze(0)[action])
    actor_loss = -(log_prob * advantage.detach()).mean()

    # 计算 Critic Loss (价值损失)
    # Critic 的目标是最小化 TD 误差，即 advantage
    critic_loss = advantage.pow(2).mean() # 最小化advantage的平方误差
    
    # 总损失
    loss = actor_loss + critic_loss

    # 更新网络参数
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

## 5. Asynchronous Advantage Actor-Critic (A3C) 算法

尽管 A2C 已经很有效，但它仍然受到样本相关性高的困扰（因为是 on-policy 连续采样）。A3C 通过 **异步训练** 来解决这个问题。

A3C 的核心思想是：

1.  **一个全局网络 (Global Network)**：保存着主模型的参数。
2.  **多个工作者 (Workers)**：每个 worker 都在自己的 CPU 核心上运行。每个 worker 都有一个 **本地网络 (Local Network)** 和一个独立的环境实例。
3.  **并行探索**：所有 workers 并行地在各自的环境中进行探索和采样。因为每个 worker 的探索轨迹不同，所以收集到的数据是高度多样化和去相关的。
4.  **异步更新**：每个 worker 在本地收集到一定量的数据后，会计算出参数的梯度，然后用这个梯度去 **异步地更新全局网络**。更新完后，它会从全局网络拉取最新的参数到自己的本地网络，然后开始新一轮的探索。

这种并行化和异步更新的机制使得 A3C 不需要经验回放池 (Replay Buffer)，并且训练过程非常高效和稳定。

### 5.1 A3C 主进程

主进程负责初始化全局网络、优化器，并创建和启动所有 worker 进程。`global_model.share_memory()` 是一个关键步骤，它允许不同进程共享全局模型的参数。

In [7]:
def setup_a3c_and_run():
    # 注意：多进程代码通常需要放在 `if __name__ == '__main__':` 块中以避免问题

    # 创建环境以获取参数
    env = gym.make('CartPole-v1')
    n_actions = env.action_space.n
    input_shape = env.observation_space.shape[0]
    env.close()

    # 1. 创建全局模型
    global_model = ActorCritic(input_shape, n_actions)
    global_model.share_memory()

    # (新增) 使用 Manager 创建一个所有进程共享的列表
    with mp.Manager() as manager:
        rewards_list = manager.list()  # 用于存储所有 worker 的奖励

        # 2. 创建全局优化器
        optimizer = optim.Adam(global_model.parameters(), lr=0.0005)

        # 3. 创建并启动工作进程
        processes = []
        num_processes = mp.cpu_count()
        print(f"Starting {num_processes} worker processes...")

        for i in range(num_processes):
            # (修改) 将共享列表传递给 worker
            p = mp.Process(target=worker, args=(global_model, optimizer, input_shape, n_actions, i, rewards_list))
            p.start()
            processes.append(p)

        for p in processes:
            p.join()

        print("--- A3C Training Finished ---")

        # (修改) 训练结束后，调用评估函数
        # 此时的 global_model 包含了所有 worker 的训练成果
        evaluate(global_model)

        # (新增) 训练结束后，绘制奖励曲线
        plt.figure(figsize=(10, 5))
        plt.plot(list(rewards_list))
        plt.title('A3C Training Rewards')
        plt.xlabel('Episodes (across all workers)')
        plt.ylabel('Total Reward')
        # (可选) 计算并绘制奖励的移动平均线，让趋势更清晰
        rewards_np = np.array(list(rewards_list))
        moving_avg = np.convolve(rewards_np, np.ones(100) / 100, mode='valid')
        plt.plot(moving_avg, linewidth=3, label='Moving Average (100 episodes)')
        plt.legend()
        plt.grid(True)
        plt.show()

### 5.2 A3C Worker 逻辑

每个 `worker` 函数是 A3C 的核心。它包含一个完整的训练循环：

1.  从全局模型复制参数到本地模型。
2.  在自己的环境中与智能体交互、采样数据。
3.  调用 `update_a3c` 函数计算梯度并更新全局模型。
4.  重复此过程。

In [8]:
def worker(global_model, optimizer, input_shape, n_actions, worker_id, rewards_list):
    """每个 Worker 进程执行的函数"""
    # 1. 创建一个新的环境实例和本地模型
    env = gym.make('CartPole-v1')
    local_model = ActorCritic(input_shape, n_actions)

    # 对每一个 Episode 进行迭代
    for i_episode in range(200): # 每个worker跑200个episode
        # 从全局模型同步最新的参数到本地模型
        local_model.load_state_dict(global_model.state_dict())
        
        state, _ = env.reset()
        done = False
        total_reward = 0
        
        # 在一个 episode 内部进行循环
        while not done:
            # 使用本地模型进行采样
            state_tensor = torch.FloatTensor(state)
            probs, _ = local_model(state_tensor)
            action = probs.multinomial(num_samples=1).item()
            
            next_state, reward, done, _, info = env.step(action)
            total_reward += reward

            # 使用收集到的数据更新全局模型
            update_a3c(global_model, optimizer, local_model, state, action, reward, next_state, done)

            state = next_state

        # (新增) 在 episode 结束后，将总奖励添加到共享列表中
        rewards_list.append(total_reward)
        print(f"Worker {worker_id}, Episode: {i_episode}, Total Reward: {total_reward}")
    
    env.close()

### 5.3 A3C 更新函数

A3C 的更新函数与 A2C 的几乎完全一样。最大的区别在于，**计算梯度和反向传播是作用在全局模型上的**。这样，由本地 worker 产生的“经验”就被用来改进所有 worker 共享的“大脑”。

**注意**：在原始的 A3C 论文中，worker 会累积多步的梯度再一次性更新全局模型，这里为了简化，我们采用了每步都更新的方式。这在实践中通常被称为 A2C 的并行实现，但其核心思想与 A3C 是一致的。

In [9]:
# A3C 更新函数
def update_a3c(global_model, optimizer, local_model, state, action, reward, next_state, done, gamma=0.99):
    # 将输入数据转换为 PyTorch Tensors
    state = torch.FloatTensor(state).unsqueeze(0)
    next_state = torch.FloatTensor(next_state).unsqueeze(0)
    action = torch.LongTensor([action])
    reward = torch.FloatTensor([reward])
    done = torch.FloatTensor([int(done)])

    # 计算 Advantage (优势)
    # 注意：价值的计算是基于全局模型的预测，因为它被认为是更准确的
    _, next_state_value = global_model(next_state)
    _, state_value = global_model(state)
    advantage = reward + gamma * next_state_value * (1 - done) - state_value

    # 计算 Actor 和 Critic 的 Loss
    # 使用本地模型计算动作概率，因为这是产生该动作的模型
    probs, _ = local_model(state)
    log_prob = torch.log(probs.squeeze(0)[action])
    actor_loss = -(log_prob * advantage.detach()).mean()
    critic_loss = advantage.pow(2).mean()
    loss = actor_loss + critic_loss

    # 关键：计算梯度并更新全局模型的参数
    optimizer.zero_grad()
    loss.backward()
    # 将本地模型的梯度复制到全局模型
    for local_param, global_param in zip(local_model.parameters(), global_model.parameters()):
        if global_param.grad is not None:
            break
        global_param._grad = local_param.grad
    optimizer.step()

    # 更新后，将全局模型的参数复制回本地模型
    local_model.load_state_dict(global_model.state_dict())

### 5.4 评估

In [None]:
def evaluate(model, n_episodes=5):
    """用训练好的模型进行评估并渲染游戏画面"""
    print("\n--- Starting Evaluation ---")
    # (重要) 创建一个可以渲染画面的新环境
    env = gym.make('CartPole-v1', render_mode='human')
    input_shape = env.observation_space.shape[0]
    n_actions = env.action_space.n

    # 加载模型的最终状态
    final_model = ActorCritic(input_shape, n_actions)
    final_model.load_state_dict(model.state_dict())
    final_model.eval() # 设置为评估模式

    for i in range(n_episodes):
        state, _ = env.reset()
        done = False
        total_reward = 0
        while not done:
            state_tensor = torch.FloatTensor(state)
            with torch.no_grad(): # 评估时不需要计算梯度
                probs, _ = final_model(state_tensor)
                # 选择概率最高的动作，而不是随机采样
                action = torch.argmax(probs).item()

            next_state, reward, done, _, _ = env.step(action)
            state = next_state
            total_reward += reward
            time.sleep(0.02) # 短暂暂停，方便肉眼观察

        print(f"Evaluation Episode {i+1}, Total Reward: {total_reward}")

    env.close()
    print("--- Evaluation Finished ---")

## 6. 运行 A3C 训练

现在，我们可以把所有部分组合起来，运行 A3C 训练。下面的代码块只有在作为主脚本运行时才会执行。

**注意**: 在 Jupyter Notebook 中直接运行多进程代码可能会遇到问题。

将 A3C 的相关代码保存为一个 `.py` 文件然后运行 。

In [10]:
if __name__ == '__main__':
    # 设置多进程启动方法，'spawn' 在多平台下更稳定
    try:
        mp.set_start_method('spawn')
        print("Multiprocessing start method set to 'spawn'.")
    except RuntimeError:
        pass  # 如果已经设置过了，会抛出 RuntimeError，可以忽略

    # 你可以在这里选择运行哪个模型的训练
    # print("--- Training AC Model ---")
    # train_ac()

    print("\n--- Training A3C Model ---")
    setup_a3c_and_run()

Multiprocessing start method set to 'spawn'.

--- Training A3C Model ---
Starting 16 worker processes...
--- A3C Training Finished ---
