In [1]:
import numpy as np
import time
import sys
import os
from sklearn.neighbors import NearestNeighbors
from xarm.wrapper import XArmAPI

SDK_VERSION: 1.17.0


In [2]:
c_1 = 0.001
#c_2 = 0.9
c_2 = 0.1

JOINT_LIMITS_DEG = [
    (360, 360),         # Joint 1
    (150, 150),         # Joint 2
    (-3.5, 300),        # Joint 3
    (360, 360),         # Joint 4
    (124, 124),         # Joint 5
    (360, 360),         # Joint 6
]
JOINT_LIMITS_DEG = np.deg2rad(JOINT_LIMITS_DEG)

def solve_for_final_joints(end_effector_xyz, target_xyz, current_joint_state,
                          forward_kinematics_func, threshold=1e-6, max_iter=100,
                          joint_limits=None, num_restarts=3, max_reach=0.44):
    if joint_limits is None:
        joint_limits = JOINT_LIMITS_DEG

    # cjheck if target is reachable and project onto workspace if needed
    target_distance = np.linalg.norm(target_xyz)
    original_target = target_xyz.copy()

    if target_distance > max_reach: # gotta make sure we are within reach, otherwise it will cause problems!
        safety_margin = 0.02  # 2cm safety margin
        target_xyz = target_xyz * (max_reach - safety_margin) / target_distance
        print(f"CRITICAL: Target distance ({target_distance:.3f}m) exceeds max reach ({max_reach}m)")

    def objective_function(joint_positions):
        ee_pos = forward_kinematics_func(joint_positions)

        diff = ee_pos - target_xyz
        return 0.5 * np.dot(diff, diff)

    def optimize_from_start(x0):
        xk = np.array(x0, dtype=float)
        hk = np.eye(len(x0))
        gk = None 

        def clamp_to_limits(joint_positions):
            clamped = joint_positions.copy()

            for i in range(min(len(clamped), len(joint_limits))):
                clamped[i] = np.clip(clamped[i], joint_limits[i][0], joint_limits[i][1])

            return clamped

        def gradient_fd(f, x, epsilon=1e-5):
            g = np.zeros_like(x)
            f_curr = f(x)

            for i in range(len(x)):
                x_p = x.copy()
                x_p[i] += epsilon
                g[i] = (f(x_p) - f_curr) / epsilon

            return g

        def line_search(phi, a_max, max_iter_ls):
            alpha = a_max
            phi_0 = phi(0)

            # leverage only armijo condition for simple backtracking
            # Armijo: phi(alpha) <= phi(0) + c_1 * alpha * D(0)
            for _ in range(max_iter_ls):
                phi_alpha = phi(alpha)

                if phi_alpha < phi_0:
                    return alpha
                alpha *= 0.5

                if alpha < 1e-6:
                    return 1e-6

            return alpha

        def bfgs_hessian(hk, sk, yk):
            yk_dot_sk = np.dot(yk, sk)

            # check for matrix singularity or negative curvature
            if abs(yk_dot_sk) < 1e-10:
                return hk
        
            pk = 1.0 / yk_dot_sk
            I = np.eye(len(sk))
            a = I - pk * np.outer(sk, yk)
            b = I - pk * np.outer(yk, sk)
            c = pk * np.outer(sk, sk)

            return a @ hk @ b + c

        # quasi-newton optimization loop
        prev_f = float('inf')
        stagnation_count = 0

        for iteration in range(max_iter):
            # compute or use cached gradient
            if gk is None:
                gk = gradient_fd(objective_function, xk)
            g_norm = np.linalg.norm(gk)

            f_current = objective_function(xk)

            # error in meters
            position_error = np.sqrt(2 * f_current)

            if position_error < threshold:
                break

            if abs(prev_f - f_current) < 1e-10:
                stagnation_count += 1
                if stagnation_count > 5:
                    # reset Hessian if stuck
                    hk = np.eye(len(xk))
                    stagnation_count = 0
            else:
                stagnation_count = 0
            prev_f = f_current

            pk = -hk @ gk

            def phi(alpha):
                candidate = clamp_to_limits(xk + alpha * pk)
                return objective_function(candidate)

            alpha = line_search(phi, a_max=1.0, max_iter_ls=20)

            xk_old = xk
            gk_old = gk
            xk_new = xk + alpha * pk

            xk = clamp_to_limits(xk_new)

            gk = gradient_fd(objective_function, xk)

            sk = xk - xk_old
            yk = gk - gk_old

            if np.linalg.norm(sk) > 1e-10: # update only if we moved
                hk = bfgs_hessian(hk, sk, yk)

        return xk, objective_function(xk)

    best_joints, best_error = optimize_from_start(current_joint_state)

    # if error is too large (> 1cm), try random restarts
    if best_error > 0.01**2 / 2 and num_restarts > 0:
        for _ in range(num_restarts):
            # gen random starting point near current position
            random_start = current_joint_state.copy()
            num_joints = min(len(random_start), len(joint_limits))
            for i in range(num_joints):
                # add random perturbation within +/-30 degrees
                perturbation = np.random.uniform(-np.pi/6, np.pi/6)
                random_start[i] = np.clip(
                    random_start[i] + perturbation,
                    joint_limits[i][0],
                    joint_limits[i][1]
                )

            candidate_joints, candidate_error = optimize_from_start(random_start)

            if candidate_error < best_error:
                best_joints = candidate_joints
                best_error = candidate_error

                if best_error < 0.001**2 / 2:  # termination threshold for good soln
                    break

    return best_joints

