In [None]:
from threading import Thread

import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from commander_py import commander
from commander_py import commander_utils
from geometry_msgs.msg import Pose, Point, Quaternion
from ur_commander.srv import VisualizePoses

# Initialize rclpy, handling already initialized case
try:
    rclpy.init()
except RuntimeError:
    pass

node = Node("ex_pose_goal")
callback_group = ReentrantCallbackGroup()
commander = commander.Commander(
    node=node,
    callback_group=callback_group,
    move_group="ur_manipulator",
    end_effector_frame=["camera_color_optical_frame", "tcp_ee"],
)
executor = rclpy.executors.MultiThreadedExecutor(2)
executor.add_node(node)
executor_thread = Thread(target=executor.spin, daemon=True)
executor_thread.start()
node.create_rate(1.0).sleep()

In [None]:
def display_poses(poses: list[Pose], frmae_id: str = "base_link") -> None:

    client = node.create_client(VisualizePoses, "/commander_viz/pose_visualization")
    while not client.wait_for_service(timeout_sec=3.0):
        node.get_logger().info("Waiting for /commander_viz/pose_visualization service...")
    client.call_async(VisualizePoses.Request(poses=poses, frame_id=frmae_id))

In [None]:
target = Pose(
    position=Point(x=0.5, y=0.0, z=0.5), orientation=Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)
)

success = display_poses([target], "base_link")
if success:
    node.get_logger().info("Pose visualization request sent successfully.")

In [None]:
target = commander_utils.rotate_pose_euler(target, euler_deg=(0, 0, 270))
target = commander_utils.rotate_pose_euler(target, euler_deg=(180, 0, 0))
target_1 = commander_utils.translate_pose(target, translation=(0.0, 0.4, 0.0))
success = display_poses([target, target_1], "base_link")

In [None]:
joint_values = [0.0, -1.57, 1.57, -1.57, -1.57, 0.0]
goal = commander.set_joint_goal(joint_values=joint_values, ee_link="tcp_ee")

In [None]:
goal = commander.set_pose_goal(pose=target, frame_id="base_link", ee_link="tcp_ee")
print(goal)

In [None]:
traj = commander.plan(
    # pose_goal=goal,
    joint_goal=goal,
    planner_id="PTP",
    pipeline_id="pilz_industrial_motion_planner",
    ee_frame="tcp_ee",
    acc_scale=0.2,
    vel_scale=0.2,
)

In [None]:
success = commander.execute_trajectory(traj, wait_until_executed=True)

In [None]:
commander.add_collision_object(
    object_id="box",
    pose=target,
    object_type=1,  # BOX
    dimensions=(0.1, 0.1, 0.01),
    frame_id="base_link",
)

In [None]:
commander.attach_collision_object(
    object_id="box",
    link_name="tool0",
    touch_links=["tool0", "suction_cup_frame"],
    weight=1.0,
)

In [None]:
sequence_goals = [
    commander.set_pose_goal(pose=target, frame_id="base_link", ee_link="tcp_ee"),
    commander.set_pose_goal(pose=target_1, frame_id="base_link", ee_link="tcp_ee"),
    commander.set_joint_goal(joint_values=joint_values),
]
traj_sequence = commander.plan_sequence(
    pose_goals=sequence_goals,
    pipeline_id="pilz_industrial_motion_planner",
    planner_ids=["PTP", "LIN", "PTP"],
    ee_frame="tcp_ee",
    acc_scale=0.2,
    vel_scale=0.2,
)

In [None]:
success = commander.execute_trajectory(traj_sequence, wait_until_executed=True)

In [None]:
commander.detach_collision_object()

In [None]:
commander.remove_collision_object(object_id="box")