In [1]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import pygame
import time

#使用PPO算法进行训练自定义游戏环境。游戏环境很简单是二维的，设置为一个小球从天上某个地方自由落体（高度一定，水平位置随机），通过控制左右施加的力让小球落到指定地面范围内。

In [None]:
"""游戏环境需要实现的方法和返回值
__init__(self, render_mode=None)：
必须定义action_space和observation_space
应该处理render_mode参数

reset(self, seed=None, options=None)：
必须调用super().reset(seed=seed)
必须返回(observation, info)元组

step(self, action)：
必须返回(observation, reward, terminated, truncated, info)元组

render(self)：
如果支持渲染，必须处理不同的渲染模式
对于"rgb_array"模式，必须返回RGB数组

close(self)：
必须清理所有资源
"""

In [7]:
class BallLandingEnv(gym.Env):
    """
    自定义环境:控制一个从高处落下的球，使其落在指定区域内
              @:要求定义的参数内容通过“## 编号.xxxx”标出
              @: 参考性的注释通过“#@ 编号.xxx”标出
    
    状态空间:
        - 球的x坐标 (水平位置)
        - 球的y坐标 (垂直位置)
        - 球的x方向速度
        - 球的y方向速度
        - 目标区域的中心x坐标
    
    动作空间:
        - 连续值，表示施加在球上的水平力
    """
    
    ## 5.metadata是官方建议定义的，包含渲染模式和FPS
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}
    
    def __init__(self, render_mode=None):
        ## 1.必须调用父类的__init__方法
        super(BallLandingEnv, self).__init__()
        
        # 物理参数
        self.gravity = 9.8  # 重力加速度
        self.mass = 1.0  # 球的质量
        self.time_step = 0.05  # 时间步长
        self.max_force = 10.0  # 最大水平力
        self.friction = 0.01  # 空气阻力系数
        
        # 环境参数
        self.screen_width = 600
        self.screen_height = 800
        self.ball_radius = 15
        self.initial_height = 700  # 初始高度
        self.target_width = 100  # 目标区域宽度
        
        ## 2.必须定义动作空间(action_space)
        # 动作空间: 连续值，范围为[-1, 1]，表示施加在球上的水平力
        self.action_space = spaces.Box(
            low=-1.0, 
            high=1.0, 
            shape=(1,), #@ 1.动作是一个一维向量，包含一个值（标量），表示水平力的大小和方向。例如，action = [0.5] 表示向右施加0.5的力，action = [-0.7] 表示向左施加0.7的力
            dtype=np.float32 
        )

        """
        观察向量:观察空间定义为 [x, y, vx, vy, target_x]，按照填写顺序，包含 5 个值:
        x:小球的水平位置（单位:像素或任意单位）。
        y:小球的垂直位置（高度）。
        vx:小球的水平速度。
        vy:小球的垂直速度。
        target_x:目标区域的水平中心位置（例如，目标范围 [-1, 1] 的中心可能是 0）。
        """
        
        ## 3.必须定义观察空间(observation_space)
        # 观察空间: [x, y, vx, vy, target_x]
        self.observation_space = spaces.Box(
            low=np.array([0, 0, -30, -30, 0]), 
            high=np.array([self.screen_width, self.screen_height, 30, 30, self.screen_width]), 
            dtype=np.float32
        )
        
        ## 4.渲染相关的官方要求
        self.render_mode = render_mode
        self.screen = None
        self.clock = None
        
        # 初始化状态
        self.reset()
        
    def reset(self, seed=None, options=None):
        ## 6.必须定义：调用父类reset方法设置随机种子
        super().reset(seed=seed)
        
        # 初始化球的位置和速度
        self.x = self.np_random.uniform(self.ball_radius, self.screen_width - self.ball_radius)
        self.y = self.ball_radius
        self.vx = 0.0
        self.vy = 0.0
        
        # 设置目标区域
        self.target_x = self.np_random.uniform(
            self.target_width / 2, 
            self.screen_width - self.target_width / 2
        )
        
        # 计算观察
        self.state = np.array([self.x, self.y, self.vx, self.vy, self.target_x])
        
        # 设置步数
        self.steps = 0
        self.max_steps = 500
        
        ## 7. 渲染初始化
        if self.render_mode == "human" and self.screen is None:
            pygame.init()
            pygame.display.init()
            self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
            pygame.display.set_caption("Ball Landing Environment")
        if self.render_mode == "human" and self.clock is None:
            self.clock = pygame.time.Clock()
            
        ## 8. 返回初始观察和信息
        return self.state, {}
        
    def step(self, action):
        # 获取动作（水平力）
        force_x = float(action[0]) * self.max_force
        
        # 计算加速度
        ax = force_x / self.mass - self.friction * self.vx / self.mass
        ay = self.gravity - self.friction * self.vy / self.mass
        
        # 更新速度
        self.vx += ax * self.time_step
        self.vy += ay * self.time_step
        
        # 更新位置
        self.x += self.vx * self.time_step
        self.y += self.vy * self.time_step
        
        # 检查边界碰撞
        if self.x < self.ball_radius:
            self.x = self.ball_radius
            self.vx = -self.vx * 0.8  # 反弹损失一些能量
        elif self.x > self.screen_width - self.ball_radius:
            self.x = self.screen_width - self.ball_radius
            self.vx = -self.vx * 0.8
            
        # 检查是否到达地面
        terminated = False
        reward = 0
        
        if self.y >= self.screen_height - self.ball_radius:
            self.y = self.screen_height - self.ball_radius
            terminated = True
            
            # 计算与目标的距离
            distance_to_target = abs(self.x - self.target_x)
            
            # 根据距离给予奖励
            if distance_to_target < self.target_width / 2:
                # 在目标区域内
                normalized_distance = distance_to_target / (self.target_width / 2)
                reward = 10.0 * (1.0 - normalized_distance)  # 越靠近中心奖励越高
            else:
                # 不在目标区域内
                reward = -1.0 - min(distance_to_target / 100, 9.0)  # 距离越远惩罚越大，最低-10
        
        # 更新状态
        self.state = np.array([self.x, self.y, self.vx, self.vy, self.target_x])
        
        # 增加步数
        self.steps += 1
        
        # 检查是否达到最大步数
        truncated = self.steps >= self.max_steps
        
        ## 9. 渲染，如果渲染模式是human，则渲染环境
        if self.render_mode == "human":
            self.render()
            
        ## 10.必须计算自己后续的state, reward, terminated, truncated, info并进行返回。info：额外信息字典，这里是{}
        return self.state, reward, terminated, truncated, {}
    
    """
    渲染模式检查: 检查self.render_mode并相应地处理。

    返回值：
    如果渲染模式为"rgb_array",返回RGB数组。
    如果渲染模式为"human",更新显示并控制帧率
    """
    ## 11.渲染模式检查
    def render(self):
        if self.render_mode is None:
            return
            
        if self.render_mode == "rgb_array":
            return self._render_frame()
            
        # 渲染模式为human时
        self._render_frame()
        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])
    
    def _render_frame(self):
        ## 12.初始化pygame（如果尚未初始化）
        if self.screen is None:
            pygame.init()
            pygame.display.init()
            self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
            
        if self.clock is None:
            self.clock = pygame.time.Clock()
            
        # 填充背景
        self.screen.fill((255, 255, 255))
        
        # 绘制目标区域
        target_left = self.target_x - self.target_width / 2
        pygame.draw.rect(
            self.screen, 
            (0, 255, 0), 
            pygame.Rect(target_left, self.screen_height - 10, self.target_width, 10)
        )
        
        # 绘制球
        pygame.draw.circle(
            self.screen,
            (255, 0, 0),
            (int(self.x), int(self.y)),
            self.ball_radius
        )
        
        ## 13.定义：如果是rgb_array模式，返回屏幕的像素数组
        if self.render_mode == "rgb_array":
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.screen)), 
                axes=(1, 0, 2)
            )
    
    def close(self):
        if self.screen is not None:
            pygame.display.quit()
            pygame.quit()
            self.screen = None

