In [1]:
from threading import Thread
from math import pi

import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node

from pymoveit2 import MoveIt2
from pymoveit2.robots import ur
from pymoveit2.robots import panda

from geometry_msgs.msg import Pose, Quaternion, Point
import rclpy.wait_for_message
from ur_msgs.srv import SetIO
from ur_commander.srv import VisualizePoses

import numpy as np
from tf_transformations import quaternion_multiply, quaternion_from_euler
import tf_transformations
from geometry_msgs.msg import PoseStamped, Pose, Point, Quaternion
from scipy.spatial.transform import Rotation
from pick_n_place_interfaces.msg import ArucoMarkers, ArucoMarkersArray
from pick_n_place_interfaces.msg import ArucoMarkers, ArucoMarkersArray
from std_msgs.msg import Float64
import json

 

rclpy.init()

# Create node for this example
node = Node("notebook_example")
callback_group = ReentrantCallbackGroup()

moveit2 = MoveIt2(
    node=node,
    joint_names=ur.joint_names(),
    base_link_name=ur.base_link_name(),
    end_effector_name=ur.end_effector_name(),
    group_name=ur.MOVE_GROUP_ARM,
    callback_group=callback_group,
)



print(ur.joint_names())
print(ur.base_link_name())
print(ur.end_effector_name())

# Spin the node in background thread(s) and wait a bit for initialization
executor = rclpy.executors.MultiThreadedExecutor(2)
executor.add_node(node)
executor_thread = Thread(target=executor.spin, daemon=True, args=())
executor_thread.start()
node.create_rate(1.0).sleep()

# Scale down velocity and acceleration of joints (percentage of maximum)
moveit2.max_velocity = 0.2
moveit2.max_acceleration = 0.2
synchronous = True
cancel_after_secs = 0.0
cartesian = False
cartesian_max_step = 0.005
cartesian_fraction_threshold = 0.0
cartesian_jump_threshold = 0.0
cartesian_avoid_collisions = True
# moveit2.planner_id = "RRTConnectkConfigDefault"
moveit2.planner_id = "PTP"

# Add collision objects
moveit2.add_collision_box(
    id="table",
    size=[2.0, 1.0, 0.05],
    position=[0.0, 0.0, -0.025],
    quat_xyzw=[0.0, 0.0, 0.0, 0.0],
)

cli = node.create_client(SetIO, '/io_and_status_controller/set_io')
req = SetIO.Request()

def set_io(state):
    req.fun = 1
    req.pin = 4
    req.state = state
    future = cli.call_async(req)
    return future.result()


['shoulder_pan_joint', 'shoulder_lift_joint', 'elbow_joint', 'wrist_1_joint', 'wrist_2_joint', 'wrist_3_joint']
base_link
tcp


[pick_n_place_interfaces.msg.ArucoMarkers(marker_ids=3, poses=geometry_msgs.msg.Pose(position=geometry_msgs.msg.Point(x=0.9789654298898205, y=-0.0026408003485630938, z=0.03526928436579069), orientation=geometry_msgs.msg.Quaternion(x=-0.05135377793725207, y=0.09928571862857427, z=-0.8602658212854077, w=0.49744130537795367)))]
[pick_n_place_interfaces.msg.ArucoMarkers(marker_ids=3, poses=geometry_msgs.msg.Pose(position=geometry_msgs.msg.Point(x=0.9789873257721884, y=-0.002701508594516188, z=0.03533264084186885), orientation=geometry_msgs.msg.Quaternion(x=-0.0636216761539622, y=0.10156096280382498, z=-0.8597007985166122, w=0.49654022011073756)))]


In [2]:
moveit2.max_velocity = 0.1
moveit2.max_acceleration = 0.1
speed_pub = node.create_publisher(Float64, "/scaled_joint_trajectory_controller/speed_scaling", 10)

# To set speed to 50%
msg = Float64()
msg.data = 0.01
speed_pub.publish(msg)

moveit2._MoveIt2__move_action_goal.request.max_cartesian_speed = 0.01  # m/s


