In [None]:
import cv2
import os
import numpy as np
import open3d as o3d
from PIL import Image
from transformers import pipeline
from tqdm import tqdm
from copy import deepcopy
from sklearn.neighbors import KDTree
import os

##Frame maker


In [None]:
video_path = "<Video IN>"
output_folder = "<Output folder for the frames>"
os.makedirs(output_folder, exist_ok=True)

save_fps = 0.5  #


cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)


frame_interval = int(fps / save_fps)

frame_count = 0
saved_count = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break


    if frame_count % frame_interval == 0:
        frame_name = f"{output_folder}/frame_{saved_count:05d}.jpg"
        cv2.imwrite(frame_name, frame)
        saved_count += 1

    frame_count += 1

cap.release()
print(f" Saved {saved_count} frames at {save_fps} fps from video.")

##Frame to depth map and cloud

In [None]:
frames_folder = "<Input folder for frames>"
depth_folder = "<Output folder for the depth map>"
pcd_folder = "<Output folder for the point clouds>"

os.makedirs(depth_folder, exist_ok=True)
os.makedirs(pcd_folder, exist_ok=True)


depth_estimator = pipeline("depth-estimation", model="Intel/dpt-large")


fx = fy = 1100


for filename in tqdm(sorted(os.listdir(frames_folder))):
    if not (filename.lower().endswith(".jpg") or filename.lower().endswith(".png")):
        continue

    image_path = os.path.join(frames_folder, filename)
    frame = cv2.imread(image_path)
    if frame is None:
        print(f" Could not read image: {filename}")
        continue

    height, width = frame.shape[:2]
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    rgb_pil = Image.fromarray(rgb)
    cx = width / 2
    cy = height / 2


    depth_output = depth_estimator(rgb_pil)
    depth_array = np.array(depth_output["depth"], dtype=np.float32)

    scale_factor = 25.0
    depth_scaled = depth_array / depth_array.max() * scale_factor

    alpha = 50.0
    depth_log = np.log1p(alpha * depth_scaled)

    depth_vis = cv2.normalize(depth_log, None, 0, 255, cv2.NORM_MINMAX)
    depth_vis = depth_vis.astype(np.uint8)
    depth_path = os.path.join(depth_folder, f"{os.path.splitext(filename)[0]}_depth.png")
    cv2.imwrite(depth_path, depth_vis)

    u, v = np.meshgrid(np.arange(width), np.arange(height))
    X = (u - cx) / fx * depth_log.max()
    Y = (v - cy) / fy * depth_log.max()
    Z = depth_log

    points = np.stack([X.flatten(), Y.flatten(), Z.flatten()], axis=-1)
    colors = rgb.reshape(-1, 3) / 255.0

    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    pcd.colors = o3d.utility.Vector3dVector(colors)


    pcd.transform([[-1, 0, 0, 0],
                   [0, -1, 0, 0],
                   [0,  0, 1, 0],
                   [0,  0, 0, 1]])


    pcd_path = os.path.join(pcd_folder, f"{os.path.splitext(filename)[0]}_cloud.ply")
    o3d.io.write_point_cloud(pcd_path, pcd)


print("\n All frames processed successfully!")
print(f"Depth maps saved to: {depth_folder}")
print(f"Point clouds saved to: {pcd_folder}")






##Working with ICP threshold 0.3


In [None]:
pcd_folder = "<Individual point clouds folder> "
output_folder = "<Output folder for merged clouds>"
os.makedirs(output_folder, exist_ok=True)

voxel_size = 0.02
fgr_distance_mul = 5.0
icp_distance_mul = 2.0
fitness_threshold = 0.3
save_every = 2
visualize_steps = False


def preprocess(pcd, voxel_size):
    pcd_down = pcd.voxel_down_sample(voxel_size)
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size*2, max_nn=30))
    fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size*5, max_nn=100)
    )
    return pcd_down, fpfh

def register_pair(src, tgt, voxel_size):
    src_down, src_fpfh = preprocess(src, voxel_size)
    tgt_down, tgt_fpfh = preprocess(tgt, voxel_size)
    distance_threshold = voxel_size * fgr_distance_mul
    fgr_option = o3d.pipelines.registration.FastGlobalRegistrationOption(
        maximum_correspondence_distance=distance_threshold)
    fgr_result = o3d.pipelines.registration.registration_fgr_based_on_feature_matching(
        src_down, tgt_down, src_fpfh, tgt_fpfh, fgr_option
    )
    icp_thresh = voxel_size * icp_distance_mul
    src.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size*2, max_nn=30))
    tgt.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size*2, max_nn=30))
    icp_result = o3d.pipelines.registration.registration_icp(
        src, tgt, icp_thresh, fgr_result.transformation,
        o3d.pipelines.registration.TransformationEstimationPointToPlane(),
        o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=50)
    )
    return icp_result.transformation, icp_result.fitness


files = sorted([f for f in os.listdir(pcd_folder) if f.endswith(".ply")])
print(files)
pcds = [o3d.io.read_point_cloud(os.path.join(pcd_folder, f)) for f in files]
print(f"Loaded {len(pcds)} point clouds.")

