## environment check

In [6]:
import os

# Select the EGL backend for both MuJoCo and PyOpenGL rendering.
# NOTE(review): presumably this enables headless (no display) rendering and must
# run before mujoco / OpenGL are first imported — confirm.
os.environ["MUJOCO_GL"] = "egl"
os.environ["PYOPENGL_PLATFORM"] = "egl"

In [7]:
import gymnasium_robotics  # Important: the envs are only registered once this package is imported
import gymnasium as gym
# List (at most 50 of) the registered environment ids whose name contains "Fetch".
print([k for k in gym.registry.keys() if "Fetch" in k][:50])


['FetchSlide-v1', 'FetchSlide-v4', 'FetchPickAndPlace-v1', 'FetchPickAndPlace-v4', 'FetchReach-v1', 'FetchReach-v4', 'FetchPush-v1', 'FetchPush-v4', 'FetchSlideDense-v1', 'FetchSlideDense-v4', 'FetchPickAndPlaceDense-v1', 'FetchPickAndPlaceDense-v4', 'FetchReachDense-v1', 'FetchReachDense-v4', 'FetchPushDense-v1', 'FetchPushDense-v4']


In [8]:
import mujoco, gymnasium
# Print the installed library versions so the run can be reproduced later.
print("mujoco:", mujoco.__version__)
print("gymnasium:", gymnasium.__version__)

mujoco: 2.3.7
gymnasium: 1.2.2


In [9]:
import sys
# Print the path of the Python interpreter this kernel is running on.
print(sys.executable)

/workspace/projects/openpi/.venv/bin/python3


## step1
子目标 1：选定“最小可跑”的模拟环境与任务
验收标准：你能用 random policy 让机器人动起来，并能 reset/step 跑完整个 episode。
- 目标 1.1：选模拟器（建议先 MuJoCo 系）
  - 原则：连续控制 + 简单任务 + 有现成 gym 接口

In [10]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

# Random-policy smoke test on FetchReach: record a video and dump the first transition.
env = RecordVideo(
    gym.make("FetchReach-v4", render_mode="rgb_array"),
    video_folder="videos",
    episode_trigger=lambda ep: True,  # record every episode
)

obs, info = env.reset()
for step_idx in range(200):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    if step_idx == 0:
        # Show the very first transition once so the obs / reward / info layout is visible.
        for item in (obs, reward, terminated, truncated, info):
            print(item)
    if terminated or truncated:
        break

