# ISDN 5240 Project

# Implement robot assembly


### Load curobo and kuka

In [1]:
# torch
import torch
import numpy as np

# cuRobo
from curobo.cuda_robot_model.cuda_robot_model import CudaRobotModel, CudaRobotModelConfig
from curobo.types.base import TensorDeviceType
from curobo.types.robot import RobotConfig
from curobo.util_file import get_robot_path, join_path, load_yaml
from robofab.mqtt import publish_robot_trajs, publish_clear, publish_world, publish_frames, publish_grasp_object
# convenience function to store tensor type and device
tensor_args = TensorDeviceType()

# the path of the yml file must be absolute
from robofab import ROBOFAB_DATA_DIR
def load_kuka_kin_model(gripper_width = 0.15):
    robot_name = "robot"
    config_file = load_yaml(ROBOFAB_DATA_DIR+ f"/robot/kuka/cfg.yaml")["robot_cfg"]
    config_kinematics = config_file['kinematics']
    config_kinematics["urdf_path"] = ROBOFAB_DATA_DIR + "/robot/kuka/" + config_kinematics["urdf_path"]
    config_kinematics["collision_spheres"] = ROBOFAB_DATA_DIR + "/robot/kuka/" + config_kinematics["collision_spheres"]
    config_kinematics["lock_joints"] = {"left_gripper_finger_joint": gripper_width, "right_gripper_finger_joint": gripper_width}

    robot_cfg = RobotConfig.from_dict(config_file, tensor_args)
    kin_model = CudaRobotModel(robot_cfg.kinematics)
    return config_file, kin_model

def load_franka_kin_model(gripper_width = 0.15):
    robot_name = "left"
    config_file = load_yaml(ROBOFAB_DATA_DIR+ f"/robot/dual_franka/fr3_franka_{robot_name}.yml")["robot_cfg"]
    config_kinematics = config_file['kinematics']
    config_kinematics["urdf_path"] = ROBOFAB_DATA_DIR + "/robot/dual_franka/" + config_kinematics["urdf_path"]
    config_kinematics["collision_spheres"] = ROBOFAB_DATA_DIR + "/robot/dual_franka/" + config_kinematics["collision_spheres"]
    config_kinematics["lock_joints"] = {"fr3_finger_joint1": gripper_width, "fr3_finger_joint2": gripper_width}

    robot_cfg = RobotConfig.from_dict(config_file, tensor_args)
    kin_model = CudaRobotModel(robot_cfg.kinematics)
    return config_file, kin_model

_, robot_kin_model = load_kuka_kin_model(0.15)
# _, robot_kin_model = load_franka_kin_model(0.15)
retract_q = robot_kin_model.retract_config.cpu().numpy().reshape(1, -1)
print(retract_q)
publish_clear()
publish_robot_trajs(retract_q, 0.15)

[[ 0.    0.    0.   -1.57  0.    1.57]]


### Load tubes

In [2]:
import numpy as np
from typing import List, Dict
import xml.etree.ElementTree as ET
# 你的规划器类

def get_geom_bounds(geom: Dict) -> Dict:
    """Compute world bounds for a geom based on type, pos, euler."""
    pos = geom['pos']
    size = geom['size']
    euler = geom['euler']

    # Rotation matrix from euler (xyz order)
    rx = np.array([[1, 0, 0], [0, np.cos(euler[0]), -np.sin(euler[0])], [0, np.sin(euler[0]), np.cos(euler[0])]])
    ry = np.array([[np.cos(euler[1]), 0, np.sin(euler[1])], [0, 1, 0], [-np.sin(euler[1]), 0, np.cos(euler[1])]])
    rz = np.array([[np.cos(euler[2]), -np.sin(euler[2]), 0], [np.sin(euler[2]), np.cos(euler[2]), 0], [0, 0, 1]])
    R = rz @ ry @ rx

    if geom['type'] == 'box':
        half_sizes = size
        corners_local = np.array(np.meshgrid(*[[-1, 1]] * 3, indexing='ij')).T.reshape(-1, 3) * half_sizes
        corners_world = (R @ corners_local.T).T + pos
        min_bounds = np.min(corners_world, axis=0)
        max_bounds = np.max(corners_world, axis=0)
    elif geom['type'] == 'cylinder':
        # Approximate bounds: radius in perpendicular planes, half-length along axis
        axis = R @ np.array([0, 0, 1])  # Axis direction
        perp1 = R @ np.array([1, 0, 0])
        perp2 = R @ np.array([0, 1, 0])
        half_length = size[1]
        radius = size[0]
        # Bounds along axis +/- half_length, perp +/- radius
        min_bounds = pos - half_length * np.abs(axis) - radius * (np.abs(perp1) + np.abs(perp2))
        max_bounds = pos + half_length * np.abs(axis) + radius * (np.abs(perp1) + np.abs(perp2))
    else:
        raise ValueError(f"Unsupported geom type: {geom['type']}")

    return {'min': min_bounds, 'max': max_bounds}


def compute_facade_contour(facade_geoms, depth=0.6, gap=0.2):
    """Compute segmented contour for scaffold: y segments with corresponding min_x for front."""
    y_bounds = set()
    for geom in facade_geoms:
        bounds = get_geom_bounds(geom)
        y_bounds.add(bounds['min'][1])
        y_bounds.add(bounds['max'][1])

    y_sorted = sorted(y_bounds)
    segments = []
    nx = []
    for i in range(len(y_sorted) - 1):
        y_min = y_sorted[i]
        y_max = y_sorted[i + 1]
        mid_y = (y_min + y_max) / 2
        active_min_x = np.inf
        for geom in facade_geoms:
            bounds = get_geom_bounds(geom)
            if bounds['min'][1] <= mid_y <= bounds['max'][1]:
                active_min_x = min(active_min_x, bounds['min'][0])
        if active_min_x < np.inf:
            segments.append({'y_min': y_min, 'y_max': y_max, 'min_x': active_min_x})
            nx.append(active_min_x)

    for seg in segments:
        min_x = seg['min_x']
        if min_x < max(nx):
            seg['min_x'] = max(nx) - depth - gap
        else:
            seg['min_x'] -= gap
    return segments


def plan_scaffold(facade_geoms: List[Dict], contour_segments: List[Dict], depth: float = 0.6, layer_height: float = 0.5, gap: float = 0.2) -> Dict:
    """Plan scaffold structure: sections, layers based on height."""
    # Max height from geoms
    max_height = max(get_geom_bounds(g)['max'][2] for g in facade_geoms)
    num_layers = int(np.ceil(max_height / layer_height))
    horizontal_levels = np.linspace(0, max_height, num_layers + 1)
    vertical_half_lengths = [layer_height / 2] * num_layers
    z_centers = (horizontal_levels[:-1] + horizontal_levels[1:]) / 2


    # ---- 重排 contour_segments ----
    # 依 min_x 由大到小排序；Python sort 是 stable，
    # 所以同一個 min_x 之間會保持原本的順序。

    contour_segments_sorted = sorted(
        contour_segments,
        key=lambda seg: seg["min_x"],
        reverse=True
    )


    sections = []
    for seg in contour_segments_sorted:
        sections.append({
            'y_start': seg['y_min'],
            'y_end': seg['y_max'],
            'x_front': seg['min_x'] - depth,
            'x_back': seg['min_x'],
            'add_diagonal': (seg['y_max'] - seg['y_min'] > 0.5), # only give true and false
        })

    return {
        'sections': sections,
        'num_layers': num_layers,                       # order of the layers
        'horizontal_levels': horizontal_levels,         # order of the layers
        'vertical_half_lengths': vertical_half_lengths, # order of the layers
        'z_centers': z_centers                          # order of the layers
    }