In [3]:
ROBOT_IP = "192.168.1.161" 
USE_RADIANS = True
NUM_JOINTS = 6 

JOINT_LIMITS_DEG = [
    (-360, 360),         # Joint 1
    (-150, 150),         # Joint 2
    (-3.5, 300),        # Joint 3
    (-360, 360),         # Joint 4
    (-124, 124),         # Joint 5
    (-360, 360),         # Joint 6
]
JOINT_LIMITS = np.deg2rad(JOINT_LIMITS_DEG)
print(JOINT_LIMITS)


[[-6.28318531  6.28318531]
 [-2.61799388  2.61799388]
 [-0.06108652  5.23598776]
 [-6.28318531  6.28318531]
 [-2.16420827  2.16420827]
 [-6.28318531  6.28318531]]


In [4]:
arm = None
if XArmAPI is not None:
    arm = XArmAPI(ROBOT_IP, is_radian=USE_RADIANS)
    
    if arm.connected:
        arm.motion_enable(enable=True)
        arm.set_mode(0)  
        arm.set_state(state=0)  
    else:
        print(f"FAILED TO CONNECT TO ROBOT {ROBOT_IP}")

ROBOT_IP: 192.168.1.161, VERSION: v1.11.4, PROTOCOL: V1, DETAIL: 6,9,LI1003,DL1000,v1.11.4, TYPE1300: [0, 0]
change protocol identifier to 3


In [5]:
def forward_kinematics(joint_angles, num_joints=6):
    if isinstance(joint_angles, np.ndarray):
        state_list = joint_angles.tolist()
    else:
        state_list = list(joint_angles)

    while len(state_list) < 7:
        state_list.append(0.0)
    
    code, result = arm.get_forward_kinematics(state_list[:num_joints], input_is_radian=True)
    
    if code == 0 and result:
        position = np.array(result[:3]) / 1000.0
        return position
    else:
        raise RuntimeError(f"FWD KINEMATICS FAILED: {code}")

def get_current_joint_angles():
    code, angles = arm.get_servo_angle(is_radian=True)
    if code == 0:
        return np.array(angles[:NUM_JOINTS])
    else:
        return None