In [8]:
"""
测试脚本
"""
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
import numpy as np

import gymnasium as gym
from gymnasium.envs.registration import register

# 导入环境，在ipynote中不需要
#from ball_landing_env import BallLandingEnv

# 注册环境
register(
    id="BallLanding-v0",
    entry_point="__main__:BallLandingEnv", #ball_landing_env:BallLandingEnv python文件导入环境，在ipynote中不需要
)


# 识别模型保存名
import os

def get_next_model_filename(base_name="ppo_ball_landing"):
    """
    检查现有的模型文件并返回下一个可用的序号文件名
    例如：如果已存在 ppo_ball_landing_0.zip，则返回 ppo_ball_landing_1
    """
    i = 0
    while True:
        filename = f"{base_name}_{i}"
        # 检查文件是否存在（注意：SB3保存时会自动添加.zip扩展名）
        if not os.path.exists(f"{filename}.zip"):
            return filename
        i += 1



PPO 算法参数详解：

Python
model = PPO(
    "MlpPolicy",
    env,
    verbose=1,
    learning_rate=0.0003,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2,
    tensorboard_log="./tensorboard_logs/"
)
参数详解
1. "MlpPolicy"
含义：指定使用多层感知机(Multilayer Perceptron)作为策略网络的架构。
说明：这是一个适用于连续动作空间的默认策略网络。MLP策略使用全连接神经网络来处理状态输入并输出动作。
何时调整：如果您的环境有特殊的观察空间（如图像），可能需要使用其他策略如"CnnPolicy"。
1. env
含义：训练环境的实例。
说明：这是您之前创建的向量化环境 make_vec_env("BallLanding-v0", n_envs=8)。
何时调整：通常不需要调整此参数，它就是您的游戏环境。
1. verbose=1
含义：控制训练过程中输出信息的详细程度。
可选值：
0：不输出任何信息
1：输出训练的基本信息
2：输出更详细的训练信息
何时调整：如果您想看到更多/更少的训练日志信息。
1. learning_rate=0.0003
含义：策略网络和价值网络的学习率。
说明：控制每次参数更新的步长大小。0.0003是一个适合多数连续控制任务的值。
何时调整：
如果训练不稳定或收敛太慢，可以尝试较小的值（如0.0001）
如果学习速度太慢，可以尝试较大的值（如0.001）
1. n_steps=2048
含义：每次更新前收集的环境步数。
说明：在执行一次策略更新前，每个环境会执行的步骤数。总样本数 = n_steps × n_envs。
何时调整：
增大这个值可以使训练更稳定，但会减慢训练速度
减小这个值可以加快训练，但可能使训练不稳定
1. batch_size=64
含义：每次梯度更新使用的小批量样本数。
说明：从收集的轨迹中随机抽取的样本数量，用于计算每次更新的梯度。
何时调整：
增大这个值可以使梯度估计更准确，但会增加计算成本
通常建议设为n_steps的因子，以确保所有样本都被使用
1. n_epochs=10
含义：对每批数据执行策略优化的轮数。
说明：对同一批数据重复学习的次数。PPO的一个特点是可以多次使用同一批数据进行多轮更新。
何时调整：
增大可以提高样本效率，但可能导致过拟合
减小可以防止过拟合，但可能降低样本效率
1. gamma=0.99
含义：折扣因子。
说明：用于计算未来奖励的折现值，决定了代理对短期和长期奖励的权衡。
取值范围：0到1之间，接近1表示更看重长期奖励。
何时调整：
对于需要长期规划的任务，设置接近1的值（如0.99或0.999）
对于只需要短期反馈的任务，可以设置较小的值（如0.9）
1. gae_lambda=0.95
含义：广义优势估计(GAE)的λ参数。
说明：控制偏差和方差之间的权衡，用于计算优势函数。
取值范围：0到1之间。
何时调整：
较高的值（接近1）会导致更高的方差但偏差更小
较低的值会导致更低的方差但偏差更大
1.  clip_range=0.2
含义：PPO算法中的裁剪参数。
说明：限制策略更新的幅度，防止过大的策略变化导致训练不稳定。
何时调整：
减小这个值可以使训练更稳定，但学习速度会变慢
增大这个值可以加快学习，但可能导致训练不稳定
1.  tensorboard_log="./tensorboard_logs/"
含义：TensorBoard日志的保存路径。
说明：指定训练过程中的指标（如奖励、损失等）的保存位置，可以使用TensorBoard可视化这些指标。
何时调整：如果您希望将日志保存在不同的位置，或者不想使用TensorBoard（设为None）。
如何根据您的环境调整这些参数
对于初学者：建议先使用默认参数，观察训练效果。

