In [8]:
import gymnasium as gym # 导入gym模块
import numpy as np 
import matplotlib.pyplot as plt
import torch
import math

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical       # 离散概率分布 

from collections import deque                     # 标准库中的一个类，它实现双端队列，允许从两端进行插入和删除操作，而不需要移动元素。
import numpy as np

import imageio                                  # imageio 是一个用于读写图像和视频的 Python 库

import matplotlib.pyplot as plt
import time

    ##########
baseline = 0
losses = []
##########

### REINFORCE

In [7]:
class REINFORCE(nn.Module):
    def __init__(self, s_size, a_size, h_size,optimizer_type = 'Adam', lr = 1e-4, env, n_training_episodes, max_t, gamma, print_every, std_op = False):       
        super(REINFORCE, self).__init__()                # 使子类Policy继承神经网络父类的方法和属性
        ## Creating Layers
        # Hidden Layers
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, h_size*2)
        # Output Layer
        self.fc3 = nn.Linear(h_size*2, a_size)
        self.lr = lr
        self.env = env
        self.n_training_episodes = n_training_episodes
        self.max_t = max_t
        self.gamma = gamma
        self.print_every = print_every
        self.std_op = False
        
        # 根据 optimizer_type 选择优化器
        if optimizer_type == 'Adam':
            self.optimizer = optim.Adam(self.parameters(), lr = self.lr)
        elif optimizer_type == 'SGD':
            self.optimizer = optim.SGD(self.parameters(), lr = self.lr)
        else:
            raise ValueError("Invalid optimizer_type. Supported types: 'Adam', 'SGD'.")

    # Define the Forward Pass
    def forward(self, x):
        # apply relu activation function to the hidden layers
        x = F.relu(self.fc1(x))                       # 负值直接为0，可以使稀疏后的模型能够更好地挖掘相关特征，加速学习
        x = F.relu(self.fc2(x))
        # output with softmax
        x = F.softmax(self.fc3(x), dim = 1)           # 归一化指数函数，多分类问题必用； 这里必须是二维输入； dim = 1表示在行维度上使用softmax   , dim=1;
        return x                                      # x[0] 就包含第一个样本在所有类别上的概率分布

    
    # Define the act i.e. given a state, take action
    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)   #from_numpy   pytorch搭建的网络不接受一维张量输入,unsqueeze(0)增加一个维度;  因为Net想要的是一批数据，一批自然是两维，但如果只是一个的话就需要扩展维度。 
        probs = self.forward(state).cpu()                                 # 模型的计算可能在 GPU 上进行，将结果移回到 CPU 上进行后续处理或者与其他 CPU 上的数据进行交互。
        m = Categorical(probs)
        action = m.sample()                                   # 从每个动作的概率抽取，返回一个值
        return action.item(), m.log_prob(action)              # 将抽取的动作及其对数概率作为结果返回; action.item()从tensor转换为int标量值

    def reinforce(self):
        # Help us to calculate the score during the training
        scores_deque = deque(maxlen=100)                        # 创建长度为100的双端队列
        scores = []
    
        for i_episode in range(1, self.n_training_episodes+1):
            saved_log_probs = []
            rewards = []
            state = self.env.reset()[0]
            for t in range(self.max_t):
                action, log_prob = self.act(state)
                saved_log_probs.append(log_prob)                       # 储存所有的对数概率   
                state, reward, terminated, truncated, info = self.env.step(action)
                rewards.append(reward)                                 # 把每步的reward都存起来了
                if terminated or truncated:
                    break
           # if t==max_t-1:
           #     rewards = rewards*100000
            scores_deque.append(sum(rewards))                          # 把这次的累计得分存储起来
            scores.append(sum(rewards))                                
    
            returns = deque(maxlen=self.max_t) 
            n_steps = len(rewards)                                    # 这个rewards的长度应该和max_t不一致，因为可能会terminated
            
            for t in range(n_steps)[::-1]:
                 disc_return_t = (returns[0] if len(returns)>0 else 0)     # 要把初始Reward(t=maxt+1)设为0，这样的话Return(t=max)时才是正常的，只等于自己的reward，动态规划才能正常开始.
                 returns.appendleft(self.gamma*disc_return_t + rewards[t]   )  # 动态规划了，计算每个时刻t下的回报。 这里是从后往前算; 从前插入，先进的在后面;
                                                                           # 因为如果正着算的话很繁琐，r+y(rt+1)+...
            if std_op == True:
                # standardizing returns to make traininig more stable       # 将回报标准化，这样就会有正有负，加快收敛。（问题是有时已经最好了，还会去削减一些动作。）
                eps = np.finfo(np.float32).eps.item()                       # smallest representable float  获取 np.float32 类型的最小正浮点数
                returns = torch.tensor(returns)
                returns = (returns - returns.mean()) / (returns.std()+eps)    # added to std deviation to avoid numerical instabilities
    
            policy_loss = []
            for log_prob, disc_return in zip(saved_log_probs, returns):
                policy_loss.append(-log_prob * (disc_return - baseline))               # REINFORCE的损失函数    # 原文是 -log_prob * disc_return，因为默认梯度下降为“-”，加上负号就是梯度上升。   当输出负的回报时，就会导致梯度还是往下走，导致出现该动作的可能降低。 
                                                                         
            losses = []
            policy_loss = torch.cat(policy_loss).sum()                    # 求和，作为一个epoch的结果。
    
            self.optimizer.zero_grad()
            policy_loss.backward()
            self.optimizer.step()
            ########
            losses.append(policy_loss.item())
            ########
            if i_episode % (self.print_every) == 0:       # 就是每10轮打印一次
                print(f"Episode: {i_episode}\tAverage Score: {scores[-1]}\tPolicy Loss: {policy_loss.item()}")   # 打印第几轮训练，平均得分，以及损失
            # print(f"Episode: {i_episode}\tScore: {scores_deque[-1]}\tPolicy Loss: {policy_loss.item()}")   # 打印第几轮训练，平均得分，以及损失
            
        return scores, policy_loss