def generate_scaffold_xml(facade_xml_str: str,
                          gap: float = 0.2,
                          depth: float = 0.6,
                          layer_height: float = 0.8,
                          ):
    root = ET.fromstring(facade_xml_str)
    facade_geoms = []
    tube_info = {}
    for geom in root.findall('.//geom'):
        geom_dict = geom.attrib
        geom_dict['size'] = np.fromstring(geom_dict.get('size', '0 0 0'), sep=' ')
        geom_dict['pos'] = np.fromstring(geom_dict.get('pos', '0 0 0'), sep=' ')
        geom_dict['euler'] = np.fromstring(geom_dict.get('euler', '0 0 0'), sep=' ')
        facade_geoms.append(geom_dict)

    # --- 1) 原本的輪廓 & scaffold 計畫 ---
    contour_segments = compute_facade_contour(facade_geoms)
    plan = plan_scaffold(facade_geoms, contour_segments, depth, layer_height)

    # --- 2) 在這裡插入 GridPathPlanner：取得 sequence 並印出 ---
    # planner = GridPathPlanner(1.0, contour_segments, plan["num_layers"])
    # pos_sequences = planner.build_sequences_pos
    # 例如印第一個 segment 的所有 from/to
    # for i, (p_from, p_to) in enumerate(pos_sequences[0]):
    #     print(f"step {i}: from {p_from} to {p_to}")


    # planner.print_info()  # 這裡會印出 sorted segments 和每個 (layer, seg) 的 steps

    tube_radius = 0.024
    rgba = "0.7 0.7 0.7 1"
    material = "metal_mat"

    xml = '<?xml version="1.0" ?>\n<mujoco>\n'
    xml += '    <compiler angle = "radian"/>\n'
    xml += '    <option gravity="0 0 -9.81" timestep="0.002"/>\n'
    xml += '    <asset>\n'  # Assume assets are same
    xml += '        <texture name="wood" type="2d" builtin="flat" rgb1="0.8 0.6 0.4" rgb2="0.6 0.4 0.2" width="100" height="100"/>\n'
    xml += '        <material name="wood_mat" texture="wood" specular="0.5" shininess="0.5"/>\n'
    xml += '        <texture name="metal" type="2d" builtin="flat" rgb1="0.7 0.7 0.7" rgb2="0.5 0.5 0.5" width="100" height="100"/>\n'
    xml += '        <material name="metal_mat" texture="metal" specular="0.8" shininess="0.8"/>\n'
    xml += '        <texture name="groundplane" type="2d" builtin="checker" rgb1="0.2 0.3 0.4" rgb2="0.1 0.2 0.3" width="300" height="300"/>\n'
    xml += '        <material name="groundplane" texture="groundplane" texuniform="true" texrepeat="5 5" reflectance="0.2"/>\n'
    xml += '    </asset>\n'
    xml += '    <worldbody>\n'
    xml += '        <light pos="1 0 3.5" dir="0 0 -1" directional="true"/>\n'
    xml += '        <geom name="floor" size="0 0 0.05" type="plane" material="groundplane"/>\n'

    # Add facade geoms
    for geom in facade_geoms:
        pos_str = ' '.join(f"{p:.1f}" for p in geom['pos'])
        size_str = ' '.join(f"{s:.3f}" for s in geom['size'])
        euler_str = ' '.join(f"{e:.2f}" for e in geom['euler'])
        rgba_str = geom.get('rgba', '0.8 0.6 0.4 1')
        mat_str = geom.get('material', 'wood_mat')
        xml += f'        <geom name="{geom["name"]}" type="{geom["type"]}" size="{size_str}" pos="{pos_str}" euler="{euler_str}" rgba="{rgba_str}" material="{mat_str}"/>\n'

    tube_id = 0
    for sec in plan['sections']:
        y_start = sec['y_start']
        y_end = sec['y_end']
        x_front = sec['x_front']
        x_back = sec['x_back']
        add_diagonal = sec['add_diagonal']
        offset_0_025 =  0.025
        offset_0_05 =  0.05
        # Vertical poles
        for layer in range(plan['num_layers']):
            half_length = plan['vertical_half_lengths'][layer]
            z_center = plan['z_centers'][layer]
            # 0
            from_p = [x_back, y_start, z_center - half_length +offset_0_025]
            to_p = [x_back, y_start, z_center + half_length -offset_0_025]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.3f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

            # 1
            from_p = [x_back, y_end, z_center - half_length+offset_0_025]
            to_p = [x_back, y_end, z_center + half_length -offset_0_025]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.3f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

            # 2
            from_p = [x_back, y_end -offset_0_05 , z_center + half_length -offset_0_05 ]
            to_p = [x_back, y_start +offset_0_05 ,z_center - half_length  +offset_0_05 ]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.2f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

            # 3
            from_p = [x_back, y_end -offset_0_025 , z_center + half_length ]
            to_p = [x_back, y_start +offset_0_025 ,z_center + half_length ]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.2f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

            # 4
            from_p = [x_back-offset_0_025, y_end , z_center + half_length ]
            to_p = [x_front+offset_0_025, y_end ,z_center + half_length ]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.2f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

            # 5
            from_p = [x_back-offset_0_025, y_start , z_center + half_length ]
            to_p = [x_front+offset_0_025, y_start ,z_center + half_length ]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.2f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

            # 6
            from_p = [x_front, y_end , z_center - half_length +offset_0_025]
            to_p = [x_front, y_end ,z_center + half_length  -offset_0_025]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.2f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

            # 7
            from_p = [x_front, y_start , z_center - half_length +offset_0_025]
            to_p = [x_front, y_start ,z_center + half_length  -offset_0_025]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.2f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

            # 8
            from_p = [x_front, y_end -offset_0_05 , z_center - half_length +offset_0_05 ]
            to_p = [x_front, y_start +offset_0_05 ,z_center + half_length  -offset_0_05 ]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.2f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

            # 9
            from_p = [x_front, y_end -offset_0_025 , z_center + half_length ]
            to_p = [x_front, y_start +offset_0_025 ,z_center + half_length ]
            xml += f'        <geom name="tube{tube_id}" type="cylinder" fromto="{from_p[0]:.3f} {from_p[1]:.3f} {from_p[2]:.3f} {to_p[0]:.3f} {to_p[1]:.3f} {to_p[2]:.2f}" size="{tube_radius}" rgba="{rgba}" material="{material}"/>\n'
            tube_info[tube_id] = {"from_p": from_p, "to_p": to_p}
            tube_id += 1

    # key: (x1,y1,z1,x2,y2,z2)（端點排序後、四捨五入），value: [tube_id1, tube_id2, ...]
    segment_dict = {}

    for tid, seg in tube_info.items():
        p1 = np.array(seg["from_p"], dtype=float)
        p2 = np.array(seg["to_p"], dtype=float)

        # 忽略方向：把端點按座標字典序排序，A→B 和 B→A 都變成同一個 key
        if tuple(p1) <= tuple(p2):
            a, b = p1, p2
        else:
            a, b = p2, p1

        # 四捨五入，避免 0.3000000004 之類的浮點數誤差
        key = tuple(np.round(np.concatenate([a, b]), 3))

        if key not in segment_dict:
            segment_dict[key] = []
        segment_dict[key].append(tid)

    overlapped_tube_ids = []
    valid_tube_ids_set = set()

    for key, ids in segment_dict.items():
        ids_sorted = sorted(ids)
        # 只要這個幾何 segment 出現超過一次，就視為重疊群組
        if len(ids_sorted) > 1:
            overlapped_tube_ids.extend(ids_sorted)
        # 無論有沒有重疊，這組裡「最小的 id」保留
        valid_tube_ids_set.add(ids_sorted[0])

    # 排序一下方便看
    overlapped_tube_ids = sorted(set(overlapped_tube_ids))
    valid_tube_index = sorted(valid_tube_ids_set)

    # 實際被丟掉的那幾根（重疊群組裡除了第一根以外）
    removed_tube_ids = sorted(set(overlapped_tube_ids) - valid_tube_ids_set)

    # print("Overlapped tube indices (all in overlapping groups):", overlapped_tube_ids)
    # print("Removed overlapped tube indices (discarded):", removed_tube_ids)
    print("Valid tube indices (kept):", valid_tube_index)


    xml += '    </worldbody>\n</mujoco>\n'
    return xml, tube_info, valid_tube_index


