In [1]:
import numpy as np
import scipy
from tqdm import tqdm

def svd_rot(M):
    U, _, VT = np.linalg.svd(M)
    V = VT.T
    R = U @ V.T
    if np.linalg.det(R) < 0:
        V[:, -1] *= -1
        R = U @ V.T
    return R

def rigid_transform(ps, qs):
    
    pmean = np.mean(ps, axis=0)
    qmean = np.mean(qs, axis=0)
    pbars = ps - pmean
    qbars = qs - qmean

    # project onto space of orthogonal matrices
    M = qbars.T @ pbars
    R = svd_rot(M)
    t = qmean - R @ pmean

    # write transformation matrix
    T = np.eye(4)
    T[:3, :3] = R
    T[:3, 3] = t
    return T


def sample_3d_point(r=1):
    rho = r * (1 + (np.random.rand() / 2))
    theta = np.random.rand() * 2 * np.pi
    phi = np.random.rand() * np.pi
    return np.array([
        rho * np.cos(theta) * np.sin(phi),
        rho * np.sin(theta) * np.sin(phi),
        rho * np.cos(phi),
    ])

def R_and_T(T):
    return T[:3, :3], T[:3, 3]

def icp(source_pcd, target_pcd, max_attempts=10, max_iters=1000, finish_loop_thresh=1e-5, acceptable_thresh=1e-5, pbar=False, seed=0):
    """Iterative closest point.
    
    Args:
        source_pcd (np.ndarray): [N1, 3]
        target_pcd (np.ndarray): [N2, 3]
    
    Returns:
        np.ndarray: [4, 4] rigid transformation to align source to target.
    """

    np.random.seed(seed)

    targ_center = np.mean(target_pcd, axis=0)
    targ_rad = np.max(np.linalg.norm(target_pcd - targ_center, axis=1))

    attempt_dists = []
    attempt_Ts = []

    attempt_iter = tqdm(range(max_attempts)) if pbar else range(max_attempts)
    for attempt in attempt_iter:

        ps = source_pcd 
        # for retries:
        # (1) move to point on 4*targ_rad ball around targ pcd
        # (2) randomly rotate
        if attempt > 0:
            ps = ps - np.mean(ps, axis=0) + targ_center
            ps = ps + sample_3d_point(r = 2 * targ_rad)
            ps = ps @ svd_rot(np.random.rand(3, 3))

        last_dist = np.inf

        tree = scipy.spatial.KDTree(target_pcd)
        qs = target_pcd[tree.query(ps)[1]]
        for _ in range(max_iters):

            # get transformation
            T = rigid_transform(ps, qs)

            # transform pt cld
            ps = ps @ T[:3, :3].T + T[:3, 3]

            # update qs
            qs = target_pcd[tree.query(ps)[1]]

            # if avg_dist doesn't change, ret early, else keep going
            avg_dist = np.mean(np.linalg.norm(ps - qs, axis=1))
            if np.abs(avg_dist - last_dist) <= finish_loop_thresh:
                break
            last_dist = avg_dist
        
        # if error acceptably small, continue
        T = rigid_transform(source_pcd, ps)
        if avg_dist <= acceptable_thresh:
            return T
            
        # else, we save all Ts and ret whichever is best
        attempt_dists.append(avg_dist)
        attempt_Ts.append(T)

    # ret transformation
    return attempt_Ts[np.argmin(attempt_dists)]

In [2]:
def pred(ds, data_idx, pose_evaluator, max_attempts=100, max_iters=1000):
    from learning.utils import IDX_TO_OBJ_NAMES
    cloud, model, obj_idx, pose = ds[data_idx]
    if len(cloud) < 1:
        R_pred, t_pred = R_and_T(np.eye(4))
    else:
        R_pred_cloud, t_pred_cloud = R_and_T(icp(cloud, model, max_attempts=max_attempts, max_iters=max_iters))
        R_pred, t_pred = R_and_T(rigid_transform(model, (model - t_pred_cloud) @ R_pred_cloud))
    return pose_evaluator.evaluate(IDX_TO_OBJ_NAMES[obj_idx[0]], R_pred, pose[:3, :3], t_pred, pose[:3, 3])

In [3]:
def pred_over_ds(data_dir='processed_for_icp/val', max_attempts=100, max_iters=1000):
    from benchmark_utils.pose_evaluator import PoseEvaluator
    pose_evaluator = PoseEvaluator()

    from learning.load import PoseDataset
    ds = PoseDataset(data_dir=data_dir, cloud=True, model=True, transform=None)
    
    successes, rre_syms, rres, rtes = [], [], [], []
    pbar = tqdm(range(len(ds)))
    for i in pbar:
        eval = pred(ds, i, pose_evaluator, max_attempts=max_attempts, max_iters=max_iters)
        rre_syms.append(eval['rre_symmetry'])
        rres.append(eval['rre'])
        rtes.append(eval['rte'])
        successes.append(int(rre_syms[-1] <= 5 and rtes[-1] <= 0.01))
        pbar.set_description('\t'.join([
            'running rates', f'success={np.mean(successes):.4f}',
            f'rre_sym={np.mean(rre_syms):.4f}', f'rte={np.mean(rtes):.4f}']
        ))
    return np.mean(successes)

In [4]:
# pred_over_ds(data_dir='processed_for_icp_2/val', max_attempts=10, max_iters=1000)

In [5]:
# pred_over_ds(data_dir='processed_for_icp_2/val', max_attempts=100, max_iters=1000)

In [6]:
from pathlib import Path
from learning.load import PoseDataset
import pickle

def pred_over_raw_data(output_json_name='icp_pred.json', processed_data_dir='processed_for_icp_2', max_attempts=100, max_iters=1000, thresh=1e-5):
    def get_stripped_lines(fp, levels=[1, 2]):
        return [x.strip() for x in open(fp, 'r').readlines() if int(x[0]) in levels]

    raw_data_dir = Path('raw_data')
    raw_test_dir = raw_data_dir / 'testing_data'
    raw_test_obj_dir = raw_test_dir / 'v2.2'
    processed_data_dir = Path(processed_data_dir)
    processed_test_dir = processed_data_dir / 'test'

    test_scene_names = get_stripped_lines(raw_test_dir / 'test.txt')
    test_ds = PoseDataset(data_dir=processed_test_dir, train=False, cloud=True, model=True, transform=None)

    data_point_num = 0
    all_data = dict()
    pbar = tqdm(test_scene_names)
    for scene_name in pbar:
        meta_path = raw_test_obj_dir / f'{scene_name}_meta.pkl'
        meta = pickle.load(open(meta_path, 'rb'))

        scene_data = dict(poses_world=[None] * 79)

        for obj_id, obj_name in zip(meta['object_ids'], meta['object_names']):

            pbar.set_description(f'dp_num={data_point_num}')

            cloud, model, _ = test_ds[data_point_num]
            if len(cloud) < 1:
                T = np.eye(4)
            else:
                R_pred_cloud, t_pred_cloud = R_and_T(icp(cloud, model, max_attempts=max_attempts, max_iters=max_iters, finish_loop_thresh=thresh, acceptable_thresh=thresh))
                T = rigid_transform(model, (model - t_pred_cloud) @ R_pred_cloud)

            scene_data['poses_world'][obj_id] = T.tolist()

            data_point_num += 1

        all_data[scene_name] = scene_data

    import json
    with open(output_json_name, 'w') as fp:
        json.dump(all_data, fp)

In [None]:
pred_over_raw_data(output_json_name='icp_pred_no_thresh.json', processed_data_dir='icp/processed_for_icp_2',  max_attempts=100, max_iters=1000, thresh=-np.inf)