学习率调整：

如果训练不稳定（奖励波动大），尝试减小学习率
如果学习太慢，尝试增加学习率
样本效率调整：

增加 n_epochs 和 batch_size 可以提高样本效率
但要注意可能导致过拟合和训练不稳定
长期vs短期奖励：

如果您的任务需要长期规划，保持 gamma 接近1
例如在您的球落地环境中，小球需要规划整个下落轨迹，所以较高的gamma值（0.99）是合适的
探索与利用平衡：

PPO算法会自动调整探索程度，但您可以通过调整 clip_range 间接影响探索行为
较大的 clip_range 允许更剧烈的策略变化，可能导致更多探索

In [9]:
def main():
    # 创建向量化环境以提高训练效率
    # env = make_vec_env("BallLanding-v0", n_envs=8) # python文件使用，ipynb不需要
    env = gym.make("BallLanding-v0", render_mode=None)
    
    # 创建PPO模型
    model = PPO(
        "MlpPolicy",
        env,
        verbose=1,
        learning_rate=0.0003,
        n_steps=2048,
        batch_size=32,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        tensorboard_log="./tensorboard_logs/"
    )
    
    # 训练模型
    total_timesteps = 1_000_000
    model.learn(total_timesteps=total_timesteps, progress_bar=True)
    
    # 按序号保存模型
    model_filename = get_next_model_filename()
    model.save(model_filename)
    print(f"模型已保存为: {model_filename}.zip")
    
    # 评估模型
    eval_env = gym.make("BallLanding-v0", render_mode="human")
    mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10, deterministic=True)
    
    print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
    
    # 展示训练后的模型
    obs, _ = eval_env.reset()
    
    for _ in range(1000):
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = eval_env.step(action)
        
        if terminated or truncated:
            obs, _ = eval_env.reset()
            
    eval_env.close()