# get different designs by setting different wall, col1, and col2 locations
facade_xml = '''
<worldbody>
    <geom name="wall" type="box" size="2.0 0.025 2.4" pos="3.2 0 0" euler="0 0 1.57" rgba="0.8 0.6 0.4 1" material="wood_mat"/>
    <geom name="col1" type="cylinder" size="0.4 1.2" pos="3.2 -0.8 1.2" euler="0 0 1.57" rgba="0.8 0.6 0.4 1" material="wood_mat"/>
    <geom name="col2" type="cylinder" size="0.4 1.2" pos="3.2 0.8 1.2" euler="0 0 1.57" rgba="0.8 0.6 0.4 1" material="wood_mat"/>
</worldbody>
'''

# scaffold_structure[id] = {"start_pos":[], "end_pos": []}
xml, scaffold_structure, valid_tube_index = generate_scaffold_xml(
    facade_xml_str=facade_xml,
)


Valid tube indices (kept): [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 92, 93, 94, 95, 96, 97, 98, 99, 102, 103, 104, 105, 106, 107, 108, 109, 112, 113, 114, 115, 116, 117, 118, 119, 122, 123, 124, 125, 126, 127, 128, 129, 132, 133, 134, 135, 136, 137, 138, 139, 142, 143, 144, 145, 146, 147, 148, 149]


In [3]:
from curobo.geom.types import WorldConfig, Mesh, Cylinder, Cuboid
import trimesh
import numpy as np
from curobo.geom.types import Pose
from scipy.spatial.transform import Rotation as R


def rotate_euler(euler_angles, axis, angle, degrees=False):
    """
    Rotate given Euler angles by a specified angle around an arbitrary axis.

    Parameters:
    - euler_angles: tuple or list of 3 floats, Euler angles (roll, pitch, yaw) in radians (or degrees if degrees=True).
    - axis: tuple or list of 3 floats, the arbitrary axis vector (will be normalized).
    - angle: float, the rotation angle in radians (or degrees if degrees=True).
    - degrees: bool, if True, input angles are in degrees; otherwise, in radians.

    Returns:
    - new_euler: tuple of 3 floats, the new Euler angles after rotation, in the same units as input.
    """
    # Normalize the axis vector
    axis = np.array(axis)
    axis /= np.linalg.norm(axis)

    # Create rotation from Euler angles
    rot_euler = R.from_euler('xyz', euler_angles, degrees=degrees)

    # Create rotation from axis-angle
    rot_axis_angle = R.from_rotvec(axis * angle if not degrees else axis * np.deg2rad(angle))

    # Compose the rotations: apply the new rotation to the existing one
    # Note: Order matters; here we apply the new rotation after the original (right-multiply)
    # If you want to apply it before, use rot_axis_angle * rot_euler
    new_rot = rot_euler * rot_axis_angle

    # Convert back to Euler angles
    new_euler = new_rot.as_euler('xyz', degrees=degrees)

    return tuple(new_euler)



def euler_to_quaternion(roll, pitch, yaw):
    qx = np.sin(roll/2) * np.cos(pitch/2) * np.cos(yaw/2) - np.cos(roll/2) * np.sin(pitch/2) * np.sin(yaw/2)
    qy = np.cos(roll/2) * np.sin(pitch/2) * np.cos(yaw/2) + np.sin(roll/2) * np.cos(pitch/2) * np.sin(yaw/2)
    qz = np.cos(roll/2) * np.cos(pitch/2) * np.sin(yaw/2) - np.sin(roll/2) * np.sin(pitch/2) * np.cos(yaw/2)
    qw = np.cos(roll/2) * np.cos(pitch/2) * np.cos(yaw/2) + np.sin(roll/2) * np.sin(pitch/2) * np.sin(yaw/2)
    return [qx, qy, qz, qw]

tube_cylinder = []
tube_mesh = []
tube_pose = []
for tube_id, tube in scaffold_structure.items():
    if tube_id not in valid_tube_index:
        continue

    start_p = tube["from_p"]
    start_p[0] -= 0.2
    end_p = tube["to_p"]
    end_p[0] -= 0.2

    # get rotation quaternion from start_p to end_p
    direction = np.array(end_p) - np.array(start_p)
    direction = direction / np.linalg.norm(direction)
    # Assuming the cylinder's default orientation is along the z-axis
    z_axis = np.array([0, 0, 1])
    # Compute euler angles
    if np.allclose(direction, z_axis):
        roll, pitch, yaw = 0, 0, 0
    elif np.allclose(direction, -z_axis):
        roll, pitch, yaw = 0, np.pi, 0
    else:
        roll = np.arctan2(direction[0], direction[1])
        yaw = np.arccos(direction[2])
        pitch = 0
    qx, qy, qz, qw = euler_to_quaternion(roll, pitch, yaw)

    cylinder = Cylinder(
        name=f"tube{tube_id}",
        radius=0.024,
        height=np.linalg.norm(np.array(end_p)-np.array(start_p)),
        # pose is [x, y, z, qx, qy, qz, qw]
        pose=[(start_p[0]+end_p[0])/2, (start_p[1]+end_p[1])/2, (start_p[2]+end_p[2])/2, qx, qy, qz, qw],
        color=[0.7,0.7,0.7,1.0],
    )
    tube_cylinder.append(cylinder)

    # mesh = trimesh.creation.cylinder(radius=0.024, height=np.linalg.norm(np.array(end_p)-np.array(start_p)), sections=16)
    # mesh.apply_translation([(start_p[0]+end_p[0])/2, (start_p[1]+end_p[1])/2, (start_p[2]+end_p[2])/2])
    mesh = WorldConfig(cylinder=[cylinder]).get_mesh_world(merge_meshes=True).mesh[0]#.get_trimesh_mesh()
    tube_mesh.append(mesh)


    tube_pose_rotate = []
    tube_pose_rotate_pos = []
    tube_pose_rotate_quat = []
    for deg in range(0, 360, 5):
        # # get rotation matrix
        # rotation_matrix = R.from_euler('xyz', [roll, pitch, yaw]).as_matrix()
        # # # Rotation along direction's normal by deg degrees
        # rotation_matrix = R.from_rotvec(direction * np.deg2rad(deg)).as_matrix() @ rotation_matrix
        # r = R.from_matrix(rotation_matrix)
        # qx, qy, qz, qw = r.as_quat()
        [new_roll, new_pitch, new_yaw] = rotate_euler((roll, pitch, yaw), direction, np.deg2rad(deg))
        qx, qy, qz, qw = euler_to_quaternion(new_roll, new_pitch, new_yaw)

        tube_pose_rotate_pos.append([(start_p[0]+end_p[0])/2, (start_p[1]+end_p[1])/2, (start_p[2]+end_p[2])/2])
        tube_pose_rotate_quat.append([qx, qy, qz, qw])

        tube_pose_rotate = Pose(
            position=torch.tensor(tube_pose_rotate_pos, **tensor_args.as_torch_dict()),
            quaternion=torch.tensor(tube_pose_rotate_quat, **tensor_args.as_torch_dict())
        )

    tube_pose.append(tube_pose_rotate)

    # # get rotation matrix
    # rotation_matrix = R.from_euler('xyz', [roll, pitch, yaw]).as_matrix()
    # # Rotation along y axis by -90 degrees
    # rotation_matrix = R.from_euler('y', -np.pi/2).as_matrix() @ rotation_matrix
    # # Rotation along x axis by -90 degrees
    # rotation_matrix = R.from_euler('x', -np.pi/2).as_matrix() @ rotation_matrix
    # # Rotation along y axis by 180 degrees
    # rotation_matrix = R.from_euler('y', np.pi).as_matrix() @ rotation_matrix
    # # Convert back to quaternion
    # r = R.from_matrix(rotation_matrix)
    # qx, qy, qz, qw = r.as_quat()  #
    # tube_pose.append(
    #     Pose(
    #             position=torch.tensor([(start_p[0]+end_p[0])/2, (start_p[1]+end_p[1])/2, (start_p[2]+end_p[2])/2], **tensor_args.as_torch_dict()),
    #             # quaternion=torch.tensor(euler_to_quaternion(roll-np.pi/4, pitch, yaw-np.pi/4), **tensor_args.as_torch_dict())
    #             quaternion=torch.tensor([qx, qy, qz, qw], **tensor_args.as_torch_dict())
    #         )
    #     )

