In [1]:
#Environment set-up and libraries

#Base libraries
import numpy as np
import random
import torch
import torch.nn as nn
from datetime import datetime

#Plotting libraries
%matplotlib inline
from mpl_toolkits import mplot3d
import matplotlib.pyplot as plt
import plotly
import plotly.graph_objects as go

#Utilities libraries
from glob import glob 
import os

import open3d as o3d

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
def load_file(file_name):
    print(file_name)

    if file_name.endswith(".las") or file_name.endswith(".laz"):
        print("[INFO] .las (.laz) file loading")
        try:
            # import lidar .las data and assign to variable
            pcd = laspy.read(file_name)
            # examine the available features for the lidar file we have read
            # list(las.point_format.dimension_names)
            #
            # set(list(las.classification))

            # Creating, Filtering, and Writing Point Cloud Data
            # To create 3D point cloud data, we can stack together with the X, Y, and Z dimensions, using Numpy like this.
            point_data = np.stack([pcd.X, pcd.Y, pcd.Z], axis=0).transpose((1, 0))
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(point_data)
            # points = point_data
            if pcd is not None:
                print("[Info] Successfully read", file_name)

                # Point cloud
                return pcd

        except Exception:
            print(".las, .laz file load failed")

    elif file_name.endswith(".e57"):
        print("[INFO] .e57 file loading")
        try:
            e57_file = pye57.E57(file_name)

            # other attributes can be read using:
            data = e57_file.read_scan(0)

            # 'data' is a dictionary with the point types as keys
            # assert isinstance(data["cartesianX"], np.ndarray)
            # assert isinstance(data["cartesianY"], np.ndarray)
            # assert isinstance(data["cartesianZ"], np.ndarray)

            point_xyz = np.stack([data["cartesianX"], data["cartesianY"], data["cartesianZ"]]).transpose((1, 0))
            # points_rgb = [data["colorRed"], data["colorGreen"], data["colorBlue"]]
            # points_intensity = data["intensity"]

            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(point_xyz)
            # points = o3d.utility.Vector3dVector(point_xyz)
            # points = point_xyz
            # pcd.colors = o3d.utility.Vector3dVector(points_rgb)
            # pcd.colors[:, 0] = points_intensity
            print("[Info] Successfully read", file_name)
            return pcd

        except Exception:
            print(".e57 file load failed")

    elif file_name.endswith(".bin"):
        print("[INFO] .bin file loading")
        try:
            size_float = 4
            list_pcd = []
            with open(file_name, "rb") as f:
                byte = f.read(size_float * 4)
                while byte:
                    x, y, z, intensity = struct.unpack("ffff", byte)
                    list_pcd.append([x, y, z])
                    byte = f.read(size_float * 4)
            np_pcd = np.asarray(list_pcd)
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(np_pcd)
            print("[Info] Successfully read", file_name)
            return pcd

        except Exception:
            print(".bin file load failed")

    elif file_name.endswith(".ply"):
        pcd = o3d.io.read_point_cloud(file_name)
        points_xyz = np.asarray(pcd.points)
        #pcd = o3d.geometry.PointCloud() # No need to do that already a PointCloud
        pcd.points = o3d.utility.Vector3dVector(points_xyz)
        # points = points_xyz
        if pcd is not None:
            print("[Info] Successfully read", file_name)
            # Point cloud
            return pcd

    elif file_name.endswith(".pts"):
        try:
            with open(file_name, "r") as f:
                # Log every 1000000 lines.
                LOG_EVERY_N = 1000000
                points_np = []
                for line in f:
                    if len(line.split()) == 4:
                        x, y, z, i = [num for num in line.split()]
                        points_np.append([float(x), float(y), float(z), float(i)])
                        if (len(points_np) % LOG_EVERY_N) == 0:
                            print('point', len(points_np))
                    elif len(line.split()) == 3:
                        x, y, z = [num for num in line.split()]
                        points_np.append([float(x), float(y), float(z)])
                        if (len(points_np) % LOG_EVERY_N) == 0:
                            print('point', len(points_np))
                    elif len(line.split()) == 5:
                        x, y, z, i, zeroes_v = [num for num in line.split()]
                        points_np.append([float(x), float(y), float(z), float(i)])
                        if (len(points_np) % LOG_EVERY_N) == 0:
                            print('point', len(points_np))
                    elif len(line.split()) == 7:
                        x, y, z, r, g, b, i = [num for num in line.split()]
                        points_np.append([float(x), float(y), float(z),
                                          float(r), float(g), float(b),
                                          float(i)])
                        if (len(points_np) % LOG_EVERY_N) == 0:
                            print('point', len(points_np))
                    else:
                        print("[Info] The file has unregistered format")
                        return
            print('loop end')
            points_arr = np.array(points_np).transpose()
            print(len(points_arr))
            point_xyz = points_arr[:3].transpose()
            print("xyz points shape", point_xyz.shape)
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(point_xyz)
            if len(points_arr) == 4:
                points_intensity = (points_arr[3])/255.0
                print("intensity points len", points_intensity.shape)
                points_intensity_rgb = np.vstack((points_intensity,
                                                  points_intensity,
                                                  points_intensity)).T
                print("intensity_rgb points shape", points_intensity_rgb.shape)
                pcd.colors = o3d.utility.Vector3dVector(points_intensity_rgb)
            elif len(points_arr) == 7:
                points_red = (points_arr[4]) / 255.0
                points_green = (points_arr[5]) / 255.0
                points_blue = (points_arr[6]) / 255.0
                points_rgb = np.vstack((points_red,
                                        points_green,
                                        points_blue)).T

                # points_intensity = ((points_arr[3]) / 255.0).T
                # print("intensity points len", points_intensity.shape)
                print("rgb points shape", points_rgb.shape)
                pcd.colors = o3d.utility.Vector3dVector(points_rgb)
                #pcd.intensities = o3d.utility.Vector3dVector(points_intensity)
            if pcd is not None:
                print("[Info] Successfully read", file_name)
                # Point cloud
                return pcd

        except Exception:
            print("[Info] Reading .pts file failed", file_name)

    # elif file_name.endswith(".kml"):
    #     try:
    #         with open(file_name, "r") as f:
    #             # Log every 1000000 lines.
    #             LOG_EVERY_N = 1000000
    #             points_np = []
    #             for line in f:
    #                 print(line)
    #                 if len(line.split(",")) == 3 and (line[0].isdigit() or line.startswith("-")):
    #                     y, x, z = [num for num in line.split(",")]
    #                     points_np.append([float(x), float(y), float(z)])
    #                     if (len(points_np) % LOG_EVERY_N) == 0:
    #                         print('point', len(points_np))
    #                 else:
    #                     print("[Info] The file has unregistered format")
    #         print('loop end')
    #         points_arr = np.array(points_np).transpose()
    #         print(len(points_arr))
    #         point_xyz = points_arr[:3].transpose()
    #         # points_intensity = points_arr[3]
    #         pcd = o3d.geometry.PointCloud()
    #         pcd.points = o3d.utility.Vector3dVector(point_xyz)
    #         if pcd is not None:
    #             print("[Info] Successfully read", file_name)
    #             # Point cloud
    #             return pcd
    #
    #     except Exception:
    #         print("[Info] Reading .kml file failed", file_name)

    else:
        pcd = None
        geometry_type = o3d.io.read_file_geometry_type(file_name)
        print(geometry_type)

        mesh = None
        if geometry_type & o3d.io.CONTAINS_TRIANGLES:
            mesh = o3d.io.read_triangle_model(file_name)
        if mesh is None:
            print("[Info]", file_name, "appears to be a point cloud")
            cloud = None
            try:
                cloud = o3d.io.read_point_cloud(file_name)
                # print(type(cloud))
            except Exception:
                print("[Info] Unknown filename", file_name)
            if cloud is not None:
                print("[Info] Successfully read", file_name)

                if not cloud.has_normals():
                    cloud.estimate_normals()
                cloud.normalize_normals()
                pcd = cloud
                #points = cloud.points
                pcd.points = o3d.utility.Vector3dVector(cloud.points)
            else:
                print("[WARNING] Failed to read points", file_name)

        if pcd is not None or mesh is not None:
            try:
                if mesh is not None:
                    # Triangle model
                    _scene.scene.add_model("__model__", mesh)
                else:
                    # Point cloud
                    return pcd

            except Exception as e:
                print(e)