def is_state_in_collision(state):
    state = np.array(state)
    if len(state) != NUM_JOINTS:
        return True
    
    for i in range(min(len(state), len(JOINT_LIMITS))): # check against joint limits
        if state[i] < JOINT_LIMITS[i][0] or state[i] > JOINT_LIMITS[i][1]:
            return True
    
    # use sdk joint limit checking aswell
    try:
        state_list = state.tolist()
        while len(state_list) < 7:
            state_list.append(0.0)
        code, limit = arm.is_joint_limit(state_list[:6], is_radian=True)
        if code == 0 and limit:
            return True
    except:
        pass
    
    # ensure that  position is reachable
    try:
        state_list = state.tolist()
        while len(state_list) < 7:
            state_list.append(0.0)
        code, pose = arm.get_forward_kinematics(state_list[:6], input_is_radian=True)
        if code != 0:
            print("FORWARD KINEMATICS CHECK FAILED")
            return True  # fwd kinematics failed
    except:
        pass
    
    return False

def sample_feasible_state():
    state = []
    for i in range(NUM_JOINTS):
        if i < len(JOINT_LIMITS):
            low, high = JOINT_LIMITS[i]
            state.append(np.random.uniform(low, high))
        else:
            state.append(np.random.uniform(-np.pi, np.pi))
    
    return np.array(state)

def move_to_state(state, speed=0.20, wait=True):
    if arm is None or not arm.connected:
        return -1
    
    try:
        state_list = state.tolist() if isinstance(state, np.ndarray) else list(state)
        while len(state_list) < 7:
            state_list.append(0.0) # pad to 7 joints for xarm sdk
        
        code = arm.set_servo_angle(
            angle=state_list[:6],
            speed=speed,
            is_radian=True,
            wait=wait
        )
        return code
    except Exception as e:
        print(f"Error moving robot: {e}")
        return -1

In [6]:
class Vertex:
    def __init__(self, state):
        self.state = np.array(state)
        self.parent = None
        self.cost = 0.0 

class VertexListKDTree:
    def __init__(self, dim):
        self.dim = dim
        self.vertices = []
        self.nn = NearestNeighbors(n_neighbors=1, algorithm='kd_tree', metric='euclidean')
        self.states = None
        
    def add_vertex(self, vertex):
        self.vertices.append(vertex)
        if len(self.vertices) > 0:
            self.states = np.array([v.state for v in self.vertices])
            if len(self.vertices) == 1:
                self.nn.fit(self.states.reshape(1, -1))
            else:
                self.nn.fit(self.states)
        return vertex
    
    def get_closest_vertex_in_graph(self, state):
        if len(self.vertices) == 0:
            return None, float('inf')
        
        state = np.array(state).reshape(1, -1)
        distances, indices = self.nn.kneighbors(state)
        closest_idx = indices[0][0]
        closest_vertex = self.vertices[closest_idx]
        closest_distance = distances[0][0]
        
        return closest_vertex, closest_distance
    
    def get_k_nearest_vertices(self, state, k):
        if len(self.vertices) == 0:
            return [], []
        
        k = min(k, len(self.vertices))
        state = np.array(state).reshape(1, -1)
        
        # temp set n_neighbors for this query
        nn_temp = NearestNeighbors(n_neighbors=k, algorithm='kd_tree', metric='euclidean')
        nn_temp.fit(self.states)
        
        distances, indices = nn_temp.kneighbors(state)
        
        near_vertices = [self.vertices[i] for i in indices[0]]
        near_distances = distances[0].tolist()
        
        return near_vertices, near_distances

def distance(x, y):
    return np.linalg.norm(np.array(x) - np.array(y))

def propagate_cost_to_descendants(vertex, vertex_list):
    for v in vertex_list.vertices:
        if v.parent == vertex:
            old_cost = v.cost
            v.cost = vertex.cost + distance(vertex.state, v.state)
            if abs(v.cost - old_cost) > 1e-10:  # propagate if cost changed
                propagate_cost_to_descendants(v, vertex_list)

def steer(from_state, to_state, step_size=0.1):
    from_state = np.array(from_state)
    to_state = np.array(to_state)
    dir_vec = to_state - from_state
    norm = np.linalg.norm(dir_vec)
    if norm < step_size:
        return to_state
    return from_state + (dir_vec / norm) * step_size