place_world = WorldConfig(
    cylinder=[
        Cylinder(
            name="col1",
            radius=0.4,
            height=1.2*2,
            pose=[3.2, -0.8, 1.2, np.sin(1.57/2), 0, 0, np.cos(1.57/2)],
            color=[0.8, 0.6, 0.4, 1.0],
        ),
        Cylinder(
            name="col2",
            radius=0.4,
            height=1.2*2,
            pose=[3.2, 0.8, 1.2, np.sin(1.57/2), 0, 0, np.cos(1.57/2)],
            color=[0.8, 0.6, 0.4, 1.0],
        ),
        # tube_cylinder[2]
    ],
    cuboid=[Cuboid(
        name="wall",
        dims=[2.0*2, 0.025*2, 2.4],
        pose=[3.2, 0, 1.2, np.sin(1.57/2), 0, 0, np.cos(1.57/2)],
        color=[0.8, 0.6, 0.4, 1.0],
    )]
)


#visualize
publish_clear()
publish_robot_trajs(retract_q, 0.15)
publish_world(place_world, merged = True)

  exec(code_obj, self.user_global_ns, self.user_ns)


In [22]:
from robofab.place import poses_to_frames

### initialize IK solver function
from curobo.geom.types import Pose
from curobo.wrap.reacher.ik_solver import IKSolver, IKSolverConfig

def get_franka_ik_solver(gripper_width=0.15, world_config: WorldConfig = None, num_seeds=500):
    # config_file, _ = load_franka_kin_model(gripper_width)
    config_file, _ = load_kuka_kin_model(gripper_width)
    robot_cfg = RobotConfig.from_dict(config_file)
    ik_config = IKSolverConfig.load_from_robot_config(
            robot_cfg,
            world_model=world_config,
            tensor_args=tensor_args,
            use_cuda_graph=True,
            rotation_threshold=0.001,
            position_threshold=0.001,
            num_seeds=num_seeds,
            self_collision_check=False,
            self_collision_opt=False,
            collision_activation_distance= 0.02
        )
    ik_solver = IKSolver(ik_config)
    return ik_solver

### initialize Plan IK function
def get_poses_by_flag(poses: Pose, flag: torch.tensor):
    return Pose(position=poses.position[flag, :], quaternion=poses.quaternion[flag, :])

def plan_ik(poses: Pose,
            world_config: WorldConfig,
            gripper_width: float = 0.15):
    ik_solver = get_franka_ik_solver(gripper_width=gripper_width, world_config=world_config)
    result = ik_solver.solve_batch(poses)
    ik_solver.reset_cuda_graph()
    if result.success.any():
        flag = result.success
        success_qs = result.solution[result.success]
        sucess_pose = get_poses_by_flag(poses, flag.flatten())
        return (success_qs, sucess_pose, flag.flatten())
    else:
        return None


### Initialize Plan IK with seed function
from curobo.types.state import JointState
def plan_ik_seed_config(poses: Pose,
                        world_config: WorldConfig,
                        seed_config: JointState,
                        gripper_width: float = 0.15):
    ik_solver = get_franka_ik_solver(gripper_width=gripper_width, world_config=world_config, num_seeds = 1)
    seed_config = seed_config.position.view(1, -1)
    result = ik_solver.solve_batch(goal_pose=poses, seed_config=seed_config, num_seeds=1)
    if result.success.any():
        flag = result.success
        success_qs = result.solution[result.success]
        sucess_pose = get_poses_by_flag(poses, flag.flatten())
        return (success_qs, sucess_pose, flag.flatten())
    else:
        return None
    
### Motion gen configuration

from curobo.wrap.reacher.motion_gen import MotionGen, MotionGenConfig, MotionGenPlanConfig
from curobo.util.trajectory import InterpolateType

def get_franka_motion_gen(gripper_width=0.15, world_config: WorldConfig = None, num_seeds=1000):
    # config_file, _ = load_franka_kin_model(gripper_width)
    config_file, _ = load_kuka_kin_model(gripper_width)
    robot_cfg = RobotConfig.from_dict(config_file)
    motion_gen_config = MotionGenConfig.load_from_robot_config(
            robot_cfg,
            world_model=world_config,
            tensor_args=tensor_args,
            use_cuda_graph=True,
            rotation_threshold=0.05,
            position_threshold=0.005,
            num_ik_seeds=num_seeds,
            num_trajopt_seeds=num_seeds,
            interpolation_dt=0.01,
            interpolation_steps = 1000,
            interpolation_type=InterpolateType.CUBIC,
            high_precision=False,
            self_collision_check=False,
            self_collision_opt=False,
            minimize_jerk=False,
            collision_activation_distance=0.02)
    return MotionGen(motion_gen_config)


### Motion gen planner

from curobo.types.state import JointState
def plan_motion_attached_object(start_state: JointState,
                                goal_poses: Pose,
                                attached_object: Mesh,
                                attached_state : JointState,
                                world_config: WorldConfig,
                                gripper_width: float = 0.15):

    motion_gen = get_franka_motion_gen(world_config=world_config, gripper_width=gripper_width)
    place_poses = Pose(position=goal_poses.position.view(1, -1, 3), quaternion=goal_poses.quaternion.view(1, -1, 4))
    start_state.joint_names = motion_gen.kinematics.joint_names.copy()

    attached_state.joint_names = motion_gen.kinematics.joint_names.copy()
    motion_gen.attach_external_objects_to_robot(attached_state,
                                                [attached_object],
                                                surface_sphere_radius=0.015,
                                                link_name="attached_object")

    result = motion_gen.plan_goalset(start_state, place_poses, MotionGenPlanConfig(max_attempts=100))
    if result.success.any():
        qtraj = result.get_interpolated_plan().position
        return (qtraj, result.goalset_index.item())
    else:
        return None

def plan_motion_js_attached_object(start_state: JointState,
                                goal_state: JointState,
                                attached_object: Mesh,
                                attached_state : JointState,
                                world_config: WorldConfig,
                                gripper_width: float = 0.15):

    motion_gen = get_franka_motion_gen(world_config=world_config, gripper_width=gripper_width)
    start_state.joint_names = motion_gen.kinematics.joint_names.copy()
    goal_state.joint_names = motion_gen.kinematics.joint_names.copy()

    attached_state.joint_names = motion_gen.kinematics.joint_names.copy()
    motion_gen.attach_external_objects_to_robot(attached_state,
                                                [attached_object],
                                                surface_sphere_radius=0.015,
                                                link_name="attached_object")

    result = motion_gen.plan_single_js(start_state, goal_state, MotionGenPlanConfig(max_attempts=100))
    if result.success.any():
        qtraj = result.get_interpolated_plan().position
        return qtraj
    else:
        return None