env.close()

  logger.warn(


{'observation': array([ 1.36332594e+00,  7.40685537e-01,  5.56886409e-01,  0.00000000e+00,
        0.00000000e+00,  1.85473173e-02, -7.14499277e-03,  1.91560858e-02,
        1.74330338e-04,  1.18430677e-04]), 'achieved_goal': array([1.36332594, 0.74068554, 0.55688641]), 'desired_goal': array([1.25189598, 0.70962972, 0.65148934])}
-1.0
False
False
{'is_success': 0.0}


In [3]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

# Random-policy smoke test on Reacher: record a video and dump the first transition.
env = RecordVideo(
    gym.make("Reacher-v4", render_mode="rgb_array"),
    video_folder="videos",
    episode_trigger=lambda ep: True,  # record every episode
)

obs, info = env.reset()
for step_idx in range(200):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    if step_idx == 0:
        # Show the very first transition once so the obs / reward / info layout is visible.
        for item in (obs, reward, terminated, truncated, info):
            print(item)
    if terminated or truncated:
        break

env.close()

  logger.deprecation(


[ 0.99984884  0.99667307  0.01738671 -0.08150336  0.04898681 -0.02666616
  2.78217946 -1.11384841  0.16077139  0.02134506  0.        ]
-0.7345963868279263
False
False
{'reward_dist': -0.16151435700843775, 'reward_ctrl': -0.57308203}


In [11]:
from IPython.display import Video
# Show the recorded episode-0 video in the notebook output.
# NOTE(review): both random-rollout cells above record into "videos" without a
# name_prefix, so this file presumably comes from whichever of them ran last — confirm.
Video("videos/rl-video-episode-0.mp4", embed=True)

- 目标 1.2：选任务（建议先从最简单开始）
  - reach（到达目标点）/ push（推物体到目标）/ pick（抓取）

In [None]:
import os
import numpy as np
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

In [None]:
# ---------- utils ----------
# Length of the flat Fetch "observation" vector when the task has an object
# (push / slide / pick-and-place). FetchReach has no object and its vector has
# only 10 entries, where indices 3:6 are NOT an object position.
_FETCH_OBS_DIM_WITH_OBJECT = 25


def _obs_dict(obs):
    """Return ``obs`` unchanged after checking that it is a dict observation.

    ``env.reset()`` returns an ``(obs, info)`` tuple; callers are expected to
    pass only the ``obs`` part here.

    Raises:
        AssertionError: if ``obs`` is not a dict.
    """
    assert isinstance(obs, dict), f"Expected dict obs, got {type(obs)}"
    return obs


def extract_positions(obs):
    """Extract gripper position, object position and goal from a Fetch observation.

    Args:
        obs: dict observation with at least the keys ``"observation"`` (flat
            vector whose first three entries are the gripper position) and
            ``"desired_goal"``.

    Returns:
        ``(grip_pos, object_pos, goal)`` as float32 arrays. ``object_pos`` is
        ``None`` when the observation vector is too short to contain object
        state (e.g. the 10-element FetchReach vector), so callers can fall back
        to plain reaching.

    Raises:
        AssertionError: if ``obs`` is not a dict.
    """
    obs = _obs_dict(obs)
    vec = np.asarray(obs["observation"], dtype=np.float32)
    goal = np.asarray(obs["desired_goal"], dtype=np.float32)

    grip_pos = vec[0:3].copy()

    # Only object-bearing Fetch tasks store the object position at vec[3:6].
    # The previous ">= 6" check also matched FetchReach's 10-element vector and
    # returned gripper-state / velocity entries as a bogus object position.
    object_pos = None
    if vec.shape[0] >= _FETCH_OBS_DIM_WITH_OBJECT:
        object_pos = vec[3:6].copy()

    return grip_pos, object_pos, goal

def clip_action(a, action_space):
    """Clamp an action element-wise into the bounds of ``action_space``.

    Args:
        a: array-like action; converted to a float32 array.
        action_space: object exposing ``low`` and ``high`` bounds.

    Returns:
        The action limited to ``[action_space.low, action_space.high]``.
    """
    action = np.asarray(a, dtype=np.float32)
    lower, upper = action_space.low, action_space.high
    return np.clip(action, lower, upper)

def vec3_to_action(delta_xyz, gripper=0.0, gain=10.0, max_step=0.05):
    """
    Turn a desired displacement into an action ``(dx, dy, dz, gripper)``.

    - gain: scales the (length-limited) displacement into the motion command
    - max_step: upper bound on the displacement length used per step, to
      avoid jitter / overshoot

    Returns a float32 array of four elements.
    """
    step = np.asarray(delta_xyz, dtype=np.float32)
    # Euclidean length of the displacement; the epsilon keeps the division
    # below well-defined when the displacement is exactly zero.
    length = np.linalg.norm(step) + 1e-8
    limited_length = min(length, max_step)
    # Keep the direction, cap the magnitude at max_step.
    step = step / length * limited_length
    command = gain * step
    return np.array([command[0], command[1], command[2], gripper], dtype=np.float32)

In [None]:
# ---------- scripted policies ----------
class ReachPolicy:
    """Scripted policy for reach tasks: step the gripper straight toward the goal."""

    def act(self, obs):
        """Return the action that moves the gripper toward ``desired_goal``.

        The gripper command is always 0.0; a "Reaching" line is printed on
        every call.
        """
        print("Reaching")
        gripper_xyz, _unused_object, target_xyz = extract_positions(obs)
        return vec3_to_action(target_xyz - gripper_xyz, gripper=0.0)

In [59]:
import numpy as np

class PickAndPlacePolicyFrozen:
    """
    Scripted pick-and-place state machine that freezes its waypoints after the grasp.

    Once the gripper has finished closing, the object position and the goal
    seen on that step are stored ("locked"); every later waypoint is derived
    from those stored values, so the lift target no longer drifts together
    with the object and the policy does not get stuck.

    Phases (value of ``self.phase``):
      0  pre-grasp: move above the object, gripper open
      1  descend to grasp height, gripper open
      2  grasp: close for ``grasp_close_steps`` steps, then lock obj / goal
      3  lift to the fixed lift target
      4  move above the locked goal
      5  descend to the locked goal
      6  hold at the goal for ``hold_steps`` steps
      7  release (only entered when ``allow_release`` is true)
      8  retreat upward
    """

    def __init__(
        self,
        pregrasp_h=0.10,
        grasp_h=0.01,
        lift_h=0.20,
        above_goal_h=0.10,
        grasp_close_steps=8,
        hold_steps=12,
        allow_release=True,          # on by default; set to False if the goal is in mid-air
        retreat_h=0.12,
    ):
        # Vertical offsets used to build the waypoints.
        self.pregrasp_h = pregrasp_h
        self.grasp_h = grasp_h
        self.lift_h = lift_h
        self.above_goal_h = above_goal_h
        self.retreat_h = retreat_h

        # Step counts / switches of the state machine.
        self.grasp_close_steps = grasp_close_steps
        self.hold_steps = hold_steps
        self.allow_release = allow_release

        self.reset()

    def reset(self):
        """Return to phase 0 and forget counters and locked positions."""
        self.phase = 0
        self._close_count = 0
        self._hold_count = 0

        # Filled in at the end of the grasp phase.
        self.obj_locked = None
        self.goal_locked = None

    @staticmethod
    def _dist(p, q):
        """Euclidean distance between two points as a Python float."""
        return float(np.linalg.norm(p - q))

    @staticmethod
    def _raised(point, dz):
        """Copy of ``point`` whose z coordinate is ``point[2] + dz``."""
        lifted = point.copy()
        lifted[2] = point[2] + dz
        return lifted

    def act(self, obs):
        """Advance the state machine by one step and return the 4-element action."""
        grip_pos, obj_pos, goal = extract_positions(obs)

        # No object in the observation: simply head for the goal.
        if obj_pos is None:
            return vec3_to_action(goal - grip_pos, gripper=0.0)

        phase = self.phase

        # ----- before the grasp: waypoints follow the live object position -----
        if phase == 0:
            # Hover above the object with the gripper open.
            waypoint = self._raised(obj_pos, self.pregrasp_h)
            if self._dist(grip_pos, waypoint) < 0.04:
                self.phase = 1
            return vec3_to_action(waypoint - grip_pos, gripper=+1.0)

        if phase == 1:
            # Go down to grasp height, still open.
            waypoint = self._raised(obj_pos, self.grasp_h)
            if self._dist(grip_pos, waypoint) < 0.03:
                self.phase = 2
                self._close_count = 0
            return vec3_to_action(waypoint - grip_pos, gripper=+1.0)

        if phase == 2:
            # Close in place for several steps, then freeze obj / goal.
            self._close_count += 1
            if self._close_count >= self.grasp_close_steps:
                self.obj_locked = obj_pos.copy()
                self.goal_locked = goal.copy()
                self.phase = 3
            return np.array([0.0, 0.0, 0.0, -1.0], dtype=np.float32)

        # ----- after the grasp: waypoints come from the frozen positions -----
        anchor_obj = obj_pos if self.obj_locked is None else self.obj_locked
        anchor_goal = goal if self.goal_locked is None else self.goal_locked
        # The goal height is used as-is (no extra vertical offset).
        at_goal = anchor_goal.copy()

        if phase == 3:
            # Lift to a fixed point above where the object was grasped.
            waypoint = self._raised(anchor_obj, self.lift_h)
            if self._dist(grip_pos, waypoint) < 0.05:
                self.phase = 4
            return vec3_to_action(waypoint - grip_pos, gripper=-1.0)

        if phase == 4:
            # Carry to a fixed point above the goal.
            waypoint = self._raised(anchor_goal, self.above_goal_h)
            if self._dist(grip_pos, waypoint) < 0.06:
                self.phase = 5
            return vec3_to_action(waypoint - grip_pos, gripper=-1.0)

        if phase == 5:
            # Lower onto the goal.
            if self._dist(grip_pos, at_goal) < 0.05:
                self.phase = 6
                self._hold_count = 0
            return vec3_to_action(at_goal - grip_pos, gripper=-1.0)

        if phase == 6:
            # Hold for a few steps so the environment can register success.
            self._hold_count += 1
            if self._hold_count >= self.hold_steps:
                self.phase = 7 if self.allow_release else 8
            return vec3_to_action(at_goal - grip_pos, gripper=-1.0, gain=4.0, max_step=0.03)

        if phase == 7:
            # Open the gripper once without moving.
            self.phase = 8
            return np.array([0.0, 0.0, 0.0, +1.0], dtype=np.float32)

        # Phase 8 (and any other value): rise away from the goal to avoid the object.
        retreat = self._raised(at_goal, self.retreat_h)
        gripper_cmd = +1.0 if self.allow_release else -1.0
        return vec3_to_action(retreat - grip_pos, gripper=gripper_cmd)


In [30]:
import numpy as np

class PushPolicyLineSafe:
    """
    Scripted push policy based on collinearity + ordering, with a detour when
    moving behind the object so that the object is not knocked away:
    - APPROACH: lift -> move_xy -> descend (always keeping clear of the object)
    - ALIGN:    fine-tune behind the object until collinear and correctly ordered
    - PUSH:     push while correcting lateral error; if the pose is lost, go
                back to APPROACH
    """

    # Phase identifiers stored in self.phase.
    APPROACH_LIFT = 0
    APPROACH_XY   = 1
    APPROACH_DOWN = 2
    ALIGN         = 3
    PUSH          = 4

    def __init__(
        self,
        behind_dist=0.12,
        push_z_offset=0.02,     # push height: obj_z + offset
        safe_lift=0.15,         # detour height: obj_z + safe_lift (key parameter)
        line_eps=0.015,
        order_margin=0.02,
        contact_dist=0.03,
        push_advance=0.03,
        anchor_gain=0.7,
        perp_gain=1.0,
        reacquire_line_eps=0.03,
        reacquire_behind=0.04,
        gain_approach=6.0,
        gain_align=5.0,
        gain_push=4.0,
        max_step_approach=0.06,
        max_step_align=0.05,
        max_step_push=0.04,
        # safety threshold for "too close to the object, lift first"
        danger_xy=0.06,
    ):
        """Store the tuning parameters and start in the APPROACH_LIFT phase."""
        self.behind_dist = behind_dist
        self.push_z_offset = push_z_offset
        self.safe_lift = safe_lift

        self.line_eps = line_eps
        self.order_margin = order_margin
        self.contact_dist = contact_dist

        self.push_advance = push_advance
        self.anchor_gain = anchor_gain
        self.perp_gain = perp_gain

        self.reacquire_line_eps = reacquire_line_eps
        self.reacquire_behind = reacquire_behind

        self.gain_approach = gain_approach
        self.gain_align = gain_align
        self.gain_push = gain_push

        self.max_step_approach = max_step_approach
        self.max_step_align = max_step_align
        self.max_step_push = max_step_push

        self.danger_xy = danger_xy

        self.phase = self.APPROACH_LIFT

    def reset(self):
        """Restart the state machine at APPROACH_LIFT."""
        self.phase = self.APPROACH_LIFT

    @staticmethod
    def _unit(v):
        """Return v scaled to (approximately) unit length; the epsilon avoids division by zero."""
        n = np.linalg.norm(v) + 1e-8
        return v / n

    def _line_dist_xy(self, p, a, u):
        """Distance in the xy plane from point p to the line through a with direction u."""
        # Point-to-line distance computed in the xy plane only
        p2 = p.copy(); a2 = a.copy(); u2 = u.copy()
        p2[2] = 0.0; a2[2] = 0.0; u2[2] = 0.0
        u2 = u2 / (np.linalg.norm(u2) + 1e-8)
        proj = a2 + np.dot(p2 - a2, u2) * u2
        return np.linalg.norm((p2 - proj)[:2])

    def _perp_vec_xy(self, p, a, u):
        """Vector (z = 0) from the xy projection of p onto the line (a, u) to p itself."""
        # Lateral-correction vector (xy), z=0
        p2 = p.copy(); a2 = a.copy(); u2 = u.copy()
        p2[2] = 0.0; a2[2] = 0.0; u2[2] = 0.0
        u2 = u2 / (np.linalg.norm(u2) + 1e-8)
        proj = a2 + np.dot(p2 - a2, u2) * u2
        perp = p2 - proj
        v = np.zeros(3, dtype=np.float32)
        v[:2] = perp[:2]
        return v

    def _order_ok(self, grip, obj, u_fwd):
        """True when the gripper is behind the object by more than order_margin along u_fwd."""
        # gripper is behind the object: dot(grip-obj, u_fwd) < -margin
        return np.dot(grip - obj, u_fwd) < -self.order_margin

    def act(self, obs):
        """Advance the state machine by one step and return the action for this step.

        Phase transitions are decided from the current observation, but the
        action returned by each phase block is the one computed for the phase
        the call started in (after the PUSH re-acquire check at the top).
        """
        grip_pos, obj_pos, goal = extract_positions(obs)

        # No object in the observation: just move toward the goal.
        if obj_pos is None:
            delta = goal - grip_pos
            return vec3_to_action(delta, gripper=0.0, gain=self.gain_approach, max_step=self.max_step_approach)

        # Direction definitions
        u_fwd  = self._unit(goal - obj_pos)   # obj -> goal
        u_back = -u_fwd                       # opposite direction (goal -> obj)

        # The desired "behind" point (on the extension of the line, in the correct order)
        push_z = obj_pos[2] + self.push_z_offset
        behind = obj_pos + self.behind_dist * u_back
        behind[2] = push_z

        # Reference line: anchored at goal, pointing toward obj
        line_a = goal.copy()
        line_u = self._unit(obj_pos - goal)

        # Current collinearity / ordering
        line_dist = self._line_dist_xy(grip_pos, line_a, line_u)
        order_ok = self._order_ok(grip_pos, obj_pos, u_fwd)

        # Pose lost during the push phase -> go back to the detour approach
        # NOTE(review): the PUSH anchor below sits contact_dist (default 0.03) behind
        # the object, which is less than reacquire_behind (default 0.04); if the gripper
        # ever reached the anchor this check would presumably trigger — confirm intended.
        if self.phase == self.PUSH:
            behind_amount = -np.dot(grip_pos - obj_pos, u_fwd)  # how far behind the object (along u_fwd)
            if line_dist > self.reacquire_line_eps or behind_amount < self.reacquire_behind:
                self.phase = self.APPROACH_LIFT

        # Safety check: if the gripper is too close to the object in xy, any planar
        # motion may hit the object -> lift first
        near_obj_xy = np.linalg.norm((grip_pos - obj_pos)[:2]) < self.danger_xy

        # ---------------- APPROACH: lift -> move_xy -> down ----------------
        safe_z = obj_pos[2] + self.safe_lift

        if self.phase == self.APPROACH_LIFT:
            # Rise to safe_z (lifting unconditionally, even when not close, is more robust)
            target = grip_pos.copy()
            target[2] = safe_z
            delta = target - grip_pos

            if abs(grip_pos[2] - safe_z) < 0.02:
                self.phase = self.APPROACH_XY

            return vec3_to_action(delta, gripper=0.0, gain=self.gain_approach, max_step=self.max_step_approach)

        if self.phase == self.APPROACH_XY:
            # Move horizontally at height to the xy of `behind` (z held at safe_z)
            target = behind.copy()
            target[2] = safe_z
            delta = target - grip_pos
            delta[2] = safe_z - grip_pos[2]

            # Once in place, descend
            if np.linalg.norm((grip_pos - target)[:2]) < 0.03:
                self.phase = self.APPROACH_DOWN

            # If too close to the object, still prefer staying high (this phase is at height anyway)
            return vec3_to_action(delta, gripper=0.0, gain=self.gain_approach, max_step=self.max_step_approach)

        if self.phase == self.APPROACH_DOWN:
            # At the xy of `behind`, descend to push_z
            target = behind.copy()
            target[2] = push_z
            delta = target - grip_pos

            if abs(grip_pos[2] - push_z) < 0.02 and np.linalg.norm((grip_pos - behind)[:2]) < 0.04:
                self.phase = self.ALIGN

            # If we get too close to the object again while descending (rare), go back to lift.
            # This check runs after the ALIGN transition above and overrides it.
            if near_obj_xy and grip_pos[2] < safe_z - 0.03:
                self.phase = self.APPROACH_LIFT

            return vec3_to_action(delta, gripper=0.0, gain=self.gain_approach, max_step=self.max_step_approach)

        # ---------------- ALIGN: fine-tune near `behind`, ensure collinearity + ordering ----------------
        if self.phase == self.ALIGN:
            # Lateral correction (pull back onto the line) while locking z
            perp = self._perp_vec_xy(grip_pos, line_a, line_u)

            target = behind.copy()
            target[2] = push_z
            delta = (target - grip_pos) - self.perp_gain * perp
            delta[2] = push_z - grip_pos[2]

            close_to_behind = np.linalg.norm((grip_pos - behind)[:2]) < 0.03 and abs(grip_pos[2] - push_z) < 0.03
            if close_to_behind and (line_dist < self.line_eps) and order_ok:
                self.phase = self.PUSH

            # If we are too close to the object while aligning (detour failed / got pushed back), redo the detour
            if near_obj_xy and not order_ok:
                self.phase = self.APPROACH_LIFT

            return vec3_to_action(delta, gripper=0.0, gain=self.gain_align, max_step=self.max_step_align)

        # ---------------- PUSH: hold the anchor behind the object + push forward + lateral correction ----------------
        anchor = obj_pos + self.contact_dist * u_back
        anchor[2] = push_z

        perp = self._perp_vec_xy(grip_pos, line_a, line_u)
        forward = self.push_advance * u_fwd
        anchor_term = self.anchor_gain * (anchor - grip_pos)

        delta = anchor_term + forward - self.perp_gain * perp
        delta[2] = push_z - grip_pos[2]

        return vec3_to_action(delta, gripper=0.0, gain=self.gain_push, max_step=self.max_step_push)


In [None]:
import os
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

def force_set_max_episode_steps(env, max_steps: int):
    """
    Best-effort override of the episode step limit on an already-built env.

    Sets ``env.spec.max_episode_steps`` when a spec is present, then walks the
    wrapper chain through ``.env`` and overwrites ``_max_episode_steps`` on
    every layer that has such an attribute (the field name used by
    gymnasium's TimeLimit wrapper). Failures are ignored.

    Returns:
        Number of layers whose ``_max_episode_steps`` was overwritten.
    """
    # Step 1: the spec (some environments read the limit from here).
    try:
        spec_present = env.spec is not None
        if spec_present:
            env.spec.max_episode_steps = max_steps
    except Exception:
        pass

    # Step 2: every wrapper layer, from the outermost inward.
    patched = 0
    layer = env
    keep_walking = True
    while keep_walking:
        if hasattr(layer, "_max_episode_steps"):
            try:
                layer._max_episode_steps = max_steps
            except Exception:
                pass
            else:
                patched += 1

        # Descend one level as long as the current layer wraps something.
        keep_walking = hasattr(layer, "env")
        if keep_walking:
            layer = layer.env

    return patched


In [68]:
def run(env_id, policy, video_dir="videos", max_steps=200, episodes=1, name_prefix=None, seed=0, stop_when_success=True):
    """Roll out a scripted policy, recording every episode to video.

    Args:
        env_id: gymnasium environment id passed to ``gym.make``.
        policy: object with ``act(obs) -> action``; if it has ``reset()`` it is
            called at the start of every episode, otherwise a ``phase``
            attribute (if present) is set back to 0.
        video_dir: folder the videos are written to (created if missing).
        max_steps: per-episode step budget; also forced onto the env's time limit.
        episodes: number of episodes to run.
        name_prefix: video file prefix; defaults to ``env_id`` with "/" replaced by "_".
        seed: base seed; episode ``ep`` is reset with ``seed + ep``. ``None``
            resets without a seed.
        stop_when_success: stop an episode as soon as ``info["is_success"] == 1.0``.

    Returns:
        None. One summary line per episode is printed.
    """
    os.makedirs(video_dir, exist_ok=True)

    # Preferred: override the limit at make() time (replaces the inner TimeLimit
    # when this gymnasium version supports the argument).
    try:
        env = gym.make(env_id, render_mode="rgb_array", max_episode_steps=max_steps)
        made_with_override = True
    except TypeError:
        env = gym.make(env_id, render_mode="rgb_array")
        made_with_override = False

    # From here on the env must be closed even if the policy or a step raises,
    # otherwise the simulator and the video recorder are left open.
    try:
        # Fallback: regardless of whether make() applied the override, force
        # every time-limit layer to max_steps.
        changed = force_set_max_episode_steps(env, max_steps)

        # Print what ended up controlling max_episode_steps.
        print(f"[env setup] made_with_override={made_with_override}, patched_TimeLimit_wrappers={changed}")
        try:
            print(f"[env setup] env.spec.max_episode_steps={getattr(env.spec, 'max_episode_steps', None)}")
        except Exception:
            pass

        env = RecordVideo(
            env,
            video_folder=video_dir,
            name_prefix=name_prefix or env_id.replace("/", "_"),
            episode_trigger=lambda ep: True,  # record every episode
        )

        for ep in range(episodes):
            episode_seed = None if seed is None else seed + ep
            obs, info = env.reset(seed=episode_seed)

            # Put the policy back into its initial state for the new episode.
            if hasattr(policy, "reset"):
                policy.reset()
            elif hasattr(policy, "phase"):
                policy.phase = 0

            # Defaults so the summary below is well-defined even when
            # max_steps <= 0 and the loop body never runs.
            steps_taken = 0
            reward = None
            terminated = False
            truncated = False

            for t in range(max_steps):
                action = clip_action(policy.act(obs), env.action_space)
                obs, reward, terminated, truncated, info = env.step(action)
                steps_taken = t + 1
                if terminated or truncated or (stop_when_success and info.get('is_success')==1.0):
                    break

            print(
                f"[{env_id}] ep={ep} steps={steps_taken} "
                f"terminated={terminated} truncated={truncated} "
                f"is_success={info.get('is_success', None)} reward={reward}"
            )
    finally:
        env.close()


In [69]:
# Roll out the scripted reach policy on FetchReach-v4; videos use the "reach_scripted" prefix.
run("FetchReach-v4", ReachPolicy(), name_prefix="reach_scripted")

[env setup] made_with_override=True, patched_TimeLimit_wrappers=1
[env setup] env.spec.max_episode_steps=200
Reaching
Reaching
Reaching
Reaching
Reaching
Reaching
Reaching
[FetchReach-v4] ep=0 steps=7 terminated=False truncated=False is_success=1.0 reward=-0.0


In [70]:
from IPython.display import Video, display

# Show the recorded scripted-reach episode in the notebook output.
display(Video("videos/reach_scripted-episode-0.mp4", embed=True))

In [71]:
# Roll out the scripted push policy on FetchPush-v4; videos use the "push_scripted" prefix.
run("FetchPush-v4", PushPolicyLineSafe(), max_steps=200, name_prefix="push_scripted")

[env setup] made_with_override=True, patched_TimeLimit_wrappers=1
[env setup] env.spec.max_episode_steps=200
[FetchPush-v4] ep=0 steps=131 terminated=False truncated=False is_success=1.0 reward=-0.0


In [72]:
from IPython.display import Video, display

# Show the recorded scripted-push episode in the notebook output.
display(Video("videos/push_scripted-episode-0.mp4", embed=True))


In [73]:
# Roll out the scripted pick-and-place policy on FetchPickAndPlace-v4; videos use the "pickplace_scripted" prefix.
run("FetchPickAndPlace-v4", PickAndPlacePolicyFrozen(), max_steps=200, name_prefix="pickplace_scripted")

[env setup] made_with_override=True, patched_TimeLimit_wrappers=1
[env setup] env.spec.max_episode_steps=200
[FetchPickAndPlace-v4] ep=0 steps=57 terminated=False truncated=False is_success=1.0 reward=-0.0


In [74]:
from IPython.display import Video, display

# Show the recorded scripted pick-and-place episode in the notebook output.
display(Video("videos/pickplace_scripted-episode-0.mp4", embed=True))

- 目标 1.3：定义 episode 成功条件（success signal）
  - 距离阈值、物体到目标、抓取检测等

In [75]:
import os
import gymnasium as gym
import gymnasium_robotics
from gymnasium.wrappers import RecordVideo

# Explicitly register the gymnasium_robotics environments with gymnasium before gym.make is called below.
gym.register_envs(gymnasium_robotics)

def run_kitchen_random(video_dir="videos", max_steps=200, tasks=("microwave",), seed=0):
    """Run one random-action episode of FrankaKitchen-v1 and record it to video.

    Args:
        video_dir: folder the video is written to (created if missing).
        max_steps: maximum number of steps to take; the environment may
            truncate the episode earlier.
        tasks: task names passed as ``tasks_to_complete`` (start with a single
            task and add more once that works).
        seed: seed used for ``env.reset`` and for the action-space sampler, so
            the random rollout is reproducible.

    Returns:
        None. Prints the number of steps taken and the final
        terminated / truncated flags.
    """
    os.makedirs(video_dir, exist_ok=True)

    env = gym.make(
        "FrankaKitchen-v1",
        render_mode="rgb_array",
        tasks_to_complete=list(tasks),   # task list: one task first, add more once it runs
    )

    # Close the env (and flush the video) even if a step raises.
    try:
        env = RecordVideo(
            env,
            video_folder=video_dir,
            name_prefix=f"kitchen_{'_'.join(tasks)}",
            episode_trigger=lambda ep: True,  # record every episode
        )

        obs, info = env.reset(seed=seed)
        # reset(seed=...) does not seed action_space.sample(); seed it separately
        # so the sampled actions are reproducible as well.
        env.action_space.seed(seed)

        # Defaults so the summary is well-defined when max_steps <= 0.
        steps_taken = 0
        terminated = False
        truncated = False

        for t in range(max_steps):
            # Per the original note: 9-dim action, joint velocities in [-1, 1] (gripper included).
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            steps_taken = t + 1
            if terminated or truncated:
                break

        print("steps:", steps_taken, "terminated:", terminated, "truncated:", truncated)
    finally:
        env.close()

# Random rollout on the "microwave" task. max_steps is 300 here, but the recorded
# output below shows the episode being truncated after 280 steps.
run_kitchen_random(tasks=("microwave",), max_steps=300)


steps: 280 terminated: False truncated: True


In [76]:
from IPython.display import Video, display

display(Video("videos/kitchen_microwave-episode-0.mp4", embed=True))