def collision_free_path(from_state, to_state, num_checks=5):
    for alpha in np.linspace(0, 1, num_checks):
        interp = from_state + alpha * (to_state - from_state)
        if is_state_in_collision(interp):
            return False
    return True

def rrt_star_plan(start, goal, max_iters=1000, step_size=0.1, goal_radius=0.1, 
                  rewire_radius=0.5, collision_checks=3):
    """
    RRT* planner for xArm robot (produces smoother, more optimal paths)
    
    Args:
        start: Start joint angles [q1, q2, q3, q4, q5, q6]
        goal: Goal joint angles [q1, q2, q3, q4, q5, q6]
        max_iters: Maximum number of iterations
        step_size: Maximum step size for steering (radians)
        goal_radius: Radius around goal to consider as reached (radians)
        rewire_radius: Radius for rewiring neighbors (radians)
        collision_checks: Number of collision checks along path
    
    Returns:
        vertex_list: VertexListKDTree containing all vertices
        goal_vertex: Vertex at goal (or None if failed)
    """
    vertex_list = VertexListKDTree(dim=NUM_JOINTS)

    start_vertex = vertex_list.add_vertex(Vertex(start))
    start_vertex.parent = None
    start_vertex.cost = 0.0

    best_goal_vertex = None
    best_goal_cost = float('inf')
    
    for it in range(max_iters):
        if it % 100 == 0:
            print(f"Iteration {it}/{max_iters}, Tree size: {len(vertex_list.vertices)}, " +
                  f"Best goal cost: {best_goal_cost:.3f}")
        
        if np.random.rand() < 0.05:
            x_rand = goal
        else:
            x_rand = sample_feasible_state()

        nearest_vertex, _ = vertex_list.get_closest_vertex_in_graph(x_rand)
        if nearest_vertex is None:
            continue
        x_nearest = nearest_vertex.state

        x_new = steer(x_nearest, x_rand, step_size)

        # check collisions
        if is_state_in_collision(x_new):
            continue
        
        if not collision_free_path(x_nearest, x_new, num_checks=collision_checks):
            continue
        
        # find nearby vertices for connection and rewiring, calc near radius based on tree size w/ RRT* optimal formula
        n = len(vertex_list.vertices)
        near_radius = min(rewire_radius, step_size * np.power(np.log(n + 1) / (n + 1), 1.0 / NUM_JOINTS))
        k_nearest = int(np.ceil(np.log(n + 1)))
        
        near_vertices, near_distances = vertex_list.get_k_nearest_vertices(x_new, k_nearest)
        
        # filter vertices within near_radius
        near_vertices = [v for v, d in zip(near_vertices, near_distances) if d < near_radius]
        
        # choose parent with minimum cost
        min_cost = nearest_vertex.cost + distance(x_nearest, x_new)
        best_parent = nearest_vertex
        
        for near_vertex in near_vertices:
            potential_cost = near_vertex.cost + distance(near_vertex.state, x_new)
            if potential_cost < min_cost:
                if collision_free_path(near_vertex.state, x_new, num_checks=collision_checks):
                    min_cost = potential_cost
                    best_parent = near_vertex
        
        # add new vertex with best parent
        new_vertex = vertex_list.add_vertex(Vertex(x_new))
        new_vertex.parent = best_parent
        new_vertex.cost = min_cost
        
        # rewire nearby vertices if routing through new vertex is cheaper
        for near_vertex in near_vertices:
            if near_vertex == best_parent:
                continue
            
            new_cost = new_vertex.cost + distance(x_new, near_vertex.state)
            if new_cost < near_vertex.cost:
                if collision_free_path(x_new, near_vertex.state, num_checks=collision_checks):
                    near_vertex.parent = new_vertex
                    near_vertex.cost = new_cost
                    propagate_cost_to_descendants(near_vertex, vertex_list)
        
        # check if we reached the goal region
        dist_to_goal = distance(x_new, goal)
        if dist_to_goal < goal_radius:
            # check if this is better than previous goal connection
            potential_goal_cost = new_vertex.cost + dist_to_goal
            if potential_goal_cost < best_goal_cost:
                if best_goal_vertex is None:
                    print(f"Goal reached in {it} iterations!")
                    goal_vertex = vertex_list.add_vertex(Vertex(goal))
                    goal_vertex.parent = new_vertex
                    goal_vertex.cost = potential_goal_cost
                    best_goal_vertex = goal_vertex
                    best_goal_cost = potential_goal_cost
                else:
                    # rewire goal to better parent
                    print(f"Improved path! Cost: {best_goal_cost:.3f} -> {potential_goal_cost:.3f}")
                    best_goal_vertex.parent = new_vertex
                    best_goal_vertex.cost = potential_goal_cost
                    best_goal_cost = potential_goal_cost

    if best_goal_vertex is not None:
        print(f"\nFinal path cost: {best_goal_cost:.3f}")
        return vertex_list, best_goal_vertex
    else:
        print(f"\nFailed to find path after {max_iters} iterations")
        return vertex_list, None