def compute_pick_place(tube_id):
    ## 2.1 Compute place poses
    place_result = plan_ik(tube_pose[tube_id], place_world, 0.15)
    if place_result is None:
        raise RuntimeError("No solution found!")
    
    # update
    place_poses = place_result[1]
    
    ## 2.3 Plan Place IK
    ### Plan place approach IK
    place_approach_poses = place_poses.clone()
    place_approach_poses.position += torch.tensor([0, 0, 0.1], **tensor_args.as_torch_dict())
    place_approach_result =  plan_ik(place_approach_poses, place_world, 0.15)
    if place_approach_result is None:
        raise RuntimeError("No solution found!")
    
    # update
    place_approach_poses = place_approach_result[1]
    place_poses = get_poses_by_flag(place_poses, place_approach_result[2])
    # print("Number of place poses", place_poses.position.shape[0])
    
    ### Compute place to pick transformation matrix
    pick_pose_pos = [-1.5, -1.5, 0]
    pick_pose_quat = [0, 1, -1, 0]
    pick_poses = Pose(position=torch.tensor([pick_pose_pos]*len(place_poses), **tensor_args.as_torch_dict()), quaternion=torch.tensor([pick_pose_quat]*len(place_poses), **tensor_args.as_torch_dict()))
    pick_frames = poses_to_frames(pick_poses)
    
    ### Compute pick poses
    from curobo.geom.transform import matrix_to_quaternion
    pick_frames = torch.tensor(pick_frames, **tensor_args.as_torch_dict())
    pick_poses = Pose(position=pick_frames[:, :3, 3], quaternion=matrix_to_quaternion(pick_frames[:, :3, :3]))
    
    ### Plan pick ik
    pick_result = plan_ik(pick_poses, place_world, 0.15)
    if pick_result is None:
        raise RuntimeError("No solution found!")
    
    # update
    pick_poses = pick_result[1]
    place_poses = get_poses_by_flag(place_poses, pick_result[2])
    place_approach_poses = get_poses_by_flag(place_approach_poses, pick_result[2])
    
    ### Plan pick approach ik
    pick_approach_poses = pick_poses.clone()
    pick_approach_poses.position += torch.tensor([0, 0, 0.1], **tensor_args.as_torch_dict())
    pick_approach_result =  plan_ik(pick_approach_poses, place_world, 0.15)
    if pick_approach_result is None:
        raise RuntimeError("No solution found!")
    
    # update
    pick_approach_poses = pick_approach_result[1]
    pick_poses = get_poses_by_flag(pick_poses, pick_approach_result[2])
    place_poses = get_poses_by_flag(place_poses, pick_approach_result[2])
    # print("Number of pick poses", place_poses.position.shape[0])
    
    
    ## 4.1 Retract -> pick_approach
    ### Choose one pick-place-approach pair
    
    pick_place_id = 0
    pick_pose = pick_poses.clone()[pick_place_id]
    place_pose = place_poses.clone()[pick_place_id]
    pick_approach_pose = pick_approach_poses.clone()[pick_place_id]
    place_approach_pose = place_approach_poses.clone()[pick_place_id]
    
    
    ### Compute attach objects and states
    
    mesh = trimesh.creation.cylinder(radius=0.024, height=np.linalg.norm(np.array(end_p)-np.array(start_p)), sections=16)
    mesh.apply_translation(pick_pose_pos)
    mesh = Mesh(
        name=f"tube_mesh{tube_id}",
        pose=[*pick_pose_pos, *pick_pose_quat],
        vertices=mesh.vertices,
        faces=mesh.faces,
        color=[0.7,0.7,0.7,1.0],
    )
    
    # pick_result = plan_ik(pick_pose, pick_world[3], 0.15)
    pick_result = plan_ik(pick_pose, place_world, 0.15)
    if pick_result is None:
        raise RuntimeError("No solution found!")
    pick_q = pick_result[0].cpu().numpy().reshape(1, -1)
    
    attached_state = JointState.from_position(pick_result[0])
    attached_object = mesh
    start_state = JointState.from_position(robot_kin_model.retract_config.view(1, -1), joint_names=robot_kin_model.joint_names)
    
    attached_object_for_rendering = trimesh.Trimesh(attached_object.vertices.copy(), attached_object.faces.copy())
    T = poses_to_frames(pick_pose).reshape(4, 4)
    invT = np.linalg.inv(T)
    attached_object_for_rendering.apply_transform(invT)
    
    ### Plan motion
    trajs_retract_pick_approach_result = plan_motion_attached_object(start_state, pick_approach_pose, attached_object, attached_state, place_world, 0.15)
    if trajs_retract_pick_approach_result is None:
        raise RuntimeError("No solution found!")
    trajs_retract_pick = trajs_retract_pick_approach_result[0].clone().cpu().numpy()
    trajs_retract_pick = np.vstack([trajs_retract_pick, pick_q])
    
    ### Resolve IK Jumping
    pick_approach_state = JointState.from_position(trajs_retract_pick_approach_result[0][-1], joint_names=robot_kin_model.joint_names)
    pick_result = plan_ik_seed_config(pick_pose, place_world, pick_approach_state, 0.15)
    if pick_result is None:
        raise RuntimeError("No solution found!")
    pick_q = pick_result[0].cpu().numpy()
    trajs_retract_pick = trajs_retract_pick_approach_result[0].clone().cpu().numpy()
    trajs_retract_pick = np.vstack([trajs_retract_pick, pick_q])
    
    
    ## 4.2 retract -> place_approach
    ### Compute attach objects and states
    
    place_result = plan_ik(place_pose, place_world, 0.15)
    if place_result is None:
        raise RuntimeError("No solution found!")
    place_q = place_result[0].cpu().numpy().reshape(1, -1)
    attached_state = JointState.from_position(place_result[0])
    attached_object = tube_mesh[tube_id]
    start_state = JointState.from_position(robot_kin_model.retract_config.view(1, -1), joint_names=robot_kin_model.joint_names)
    attached_object_for_rendering = trimesh.Trimesh(attached_object.vertices.copy(), attached_object.faces.copy())
    T = poses_to_frames(place_pose).reshape(4, 4)
    invT = np.linalg.inv(T)
    attached_object_for_rendering.apply_transform(invT)
    
    ### Plan motion
    trajs_retract_place_approach_result = plan_motion_attached_object(start_state, place_approach_pose, attached_object, attached_state, place_world, 0.02)
    if trajs_retract_place_approach_result is None:
        raise RuntimeError("No solution found!")
    trajs_retract_place = trajs_retract_place_approach_result[0].cpu().numpy()
    trajs_retract_place = np.vstack([trajs_retract_place, place_q])
    
    ### Resolve IK Jumping
    place_approach_state = JointState.from_position(trajs_retract_place_approach_result[0][-1], joint_names=robot_kin_model.joint_names)
    place_result = plan_ik_seed_config(place_pose, place_world, place_approach_state, 0.15)
    if place_result is None:
        raise RuntimeError("No solution found!")
    place_q = place_result[0].cpu().numpy()
    trajs_retract_place = trajs_retract_place_approach_result[0].clone().cpu().numpy()
    trajs_retract_place = np.vstack([trajs_retract_place, place_q])
    
    
    ## 4.3 pick_approach -> place_approach
    pick_approach_q = torch.tensor(trajs_retract_pick[-2, :].reshape(1, -1), **tensor_args.as_torch_dict())
    place_approach_q = torch.tensor(trajs_retract_place[-2, :].reshape(1, -1), **tensor_args.as_torch_dict())
    start_state = JointState.from_position(pick_approach_q, joint_names=robot_kin_model.joint_names)
    goal_state = JointState.from_position(place_approach_q, joint_names=robot_kin_model.joint_names)
    
    
    trajs_pick_place_approach_result = plan_motion_js_attached_object(start_state, goal_state, attached_object, attached_state, place_world, 0.15)
    if trajs_pick_place_approach_result is None:
        raise RuntimeError("No solution found!")
    trajs_pick_place = trajs_pick_place_approach_result.cpu().numpy()
    trajs_pick_place = np.vstack([pick_q, trajs_pick_place, place_q, trajs_pick_place[::-1], pick_q])

    return trajs_pick_place, attached_object_for_rendering

