sapien‰ª£Á†Å

In [None]:
#demo3‰∏ªÁ®ãÂ∫è
import argparse
from ast import parse
from typing import Annotated
import gymnasium as gym
import numpy as np
import sapien.core as sapien
from mani_skill.envs.sapien_env import BaseEnv
import sapien.utils.viewer
import h5py
import json
import mani_skill.trajectory.utils as trajectory_utils
from mani_skill.utils import sapien_utils
from mani_skill.utils.wrappers.record import RecordEpisode
import tyro
from dataclasses import dataclass
import sapien
from mani_skill.agents.base_agent import BaseAgent, Keyframe
from mani_skill.agents.controllers import *
from mani_skill.agents.registration import register_agent
from scipy.spatial.transform import Rotation as R
import mani_skill.envs
import argparse
from mani_skill.agents.controllers.pd_joint_vel import PDJointVelControllerConfig
import mani_skill
from mani_skill.agents.controllers.base_controller import DictController
from copy import deepcopy
from mani_skill.utils import gym_utils
from mani_skill.utils.wrappers import RecordEpisode
from typing import List, Optional, Annotated, Union
import mplib
import trimesh
from mani_skill.envs.scene import ManiSkillScene
from mani_skill.utils.structs.pose import to_sapien_pose
import sapien.physx as physx
from my_panda_motion_planner import MyPandaMotionPlanningSolver  # Êñ∞Â¢û


@register_agent()
class MyPanda(BaseAgent):
    uid = "my_panda"
    urdf_path = "/home/shiqintong/Downloads/wheelchair_description/urdf/inte.urdf"
    keyframes = dict(
        standing=Keyframe(
            pose=sapien.Pose(p=[1.4, -1.0, 0.01], q=R.from_euler("z", 90, degrees=True).as_quat().tolist()),
            qpos=np.array([
                # arm: 7 joints
                -1.75, -0.5, 0.0, 1.0, 0.0, -1.5, 1.4,
                # -1.75, -0.35, -3.3, 1.4, 3.4, -1.1, 1.4,
                # gripper: 6 joints
                0.04,  # finger_joint
                0.04, 0.04, 0.04, 0.04, 0.04  # mimic joints
            ])
        )
    )
    urdf_config = dict(
        _materials=dict(
            gripper=dict(static_friction=5.0, dynamic_friction=5.0, restitution=0.0)
        ),
        link=dict(
            left_inner_finger_pad=dict(
                material="gripper", patch_radius=0.03, min_patch_radius=0.01
            ),
            right_inner_finger_pad=dict(
                material="gripper", patch_radius=0.03, min_patch_radius=0.01
            ),
            left_inner_finger=dict(material="gripper"),
            right_inner_finger=dict(material="gripper"),
        ),
    )
    def is_grasping(self, obj) -> bool:
        # Âú® robot.links ‰∏≠Êü•Êâæ name ‰∏∫ left_inner_finger_pad Âíå right_inner_finger_pad ÁöÑ link
        left_finger_pad = next(link for link in self.robot.links if link.get_name() == "left_inner_finger_pad")
        right_finger_pad = next(link for link in self.robot.links if link.get_name() == "right_inner_finger_pad")

        contacts = self.scene.get_contacts()

        left_contact = any(
            (c.actor0 == obj or c.actor1 == obj) and
            (c.actor0 == left_finger_pad or c.actor1 == left_finger_pad)
            for c in contacts
        )
        right_contact = any(
            (c.actor0 == obj or c.actor1 == obj) and
            (c.actor0 == right_finger_pad or c.actor1 == right_finger_pad)
            for c in contacts
        )

        return left_contact and right_contact


    @property
    def _controller_configs(self):
        arm_joint_names = [
            "joint_1", "joint_2", "joint_3",
            "joint_4", "joint_5", "joint_6", "joint_7",
        ]
        gripper_joint_names = ["finger_joint"]

        arm_pd_joint_pos = PDJointPosControllerConfig(
            arm_joint_names,
            lower=[-3.14] * len(arm_joint_names),
            upper=[3.14] * len(arm_joint_names),
            stiffness=1000,
            damping=100,
            force_limit=100,
            normalize_action=False,
        )
        arm_pd_joint_delta_pos = PDJointPosControllerConfig(
            arm_joint_names,
            lower=[-0.1] * len(arm_joint_names),
            upper=[0.1] * len(arm_joint_names),
            stiffness=1000,
            damping=100,
            force_limit=100,
            use_delta=True,
        )
        gripper_pd_joint_pos = PDJointPosMimicControllerConfig(
            gripper_joint_names,
            lower=[0.00],
            upper=[0.80],
            stiffness=1000,
            damping=100,
            force_limit=100,
        )

        return deepcopy({
            "pd_joint_delta_pos": {
                "arm": arm_pd_joint_delta_pos,
                "gripper": gripper_pd_joint_pos,
            },
            "pd_joint_pos": {
                "arm": arm_pd_joint_pos,
                "gripper": gripper_pd_joint_pos,
            },
        })



@dataclass
class Args:
    env_id: Annotated[str, tyro.conf.arg(aliases=["-e"])] = "Empty-v1"
    obs_mode: str = "none"
    robot_uid: Annotated[str, tyro.conf.arg(aliases=["-r"])] = "my_panda"
    """The robot to use. Robot setups supported for teleop in this script are panda and panda_stick"""
    record_dir: str = "demos"
    """directory to record the demonstration data and optionally videos"""
    save_video: bool = False
    """whether to save the videos of the demonstrations after collecting them all"""
    viewer_shader: str = "rt-fast"
    """the shader to use for the viewer. 'default' is fast but lower-quality shader, 'rt' and 'rt-fast' are the ray tracing shaders"""
    video_saving_shader: str = "rt-fast"
    """the shader to use for the videos of the demonstrations. 'minimal' is the fast shader, 'rt' and 'rt-fast' are the ray tracing shaders"""

    keyframe: Annotated[Optional[str], tyro.conf.arg(aliases=["-k"])] = None
    """Name of keyframe to view"""
    pause: Annotated[bool, tyro.conf.arg(aliases=["-p"])] = False
    """Pause viewer on load"""


def parse_args() -> Args:
    return tyro.cli(Args)

def main(args: Args):
    output_dir = f"{args.record_dir}/{args.env_id}/teleop/"
    env = gym.make(
        args.env_id,
        obs_mode=args.obs_mode,
        control_mode="pd_joint_pos",
        render_mode="rgb_array",
        reward_mode="none",
        enable_shadow=True,
        robot_uids=args.robot_uid,
        viewer_camera_configs=dict(shader_pack=args.viewer_shader)
    )
    env = RecordEpisode(
        env,
        output_dir=output_dir,
        trajectory_name="trajectory",
        save_video=False,
        info_on_video=False,
        source_type="teleoperation",
        source_desc="teleoperation via the click+drag system"
    )
    num_trajs = 0
    seed = 0
    env.reset(
        seed=seed,
    )

    # ÂàùÂßãÂåñ‰ΩçÂßø
    kf = None
    if env.agent.keyframes:
        kf_name = args.keyframe or next(iter(env.agent.keyframes))
        kf = env.agent.keyframes[kf_name]
        env.agent.robot.set_pose(kf.pose)
        if kf.qpos is not None:
            env.agent.robot.set_qpos(kf.qpos)
        if kf.qvel is not None:
            env.agent.robot.set_qvel(kf.qvel)
        print(f"üìå Viewing keyframe: {kf_name}")

    if env.gpu_sim_enabled:
        env.scene._gpu_apply_all()
        env.scene.px.gpu_update_articulation_kinematics()
        env.scene._gpu_fetch_all()

    viewer = env.render()
    viewer.paused = args.pause

    # ‚úÖ ËÆæÁΩÆ robot base pose
    from scipy.spatial.transform import Rotation as R
    base_pose = sapien.Pose(
        [1.4, -3.5, 0.01],  # ÂèØÊåâÂÆûÈôÖÊÉÖÂÜµË∞ÉÊï¥
        R.from_euler("z", 90, degrees=True).as_quat().tolist()
    )


    while True:
        print(f"Collecting trajectory {num_trajs+1}, seed={seed}")
        code = solve(env, debug=False, vis=True)
        if code == "quit":
            num_trajs += 1
            break
        elif code == "continue":
            seed += 1
            num_trajs += 1
            env.reset(
                seed=seed,
            )
            continue
        elif code == "restart":
            env.reset(
                seed=seed,
                options=dict(
                    save_trajectory=False,
                )
            )
    h5_file_path = env._h5_file.filename
    json_file_path = env._json_path
    env.close()
    del env
    print(f"Trajectories saved to {h5_file_path}")
    if args.save_video:
        print(f"Saving videos to {output_dir}")

        trajectory_data = h5py.File(h5_file_path)
        with open(json_file_path, "r") as f:
            json_data = json.load(f)
        env = gym.make(
            args.env_id,
            obs_mode=args.obs_mode,
            control_mode="pd_joint_pos",
            render_mode="rgb_array",
            reward_mode="none",
            robot_uids=args.robot_uid,
            human_render_camera_configs=dict(shader_pack=args.video_saving_shader),
        )
        env = RecordEpisode(
            env,
            output_dir=output_dir,
            trajectory_name="trajectory",
            save_video=True,
            info_on_video=False,
            save_trajectory=False,
            video_fps=30
        )
        for episode in json_data["episodes"]:
            traj_id = f"traj_{episode['episode_id']}"
            data = trajectory_data[traj_id]
            env.reset(**episode["reset_kwargs"])
            env_states_list = trajectory_utils.dict_to_list_of_dicts(data["env_states"])

            env.base_env.set_state_dict(env_states_list[0])
            for action in np.array(data["actions"]):
                env.step(action)

        trajectory_data.close()
        env.close()
        del env

def solve(env: BaseEnv, debug=False, vis=False):
    assert env.unwrapped.control_mode in [
        "pd_joint_pos",
        "pd_joint_pos_vel",
    ], env.unwrapped.control_mode
    robot_has_gripper = False

    if env.unwrapped.robot_uids == "my_panda":  # Êñ∞Â¢ûÊîØÊåÅ MyPanda
        robot_has_gripper = True
        planner = MyPandaMotionPlanningSolver(
            env,
            debug=debug,
            vis=vis,
            base_pose=env.unwrapped.agent.robot.pose,
            visualize_target_grasp_pose=False,
            print_env_info=False,
            joint_acc_limits=0.5,
            joint_vel_limits=0.5,
        )
    else:
        raise ValueError(f"Unsupported robot: {env.unwrapped.robot_uids}")

    viewer = env.render_human()

    last_checkpoint_state = None
    gripper_open = True
    def select_panda_hand():
        viewer.select_entity(sapien_utils.get_obj_by_name(env.agent.robot.links, "tool_frame")._objs[0].entity)  # ‰øÆÊîπ‰∏∫ tool_frame
    select_panda_hand()
    for plugin in viewer.plugins:
        if isinstance(plugin, sapien.utils.viewer.viewer.TransformWindow):
            transform_window = plugin
    while True:
        transform_window.enabled = True
        env.render_human()
        execute_current_pose = False
        if viewer.window.key_press("h"):
            print("""Available commands:
            h: print this help menu
            g: toggle gripper to close/open (if there is a gripper)
            u: move the panda hand up
            j: move the panda hand down
            arrow_keys: move the panda hand in the direction of the arrow keys
            n: execute command via motion planning to make the robot move to the target pose indicated by the ghost panda arm
            c: stop this episode and record the trajectory and move on to a new episode
            q: quit the script and stop collecting data. Save trajectories and optionally videos.
            """)
        elif viewer.window.key_press("q"):
            return "quit"
        elif viewer.window.key_press("c"):
            return "continue"
        elif viewer.window.key_press("n"):
            execute_current_pose = True
        elif viewer.window.key_press("g") and robot_has_gripper:
            print("gripper control")
            if gripper_open:
                print("gripper open - close")
                gripper_open = False
                _, reward, _, _, info = planner.close_gripper()
            else:
                print("gripper close - open")
                gripper_open = True
                _, reward, _, _, info = planner.open_gripper()
            print(f"Reward: {reward}, Info: {info}")
        elif viewer.window.key_press("u"):
            select_panda_hand()
            transform_window.gizmo_matrix = (transform_window._gizmo_pose * sapien.Pose(p=[0, 0, -0.01])).to_transformation_matrix()
            transform_window.update_ghost_objects()
        elif viewer.window.key_press("j"):
            select_panda_hand()
            transform_window.gizmo_matrix = (transform_window._gizmo_pose * sapien.Pose(p=[0, 0, +0.01])).to_transformation_matrix()
            transform_window.update_ghost_objects()
        elif viewer.window.key_press("down"):
            select_panda_hand()
            transform_window.gizmo_matrix = (transform_window._gizmo_pose * sapien.Pose(p=[+0.01, 0, 0])).to_transformation_matrix()
            transform_window.update_ghost_objects()
        elif viewer.window.key_press("up"):
            select_panda_hand()
            transform_window.gizmo_matrix = (transform_window._gizmo_pose * sapien.Pose(p=[-0.01, 0, 0])).to_transformation_matrix()
            transform_window.update_ghost_objects()
        elif viewer.window.key_press("right"):
            select_panda_hand()
            transform_window.gizmo_matrix = (transform_window._gizmo_pose * sapien.Pose(p=[0, -0.01, 0])).to_transformation_matrix()
            transform_window.update_ghost_objects()
        elif viewer.window.key_press("left"):
            select_panda_hand()
            transform_window.gizmo_matrix = (transform_window._gizmo_pose * sapien.Pose(p=[0, +0.01, 0])).to_transformation_matrix()
            transform_window.update_ghost_objects()
        if execute_current_pose:
            # # ÊõøÊç¢ÂßøÊÄÅÔºöËÆæÁΩÆ‰∏∫Ê∞¥Âπ≥ÂêëÈáèÔºàÁªï x/y/z ËΩ¥ÁöÑÊóãËΩ¨ËßíÂ∫¶Ôºâ
            # target_pose = transform_window._gizmo_pose * sapien.Pose([0, 0, 0.1])
            # r = R.from_euler('xyz', [0, 0, 0])  # Êàñ [0, 0, np.pi/2] ÂÖ∑‰ΩìÁúã‰Ω†ÊÉ≥ËÆ©Êú´Á´ØÂπ≥Ë°åÂì™Êù°ËΩ¥
            # target_pose.q = r.as_quat()
            # result = planner.move_to_pose_with_screw(target_pose, dry_run=True)
            result = planner.move_to_pose_with_screw(transform_window._gizmo_pose * sapien.Pose([0, 0, 0.1]), dry_run=True)
            if result != -1 and len(result["position"]) < 150:
                _, reward, _, _, info = planner.follow_path(result)
                print(f"Reward: {reward}, Info: {info}")
            else:
                if result == -1:
                    print("Plan failed")
                else:
                    print("Generated motion plan was too long. Try a closer sub-goal")
            execute_current_pose = False