In [3]:
# Add collision objects
moveit2.add_collision_box(
    id="table",
    size=[2.0, 1.0, 0.05],
    position=[0.0, 0.0, -0.025],
    quat_xyzw=[0.0, 0.0, 0.0, 0.0],
)

In [None]:


latest_pose = None  # Store the latest pose
place_list =[]
pick_list = []
objec_length= 0.117
objec_width= 0.023
objec_height= 0.008
tcp_difference = 0.0015
time_step = 1.0

def display_poses(poses: list[Pose], frame_id: str = "base_link") -> None:
    client = node.create_client(VisualizePoses, "/visualize_poses")
    while not client.wait_for_service(timeout_sec=1.0):
        node.get_logger().info("service not available, waiting again...")
    client.call(VisualizePoses.Request(poses=poses, frame_id=frame_id))

def listener_callback(msg:ArucoMarkersArray):
    global latest_pose, pick_list, place_list

    # if latest_pose is None:
    #     latest_pose = msg.aruco_array[0].poses  # Extract the Pose part
    #     node.get_logger().info(f'Received Pose: Position: {latest_pose.position}, Orientation: {latest_pose.orientation}')
    if not pick_list:
        pick_list = msg.aruco_array
        latest_pose = msg.aruco_array[0].poses
        node.get_logger().info(f"Received {len(msg.aruco_array)} arcuo tags.")
        print(pick_list)

subscription = node.create_subscription(
    ArucoMarkersArray,  # Message type
    'transformed_aruco_pose',  # Topic name
    listener_callback,  # The callback function to be called on each message
    qos_profile=10  # Quality of service profile
)


[INFO] [1749062902.208728362] [notebook_example]: Received 1 arcuo tags.


In [5]:

def transform_pose_between_planes(source_plane_pose: Pose, target_plane_pose: Pose, relative_pose: Pose) -> Pose:
    # Source plane: get transform matrix
    trans_source = tf_transformations.translation_matrix([
        source_plane_pose.position.x,
        source_plane_pose.position.y,
        source_plane_pose.position.z
    ])
    rot_source = tf_transformations.quaternion_matrix([
        source_plane_pose.orientation.x,
        source_plane_pose.orientation.y,
        source_plane_pose.orientation.z,
        source_plane_pose.orientation.w
    ])
    tf_source = tf_transformations.concatenate_matrices(trans_source, rot_source)
    tf_source_inv = tf_transformations.inverse_matrix(tf_source)

    # Relative pose (in source frame)
    trans_local = tf_transformations.translation_matrix([
        relative_pose.position.x,
        relative_pose.position.y,
        relative_pose.position.z
    ])
    rot_local = tf_transformations.quaternion_matrix([
        relative_pose.orientation.x,
        relative_pose.orientation.y,
        relative_pose.orientation.z,
        relative_pose.orientation.w
    ])
    tf_local = tf_transformations.concatenate_matrices(trans_local, rot_local)

    # Target plane
    trans_target = tf_transformations.translation_matrix([
        target_plane_pose.position.x,
        target_plane_pose.position.y,
        target_plane_pose.position.z
    ])
    rot_target = tf_transformations.quaternion_matrix([
        target_plane_pose.orientation.x,
        target_plane_pose.orientation.y,
        target_plane_pose.orientation.z,
        target_plane_pose.orientation.w
    ])
    tf_target = tf_transformations.concatenate_matrices(trans_target, rot_target)

    # Final transform: target * inverse(source) * local
    tf_world = tf_transformations.concatenate_matrices(tf_target, tf_source_inv, tf_local)

    # Extract result
    translation = tf_transformations.translation_from_matrix(tf_world)
    quaternion = tf_transformations.quaternion_from_matrix(tf_world)

    result_pose = Pose()
    result_pose.position.x = translation[0]
    result_pose.position.y = translation[1]
    result_pose.position.z = translation[2]
    result_pose.orientation.x = quaternion[0]
    result_pose.orientation.y = quaternion[1]
    result_pose.orientation.z = quaternion[2]
    result_pose.orientation.w = quaternion[3]

    return result_pose