In [5]:
publish_clear()
publish_world(place_world, merged = True)
final_trajs = np.empty((0, len(robot_kin_model.joint_names)))
for tube_id in range(len(tube_mesh)):
    try:
        traj, attached_object_for_rendering = compute_pick_place(tube_id)
        final_trajs = np.vstack([final_trajs, traj])
        # update place world
        place_world = WorldConfig(
            cylinder=[*place_world.cylinder, tube_cylinder[tube_id]],
            cuboid=[*place_world.cuboid],
        )
        publish_robot_trajs(final_trajs, 0.15)
        publish_grasp_object(attached_object_for_rendering)
        publish_world(place_world, merged = True)
        print(f"Successfully processed tube_id {tube_id}")
    except RuntimeError as e:
        print(f"Error processing tube_id {tube_id}: {e}")

Error processing tube_id 0: No solution found!
Error processing tube_id 1: No solution found!
Successfully processed tube_id 2
Successfully processed tube_id 3
Successfully processed tube_id 4


only got 27/28 samples!


Successfully processed tube_id 5
Error processing tube_id 6: No solution found!
Error processing tube_id 7: No solution found!
Successfully processed tube_id 8


only got 49/50 samples!


Successfully processed tube_id 9
Error processing tube_id 10: No solution found!
Error processing tube_id 11: No solution found!
Successfully processed tube_id 12
Successfully processed tube_id 13
Successfully processed tube_id 14
Successfully processed tube_id 15
Error processing tube_id 16: No solution found!
Error processing tube_id 17: No solution found!
Successfully processed tube_id 18
Successfully processed tube_id 19
Error processing tube_id 20: No solution found!
Error processing tube_id 21: No solution found!
Error processing tube_id 22: No solution found!
Error processing tube_id 23: No solution found!
Successfully processed tube_id 24
Error processing tube_id 25: No solution found!
Error processing tube_id 26: No solution found!
Error processing tube_id 27: No solution found!
Successfully processed tube_id 28


only got 20/23 samples!


Successfully processed tube_id 29
Error processing tube_id 30: No solution found!
Error processing tube_id 31: No solution found!
Successfully processed tube_id 32
Successfully processed tube_id 33


only got 27/28 samples!


Successfully processed tube_id 34
Successfully processed tube_id 35
Error processing tube_id 36: No solution found!
Error processing tube_id 37: No solution found!


only got 45/50 samples!


Successfully processed tube_id 38
Successfully processed tube_id 39
Error processing tube_id 40: No solution found!
Error processing tube_id 41: No solution found!
Successfully processed tube_id 42


only got 22/23 samples!


Successfully processed tube_id 43
Successfully processed tube_id 44


only got 48/50 samples!


Successfully processed tube_id 45
Error processing tube_id 46: No solution found!
Error processing tube_id 47: No solution found!
Successfully processed tube_id 48
Successfully processed tube_id 49
Error processing tube_id 50: No solution found!
Error processing tube_id 51: No solution found!
Successfully processed tube_id 52
Error processing tube_id 53: No solution found!
Successfully processed tube_id 54


only got 27/28 samples!


Successfully processed tube_id 55
Error processing tube_id 56: No solution found!
Error processing tube_id 57: No solution found!


only got 49/50 samples!


Successfully processed tube_id 58


only got 21/23 samples!


Successfully processed tube_id 59
Error processing tube_id 60: No solution found!
Error processing tube_id 61: No solution found!
Error processing tube_id 62: No solution found!
Error processing tube_id 63: No solution found!


only got 25/28 samples!


Successfully processed tube_id 64
Error processing tube_id 65: No solution found!
Error processing tube_id 66: No solution found!
Error processing tube_id 67: No solution found!
Successfully processed tube_id 68
Successfully processed tube_id 69
Error processing tube_id 70: No solution found!
Error processing tube_id 71: No solution found!
Successfully processed tube_id 72
Error processing tube_id 73: No solution found!
Successfully processed tube_id 74
Successfully processed tube_id 75
Error processing tube_id 76: No solution found!
Error processing tube_id 77: No solution found!


only got 49/50 samples!


Successfully processed tube_id 78


only got 22/23 samples!


Successfully processed tube_id 79
Error processing tube_id 80: No solution found!
Error processing tube_id 81: No solution found!
Error processing tube_id 82: No solution found!
Error processing tube_id 83: No solution found!
Error processing tube_id 84: No solution found!


only got 27/28 samples!


Successfully processed tube_id 85
Error processing tube_id 86: No solution found!
Error processing tube_id 87: No solution found!
Successfully processed tube_id 88
Successfully processed tube_id 89
Successfully processed tube_id 90
Successfully processed tube_id 91


only got 26/28 samples!


Successfully processed tube_id 92
Successfully processed tube_id 93
Error processing tube_id 94: No solution found!
Error processing tube_id 95: No solution found!


only got 26/27 samples!


Successfully processed tube_id 96
Successfully processed tube_id 97


only got 49/50 samples!


Successfully processed tube_id 98
Successfully processed tube_id 99
Successfully processed tube_id 100
Successfully processed tube_id 101
Error processing tube_id 102: No solution found!
Error processing tube_id 103: No solution found!


only got 23/26 samples!


Successfully processed tube_id 104
Successfully processed tube_id 105
Successfully processed tube_id 106
Successfully processed tube_id 107
Successfully processed tube_id 108
Successfully processed tube_id 109
Error processing tube_id 110: No solution found!
Error processing tube_id 111: No solution found!


only got 49/50 samples!
only got 25/26 samples!


Successfully processed tube_id 112
Successfully processed tube_id 113
Successfully processed tube_id 114
Successfully processed tube_id 115
Successfully processed tube_id 116
Successfully processed tube_id 117
Error processing tube_id 118: No solution found!
Error processing tube_id 119: No solution found!


only got 26/27 samples!


Successfully processed tube_id 120


only got 47/50 samples!


Successfully processed tube_id 121


only got 48/50 samples!


Successfully processed tube_id 122


only got 49/50 samples!


Successfully processed tube_id 123
Successfully processed tube_id 124
Successfully processed tube_id 125
Error processing tube_id 126: No solution found!
Error processing tube_id 127: No solution found!
Successfully processed tube_id 128


only got 21/22 samples!


Successfully processed tube_id 129
Successfully processed tube_id 130


only got 47/50 samples!


Successfully processed tube_id 131
Successfully processed tube_id 132
Successfully processed tube_id 133
Error processing tube_id 134: No solution found!
Error processing tube_id 135: No solution found!
Successfully processed tube_id 136
Successfully processed tube_id 137


# 2. Compute Place IK

## 2.1 Compute place poses

In [7]:
tube_id = 45

from robofab.place import compute_part_place_pose
from robofab.place import poses_to_frames

### Predefining the gripper z-axis
# place_poses_with_zaxis = compute_part_place_pose(tube_mesh[21], tool_offset = 0, zaxis = np.array([-1, 0, 0]))

# render
frames = poses_to_frames(tube_pose[tube_id])
publish_frames(frames)

## 2.2 Initialize IK Solver

In [8]:
### initialize IK solver function
from curobo.geom.types import Pose
from curobo.wrap.reacher.ik_solver import IKSolver, IKSolverConfig


def get_franka_ik_solver(gripper_width=0.15, world_config: WorldConfig = None, num_seeds=500):
    # config_file, _ = load_franka_kin_model(gripper_width)
    config_file, _ = load_kuka_kin_model(gripper_width)
    robot_cfg = RobotConfig.from_dict(config_file)
    ik_config = IKSolverConfig.load_from_robot_config(
            robot_cfg,
            world_model=world_config,
            tensor_args=tensor_args,
            use_cuda_graph=True,
            rotation_threshold=0.001,
            position_threshold=0.001,
            num_seeds=num_seeds,
            self_collision_check=False,
            self_collision_opt=False,
            collision_activation_distance= 0.02
        )
    ik_solver = IKSolver(ik_config)
    return ik_solver

