## Import Libraries and Utility Functions

#### Import Libraries

In [8]:
from camera.zed_ros.zed_ros import ZedRos
import cv2
import numpy as np
import glob
import re
import open3d as o3d
import plotly.graph_objs as go
import plotly.express as px
import os
from scipy.spatial.transform import Rotation as R


#### Plotting points

In [9]:
def plot_multi_np(plist):
    """
    Plots multiple point clouds in the same plot using plotly
    Assumes points are in meters coordinates.
    
    Args: 
        plist: list of np arrays of shape (N, 3)
    """
    colors = [
        '#1f77b4',  # muted blue
        '#ff7f0e',  # safety orange
        '#2ca02c',  # cooked asparagus green
        '#d62728',  # brick red
        '#9467bd',  # muted purple
    ]
    skip = 1
    go_data = []
    for i in range(len(plist)):
        p_dp = plist[i]
        plot = go.Scatter3d(x=p_dp[::skip,0], y=p_dp[::skip,1], z=p_dp[::skip,2], 
                     mode='markers', marker=dict(size=2, color=colors[i],
                     symbol='circle'))
        go_data.append(plot)
 
    layout = go.Layout(
        scene=dict(
            aspectmode='data',
        ),
        height=1200,
        width=1200,
    )
    fig = go.Figure(data=go_data, layout=layout)
    
    colors = ['red', 'green', 'blue']  # X, Y, Z axis colors
    
    fig = go.Figure()
    
    T = []
    T.append(np.eye(4))

    # for tf in T:
    #     origin = tf[:3, 3]
    #     axes = tf[:3, :3]

    #     for i in range(3):
    #         axis_end = origin + 0.3*axes[:, i]
    #         fig.add_trace(go.Scatter3d(
    #             x=[origin[0], axis_end[0]],
    #             y=[origin[1], axis_end[1]],
    #             z=[origin[2], axis_end[2]],
    #             mode='lines',
    #             line=dict(color=colors[i], width=4),
    #             name='Axis ' + str(i+1)
    #         ))

    for plot in go_data:
        fig.add_trace(plot)
    
    # add axis lines and camera view
    fig.update_layout(scene=dict(
        xaxis=dict(title='X'),
        yaxis=dict(title='Y'),
        zaxis=dict(title='Z'),
        camera = dict(
                      eye=dict(x=-1.30, y=0, z=-.25),
                      center=dict(x=0., y=0, z=-0.25),
                      up=dict(x=0, y=0, z=1),
                     )
        ),
        height=800,
        width=1200,
    )    
    
    fig.show()

#### Transform points using a TF matrix

In [10]:
def transform_pcd(pcd, tf=None, tf_filepath=None):
    """
    Transforms a point cloud using a transformation matrix
    
    Args:
        pcd: open3d.geometry.PointCloud
        tf: 4x4 numpy array
        tf_filepath: str, path to a .npy file containing a 4x4 transformation matrix
        
    Returns:
        open3d.geometry.PointCloud
    """
    if tf_filepath is not None:
        tf = np.load(tf_filepath)
        
    if tf is not None:
        pcd.transform(tf)
        
    return pcd

#### Crop Regions

In [11]:
# table boundaries
table_boundaries = {
    'min': np.array([-0.5, -2, -1]),
    'max': np.array([1.0, 2, 5])
}

# board boundaries
board_boundaries = {
    'min': np.array([0.0, -2, 0.15]),
    'max': np.array([0.5, -0.0, 0.3])
}

# gripper boundaries
gripper_boundaries = {
    'min': np.array([0.2, -0.5, 0.3]),
    'max': np.array([0.35, -0.05, .5])
}

ih_camera_focus = {
    'min': np.array([-0.053, -0.028, 0.0]),
    'max': np.array([0.053, 0.025, 0.3])
}

gripper_focus = {
    'min': np.array([-0.2, -0.06, 0.0]),
    'max': np.array([0.2, 0.06, 0.20]),

}

boundaries = {
    'table': table_boundaries,
    'board': board_boundaries,
    'gripper': gripper_boundaries,
    'ih_camera_focus': ih_camera_focus,
    'gripper_focus': gripper_focus
}

In [12]:
def crop_points(points, boundary: str):
    """
    Crops a point cloud based on a boundary
    Assmes points are in meters coordinates.
    
    Args:
        points:     np array of shape (N, 3)
        boundary:   str, key in boundaries dict. 
                    Predefined boundaries are 'table', 'board', 'gripper'
                    For custom boundaries, add to `boundaries` dict
        
    Returns:
        np array of shape (M, 3)
    """
    if boundary in boundaries:
        world_boundaries = boundaries[boundary]
    else:
        print('Bounds not found')
        return

    #deep copy points
    tmp_points = np.copy(points)
    
    tmp_points[np.isinf(tmp_points)] = np.nan
    tmp_points[np.where(tmp_points[:, 0] < world_boundaries['min'][0])] = np.nan
    tmp_points[np.where(tmp_points[:, 1] < world_boundaries['min'][1])] = np.nan
    tmp_points[np.where(tmp_points[:, 2] < world_boundaries['min'][2])] = np.nan
    tmp_points[np.where(tmp_points[:, 0] > world_boundaries['max'][0])] = np.nan
    tmp_points[np.where(tmp_points[:, 1] > world_boundaries['max'][1])] = np.nan
    tmp_points[np.where(tmp_points[:, 2] > world_boundaries['max'][2])] = np.nan
    tmp_points = tmp_points[np.where(~np.isnan(tmp_points).any(axis=1))]
    
    return tmp_points