def load_design_data(filepath):
    with open(filepath, 'r') as file:
        data = json.load(file)
    return data['aruco_array']

def extract_pose(pose_dict):
    pose = Pose()
    pose.position.x = pose_dict['position']['x'] / 1000.0
    pose.position.y = pose_dict['position']['y'] / 1000.0
    pose.position.z = pose_dict['position']['z'] / 1000.0
    pose.orientation.x = pose_dict['orientation']['x']
    pose.orientation.y = pose_dict['orientation']['y']
    pose.orientation.z = pose_dict['orientation']['z']
    pose.orientation.w = pose_dict['orientation']['w']
    return pose

def process_aruco_data(aruco_data,source_frame, target_frame):
    pose_list = []
    aruco_marker_list = []

    for item in aruco_data:
        marker_id = item['marker_ids']
        pose = extract_pose(item['poses'])
        transformed_pose = transform_pose_between_planes(source_frame, target_frame, pose)
        pose_list.append(transformed_pose)

        aruco_marker = ArucoMarkers()
        aruco_marker.marker_ids = marker_id
        aruco_marker.poses = transformed_pose
        aruco_marker_list.append(aruco_marker)

    return pose_list, aruco_marker_list



In [None]:
# Source frame
design_source = Pose()
design_source.orientation.w = 1.0

# Target frame
design_target = Pose()

design_target.position.x = 0.475
design_target.position.y = 0.09
design_target.position.z = 0.0016 - tcp_difference

roll, pitch, yaw = 0.0, 0.0, -1.5708  # Z-axis

qx, qy, qz, qw = tf_transformations.quaternion_from_euler(roll, pitch, yaw)

design_target.orientation.x = qx
design_target.orientation.y = qy
design_target.orientation.z = qz
design_target.orientation.w = qw



json_path = '/dev_ws/src/design/design.json'

design_data = load_design_data(json_path)

design_pose_list, design_aruco_marker_list = process_aruco_data(design_data, design_source, design_target)

display_poses(design_pose_list)

[INFO] [1749062899.273841773] [notebook_example]: Received 1 arcuo tags.


In [None]:
print(design_pose_list)

In [8]:
num = 0
for i in design_aruco_marker_list:
    # Add collision objects
    moveit2.add_collision_box(
        id=f"pick_target{num}",
        size=[objec_length, objec_width, objec_height],
        position=[i.poses.position.x, i.poses.position.y, (i.poses.position.z - objec_height/2)],
        quat_xyzw=i.poses.orientation,
    )
    num+=1
    print('add collision successful')
    print(f"pick_target{num}")




add collision successful
pick_target1
add collision successful
pick_target2
add collision successful
pick_target3
add collision successful
pick_target4
add collision successful
pick_target5
add collision successful
pick_target6
add collision successful
pick_target7
add collision successful
pick_target8
add collision successful
pick_target9
add collision successful
pick_target10
add collision successful
pick_target11
add collision successful
pick_target12
add collision successful
pick_target13
add collision successful
pick_target14
add collision successful
pick_target15
add collision successful
pick_target16
add collision successful
pick_target17
add collision successful
pick_target18
add collision successful
pick_target19
add collision successful
pick_target20
add collision successful
pick_target21
add collision successful
pick_target22
add collision successful
pick_target23
add collision successful
pick_target24
add collision successful
pick_target25
add collision successful
pick_targ

**Define Joint Position**   

Forward kinematics, a trajectory will be planned

In [9]:

# Define a safe joint configuration
safe_joints = [0.0, -1.57, 1.57, -1.57, -1.57, 0.0]  # Adjust these values

# Move to the safe state
safe_traj = moveit2.move_to_configuration(safe_joints)
if safe_traj is not None:
    moveit2.execute(safe_traj)
    moveit2.wait_until_executed()


**Define a Target Pose**    

Display the pose

In [10]:
# test pick and place single piece
place_target_num = 0

place_target = design_aruco_marker_list[place_target_num].poses
print(place_target)