### initialize Plan IK function
def get_poses_by_flag(poses: Pose, flag: torch.tensor):
    return Pose(position=poses.position[flag, :], quaternion=poses.quaternion[flag, :])

def plan_ik(poses: Pose,
            world_config: WorldConfig,
            gripper_width: float = 0.15):
    ik_solver = get_franka_ik_solver(gripper_width=gripper_width, world_config=world_config)
    result = ik_solver.solve_batch(poses)
    ik_solver.reset_cuda_graph()
    if result.success.any():
        flag = result.success
        success_qs = result.solution[result.success]
        sucess_pose = get_poses_by_flag(poses, flag.flatten())
        return (success_qs, sucess_pose, flag.flatten())
    else:
        return None


### Initialize Plan IK with seed function
from curobo.types.state import JointState
def plan_ik_seed_config(poses: Pose,
                        world_config: WorldConfig,
                        seed_config: JointState,
                        gripper_width: float = 0.15):
    ik_solver = get_franka_ik_solver(gripper_width=gripper_width, world_config=world_config, num_seeds = 1)
    seed_config = seed_config.position.view(1, -1)
    result = ik_solver.solve_batch(goal_pose=poses, seed_config=seed_config, num_seeds=1)
    if result.success.any():
        flag = result.success
        success_qs = result.solution[result.success]
        sucess_pose = get_poses_by_flag(poses, flag.flatten())
        return (success_qs, sucess_pose, flag.flatten())
    else:
        return None

## 2.3 Plan Place IK

In [9]:

# for i in range(len(tube_pose)):
#     place_result = plan_ik(tube_pose[i], place_world, 0.15)
#     if place_result is not None:
#         print(f"Tube {i} placed.")
#         place_world = WorldConfig(
#             cylinder=[*place_world.cylinder, tube_cylinder[i]],
#             cuboid=[*place_world.cuboid],
#         )
#         # update
#         place_poses = place_result[1]
#         # render
#         # publish_clear()
#         publish_robot_trajs(place_result[0].cpu().numpy(), 0.15)
#         frames = poses_to_frames(place_poses)
#         publish_frames(frames)
#         publish_world(place_world, merged = True)
#         # break
#     else:
#         print(f"Tube {i} no solution.")

place_result = plan_ik(tube_pose[tube_id], place_world, 0.15)
if place_result is None:
    raise RuntimeError(f"No solution.")

# update
place_poses = place_result[1]
# render
# publish_clear()
publish_robot_trajs(place_result[0].cpu().numpy(), 0.15)
frames = poses_to_frames(place_poses)
publish_frames(frames)
publish_world(place_world, merged = True)


In [10]:
### Plan place approach IK
place_approach_poses = place_poses.clone()
place_approach_poses.position += torch.tensor([0, 0, 0.1], **tensor_args.as_torch_dict())
place_approach_result =  plan_ik(place_approach_poses, place_world, 0.15)

# update
place_approach_poses = place_approach_result[1]
place_poses = get_poses_by_flag(place_poses, place_approach_result[2])
print("Number of place poses", place_poses.position.shape[0])

# render
publish_clear()
publish_robot_trajs(place_approach_result[0].cpu().numpy(), 0.15)
frames = poses_to_frames(place_approach_poses)
publish_frames(frames)
publish_world(place_world, merged = True)

Number of place poses 45


# 3. Compute Pick IK

## 3.1 Compute pick poses

In [11]:
### Compute place to pick transformation matrix
from robofab.pick import compute_part_transformation_from_place_to_pick

# # It can have multiple mats for achieving the same effect
# place_to_pick_mats = compute_part_transformation_from_place_to_pick(mesh, tube_mesh[2], voxel_size = 0.05)

pick_pose_pos = [-1.5, -1.5, 0]
pick_pose_quat = [0, 1, -1, 0]
pick_poses = Pose(position=torch.tensor([pick_pose_pos]*len(place_poses), **tensor_args.as_torch_dict()), quaternion=torch.tensor([pick_pose_quat]*len(place_poses), **tensor_args.as_torch_dict()))
pick_frames = poses_to_frames(pick_poses)

### Compute pick frames
# render
# publish_clear()
place_frames = poses_to_frames(place_poses)
# pick_frames = place_to_pick_mats[0] @ place_frames
publish_frames(pick_frames)
# publish_world(pick_world[3], merged = False)

## 3.2 Compute pick IK

In [12]:
### Compute pick poses
from curobo.geom.transform import matrix_to_quaternion
pick_frames = torch.tensor(pick_frames, **tensor_args.as_torch_dict())
pick_poses = Pose(position=pick_frames[:, :3, 3], quaternion=matrix_to_quaternion(pick_frames[:, :3, :3]))

### Plan pick ik
pick_result = plan_ik(pick_poses, place_world, 0.15)
if pick_result is None:
    raise RuntimeError("No solution found!")

# update
pick_poses = pick_result[1]
place_poses = get_poses_by_flag(place_poses, pick_result[2])
place_approach_poses = get_poses_by_flag(place_approach_poses, pick_result[2])

# render
publish_clear()
publish_robot_trajs(pick_result[0].cpu().numpy(), 0.15)
# publish_world(pick_world[3], merged = True)
publish_world(place_world, merged = True)

In [13]:
### Plan pick approach ik
pick_approach_poses = pick_poses.clone()
pick_approach_poses.position += torch.tensor([0, 0, 0.1], **tensor_args.as_torch_dict())
# pick_approach_result =  plan_ik(pick_approach_poses, pick_world[3], 0.15)
pick_approach_result =  plan_ik(pick_approach_poses, place_world, 0.15)

# update
pick_approach_poses = pick_approach_result[1]
pick_poses = get_poses_by_flag(pick_poses, pick_approach_result[2])
place_poses = get_poses_by_flag(place_poses, pick_approach_result[2])
place_approach_poses = get_poses_by_flag(place_approach_poses, pick_approach_result[2])
print("Number of pick poses", place_poses.position.shape[0])

# render
publish_clear()
publish_robot_trajs(pick_approach_result[0].cpu().numpy(), 0.15)
# publish_world(pick_world[3], merged = True)
publish_world(place_world, merged = True)

Number of pick poses 45


# 4. Plan Motions

## 4.1 Initialize motion solver

In [14]:
### Motion gen configuration

from curobo.wrap.reacher.motion_gen import MotionGen, MotionGenConfig, MotionGenPlanConfig
from curobo.util.trajectory import InterpolateType

def get_franka_motion_gen(gripper_width=0.15, world_config: WorldConfig = None, num_seeds=1000):
    # config_file, _ = load_franka_kin_model(gripper_width)
    config_file, _ = load_kuka_kin_model(gripper_width)
    robot_cfg = RobotConfig.from_dict(config_file)
    motion_gen_config = MotionGenConfig.load_from_robot_config(
            robot_cfg,
            world_model=world_config,
            tensor_args=tensor_args,
            use_cuda_graph=True,
            rotation_threshold=0.05,
            position_threshold=0.005,
            num_ik_seeds=num_seeds,
            num_trajopt_seeds=num_seeds,
            interpolation_dt=0.01,
            interpolation_steps = 1000,
            interpolation_type=InterpolateType.CUBIC,
            high_precision=False,
            self_collision_check=False,
            self_collision_opt=False,
            minimize_jerk=False,
            collision_activation_distance=0.02)
    return MotionGen(motion_gen_config)


### Motion gen planner

