In [1]:
import pde_control_gym 
import gymnasium as gym
import numpy as np
import math
import matplotlib.pyplot as plt
import time
# Load local helper utilities
from utils import set_size
from utils import linestyle_tuple
from utils import load_csv
# Stable-Baselines3 algorithms and helpers
from stable_baselines3 import PPO
from stable_baselines3 import SAC
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import CheckpointCallback
# choose the pre-implemented reward function
from pde_control_gym.src import TunedReward1D
import os

envs initialized


In [2]:
# No-noise sensing model: the measurement is the state itself.
def noiseFunc(state):
    """Return ``state`` unchanged, i.e. apply no sensing noise.

    Behaves exactly like ``lambda state: state``; the very same object
    that was passed in is handed back (no copy is made).
    """
    measurement = state
    return measurement

# Chebyshev-polynomial beta functions
def solveBetaFunction(x, gamma):
    """Evaluate ``5 * cos(gamma * acos(x_i))`` at every point of ``x``.

    Args:
        x: Sequence of sample points. Each value is passed to
            ``math.acos`` and must therefore lie in [-1, 1].
        gamma: Scalar that multiplies the arc-cosine inside the cosine.

    Returns:
        One-dimensional ``np.float32`` array with one entry per element
        of ``x`` (empty when ``x`` is empty).
    """
    # Evaluate in double precision point by point, then store as float32.
    samples = [5 * math.cos(gamma * math.acos(point)) for point in x]
    return np.array(samples, dtype=np.float32)

# Builds the beta function that is passed into the PDE environment.
# gamma defaults to 7.35; pass another value to study a different problem.
def getBetaFunction(nx, gamma=7.35):
    """Return the beta profile sampled on ``nx`` evenly spaced points of [0, 1].

    Args:
        nx: Number of grid points (forwarded to ``np.linspace``).
        gamma: Parameter forwarded to ``solveBetaFunction``. Defaults to
            7.35, the value that used to be hard-coded, so callers that
            pass only ``nx`` get exactly the same result as before.

    Returns:
        The array produced by ``solveBetaFunction`` for that grid.
    """
    return solveBetaFunction(np.linspace(0, 1, nx), gamma)

# Kernel function solver for backstepping
def solveKernelFunction(beta, dx):
    """Compute the discretised backstepping kernel for ``beta``.

    Each entry is built from the previously computed entries as
    ``k[i] = sum_{j=0}^{i-1} (k[i-j] * beta[j]) * dx - beta[i]``.

    Args:
        beta: One-dimensional sequence of beta values on the grid.
        dx: Spatial step used as the quadrature weight.

    Returns:
        Float array of ``len(beta)`` entries in reversed order (last
        computed entry first).
    """
    n_points = len(beta)
    gains = np.zeros(n_points)
    for row in range(n_points):
        # Left-endpoint rectangle rule, hence the sum stops at row - 1.
        # gains[row] is still zero when the col = 0 term is evaluated, so
        # that term contributes nothing to the sum.
        running = 0
        for col in range(row):
            running += (gains[row - col] * beta[col]) * dx
        gains[row] = running - beta[row]
    # The controller is a weighted integral (think of a convolution): the
    # state is stored as u = [u(0), ..., u(1)], so the matching weights must
    # be ordered k = [k(1), ..., k(0)]. The control is then the element-wise
    # product of u and k, summed. Hence the reversed result.
    return gains[::-1]

# Control convolution solver
def solveControl(kernel, u, dx):
    """Return ``dx * sum_i kernel[i] * u[i]`` over every index of ``u``.

    Args:
        kernel: Indexable weights; must provide at least ``len(u)`` entries.
        u: Sequence of state samples.
        dx: Spatial step that scales the accumulated sum.

    Returns:
        The weighted sum multiplied by ``dx``.
    """
    weighted_sum = 0
    for idx, state_value in enumerate(u):
        weighted_sum += kernel[idx] * state_value
    return weighted_sum * dx