latest_pose= pick_list[0].poses
original_orientation = latest_pose.orientation
original_quat = [original_orientation.x, original_orientation.y, original_orientation.z, original_orientation.w]
flip_quat = quaternion_from_euler(0, np.pi, 0)  # (Roll, Pitch, Yaw)
new_quat = quaternion_multiply(original_quat, flip_quat)
z_flip_quat = quaternion_from_euler(0, 0, np.pi/2)  # (Roll, Pitch, Yaw)
final_quat = quaternion_multiply(new_quat, z_flip_quat)

pick_target = Pose(
            position=Point(
                x=latest_pose.position.x - 0.00,
                y=latest_pose.position.y - 0.003,
                z=latest_pose.position.z - tcp_difference
            ),
            # orientation = pose_subscriber.latest_pose.orientation
            orientation=Quaternion(
                x=final_quat[0],
                y=final_quat[1],
                z=final_quat[2],
                w=final_quat[3]
            )
)


display_poses([pick_target,place_target])

pick_safe_in_n_out = Pose(
    position = Point(
        x=latest_pose.position.x,
        y=latest_pose.position.y,
        z=latest_pose.position.z + 0.3
    ),

    orientation = pick_target.orientation
)

place_safe_in_n_out = Pose(
    position = Point(
        x=place_target.position.x,
        y=place_target.position.y,
        z=place_target.position.z + 0.3
    ),

    orientation = place_target.orientation
)

display_poses([pick_safe_in_n_out, pick_target, pick_safe_in_n_out,place_safe_in_n_out, place_target,place_safe_in_n_out])

# time_step = 0.15

geometry_msgs.msg.Pose(position=geometry_msgs.msg.Point(x=0.47500073453914027, y=0.2899999999986511, z=0.008100000000000001), orientation=geometry_msgs.msg.Quaternion(x=-0.5034981159480717, y=0.8639963236245524, z=0.0, w=0.0))


In [12]:


# Define a safe joint configuration
safe_joints = [0.0, -1.57, 1.57, -1.57, -1.57, 0.0]  # Adjust these values

# Move to the safe state
safe_traj = moveit2.move_to_configuration(safe_joints)
if safe_traj is not None:
    moveit2.execute(safe_traj)
    moveit2.wait_until_executed()



In [13]:
###### home to saft in
moveit2.planner_id = "PTP"

target = pick_safe_in_n_out
traj = moveit2.move_to_pose(
    position=target.position,
    quat_xyzw=target.orientation,
    cartesian=cartesian,
    cartesian_max_step=cartesian_max_step,
    cartesian_fraction_threshold=cartesian_fraction_threshold,
)

if traj is not None:
    # # Manually adjust time_from_start to slow down the trajectory
    #   # seconds between each point
    # for i, point in enumerate(traj.points):
    #     total_time = rclpy.time.Duration(seconds=time_step * i)
    #     point.time_from_start = total_time.to_msg()

    node.get_logger().info("Move to target with time-scaled trajectory.")
    moveit2.execute(traj)
    success = moveit2.wait_until_executed()
    print(success)



[INFO] [1749062933.483715240] [notebook_example]: Move to target with time-scaled trajectory.


True


In [14]:
###### saft in to pick target

moveit2.planner_id = "LIN"

target = pick_target
traj = moveit2.move_to_pose(
    position=target.position,
    quat_xyzw=target.orientation,
    cartesian=True,
    cartesian_max_step=cartesian_max_step,
    cartesian_fraction_threshold=cartesian_fraction_threshold,
)

if traj is not None:
#     # Manually adjust time_from_start to slow down the trajectory
#  # seconds between each point
#     for i, point in enumerate(traj.points):
#         total_time = rclpy.time.Duration(seconds=time_step * i)
#         point.time_from_start = total_time.to_msg()

    node.get_logger().info("Move to target with time-scaled trajectory.")
    moveit2.execute(traj)
    success = moveit2.wait_until_executed()
    print(success)

# set_io(1.0)
# set_io(0.0)

[INFO] [1749062942.867064997] [notebook_example]: Move to target with time-scaled trajectory.