In [10]:
if __name__ == "__main__":
    main()

In [19]:
#测试
import gymnasium as gym
from stable_baselines3 import PPO
import time


def test():
    # 加载训练好的模型
    model = PPO.load("ppo_ball_landing")
    
    # 创建环境
    env = gym.make("BallLanding-v0", render_mode="human")
    
    # 运行多个回合
    for episode in range(10):
        obs, _ = env.reset()
        episode_reward = 0
        done = False
        
        while not done:
            # 根据当前观察预测动作
            action, _states = model.predict(obs, deterministic=True)
            
            # 执行动作
            obs, reward, terminated, truncated, info = env.step(action)
            episode_reward += reward
            
            # 检查是否完成
            done = terminated or truncated
            
            # 添加一点延迟，使观察更容易
            time.sleep(0.01)
        
        print(f"Episode {episode + 1}: Reward = {episode_reward:.2f}")
    
    env.close()

In [21]:
test()

#查看训练数据用tensorboard

Episode 1: Reward = 2.41
Episode 2: Reward = -1.57
Episode 3: Reward = -3.60
Episode 4: Reward = 4.91
Episode 5: Reward = 6.20
Episode 6: Reward = 9.00
Episode 7: Reward = -2.42
Episode 8: Reward = -1.70
Episode 9: Reward = 6.38
Episode 10: Reward = -3.11


# 阅读TensorBoard
## rollout数据
1. rollout/ep_len_mean - 平均回合长度
定义：此指标表示每个回合（episode）的平均步数。
技术解释：
在强化学习中，一个"回合"是指从环境重置到终止状态的完整序列
这个指标计算了最近N个回合的平均步数
在 Stable Baselines3 中，默认是对最近100个回合取平均

如何解读曲线：

上升趋势：表明代理能够在环境中存活更长时间，通常是积极的信号
在"保持平衡"类任务中：更长的回合长度直接意味着更好的性能
在有固定终止条件的任务中：可能表示代理学会了避免失败条件
下降趋势：解读取决于环境类型
在"尽快到达目标"类任务中：较短的回合长度可能表示代理学会了更有效的策略
在大多数其他环境中：可能表示代理性能下降或正在探索新策略
稳定值：表示代理的行为模式已经稳定，可能达到了该策略下的最佳表现

1. rollout/ep_rew_mean - 平均回合奖励
定义：此指标表示每个回合获得的平均总奖励。
技术解释：
计算最近N个回合（通常是100个）中每个回合获得的总奖励的平均值
这是评估代理性能的最直接且最重要的指标
在 Stable Baselines3 中实时更新，显示训练过程中的性能变化
如何解读曲线：