from curobo.types.state import JointState
def plan_motion_attached_object(start_state: JointState,
                                goal_poses: Pose,
                                attached_object: Mesh,
                                attached_state : JointState,
                                world_config: WorldConfig,
                                gripper_width: float = 0.15):

    motion_gen = get_franka_motion_gen(world_config=world_config, gripper_width=gripper_width)
    place_poses = Pose(position=goal_poses.position.view(1, -1, 3), quaternion=goal_poses.quaternion.view(1, -1, 4))
    start_state.joint_names = motion_gen.kinematics.joint_names.copy()

    attached_state.joint_names = motion_gen.kinematics.joint_names.copy()
    motion_gen.attach_external_objects_to_robot(attached_state,
                                                [attached_object],
                                                surface_sphere_radius=0.015,
                                                link_name="attached_object")

    result = motion_gen.plan_goalset(start_state, place_poses, MotionGenPlanConfig(max_attempts=100))
    if result.success.any():
        qtraj = result.get_interpolated_plan().position
        return (qtraj, result.goalset_index.item())
    else:
        return None

def plan_motion_js_attached_object(start_state: JointState,
                                goal_state: JointState,
                                attached_object: Mesh,
                                attached_state : JointState,
                                world_config: WorldConfig,
                                gripper_width: float = 0.15):

    motion_gen = get_franka_motion_gen(world_config=world_config, gripper_width=gripper_width)
    start_state.joint_names = motion_gen.kinematics.joint_names.copy()
    goal_state.joint_names = motion_gen.kinematics.joint_names.copy()

    attached_state.joint_names = motion_gen.kinematics.joint_names.copy()
    motion_gen.attach_external_objects_to_robot(attached_state,
                                                [attached_object],
                                                surface_sphere_radius=0.015,
                                                link_name="attached_object")

    result = motion_gen.plan_single_js(start_state, goal_state, MotionGenPlanConfig(max_attempts=100))
    if result.success.any():
        qtraj = result.get_interpolated_plan().position
        return qtraj
    else:
        return None




## 4.1 Retract -> pick_approach

In [15]:
### Choose one pick-place-approach pair

pick_place_id = 0
pick_pose = pick_poses.clone()[pick_place_id]
place_pose = place_poses.clone()[pick_place_id]
pick_approach_pose = pick_approach_poses.clone()[pick_place_id]
place_approach_pose = place_approach_poses.clone()[pick_place_id]


### Compute attach objects and states

mesh = trimesh.creation.cylinder(radius=0.024, height=np.linalg.norm(np.array(end_p)-np.array(start_p)), sections=16)
mesh.apply_translation(pick_pose_pos)
mesh = Mesh(
    name=f"tube_mesh{tube_id}",
    pose=[*pick_pose_pos, *pick_pose_quat],
    vertices=mesh.vertices,
    faces=mesh.faces,
    color=[0.7,0.7,0.7,1.0],
)

# pick_result = plan_ik(pick_pose, pick_world[3], 0.15)
pick_result = plan_ik(pick_pose, place_world, 0.15)
pick_q = pick_result[0].cpu().numpy().reshape(1, -1)

attached_state = JointState.from_position(pick_result[0])
attached_object = mesh
start_state = JointState.from_position(robot_kin_model.retract_config.view(1, -1), joint_names=robot_kin_model.joint_names)

attached_object_for_rendering = trimesh.Trimesh(attached_object.vertices.copy(), attached_object.faces.copy())
T = poses_to_frames(pick_pose).reshape(4, 4)
invT = np.linalg.inv(T)
attached_object_for_rendering.apply_transform(invT)

# render
publish_clear()
publish_robot_trajs(retract_q, 0.15)
publish_world(place_world, merged = True)
publish_grasp_object(attached_object_for_rendering)




In [16]:
### Plan motion
trajs_retract_pick_approach_result = plan_motion_attached_object(start_state, pick_approach_pose, attached_object, attached_state, place_world, 0.15)
trajs_retract_pick = trajs_retract_pick_approach_result[0].clone().cpu().numpy()
trajs_retract_pick = np.vstack([trajs_retract_pick, pick_q])

#render
publish_clear()
publish_robot_trajs(trajs_retract_pick, 0.15)
publish_world(place_world, merged = True)
publish_grasp_object(attached_object_for_rendering)



In [17]:
### Resolve IK Jumping

pick_approach_state = JointState.from_position(trajs_retract_pick_approach_result[0][-1], joint_names=robot_kin_model.joint_names)
pick_result = plan_ik_seed_config(pick_pose, place_world, pick_approach_state, 0.15)
pick_q = pick_result[0].cpu().numpy()
trajs_retract_pick = trajs_retract_pick_approach_result[0].clone().cpu().numpy()
trajs_retract_pick = np.vstack([trajs_retract_pick, pick_q])

# render
publish_clear()
publish_robot_trajs(trajs_retract_pick, 0.15)
publish_world(place_world, merged = True)
publish_grasp_object(attached_object_for_rendering)

## 4.2 retract -> place_approach

In [18]:
### Compute attach objects and states

place_result = plan_ik(place_pose, place_world, 0.15)
place_q = place_result[0].cpu().numpy().reshape(1, -1)
attached_state = JointState.from_position(place_result[0])
attached_object = tube_mesh[tube_id]
start_state = JointState.from_position(robot_kin_model.retract_config.view(1, -1), joint_names=robot_kin_model.joint_names)
attached_object_for_rendering = trimesh.Trimesh(attached_object.vertices.copy(), attached_object.faces.copy())
T = poses_to_frames(place_pose).reshape(4, 4)
invT = np.linalg.inv(T)
attached_object_for_rendering.apply_transform(invT)

publish_clear()
publish_robot_trajs(retract_q, 0.15)
publish_world(place_world, merged = True)
publish_grasp_object(attached_object_for_rendering)




In [19]:
### Plan motion

trajs_retract_place_approach_result = plan_motion_attached_object(start_state, place_approach_pose, attached_object, attached_state, place_world, 0.02)
trajs_retract_place = trajs_retract_place_approach_result[0].cpu().numpy()
trajs_retract_place = np.vstack([trajs_retract_place, place_q])

#render
publish_clear()
publish_robot_trajs(trajs_retract_place, 0.15)
publish_grasp_object(attached_object_for_rendering)
publish_world(place_world, merged = True)




In [20]:
### Resolve IK Jumping

place_approach_state = JointState.from_position(trajs_retract_place_approach_result[0][-1], joint_names=robot_kin_model.joint_names)
place_result = plan_ik_seed_config(place_pose, place_world, place_approach_state, 0.15)
place_q = place_result[0].cpu().numpy()
trajs_retract_place = trajs_retract_place_approach_result[0].clone().cpu().numpy()
trajs_retract_place = np.vstack([trajs_retract_place, place_q])

# render
publish_clear()
publish_robot_trajs(trajs_retract_place, 0.15)
publish_grasp_object(attached_object_for_rendering)
publish_world(place_world, merged = True)

## 4.3 pick_approach -> place_approach

In [21]:
pick_approach_q = torch.tensor(trajs_retract_pick[-2, :].reshape(1, -1), **tensor_args.as_torch_dict())
place_approach_q = torch.tensor(trajs_retract_place[-2, :].reshape(1, -1), **tensor_args.as_torch_dict())
start_state = JointState.from_position(pick_approach_q, joint_names=robot_kin_model.joint_names)
goal_state = JointState.from_position(place_approach_q, joint_names=robot_kin_model.joint_names)


trajs_pick_place_approach_result = plan_motion_js_attached_object(start_state, goal_state, attached_object, attached_state, place_world, 0.15)
trajs_pick_place = trajs_pick_place_approach_result.cpu().numpy()
trajs_pick_place = np.vstack([pick_q, trajs_pick_place, place_q])

#render
publish_clear()
publish_robot_trajs(trajs_pick_place, 0.15)
publish_grasp_object(attached_object_for_rendering)
publish_world(place_world, merged = True)