if __name__ == "__main__":
    main(parse_args())


In [None]:
#ËØ∑Âú®demo3Ê±ÇËß£Âô®ÁõÆÂΩï‰∏ãÈáåÈù¢ËøêË°å
shiqintong@shiqintong-Jiaolong-Series-MRID6:~/.local/lib/python3.10/site-packages/mani_skill/examples$ python3 -m mani_skill.examples.demo03 -r my_panda  -e "RoboCasaKitchen-v1"
/usr/lib/python3/dist-packages/requests/__init__.py:87: RequestsDependencyWarning: urllib3 (2.3.0) or chardet (4.0.0) doesn't match a supported version!
  warnings.warn("urllib3 ({}) or chardet ({}) doesn't match a supported "
2025-04-10 05:49:26,196 - mani_skill  - WARNING - my_panda is not in the task's list of supported robots. Code may not run as intended
WARNING:root:Robot my_panda doesn't have a defined front facing size, defaulting to 0.7m
2025-04-10 05:49:29,117 - mani_skill  - WARNING - mani_skill is not installed with git.
/home/shiqintong/.local/lib/python3.10/site-packages/gymnasium/core.py:311: UserWarning: WARN: env.agent to get variables from other wrappers is deprecated and will be removed in v1.0, to get this variable you can do `env.unwrapped.agent` for environment variables or `env.get_wrapper_attr('agent')` that will search the reminding wrappers.
  logger.warn(
üìå Viewing keyframe: standing
/home/shiqintong/.local/lib/python3.10/site-packages/gymnasium/core.py:311: UserWarning: WARN: env.gpu_sim_enabled to get variables from other wrappers is deprecated and will be removed in v1.0, to get this variable you can do `env.unwrapped.gpu_sim_enabled` for environment variables or `env.get_wrapper_attr('gpu_sim_enabled')` that will search the reminding wrappers.
  logger.warn(
Collecting trajectory 1, seed=0

=== Initializing Motion Planning Solver ===
- Robot UID: my_panda
- Control mode: pd_joint_pos
- Base pose: Pose(raw_pose=tensor([[ 1.4000, -1.0000,  0.0100,  0.0000,  0.0000,  0.7071,  0.7071]]))
- Joint vel limits: 0.5, acc limits: 0.5

Robot Initial State:
- Qpos: [-1.75 -0.5   0.    1.    0.   -1.5   1.4   0.04  0.04  0.04  0.04  0.04
  0.04]
/home/shiqintong/.local/lib/python3.10/site-packages/gymnasium/core.py:311: UserWarning: WARN: env.render_human to get variables from other wrappers is deprecated and will be removed in v1.0, to get this variable you can do `env.unwrapped.render_human` for environment variables or `env.get_wrapper_attr('render_human')` that will search the reminding wrappers.
  logger.warn(

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.0843096  -0.62143683  1.6021563 ], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]
 [-0.8757  0.8757]]
Joint limit size: 13
Qpos size: 13
Relative theta = 1
Initial omega = [[-0.24097177]
 [-0.29286608]
 [ 0.09391727]
 [ 0.        ]
 [ 0.        ]
 [ 0.        ]]
Start qpos = [-1.75       -0.5         0.          1.          0.         -1.5
  1.39999998  0.          0.          0.          0.          0.
  0.        ]
Start planning. Initial theta = 1.0000
Initial qpos: [-1.75       -0.5         0.          1.          0.         -1.5
  1.39999998  0.          0.          0.          0.          0.
  0.        ]
- First attempt status: Success
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([68], dtype=torch.int32)}

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.112874   -0.62143683  1.6021563 ], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]
 [-0.8757  0.8757]]
Joint limit size: 13
Qpos size: 13
Relative theta = 0.027555050696384954
Initial omega = [[-0.04973805]
 [ 0.01351322]
 [ 0.0019452 ]
 [ 0.00651   ]
 [ 0.02389754]
 [-0.01207511]]
Start qpos = [-1.40837812 -1.07247722 -0.76377302  1.93014789 -0.28605628 -2.02220678
  1.87674761  0.          0.          0.          0.          0.
  0.        ]
Start planning. Initial theta = 0.0276
Initial qpos: [-1.40837812 -1.07247722 -0.76377302  1.93014789 -0.28605628 -2.02220678
  1.87674761  0.          0.          0.          0.          0.
  0.        ]
- First attempt status: Success
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([85], dtype=torch.int32)}
gripper control
gripper open - close
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([91], dtype=torch.int32)}

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.112874  -0.5889796  1.6021563], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]
 [-0.8757  0.8757]]
Joint limit size: 13
Qpos size: 13
Relative theta = 1
Initial omega = [[-0.00097217]
 [ 0.00136578]
 [ 0.02936145]
 [ 0.        ]
 [ 0.        ]
 [ 0.        ]]
Start qpos = [-1.35878599 -1.05818331 -0.82271039  1.84942532 -0.32118884 -2.01015973
  1.904158    0.          0.          0.          0.          0.
  0.        ]
Start planning. Initial theta = 1.0000
Initial qpos: [-1.35878599 -1.05818331 -0.82271039  1.84942532 -0.32118884 -2.01015973
  1.904158    0.          0.          0.          0.          0.
  0.        ]
- First attempt status: Success
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([112], dtype=torch.int32)}

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.112874   -0.54069954  1.6021563 ], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]
 [-0.8757  0.8757]]
Joint limit size: 13
Qpos size: 13
Relative theta = 1
Initial omega = [[-0.00078145]
 [ 0.00063628]
 [ 0.04844625]
 [ 0.        ]
 [ 0.        ]
 [ 0.        ]]
Start qpos = [-1.38849187 -1.10174119 -0.81277204  1.81061542 -0.32311127 -1.92746556
  1.9350642   0.          0.          0.          0.          0.
  0.        ]
Start planning. Initial theta = 1.0000
Initial qpos: [-1.38849187 -1.10174119 -0.81277204  1.81061542 -0.32311127 -1.92746556
  1.9350642   0.          0.          0.          0.          0.
  0.        ]
- First attempt status: Success
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([132], dtype=torch.int32)}

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.0892382  -0.54069954  1.6021563 ], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]
 [-0.8757  0.8757]]
Joint limit size: 13
Qpos size: 13
Relative theta = 1
Initial omega = [[0.02198686]
 [0.00104147]
 [0.00284703]
 [0.        ]
 [0.        ]
 [0.        ]]
Start qpos = [-1.42865145 -1.16317487 -0.80347687  1.74164319 -0.32803977 -1.79875302
  1.98488069  0.          0.          0.          0.          0.
  0.        ]
Start planning. Initial theta = 1.0000
Initial qpos: [-1.42865145 -1.16317487 -0.80347687  1.74164319 -0.32803977 -1.79875302
  1.98488069  0.          0.          0.          0.          0.
  0.        ]
- First attempt status: Success
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([144], dtype=torch.int32)}

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.0892382 -0.4942905  1.6021563], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]
 [-0.8757  0.8757]]
Joint limit size: 13
Qpos size: 13
Relative theta = 1
Initial omega = [[0.0022903 ]
 [0.0004684 ]
 [0.04540406]
 [0.        ]
 [0.        ]
 [0.        ]]
Start qpos = [-1.47202647 -1.16953361 -0.76648414  1.77170396 -0.30532959 -1.79104257
  1.97067881  0.          0.          0.          0.          0.
  0.        ]
Start planning. Initial theta = 1.0000
Initial qpos: [-1.47202647 -1.16953361 -0.76648414  1.77170396 -0.30532959 -1.79104257
  1.97067881  0.          0.          0.          0.          0.
  0.        ]
- First attempt status: Success
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([164], dtype=torch.int32)}
gripper control
gripper close - open
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([170], dtype=torch.int32)}
gripper control
gripper open - close
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([176], dtype=torch.int32)}

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.0892382  -0.47289333  1.6021563 ], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]
 [-0.8757  0.8757]]
Joint limit size: 13
Qpos size: 13
Relative theta = 1
Initial omega = [[-0.00152687]
 [ 0.00123507]
 [ 0.0238825 ]
 [ 0.        ]
 [ 0.        ]
 [ 0.        ]]
Start qpos = [-1.5033555  -1.21955502 -0.76115298  1.69657969 -0.31161901 -1.66715991
  2.01480889  0.          0.          0.          0.          0.
  0.        ]
Start planning. Initial theta = 1.0000
Initial qpos: [-1.5033555  -1.21955502 -0.76115298  1.69657969 -0.31161901 -1.66715991
  2.01480889  0.          0.          0.          0.          0.
  0.        ]
- First attempt status: Success
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([190], dtype=torch.int32)}

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.0892382  -0.44604614  1.6021563 ], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]
 [-0.8757  0.8757]]
Joint limit size: 13
Qpos size: 13
Relative theta = 1
Initial omega = [[-0.00137763]
 [ 0.0006873 ]
 [ 0.02920512]
 [ 0.        ]
 [ 0.        ]
 [ 0.        ]]
Start qpos = [-1.51044083 -1.23841608 -0.7632435   1.64623797 -0.31787008 -1.60324168
  2.03910875  0.          0.          0.          0.          0.
  0.        ]
Start planning. Initial theta = 1.0000
Initial qpos: [-1.51044083 -1.23841608 -0.7632435   1.64623797 -0.31787008 -1.60324168
  2.03910875  0.          0.          0.          0.          0.
  0.        ]
- First attempt status: Success
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([209], dtype=torch.int32)}
gripper control
gripper close - open
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([215], dtype=torch.int32)}

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.0892382  -0.53558093  1.6021563 ], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]
 [-0.8757  0.8757]]
Joint limit size: 13
Qpos size: 13
Relative theta = 0.023603784188018704
Initial omega = [[ 0.01108832]
 [ 0.0083743 ]
 [-0.1117632 ]
 [ 0.02202276]
 [ 0.00838851]
 [ 0.00133022]]
Start qpos = [-1.49839818 -1.25491333 -0.7838366   1.58519459 -0.34893078 -1.52853358
  2.08528352  0.          0.          0.          0.          0.
  0.        ]