def reconstruct_path(curr_vertex):
    if curr_vertex is None:
        return []
    
    out = [curr_vertex.state]
    while curr_vertex.parent is not None:
        out.append(curr_vertex.parent.state)
        curr_vertex = curr_vertex.parent
    return out[::-1]

def smooth_path(path, max_iterations=50, collision_checks=10):
    if len(path) <= 2:
        return path
    
    smoothed = path.copy()

    # try to smooth path with shortcutting
    for _ in range(max_iterations):
        if len(smoothed) <= 2:
            break
        
        i = np.random.randint(0, len(smoothed) - 2)
        j = np.random.randint(i + 2, len(smoothed))
        
        if collision_free_path(smoothed[i], smoothed[j], num_checks=collision_checks):
                smoothed = smoothed[:i+1] + smoothed[j:]
    
    print(f"Path smoothing: {len(path)} -> {len(smoothed)} waypoints")
    return smoothed

In [7]:
def move_robot_to_3d_goal(goal_xyz, max_iters=3000, step_size=0.1, goal_radius=0.1, 
                          newton_threshold=1e-4, newton_max_iter=100, speed=0.20):
    """
    move robot from current state to specified 3D position using Newton solver + RRT + Shortcutting
    """
    if arm is None or not arm.connected:
        print("Robot not connected!")
        return None
    
    current_joints = get_current_joint_angles()
    if current_joints is None:
        print("Failed to get current robot state!")
        return None
    
    current_joints = current_joints[:NUM_JOINTS]
    
    try:
        current_position = forward_kinematics(current_joints)
    except:
        print("Failed to get current end effector position!")
        return None
    
    goal_xyz = np.array(goal_xyz)
    
    print(f"Current position: {current_position} m")
    print(f"Target position:  {goal_xyz} m")
    #print(f"Distance to target: {np.linalg.norm(goal_xyz - current_position):.4f} m")
    
    def forward_kinematics_wrapper(joint_angles):
        try:
            pos = forward_kinematics(joint_angles)
            return pos
        except:
            return np.array([1e6, 1e6, 1e6])
    
    print("solving for joint angles using Newton solver...")
    start_time = time.time()
    
    try:
        goal_joint_angles = solve_for_final_joints(
            end_effector_xyz=current_position, 
            target_xyz=goal_xyz,
            current_joint_state=current_joints,
            forward_kinematics_func=forward_kinematics_wrapper,
            threshold=newton_threshold,
            max_iter=newton_max_iter
        )
        
        elapsed_time = time.time() - start_time
        print(f"Newton solver completed in {elapsed_time:.2f} seconds")
        
        # validate solution
        try:
            predicted_position = forward_kinematics(goal_joint_angles)
            final_error = np.linalg.norm(predicted_position - goal_xyz)
            if final_error > 0.01:
                print(f"Warning: Error is large ({final_error*1000:.1f} mm).")
        except:
            pass
        
    except Exception as e:
        print(f"Newton solver failed: {e}")
        return None
    
    print("Planning path with RRT...")
    
    start_state = current_joints
    goal_state = goal_joint_angles[:NUM_JOINTS]
    
    vertex_list, goal_vertex = rrt_star_plan(
        start_state,
        goal_state,
        max_iters=max_iters,
        step_size=step_size,
        goal_radius=goal_radius
    )
    
    if goal_vertex is None:
        print("\nNo path found!")
        return None
    
    # reconstruct path
    raw_path = reconstruct_path(goal_vertex)
    print(f"\n Raw Path found with {len(raw_path)} waypoints!")
    
    print("Smoothing path with shortcutting...")
    path = smooth_path(raw_path, max_iterations=200, collision_checks=15)
    print(f"Smoothed path has {len(path)} waypoints!")
    
    print("STARTING EXECUTION")
    
    try:
        for i, state in enumerate(path):
            print(f"Waypoint {i+1}/{len(path)}")
            code = move_to_state(state, speed=speed, wait=True)
            if code != 0:
                print(f"WARNING: Movement returned code {code}")
        
        print("\nPath execution COMPLETE!")
    
    except KeyboardInterrupt:
        print("\nPath execution CANCELLED by user")
        return path

    except Exception as e:
        print(f"\nERROR during execution: {e}")
        return path
    
    return path