## Visualize Pointcloud(s)

In [70]:
def plot_results(time):
    action = np.load(f'/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-11-wp/execute_data/{time}/action_points.npy')
    anchor = np.load(f'/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-11-wp/execute_data/{time}/anchor_points.npy')
    predicted = np.load(f'/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-11-wp/execute_data/{time}/pred_place_points.npy')
    plot_multi_np([action, anchor, predicted])

In [72]:
plot_results('1010_1643')

In [123]:
npz = np.load("/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-19-usb/learn_data/train/0_teleport_obj_points.npz")
npz2 = np.load("/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-19-usb/learn_data/train/5_teleport_obj_points.npz")

In [None]:
# print(npz.files)

action = npz['clouds'][np.where(npz['classes'] == 0)]
anchor = npz['clouds'][np.where(npz['classes'] == 1)]

action2 = npz2['clouds'][np.where(npz2['classes'] == 0)]
anchor2 = npz2['clouds'][np.where(npz2['classes'] == 1)]

# print(action.shape)
# print(anchor.shape)

# plot_multi_np([action, action2])
plot_multi_np([anchor2, action2])

In [None]:
rgb = cv2.imread("/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-18-usb/teach_data/img_data/demo5_gripper_close_up_view0_cam2_closeup_rgb.png")
rgb = cv2.cvtColor(rgb, cv2.COLOR_BGR2RGB)

px.imshow(rgb).show()

In [69]:
pcd = o3d.io.read_point_cloud("/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-11-wp/execute_data/1010_1506/pcd_data/ih_camera_view_cam3_gripper_pointcloud.ply")
pcd = pcd.uniform_down_sample(50)
print(np.asarray(pcd.points).shape)
points = np.asarray(pcd.points)

# points = np.load('/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-11-wp/execute_data/1010_1510/anchortest.npy')
plot_multi_np([points])

(5904, 3)


## Estimate Error

In [62]:
exp_dir = "/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-19-usb/execute_data/"
exp_id = [f for f in os.listdir(exp_dir) if os.path.isdir(os.path.join(exp_dir, f))]

T_ee2target = [[1, 0, 0, 0],
                [0, 1, 0, 0],
                [0, 0, 1, 0.212],
                [0, 0, 0, 1]]  

total_t_error = 0
total_rot_error = 0
count = 0

for id in exp_id:
    try:
        predicted_tf = np.load(f"{exp_dir}/{id}/pose_data/placement_pose.npy") @ T_ee2target
        gt_tf = np.load(f"{exp_dir}/{id}/pose_data/ground_truth.npy") @ T_ee2target

        rot_error = ((predicted_tf) @ np.linalg.inv(gt_tf))[:3,:3]
        euler = R.from_matrix(rot_error).as_euler('xyz', degrees=True)
        rot_error = np.round(np.max(np.abs(euler)),2)
        t_error = np.linalg.norm(predicted_tf[:3,3] - gt_tf[:3,3])*1000
        t_error2 = np.linalg.norm(predicted_tf[:2,3] - gt_tf[:2,3])*1000
        
        total_t_error += t_error2
        total_rot_error += rot_error
        count += 1
        # print(f"\nID: {id} Rotation error: {rot_error}\u00B0,\tTranslation error: {np.round(t_error,2)}, {np.round(t_error2,2)} mm")    
    except FileNotFoundError:
        # print(f"\nID: {id} not found")
        pass

print(f"\nUSB 0919 \t{count} -> Average rotation error: {np.round(total_rot_error/count,2)}\u00B0,\tAverage translation error: {np.round(total_t_error/count,2)} mm")


USB 0919 	8 -> Average rotation error: 5.58°,	Average translation error: 4.35 mm


In [63]:
exp_dir = "/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-11-usb/execute_data/"
exp_id = [f for f in os.listdir(exp_dir) if os.path.isdir(os.path.join(exp_dir, f))]

T_ee2target = [[1, 0, 0, 0],
                [0, 1, 0, 0],
                [0, 0, 1, 0.212],
                [0, 0, 0, 1]]  

total_t_error = 0
total_rot_error = 0
count = 0

