In [1]:
import gymnasium as gym
from stable_baselines3 import PPO, SAC
from stable_baselines3.common.callbacks import EvalCallback
import imageio
import torch
import math
import numpy as np
from tqdm import tqdm
import mujoco


In [2]:
def quaternion_angle_error(q1, q2):
    """Return the rotation angle, in radians, between two quaternions.

    Args:
        q1: Quaternion of shape (4,), ordered [w, x, y, z].
        q2: Quaternion of shape (4,), ordered [w, x, y, z].

    Returns:
        ``2 * arccos(|q1 . q2|)``. The inputs are not normalized here, so the
        result is only a true angle when both are unit quaternions.
    """
    # Taking the absolute value of the dot product treats q and -q as the same
    # rotation; clipping keeps arccos inside its domain.
    cos_half_angle = np.clip(np.abs(np.dot(q1, q2)), -1.0, 1.0)
    return abs(2 * np.arccos(cos_half_angle))


class CustomRewardWrapper(gym.Wrapper):
    """Add posture-shaping terms to the wrapped environment's reward.

    Three terms are combined with the original reward on every step:

    * an orientation penalty (subtracted): the angle between the quaternion
      read from ``obs[1:5]`` and the upright target, scaled and clipped to
      ``[0, 5]``;
    * a height term (added): height of body ``torso_body_id`` above
      ``head_height_ref``, scaled and clipped to ``[-5, 5]``;
    * a foot-lift bonus (added): one point per body in ``foot_body_ids`` whose
      height lies inside ``foot_height_range``.

    Args:
        env: Environment to wrap. Its unwrapped object must expose
            ``data.xipos`` (per-body positions, indexed ``[body][axis]``).
        w_upward: Weight of the orientation penalty.
        w_upfoot: Weight of the foot-lift bonus.
        w_uphead: Weight of the height term.
        head_height_ref: Reference height subtracted from the tracked body's z.
        foot_height_range: Inclusive ``(low, high)`` z-range that counts as a
            lifted foot.
        torso_body_id: Index into ``data.xipos`` of the body whose height is
            rewarded. NOTE(review): presumably the torso in Humanoid-v5 -
            confirm against the model's body listing.
        foot_body_ids: Indices into ``data.xipos`` of the bodies treated as
            feet. NOTE(review): presumably right/left foot - confirm.

    All defaults reproduce the values that were previously hard-coded.
    """

    def __init__(
        self,
        env,
        w_upward=1,
        w_upfoot=1,
        w_uphead=1,
        head_height_ref=0.9,
        foot_height_range=(0.2, 0.6),
        torso_body_id=1,
        foot_body_ids=(6, 9),
    ):
        super().__init__(env)
        self.w_upward = w_upward
        self.w_upfoot = w_upfoot
        self.w_uphead = w_uphead
        self.head_height_ref = head_height_ref
        self.foot_height_range = tuple(foot_height_range)
        self.torso_body_id = torso_body_id
        self.foot_body_ids = tuple(foot_body_ids)

        # Upright target orientation, ordered (w, x, y, z).
        self.target_orientation = np.array([1.0, 0.0, 0.0, 0.0])

    def step(self, action):
        """Step the wrapped env and return the transition with the shaped reward.

        Observation, termination flags and ``info`` are passed through
        unchanged; only the reward is modified.
        """
        obs, reward, terminated, truncated, info = self.env.step(action)

        # 1. Keep the torso upright. obs[1:5] is read as the torso quaternion
        #    (w, x, y, z); this depends on the env's observation layout.
        torso_ori = obs[1:5]
        ori_error = quaternion_angle_error(torso_ori, self.target_orientation)
        rew_upward = np.clip(self.w_upward * ori_error, 0, 5)

        # Look the body-position table up once and reuse it for every term.
        body_pos = self.env.unwrapped.data.xipos

        # 2. Reward height of the tracked body above the reference height.
        height_above_ref = body_pos[self.torso_body_id][2] - self.head_height_ref
        rew_uphead = np.clip(self.w_uphead * height_above_ref, -5, 5)

        # 3. Encourage lifting the feet: one point per foot inside the range.
        low, high = self.foot_height_range
        lifted_feet = sum(
            1 for body_id in self.foot_body_ids if low <= body_pos[body_id][2] <= high
        )
        rew_foot = self.w_upfoot * lifted_feet

        # The orientation term is an error, so it is subtracted.
        new_reward = reward + rew_foot + rew_uphead - rew_upward
        return obs, new_reward, terminated, truncated, info


In [None]:
# Build the Humanoid training and evaluation environments.
ENV_ID = 'Humanoid-v5'


def make_wrapped_env():
    """Create one Humanoid environment with the custom reward shaping applied."""
    return CustomRewardWrapper(gym.make(ENV_ID))


# Training environment.
env = make_wrapped_env()
print(f"obs space: {env.observation_space}, action space: {env.action_space}")

# Separate environment instance used only for periodic evaluation.
eval_env = make_wrapped_env()