上升趋势：表示代理性能正在改善，这几乎总是积极的信号
陡峭上升：快速学习，可能发现了新的有效策略
缓慢上升：渐进改善，通常是稳健学习的标志
平稳趋势：可能表示学习已经达到饱和或局部最优

如果值较高：可能已接近环境的最优性能
如果值较低：可能陷入局部最优，需要调整超参数或探索策略
下降趋势：通常表示问题

暂时下降：可能是探索新策略的过程
持续下降：可能是学习率过高或其他超参数问题

对于球落地环境：
理想的曲线应该从负值或低值开始（初始随机策略）
随着训练进行，应该持续上升
最终应该达到并稳定在一个正的高值，表示代理已学会将球准确落在目标区域

## train数据
1. approx_kl - 近似 KL 散度
含义：测量更新前后策略分布的差异程度。
理想曲线：
通常应该在 0.01 到 0.05 之间
应相对稳定，略有波动但不应该持续增长
偶尔的峰值是正常的，但不应经常出现
解读：
太低（接近0）：策略几乎没有更新，学习停滞
太高（>0.1）：策略变化太剧烈，可能导致训练不稳定
稳定在适当范围：表明策略更新适度，学习正常进行

2. clip_fraction - 裁剪比例
含义：被 PPO 的裁剪机制裁剪的样本比例。
理想曲线：
训练初期可能较高（0.1-0.3）
随着训练进行应逐渐降低并稳定在较低水平（<0.1）
解读：
持续较高（>0.2）：表明clip_range可能设置得太小，限制了学习
几乎为零：可能clip_range太大或学习率太小
从高到低再稳定：理想的模式，表明策略逐渐收敛

3. clip_range - 裁剪范围
含义：PPO 算法中限制策略更新幅度的参数值。
理想曲线：
如果使用固定值（如0.2），应该是一条水平线
如果使用衰减策略，应该是一条平滑下降的曲线
解读：
这通常是一个设置值而非监控指标
确认它是否符合您的预期设置

4. entropy_loss - 熵损失
含义：衡量策略的不确定性/随机性。
理想曲线：
训练初期较高，随后逐渐下降
不应该太快降到接近零
解读：
持续较高：策略保持高随机性，可能在过度探索
快速降至接近零：策略变得过于确定，可能陷入局部最优
缓慢下降：良好的探索-利用权衡，策略逐渐从探索转向利用

5. explained_variance - 解释方差
含义：价值函数预测与实际回报的匹配程度。
理想曲线：
从负值或接近0开始，逐渐上升并稳定在接近1的位置
解读：
接近1：价值函数很好地预测了回报，学习有效
接近0：价值函数仅预测平均回报，没有提供额外信息
负值：价值函数预测比使用平均值更差，表明严重问题

6. loss - 总损失
含义：PPO 的总体损失函数。
理想曲线：
训练初期较高，随后应该逐渐下降并趋于稳定
可能有波动，但总体趋势应下降
解读：
持续下降：学习正常进行
停滞不降：可能遇到学习瓶颈
剧烈波动：训练不稳定，可能需要调整学习率或批量大小

7. policy_gradient_loss - 策略梯度损失
含义：代表策略网络更新的主要损失组件。
理想曲线：
应该是负值（因为 PPO 最大化此值）
训练初期可能波动较大，随后应该变得更稳定
不应有持续上升趋势（变得更正）
解读：
持续变得更负：策略正在改进
趋于零：策略更新变小，可能接近收敛或学习停滞
变为正值或波动剧烈：训练不稳定，需要调整

8. std - 动作标准差
含义：策略输出的动作分布的标准差。
理想曲线：
训练初期较高（更多探索）
随着训练进行应逐渐减小并稳定（更多利用）
不应该太快降到接近零
解读：
保持较高：策略保持高随机性，持续探索
迅速降至接近零：策略变得确定性太快，可能过早收敛
缓慢下降并保持合理水平：良好的探索-利用平衡

9. value_loss - 价值损失
含义：价值函数预测误差的度量。
理想曲线：
训练初期较高，随后应逐渐下降并稳定
可能永远不会降到非常接近零
解读：
持续下降：价值估计在改进
下降后稳定在低水平：价值函数学习良好
波动剧烈或上升：价值估计不稳定或环境本身有高方差