for id in exp_id:
    try:
        predicted_tf = np.load(f"{exp_dir}/{id}/pose_data/placement_pose.npy") @ T_ee2target
        gt_tf = np.load(f"{exp_dir}/{id}/pose_data/ground_truth.npy") @ T_ee2target

        rot_error = ((predicted_tf) @ np.linalg.inv(gt_tf))[:3,:3]
        euler = R.from_matrix(rot_error).as_euler('xyz', degrees=True)
        rot_error = np.round(np.max(np.abs(euler)),2)
        t_error = np.linalg.norm(predicted_tf[:3,3] - gt_tf[:3,3])*1000
        t_error2 = np.linalg.norm(predicted_tf[:2,3] - gt_tf[:2,3])*1000
        
        total_t_error += t_error2
        total_rot_error += rot_error
        count += 1
        # print(f"\nID: {id} Rotation error: {rot_error}\u00B0,\tTranslation error: {np.round(t_error,2)}, {np.round(t_error2,2)} mm")    
    except FileNotFoundError:
        # print(f"\nID: {id} not found")
        pass

print(f"\nUSB 0911 \t{count} -> Average rotation error: {np.round(total_rot_error/count,2)}\u00B0,\tAverage translation error: {np.round(total_t_error/count,2)} mm")


USB 0911 	13 -> Average rotation error: 4.46°,	Average translation error: 2.87 mm


In [64]:
exp_dir = "/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/08-14-wp/execute_data/"

# get list of all dir names in exp_dir
exp_id = [f for f in os.listdir(exp_dir) if os.path.isdir(os.path.join(exp_dir, f))]

T_ee2target = [[1, 0, 0, 0],
                [0, 1, 0, 0],
                [0, 0, 1, 0.212],
                [0, 0, 0, 1]]  

total_t_error = 0
total_rot_error = 0
count = 0

for id in exp_id:
    try:
        predicted_tf = np.load(f"{exp_dir}/{id}/pose_data/placement_pose.npy") @ T_ee2target
        gt_tf = np.load(f"{exp_dir}/{id}/pose_data/ground_truth.npy") @ T_ee2target

        rot_error = ((predicted_tf) @ np.linalg.inv(gt_tf))[:3,:3]
        euler = R.from_matrix(rot_error).as_euler('xyz', degrees=True)
        rot_error = np.round(np.max(np.abs(euler)),2)
        t_error = np.linalg.norm(predicted_tf[:3,3] - gt_tf[:3,3])*1000
        t_error2 = np.linalg.norm(predicted_tf[:2,3] - gt_tf[:2,3])*1000
        
        total_t_error += t_error2
        total_rot_error += rot_error
        count += 1
        # print(f"\nID: {id} Rotation error: {rot_error}\u00B0,\tTranslation error: {np.round(t_error,2)}, {np.round(t_error2,2)} mm")    
    except FileNotFoundError:
        # print(f"\nID: {id} not found")
        pass

print(f"\nWP-C 0814\t{count} -> Average rotation error: {np.round(total_rot_error/count,2)}\u00B0,\tAverage translation error: {np.round(total_t_error/count,2)} mm")


WP-C 0814	146 -> Average rotation error: 2.55°,	Average translation error: 1.85 mm


In [65]:
exp_dir = "/home/mfi/repos/rtc_vision_toolbox/data/demonstrations/09-08-dsub/execute_data/"

# get list of all dir names in exp_dir
exp_id = [f for f in os.listdir(exp_dir) if os.path.isdir(os.path.join(exp_dir, f))]

T_ee2target = [[1, 0, 0, 0],
                [0, 1, 0, 0],
                [0, 0, 1, 0.212],
                [0, 0, 0, 1]]  

total_t_error = 0
total_rot_error = 0
count = 0

for id in exp_id:
    try:
        predicted_tf = np.load(f"{exp_dir}/{id}/pose_data/placement_pose.npy") @ T_ee2target
        gt_tf = np.load(f"{exp_dir}/{id}/pose_data/ground_truth.npy") @ T_ee2target

        rot_error = ((predicted_tf) @ np.linalg.inv(gt_tf))[:3,:3]
        euler = R.from_matrix(rot_error).as_euler('xyz', degrees=True)
        rot_error = np.round(np.max(np.abs(euler)),2)
        t_error = np.linalg.norm(predicted_tf[:3,3] - gt_tf[:3,3])*1000
        t_error2 = np.linalg.norm(predicted_tf[:2,3] - gt_tf[:2,3])*1000
        
        total_t_error += t_error2
        total_rot_error += rot_error
        count += 1
        # print(f"\nID: {id} Rotation error: {rot_error}\u00B0,\tTranslation error: {np.round(t_error,2)}, {np.round(t_error2,2)} mm")    
    except FileNotFoundError:
        # print(f"\nID: {id} not found")
        pass

print(f"\nDSUB 0908\t{count} -> Average rotation error: {np.round(total_rot_error/count,2)}\u00B0,\tAverage translation error: {np.round(total_t_error/count,2)} mm")


DSUB 0908	18 -> Average rotation error: 2.67°,	Average translation error: 1.42 mm