merged = deepcopy(pcds[0])
o3d.io.write_point_cloud(os.path.join(output_folder, f"merged_1.ply"), merged)

for i in range(1, len(pcds)):
    source = deepcopy(pcds[i])
    print(f"\n Aligning {files[i]} to merged scene...")

    transformation, fitness = register_pair(source, merged, voxel_size)
    print(f" ICP Fitness: {fitness:.4f}")

    if fitness >= fitness_threshold:
        print(f" Merging {files[i]} (fitness â‰¥ {fitness_threshold})")
        source.transform(transformation)
        merged += source
    else:
        print(f" Skipping {files[i]} (fitness {fitness:.4f} < {fitness_threshold})")


    if (i + 1) % save_every == 0 or (i + 1) == len(pcds):
        merged_path = os.path.join(output_folder, f"merged_up_to_{i+1}.ply")
        merged_down = merged.voxel_down_sample(voxel_size/2)
        merged_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size*2, max_nn=30))
        o3d.io.write_point_cloud(merged_path, merged_down)
        print(f" Saved intermediate merged cloud: {merged_path}")


final_path = os.path.join(output_folder, "<Final merged cloud name> ")
merged_down = merged.voxel_down_sample(voxel_size/2)
merged_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size*2, max_nn=30))
o3d.io.write_point_cloud(final_path, merged_down)
print(f"\n Final merged cloud saved to: {final_path}")


In case you want a smoothened cloud

In [None]:


pcd = o3d.io.read_point_cloud("<Input point cloud to smoothen>")
print("And reached here")
print("Original points:", len(pcd.points))


pcd, ind = pcd.remove_statistical_outlier(nb_neighbors=30, std_ratio=2.0)
pcd, ind = pcd.remove_radius_outlier(nb_points=20, radius=0.03)
pcd = pcd.voxel_down_sample(voxel_size=0.01)
print("After filtering and downsampling:", len(pcd.points))


pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=30))


mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(
    pcd, depth=10
)

mask = densities < np.quantile(densities, 0.03)
mesh.remove_vertices_by_mask(mask)


mesh = mesh.filter_smooth_simple(number_of_iterations=5)
mesh.compute_vertex_normals()


o3d.io.write_triangle_mesh("<Final merged cloud path to be added here>", mesh)
print("Mesh vertices:", len(mesh.vertices))
print("Mesh faces:", len(mesh.triangles))
print("Mesh has colors?", mesh.has_vertex_colors())
o3d.visualization.draw_geometries([mesh])


##Projection Error Mean and Median

In [None]:
def compute_reprojection_error(pcd_file, rgb_file,
                               fx, fy, cx, cy,
                               R=np.eye(3), t=np.zeros((3,1))):

    pcd = o3d.io.read_point_cloud(pcd_file)
    points = np.asarray(pcd.points)
    if points.shape[0] == 0:
        raise ValueError(f"Point cloud {pcd_file} is empty.")

    img = cv2.imread(rgb_file, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise FileNotFoundError(f"Could not load image: {rgb_file}")

    edges = cv2.Canny(img, 80, 160)
    edge_pixels = np.column_stack(np.where(edges > 0))
    edge_pixels = np.fliplr(edge_pixels)

    if edge_pixels.shape[0] < 10:
        raise ValueError(f"Not enough edges in image: {rgb_file}")

    tree = KDTree(edge_pixels)


    pts_cam = (R @ points.T + t).T
    pts_cam = pts_cam[pts_cam[:, 2] > 0]

    x_proj = (fx * pts_cam[:, 0] / pts_cam[:, 2]) + cx
    y_proj = (fy * pts_cam[:, 1] / pts_cam[:, 2]) + cy
    proj_2d = np.vstack((x_proj, y_proj)).T


    distances, _ = tree.query(proj_2d, k=1)
    mean_error = float(np.mean(distances))
    median_error = float(np.median(distances))

    return mean_error, median_error


pcd_folder = "<Point Cloud folder>"
rgb_folder = "<RGB Image folder>"


pcd_files = sorted([f for f in os.listdir(pcd_folder) if f.endswith(".ply")])
rgb_files = sorted([f for f in os.listdir(rgb_folder) if f.lower().endswith((".jpg",".png"))])

mean_errors = []
median_errors = []

# These are for the device my data was captured in. Please change according to your needs
fx, fy = 1100, 1100
cx, cy = 1080/2, 1920/2

for pcd_name, rgb_name in zip(pcd_files, rgb_files):
    pcd_path = os.path.join(pcd_folder, pcd_name)
    rgb_path = os.path.join(rgb_folder, rgb_name)

    try:
        mean_err, med_err = compute_reprojection_error(
            pcd_path, rgb_path, fx, fy, cx, cy
        )
        mean_errors.append(mean_err)
        median_errors.append(med_err)
    except Exception as e:
        print(f"Skipped {pcd_name} / {rgb_name}: {e}")

overall_mean = np.mean(mean_errors)
overall_median = np.median(median_errors)

print(f"Mean: {overall_mean:.2f}px")
print(f"Median: {overall_median:.2f}px")