Start planning. Initial theta = 0.0236
Initial qpos: [-1.49839818 -1.25491333 -0.7838366   1.58519459 -0.34893078 -1.52853358
  2.08528352  0.          0.          0.          0.          0.
  0.        ]
- First attempt status: Success
Reward: tensor([0.]), Info: {'elapsed_steps': tensor([244], dtype=torch.int32)}

=== Attempting Screw Motion Planning ===
- Target pose: P=[ 1.0892382 -0.67662    1.6021563], Q=[-0.15279359 -0.00195442  0.47976848  0.86398643]

First planning attempt...
Joint limits: [[-3.14    3.14  ]
 [-2.41    2.41  ]
 [-3.14    3.14  ]
 [-2.66    2.66  ]
 [-3.14    3.14  ]
 [-2.23    2.23  ]
 [-3.14    3.14  ]
 [ 0.      0.8   ]
 [-0.8757  0.8757]
 [ 0.      0.8757]
 [ 0.      0.8757]
 [ 0.      0.81  ]


In [None]:
#demo3ÁöÑÊú´Á´ØÊ±ÇËß£Âô®
import mplib
import numpy as np
import sapien
import trimesh

from mani_skill.agents.base_agent import BaseAgent
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.envs.scene import ManiSkillScene
from mani_skill.utils.structs.pose import to_sapien_pose
import sapien.physx as physx
OPEN = 1
CLOSED = -1


class MyPandaMotionPlanningSolver:
    def __init__(
        self,
        env: BaseEnv,
        debug: bool = False,
        vis: bool = True,
        base_pose: sapien.Pose = None,  # TODO mplib doesn't support robot base being anywhere but 0
        visualize_target_grasp_pose: bool = True,
        print_env_info: bool = True,
        joint_vel_limits=0.9,
        joint_acc_limits=0.9,
    ):
        print("\n=== Initializing Motion Planning Solver ===")
        print(f"- Robot UID: {env.unwrapped.robot_uids}")
        print(f"- Control mode: {env.unwrapped.control_mode}")
        print(f"- Base pose: {base_pose}")
        print(f"- Joint vel limits: {joint_vel_limits}, acc limits: {joint_acc_limits}")
        self.env = env
        self.base_env: BaseEnv = env.unwrapped
        self.env_agent: BaseAgent = self.base_env.agent
        self.robot = self.env_agent.robot
        self.joint_vel_limits = joint_vel_limits
        self.joint_acc_limits = joint_acc_limits

        self.base_pose = to_sapien_pose(base_pose)

        self.planner = self.setup_planner()
        self.control_mode = self.base_env.control_mode

        self.debug = debug
        self.vis = vis
        self.print_env_info = print_env_info
        self.visualize_target_grasp_pose = visualize_target_grasp_pose
        self.gripper_state = OPEN
        self.grasp_pose_visual = None
        if self.vis and self.visualize_target_grasp_pose:
            if "grasp_pose_visual" not in self.base_env.scene.actors:
                self.grasp_pose_visual = build_panda_gripper_grasp_pose_visual(
                    self.base_env.scene
                )
            else:
                self.grasp_pose_visual = self.base_env.scene.actors["grasp_pose_visual"]
            self.grasp_pose_visual.set_pose(self.base_env.agent.tcp.pose)
        self.elapsed_steps = 0

        self.use_point_cloud = False
        self.collision_pts_changed = False
        self.all_collision_pts = None
            # ÊâìÂç∞Êú∫Âô®‰∫∫ÂàùÂßãÁä∂ÊÄÅ
        print("\nRobot Initial State:")
        print(f"- Qpos: {self.robot.get_qpos().cpu().numpy()[0]}")


    def render_wait(self):
        if not self.vis or not self.debug:
            return
        print("Press [c] to continue")
        viewer = self.base_env.render_human()
        while True:
            if viewer.window.key_down("c"):
                break
            self.base_env.render_human()

    def setup_planner(self):
        planner = mplib.Planner(
            urdf="/home/shiqintong/Downloads/wheelchair_description/urdf/inte.urdf",
            srdf="/home/shiqintong/Downloads/wheelchair_description/urdf/inte.srdf",
            move_group="tool_frame",
            joint_vel_limits=np.ones(7) * self.joint_vel_limits,
            joint_acc_limits=np.ones(7) * self.joint_acc_limits,
        )

        planner.set_base_pose(np.hstack([self.base_pose.p, self.base_pose.q]))
        return planner

    def follow_path(self, result, refine_steps: int = 0):
        n_step = result["position"].shape[0]
        for i in range(n_step + refine_steps):
            qpos = result["position"][min(i, n_step - 1)]
            if self.control_mode == "pd_joint_pos_vel":
                qvel = result["velocity"][min(i, n_step - 1)]
                action = np.hstack([qpos, qvel, self.gripper_state])
            else:
                action = np.hstack([qpos, self.gripper_state])
            obs, reward, terminated, truncated, info = self.env.step(action)
            self.elapsed_steps += 1
            if self.print_env_info:
                print(
                    f"[{self.elapsed_steps:3}] Env Output: reward={reward} info={info}"
                )
            if self.vis:
                self.base_env.render_human()
        return obs, reward, terminated, truncated, info

    def move_to_pose_with_RRTConnect(
        self, pose: sapien.Pose, dry_run: bool = False, refine_steps: int = 0
    ):
        pose = to_sapien_pose(pose)
        if self.grasp_pose_visual is not None:
            self.grasp_pose_visual.set_pose(pose)
        pose = sapien.Pose(p=pose.p, q=pose.q)
        result = self.planner.plan_qpos_to_pose(
            np.concatenate([pose.p, pose.q]),
            self.robot.get_qpos().cpu().numpy()[0],
            time_step=self.base_env.control_timestep,
            use_point_cloud=self.use_point_cloud,
            wrt_world=True,
        )
        if result["status"] != "Success":
            print(result["status"])
            self.render_wait()
            return -1
        self.render_wait()
        if dry_run:
            return result
        return self.follow_path(result, refine_steps=refine_steps)

    def move_to_pose_with_screw(
        self, pose: sapien.Pose, dry_run: bool = False, refine_steps: int = 0
    ):
        print("\n=== Attempting Screw Motion Planning ===")
        pose = to_sapien_pose(pose)
        print(f"- Target pose: P={pose.p}, Q={pose.q}")
        # try screw two times before giving up
        if self.grasp_pose_visual is not None:
            self.grasp_pose_visual.set_pose(pose)
        pose = sapien.Pose(p=pose.p , q=pose.q)
        print("\nFirst planning attempt...")
        result = self.planner.plan_screw(
            np.concatenate([pose.p, pose.q]),
            self.robot.get_qpos().cpu().numpy()[0][:7],
            time_step=self.base_env.control_timestep,
            use_point_cloud=self.use_point_cloud,
        )
        print(f"- First attempt status: {result['status']}")
        if result["status"] != "Success":
            print("\nSecond planning attempt...")
            result = self.planner.plan_screw(
                np.concatenate([pose.p, pose.q]),
                self.robot.get_qpos().cpu().numpy()[0][:7],
                time_step=self.base_env.control_timestep,
                use_point_cloud=self.use_point_cloud,
            )
            if result["status"] != "Success":
                print("\n!!! Planning Failed !!!")
                print(f"- Error status: {result['status']}")
                print(result["status"])
                self.render_wait()
                return -1
        self.render_wait()
        if dry_run:
            return result
        return self.follow_path(result, refine_steps=refine_steps)

    def open_gripper(self):
        self.gripper_state = OPEN
        qpos = self.robot.get_qpos()[0, :7].cpu().numpy()
        for i in range(6):
            if self.control_mode == "pd_joint_pos":
                action = np.hstack([qpos, self.gripper_state])
            else:
                action = np.hstack([qpos, qpos * 0, self.gripper_state])
            obs, reward, terminated, truncated, info = self.env.step(action[None])
            self.elapsed_steps += 1
            if self.print_env_info:
                print(
                    f"[{self.elapsed_steps:3}] Env Output: reward={reward} info={info}"
                )
            if self.vis:
                self.base_env.render_human()
        return obs, reward, terminated, truncated, info

    def close_gripper(self, t=6, gripper_state = CLOSED):
        self.gripper_state = gripper_state
        qpos = self.robot.get_qpos()[0, :7].cpu().numpy()
        for i in range(t):
            if self.control_mode == "pd_joint_pos":
                action = np.hstack([qpos, self.gripper_state])
            else:
                action = np.hstack([qpos, qpos * 0, self.gripper_state])
            obs, reward, terminated, truncated, info = self.env.step(action[None])
            self.elapsed_steps += 1
            if self.print_env_info:
                print(
                    f"[{self.elapsed_steps:3}] Env Output: reward={reward} info={info}"
                )
            if self.vis:
                self.base_env.render_human()
        return obs, reward, terminated, truncated, info

    def add_box_collision(self, extents: np.ndarray, pose: sapien.Pose):
        self.use_point_cloud = True
        box = trimesh.creation.box(extents, transform=pose.to_transformation_matrix())
        pts, _ = trimesh.sample.sample_surface(box, 256)
        if self.all_collision_pts is None:
            self.all_collision_pts = pts
        else:
            self.all_collision_pts = np.vstack([self.all_collision_pts, pts])
        self.planner.update_point_cloud(self.all_collision_pts)

    def add_collision_pts(self, pts: np.ndarray):
        if self.all_collision_pts is None:
            self.all_collision_pts = pts
        else:
            self.all_collision_pts = np.vstack([self.all_collision_pts, pts])
        self.planner.update_point_cloud(self.all_collision_pts)

    def clear_collisions(self):
        self.all_collision_pts = None
        self.use_point_cloud = False

    def close(self):
        pass

from transforms3d import quaternions


def build_panda_gripper_grasp_pose_visual(scene: ManiSkillScene):
    builder = scene.create_actor_builder()
    grasp_pose_visual_width = 0.01
    grasp_width = 0.05

    builder.add_sphere_visual(
        pose=sapien.Pose(p=[0, 0, 0.0]),
        radius=grasp_pose_visual_width,
        material=sapien.render.RenderMaterial(base_color=[0.3, 0.4, 0.8, 0.7])
    )

    builder.add_box_visual(
        pose=sapien.Pose(p=[0, 0, -0.08]),
        half_size=[grasp_pose_visual_width, grasp_pose_visual_width, 0.02],
        material=sapien.render.RenderMaterial(base_color=[0, 1, 0, 0.7]),
    )
    builder.add_box_visual(
        pose=sapien.Pose(p=[0, 0, -0.05]),
        half_size=[grasp_pose_visual_width, grasp_width, grasp_pose_visual_width],
        material=sapien.render.RenderMaterial(base_color=[0, 1, 0, 0.7]),
    )
    builder.add_box_visual(
        pose=sapien.Pose(
            p=[
                0.03 - grasp_pose_visual_width * 3,
                grasp_width + grasp_pose_visual_width,
                0.03 - 0.05,
            ],
            q=quaternions.axangle2quat(np.array([0, 1, 0]), theta=np.pi / 2),
        ),
        half_size=[0.04, grasp_pose_visual_width, grasp_pose_visual_width],
        material=sapien.render.RenderMaterial(base_color=[0, 0, 1, 0.7]),
    )
    builder.add_box_visual(
        pose=sapien.Pose(
            p=[
                0.03 - grasp_pose_visual_width * 3,
                -grasp_width - grasp_pose_visual_width,
                0.03 - 0.05,
            ],
            q=quaternions.axangle2quat(np.array([0, 1, 0]), theta=np.pi / 2),
        ),
        half_size=[0.04, grasp_pose_visual_width, grasp_pose_visual_width],
        material=sapien.render.RenderMaterial(base_color=[1, 0, 0, 0.7]),
    )
    grasp_pose_visual = builder.build_kinematic(name="grasp_pose_visual")
    return grasp_pose_visual

In [None]:
# ÂçïÂÖ≥ËäÇÊéßÂà∂Ôºödemo2 ÊâßË°åÊåá‰ª§Ôºö‰ªª‰ΩïÊÉÖÂÜµ‰∏ãËøêË°å  python3 -m mani_skill.examples.demo02 -r my_panda --random_actions  -e "RoboCasaKitchen-v1"

import sapien
import numpy as np
from mani_skill.agents.base_agent import BaseAgent, Keyframe
from mani_skill.agents.controllers import *
from mani_skill.agents.registration import register_agent
from scipy.spatial.transform import Rotation as R
import mani_skill.envs
import argparse

from mani_skill.agents.controllers.pd_joint_vel import PDJointVelControllerConfig
import gymnasium as gym
import mani_skill
from mani_skill.agents.controllers.base_controller import DictController
from mani_skill.envs.sapien_env import BaseEnv
from copy import deepcopy



import gymnasium as gym
import numpy as np
import sapien

from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.utils import gym_utils
from mani_skill.utils.wrappers import RecordEpisode


import tyro
from dataclasses import dataclass
from typing import List, Optional, Annotated, Union


import mplib
import numpy as np
import sapien
import trimesh

import mplib
import numpy as np
import sapien
import trimesh

from mani_skill.agents.base_agent import BaseAgent
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.envs.scene import ManiSkillScene
from mani_skill.utils.structs.pose import to_sapien_pose
import sapien.physx as physx
OPEN = 1
CLOSED = -1


class MyPandaMotionPlanningSolver:
    def __init__(
        self,
        env: BaseEnv,
        debug: bool = False,
        vis: bool = True,
        base_pose: sapien.Pose = None,  # TODO mplib doesn't support robot base being anywhere but 0
        visualize_target_grasp_pose: bool = True,
        print_env_info: bool = True,
        joint_vel_limits=0.9,
        joint_acc_limits=0.9,
    ):
        self.env = env
        self.base_env: BaseEnv = env.unwrapped
        self.env_agent: BaseAgent = self.base_env.agent
        self.robot = self.env_agent.robot
        self.joint_vel_limits = joint_vel_limits
        self.joint_acc_limits = joint_acc_limits

        self.base_pose = to_sapien_pose(base_pose)

        self.planner = self.setup_planner()
        self.control_mode = self.base_env.control_mode

        self.debug = debug
        self.vis = vis
        self.print_env_info = print_env_info
        self.visualize_target_grasp_pose = visualize_target_grasp_pose
        self.gripper_state = OPEN
        self.grasp_pose_visual = None
        if self.vis and self.visualize_target_grasp_pose:
            if "grasp_pose_visual" not in self.base_env.scene.actors:
                self.grasp_pose_visual = build_panda_gripper_grasp_pose_visual(
                    self.base_env.scene
                )
            else:
                self.grasp_pose_visual = self.base_env.scene.actors["grasp_pose_visual"]
            self.grasp_pose_visual.set_pose(self.base_env.agent.tcp.pose)
        self.elapsed_steps = 0

        self.use_point_cloud = False
        self.collision_pts_changed = False
        self.all_collision_pts = None

    def render_wait(self):
        if not self.vis or not self.debug:
            return
        print("Press [c] to continue")
        viewer = self.base_env.render_human()
        while True:
            if viewer.window.key_down("c"):
                break
            self.base_env.render_human()

    def setup_planner(self):
        link_names = [link.get_name() for link in self.robot.get_links()]
        joint_names = [joint.get_name() for joint in self.robot.get_active_joints()]
        planner = mplib.Planner(
            urdf="/home/shiqintong/Downloads/wheelchair_description/urdf/inte.urdf",
            srdf="/home/shiqintong/Downloads/wheelchair_description/urdf/inte.srdf",
            user_link_names=link_names,
            user_joint_names=joint_names,
            move_group="tool_frame",
            joint_vel_limits=np.ones(7) * self.joint_vel_limits,
            joint_acc_limits=np.ones(7) * self.joint_acc_limits,
        )
        planner.set_base_pose(np.hstack([self.base_pose.p, self.base_pose.q]))
        return planner

    def follow_path(self, result, refine_steps: int = 0):
        n_step = result["position"].shape[0]
        for i in range(n_step + refine_steps):
            qpos = result["position"][min(i, n_step - 1)]
            if self.control_mode == "pd_joint_pos_vel":
                qvel = result["velocity"][min(i, n_step - 1)]
                action = np.hstack([qpos, qvel, self.gripper_state])
            else:
                action = np.hstack([qpos, self.gripper_state])
            obs, reward, terminated, truncated, info = self.env.step(action)
            self.elapsed_steps += 1
            if self.print_env_info:
                print(
                    f"[{self.elapsed_steps:3}] Env Output: reward={reward} info={info}"
                )
            if self.vis:
                self.base_env.render_human()
        return obs, reward, terminated, truncated, info

    def move_to_pose_with_RRTConnect(
        self, pose: sapien.Pose, dry_run: bool = False, refine_steps: int = 0
    ):
        pose = to_sapien_pose(pose)
        if self.grasp_pose_visual is not None:
            self.grasp_pose_visual.set_pose(pose)
        pose = sapien.Pose(p=pose.p, q=pose.q)
        result = self.planner.plan_qpos_to_pose(
            np.concatenate([pose.p, pose.q]),
            self.robot.get_qpos().cpu().numpy()[0],
            time_step=self.base_env.control_timestep,
            use_point_cloud=self.use_point_cloud,
            wrt_world=True,
        )
        if result["status"] != "Success":
            print(result["status"])
            self.render_wait()
            return -1
        self.render_wait()
        if dry_run:
            return result
        return self.follow_path(result, refine_steps=refine_steps)

    def move_to_pose_with_screw(
        self, pose: sapien.Pose, dry_run: bool = False, refine_steps: int = 0
    ):
        pose = to_sapien_pose(pose)
        # try screw two times before giving up
        if self.grasp_pose_visual is not None:
            self.grasp_pose_visual.set_pose(pose)
        pose = sapien.Pose(p=pose.p , q=pose.q)
        result = self.planner.plan_screw(
            np.concatenate([pose.p, pose.q]),
            self.robot.get_qpos().cpu().numpy()[0],
            time_step=self.base_env.control_timestep,
            use_point_cloud=self.use_point_cloud,
        )
        if result["status"] != "Success":
            result = self.planner.plan_screw(
                np.concatenate([pose.p, pose.q]),
                self.robot.get_qpos().cpu().numpy()[0],
                time_step=self.base_env.control_timestep,
                use_point_cloud=self.use_point_cloud,
            )
            if result["status"] != "Success":
                print(result["status"])
                self.render_wait()
                return -1
        self.render_wait()
        if dry_run:
            return result
        return self.follow_path(result, refine_steps=refine_steps)

    def open_gripper(self):
        self.gripper_state = OPEN
        qpos = self.robot.get_qpos()[0, :-2].cpu().numpy()
        for i in range(6):
            if self.control_mode == "pd_joint_pos":
                action = np.hstack([qpos, self.gripper_state])
            else:
                action = np.hstack([qpos, qpos * 0, self.gripper_state])
            obs, reward, terminated, truncated, info = self.env.step(action)
            self.elapsed_steps += 1
            if self.print_env_info:
                print(
                    f"[{self.elapsed_steps:3}] Env Output: reward={reward} info={info}"
                )
            if self.vis:
                self.base_env.render_human()
        return obs, reward, terminated, truncated, info

    def close_gripper(self, t=6, gripper_state = CLOSED):
        self.gripper_state = gripper_state
        qpos = self.robot.get_qpos()[0, :-2].cpu().numpy()
        for i in range(t):
            if self.control_mode == "pd_joint_pos":
                action = np.hstack([qpos, self.gripper_state])
            else:
                action = np.hstack([qpos, qpos * 0, self.gripper_state])
            obs, reward, terminated, truncated, info = self.env.step(action)
            self.elapsed_steps += 1
            if self.print_env_info:
                print(
                    f"[{self.elapsed_steps:3}] Env Output: reward={reward} info={info}"
                )
            if self.vis:
                self.base_env.render_human()
        return obs, reward, terminated, truncated, info

    def add_box_collision(self, extents: np.ndarray, pose: sapien.Pose):
        self.use_point_cloud = True
        box = trimesh.creation.box(extents, transform=pose.to_transformation_matrix())
        pts, _ = trimesh.sample.sample_surface(box, 256)
        if self.all_collision_pts is None:
            self.all_collision_pts = pts
        else:
            self.all_collision_pts = np.vstack([self.all_collision_pts, pts])
        self.planner.update_point_cloud(self.all_collision_pts)

    def add_collision_pts(self, pts: np.ndarray):
        if self.all_collision_pts is None:
            self.all_collision_pts = pts
        else:
            self.all_collision_pts = np.vstack([self.all_collision_pts, pts])
        self.planner.update_point_cloud(self.all_collision_pts)

    def clear_collisions(self):
        self.all_collision_pts = None
        self.use_point_cloud = False

    def close(self):
        pass

from transforms3d import quaternions


def build_panda_gripper_grasp_pose_visual(scene: ManiSkillScene):
    builder = scene.create_actor_builder()
    grasp_pose_visual_width = 0.01
    grasp_width = 0.05

    builder.add_sphere_visual(
        pose=sapien.Pose(p=[0, 0, 0.0]),
        radius=grasp_pose_visual_width,
        material=sapien.render.RenderMaterial(base_color=[0.3, 0.4, 0.8, 0.7])
    )

    builder.add_box_visual(
        pose=sapien.Pose(p=[0, 0, -0.08]),
        half_size=[grasp_pose_visual_width, grasp_pose_visual_width, 0.02],
        material=sapien.render.RenderMaterial(base_color=[0, 1, 0, 0.7]),
    )
    builder.add_box_visual(
        pose=sapien.Pose(p=[0, 0, -0.05]),
        half_size=[grasp_pose_visual_width, grasp_width, grasp_pose_visual_width],
        material=sapien.render.RenderMaterial(base_color=[0, 1, 0, 0.7]),
    )
    builder.add_box_visual(
        pose=sapien.Pose(
            p=[
                0.03 - grasp_pose_visual_width * 3,
                grasp_width + grasp_pose_visual_width,
                0.03 - 0.05,
            ],
            q=quaternions.axangle2quat(np.array([0, 1, 0]), theta=np.pi / 2),
        ),
        half_size=[0.04, grasp_pose_visual_width, grasp_pose_visual_width],
        material=sapien.render.RenderMaterial(base_color=[0, 0, 1, 0.7]),
    )
    builder.add_box_visual(
        pose=sapien.Pose(
            p=[
                0.03 - grasp_pose_visual_width * 3,
                -grasp_width - grasp_pose_visual_width,
                0.03 - 0.05,
            ],
            q=quaternions.axangle2quat(np.array([0, 1, 0]), theta=np.pi / 2),
        ),
        half_size=[0.04, grasp_pose_visual_width, grasp_pose_visual_width],
        material=sapien.render.RenderMaterial(base_color=[1, 0, 0, 0.7]),
    )
    grasp_pose_visual = builder.build_kinematic(name="grasp_pose_visual")
    return grasp_pose_visual





def parse_args(args=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("-r", "--robot-uid", type=str, default="panda", help="The id of the robot to place in the environment")
    parser.add_argument("-b", "--sim-backend", type=str, default="auto", help="Which simulation backend to use. Can be 'auto', 'cpu', 'gpu'")
    parser.add_argument("-c", "--control-mode", type=str, default="pd_joint_pos", help="The control mode to use. Note that for new robots being implemented if the _controller_configs is not implemented in the selected robot, we by default provide two default controllers, 'pd_joint_pos' and 'pd_joint_delta_pos' ")
    parser.add_argument("-k", "--keyframe", type=str, help="The name of the keyframe of the robot to display")
    parser.add_argument("--shader", default="default", type=str, help="Change shader used for rendering. Default is 'default' which is very fast. Can also be 'rt' for ray tracing and generating photo-realistic renders. Can also be 'rt-fast' for a faster but lower quality ray-traced renderer")
    parser.add_argument("--keyframe-actions", action="store_true", help="Whether to use the selected keyframe to set joint targets to try and hold the robot in its position")
    parser.add_argument("--random-actions", action="store_true", help="Whether to sample random actions to control the agent. If False, no control signals are sent and it is just rendering.")
    parser.add_argument("--none-actions", action="store_true", help="If set, then the scene and rendering will update each timestep but no joints will be controlled via code. You can use this to control the robot freely via the GUI.")
    parser.add_argument("--zero-actions", action="store_true", help="Whether to send zero actions to the robot. If False, no control signals are sent and it is just rendering.")
    parser.add_argument("--sim-freq", type=int, default=100, help="Simulation frequency")
    parser.add_argument("--control-freq", type=int, default=20, help="Control frequency")
    parser.add_argument(
        "-s",
        "--seed",
        type=int,
        help="Seed the random actions and environment. Default is no seed",
    )
    args = parser.parse_args()

    return args

@dataclass
class Args:
    env_id: Annotated[str, tyro.conf.arg(aliases=["-e"])] = "Empty-v1"
    """Environment ID (e.g. PushCube-v1, Empty-v1)"""

    robot_uids: Annotated[Optional[str], tyro.conf.arg(aliases=["-r"])] = "panda"
    """Robot UID(s). Comma-separated or single string. Default: panda"""

    sim_backend: Annotated[str, tyro.conf.arg(aliases=["-b"])] = "auto"
    """Simulation backend: auto, cpu, gpu"""

    control_mode: Annotated[str, tyro.conf.arg(aliases=["-c"])] = "pd_joint_pos"
    """Control mode (e.g. pd_joint_pos, pd_joint_vel, etc.)"""

    keyframe: Annotated[Optional[str], tyro.conf.arg(aliases=["-k"])] = None
    """Name of keyframe to view"""

    shader: str = "default"
    """Shader used for rendering (default, rt, rt-fast)"""

    keyframe_actions: bool = False
    """Use keyframe to control robot pose"""

    random_actions: bool = False
    """Send random actions each step"""

    none_actions: bool = False
    """Send no actions (manual GUI only)"""

    zero_actions: bool = False
    """Send zero actions"""

    sim_freq: int = 100
    """Simulation frequency"""

    control_freq: int = 20
    """Control frequency"""

    obs_mode: str = "none"
    """Observation mode"""

    reward_mode: Optional[str] = None
    """Reward mode"""

    render_mode: str = "human"
    """Render mode (rgb_array, human, etc.)"""

    pause: Annotated[bool, tyro.conf.arg(aliases=["-p"])] = False
    """Pause viewer on load"""

    record_dir: Optional[str] = None
    """Directory to save recordings"""

    quiet: bool = False
    """Disable verbose output"""

    seed: Annotated[Optional[Union[int, List[int]]], tyro.conf.arg(aliases=["-s"])] = None
    """Random seed or list of seeds"""

    num_envs: Annotated[int, tyro.conf.arg(aliases=["-n"])] = 1
    """Number of environments"""
    plan_actions: bool = False
@register_agent()
class MyPanda(BaseAgent):
    uid = "my_panda"
    urdf_path = "/home/shiqintong/Downloads/wheelchair_description/urdf/inte.urdf"
    keyframes = dict(
        standing=Keyframe(
            # notice how we set the z position to be above 0, so the robot is not intersecting the ground
            pose=sapien.Pose(p=[1.4, -1.5, 0.01], q=R.from_euler("z", 90, degrees=True).as_quat().tolist()),
            qpos=np.array([
                # arm: 7 joints
                -1.75, -0.35, -3.3, 1.4, 3.4, -1.1, 1.4,0.0, 0.0, 0.0, 0.0,
                # gripper: 6 joints
                0.04,  # finger_joint
                0.04, 0.04, 0.04, 0.04, 0.04  # mimic joints
            ])
        )
    )
    urdf_config = dict(
        _materials=dict(
            gripper=dict(static_friction=2.0, dynamic_friction=2.0, restitution=0.0)
        ),
        link=dict(
            left_inner_finger_pad=dict(
                material="gripper", patch_radius=0.1, min_patch_radius=0.1
            ),
            right_inner_finger_pad=dict(
                material="gripper", patch_radius=0.1, min_patch_radius=0.1
            ),
            left_inner_finger=dict(material="gripper"),
            right_inner_finger=dict(material="gripper"),
        ),
    )

    @property
    def _controller_configs(self):
        arm_joint_names = [
            "joint_1", "joint_2", "joint_3",
            "joint_4", "joint_5", "joint_6", "joint_7",
        ]
        gripper_joint_names = ["finger_joint"]
                # ‚úÖ Ê∑ªÂä† wheel joints
        wheel_joint_names = [
            "joint_LF01", "joint_LB01", "joint_RF01", "joint_RB01",
        ]

        arm_pd_joint_pos = PDJointPosControllerConfig(
            arm_joint_names,
            lower=[-3.14] * len(arm_joint_names),
            upper=[3.14] * len(arm_joint_names),
            stiffness=1000,
            damping=100,
            force_limit=100,
            normalize_action=False,
        )
        arm_pd_joint_delta_pos = PDJointPosControllerConfig(
            arm_joint_names,
            lower=[-0.1] * len(arm_joint_names),
            upper=[0.1] * len(arm_joint_names),
            stiffness=1000,
            damping=100,
            force_limit=100,
            use_delta=True,
        )
        gripper_pd_joint_pos = PDJointPosMimicControllerConfig(
            gripper_joint_names,
            lower=[-0.01],
            upper=[0.04],
            stiffness=1000,
            damping=100,
            force_limit=100,
        )
        wheel_pd_joint_pos = PDJointPosControllerConfig(
            wheel_joint_names,
            lower=[-100] * len(wheel_joint_names),
            upper=[100] * len(wheel_joint_names),
            stiffness=1000,
            damping=50,
            force_limit=50,
            normalize_action=False,
        )

        return deepcopy({
            "pd_joint_delta_pos": {
                "arm": arm_pd_joint_delta_pos,
                "gripper": gripper_pd_joint_pos,
                "wheels": wheel_pd_joint_pos,
            },
            "pd_joint_pos": {
                "arm": arm_pd_joint_pos,
                "gripper": gripper_pd_joint_pos,
                "wheels": wheel_pd_joint_pos,
            },
        })


    # wheel_joint_names = [
    #     "joint_LF01",
    #     "joint_LB01",
    #     "joint_RF01",
    #     "joint_RB01"
    # ]
    #     # ËÆæÂÆöÊéßÂà∂ÂèÇÊï∞ÔºàÂèØÊ†πÊçÆÂÆûÈôÖÈúÄË¶Å‰øÆÊîπÔºâ
    # wheel_stiffness = 1000
    # wheel_damping = 100
    # wheel_force_limit = 100



# ‚úÖ ‰∏ªÂáΩÊï∞
def main():
    args = tyro.cli(Args)

    env = gym.make(
        args.env_id,
        obs_mode=args.obs_mode,
        reward_mode=args.reward_mode,
        enable_shadow=True,
        control_mode=args.control_mode,
        robot_uids=args.robot_uids,
        sensor_configs={"shader_pack": args.shader},
        human_render_camera_configs={"shader_pack": args.shader},
        viewer_camera_configs={"shader_pack": args.shader},
        render_mode=args.render_mode,
        sim_config=dict(sim_freq=args.sim_freq, control_freq=args.control_freq),
        sim_backend=args.sim_backend,
    )

    env.reset(seed=args.seed or 0)
    env: BaseEnv = env.unwrapped

    print(f"‚úÖ Selected robot: {args.robot_uids}, control mode: {args.control_mode}")
    print(f"üîë Available keyframes: {list(env.agent.keyframes.keys())}")
    import sapien
    from mani_skill.envs.scene import ManiSkillScene
    from mani_skill.utils.building import URDFLoader
    loader = URDFLoader()
    loader.set_scene(ManiSkillScene())
    robot = loader.load("/home/shiqintong/Downloads/wheelchair_description/urdf/inte.urdf")
    print(robot.active_joints_map.keys())







    # ÂàùÂßãÂåñ‰ΩçÂßø
    kf = None
    if env.agent.keyframes:
        kf_name = args.keyframe or next(iter(env.agent.keyframes))
        kf = env.agent.keyframes[kf_name]
        env.agent.robot.set_pose(kf.pose)
        if kf.qpos is not None:
            env.agent.robot.set_qpos(kf.qpos)
        if kf.qvel is not None:
            env.agent.robot.set_qvel(kf.qvel)
        print(f"üìå Viewing keyframe: {kf_name}")

    if env.gpu_sim_enabled:
        env.scene._gpu_apply_all()
        env.scene.px.gpu_update_articulation_kinematics()
        env.scene._gpu_fetch_all()

    viewer = env.render()
    viewer.paused = args.pause

    # ‚úÖ ËÆæÁΩÆ robot base pose
    from scipy.spatial.transform import Rotation as R
    base_pose = sapien.Pose(
        [1.4, -3.5, 0.01],  # ÂèØÊåâÂÆûÈôÖÊÉÖÂÜµË∞ÉÊï¥
        R.from_euler("z", 90, degrees=True).as_quat().tolist()
    )

    planner = None
    if args.plan_actions:
        from transforms3d.euler import euler2quat
        from transforms3d.euler import mat2euler

        planner = MyPandaMotionPlanningSolver(
            env,
            debug=False,
            vis=True,
            base_pose=env.agent.robot.pose, # ‰Ω†‰πüÂèØÊåáÂÆöÂà´ÁöÑ
            visualize_target_grasp_pose=False,
            print_env_info=True,
            joint_acc_limits=0.5,
            joint_vel_limits=0.5,
        )
    arm_qpos = env.agent.robot.get_qpos()[0, :7].cpu().numpy()
    gripper_state = 0.8
        # ÂàùÂßãÂåñËΩÆÂ≠êÁä∂ÊÄÅ
    wheel_qpos = {
        "joint_LF01": 0.0,
        "joint_LB01": 0.0,
        "joint_RF01": 0.0,
        "joint_RB01": 0.0,
    }

    # Ëé∑ÂèñËΩÆÂ≠ê joint index
    wheel_joint_ids = {
        name: env.agent.robot.active_joints_map[name].index
        for name in wheel_qpos
    }
    while True:
        viewer = env.render()

        if args.random_actions:
            # ÊéßÂà∂Êú∫Ê¢∞ËáÇ
            if viewer.window.key_down("b"):
                arm_qpos[1] += 0.05
            if viewer.window.key_down("n"):
                arm_qpos[1] -= 0.05
            if viewer.window.key_down("z"):
                arm_qpos[2] += 0.05
            if viewer.window.key_down("x"):
                arm_qpos[2] -= 0.05
            if viewer.window.key_down("c"):
                arm_qpos[3] += 0.05
            if viewer.window.key_down("v"):
                arm_qpos[3] -= 0.05
            if viewer.window.key_down("m"):
                arm_qpos[4] += 0.05
            if viewer.window.key_down("f"):
                arm_qpos[4] -= 0.05
            if viewer.window.key_down("g"):
                arm_qpos[5] += 0.05
            if viewer.window.key_down("h"):
                arm_qpos[5] -= 0.05
            if viewer.window.key_down("r"):
                arm_qpos[6] += 0.05
            if viewer.window.key_down("t"):
                arm_qpos[6] -= 0.05
            if viewer.window.key_down("up"):
                arm_qpos[0] += 0.05
            if viewer.window.key_down("down"):
                arm_qpos[0] -= 0.05

            # ÊéßÂà∂Â§πÁà™
            if viewer.window.key_down("o"):
                gripper_state = 0.1
            if viewer.window.key_down("p"):
                gripper_state = 0.0

            # ÊéßÂà∂ËΩÆÂ≠ê
            delta = 0.1
            if viewer.window.key_down("i"):  # ÂâçËøõ
                for k in wheel_qpos:
                    wheel_qpos[k] += delta
            if viewer.window.key_down("k"):  # ÂêéÈÄÄ
                for k in wheel_qpos:
                    wheel_qpos[k] -= delta
            if viewer.window.key_down("j"):  # Â∑¶ËΩ¨
                wheel_qpos["joint_LF01"] -= delta
                wheel_qpos["joint_LB01"] -= delta
                wheel_qpos["joint_RF01"] += delta
                wheel_qpos["joint_RB01"] += delta
            if viewer.window.key_down("l"):  # Âè≥ËΩ¨
                wheel_qpos["joint_LF01"] += delta
                wheel_qpos["joint_LB01"] += delta
                wheel_qpos["joint_RF01"] -= delta
                wheel_qpos["joint_RB01"] -= delta

            # ÊûÑÈÄ†ÊÄªÁöÑ action (7 + 1 + 4 = 12)
            action = np.hstack([
                arm_qpos,               # 7 joints
                [gripper_state],        # 1 gripper joint
                [wheel_qpos["joint_LF01"]],
                [wheel_qpos["joint_LB01"]],
                [wheel_qpos["joint_RF01"]],
                [wheel_qpos["joint_RB01"]],
            ])[None, :]  # shape: (1, 12)
            # else:
            #     print("‚ö†Ô∏è ÂΩìÂâçÊéßÂà∂Âô®‰∏çÊòØ DictControllerÔºåËØ∑ÊâãÂä®ÊãºÊé• actionÔºÅ")
            #     action = np.hstack([
            #         arm_qpos,
            #         gripper_state,
            #         wheel_qpos["joint_LF01"],
            #         wheel_qpos["joint_LB01"],
            #         wheel_qpos["joint_RF01"],
            #         wheel_qpos["joint_RB01"],
            #     ])[None, :]

            env.step(action)
            print("Current qpos:", np.round(arm_qpos, 3))

        elif args.none_actions:
            env.step(None)
        elif args.zero_actions:
            env.step(np.zeros_like(env.action_space.sample()))
        elif args.keyframe_actions:
            if kf is not None:
                if isinstance(env.agent.controller, DictController):
                    env.step(env.agent.controller.from_qpos(kf.qpos))
                else:
                    env.step(kf.qpos)
        elif args.plan_actions:
            target_pose = sapien.Pose([0.0, 0.0, 0.0])
            result = planner.move_to_pose_with_screw(target_pose, dry_run=True)
            if result != -1:
                planner.follow_path(result)

    # while True:
    #         viewer = env.render()

    #         if args.random_actions:
    #             # ÊéßÂà∂Êú∫Ê¢∞ËáÇ
    #             if viewer.window.key_down("b"):
    #                 arm_qpos[1] += 0.05
    #             if viewer.window.key_down("n"):
    #                 arm_qpos[1] -= 0.05

    #             # ÊéßÂà∂Â§πÁà™
    #             if viewer.window.key_down("o"):
    #                 gripper_state = 0.1
    #             if viewer.window.key_down("p"):
    #                 gripper_state = 0.0

    #             # ÊéßÂà∂ËΩÆÂ≠ê
    #             delta = 0.1
    #             if viewer.window.key_down("i"):
    #                 for k in wheel_qpos:
    #                     wheel_qpos[k] += delta
    #             if viewer.window.key_down("k"):
    #                 for k in wheel_qpos:
    #                     wheel_qpos[k] -= delta
    #             if viewer.window.key_down("j"):
    #                 wheel_qpos["joint_LF01"] -= delta
    #                 wheel_qpos["joint_LB01"] -= delta
    #                 wheel_qpos["joint_RF01"] += delta
    #                 wheel_qpos["joint_RB01"] += delta
    #             if viewer.window.key_down("l"):
    #                 wheel_qpos["joint_LF01"] += delta
    #                 wheel_qpos["joint_LB01"] += delta
    #                 wheel_qpos["joint_RF01"] -= delta
    #                 wheel_qpos["joint_RB01"] -= delta

    #             # ÊûÑÈÄ†ÊÄªÁöÑ action
    #             total_qpos = np.zeros(env.agent.robot.dof)

    #             # Êú∫Ê¢∞ËáÇ 7 joints + gripper
    #             total_qpos[:7] = arm_qpos
    #             total_qpos[7] = gripper_state

    #             # ËÆæÁΩÆËΩÆÂ≠ê joint ÁöÑËßíÂ∫¶
    #             for name, value in wheel_qpos.items():
    #                 index = wheel_joint_ids[name]
    #                 total_qpos[index] = value

    #             action = total_qpos[None, :]  # (1, DOF)
    #             env.step(action)

    #         elif args.none_actions:
    #             env.step(None)
    #         elif args.zero_actions:
    #             env.step(np.zeros_like(env.action_space.sample()))
    #         elif args.keyframe_actions:
    #             if kf is not None:
    #                 if isinstance(env.agent.controller, DictController):
    #                     env.step(env.agent.controller.from_qpos(kf.qpos))
    #                 else:
    #                     env.step(kf.qpos)
    #         elif args.plan_actions:
    #             # Á§∫‰æãË∑ØÂæÑËßÑÂàí
    #             target_pose = sapien.Pose([0.0, 0.0, 0.0])
    #             result = planner.move_to_pose_with_screw(target_pose, dry_run=True)
    #             if result != -1:
    #                 planner.follow_path(result)

    # # ‚úÖ ‰∏ªÂæ™ÁéØ
    # while True:
    #     if args.random_actions:
    #         # full_qpos = env.agent.robot.get_qpos()[0].cpu().numpy()

    #         # # Âè™ÂèñÂâç 7 ‰∏™ arm joints
    #         # arm_qpos = full_qpos[:7]

    #         # ÈîÆÁõòÊéßÂà∂ joint_3 ‰∏ä‰∏ãÔºàÁ¥¢Âºï 2Ôºâ
    #         if viewer.window.key_down("b"):
    #             arm_qpos[1] += 0.05  # joint_3 Âêë‰∏ä
    #         if viewer.window.key_down("n"):
    #             arm_qpos[1] -= 0.05  # joint_3 Âêë‰∏ã


    #         if viewer.window.key_down("o"):
    #             gripper_state = 0.1  # fully open
    #         if viewer.window.key_down("p"):
    #             gripper_state = 0.0  # fully closed

    #         # ÊûÑÈÄ† action
    #         if env.control_mode == "pd_joint_pos":
    #             action = np.hstack([arm_qpos, gripper_state])[None, :]  # (1, 8)
    #         elif env.control_mode == "pd_joint_pos_vel":
    #             qvel = np.zeros_like(arm_qpos)
    #             action = np.hstack([arm_qpos, qvel, gripper_state])[None, :]





    #         else:
    #             print("‚ö†Ô∏è ÂΩìÂâçÊéßÂà∂Ê®°ÂºèÊú™Â§ÑÁêÜÔºå‰ΩøÁî®ÈöèÊú∫Âä®‰Ωú‰ª£Êõø")
    #             action = env.action_space.sample()

    #         env.step(action)
    #         # env.step(env.action_space.sample())

    #         # env.step(env.action_space.sample())
    #     elif args.none_actions:
    #         env.step(None)
    #     elif args.zero_actions:
    #         env.step(np.zeros_like(env.action_space.sample()))
    #     elif args.keyframe_actions:
    #         if kf is not None:
    #             if isinstance(env.agent.controller, DictController):
    #                 env.step(env.agent.controller.from_qpos(kf.qpos))
    #             else:
    #                 env.step(kf.qpos)
    #     elif args.plan_actions:
    #         # ‰æãÂ¶ÇÔºöËßÑÂàí‰∏Ä‰∏™ÁõÆÊ†áÊú´Á´Ø‰ΩçÂßø
    #         target_pose = sapien.Pose([0.0, 0.0, 0.0])  # Èöè‰æøÂÜô
    #         result = planner.move_to_pose_with_screw(target_pose, dry_run=True)
    #         if result == -1:
    #             print("Plan failed!")
    #         else:
    #             planner.follow_path(result)

    #         # ËøôÈáåÊºîÁ§∫‰∏ÄÊ¨°Â∞±Â§üÔºåÂ¶ÇÊûú‰Ω†Ë¶ÅÊØèÂ∏ßÂÅö‰∏çÂêåÁöÑËßÑÂàíÔºåÂèØ‰ª•Ëá™Â∑±ÊîπÈÄªËæë

    #     viewer = env.render()

if __name__ == "__main__":
    main()



In [None]:
#‰øÆÊîπ‰πãÂêéÁöÑplanner Âú®mplib‰∏ã ÈóÆÈ¢òÂ∞±ÊòØÊâæÂà∞Âá∫ÈóÆÈ¢òÁöÑÂáΩÊï∞ Êó•Âøó+AI ‰∏ÄÊ≠•‰∏ÄÊ≠•Êé®ÁêÜÂæóÂà∞ÂÖ≥ËäÇÊúâËá™Êíû ‰øÆÊîπsrdfÂéªÊéâËá™Êíû


from __future__ import annotations

import os
from typing import Optional, Sequence

import numpy as np
import toppra as ta
import toppra.algorithm as algo
import toppra.constraint as constraint
from transforms3d.quaternions import mat2quat, quat2mat

from mplib.pymp import ArticulatedModel, PlanningWorld
from mplib.pymp.planning import ompl


class Planner:
    """Motion planner"""

    # TODO(jigu): default joint vel and acc limits
    # TODO(jigu): how does user link names and joint names are exactly used?
    # constructor ankor
    def __init__(
        self,
        urdf: str,
        move_group: str,
        srdf: str = "",
        package_keyword_replacement: str = "",
        user_link_names: Sequence[str] = [],
        user_joint_names: Sequence[str] = [],
        joint_vel_limits: Optional[Sequence[float] | np.ndarray] = None,
        joint_acc_limits: Optional[Sequence[float] | np.ndarray] = None,
        **kwargs,
    ):
        # constructor ankor end
        """Motion planner for robots.

        Args:
            urdf: Unified Robot Description Format file.
            user_link_names: names of links, the order matters.
                If empty, all links will be used.
            user_joint_names: names of the joints to plan.
                If empty, all active joints will be used.
            move_group: target link to move, usually the end-effector.
            joint_vel_limits: maximum joint velocities for time parameterization,
                which should have the same length as
            joint_acc_limits: maximum joint accelerations for time parameterization,
                which should have the same length as
            srdf: Semantic Robot Description Format file.
        References:
            http://docs.ros.org/en/kinetic/api/moveit_tutorials/html/doc/urdf_srdf/urdf_srdf_tutorial.html

        """
        if joint_vel_limits is None:
            joint_vel_limits = []
        if joint_acc_limits is None:
            joint_acc_limits = []
        self.urdf = urdf
        if srdf == "" and os.path.exists(urdf.replace(".urdf", ".srdf")):
            self.srdf = urdf.replace(".urdf", ".srdf")
            print(f"No SRDF file provided but found {self.srdf}")

        # replace package:// keyword if exists
        urdf = self.replace_package_keyword(package_keyword_replacement)

        self.robot = ArticulatedModel(
            urdf,
            srdf,
            [0, 0, -9.81],
            user_link_names,
            user_joint_names,
            convex=True,
            verbose=False,
        )
        self.pinocchio_model = self.robot.get_pinocchio_model()
        self.user_link_names = self.pinocchio_model.get_link_names()
        self.user_joint_names = self.pinocchio_model.get_joint_names()

        self.planning_world = PlanningWorld(
            [self.robot],
            ["robot"],
            kwargs.get("normal_objects", []),
            kwargs.get("normal_object_names", []),
        )

        if srdf == "":
            self.generate_collision_pair()
            self.robot.update_SRDF(self.srdf)

        self.joint_name_2_idx = {}
        for i, joint in enumerate(self.user_joint_names):
            self.joint_name_2_idx[joint] = i
        self.link_name_2_idx = {}
        for i, link in enumerate(self.user_link_names):
            self.link_name_2_idx[link] = i

        assert (
            move_group in self.user_link_names
        ), f"end-effector not found as one of the links in {self.user_link_names}"
        self.move_group = move_group
        self.robot.set_move_group(self.move_group)
        self.move_group_joint_indices = self.robot.get_move_group_joint_indices()

        self.joint_types = self.pinocchio_model.get_joint_types()
        self.joint_limits = np.concatenate(self.pinocchio_model.get_joint_limits())
        self.joint_vel_limits = (
            joint_vel_limits
            if len(joint_vel_limits)
            else np.ones(len(self.move_group_joint_indices))
        )
        self.joint_acc_limits = (
            joint_acc_limits
            if len(joint_acc_limits)
            else np.ones(len(self.move_group_joint_indices))
        )
        self.move_group_link_id = self.link_name_2_idx[self.move_group]
        assert len(self.joint_vel_limits) == len(self.joint_acc_limits), (
            f"length of joint_vel_limits ({len(self.joint_vel_limits)}) =/= "
            f"length of joint_acc_limits ({len(self.joint_acc_limits)})"
        )
        assert len(self.joint_vel_limits) == len(self.move_group_joint_indices), (
            f"length of joint_vel_limits ({len(self.joint_vel_limits)}) =/= "
            f"length of move_group ({len(self.move_group_joint_indices)})"
        )
        assert len(self.joint_vel_limits) <= len(self.joint_limits), (
            f"length of joint_vel_limits ({len(self.joint_vel_limits)}) > "
            f"number of total joints ({len(self.joint_limits)})"
        )

        self.planning_world = PlanningWorld([self.robot], ["robot"], [], [])
        self.planner = ompl.OMPLPlanner(world=self.planning_world)

    def replace_package_keyword(self, package_keyword_replacement):
        """
        some ROS URDF files use package:// keyword to refer the package dir
        replace it with the given string (default is empty)

        Args:
            package_keyword_replacement: the string to replace 'package://' keyword
        """
        rtn_urdf = self.urdf
        with open(self.urdf) as in_f:
            content = in_f.read()
            if "package://" in content:
                rtn_urdf = self.urdf.replace(".urdf", "_package_keyword_replaced.urdf")
                content = content.replace("package://", package_keyword_replacement)
                if not os.path.exists(rtn_urdf):
                    with open(rtn_urdf, "w") as out_f:
                        out_f.write(content)
        return rtn_urdf

    def generate_collision_pair(self, sample_time=1000000, echo_freq=100000):
        """
        We read the srdf file to get the link pairs that should not collide.
        If not provided, we need to randomly sample configurations
        to find the link pairs that will always collide.
        """
        print(
            "Since no SRDF file is provided. We will first detect link pairs that will"
            " always collide. This may take several minutes."
        )
        n_link = len(self.user_link_names)
        cnt = np.zeros((n_link, n_link), dtype=np.int32)
        for i in range(sample_time):
            qpos = self.pinocchio_model.get_random_configuration()
            self.robot.set_qpos(qpos, True)
            collisions = self.planning_world.collide_full()
            for collision in collisions:
                u = self.link_name_2_idx[collision.link_name1]
                v = self.link_name_2_idx[collision.link_name2]
                cnt[u][v] += 1
            if i % echo_freq == 0:
                print("Finish %.1f%%!" % (i * 100 / sample_time))

        import xml.etree.ElementTree as ET
        from xml.dom import minidom

        root = ET.Element("robot")
        robot_name = self.urdf.split("/")[-1].split(".")[0]
        root.set("name", robot_name)
        self.srdf = self.urdf.replace(".urdf", ".srdf")

        for i in range(n_link):
            for j in range(n_link):
                if cnt[i][j] == sample_time:
                    link1 = self.user_link_names[i]
                    link2 = self.user_link_names[j]
                    print(
                        f"Ignore collision pair: ({link1}, {link2}), "
                        "reason: always collide"
                    )
                    collision = ET.SubElement(root, "disable_collisions")
                    collision.set("link1", link1)
                    collision.set("link2", link2)
                    collision.set("reason", "Default")
        with open(self.srdf, "w") as srdf_file:
            srdf_file.write(
                minidom.parseString(ET.tostring(root)).toprettyxml(indent="    ")
            )
            srdf_file.close()
        print("Saving the SRDF file to %s" % self.srdf)

    def distance_6D(self, p1, q1, p2, q2):
        """
        compute the distance between two poses

        Args:
            p1: position of pose 1
            q1: quaternion of pose 1
            p2: position of pose 2
            q2: quaternion of pose 2
        """
        return np.linalg.norm(p1 - p2) + min(
            np.linalg.norm(q1 - q2), np.linalg.norm(q1 + q2)
        )

    def wrap_joint_limit(self, q) -> bool:
        """
        Checks if the joint configuration is within the joint limits.
        For revolute joints, the joint angle is wrapped to be within [q_min, q_min+2*pi)

        Args:
            q: joint configuration, angles of revolute joints might be modified

        Returns:
            True if q can be wrapped to be within the joint limits
        """
        n = len(q)
        flag = True
        for i in range(n):
            if self.joint_types[i].startswith("JointModelR"):
                if -1e-3 <= q[i] - self.joint_limits[i][0] < 0:
                    continue
                q[i] -= (
                    2 * np.pi * np.floor((q[i] - self.joint_limits[i][0]) / (2 * np.pi))
                )
                if q[i] > self.joint_limits[i][1] + 1e-3:
                    flag = False
            else:
                if (
                    q[i] < self.joint_limits[i][0] - 1e-3
                    or q[i] > self.joint_limits[i][1] + 1e-3
                ):
                    flag = False
        return flag

    def pad_qpos(self, qpos, articulation=None):
        """
        if the user does not provide the full qpos but only the move_group joints,
        pad the qpos with the rest of the joints
        """
        if len(qpos) == len(self.move_group_joint_indices):
            tmp = (
                articulation.get_qpos()
                if articulation is not None
                else self.robot.get_qpos()
            )
            tmp[: len(qpos)] = qpos
            qpos = tmp

        assert len(qpos) == len(self.joint_limits), (
            f"length of qpos ({len(qpos)}) =/= "
            f"number of total joints ({len(self.joint_limits)})"
        )

        return qpos

    def check_for_collision(
        self,
        collision_function,
        articulation: Optional[ArticulatedModel] = None,
        qpos: Optional[np.ndarray] = None,
    ) -> list:
        """helper function to check for collision"""
        # handle no user input
        if articulation is None:
            articulation = self.robot
        if qpos is None:
            qpos = articulation.get_qpos()
        qpos = self.pad_qpos(qpos, articulation)

        # first save the current qpos
        old_qpos = articulation.get_qpos()
        # set robot to new qpos
        articulation.set_qpos(qpos, True)
        # find the index of the articulation inside the array
        idx = self.planning_world.get_articulations().index(articulation)
        # check for self-collision
        collisions = collision_function(idx)
        # reset qpos
        articulation.set_qpos(old_qpos, True)
        return collisions

    def check_for_self_collision(
        self,
        articulation: Optional[ArticulatedModel] = None,
        qpos: Optional[np.ndarray] = None,
    ) -> list:
        """Check if the robot is in self-collision.

        Args:
            articulation: robot model. if none will be self.robot
            qpos: robot configuration. if none will be the current pose

        Returns:
            A list of collisions.
        """
        return self.check_for_collision(
            self.planning_world.self_collide, articulation, qpos
        )

    def check_for_env_collision(
        self,
        articulation: Optional[ArticulatedModel] = None,
        qpos: Optional[np.ndarray] = None,
        with_point_cloud=False,
        use_attach=False,
    ) -> list:
        """Check if the robot is in collision with the environment

        Args:
            articulation: robot model. if none will be self.robot
            qpos: robot configuration. if none will be the current pose
            with_point_cloud: whether to check collision against point cloud
            use_attach: whether to include the object attached to the end effector
                in collision checking
        Returns:
            A list of collisions.
        """
        # store previous results
        prev_use_point_cloud = self.planning_world.use_point_cloud
        prev_use_attach = self.planning_world.use_attach
        self.planning_world.set_use_point_cloud(with_point_cloud)
        self.planning_world.set_use_attach(use_attach)

        results = self.check_for_collision(
            self.planning_world.collide_with_others, articulation, qpos
        )

        # restore
        self.planning_world.set_use_point_cloud(prev_use_point_cloud)
        self.planning_world.set_use_attach(prev_use_attach)
        return results

    def IK(self, goal_pose, start_qpos, mask=None, n_init_qpos=20, threshold=1e-3):
        """
        Inverse kinematics

        Args:
            goal_pose: [x,y,z,qw,qx,qy,qz] pose of the goal
            start_qpos: initial configuration
            mask: if the value at a given index is True,
                the joint is *not* used in the IK
            n_init_qpos: number of random initial configurations
            threshold: threshold for the distance between the goal pose and
                the result pose
        """

        if mask is None:
            mask = []
        index = self.link_name_2_idx[self.move_group]
        min_dis = 1e9
        idx = self.move_group_joint_indices
        qpos0 = np.copy(start_qpos)
        results = []
        self.robot.set_qpos(start_qpos, True)
        for _ in range(n_init_qpos):
            ik_results = self.pinocchio_model.compute_IK_CLIK(
                index, goal_pose, start_qpos, mask
            )
            flag = self.wrap_joint_limit(ik_results[0])  # will wrap revolute joints

            # check collision
            self.planning_world.set_qpos_all(ik_results[0][idx])
            if len(self.planning_world.collide_full()) != 0:
                flag = False

            if flag:
                self.pinocchio_model.compute_forward_kinematics(ik_results[0])
                new_pose = self.pinocchio_model.get_link_pose(index)
                tmp_dis = self.distance_6D(
                    goal_pose[:3], goal_pose[3:], new_pose[:3], new_pose[3:]
                )
                if tmp_dis < min_dis:
                    min_dis = tmp_dis
                if tmp_dis < threshold:
                    result = ik_results[0]
                    unique = True
                    for j in range(len(results)):
                        if np.linalg.norm(results[j][idx] - result[idx]) < 0.1:
                            unique = False
                    if unique:
                        results.append(result)
            start_qpos = self.pinocchio_model.get_random_configuration()
            mask_len = len(mask)
            if mask_len > 0:
                for j in range(mask_len):
                    if mask[j]:
                        start_qpos[j] = qpos0[j]
        if len(results) != 0:
            status = "Success"
        elif min_dis != 1e9:
            status = "IK Failed! Distance {:f} is greater than threshold {:f}.".format(
                min_dis,
                threshold,
            )
        else:
            status = "IK Failed! Cannot find valid solution."
        return status, results

    def TOPP(self, path, step=0.1, verbose=False):
        """
        Time-Optimal Path Parameterization

        Args:
            path: numpy array of shape (n, dof)
            step: step size for the discretization
            verbose: if True, will print the log of TOPPRA
        """

        N_samples = path.shape[0]
        dof = path.shape[1]
        assert dof == len(self.joint_vel_limits)
        assert dof == len(self.joint_acc_limits)
        ss = np.linspace(0, 1, N_samples)
        path = ta.SplineInterpolator(ss, path)
        pc_vel = constraint.JointVelocityConstraint(self.joint_vel_limits)
        pc_acc = constraint.JointAccelerationConstraint(self.joint_acc_limits)
        instance = algo.TOPPRA(
            [pc_vel, pc_acc], path, parametrizer="ParametrizeConstAccel"
        )
        jnt_traj = instance.compute_trajectory()
        if jnt_traj is None:
            raise RuntimeError("Fail to parameterize path")
        ts_sample = np.linspace(0, jnt_traj.duration, int(jnt_traj.duration / step))
        qs_sample = jnt_traj(ts_sample)
        qds_sample = jnt_traj(ts_sample, 1)
        qdds_sample = jnt_traj(ts_sample, 2)
        return ts_sample, qs_sample, qds_sample, qdds_sample, jnt_traj.duration

    def update_point_cloud(self, pc, radius=1e-3):
        """
        Args:
            pc: numpy array of shape (n, 3)
            radius: radius of each point. This gives a buffer around each point
                that planner will avoid
        """
        self.planning_world.update_point_cloud(pc, radius)

    def update_attached_tool(self, fcl_collision_geometry, pose, link_id=-1):
        """helper function to update the attached tool"""
        if link_id == -1:
            link_id = self.move_group_link_id
        self.planning_world.update_attached_tool(fcl_collision_geometry, link_id, pose)

    def update_attached_sphere(self, radius, pose, link_id=-1):
        """
        attach a sphere to some link

        Args:
            radius: radius of the sphere
            pose: [x,y,z,qw,qx,qy,qz] pose of the sphere
            link_id: if not provided, the end effector will be the target.
        """
        if link_id == -1:
            link_id = self.move_group_link_id
        self.planning_world.update_attached_sphere(radius, link_id, pose)

    def update_attached_box(self, size, pose, link_id=-1):
        """
        attach a box to some link

        Args:
            size: [x,y,z] size of the box
            pose: [x,y,z,qw,qx,qy,qz] pose of the box
            link_id: if not provided, the end effector will be the target.
        """
        if link_id == -1:
            link_id = self.move_group_link_id
        self.planning_world.update_attached_box(size, link_id, pose)

    def update_attached_mesh(self, mesh_path, pose, link_id=-1):
        """
        attach a mesh to some link

        Args:
            mesh_path: path to the mesh
            pose: [x,y,z,qw,qx,qy,qz] pose of the mesh
            link_id: if not provided, the end effector will be the target.
        """
        if link_id == -1:
            link_id = self.move_group_link_id
        self.planning_world.update_attached_mesh(mesh_path, link_id, pose)

    def set_base_pose(self, pose):
        """
        tell the planner where the base of the robot is w.r.t the world frame

        Args:
            pose: [x,y,z,qw,qx,qy,qz] pose of the base
        """
        self.robot.set_base_pose(pose)

    def set_normal_object(self, name, collision_object):
        """adds or updates a non-articulated collision object in the scene"""
        self.planning_world.set_normal_object(name, collision_object)

    def remove_normal_object(self, name):
        """returns true if the object was removed, false if it was not found"""
        return self.planning_world.remove_normal_object(name)

    def plan_qpos_to_qpos(
        self,
        goal_qposes: list,
        current_qpos,
        time_step=0.1,
        rrt_range=0.1,
        planning_time=1,
        fix_joint_limits=True,
        use_point_cloud=False,
        use_attach=False,
        planner_name="RRTConnect",
        no_simplification=False,
        constraint_function=None,
        constraint_jacobian=None,
        constraint_tolerance=1e-3,
        fixed_joint_indices=None,
        verbose=False,
    ):
        """
        plan a path from a specified joint position to a goal pose

        Args:
            goal_pose: 7D pose of the end-effector [x,y,z,qw,qx,qy,qz]
            current_qpos: current joint configuration (either full or move_group joints)
            mask: mask for IK. When set, the IK will leave certain joints out of
                planning
            time_step: time step for TOPP
            rrt_range: step size for RRT
            planning_time: time limit for RRT
            fix_joint_limits: if True, will clip the joint configuration to be within
                the joint limits
            use_point_cloud: if True, will use the point cloud to avoid collision
            use_attach: if True, will consider the attached tool collision when planning
            planner_name: planner name pick from {"RRTConnect", "RRTstar"}
            no_simplification: if true, will not simplify the path. constraint planning
                does not support simplification
            constraint_function: evals to 0 when constraint is satisfied
            constraint_jacobian: jacobian of constraint_function
            constraint_tolerance: tolerance for constraint_function
            fixed_joint_indices: list of indices of joints that are fixed during
                planning
            verbose: if True, will print the log of OMPL and TOPPRA
        """
        if fixed_joint_indices is None:
            fixed_joint_indices = []
        self.planning_world.set_use_point_cloud(use_point_cloud)
        self.planning_world.set_use_attach(use_attach)
        n = current_qpos.shape[0]
        if fix_joint_limits:
            for i in range(n):
                if current_qpos[i] < self.joint_limits[i][0]:
                    current_qpos[i] = self.joint_limits[i][0] + 1e-3
                if current_qpos[i] > self.joint_limits[i][1]:
                    current_qpos[i] = self.joint_limits[i][1] - 1e-3

        current_qpos = self.pad_qpos(current_qpos)

        self.robot.set_qpos(current_qpos, True)
        collisions = self.planning_world.collide_full()
        if len(collisions) != 0:
            print("Invalid start state!")
            for collision in collisions:
                print(f"{collision.link_name1} and {collision.link_name2} collide!")

        idx = self.move_group_joint_indices

        goal_qpos_ = [goal_qposes[i][idx] for i in range(len(goal_qposes))]

        fixed_joints = set()
        for joint_idx in fixed_joint_indices:
            fixed_joints.add(ompl.FixedJoint(0, joint_idx, current_qpos[joint_idx]))

        assert len(current_qpos[idx]) == len(goal_qpos_[0])
        status, path = self.planner.plan(
            current_qpos[idx],
            goal_qpos_,
            range=rrt_range,
            time=planning_time,
            fixed_joints=fixed_joints,
            planner_name=planner_name,
            no_simplification=no_simplification,
            constraint_function=constraint_function,
            constraint_jacobian=constraint_jacobian,
            constraint_tolerance=constraint_tolerance,
            verbose=verbose,
        )

        if status == "Exact solution":
            if verbose:
                ta.setup_logging("INFO")
            else:
                ta.setup_logging("WARNING")
            times, pos, vel, acc, duration = self.TOPP(path, time_step)
            return {
                "status": "Success",
                "time": times,
                "position": pos,
                "velocity": vel,
                "acceleration": acc,
                "duration": duration,
            }
        else:
            return {"status": "RRT Failed. %s" % status}

    def transform_goal_to_wrt_base(self, goal_pose):
        base_pose = self.robot.get_base_pose()
        base_tf = np.eye(4)
        base_tf[0:3, 3] = base_pose[:3]
        base_tf[0:3, 0:3] = quat2mat(base_pose[3:])
        goal_tf = np.eye(4)
        goal_tf[0:3, 3] = goal_pose[:3]
        goal_tf[0:3, 0:3] = quat2mat(goal_pose[3:])
        goal_tf = np.linalg.inv(base_tf).dot(goal_tf)
        new_goal_pose = np.zeros(7)
        new_goal_pose[:3] = goal_tf[0:3, 3]
        new_goal_pose[3:] = mat2quat(goal_tf[0:3, 0:3])
        return new_goal_pose

    def plan_qpos_to_pose(
        self,
        goal_pose,
        current_qpos,
        mask=None,
        time_step=0.1,
        rrt_range=0.1,
        planning_time=1,
        fix_joint_limits=True,
        use_point_cloud=False,
        use_attach=False,
        wrt_world=True,
        planner_name="RRTConnect",
        no_simplification=False,
        constraint_function=None,
        constraint_jacobian=None,
        constraint_tolerance=1e-3,
        verbose=False,
    ):
        """
        plan from a start configuration to a goal pose of the end-effector

        Args:
            goal_pose: [x,y,z,qw,qx,qy,qz] pose of the goal
            current_qpos: current joint configuration (either full or move_group joints)
            mask: if the value at a given index is True, the joint is *not* used in the
                IK
            time_step: time step for TOPPRA (time parameterization of path)
            rrt_range: step size for RRT
            planning_time: time limit for RRT
            fix_joint_limits: if True, will clip the joint configuration to be within
                the joint limits
            use_point_cloud: if True, will use the point cloud to avoid collision
            use_attach: if True, will consider the attached tool collision when planning
            wrt_world: if true, interpret the target pose with respect to
                the world frame instead of the base frame
            verbose: if True, will print the log of OMPL and TOPPRA
        """
        if mask is None:
            mask = []
        n = current_qpos.shape[0]
        if fix_joint_limits:
            for i in range(n):
                if current_qpos[i] < self.joint_limits[i][0]:
                    current_qpos[i] = self.joint_limits[i][0] + 1e-3
                if current_qpos[i] > self.joint_limits[i][1]:
                    current_qpos[i] = self.joint_limits[i][1] - 1e-3

        if wrt_world:
            goal_pose = self.transform_goal_to_wrt_base(goal_pose)

        # we need to take only the move_group joints when planning
        # idx = self.move_group_joint_indices

        ik_status, goal_qpos = self.IK(goal_pose, current_qpos, mask)
        if ik_status != "Success":
            return {"status": ik_status}

        if verbose:
            print("IK results:")
            for i in range(len(goal_qpos)):
                print(goal_qpos[i])

        # goal_qpos_ = [goal_qpos[i][idx] for i in range(len(goal_qpos))]
        self.robot.set_qpos(current_qpos, True)

        ik_status, goal_qpos = self.IK(goal_pose, current_qpos, mask)
        if ik_status != "Success":
            return {"status": ik_status}

        if verbose:
            print("IK results:")
            for i in range(len(goal_qpos)):
                print(goal_qpos[i])

        return self.plan_qpos_to_qpos(
            goal_qpos,
            current_qpos,
            time_step,
            rrt_range,
            planning_time,
            fix_joint_limits,
            use_point_cloud,
            use_attach,
            planner_name,
            no_simplification,
            constraint_function,
            constraint_jacobian,
            constraint_tolerance,
            verbose=verbose,
        )

    # plan_screw ankor
    def plan_screw(
        self,
        target_pose,
        qpos,
        qpos_step=0.1,
        time_step=0.1,
        use_point_cloud=False,
        use_attach=False,
        wrt_world=True,
        verbose=False,
    ):
        # plan_screw ankor end
        """
        Plan from a start configuration to a goal pose of the end-effector using
        screw motion

        Args:
            target_pose: [x, y, z, qw, qx, qy, qz] pose of the goal
            qpos: current joint configuration (either full or move_group joints)
            qpos_step: size of the random step for RRT
            time_step: time step for the discretization
            use_point_cloud: if True, will use the point cloud to avoid collision
            use_attach: if True, will use the attached tool to avoid collision
            wrt_world: if True, interpret the target pose with respect to the
                world frame instead of the base frame
            verbose: if True, will print the log of TOPPRA
        """
        self.planning_world.set_use_point_cloud(use_point_cloud)
        self.planning_world.set_use_attach(use_attach)
        qpos = self.pad_qpos(qpos.copy())
        self.robot.set_qpos(qpos, True)
        # ÊîæÂú®ËøôÈáå
        print("Joint limits:", self.joint_limits)
        print("Joint limit size:", len(self.joint_limits))
        print("Qpos size:", len(qpos))


        if wrt_world:
            target_pose = self.transform_goal_to_wrt_base(target_pose)

        def pose7D2mat(pose):
            mat = np.eye(4)
            mat[0:3, 3] = pose[:3]
            mat[0:3, 0:3] = quat2mat(pose[3:])
            return mat

        def skew(vec):
            return np.array([
                [0, -vec[2], vec[1]],
                [vec[2], 0, -vec[0]],
                [-vec[1], vec[0], 0],
            ])

        def pose2exp_coordinate(pose: np.ndarray) -> tuple[np.ndarray, float]:
            def rot2so3(rotation: np.ndarray):
                assert rotation.shape == (3, 3)
                if np.isclose(rotation.trace(), 3):
                    return np.zeros(3), 1
                if np.isclose(rotation.trace(), -1):
                    return np.zeros(3), -1e6
                theta = np.arccos((rotation.trace() - 1) / 2)
                omega = (
                    1
                    / 2
                    / np.sin(theta)
                    * np.array([
                        rotation[2, 1] - rotation[1, 2],
                        rotation[0, 2] - rotation[2, 0],
                        rotation[1, 0] - rotation[0, 1],
                    ]).T
                )
                return omega, theta

            omega, theta = rot2so3(pose[:3, :3])
            if theta < -1e5:
                return omega, theta
            ss = skew(omega)
            inv_left_jacobian = (
                np.eye(3) / theta
                - 0.5 * ss
                + (1.0 / theta - 0.5 / np.tan(theta / 2)) * ss @ ss
            )
            v = inv_left_jacobian @ pose[:3, 3]
            return np.concatenate([v, omega]), theta

        self.pinocchio_model.compute_forward_kinematics(qpos)
        ee_index = self.link_name_2_idx[self.move_group]
        current_p = pose7D2mat(self.pinocchio_model.get_link_pose(ee_index))
        target_p = pose7D2mat(target_pose)
        relative_transform = target_p @ np.linalg.inv(current_p)

        omega, theta = pose2exp_coordinate(relative_transform)

        if theta < -1e4:
            return {"status": "screw plan failed."}
        omega = omega.reshape((-1, 1)) * theta

        index = self.move_group_joint_indices
        path = [np.copy(qpos[index])]


        print("Relative theta =", theta)
        print("Initial omega =", omega)
        print("Start qpos =", qpos)
        print(f"Start planning. Initial theta = {theta:.4f}")
        print(f"Initial qpos: {qpos}")
        collisions = self.planning_world.collide_full()
        if collisions:
            print("üîç Initial configuration already in collision!")
            for c in collisions:
                print(f"üî∏ {c.link_name1} ‚õî {c.link_name2}")




        while True:
            self.pinocchio_model.compute_full_jacobian(qpos)
            J = self.pinocchio_model.get_link_jacobian(ee_index, local=False)
            delta_q = np.linalg.pinv(J) @ omega
            delta_q *= qpos_step / (np.linalg.norm(delta_q))
            delta_twist = J @ delta_q

            flag = False
            if np.linalg.norm(delta_twist) > np.linalg.norm(omega):
                ratio = np.linalg.norm(omega) / np.linalg.norm(delta_twist)
                delta_q = delta_q * ratio
                delta_twist = delta_twist * ratio
                flag = True

            qpos += delta_q.reshape(-1)
            omega -= delta_twist

            def check_joint_limit(q):
                n = len(q)
                for i in range(n):
                    if q[i] < self.joint_limits[i][0] - 1e-3:
                        print(f"‚ùó Joint {i} too low: {q[i]:.3f} < {self.joint_limits[i][0]:.3f}")
                        return False
                    if q[i] > self.joint_limits[i][1] + 1e-3:
                        print(f"‚ùó Joint {i} too high: {q[i]:.3f} > {self.joint_limits[i][1]:.3f}")
                        return False
                return True

            # def check_joint_limit(q):
            #     n = len(q)
            #     for i in range(n):
            #         if (
            #             q[i] < self.joint_limits[i][0] - 1e-3
            #             or q[i] > self.joint_limits[i][1] + 1e-3
            #         ):
            #             return False
            #     return True

            within_joint_limit = check_joint_limit(qpos)
            self.planning_world.set_qpos_all(qpos[index])
            collide = self.planning_world.collide()

            if collide:
                print("‚ùå Collision detected!")
            if not within_joint_limit:
                print("‚ùå Joint limit exceeded!")
            if np.linalg.norm(delta_twist) < 1e-4:
                print("‚úÖ Reached convergence, but early exit.")

            if np.linalg.norm(delta_twist) < 1e-4 or collide or not within_joint_limit:
                return {"status": "screw plan failed"}

            path.append(np.copy(qpos[index]))

            if flag:
                if verbose:
                    ta.setup_logging("INFO")
                else:
                    ta.setup_logging("WARNING")
                times, pos, vel, acc, duration = self.TOPP(np.vstack(path), time_step)
                return {
                    "status": "Success",
                    "time": times,
                    "position": pos,
                    "velocity": vel,
                    "acceleration": acc,
                    "duration": duration,
                }