In [8]:
arm.move_gohome(wait=True)

0

In [9]:
# set target 3D position in meters [x, y, z]
x_y_z_meters = [0.2, 0.1, 0.2]
GOAL_POSITION = np.array(x_y_z_meters)

# set home position for later reference
HOME_POSITION = np.array([4.4, -18.4, 45.3, 0.0, 0.0, -1.3, 4.8])

# RRT params
RRT_MAX_ITERS = 300

# Descent params
STEP_SIZE = 1  # Step size
GOAL_RAD = 0.2 # Goal radius

# Robot params
SPEED = 0.2

# for testing purposes
# Execute the path planning and movement
path = move_robot_to_3d_goal(
    goal_xyz=GOAL_POSITION,
    max_iters=RRT_MAX_ITERS,      
    step_size=STEP_SIZE,            
    goal_radius=GOAL_RAD,    
    speed=SPEED             
)

Current position: [ 8.70000000e-02 -2.29008944e-17  1.54199997e-01] m
Target position:  [0.2 0.1 0.2] m
solving for joint angles using Newton solver...
Newton solver completed in 0.05 seconds
Planning path with RRT...
Iteration 0/3000, Tree size: 1, Best goal cost: inf
Goal reached in 7 iterations!
Iteration 100/3000, Tree size: 102, Best goal cost: 0.740
Iteration 200/3000, Tree size: 201, Best goal cost: 0.740
Iteration 300/3000, Tree size: 301, Best goal cost: 0.740
Iteration 400/3000, Tree size: 401, Best goal cost: 0.740
Iteration 500/3000, Tree size: 500, Best goal cost: 0.740
Iteration 600/3000, Tree size: 597, Best goal cost: 0.740
Iteration 700/3000, Tree size: 697, Best goal cost: 0.740
Iteration 800/3000, Tree size: 794, Best goal cost: 0.740
Iteration 900/3000, Tree size: 891, Best goal cost: 0.740
Iteration 1000/3000, Tree size: 989, Best goal cost: 0.740
Iteration 1100/3000, Tree size: 1087, Best goal cost: 0.740
Iteration 1200/3000, Tree size: 1185, Best goal cost: 0.740

In [None]:
import numpy as np
import cv2
import pyrealsense2 as rs
import time
from xarm.wrapper import XArmAPI

ROBOT_IP = "192.168.1.161"
FRAME_W, FRAME_H, FPS = 640, 480, 30

LOW_YELLOW = np.array([20, 100, 100])
HIGH_YELLOW = np.array([35, 255, 255])

RRT_MAX_ITERS = 300
RRT_STEP_SIZE = 0.1
RRT_GOAL_RAD = 0.2

# Descent params
STEP_SIZE = 1  # Step size
GOAL_RAD = 0.2 # Goal radius

