In [1]:
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

env = gym.make("CartPole-v1")  # 环境，这里是一个简单的游戏

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
class DQN(nn.Module):

    def __init__(self, n_observations, n_actions):  # n_observations: 环境的状态数，n_actions: 环境的动作数
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

In [3]:
# 用于存储环境的状态
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward')) 

# 使用Python标准库中的namedtuple来创建一个名为Transition的数据类型，用于表示存储在回放内存中的单个转换。
# 在强化学习中，我们通常使用Transition这个数据类型来表示一个状态转移，
# 即从一个状态执行一个动作，得到下一个状态和相应的奖励值。
# 例如，如果我们正在训练一个Q-learning算法，那么每次执行一个动作后，
# 我们将会得到一个Transition对象，其中包含了当前状态、动作、下一个状态以及相应的奖励值。
# 这些Transition对象可以用于更新Q值函数，从而实现强化学习算法的训练。


# 创建一个名为ReplayMemory的类，用于存储环境的状态。
class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity) # deque是Python标准库中的一个双端队列，它可以高效地实现插入和删除操作。

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args)) # 将一个Transition对象添加到双端队列中

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size) # 从双端队列中随机抽取一个batch_size大小的样本

    def __len__(self):
        return len(self.memory) # 返回双端队列的长度

# 这段代码定义了一个ReplayMemory类，用于存储和管理强化学习算法中的经验回放数据。
# 经验回放是一种常用的技术，用于训练深度强化学习模型。
# 它的主要思想是将智能体（agent）在环境中采集到的经验存储在一个回放缓冲区（replay buffer）中，
# 然后从中随机抽样一批数据用于模型的训练   




In [4]:
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
# 行为空间的大小，即动作的个数
n_actions = env.action_space.n   
    # 如果环境是CartPole-v0，那么动作空间是一个离散的二元空间（左、右），因此n_actions将被设置为2
    # 如果环境是Pendulum-v0，那么动作空间是一个连续的一维空间，包括从-2到2的连续值。
        # 在这种情况下，n_actions将被设置为一个Box对象，其中包含动作空间的上下限
# Get the number of state observations
state, info = env.reset()
# state是一个包含4个元素的列表，分别表示小车的水平位置、速度、杆子的角度以及角速度
# n_observations是一个整数，表示环境的状态数
n_observations = len(state)

# 策略网络和目标网络
policy_net = DQN(n_observations, n_actions).to(device) # 将policy_net模型转换为PyTorch的张量
target_net = DQN(n_observations, n_actions).to(device) # 将target_net模型转换为PyTorch的张量
# 将target_net模型的参数复制到policy_net模型中
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True) # AdamW优化器，只优化policy_net模型的参数
    # AdamW是Adam优化器的一种变体，它在Adam的基础上增加了一个权重衰减项，用于防止过拟合
memory = ReplayMemory(10000) # ReplayMemory类的实例化


steps_done = 0
# 在强化学习中，通常有两个神经网络：
    # 一个是策略网络（policy network），用于预测智能体在当前状态下采取每个动作的概率；
    # 另一个是值函数网络（value function network），用于估计每个状态的价值或动作的价值。
# 在使用深度强化学习算法时，我们需要对这两个神经网络的参数进行优化。
# 在这段代码中，只对策略网络`policy_net`的参数进行了优化，而没有对值函数网络`target_net`的参数进行优化。
    # 这是因为在这里使用了一个特殊的算法，即DDPG（Deep Deterministic Policy Gradient），
    # 它是一种策略优化算法，可以直接优化策略网络的参数，而无需使用值函数网络。
    
# 具体来说，DDPG算法的基本思想是在连续动作空间中，将策略网络的输出作为智能体采取的动作，
    # 同时使用一个Q值函数网络来估计每个状态和动作对的Q值，从而进行策略优化。
    
# 在DDPG算法中，策略网络和Q值函数网络是相互独立的，它们的优化目标不同，因此在训练过程中需要分别更新它们的参数。
# 具体来说，在DDPG算法的训练过程中，首先从经验回放缓冲区中随机采样一批数据，使用Q值函数网络来估计每个状态和动作对的Q值，
# 并计算出每个动作的梯度。然后，将这个梯度传递给策略网络，通过反向传播来更新策略网络的参数。
# 在这个过程中，只有策略网络的参数被更新，而值函数网络的参数保持不变。
# 因此，在这段代码中，只对策略网络`policy_net`的参数进行了优化，而值函数网络`target_net`的参数保持不变。


def select_action(state):
    global steps_done                                       # 首先使用一个全局计数器steps_done来记录当前已经采取了多少步
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


# select_action 用于根据当前状态选择一个动作，一个动态的随机选择策略

# 函数采用了epsilon-greedy策略，在一定概率下随机选择动作，以便探索新的动作，提高策略的多样性
# 函数的输入参数是当前状态state，输出是一个包含所选动作的张量
# 计算一个epsilon值。
# 这个epsilon值初始时比较大，随着训练的进行而逐渐减小，
# 以控制探索和利用之间的平衡。如果一个随机数大于epsilon值，
# 函数将使用当前策略网络来选择动作，否则将随机选择一个动作。
# 这样，策略网络可以在一定程度上探索新的动作，提高策略的多样性。



episode_durations = []


def plot_durations(show_result=False):  # 用于绘制训练过程中智能体的累计奖励。函数的输入参数show_result表示是否显示训练结果
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

In [6]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

In [10]:
if torch.cuda.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Initialize the environment and get it's state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():  # count()是Python中内置的一个函数，它可以用于生成一个无限迭代器，该迭代器返回一个整数序列，从0开始，每次递增1。
        action = select_action(state)  # 根据当前状态选择动作
        observation, reward, terminated, truncated, _ = env.step(action.item()) # 执行动作，获得下一个状态、奖励、终止信号和截断信号
        reward = torch.tensor([reward], device=device) # 将奖励转换为张量
        done = terminated or truncated # 如果游戏终止或者截断，done为True，否则为False

        if terminated: # 如果游戏终止，将下一个状态置为None
            next_state = None
        else: # 否则，将下一个状态转换为张量
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory  将当前状态、动作、下一个状态和奖励存储到经验回放池中
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model() # 优化网络

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict() # 获取目标网络的参数
        policy_net_state_dict = policy_net.state_dict() # 获取策略网络的参数
        for key in policy_net_state_dict: # 目标网络的软更新（soft update）
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

KeyboardInterrupt: 

In [8]:
torch.cuda.device_count()

8

In [9]:
torch.cuda.get_device_name(0)

'A100-SXM4-40GB'