log_dir = "./tb_log/"
total_timesteps = 480000  # total number of training steps

# Periodically evaluate the policy and keep the best checkpoint on disk.
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path=f"{log_dir}sac_best_model",  # where the best model is saved
    log_path=log_dir,        # where evaluation logs are written
    eval_freq=10000,         # evaluate every 10,000 steps
    n_eval_episodes=5,       # episodes per evaluation
    deterministic=True,      # use the deterministic policy when evaluating
    render=False,
)

In [None]:
# Custom network layout passed to the SAC policy.
policy_kwargs = {
    # Hidden-layer widths for the actor ("pi") and the critics ("qf");
    # a plain list such as [400, 300] is an alternative form.
    "net_arch": {"pi": [64], "qf": [256, 64]},
    # Activation function; torch.nn.Tanh is another option.
    "activation_fn": torch.nn.ReLU,
}

def warm_sin_lr(
    progress_remaining: float,
    lr_min: float = 5e-5,
    lr_max: float = 1e-3,
    warm_ratio: float = 0.01,
) -> float:
    """Learning-rate schedule: linear warm-up followed by a sine-shaped decay.

    Args:
        progress_remaining: Fraction of training still to do, going from 1
            (start) to 0 (end). Values outside [0, 1] are clamped so the
            schedule never leaves ``[lr_min, lr_max]``.
        lr_min: Learning rate at the very start and at the very end.
        lr_max: Peak learning rate, reached when the warm-up finishes.
        warm_ratio: Fraction of total training spent warming up
            (default 0.01, i.e. the first 1% of steps).

    Returns:
        The learning rate for the given progress:
          - during warm-up: linear ramp from ``lr_min`` to ``lr_max``;
          - afterwards: ``lr_min + (lr_max - lr_min) * sin((1 - x) * pi / 2)``
            where ``x`` runs from 0 to 1 over the rest of training.
    """
    # progress_remaining=1 -> progress_done=0; progress_remaining=0 -> progress_done=1.
    # Clamp so an overshoot past the planned step count cannot push the sine
    # term negative and return a rate below lr_min (or even below zero).
    progress_done = min(max(1.0 - progress_remaining, 0.0), 1.0)

    if warm_ratio > 0.0 and progress_done < warm_ratio:
        # Warm-up: linear increase.
        return lr_min + (lr_max - lr_min) * (progress_done / warm_ratio)

    if warm_ratio >= 1.0:
        # The whole run is warm-up; it has just completed, so stay at the peak
        # (also avoids dividing by zero below).
        return lr_max

    # Sine decay: re-normalize the remaining progress to [0, 1].
    x = (progress_done - warm_ratio) / (1.0 - warm_ratio)
    return lr_min + (lr_max - lr_min) * math.sin((1.0 - x) * math.pi / 2.0)


# Hyperparameters for the SAC agent, gathered in one place.
sac_hyperparams = dict(
    verbose=1,
    learning_rate=warm_sin_lr,      # schedule callable; a constant such as 2e-4 also works
    buffer_size=1_000_000,          # replay buffer size (PPO has no such parameter)
    batch_size=256,                 # 256 is the default
    tau=0.005,                      # soft-update coefficient
    gamma=0.99,                     # discount factor
    train_freq=1,                   # train after every collected environment step
    gradient_steps=1,               # gradient updates per training round
    tensorboard_log=log_dir,        # TensorBoard log directory
    policy_kwargs=policy_kwargs,    # custom network architecture
)
model = SAC("MlpPolicy", env, **sac_hyperparams)

# Train; adjust total_timesteps as needed.
learn_options = dict(
    total_timesteps=total_timesteps,
    tb_log_name="sac_lr",
    progress_bar=True,
    callback=eval_callback,
)
model.learn(**learn_options)

# Save the final model.
model.save("sac1")

In [None]:
# Inspect the underlying MuJoCo model: sizes, the qpos layout and body names.
unwrapped_env = env.unwrapped

mj_model = unwrapped_env.model  # MjModel

# Sizes of the generalized position / velocity vectors and the joint count.
print(f"qpos size: {mj_model.nq}, qvel size: {mj_model.nv}, num_joints: {mj_model.njnt}")
print(f"actuator size: {mj_model.nu}, ctrl_size: {unwrapped_env.data.ctrl.shape}")  # actuators and muscle
print(f"body_size: {mj_model.nbody}, body pos size: {unwrapped_env.data.xipos.shape}")  # nbody, 3

# print(f"action range: {env.action_space.low} to {env.action_space.high}")