# Robot params
SPEED = 0.2

def mask_yellow(frame):
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv, LOW_YELLOW, HIGH_YELLOW)
    kernel = np.ones((5, 5), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)

    return mask

def transform_camera_to_robot_base(cam_xyz, current_pose):
    x, y, z = current_pose[0], current_pose[1], current_pose[2]
    r, p, yaw = current_pose[3], current_pose[4], current_pose[5]

    T_base_flange = np.eye(4)
    T_base_flange[0:3, 3] = [x/1000.0, y/1000.0, z/1000.0]
    
    Rx = np.array([[1, 0, 0], [0, np.cos(r), -np.sin(r)], [0, np.sin(r), np.cos(r)]])
    Ry = np.array([[np.cos(p), 0, np.sin(p)], [0, 1, 0], [-np.sin(p), 0, np.cos(p)]])
    Rz = np.array([[np.cos(yaw), -np.sin(yaw), 0], [np.sin(yaw), np.cos(yaw), 0], [0, 0, 1]])
    R_rot = Rz @ Ry @ Rx
    T_base_flange[0:3, 0:3] = R_rot

    T_flange_cam = np.eye(4)

    R_cam_mounting = np.array([
        [ 0,  -1,  0],  
        [1,  0,  0],   
        [ 0,  0,  1]  
    ])

    T_flange_cam[0:3, 0:3] = R_cam_mounting

    T_flange_cam[0:3, 3] = [0.0, 0.0762, 0.0] 

    # P_base = T_base_flange * T_flange_cam * P_cam
    P_cam_homogenous = np.array([cam_xyz[0], cam_xyz[1], cam_xyz[2], 1])
    P_base = T_base_flange @ T_flange_cam @ P_cam_homogenous

    return P_base[:3]