# Initial condition generator (edit here to change the initial profile)
def getInitialCondition(nx):
    """Return a spatially constant initial profile with a random level.

    Args:
        nx: Number of grid points.

    Returns:
        Array of ``nx`` identical values; the common value is drawn with
        ``np.random.uniform(1, 10)`` on every call.
    """
    level = np.random.uniform(1, 10)
    # Scalar times an all-ones array: every grid point gets the same level.
    return level * np.ones(nx)


In [3]:
# Time and space discretisation for the PDE solver.
# T / dt is the number of solver steps handed to the reward below;
# T and X are presumably the time horizon and the domain length.
T, dt = 5, 1e-4   # dt = 0.0001
X, dx = 1, 1e-2   # dx = 0.01

In [4]:
# Reward function: built from the number of time steps, the per-step penalty
# for early truncation, and the reward for terminating normally.
reward_class = TunedReward1D(int(round(T / dt)), -1e3, 3e2)

# Shared base parameters; the controller-specific dictionaries below are
# derived from this one.
hyperbolicParameters = {
    "T": T,
    "dt": dt,
    "X": X,
    "dx": dx,
    "reward_class": reward_class,
    # Placeholder: each derived dictionary below sets its own value.
    "normalize": None,
    "sensing_loc": "full",
    # NOTE(review): the spelling "Dirchilet" is kept exactly as written;
    # it is presumably the literal the environment expects - confirm against
    # the library before "fixing" it.
    "control_type": "Dirchilet",
    "sensing_type": None,
    # Decides whether noise is added to the sensor measurement. This lambda
    # returns the state as-is, so measurements are exact (no noise).
    "sensing_noise_func": lambda state: state,
    # Early-stopping parameters
    "limit_pde_state_size": True,
    "max_state_value": 1e10,
    "max_control_value": 20,
    # Function that generates the initial condition
    "reset_init_condition_func": getInitialCondition,
    # Function that computes beta
    "reset_recirculation_func": getBetaFunction,
    # Control sampling rate: the numerical simulation needs a very small
    # time step, but the controller cannot receive control signals that fast.
    "control_sample_rate": 0.1,
}

# All of the 1D PDE boundary control environments have the same set of optional parameters for ease of use!

# Backstepping parameters: a shallow copy of the base with "normalize"
# overridden. "normalize" is intended for RL controllers: when True, the
# action is normalised to [-1, 1] and converted to the actual control value
# using "max_control_value".
hyperbolicParametersBackstepping = {**hyperbolicParameters, "normalize": False}

# RL parameters: same shallow copy, but with normalisation switched on.
hyperbolicParametersRL = {**hyperbolicParameters, "normalize": True}

In [5]:
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.monitor import Monitor

# Locate the artefacts of a training run. Change algo_name and run to pick a
# different run; files are read from ./logs/<algo_name>_run<run>/.
algo_name = "PPO"
run = 1
run_id = f"{algo_name}_run{run}"
log_dir = f"./logs/{run_id}"
model_path = os.path.join(log_dir, "final_model.zip")
vecnorm_path = os.path.join(log_dir, "vecnormalize.pkl")

# ===== Create the environment =====
# A single Monitor-wrapped environment inside a DummyVecEnv, built with the
# RL parameter dictionary.
eval_env = DummyVecEnv([lambda: Monitor(gym.make("PDEControlGym-TransportPDE1D", **hyperbolicParametersRL))])

# ===== Load the VecNormalize state =====
# NOTE(review): the ".pkl" file is presumably unpickled on load - only load
# files produced by trusted training runs.
eval_env = VecNormalize.load(vecnorm_path, eval_env)
eval_env.training = False     # do not update the statistics while testing
eval_env.norm_reward = False  # report rewards in their original scale (if needed)

# ===== Load the model =====
# Raises KeyError when algo_name is neither "PPO" nor "SAC".
model_class = {"PPO": PPO, "SAC": SAC}[algo_name]
model = model_class.load(model_path, env=eval_env)

