In [1]:
import os
import numpy as np
import pandas as pd
import copy
import open3d as o3d
import cv2
import pickle
from tqdm import tqdm
from PIL import Image
import shutil
import time

from utilities import PointCloudGen, get_angular_error

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
class FPFH_RANSAC:

    @staticmethod
    def preprocess_point_cloud(pcd: PointCloudGen, voxel_size):
        pcd_down = copy.deepcopy(pcd)

        radius_normal = voxel_size * 2
        pcd_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))

        radius_feature = voxel_size * 5
        pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
            pcd_down,
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
        return pcd_down, pcd_fpfh

    @staticmethod        
    def run(cur_pc: PointCloudGen, voxel_size = 0.05, verbose = True,
            transform_inplace = True):
        start = time.time()

        source_down, source_fpfh = FPFH_RANSAC.preprocess_point_cloud(cur_pc.pcd_visible, voxel_size)
        target_down, target_fpfh = FPFH_RANSAC.preprocess_point_cloud(cur_pc.pcd_original, voxel_size)

        distance_threshold = voxel_size * 2

        result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
            source_down, target_down, source_fpfh, target_fpfh, True, distance_threshold,
            o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
            4, [
                o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
                    0.9),
                o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
                    distance_threshold)
            ], o3d.pipelines.registration.RANSACConvergenceCriteria(1000, 200))

        end = time.time()
        
        if transform_inplace:
            cur_pc.pcd_visible.transform(result.transformation)

        expected_transformation = cur_pc.expected_transformation()
        estimated_transformation = result.transformation
        
        if verbose:
            print("Expected rotation: ")
            print(expected_transformation[:3, :3])
            print("Estimated rotation: ")
            print(estimated_transformation[:3, :3])
            print("Error (rad): ")
            print(get_angular_error(expected_transformation[:3,:3], estimated_transformation[:3, :3]))

            print("Expected translation: ")
            print(expected_transformation[:3, 3])
            print("Estimated translation: ")
            print(estimated_transformation[:3, 3])
            print("Error (m): ")
            print(np.linalg.norm(expected_transformation[:3, 3] - estimated_transformation[:3, 3]))

            print("Time taken (s): ", end - start)

        return {
            "expected_rotation": expected_transformation[:3, :3],
            "estimated_rotation": estimated_transformation[:3, :3],
            "error_rotation": get_angular_error(expected_transformation[:3,:3], estimated_transformation[:3, :3]),
            "expected_translation": expected_transformation[:3, 3],
            "estimated_translation": estimated_transformation[:3, 3],
            "error_translation": np.linalg.norm(expected_transformation[:3, 3] - estimated_transformation[:3, 3]),
            "time_taken": end - start
        }

In [3]:
STORE_TEXTUAL_RESULTS = True

In [4]:
ply_path = "./data/office_chair_ext_points.ply"
cur_pc = PointCloudGen(ply_path)

voxel_sizes = [
    0.03, 0.02, 0.05
]

voxel_size_dirs = {
    0.03: "./outputs/office_chair/3K_points/",
    0.02: "./outputs/office_chair/6K_points/",
    0.05: "./outputs/office_chair/1K_points/"
}

test_rotations = [
    [60, 10, 10],
    [30, 5, 5,],
    [10, 60, 10],
    [5, 30, 5],
    [10, 10, 60],
    [5, 5, 30],
    [45, 45, 10],
    [45, 10, 45],
    [10, 45, 45],
    [120, 10, 10],
    [10, 120, 10],
    [10, 10, 120]
]

test_translations = [
    [0, 0, 0],
    [1, 0, 0],
    [0, 1, 0],
    [0, 0, 1],
    [1, 1, 0],
    [0, 1, 1],
    [1, 0, 1],
    [1, 1, 1],
    [0.2, 0.2, 0],
    [0.4, 0.1, 0.4],
    [0.3, 0.7, 0.1]
]

d = cur_pc.pcd_original_diameter
camera_positions = [
    [d, d, 0],
    [0, d, -d],
    [d, 0, d],
    [0, d, d]
]

In [5]:
# Takes 10 minutes on i7

progress_bar = tqdm(total=len(test_rotations) * len(test_translations) * len(camera_positions) * len(voxel_sizes), desc='Creating pcds')

for voxel_size in voxel_sizes:
    cur_source_base_dir = voxel_size_dirs[voxel_size]

    frame_counter = 0

    for rot in test_rotations:
        for trans in test_translations:
            for cur_camera_pos in camera_positions:

                source_dir = os.path.join(cur_source_base_dir, f"{frame_counter}")

                cur_pc = PointCloudGen()
                cur_pc.load_from_dir(source_dir)

                results = FPFH_RANSAC.run(cur_pc, voxel_size = voxel_size, verbose=False)
                
                files_to_remove = [
                    os.path.join(source_dir, "03_fast_FPFH_RANSAC_output_transformation.png"),
                    os.path.join(source_dir, "03_fast_FPFH_RANSAC_results.pkl"),
                    os.path.join(source_dir, "03_fast_FPFH_RANSAC_results.txt")
                ]
                
                for file_path in files_to_remove:
                    if os.path.exists(file_path):
                        os.remove(file_path)

                cur_pc.save_viewport_image(os.path.join(source_dir, "03_fast_FPFH_RANSAC_output_transformation.png"), highlight_pcd_visible_colour=[0, 0, 1])

                # for faster loading
                with open(os.path.join(source_dir, "03_fast_FPFH_RANSAC_results.pkl"), "wb") as pickle_file:
                    pickle.dump(results, pickle_file)

                if STORE_TEXTUAL_RESULTS:
                    results_str = "\n".join([f"{key}: {value}" for key, value in results.items()])
                    with open(os.path.join(source_dir, "03_fast_FPFH_RANSAC_results.txt"), 'w') as text_file:
                        text_file.write(results_str)

                frame_counter += 1
                progress_bar.update(1)

Creating pcds: 100%|██████████| 1584/1584 [14:09<00:00,  2.48it/s]