def main():
    arm = XArmAPI(ROBOT_IP, is_radian=True)
    arm.motion_enable(True)
    arm.set_mode(0)
    arm.set_state(0)

    pipeline = rs.pipeline()
    config = rs.config()
    config.enable_stream(rs.stream.color, FRAME_W, FRAME_H, rs.format.bgr8, FPS)
    config.enable_stream(rs.stream.depth, FRAME_W, FRAME_H, rs.format.z16, FPS)

    try:
        profile = pipeline.start(config)
        depth_sensor = profile.get_device().first_depth_sensor()
        depth_scale = depth_sensor.get_depth_scale()
        intrinsics = profile.get_stream(rs.stream.depth).as_video_stream_profile().get_intrinsics()

        align_to = rs.stream.color
        align = rs.align(align_to)

        print("RealSense camera started.")
        print(f"Depth scale: {depth_scale}")
    except Exception as e:
        print(f"Failed to start RealSense pipeline: {e}")
        return

    last_cube_info = None

    try:
        while True:
            frames = pipeline.wait_for_frames()
            aligned_frames = align.process(frames)
            color_frame = aligned_frames.get_color_frame()
            depth_frame = aligned_frames.get_depth_frame()

            if not color_frame or not depth_frame: continue

            frame = np.asanyarray(color_frame.get_data())
            mask = mask_yellow(frame)

            cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            
            cube_info_list = [] 
            out = frame.copy()

            for c in cnts:
                if cv2.contourArea(c) < 1000:
                    continue

                x, y, w, h = cv2.boundingRect(c)
                cx, cy = x + w // 2, y + h // 2

                dist_to_center = depth_frame.get_distance(cx, cy)
                
                if 0.1 < dist_to_center < 1.0:
                    camera_xyz = rs.rs2_deproject_pixel_to_point(intrinsics, [cx, cy], dist_to_center)
                    
                    cube_data = {
                        'camera_xyz': camera_xyz,  # [x, y, z] in meters
                        'pixel_center': (cx, cy),
                        'bbox': (x, y, w, h)
                    }
                    cube_info_list.append(cube_data)

                    cv2.rectangle(out, (x, y), (x+w, y+h), (0, 255, 0), 2)
                    label = f"X:{camera_xyz[0]:.2f} Y:{camera_xyz[1]:.2f} Z:{camera_xyz[2]:.2f}"
                    cv2.putText(out, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            if len(cube_info_list) > 0:
                last_cube_info = cube_info_list[0] 
                
                cv2.putText(out, "Target Locked", (12, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            else:
                last_cube_info = None
                cv2.putText(out, "Searching...", (12, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            cv2.putText(out, "Press 'a' to Move to Cube | 'h' Home | 'q' Quit",
                        (12, out.shape[0] - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)

            cv2.imshow("RealSense", out)
            cv2.imshow("Mask", mask)

            key = cv2.waitKey(1) & 0xFF
            if key in (27, ord('q')): 
                break
            
            elif key == ord('h'):
                arm.set_servo_angle(angle=[0, -0.15, 0.86, 0.06, 1.0, 0], speed=0.4, is_radian=True, wait=True)

            elif key == ord('a'):  
                if last_cube_info is not None:
                    print("\n[ACTION] Calculating Target...")
                    
                    cube_pos_cam = last_cube_info['camera_xyz']

                    code, curr_pose = arm.get_position(is_radian=True)
                    if code == 0:
                        target_base_xyz = transform_camera_to_robot_base(cube_pos_cam, curr_pose)

                        # target_pose = [
                        #     target_base_xyz[0] * 1000, 
                        #     target_base_xyz[1] * 1000,
                        #     target_base_xyz[2] * 1000,
                        #     curr_pose[3], curr_pose[4], curr_pose[5] 
                        # ]
                        
                        # arm.set_position(x=target_pose[0], y=target_pose[1], z=target_pose[2], 
                        #                roll=target_pose[3], pitch=target_pose[4], yaw=target_pose[5], 
                        #                speed=100, wait=True)
                        
                        # path = move_robot_to_3d_goal(
                        #     goal_xyz=GOAL_POSITION,
                        #     max_iters=RRT_MAX_ITERS,      
                        #     step_size=STEP_SIZE,            
                        #     goal_radius=GOAL_RAD,    
                        #     speed=SPEED             
                        # )
                        success = move_robot_to_3d_goal(
                            goal_xyz=target_base_xyz,
                            max_iters=RRT_MAX_ITERS,      
                            step_size=RRT_STEP_SIZE,            
                            goal_radius=RRT_GOAL_RAD,    
                            speed=SPEED             
                        )
        
                        if success:
                            print("Successfully moved to the cube!")
                        else:
                            print("Failed to reach the cube.")

                    else:
                        print("[ERROR] Could not get robot pose.")

    finally:
        pipeline.stop()
        cv2.destroyAllWindows()
        arm.disconnect()

In [14]:
if __name__ == "__main__":
    result = main()
    print(result)

ROBOT_IP: 192.168.1.161, VERSION: v1.11.4, PROTOCOL: V1, DETAIL: 6,9,LI1003,DL1000,v1.11.4, TYPE1300: [0, 0]
change protocol identifier to 3
RealSense camera started.
Depth scale: 0.0010000000474974513

[ACTION] Calculating Target...
Cam XYZ:  [-0.20576713979244232, -0.13271737098693848, 0.3150000274181366]
Base XYZ: [0.34996582 0.14403158 0.02547598]
Current position: [0.2097877  0.00310317 0.33225903] m
Target position:  [0.34996582 0.14403158 0.02547598] m
solving for joint angles using Newton solver...
Newton solver completed in 0.04 seconds
Planning path with RRT...
Iteration 0/300, Tree size: 1, Best goal cost: inf
Iteration 100/300, Tree size: 101, Best goal cost: inf
Iteration 200/300, Tree size: 201, Best goal cost: inf
Goal reached in 201 iterations!

Final path cost: 3.368

 Raw Path found with 34 waypoints!
Smoothing path with shortcutting...
Path smoothing: 34 -> 2 waypoints
Smoothed path has 2 waypoints!
STARTING EXECUTION
Waypoint 1/2
Waypoint 2/2

Path execution COMPLET