### Q-learing

In [None]:
class REINFORCE(nn.Module):
    def __init__(self, s_size, a_size, h_size, env, epochs, max_steps, gamma = 0.9, print_every = 10,optimizer_type = 'Adam', lr = 1e-4, std_op = False):       
        super(REINFORCE, self).__init__()                # 使子类Policy继承神经网络父类的方法和属性
        ## Creating Layers
        # Hidden Layers
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, h_size*2)
        # Output Layer
        self.fc3 = nn.Linear(h_size*2, a_size)
        self.lr = lr
        self.env = env
        self.epochs = epochs
        self.max_steps = max_steps
        self.gamma = gamma
        self.print_every = print_every
        self.std_op = False
        
        # 根据 optimizer_type 选择优化器
        if optimizer_type == 'Adam':
            self.optimizer = optim.Adam(self.parameters(), lr = self.lr)
        elif optimizer_type == 'SGD':
            self.optimizer = optim.SGD(self.parameters(), lr = self.lr)
        else:
            raise ValueError("Invalid optimizer_type. Supported types: 'Adam', 'SGD'.")

    # Define the Forward Pass
    def forward(self, x):
        # apply relu activation function to the hidden layers
        x = F.relu(self.fc1(x))                       # 负值直接为0，可以使稀疏后的模型能够更好地挖掘相关特征，加速学习
        x = F.relu(self.fc2(x))
        # output with softmax
        x = F.softmax(self.fc3(x), dim = 1)           # 归一化指数函数，多分类问题必用； 这里必须是二维输入； dim = 1表示在行维度上使用softmax   , dim=1;
        return x                                      # x[0] 就包含第一个样本在所有类别上的概率分布

    
    # Define the act i.e. given a state, take action
    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)   #from_numpy   pytorch搭建的网络不接受一维张量输入,unsqueeze(0)增加一个维度;  因为Net想要的是一批数据，一批自然是两维，但如果只是一个的话就需要扩展维度。 
        probs = self.forward(state).cpu()                                 # 模型的计算可能在 GPU 上进行，将结果移回到 CPU 上进行后续处理或者与其他 CPU 上的数据进行交互。
        m = Categorical(probs)
        action = m.sample()                                   # 从每个动作的概率抽取，返回一个值
        return action.item(), m.log_prob(action)              # 将抽取的动作及其对数概率作为结果返回; action.item()从tensor转换为int标量值

    def train(self):
        # Help us to calculate the score during the training
        scores_deque = deque(maxlen=100)                        # 创建长度为100的双端队列
        scores = []
        losses = []
        
        for i_episode in range(1, self.epochs+1):
            saved_log_probs = []
            rewards = []
            state = self.env.reset()[0]
            for t in range(self.max_steps):
                action, log_prob = self.act(state)
                saved_log_probs.append(log_prob)                       # 储存所有的对数概率   
                state, reward, terminated, truncated, info = self.env.step(action)
                rewards.append(reward)                                 # 把每步的reward都存起来了
                if terminated or truncated:
                    break
           # if t==max_steps-1:
           #     rewards = rewards*100000
            scores_deque.append(sum(rewards))                          # 把这次的累计得分存储起来
            scores.append(sum(rewards))                                
    
            returns = deque(maxlen=self.max_steps) 
            n_steps = len(rewards)                                    # 这个rewards的长度应该和max_steps不一致，因为可能会terminated
            
            for t in range(n_steps)[::-1]:
                 disc_return_t = (returns[0] if len(returns)>0 else 0)     # 要把初始Reward(t=maxt+1)设为0，这样的话Return(t=max)时才是正常的，只等于自己的reward，动态规划才能正常开始.
                 returns.appendleft(self.gamma*disc_return_t + rewards[t]   )  # 动态规划了，计算每个时刻t下的回报。 这里是从后往前算; 从前插入，先进的在后面;
                                                                           # 因为如果正着算的话很繁琐，r+y(rt+1)+...
            if self.std_op == True:
                # standardizing returns to make traininig more stable       # 将回报标准化，这样就会有正有负，加快收敛。（问题是有时已经最好了，还会去削减一些动作。）
                eps = np.finfo(np.float32).eps.item()                       # smallest representable float  获取 np.float32 类型的最小正浮点数
                returns = torch.tensor(returns)
                returns = (returns - returns.mean()) / (returns.std()+eps)    # added to std deviation to avoid numerical instabilities
    
            policy_loss = []
            for log_prob, disc_return in zip(saved_log_probs, returns):
                policy_loss.append(-log_prob * (disc_return - baseline))               # REINFORCE的损失函数    # 原文是 -log_prob * disc_return，因为默认梯度下降为“-”，加上负号就是梯度上升。   当输出负的回报时，就会导致梯度还是往下走，导致出现该动作的可能降低。 
                                                                         
            policy_loss = torch.cat(policy_loss).sum()                    # 求和，作为一个epoch的结果。
    
            self.optimizer.zero_grad()
            policy_loss.backward()
            self.optimizer.step()
            ########
            losses.append(policy_loss.item())
            ########
            if i_episode % (self.print_every) == 0:       # 就是每10轮打印一次
                print(f"Episode: {i_episode}\tAverage Score: {scores[-1]}\tPolicy Loss: {policy_loss.item()}")   # 打印第几轮训练，平均得分，以及损失
            # print(f"Episode: {i_episode}\tScore: {scores_deque[-1]}\tPolicy Loss: {policy_loss.item()}")   # 打印第几轮训练，平均得分，以及损失
            
        return scores, losses