True


In [15]:
set_io(1.0)

In [16]:
###### pick target to saft out

moveit2.planner_id = "LIN"

target = pick_safe_in_n_out
traj = moveit2.move_to_pose(
    position=target.position,
    quat_xyzw=target.orientation,
    cartesian=True,
    cartesian_max_step=cartesian_max_step,
    cartesian_fraction_threshold=cartesian_fraction_threshold,
)

if traj is not None:
    # Manually adjust time_from_start to slow down the trajectory
#  # seconds between each point
#     for i, point in enumerate(traj.points):
#         total_time = rclpy.time.Duration(seconds=time_step * i)
#         point.time_from_start = total_time.to_msg()

    node.get_logger().info("Move to target with time-scaled trajectory.")
    moveit2.execute(traj)
    success = moveit2.wait_until_executed()
    print(success)

[INFO] [1749062950.930151619] [notebook_example]: Move to target with time-scaled trajectory.


True


In [17]:
###### pick saft out to place saft in

moveit2.planner_id = "PTP"

target = place_safe_in_n_out
traj = moveit2.move_to_pose(
    position=target.position,
    quat_xyzw=target.orientation,
    cartesian=cartesian,
    cartesian_max_step=cartesian_max_step,
    cartesian_fraction_threshold=cartesian_fraction_threshold,
)

if traj is not None:
    # Manually adjust time_from_start to slow down the trajectory
#  # seconds between each point
#     for i, point in enumerate(traj.points):
#         total_time = rclpy.time.Duration(seconds=time_step * i)
#         point.time_from_start = total_time.to_msg()

    node.get_logger().info("Move to target with time-scaled trajectory.")
    moveit2.execute(traj)
    success = moveit2.wait_until_executed()
    print(success)

[INFO] [1749062954.367578443] [notebook_example]: Move to target with time-scaled trajectory.


True


In [18]:
###### place saft in to place target

moveit2.planner_id = "LIN"

target = place_target
traj = moveit2.move_to_pose(
    position=target.position,
    quat_xyzw=target.orientation,
    cartesian=True,
    cartesian_max_step=cartesian_max_step,
    cartesian_fraction_threshold=cartesian_fraction_threshold,
)

if traj is not None:
    # Manually adjust time_from_start to slow down the trajectory
# # seconds between each point
#     for i, point in enumerate(traj.points):
#         total_time = rclpy.time.Duration(seconds=time_step * i)
#         point.time_from_start = total_time.to_msg()

    node.get_logger().info("Move to target with time-scaled trajectory.")
    moveit2.execute(traj)
    success = moveit2.wait_until_executed()
    print(success)
# set_io(1.0)
set_io(0.0)

[INFO] [1749062966.108051900] [notebook_example]: Move to target with time-scaled trajectory.


True


In [19]:
###### place target to saft out

moveit2.planner_id = "LIN"

target = place_safe_in_n_out
traj = moveit2.move_to_pose(
    position=target.position,
    quat_xyzw=target.orientation,
    cartesian=True,
    cartesian_max_step=cartesian_max_step,
    cartesian_fraction_threshold=cartesian_fraction_threshold,
)

if traj is not None:
    # Manually adjust time_from_start to slow down the trajectory
# # seconds between each point
#     for i, point in enumerate(traj.points):
#         total_time = rclpy.time.Duration(seconds=time_step * i)
#         point.time_from_start = total_time.to_msg()

    node.get_logger().info("Move to target with time-scaled trajectory.")
    moveit2.execute(traj)
    success = moveit2.wait_until_executed()
    print(success)

[INFO] [1749062970.300400725] [notebook_example]: Move to target with time-scaled trajectory.


True


In [20]:


# Define a safe joint configuration
safe_joints = [0.0, -1.57, 1.57, -1.57, -1.57, 0.0]  # Adjust these values

# Move to the safe state
safe_traj = moveit2.move_to_configuration(safe_joints)
if safe_traj is not None:
    moveit2.execute(safe_traj)
    moveit2.wait_until_executed()