In [3]:
filepath_mob1 = "/home/mekala/PycharmProjects/SabreProject_code/Sabre_proj/SABRE - Selected Static Scan Data/SABRE ADVANCED 3D - Selected MMS Data/"
filename1 = "SABRE MMS_S3 - 0002.pts"
filepath1 = filepath_mob1 + filename1
pc1 = load_file(filepath1)

filepath_static2 = "/home/mekala/PycharmProjects/SabreProject_code/Sabre_proj/SABRE - Selected Static Scan Data/SABRE - Selected Static Scan Data/"
filename2 = "SABRE Static Scan_T17_003.pts"
filepath2 = filepath_static2 + filename2
pc2 = load_file(filepath2)

/home/mekala/PycharmProjects/SabreProject_code/Sabre_proj/SABRE - Selected Static Scan Data/SABRE ADVANCED 3D - Selected MMS Data/SABRE MMS_S3 - 0002.pts
point 1000000
point 2000000
point 3000000
point 4000000
point 5000000
point 6000000
point 7000000
point 8000000
point 9000000
point 10000000
point 11000000
loop end
4
xyz points shape (11008825, 3)
intensity points len (11008825,)
intensity_rgb points shape (11008825, 3)
[Info] Successfully read /home/mekala/PycharmProjects/SabreProject_code/Sabre_proj/SABRE - Selected Static Scan Data/SABRE ADVANCED 3D - Selected MMS Data/SABRE MMS_S3 - 0002.pts
/home/mekala/PycharmProjects/SabreProject_code/Sabre_proj/SABRE - Selected Static Scan Data/SABRE - Selected Static Scan Data/SABRE Static Scan_T17_003.pts
point 1000000
point 2000000
point 3000000
point 4000000
point 5000000
point 6000000
point 7000000
point 8000000
point 9000000
point 10000000
point 11000000
point 12000000
point 13000000
point 14000000
point 15000000
point 16000000
point 17

In [4]:
def preprocess_point_cloud(pcd, voxel_size):
    pcd_down = pcd.voxel_down_sample(voxel_size)
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2.0,
                                             max_nn=30))
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5.0,
                                             max_nn=100))
    return pcd_down, pcd_fpfh

In [5]:
# Downsampling
voxel_size = 0.3

pc1_down, pc1_fpfh = preprocess_point_cloud(pc1, voxel_size)
pc2_down, pc2_fpfh = preprocess_point_cloud(pc2, voxel_size)

In [6]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [7]:
pc1_down_points = np.asarray(pc1_down.points)
pc2_down_points = np.asarray(pc2_down.points)
print(pc1_down_points.shape)
print(pc2_down_points.shape)

(56746, 3)
(123234, 3)


In [11]:
print("voxel_size = ", voxel_size)
distance_threshold = 2.5 * voxel_size
print("Distance threshold: ", distance_threshold)
mutual_filter = True
print("mutual_filter = ", mutual_filter)
max_iterations = 1000000
print("max_iterations = ", max_iterations)
max_validation = np.min([len(pc1_down.points), len(pc2_down.points)]) // 2
print("max_validation = ", max_validation)

# getting the current date and time
start = datetime.now()
# getting the date and time from the current date and time in the given format
start_date_time = start.strftime("%m/%d/%Y, %H:%M:%S")
print('\nRANSAC Started', start_date_time, '\n')
print('Running RANSAC\n')
result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
    pc1_down, pc2_down, pc1_fpfh, pc2_fpfh,
    mutual_filter=mutual_filter,
    max_correspondence_distance=distance_threshold,
    estimation_method=o3d.pipelines.registration.
    TransformationEstimationPointToPoint(True),
    ransac_n=3,
    checkers=[
        o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
        o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)
    ],
    criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(
        max_iterations, max_validation))  # max_validation replaces args.confidence in mobile-static
# getting the current date and time
finish = datetime.now()
# getting the date and time from the current date and time in the given format
finish_date_time = finish.strftime("%m/%d/%Y, %H:%M:%S")
print('RANSAC Finished', finish_date_time,
      "\nGlobal registration took %.3f sec.\n" % (finish - start).total_seconds())


voxel_size =  0.3
Distance threshold:  0.75
mutual_filter =  True
max_iterations =  1000000
max_validation =  28373

RANSAC Started 10/05/2023, 11:33:19 

Running RANSAC

RANSAC Finished 10/05/2023, 11:33:24 
Global registration took 4.778 sec.



In [12]:
trans = result.transformation
print("The estimated transformation matrix:")
print(trans)
print("Saving the transformation matrix in ransac_transformation_matrix.txt ...")
np.savetxt('ransac_transformation_matrix.txt', trans)
print("")