# List which joint coordinate every qpos entry belongs to.
for joint_id in range(mj_model.njnt):
    joint_name = mujoco.mj_id2name(mj_model, mujoco.mjtObj.mjOBJ_JOINT, joint_id)
    joint_type = mj_model.jnt_type[joint_id]
    # Take the start index from the model instead of keeping a running counter,
    # so the listing stays correct whatever joint types precede this one.
    qpos_idx = int(mj_model.jnt_qposadr[joint_id])

    # The number of qpos entries depends on the joint type.
    if joint_type == mujoco.mjtJoint.mjJNT_FREE:      # free joint: 7 qpos (x, y, z, qw, qx, qy, qz)
        coords = ['x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']
    elif joint_type == mujoco.mjtJoint.mjJNT_BALL:    # ball joint: 4 qpos (quaternion)
        coords = ['qw', 'qx', 'qy', 'qz']
    elif joint_type == mujoco.mjtJoint.mjJNT_HINGE:   # hinge joint: 1 qpos
        coords = None
    elif joint_type == mujoco.mjtJoint.mjJNT_SLIDE:   # slide joint: 1 qpos
        coords = None
    else:
        continue

    if coords is None:
        print(f"qpos[{qpos_idx:2d}]: {joint_name}")
    else:
        for offset, coord in enumerate(coords):
            print(f"qpos[{qpos_idx + offset:2d}]: {joint_name}_{coord}")


# Body positions for a freshly created MjData (not the env's current state).
data  = mujoco.MjData(mj_model)
mujoco.mj_forward(mj_model, data)  # required so that the positions are computed
pos = data.xipos            # shape = (nbody, 3)
x, y, z = pos[:, 0], pos[:, 1], pos[:, 2]
names = [mujoco.mj_id2name(mj_model, mujoco.mjtObj.mjOBJ_BODY, i)
         for i in range(mj_model.nbody)]

for i, name in enumerate(names):
    print(f"body[{i:2d}]: {name}, pos=({x[i]:.3f}, {y[i]:.3f}, {z[i]:.3f})")


env.close()

In [None]:
# The inspection cell before this one ends with env.close(), so build a fresh
# wrapped environment instead of training on the closed one.
env = CustomRewardWrapper(gym.make("Humanoid-v5"))

model = SAC(
    "MlpPolicy",
    env,
    verbose=1,
    learning_rate=1e-4,
    buffer_size=1_000_000,      # replay buffer size (PPO has no such parameter)
    batch_size=256,             # 256 is the default
    tau=0.005,                  # soft-update coefficient
    gamma=0.99,                 # discount factor
    train_freq=1,               # train after every collected environment step
    gradient_steps=1,           # gradient updates per training round
    tensorboard_log="./tb_log/",   # TensorBoard log directory
)

# Train; adjust total_timesteps as needed.
model.learn(total_timesteps=24000, tb_log_name="sac", progress_bar=True )
# Save the model.
model.save("humanoid_sac_upward")

### 测试模型效果

In [None]:
# Debug aid: show the file path of the gymnasium module the kernel imported.
print(gym.__file__)

In [None]:
# Watch the trained SAC policy in a viewer window.
# Load the best checkpoint saved during training.
model = SAC.load("./tb_log/sac_best_model/best_model.zip")
# Create the test environment.
env = gym.make("Humanoid-v5", max_episode_steps=100000, render_mode="human")

# try/finally guarantees the viewer window is closed even when the run is
# interrupted by hand (e.g. KeyboardInterrupt).
try:
    for i in range(5):
        state, info = env.reset()
        cum_reward = 0
        for _ in tqdm(range(3000)):
            env.render()
            # Deterministic actions, matching how EvalCallback picked the best model.
            action, _states = model.predict(state, deterministic=True)
            state, reward, terminated, truncated, info = env.step(action)
            cum_reward += reward
            if terminated or truncated:
                break
        # Report the return for every episode, including those that reach the
        # 3000-step cap without terminating.
        print("累积奖励: ", cum_reward)
finally:
    env.close()

You are using a GLFW raw input patch. This is not the official GLFW library.


 13%|█▎        | 389/3000 [00:11<01:17, 33.79it/s]


KeyboardInterrupt: 

: 

In [6]:
# Reads a private attribute of whatever object `env` currently is.
# NOTE(review): presumably this only works while `env` is the time-limit
# wrapper itself - confirm. The recorded output (1000) does not match the
# 100000 passed to gym.make in the preceding cell, so it was probably produced
# against a different `env`.
print(env._max_episode_steps)  # prints the default maximum number of steps per episode

1000


### 测试代码

In [None]:
# Smoke-test the Humanoid environment with random actions.
env = gym.make("Humanoid-v5", render_mode="human")  # "human" mode opens a viewer window

# try/finally guarantees the viewer window is closed even if the cell is interrupted.
try:
    env.reset(seed=0)

    for j in range(5):
        reward_sum = 0
        env.reset()
        for i in range(1000):
            action = env.action_space.sample()  # random action in place of a policy
            _, reward, terminated, truncated, info = env.step(action)
            reward_sum += reward
            env.render()
            if terminated or truncated:
                break
        # Report the return for every episode, however the inner loop ended.
        print("Total reward:", reward_sum)
finally:
    env.close()

In [None]:
!tensorboard --logdir ./ppo_ant_tb/
!tensorboard --logdir ./tb_log/
# then 然后浏览器打开 http://localhost:6006