The estimated transformation matrix:
[[-1.08126246e+00 -1.38211475e-01  1.43432818e-02  1.82059464e+06]
 [ 1.38643162e-01 -1.08060222e+00  3.89046418e-02  7.55750455e+06]
 [ 9.28521138e-03  4.04114555e-02  1.08936557e+00 -2.60582322e+05]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
Saving the transformation matrix in ransac_transformation_matrix.txt ...



In [13]:
pc1_down_ransac = pc1_down.transform(result.transformation)
pc1_ransac = pc1.transform(result.transformation)

In [39]:
o3d.visualization.draw_geometries([pc1_ransac, pc2])

[Open3D INFO] Window window_0 created.
[Open3D INFO] EGL headless mode enabled.
[Open3D INFO] ICE servers: {"stun:stun.l.google.com:19302", "turn:user:password@34.69.27.100:3478", "turn:user:password@34.69.27.100:3478?transport=tcp"}
[Open3D INFO] Set WEBRTC_STUN_SERVER environment variable add a customized WebRTC STUN server.
[Open3D INFO] WebRTC Jupyter handshake mode enabled.


KeyboardInterrupt: 

In [14]:
pc1_down_ransac_points = np.asarray(pc1_down_ransac.points)
print(pc1_down_ransac_points.shape)
print(pc2_down_points.shape)

(56746, 3)
(123234, 3)


#### Voxel boxes

In [29]:
# Compute the bounding box of the (pc1) point cloud
min_bound = np.min(pc1_down_ransac_points, axis=0)
max_bound = np.max(pc1_down_ransac_points, axis=0)

# Determine the voxel size in each dimension
num_voxels = [10, 20, 1]
voxel_box_size = (max_bound - min_bound) / num_voxels
print(f"Voxel box size: {voxel_box_size} \n")

voxel_boxes = []

for i in range(num_voxels[0]):
    for j in range(num_voxels[1]):
        for k in range(num_voxels[2]):
            # Compute the center of the voxel
            center = min_bound + voxel_box_size * (np.array([i, j, k]) + 0.5)
            # Create an oriented bounding box for the voxel
            obb = o3d.geometry.OrientedBoundingBox(center=center, R=np.eye(3), extent=voxel_box_size)
            voxel_boxes.append(obb)
print(len(voxel_boxes),type(voxel_boxes),voxel_boxes)

Voxel box size: [15.4568427  12.16199458 40.42341137] 

200 <class 'list'> [OrientedBoundingBox: center: (371308, 796826, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: center: (371308, 796838, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: center: (371308, 796850, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: center: (371308, 796862, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: center: (371308, 796874, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: center: (371308, 796886, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: center: (371308, 796899, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: center: (371308, 796911, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: center: (371308, 796923, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: center: (371308, 796935, 82.381), extent: 15.4568, 12.162, 40.4234), OrientedBoundingBox: cen

In [21]:
def visualize_point_cloud_with_voxels(point_cloud, voxel_boxes):
    """
    Visualize a point cloud alongside voxel boxes.

    Parameters:
    - point_cloud: Nx3 numpy array representing the point cloud.
    - voxel_boxes: List of Open3D OrientedBoundingBox objects.
    """
    
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(point_cloud)
    print("Number of voxels:", len(voxel_boxes))

    o3d.visualization.draw_geometries([pcd, *voxel_boxes])
    
#     #Visualize
#     o3d.visualization.draw_plotly([pcd, *voxel_boxes],point_sample_factor=0.5, width=600, height=400)
#     visualizer = JVisualizer()
#     visualizer.add_geometry(pcd)
#     visualizer.add_geometry(*voxel_boxes)
#     visualizer.show()
    
    #draw(pcd)


In [31]:
# Create voxel dictionary for point cloud 1
voxel_dict1 = {}

for point in pc1_down_ransac_points:
    # Determine the voxel index for the point
    voxel_index = tuple(((point - min_bound) // voxel_size).astype(int))

    if voxel_index not in voxel_dict1:
        voxel_dict1[voxel_index] = []

    voxel_dict1[voxel_index].append(point)

# Count the number of points in each voxel
for voxel_index, points in voxel_dict1.items():
    print(f"Voxel {voxel_index} contains {len(points)} points.")
    
visualize_point_cloud_with_voxels(pc1_down_ransac_points, voxel_boxes)

Voxel (499, 295, 40) contains 2 points.
Voxel (500, 295, 41) contains 2 points.
Voxel (485, 481, 7) contains 1 points.
Voxel (502, 298, 45) contains 1 points.
Voxel (485, 489, 6) contains 1 points.
Voxel (488, 516, 11) contains 2 points.
Voxel (485, 457, 8) contains 2 points.
Voxel (491, 518, 16) contains 4 points.
Voxel (485, 501, 7) contains 5 points.
Voxel (486, 451, 9) contains 1 points.
Voxel (485, 480, 7) contains 1 points.
Voxel (507, 282, 55) contains 2 points.
Voxel (497, 280, 38) contains 2 points.
Voxel (489, 516, 12) contains 1 points.
Voxel (485, 488, 6) contains 1 points.
Voxel (492, 519, 17) contains 2 points.
Voxel (490, 517, 15) contains 4 points.
Voxel (499, 296, 40) contains 2 points.
Voxel (491, 519, 16) contains 2 points.
Voxel (489, 441, 16) contains 2 points.
Voxel (490, 518, 14) contains 1 points.
Voxel (498, 279, 39) contains 1 points.
Voxel (514, 292, 66) contains 1 points.
Voxel (514, 293, 66) contains 2 points.
Voxel (485, 450, 9) contains 2 points.
Voxel (4



In [32]:
# Calculating voxel size for each voxel (in number of points) 
# and finding voxel with the max number of points
max_vox_size = 0 
key_max_vox = ""

for k in voxel_dict1.keys(): 
    print(len(voxel_dict1[k])) 
    if max_vox_size < len(voxel_dict1[k]): 
        max_vox_size = len(voxel_dict1[k]) 
        key_max_vox = k 
print('\n') 
print("Max points in voxel:", max_vox_size) 
print("Vox with Max points index:", key_max_vox)
print("Number of voxels:", len(voxel_dict1.keys()))
print("Avg number of points in voxel:", len(pc1_down_ransac_points)//len(voxel_dict1.keys()))

2
2
1
1
1
2
2
4
5
1
1
2
2
1
1
2
4
2
2
2
1
1
1
2
2
1
1
1
1
4
2
4
1
2
2
1
1
2
1
2
2
1
1
1
1
2
2
1
2
1
1
1
1
2
1
2
1
1
1
2
1
1
2
2
1
3
1
1
1
1
1
2
2
2
1
2
2
1
1
1
2
1
2
1
1
1
1
2
3
2
3
2
2
1
1
1
1
1
1
1
1
2
1
1
1
1
1
2
1
1
1
1
4
1
1
1
2
3
1
1
1
2
1
1
1
1
2
1
1
2
1
1
1
1
2
1
1
2
1
1
1
1
1
1
2
1
1
2
1
2
1
2
1
3
1
2
2
1
2
1
2
1
1
1
1
1
1
1
1
1
3
1
2
1
1
4
1
2
1
1
1
1
1
1
1
1
1
3
1
1
2
3
1
1
1
1
1
1
2
1
1
1
3
2
1
1
2
1
1
1
1
2
1
1
2
3
1
1
1
1
1
2
1
1
2
1
1
2
1
3
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
2
1
1
1
1
1
1
1
1
2
1
1
3
1
1
1
2
1
1
1
2
1
1
1
1
1
1
1
1
1
1
2
1
2
1
1
1
1
1
1
1
1
1
2
1
1
1
3
2
2
1
1
1
1
1
1
1
1
1
2
1
1
1
1
1
1
2
1
1
2
1
1
2
1
2
1
2
1
1
1
1
1
1
1
1
2
1
1
1
1
1
1
1
2
3
2
1
1
1
1
1
1
2
1
1
1
1
1
1
2
1
1
1
1
1
2
2
1
1
1
1
1
1
1
1
1
1
2
1
1
2
1
2
1
1
1
1
1
1
1
1
1
2
1
1
1
1
1
1
1
1
1
2
1
1
1
1
1
1
1
2
1
2
1
1
1
1
2
1
1
2
1
1
1
1
1
1
1
2
1
1
1
1
1
1
1
1
2
1
1
1
1
1
4
2
1
2
2
2
1
1
1
3
1
1
1
1
1
1
1
1
1
2
1
1
1
1
2
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
3
1
2
1
1
1
1
1
1
1
1
1
1
2
1
1
2
2


#### Cropping point cloud 2 to the size of point cloud 1

In [40]:
# Crop point cloud 2 to the size of transformed point cloud 1
oriented_bounding_box = pc1_down_ransac.get_oriented_bounding_box()
oriented_bounding_box.color = (0, 1, 0)
pc2_down_croppped = pc2_down.crop(oriented_bounding_box)
o3d.visualization.draw_geometries([pc2_down_croppped, oriented_bounding_box])

In [49]:
o3d.visualization.draw_geometries([pc2_down_croppped, oriented_bounding_box, pc1_down_ransac])

In [50]:
def compute_chamfer_distance(pcd1, pcd2):
    """
    Compute the Chamfer distance between two point clouds.

    Parameters:
    - pcd1, pcd2: Open3D point cloud objects.

    Returns:
    - chamfer_distance: The Chamfer distance between the two point clouds.
    """
    
    # Compute distance from pcd1 to pcd2
    distances_1_to_2 = pcd1.compute_point_cloud_distance(pcd2)
    avg_distance_1_to_2 = np.mean([np.min(dist) for dist in distances_1_to_2])

    # Compute distance from pcd2 to pcd1
    distances_2_to_1 = pcd2.compute_point_cloud_distance(pcd1)
    avg_distance_2_to_1 = np.mean([np.min(dist) for dist in distances_2_to_1])

    # Compute the Chamfer distance
    chamfer_distance = (avg_distance_1_to_2 + avg_distance_2_to_1) / 2

    return chamfer_distance

In [51]:
chamfer_dist = compute_chamfer_distance(pc1_down_ransac, pc2_down_croppped)
print(f"Chamfer Distance: {chamfer_dist}")

Chamfer Distance: 4.310548850621758


#### RANSAC Evaluation

In [52]:
#RANSAC Evaluation

fitness = result.fitness
print("Fitness:")
print(fitness)
print("")

rmse = result.inlier_rmse
print("RMSE of all inlier correspondences:")
print(rmse)
print("")

# trans = result.transformation
# print("The estimated transformation matrix:")
# print(trans)
# print("Saving the transformation matrix in ransac_transformation_matrix.txt ...")
# np.savetxt('ransac_transformation_matrix.txt', trans)
# print("")

correspondences = result.correspondence_set
print("Correspondence Set:")
print(correspondences)
print("")

Fitness:
0.6134705529905191

RMSE of all inlier correspondences:
0.28231077169481944

Correspondence Set:
std::vector<Eigen::Vector2i> with 34812 elements.
Use numpy.asarray() to access data.



In [53]:
def registration_error(sour, targ, transformation):
    # # Make source and target of the same size
    # minimum_len = min(len(sour), len(targ))
    # source = sour[:minimum_len, :3]
    # target = sour[:minimum_len, :3]
    # # Apply transformation to point cloud
    # source_transformed = np.dot(transformation[:3, :3], source.T).T + transformation[:3, 3]
    # # Compute the difference between the transformed source and target point clouds
    # diff = np.subtract(target, source_transformed)
    # # RMSE of the difference
    # rmse = np.sqrt(np.mean(np.sum(diff ** 2, axis=1)))
    # # Compute the rotational error using quaternions
    # r = R.from_matrix(transformation)
    # q = r.as_quat()
    # q_target = R.from_matrix(np.identity(3)).as_quat()
    # rot_error = np.arccos(np.abs(np.dot(q, q_target))) * 180 / np.pi
    # # Compute the translational error
    # trans_error = np.linalg.norm(transformation - np.array([0, 0, 0]))
    # return rmse, rot_error, trans_error
    print('Calculating errors...')
    # Calculate the centroid of the source and target points
    source_centroid = np.mean(sour, axis=0)
    target_centroid = np.mean(targ, axis=0)
    print(f'Sour centroid: {source_centroid}')
    print(f'Targ centroid: {target_centroid}')

    # Calculate the covariance matrix of the source and target points
    source_covariance = np.cov(sour.T)
    target_covariance = np.cov(targ.T)

    # Calculate the singular value decomposition of the covariance matrices
    U_source, S_source, Vt_source = np.linalg.svd(source_covariance)
    U_target, S_target, Vt_target = np.linalg.svd(target_covariance)

    # Calculate the rotation matrix
    rot = Vt_target.T @ U_source.T

    # Calculate the translation vector
    transl = target_centroid - rot @ source_centroid
    print(f'Transl vector: {transl}')

    rot_err = rot - np.eye(3)
    # Mean Absolute error for each axis (row in rot_err)
    rot_mae_xyz = np.mean(np.abs(rot_err), axis=1)

    # Calculating translational error
    transl_xyz = np.divide(np.abs(transl), (np.abs(source_centroid)+np.abs(target_centroid)+np.abs(transl))/3)
    transl_xyz_mae = np.divide(transl_xyz, 100)
    # Calculate the mean squared error
    #mse = np.mean(np.sum((targ - (sour @ rot.T + transl)) ** 2, axis=1))

    return rot_mae_xyz, transl_xyz_mae


In [54]:
# We have pc1_down_ransac_points need pc2_down_croppped_points
pc2_down_croppped_points = np.asarray(pc2_down_croppped.points)

rot_err, transl_err = registration_error(pc1_down_ransac_points, pc2_down_croppped_points, trans)
print(f'Rotational MAE error xyz: {rot_err}, Translational MAE error xyz: {transl_err}')
print(f'Rotational MAE: {np.mean(rot_err)}, Translational MAE: {np.mean(transl_err)}')
print("")

Calculating errors...
Sour centroid: [3.71369173e+05 7.96959379e+05 6.61160557e+01]
Targ centroid: [3.71372772e+05 7.96931177e+05 7.09160305e+01]
Transl vector: [-118861.61942229   67952.64729698   36400.85257927]
Rotational MAE error xyz: [0.0667991  0.0794899  0.03746284], Translational MAE error xyz: [0.00413862 0.0012267  0.02988749]
Rotational MAE: 0.06125061446151112, Translational MAE: 0.011750934711295355



## Transformer

#### Input data preparation

In [55]:
# Convert NumPy arrays to PyTorch tensors
source_tensor = torch.FloatTensor(pc1_down_ransac_points)
target_tensor = torch.FloatTensor(pc2_down_croppped_points)

In [64]:
pcd_combined = pc1_down_ransac + pc2_down_croppped
pcd_combined_points = np.asarray(pcd_combined.points)
print(pc1_down_ransac_points.shape, pc2_down_croppped_points.shape, pcd_combined_points.shape)

# Compute the bounding box of the (pc1) point cloud
min_bound = np.min(pcd_combined_points, axis=0)
max_bound = np.max(pcd_combined_points, axis=0)

# Determine the voxel size in each dimension
num_voxels = [10, 20, 1]
voxel_box_size = (max_bound - min_bound) / num_voxels
print(f"Voxel box size: {voxel_box_size} \n")

voxel_size = ((max_bound - min_bound)/voxel_box_size)
print("voxel_size = ", voxel_size)

(56746, 3) (98836, 3) (155582, 3)
Voxel box size: [25.9076     12.81505025 40.42341137] 

voxel_size =  [10. 20.  1.]


In [66]:
# Define voxelization parameters
voxel_size = 0.3
print("voxel_size = ", voxel_size)

voxel_size =  0.3


In [67]:
# Voxel grid dimensions (Can I use voxel_boxes?)
voxel_grid = o3d.geometry.VoxelGrid.create_from_point_cloud(pc1_down_ransac + pc2_down_croppped, voxel_size)
min_bound, max_bound = voxel_grid.get_min_bound(), voxel_grid.get_max_bound()
voxel_grid_size = ((max_bound - min_bound) / voxel_size).astype(int)

print(voxel_grid, voxel_grid_size)

VoxelGrid with 102126 voxels. [865 855 136]


***

***

#### Point Clouds data exploration

In [8]:
# Preprocessing the point clouds data to arrange points 
# from both point clouds into the same Cartesian space
pc1_points_adj = []
pc1_mean = pc1_points_fsc.mean(axis=0)
for point in pc1_points_fsc:
    x_prm = point[0] - pc1_mean[0]
    y_prm = point[1] - pc1_mean[1]
    z_prm = point[2] - pc1_mean[2]
    pc1_points_adj.append([x_prm,y_prm,z_prm])
pc1_points_adj = np.array(pc1_points_adj)
print(pc1_points_adj.shape)

(56746, 3)


In [9]:
pc2_points_adj = []
pc2_mean = pc2_points_fsc.mean(axis=0)
for point in pc2_points_fsc:
    x_prm = point[0] - pc2_mean[0]
    y_prm = point[1] - pc2_mean[1]
    z_prm = point[2] - pc2_mean[2]
    pc2_points_adj.append([x_prm,y_prm,z_prm])
pc2_points_adj = np.array(pc2_points_adj)
print(pc2_points_adj.shape)
# print(pc2_points_adj)

(123234, 3)


In [10]:
print("pcd1 x-width, y-depth, z-hight", pc1_points_adj.max(axis=0)-pc1_points_adj.min(axis=0))
print("pcd2 x-width, y-depth, z-hight", pc2_points_adj.max(axis=0)-pc2_points_adj.min(axis=0))

pcd1 x-width, y-depth, z-hight [154.09143103 217.361       36.09      ]
pcd2 x-width, y-depth, z-hight [518.134      318.336       28.81333333]


In [11]:
# def create_voxel_boxes(point_cloud, num_voxels):
#     """
#     Create voxel boxes for a given point cloud and number of voxels along each axis.

#     Parameters:
#     - point_cloud: Nx3 numpy array representing the point cloud.
#     - num_voxels: List or tuple with the number of voxels along each axis (x, y, z).

#     Returns:
#     - voxel_boxes: List of Open3D OrientedBoundingBox objects.
#     """
    
#     # Compute the bounding box of the point cloud
#     min_bound = np.min(point_cloud, axis=0)
#     max_bound = np.max(point_cloud, axis=0)

#     # Determine the voxel size in each dimension
#     voxel_size = (max_bound - min_bound) / num_voxels
#     print(f"Voxel size: {voxel_size} \n")

#     voxel_boxes = []

#     for i in range(num_voxels[0]):
#         for j in range(num_voxels[1]):
#             for k in range(num_voxels[2]):
#                 # Compute the center of the voxel
#                 center = min_bound + voxel_size * (np.array([i, j, k]) + 0.5)
#                 # Create an oriented bounding box for the voxel
#                 obb = o3d.geometry.OrientedBoundingBox(center=center, R=np.eye(3), extent=voxel_size)
#                 voxel_boxes.append(obb)
                
#     voxel_dict = {}

#     for point in point_cloud:
#         # Determine the voxel index for the point
#         voxel_index = tuple(((point - min_bound) // voxel_size).astype(int))
        
#         if voxel_index not in voxel_dict:
#             voxel_dict[voxel_index] = []
        
#         voxel_dict[voxel_index].append(point)

#     return voxel_boxes, voxel_dict

In [12]:
def visualize_point_cloud_with_voxels(point_cloud, voxel_boxes):
    """
    Visualize a point cloud alongside voxel boxes.

    Parameters:
    - point_cloud: Nx3 numpy array representing the point cloud.
    - voxel_boxes: List of Open3D OrientedBoundingBox objects.
    """
    
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(point_cloud)
    print("Number of voxels:", len(voxel_boxes))

    o3d.visualization.draw_geometries([pcd, *voxel_boxes])
    
#     #Visualize
#     o3d.visualization.draw_plotly([pcd, *voxel_boxes],point_sample_factor=0.5, width=600, height=400)
#     visualizer = JVisualizer()
#     visualizer.add_geometry(pcd)
#     visualizer.add_geometry(*voxel_boxes)
#     visualizer.show()
    
    #draw(pcd)


In [15]:
# Compute the bounding box of the (pc1) point cloud
min_bound = np.min(pc1_points_adj, axis=0)
max_bound = np.max(pc1_points_adj, axis=0)

# Determine the voxel size in each dimension
num_voxels = [10, 20, 1]
voxel_size = (max_bound - min_bound) / num_voxels
print(f"Voxel size: {voxel_size} \n")

voxel_boxes = []

for i in range(num_voxels[0]):
    for j in range(num_voxels[1]):
        for k in range(num_voxels[2]):
            # Compute the center of the voxel
            center = min_bound + voxel_size * (np.array([i, j, k]) + 0.5)
            # Create an oriented bounding box for the voxel
            obb = o3d.geometry.OrientedBoundingBox(center=center, R=np.eye(3), extent=voxel_size)
            voxel_boxes.append(obb)

Voxel size: [15.4091431 10.86805   36.09     ] 



In [17]:
# Create voxel dictionary for point cloud 1
voxel_dict1 = {}

for point in pc1_points_adj:
    # Determine the voxel index for the point
    voxel_index = tuple(((point - min_bound) // voxel_size).astype(int))

    if voxel_index not in voxel_dict1:
        voxel_dict1[voxel_index] = []

    voxel_dict1[voxel_index].append(point)

# Count the number of points in each voxel
for voxel_index, points in voxel_dict1.items():
    print(f"Voxel {voxel_index} contains {len(points)} points.")
    
visualize_point_cloud_with_voxels(pc1_points_adj, voxel_boxes)

Voxel (0, 12, 0) contains 808 points.
Voxel (1, 7, 0) contains 2832 points.
Voxel (1, 6, 0) contains 776 points.
Voxel (1, 8, 0) contains 3126 points.
Voxel (0, 13, 0) contains 42 points.
Voxel (1, 12, 0) contains 198 points.
Voxel (1, 9, 0) contains 28 points.
Voxel (1, 13, 0) contains 224 points.
Voxel (0, 19, 0) contains 2 points.
Voxel (2, 7, 0) contains 4734 points.
Voxel (2, 6, 0) contains 548 points.
Voxel (2, 8, 0) contains 4826 points.
Voxel (2, 9, 0) contains 76 points.
Voxel (2, 5, 0) contains 120 points.
Voxel (3, 6, 0) contains 974 points.
Voxel (3, 7, 0) contains 3718 points.
Voxel (3, 5, 0) contains 64 points.
Voxel (3, 0, 0) contains 4 points.
Voxel (3, 8, 0) contains 4256 points.
Voxel (3, 9, 0) contains 1612 points.
Voxel (2, 13, 0) contains 22 points.
Voxel (2, 14, 0) contains 42 points.
Voxel (3, 14, 0) contains 36 points.
Voxel (4, 5, 0) contains 686 points.
Voxel (4, 0, 0) contains 60 points.
Voxel (4, 6, 0) contains 1036 points.
Voxel (4, 7, 0) contains 3324 poin

In [18]:
# Calculating voxel size for each voxel (in number of points) 
# and finding voxel with the max number of points
max_vox_size = 0 
key_max_vox = ""

for k in voxel_dict1.keys(): 
    print(len(voxel_dict1[k])) 
    if max_vox_size < len(voxel_dict1[k]): 
        max_vox_size = len(voxel_dict1[k]) 
        key_max_vox = k 
print('\n') 
print("Max points in voxel:", max_vox_size) 
print("Vox with Max points index:", key_max_vox)
print("Number of voxels:", len(voxel_dict1.keys()))
print("Avg number of points in voxel:", len(pc1_points_adj)//len(voxel_dict1.keys()))

808
2832
776
3126
42
198
28
224
2
4734
548
4826
76
120
974
3718
64
4
4256
1612
22
42
36
686
60
1036
3324
4274
1298
2642
2076
2246
4296
5190
3740
2624
4148
2252
310
4100
2472
2850
4140
1790
2060
746
2280
436
3348
3722
1768
670
224
1726
4328
1986
1764
612
128
78
304
330
448
158
26
80
46
490
154
4
282
34
66
12
104
88
110
12
36
46
6
58
40
12
4
6
2
8
2
8
6
2
2
2
2
2
2


Max points in voxel: 5190
Vox with Max points index: (9, 9, 0)
Number of voxels: 97
Avg number of points in voxel: 585


In [27]:
# Create voxel dictionary for pc2
voxel_dict2 = {}

# Iterate over pc2 points
for point in pc2_points_adj:
    # Determine the voxel index for the point
    voxel_index = tuple(((point - min_bound) // voxel_size).astype(int))

    # Check if the voxel index exists in pc1's voxel boxes
    if voxel_index in voxel_boxes:
        # Add the point to voxel_dict2 under the corresponding voxel index
        if voxel_index not in voxel_dict2:
            voxel_dict2[voxel_index] = []
        voxel_dict2[voxel_index].append(point)
        
# Count the number of points in each voxel
for voxel_index, points in voxel_dict2.items():
    print(f"Voxel {voxel_index} contains {len(points)} points.")

    
# Create a PointCloud object to store the points from voxel_dict2
pc2_adj_from_voxel_dict = o3d.geometry.PointCloud()

# Populate pc2_adj_from_voxel_dict with points from voxel_dict2
pc2_points = []
for voxel_index, points in voxel_dict2.items():
    pc2_points.extend(points)
pc2_adj_from_voxel_dict.points = o3d.utility.Vector3dVector(pc2_points)

print(voxel_dict2)
visualize_point_cloud_with_voxels(pc2_points, voxel_boxes)

{}
Number of voxels: 200


In [24]:
# Create a voxel_dictionary for cropped pc2 with voxels for pc1
# Compute the bounding box of the point cloud
cropped_pc2_adj_points = np.asarray(cropped_point_cloud.points)
voxel_dict2 = {}

for point in cropped_pc2_adj_points:
    # Determine the voxel index for the point
    voxel_index = tuple(((point - min_bound) // voxel_size).astype(int))

    if voxel_index not in voxel_dict:
        voxel_dict2[voxel_index] = []

    voxel_dict2[voxel_index].append(point)

visualize_point_cloud_with_voxels(cropped_pc2_adj_points, voxel_boxes)

KeyError: (0, 12, 0)

In [64]:
# pc2_points = np.asarray(pc2_down.points)
# print(pc2_points.shape)

num_voxels = [10, 20, 1]
voxel_boxes2, voxel_dict2 = create_voxel_boxes(pc2_points_adj, num_voxels)

# Count the number of points in each voxel
for voxel_index, points in voxel_dict2.items():    
    print(f"Voxel {voxel_index} contains {len(points)} points.")
    
visualize_point_cloud_with_voxels(pc2_points_adj, voxel_boxes)

Voxel size: [51.8134     15.9168     28.81333333] 

Voxel (2, 9, 0) contains 2112 points.
Voxel (3, 9, 0) contains 1133 points.
Voxel (6, 8, 0) contains 4728 points.
Voxel (7, 8, 0) contains 428 points.
Voxel (4, 9, 0) contains 4390 points.
Voxel (6, 9, 0) contains 1333 points.
Voxel (9, 8, 0) contains 99 points.
Voxel (7, 9, 0) contains 186 points.
Voxel (5, 9, 0) contains 5610 points.
Voxel (2, 8, 0) contains 1606 points.
Voxel (3, 8, 0) contains 1193 points.
Voxel (4, 8, 0) contains 7646 points.
Voxel (8, 9, 0) contains 589 points.
Voxel (9, 9, 0) contains 16 points.
Voxel (8, 10, 0) contains 750 points.
Voxel (9, 10, 0) contains 65 points.
Voxel (0, 7, 0) contains 23 points.
Voxel (1, 7, 0) contains 86 points.
Voxel (5, 8, 0) contains 8488 points.
Voxel (1, 6, 0) contains 215 points.
Voxel (0, 6, 0) contains 6 points.
Voxel (8, 11, 0) contains 623 points.
Voxel (9, 11, 0) contains 4 points.
Voxel (1, 5, 0) contains 506 points.
Voxel (0, 5, 0) contains 21 points.
Voxel (9, 12, 0) co

In [44]:

max_val = 0 
teacher = ""

for k in voxel_dict2.keys(): 
    print(len(voxel_dict2[k])) 
    if max_val < len(voxel_dict2[k]): 
        max_val = len(voxel_dict2[k]) 
        teacher = k 
print('\n') 
print(max_val) 
print(teacher)
print(len(voxel_dict2.keys()))
print(len(pc2_points_adj)//len(voxel_dict2.keys()))

2112
1133
4728
428
4390
1333
99
186
5610
1606
1193
7646
589
16
750
65
23
86
8488
215
6
623
4
506
21
83
431
2
39
4180
203
2
326
96
30
6
422
13
542
12
3
1
3981
7
7644
1646
3492
2
1
208
1066
4988
17
2210
3666
1
893
7508
2189
1174
1136
6176
788
4987
4534
1771
4932
1883
2434
263
66
1930
973
601
151
241
393
194
135
344
83
33
32
17
20
33
21
8
8
14
4
2
29
8
1
8
12
3
6
3
1
10
5
1


8488
(5, 8, 0)
104
1184


In [45]:
def compute_chamfer_distance(pcd1, pcd2):
    """
    Compute the Chamfer distance between two point clouds.

    Parameters:
    - pcd1, pcd2: Open3D point cloud objects.

    Returns:
    - chamfer_distance: The Chamfer distance between the two point clouds.
    """
    
    # Compute distance from pcd1 to pcd2
    distances_1_to_2 = pcd1.compute_point_cloud_distance(pcd2)
    avg_distance_1_to_2 = np.mean([np.min(dist) for dist in distances_1_to_2])

    # Compute distance from pcd2 to pcd1
    distances_2_to_1 = pcd2.compute_point_cloud_distance(pcd1)
    avg_distance_2_to_1 = np.mean([np.min(dist) for dist in distances_2_to_1])

    # Compute the Chamfer distance
    chamfer_distance = (avg_distance_1_to_2 + avg_distance_2_to_1) / 2

    return chamfer_distance

In [46]:
# points1_mean = pc1_points.mean(axis=0)
# points2_mean = pc2_points.mean(axis=0)
# pc1_points = (pc1_points - points1_mean)
# pc2_points = (pc2_points - points2_mean)
# print(pc1_points, pc2_points)

pcd1 = o3d.geometry.PointCloud()
pcd1.points = o3d.utility.Vector3dVector(pc1_points_adj)

pcd2 = o3d.geometry.PointCloud()
pcd2.points = o3d.utility.Vector3dVector(pc2_points_adj)

chamfer_dist = compute_chamfer_distance(pcd1, pcd2)
print(f"Chamfer Distance: {chamfer_dist}")

Chamfer Distance: 17.04458131853736


In [49]:
pcd1.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2.0,
                                                 max_nn=30))
pcd1_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd1,
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5.0,
                                             max_nn=100))
pcd2.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2.0,
                                                 max_nn=30))
pcd2_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd2,
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5.0,
                                             max_nn=100))

In [51]:
distance_threshold = 2.5 * voxel_size


In [62]:
result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        pcd1, pcd2, pcd1_fpfh, pcd2_fpfh, True,
        distance_threshold,
        mutual_filter=True,
        max_correspondence_distance=distance_threshold,
        estimation_method=o3d.pipelines.registration.
        TransformationEstimationPointToPoint(True),
        ransac_n=3,
        checkers=[
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)
        ],
        o3d.pipelines.registration.RANSACConvergenceCriteria(100000, 0.999))

SyntaxError: positional argument follows keyword argument (3507712253.py, line 14)

In [53]:
fitness = result.fitness
print("Fitness:")
print(fitness)
print("")

rmse = result.inlier_rmse
print("RMSE of all inlier correspondences:")
print(rmse)
print("")

trans = result.transformation
print("The estimated transformation matrix:")
print(trans)
print("Saving the transformation matrix in ransac_transformation_matrix.txt ...")
np.savetxt('ransac_transformation_matrix.txt', trans)
print("")

correspondences = result.correspondence_set
print("Correspondence Set:")
print(correspondences)
print("")

Fitness:
0.9181440101504952

RMSE of all inlier correspondences:
0.24512461098479713

The estimated transformation matrix:
[[-2.19049814e-01 -1.03928916e-02  8.95576224e-04  1.79144652e+01]
 [-1.04283765e-02  2.18631423e-01 -1.35345773e-02  2.90522202e+01]
 [-2.51428179e-04 -1.35618444e-02 -2.18878159e-01 -6.95889742e+00]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
Saving the transformation matrix in ransac_transformation_matrix.txt ...

Correspondence Set:
std::vector<Eigen::Vector2i> with 52101 elements.
Use numpy.asarray() to access data.



In [56]:
def registration_error(sour, targ, transformation):
    # # Make source and target of the same size
    # minimum_len = min(len(sour), len(targ))
    # source = sour[:minimum_len, :3]
    # target = sour[:minimum_len, :3]
    # # Apply transformation to point cloud
    # source_transformed = np.dot(transformation[:3, :3], source.T).T + transformation[:3, 3]
    # # Compute the difference between the transformed source and target point clouds
    # diff = np.subtract(target, source_transformed)
    # # RMSE of the difference
    # rmse = np.sqrt(np.mean(np.sum(diff ** 2, axis=1)))
    # # Compute the rotational error using quaternions
    # r = R.from_matrix(transformation)
    # q = r.as_quat()
    # q_target = R.from_matrix(np.identity(3)).as_quat()
    # rot_error = np.arccos(np.abs(np.dot(q, q_target))) * 180 / np.pi
    # # Compute the translational error
    # trans_error = np.linalg.norm(transformation - np.array([0, 0, 0]))
    # return rmse, rot_error, trans_error
    print('Calculating errors...')
    # Calculate the centroid of the source and target points
    source_centroid = np.mean(sour, axis=0)
    target_centroid = np.mean(targ, axis=0)
    print(f'Sour centroid: {source_centroid}')
    print(f'Targ centroid: {target_centroid}')

    # Calculate the covariance matrix of the source and target points
    source_covariance = np.cov(sour.T)
    target_covariance = np.cov(targ.T)

    # Calculate the singular value decomposition of the covariance matrices
    U_source, S_source, Vt_source = np.linalg.svd(source_covariance)
    U_target, S_target, Vt_target = np.linalg.svd(target_covariance)

    # Calculate the rotation matrix
    rot = Vt_target.T @ U_source.T

    # Calculate the translation vector
    transl = target_centroid - rot @ source_centroid
    print(f'Transl vector: {transl}')

    rot_err = rot - np.eye(3)
    # Mean Absolute error for each axis (row in rot_err)
    rot_mae_xyz = np.mean(np.abs(rot_err), axis=1)

    # Calculating translational error
    transl_xyz = np.divide(np.abs(transl), (np.abs(source_centroid)+np.abs(target_centroid)+np.abs(transl))/3)
    transl_xyz_mae = np.divide(transl_xyz, 100)
    # Calculate the mean squared error
    #mse = np.mean(np.sum((targ - (sour @ rot.T + transl)) ** 2, axis=1))

    return rot_mae_xyz, transl_xyz_mae


In [57]:
rot_err, transl_err = registration_error(pc1_points_adj, pc2_points_adj, trans)
print(f'Rotational MAE error xyz: {rot_err}, Translational MAE error xyz: {transl_err}')
print(f'Rotational MAE: {np.mean(rot_err)}, Translational MAE: {np.mean(transl_err)}')
print("")

Calculating errors...
Sour centroid: [-5.47380346e-09 -1.74165987e-08  4.58375319e-13]
Targ centroid: [ 1.66717002e-09 -9.69259248e-09  7.83185372e-13]
Transl vector: [ 9.05579524e-09  6.77118038e-09 -2.76559270e-09]
Rotational MAE error xyz: [0.04146623 0.09933892 0.06860678], Translational MAE error xyz: [0.01677334 0.00599567 0.02998654]
Rotational MAE: 0.06980397806224113, Translational MAE: 0.01758518050428177



# ViT image classification based Transformer
from: https://www.kaggle.com/code/umongsain/vision-transformer-from-scratch-pytorch

The next Block takes a batch of Point Cloud voxels as input and returns a batch of sequences. Later on we feed this sequences into the transformer encoder.

In [None]:
class Img2Seq(nn.Module):
    """
    This layers takes a batch of images as input and
    returns a batch of sequences
    
    Shape:
        input: (b, h, w, c)
        output: (b, s, d)
    """
    def __init__(self, img_size, patch_size, n_channels, d_model):
        super().__init__()
        self.patch_size = patch_size
        self.img_size = img_size

        nh, nw = img_size[0] // patch_size[0], img_size[1] // patch_size[1]
        n_tokens = nh * nw

        token_dim = patch_size[0] * patch_size[1] * n_channels
        self.linear = nn.Linear(token_dim, d_model)
        self.cls_token = nn.Parameter(torch.randn(1, 1, d_model))
        self.pos_emb = nn.Parameter(torch.randn(n_tokens, d_model))

    def __call__(self, batch):
        batch = patchify(batch, self.patch_size)

        b, c, nh, nw, ph, pw = batch.shape

        # Flattening the patches
        batch = torch.permute(batch, [0, 2, 3, 4, 5, 1])
        batch = torch.reshape(batch, [b, nh * nw, ph * pw * c])

        batch = self.linear(batch)
        cls = self.cls_token.expand([b, -1, -1])
        emb = batch + self.pos_emb

        return torch.cat([cls, emb], axis=1)

### Visual Transformer Module
This modules wraps up everything. We can divide this module into 3 parts:

- An image to sequence encoder
- Transformer encoder
- Multilayer perceptron head classification<br> 
<br> 
We use torch.nn.TransformerEncoder and torch.nn.TransformerEncoderLayer to implement our transformer encoder. I highly recommend to read the official documentation to learn more about the layers.

In [None]:
class ViT(nn.Module):
    def __init__(
        self,
        img_size,
        patch_size,
        n_channels,
        d_model,
        nhead,
        dim_feedforward,
        blocks,
        mlp_head_units,
        n_classes,
    ):
        super().__init__()
        """
        Args:
            img_size: Size of the image
            patch_size: Size of the patch
            n_channels: Number of image channels
            d_model: The number of features in the transformer encoder
            nhead: The number of heads in the multiheadattention models
            dim_feedforward: The dimension of the feedforward network model in the encoder
            blocks: The number of sub-encoder-layers in the encoder
            mlp_head_units: The hidden units of mlp_head
            n_classes: The number of output classes
        """
        self.img2seq = Img2Seq(img_size, patch_size, n_channels, d_model)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model, nhead, dim_feedforward, activation="gelu", batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer, blocks
        )
        self.mlp = get_mlp(d_model, mlp_head_units, n_classes)
        
        self.output = nn.Sigmoid() if n_classes == 1 else nn.Softmax()

    def __call__(self, batch):

        batch = self.img2seq(batch)
        batch = self.transformer_encoder(batch)
        batch = batch[:, 0, :]
        batch = self.mlp(batch)
        output = self.output(batch)
        return output

### Model Hyperparameters

In [None]:
img_size = (512, 512)
patch_size = (16, 16)
n_channels = 1
d_model = 1024
nhead = 4
dim_feedforward = 2048
blocks = 8
mlp_head_units = [1024, 512]
n_classes = 1
# device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

### Training and Validation

In [None]:
model = ViT(
    img_size = (512, 512),
    patch_size = (16, 16),
    n_channels = 1,
    d_model = 1024,
    nhead = 4,
    dim_feedforward = 1024,
    blocks = 8,
    mlp_head_units = [512, 512],
    n_classes = 1,
).to(device)

optimizer = Adam(model.parameters())
criterion = nn.BCELoss()

trainer = create_supervised_trainer(model, optimizer, criterion, device=device)
val_metrics = {
    "bce": Loss(criterion)
}
evaluator = create_supervised_evaluator(model, metrics=val_metrics, device=device)

In [None]:
log_interval = 10
max_epochs = 5
best_loss = float('inf')

RunningAverage(output_transform=lambda x: x).attach(trainer, 'loss')

pbar = ProgressBar()
pbar.attach(trainer, ['loss'])
    
@trainer.on(Events.EPOCH_COMPLETED)
def log_validation_results(trainer):
    global best_loss
    evaluator.run(val_loader)
    loss = evaluator.state.metrics['bce']
    if loss < best_loss:
        best_loss = loss
        torch.save(model.state_dict(), 'best_model_vit.pt')
    print(f"Validation Results - Epoch: {trainer.state.epoch} Avg loss: {loss:.2f}")
    
output_state = trainer.run(train_loader, max_epochs=max_epochs)

***

***

***

# ChatGPT based simple Transformer

In [16]:
# Define the Chamfer distance loss function
def chamfer_distance(p1, p2):
    p1 = p1.unsqueeze(1)  # Shape: (B, 1, N, 3)
    p2 = p2.unsqueeze(0)  # Shape: (1, B, M, 3)
    
    p1_repeat = p1.repeat(1, 1, 1, p2.shape[3])  # Shape: (B, B, N, 3)
    p2_repeat = p2.repeat(1, 1, p1.shape[2], 1)  # Shape: (B, B, N, 3)
    
    distances = torch.norm(p1_repeat - p2_repeat, dim=3, p=2)  # Shape: (B, B, N)
    
    min_distances_p1, _ = torch.min(distances, dim=2)  # Shape: (B, B)
    min_distances_p2, _ = torch.min(distances, dim=1)  # Shape: (B, B)
    
    chamfer_dist = torch.mean(min_distances_p1, dim=1) + torch.mean(min_distances_p2, dim=1)  # Shape: (B,)
    
    return chamfer_dist

In [17]:
# Convert NumPy arrays to PyTorch tensors
source_tensor = torch.FloatTensor(pc1_points)
target_tensor = torch.FloatTensor(pc2_points)

In [18]:
# Define voxelization parameters
voxel_size = 0.3

In [19]:
# Voxel grid dimensions (customize as needed)
voxel_grid = o3d.geometry.VoxelGrid.create_from_point_cloud(pcd1 + pcd2, voxel_size)
min_bound, max_bound = voxel_grid.get_min_bound(), voxel_grid.get_max_bound()
voxel_grid_size = ((max_bound - min_bound) / voxel_size).astype(int)

In [20]:
# Function to map points to voxel indices
def points_to_voxel_indices(points, voxel_size, min_bound):
    indices = ((points - min_bound) / voxel_size).astype(int)
    return indices

In [21]:
# Map points to voxel indices
source_indices = points_to_voxel_indices(pc1_points, voxel_size, min_bound)
target_indices = points_to_voxel_indices(pc2_points, voxel_size, min_bound)

In [22]:
# Create tensors to hold voxelized point clouds
source_voxels = torch.zeros(voxel_grid_size[0], voxel_grid_size[1], voxel_grid_size[2], dtype=torch.float32)
target_voxels = torch.zeros(voxel_grid_size[0], voxel_grid_size[1], voxel_grid_size[2], dtype=torch.float32)

In [23]:
# Assign points to voxels
source_voxels[source_indices[:, 0], source_indices[:, 1], source_indices[:, 2]] = 1.0  # Set occupancy to 1
target_voxels[target_indices[:, 0], target_indices[:, 1], target_indices[:, 2]] = 1.0  # Set occupancy to 1


In [24]:
# Define batch size
batch_size = 8

In [25]:
# Split voxelized point clouds into batches
source_batches = torch.split(source_voxels.unsqueeze(0), batch_size)
target_batches = torch.split(target_voxels.unsqueeze(0), batch_size)


In [26]:
# Concat source and target batches into tensors
source_data = torch.cat(source_batches, dim=0)
target_data = torch.cat(target_batches, dim=0)

In [27]:
from torch.utils.data import DataLoader, TensorDataset

In [28]:
# Ensure the feature dimensions of the source and target data match d_model
d_model = source_data.size(-1)  # Use the feature dimension of the data as d_model


In [29]:
# Combine source and target batches into a dataset
dataset = TensorDataset(source_data, target_data)

In [30]:
# class TransformerEncoder(nn.Module):
#     def __init__(self, d_model, nhead, num_encoder_layers):
#         super(TransformerEncoder, self).__init__()
#         self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers)
#         self.fc = nn.Linear(3, d_model)

#     def forward(self, src):
#         src = self.fc(src)
#         return self.transformer.encoder(src)

# class TransformerDecoder(nn.Module):
#     def __init__(self, d_model, nhead, num_decoder_layers):
#         super(TransformerDecoder, self).__init__()
#         self.transformer = nn.Transformer(d_model, nhead, num_decoder_layers)
#         self.fc = nn.Linear(d_model, 3)

#     def forward(self, src, memory):
#         src = self.transformer.decoder(src, memory)
#         return self.fc(src)

# class PointCloudTransformer(nn.Module):
#     def __init__(self, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6):
#         super(PointCloudTransformer, self).__init__()
#         self.encoder = TransformerEncoder(d_model, nhead, num_encoder_layers)
#         self.decoder = TransformerDecoder(d_model, nhead, num_decoder_layers)

#     def forward(self, src, tgt):
#         memory = self.encoder(src)
#         output = self.decoder(tgt, memory)
#         return output

# A simple Transformer-based registration model
class TransformerRegistration(nn.Module):
    def __init__(self, d_model):
        super(TransformerRegistration, self).__init__()

        # Define the Transformer layers here (customize as needed)
        self.transformer = nn.Transformer(d_model=d_model, nhead=d_model, num_encoder_layers=4, num_decoder_layers=4)

        # Add any additional layers for point cloud registration (e.g., FC layers)
        self.fc = nn.Linear(d_model, 3)  # 3 for 3D transformation vector

    def forward(self, source, target):
        # Flatten source and target for Transformer input
        source = source.view(source.size(0), -1, source.size(-1))
        target = target.view(target.size(0), -1, target.size(-1))
        
        # Perform point cloud registration using Transformer
        source_transformed = self.transformer(source, target)

        # Apply any additional layers for registration refinement
        transformation = self.fc(source_transformed)

        return transformation

In [109]:
# Initialize the transformer model and Define the optimizer
# model = PointCloudTransformer()
model = TransformerRegistration(d_model)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [110]:
# Create a data loader
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)


In [None]:
# output = model(source_tensor, target_tensor)


# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    for source_batch, target_batch in dataloader:
        # Forward pass through the model
        transformed_points = model(source_batch, target_batch)

        # Calculate Chamfer distance loss
        loss = chamfer_distance(transformed_points, target_data)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # Print loss for monitoring
    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {loss.item()}")
#     if (epoch + 1) % 100 == 0:
#         print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {loss.item()}")


In [None]:
# Align the source point cloud using the learned transformation
aligned_source_points = src_encoded.detach().numpy()

# Convert aligned source points back to Open3D format for visualization
aligned_source_pcd = o3d.geometry.PointCloud()
aligned_source_pcd.points = o3d.utility.Vector3dVector(aligned_source_points)

# Visualize the aligned point cloud
o3d.visualization.draw_geometries([aligned_source_pcd, target_pcd])