### Setup & Feature Matching

Imports

In [None]:
!pip install open3d
!pip install pillow pillow-heif tqdm
!pip install exifread

In [None]:
import cv2
import numpy as np
import os

import open3d as o3d

from PIL import Image
import pillow_heif
from tqdm import tqdm

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import exifread

import json

from scipy.optimize import least_squares
from scipy.sparse import lil_matrix

from collections import defaultdict

from mpl_toolkits.mplot3d import Axes3D

In [None]:
#mounting drive to access images

from google.colab import drive
drive.mount('/content/drive')

Reading and Displaying Images

In [None]:
jpg_images = "/content/drive/MyDrive/cv/wall1"

In [None]:
#displaying the images

def display_images(num_pics):
    fig, axes = plt.subplots(1, num_pics, figsize=(15, 6))
    axes = axes.flatten()

    for i in range(1, num_pics + 1):
        img_path = os.path.join(jpg_images, f"{i}.jpg")
        if os.path.exists(img_path):
            img = mpimg.imread(img_path)
            ax = axes[i-1]
            ax.imshow(img)
            ax.axis('off')
            ax.set_title(f"{i}.jpg")

    plt.tight_layout()
    plt.show()

display_images(8)


Preprocessing

In [None]:
def preprocess_images(jpg_images, num_pics):
    preprocessed = []

    for i in range(1, num_pics + 1):
        img_path = os.path.join(jpg_images, f"{i}.jpg")
        if os.path.exists(img_path):
            img = cv2.imread(img_path)
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            preprocessed.append(gray)

    return preprocessed

preprocessed = preprocess_images(jpg_images, 8)

ORB initiation

In [None]:
# NEW: Initialize the SIFT detector
sift = cv2.SIFT_create()

Running ORB model on first 5 images

In [None]:
def extract_features(preprocessed):
    keypoints = []
    descriptors = []

    sift = cv2.SIFT_create()

    for gray in preprocessed:
        kp, des = sift.detectAndCompute(gray, None)
        keypoints.append(kp)
        descriptors.append(des)

    return keypoints, descriptors

def match_consecutive_images(descriptors):
    bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
    good_matches_list = []

    for i in range(len(descriptors) - 1):
        des1 = descriptors[i]
        des2 = descriptors[i + 1]

        matches = bf.knnMatch(des1, des2, k=2)

        good_matches = []
        for m, n in matches:
            if m.distance < 0.75 * n.distance:  # Lowe's ratio test
                good_matches.append(m)

        good_matches_list.append(good_matches)
        print(f"Image {i+1} and Image {i+2}: {len(good_matches)} good matches")

    return good_matches_list


keypoints, descriptors = extract_features(preprocessed)
good_matches_list = match_consecutive_images(descriptors)

In [None]:
#visualization of the feature matching

def visualize_feature_matches(jpg_images, keypoints, good_matches_list, max_pairs=5):
    for i, good_matches in enumerate(good_matches_list[:max_pairs]):
        img1 = cv2.imread(os.path.join(jpg_images, f"{i+1}.jpg"), cv2.IMREAD_COLOR)
        img2 = cv2.imread(os.path.join(jpg_images, f"{i+2}.jpg"), cv2.IMREAD_COLOR)

        kp1 = keypoints[i]
        kp2 = keypoints[i+1]

        img_matches = cv2.drawMatches(
            img1, kp1,
            img2, kp2,
            good_matches, None,
            flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS
        )

        plt.figure(figsize=(20, 10))
        plt.imshow(cv2.cvtColor(img_matches, cv2.COLOR_BGR2RGB))
        plt.title(f'ORB Feature Matches: Image {i+1} and Image {i+2}')
        plt.axis('off')
        plt.show()

visualize_feature_matches(jpg_images, keypoints, good_matches_list)

### Two-View Reconstruction

Essential Matrix Estimation

In [None]:
def convert_keypoints_to_arrays(keypoints, good_matches_list):
    all_pts1 = []
    all_pts2 = []

    for i in range(len(good_matches_list)):
        kp1 = keypoints[i]
        kp2 = keypoints[i + 1]
        good_matches = good_matches_list[i]

        pts1 = np.float32([kp1[m.queryIdx].pt for m in good_matches])
        pts2 = np.float32([kp2[m.trainIdx].pt for m in good_matches])

        all_pts1.append(pts1)
        all_pts2.append(pts2)

    return all_pts1, all_pts2

def compute_intrinsic_matrix(jpg_images, sensor_width_mm=6.17, fallback_focal_mm=4.0):
    img_path = os.path.join(jpg_images, "1.jpg")

    with open(img_path, 'rb') as f:
        tags = exifread.process_file(f)

    if "EXIF FocalLength" in tags:
        val = tags["EXIF FocalLength"].values[0]
        f_mm = float(val.num) / float(val.den)
    else:
        f_mm = fallback_focal_mm

    img = Image.open(img_path)
    w, h = img.size

    f_px = f_mm / sensor_width_mm * w
    cx, cy = w / 2, h / 2

    K = np.array([[f_px, 0, cx],
                  [0,    f_px, cy],
                  [0,       0,   1]])

    return K

import cv2

def compute_essential_matrices(all_pts1, all_pts2, K):
    essential_matrices = []
    masks = []

    for pts1, pts2 in zip(all_pts1, all_pts2):
        E, mask = cv2.findEssentialMat(
            pts1,
            pts2,
            K,
            method=cv2.RANSAC,
            prob=0.999,
            threshold=1.0
        )

        essential_matrices.append(E)
        masks.append(mask)

    return essential_matrices, masks

all_pts1, all_pts2 = convert_keypoints_to_arrays(keypoints, good_matches_list)
K = compute_intrinsic_matrix(jpg_images)
essential_matrices, masks = compute_essential_matrices(all_pts1, all_pts2, K)

Recovering Rotations R and Translations t

In [None]:
def recover_poses(all_pts1, all_pts2, masks, essential_matrices, K):
    rotations = []
    translations = []

    for i in range(len(essential_matrices)):
        pts1 = all_pts1[i]
        pts2 = all_pts2[i]
        E = essential_matrices[i]

        # extract inlier points from RANSAC mask
        pts1_in = pts1[masks[i].ravel() == 1]
        pts2_in = pts2[masks[i].ravel() == 1]

        # recover relative camera pose (R, t)
        _, R, t, mask_pose = cv2.recoverPose(E, pts1_in, pts2_in, K)

        rotations.append(R)
        translations.append(t)

    return rotations, translations

rotations, translations = recover_poses(all_pts1, all_pts2, masks, essential_matrices, K)

Cheirality Check and Point Cloud

In [None]:
def triangulate_cheiralitycheck_ply(all_pts1, all_pts2, masks, essential_matrices, K, output_ply="point_cloud.ply"):
    # Use the first image pair
    pts1 = all_pts1[0]
    pts2 = all_pts2[0]

    # extract RANSAC inliers
    pts1_in = pts1[masks[0].ravel() == 1]
    pts2_in = pts2[masks[0].ravel() == 1]

    # Essential matrix for this pair
    E = essential_matrices[0]

    # decompose into 4 possible [R|t] poses
    R1, R2, t = cv2.decomposeEssentialMat(E)
    t = t.reshape(3, 1)

    pose_candidates = [(R1, t), (R1, -t), (R2, t), (R2, -t)]

    # Projection matrix for first camera
    P1 = K @ np.hstack((np.eye(3), np.zeros((3, 1))))

    best_pose = None
    max_good_points = 0
    best_pts3D = None

    # evaluate each candidate pose
    for R, t in pose_candidates:
        P2 = K @ np.hstack((R, t))

        pts4D_h = cv2.triangulatePoints(P1, P2, pts1_in.T, pts2_in.T)
        pts3D = (pts4D_h[:3] / pts4D_h[3]).T

        good_count = 0

        for j in range(len(pts3D)):
            if pts3D[j, 2] > 0:  # in front of camera 1
                pt_cam2 = R @ pts3D[j] + t.ravel()
                if pt_cam2[2] > 0:  # in front of camera 2
                    good_count += 1

        if good_count > max_good_points:
            max_good_points = good_count
            best_pose = (R, t)
            best_pts3D = pts3D

    R, t = best_pose
    print(f"Best pose has {max_good_points}/{len(best_pts3D)} cheirality-valid points")

    # Filter valid points
    pts3D_filtered = []
    for j in range(len(best_pts3D)):
        if best_pts3D[j, 2] > 0:
            pt_cam2 = R @ best_pts3D[j] + t.ravel()
            if pt_cam2[2] > 0:
                pts3D_filtered.append(best_pts3D[j])

    pts3D_filtered = np.array(pts3D_filtered)

    # Save PLY file
    with open(output_ply, 'w') as f:
        f.write('ply\n')
        f.write('format ascii 1.0\n')
        f.write(f'element vertex {len(pts3D_filtered)}\n')
        f.write('property float x\n')
        f.write('property float y\n')
        f.write('property float z\n')
        f.write('end_header\n')
        for pt in pts3D_filtered:
            f.write(f'{pt[0]} {pt[1]} {pt[2]}\n')

    # visualize 3D point cloud
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(111, projection='3d')
    ax.scatter(
        pts3D_filtered[:, 0],
        pts3D_filtered[:, 1],
        pts3D_filtered[:, 2],
        s=1
    )
    ax.set_title("Reconstructed 3D Point Cloud (Two-View)")
    plt.show()

    return R, t, pts3D_filtered

R, t, pts3D_filtered = triangulate_cheiralitycheck_ply(all_pts1, all_pts2, masks, essential_matrices, K)

2D scatterplot of 3d points

In [None]:
#ignoring z axis and just focusing on the xy plane

plt.figure(figsize=(10, 8))
plt.scatter(pts3D_filtered[:, 0], pts3D_filtered[:, 1], s=1, alpha=0.6)
plt.xlabel('X')
plt.ylabel('Y')
plt.title('2d scatter plot of 3d point cloud')
plt.axis('equal')
plt.grid(True)
plt.show()

Incremental SFM

In [None]:
def find_best_initial_pair():
    best_pair_idx = 0
    max_inliers = np.sum(masks[0].ravel() == 1)

    for i in range(len(masks)):
        num_inliers = np.sum(masks[i].ravel() == 1)
        if num_inliers > max_inliers:
            max_inliers = num_inliers
            best_pair_idx = i

    print(f"✓ Using pair {best_pair_idx}-{best_pair_idx+1} with {max_inliers} inliers")
    return best_pair_idx

def initialize_two_view(pair_idx):

    pts1 = all_pts1[pair_idx]
    pts2 = all_pts2[pair_idx]
    E = essential_matrices[pair_idx]
    mask = masks[pair_idx]

    mask_essential = mask.ravel() == 1
    pts1_in = pts1[mask_essential]
    pts2_in = pts2[mask_essential]

    R1, R2, t = cv2.decomposeEssentialMat(E)
    t = t.reshape(3, 1)
    pose_candidates = [(R1, t), (R1, -t), (R2, t), (R2, -t)]

    P1 = K @ np.hstack((np.eye(3), np.zeros((3, 1))))

    best_pose = None
    max_good_points = 0
    best_pts3D = None
    best_mask = None

    for R, t in pose_candidates:
        P2 = K @ np.hstack((R, t))
        pts4D_h = cv2.triangulatePoints(P1, P2, pts1_in.T, pts2_in.T)
        pts3D = (pts4D_h[:3] / pts4D_h[3]).T

        z1 = pts3D[:, 2]
        z2 = (R @ pts3D.T + t).T[:, 2]
        mask_cheirality = (z1 > 0) & (z2 > 0) & (z1 < 50) & (z2 < 50)
        good_count = np.sum(mask_cheirality)

        if good_count > max_good_points:
            max_good_points = good_count
            best_pose = (R, t)
            best_pts3D = pts3D
            best_mask = mask_cheirality

    R, t = best_pose
    pts3D_filtered = best_pts3D[best_mask]

    essential_inlier_indices = np.where(mask_essential)[0]
    cheirality_inlier_indices = essential_inlier_indices[best_mask]

    keypoint_to_3d = defaultdict(dict)

    for i, match_idx in enumerate(cheirality_inlier_indices):
        match = good_matches_list[pair_idx][match_idx]
        kp1_idx = match.queryIdx
        kp2_idx = match.trainIdx

        keypoint_to_3d[pair_idx][kp1_idx] = i
        keypoint_to_3d[pair_idx + 1][kp2_idx] = i

    print(f"  Triangulated {len(pts3D_filtered)} points")
    print(f"  Camera {pair_idx}: {len(keypoint_to_3d[pair_idx])} tracked")
    print(f"  Camera {pair_idx + 1}: {len(keypoint_to_3d[pair_idx + 1])} tracked")

    return R, t, pts3D_filtered, keypoint_to_3d

def initialize_global_state(best_pair_idx, R_init, t_init, pts3D_init, keypoint_to_3d):
    camera_poses = {
        best_pair_idx: (np.eye(3), np.zeros((3, 1))),
        best_pair_idx + 1: (R_init, t_init)
    }

    points_3d = list(pts3D_init)
    point_observations = [
        [(best_pair_idx, kp_idx),
         (best_pair_idx + 1, keypoint_to_3d[best_pair_idx + 1].get(kp_idx, -1))]
        for kp_idx in keypoint_to_3d[best_pair_idx]
    ]

    print(f"\nInitialized: {len(points_3d)} points, cameras {best_pair_idx} and {best_pair_idx + 1}")

    return camera_poses, points_3d, point_observations

def add_next_camera(cam_idx, camera_poses, keypoint_to_3d, point_observations, points_3d):

    print(f"\n--- Adding Camera {cam_idx} ---")

    prev_cam = None
    matches = None
    forward = True

    if cam_idx - 1 in camera_poses:
        prev_cam = cam_idx - 1
        matches = good_matches_list[prev_cam]
        forward = True
    elif cam_idx + 1 in camera_poses:
        prev_cam = cam_idx + 1
        matches = good_matches_list[cam_idx]
        forward = False
    else:
        print("No adjacent camera registered!")
        return False

    print(f"  Adjacent camera: {prev_cam} ({'forward' if forward else 'backward'})")

    pts_2d = []
    pts_3d_indices = []

    if forward:
        for match in matches:
            kp_prev_idx = match.queryIdx
            kp_new_idx = match.trainIdx
            if kp_prev_idx in keypoint_to_3d[prev_cam]:
                pts_2d.append(keypoints[cam_idx][kp_new_idx].pt)
                pts_3d_indices.append((keypoint_to_3d[prev_cam][kp_prev_idx], kp_new_idx))
    else:
        for match in matches:
            kp_new_idx = match.queryIdx
            kp_prev_idx = match.trainIdx
            if kp_prev_idx in keypoint_to_3d[prev_cam]:
                pts_2d.append(keypoints[cam_idx][kp_new_idx].pt)
                pts_3d_indices.append((keypoint_to_3d[prev_cam][kp_prev_idx], kp_new_idx))

    print(f"  Found {len(pts_2d)} 2D-3D correspondences")

    if len(pts_2d) < 6:
        print(f"Not enough correspondences")
        return False

    pts_2d = np.array(pts_2d, dtype=np.float32)
    pts_3d_for_pnp = np.array([points_3d[i[0]] for i in pts_3d_indices], dtype=np.float32)

    success, rvec, tvec, inliers = cv2.solvePnPRansac(
        pts_3d_for_pnp, pts_2d, K, None,
        iterationsCount=2000,
        reprojectionError=4.0,
        confidence=0.999
    )

    if not success or inliers is None or len(inliers) < 6:
        print(f"PnP failed (inliers: {len(inliers) if inliers is not None else 0})")
        return False

    R_new, _ = cv2.Rodrigues(rvec)
    t_new = tvec.reshape(3, 1)
    camera_poses[cam_idx] = (R_new, t_new)

    for inlier_idx in inliers.ravel():
        pt3d_idx, kp_new_idx = pts_3d_indices[inlier_idx]
        keypoint_to_3d[cam_idx][kp_new_idx] = pt3d_idx
        point_observations[pt3d_idx].append((cam_idx, kp_new_idx))

    print(f"  ✓ Success: {len(inliers)} inliers, {len(keypoint_to_3d[cam_idx])} tracked")

    triangulate_with_camera(cam_idx, prev_cam, camera_poses, keypoint_to_3d, points_3d, point_observations)
    return True

def triangulate_with_camera(cam_idx, other_cam_idx, camera_poses, keypoint_to_3d, points_3d, point_observations):

    if cam_idx > other_cam_idx:
        matches = good_matches_list[other_cam_idx]
        cam1, cam2 = other_cam_idx, cam_idx
    else:
        matches = good_matches_list[cam_idx]
        cam1, cam2 = cam_idx, other_cam_idx

    R1, t1 = camera_poses[cam1]
    R2, t2 = camera_poses[cam2]

    P1 = K @ np.hstack((R1, t1))
    P2 = K @ np.hstack((R2, t2))

    new_pts_1 = []
    new_pts_2 = []
    new_info = []

    for match in matches:
        kp1_idx = match.queryIdx
        kp2_idx = match.trainIdx

        if kp1_idx not in keypoint_to_3d[cam1] and kp2_idx not in keypoint_to_3d[cam2]:
            new_pts_1.append(keypoints[cam1][kp1_idx].pt)
            new_pts_2.append(keypoints[cam2][kp2_idx].pt)
            new_info.append((kp1_idx, kp2_idx))

    if len(new_pts_1) == 0:
        return 0

    new_pts_1 = np.array(new_pts_1).T
    new_pts_2 = np.array(new_pts_2).T

    pts4D = cv2.triangulatePoints(P1, P2, new_pts_1, new_pts_2)
    pts3D_new = (pts4D[:3] / pts4D[3]).T

    count_added = 0
    for i, pt3d in enumerate(pts3D_new):
        z1 = (R1 @ pt3d.reshape(3, 1) + t1)[2, 0]
        z2 = (R2 @ pt3d.reshape(3, 1) + t2)[2, 0]

        if 0 < z1 < 50 and 0 < z2 < 50:
            idx = len(points_3d)
            points_3d.append(pt3d)

            kp1_idx, kp2_idx = new_info[i]
            keypoint_to_3d[cam1][kp1_idx] = idx
            keypoint_to_3d[cam2][kp2_idx] = idx
            point_observations.append([(cam1, kp1_idx), (cam2, kp2_idx)])
            count_added += 1

    if count_added > 0:
        print(f"  Triangulated {count_added} new points")

    return count_added

def run_full_reconstruction(best_pair_idx, camera_poses, points_3d, point_observations, keypoint_to_3d):

    print("\n" + "="*60)
    print("INCREMENTAL RECONSTRUCTION")
    print("="*60)

    num_images = len(preprocessed)

    cameras_forward = list(range(best_pair_idx + 2, num_images))
    cameras_backward = list(range(best_pair_idx - 1, -1, -1))

    print(f"Forward: {cameras_forward}")
    print(f"Backward: {cameras_backward}")

    for cam_idx in cameras_forward:
        add_next_camera(cam_idx, camera_poses, keypoint_to_3d, point_observations, points_3d)

    for cam_idx in cameras_backward:
        add_next_camera(cam_idx, camera_poses, keypoint_to_3d, point_observations, points_3d)

    print("\n" + "="*60)
    print("FINAL RESULTS")
    print("="*60)
    print(f"Cameras registered: {len(camera_poses)}/{num_images}")
    print(f"Total 3D points: {len(points_3d)}")
    print("="*60)

def visualize_and_save(points_3d, camera_poses):

    points_3d_array = np.array(points_3d)
    with open('point_cloud_multiview.ply', 'w') as f:
        f.write('ply\nformat ascii 1.0\n')
        f.write(f'element vertex {len(points_3d_array)}\n')
        f.write('property float x\nproperty float y\nproperty float z\n')
        f.write('end_header\n')
        for pt in points_3d_array:
            f.write(f'{pt[0]} {pt[1]} {pt[2]}\n')

    print("\n✓ Saved to 'point_cloud_multiview.ply'")

    fig = plt.figure(figsize=(15, 5))

    ax1 = fig.add_subplot(131, projection='3d')
    ax1.scatter(points_3d_array[:, 0], points_3d_array[:, 1], points_3d_array[:, 2],
                s=1, alpha=0.5)
    ax1.set_title(f"3D Points ({len(points_3d_array)})")

    ax2 = fig.add_subplot(132, projection='3d')
    cam_positions = []
    labels = []

    for cam_idx in sorted(camera_poses):
        R, t = camera_poses[cam_idx]
        C = -R.T @ t
        cam_positions.append(C.ravel())
        labels.append(cam_idx)

    cam_positions = np.array(cam_positions)
    ax2.plot(cam_positions[:, 0], cam_positions[:, 1], cam_positions[:, 2], 'o-')
    for i, lab in enumerate(labels):
        pos = cam_positions[i]
        ax2.text(pos[0], pos[1], pos[2], f'{lab}')

    ax3 = fig.add_subplot(133, projection='3d')
    ax3.scatter(points_3d_array[:, 0], points_3d_array[:, 1], points_3d_array[:, 2],
                s=0.5, alpha=0.3)
    ax3.plot(cam_positions[:, 0], cam_positions[:, 1], cam_positions[:, 2], 'o-')

    plt.tight_layout()
    plt.show()


best_pair_idx = find_best_initial_pair()
R_init, t_init, pts3D_init, keypoint_to_3d = initialize_two_view(best_pair_idx)
camera_poses, points_3d, point_observations = initialize_global_state(best_pair_idx, R_init, t_init, pts3D_init, keypoint_to_3d)

run_full_reconstruction(best_pair_idx, camera_poses, points_3d, point_observations, keypoint_to_3d)

visualize_and_save(points_3d, camera_poses)


Refinement using Bundling

In [None]:
def filter_outlier_points(points_3d, point_observations, camera_poses, keypoints, K, threshold=8.0):
    """
    Remove 3D points with high reprojection errors before bundle adjustment
    """
    print(f"\nFiltering outliers (threshold={threshold} pixels)...")

    good_points = []
    good_observations = []

    for pt_idx, pt3d in enumerate(points_3d):
        observations = point_observations[pt_idx]
        errors = []

        for cam_idx, kp_idx in observations:
            if cam_idx not in camera_poses:
                continue

            R, t = camera_poses[cam_idx]

            # Project 3D point
            pt3d_cam = R @ pt3d + t.flatten()
            if pt3d_cam[2] <= 0:
                continue

            pt2d_h = K @ pt3d_cam
            u_proj = pt2d_h[0] / pt2d_h[2]
            v_proj = pt2d_h[1] / pt2d_h[2]

            # Get observed keypoint
            u_obs, v_obs = keypoints[cam_idx][kp_idx].pt

            # Reprojection error
            error = np.sqrt((u_proj - u_obs)**2 + (v_proj - v_obs)**2)
            errors.append(error)

        if len(errors) > 0:
            mean_error = np.mean(errors)
            if mean_error < threshold:
                good_points.append(pt3d)
                good_observations.append(observations)

    print(f"  Kept {len(good_points)}/{len(points_3d)} points")
    return good_points, good_observations


def run_bundle_adjustment(camera_poses, points_3d, point_observations, K,
                          max_nfev=50, verbose=2, xtol=1e-8, ftol=1e-8):
    """
    Fixed bundle adjustment with conservative parameters
    """
    cam_ids = sorted(camera_poses.keys())
    num_cameras = len(cam_ids)
    num_points = len(points_3d)

    # Build observation arrays
    obs_cam_idx, obs_point_idx, obs_xy = build_observation_arrays(
        point_observations, keypoints, cam_ids
    )
    n_obs = len(obs_cam_idx)
    print(f"Observations: {n_obs}, cameras: {num_cameras}, points: {num_points}")

    # Pack initial parameters
    x0 = pack_parameters(camera_poses, points_3d, cam_ids)

    # Build jacobian sparsity pattern
    J = build_jacobian_sparsity(num_cameras, num_points, obs_cam_idx, obs_point_idx)

    def fun(params):
        return residuals_vectorized(
            params, K, num_cameras, num_points,
            obs_cam_idx, obs_point_idx, obs_xy
        )

    # FIXED: More conservative parameters
    res = least_squares(
        fun,
        x0,
        jac_sparsity=J,
        method='trf',
        verbose=verbose,
        max_nfev=max_nfev,  # Reduced iterations
        x_scale='jac',      # Better scaling
        loss='huber',       # Robust loss function
        f_scale=1.0,        # Huber loss threshold
        xtol=xtol,
        ftol=ftol,
    )

    # Unpack and update camera poses & points
    camera_params, point_params = unpack_parameters(res.x, num_cameras, num_points)

    for i, cam_idx in enumerate(cam_ids):
        rvec = camera_params[i][:3]
        t = camera_params[i][3:].reshape(3, 1)
        R, _ = cv2.Rodrigues(rvec)
        camera_poses[cam_idx] = (R, t)

    for i in range(num_points):
        points_3d[i] = point_params[i]

    print("✓ Bundle adjustment complete!")
    print(f"  Final cost: {res.cost:.4f}")
    print(f"  Optimality: {res.optimality:.2e}")

    return res

def skew_mat(v):
    """v: (...,3) -> (...,3,3) skew-symmetric matrices"""
    vx = v[..., 0]
    vy = v[..., 1]
    vz = v[..., 2]
    zero = np.zeros_like(vx)
    m = np.stack([
        zero, -vz,  vy,
         vz, zero, -vx,
        -vy,  vx, zero
    ], axis=-1)
    return m.reshape(v.shape[:-1] + (3, 3))

def rodrigues_batch(rvecs):
    """
    rvecs: (N,3) axis-angle vectors
    returns: R (N,3,3)
    """
    theta = np.linalg.norm(rvecs, axis=1)
    small = theta < 1e-8
    R = np.zeros((len(rvecs), 3, 3), dtype=float)

    # For non-small angles, use Rodrigues formula
    nonzero_idx = ~small
    if np.any(nonzero_idx):
        r_non = rvecs[nonzero_idx]
        theta_n = theta[nonzero_idx][:, None]
        k = r_non / theta_n  # unit axis
        K = skew_mat(k)      # (M,3,3)
        I = np.eye(3)[None, :, :]
        sin_t = np.sin(theta_n)[:, None]
        cos_t = np.cos(theta_n)[:, None]
        # R = I + sin(theta) * K + (1-cos(theta)) * K^2
        R_n = I + sin_t * K + (1 - cos_t) * (K @ K)
        R[nonzero_idx] = R_n

    # For very small angles, use first-order approximation: R ≈ I + K
    if np.any(small):
        idx = np.where(small)[0]
        K = skew_mat(rvecs[small])
        I = np.eye(3)[None, :, :]
        R_s = I + K
        R[small] = R_s

    return R  # (N,3,3)

def build_observation_arrays(point_observations, keypoints, cam_ids):

    cam_index = {cid: i for i, cid in enumerate(cam_ids)}
    obs_cam_idx = []
    obs_point_idx = []
    obs_xy = []

    for p_idx, obs in enumerate(point_observations):
        for cam_idx, kp_idx in obs:
            # skip missing kp (sometimes -1) just in case
            if kp_idx is None or kp_idx < 0:
                continue
            obs_cam_idx.append(cam_index[cam_idx])
            obs_point_idx.append(p_idx)
            obs_xy.append(keypoints[cam_idx][kp_idx].pt)

    obs_cam_idx = np.array(obs_cam_idx, dtype=np.int32)
    obs_point_idx = np.array(obs_point_idx, dtype=np.int32)
    obs_xy = np.array(obs_xy, dtype=np.float64)
    return obs_cam_idx, obs_point_idx, obs_xy

def pack_parameters(camera_poses, points_3d, cam_ids):
    cams = []
    for idx in cam_ids:
        R, t = camera_poses[idx]
        rvec, _ = cv2.Rodrigues(R)
        cams.append(np.hstack([rvec.flatten(), t.flatten()]))
    cams = np.hstack(cams)
    pts = np.hstack(points_3d).flatten()
    return np.hstack([cams, pts])

def unpack_parameters(params, num_cameras, num_points):
    camera_params = params[:num_cameras * 6].reshape((num_cameras, 6))
    point_params = params[num_cameras * 6:].reshape((num_points, 3))
    return camera_params, point_params

def residuals_vectorized(params, K, num_cameras, num_points,
                         obs_cam_idx, obs_point_idx, obs_xy):
    """
    params: vector of length 6*num_cameras + 3*num_points
    obs_*: arrays of length n_obs
    returns: residuals shape (2*n_obs,)
    """
    camera_params, point_params = unpack_parameters(params, num_cameras, num_points)
    # camera params
    rvecs = camera_params[:, :3]        # (C,3)
    tvecs = camera_params[:, 3:].reshape(num_cameras, 3)  # (C,3)

    # build rotation matrices
    Rmats = rodrigues_batch(rvecs)  # (C,3,3)

    # select per-observation camera R and t
    R_obs = Rmats[obs_cam_idx]      # (n_obs,3,3)
    t_obs = tvecs[obs_cam_idx]      # (n_obs,3)

    # select 3D points per observation
    P_obs = point_params[obs_point_idx]  # (n_obs,3)

    # transform points into camera frame: X_c = R * X + t
    Xc = np.einsum('nij,nj->ni', R_obs, P_obs) + t_obs  # (n_obs,3)

    # perspective divide + intrinsics (assumes fx=K[0,0], fy=K[1,1], cx, cy)
    X = Xc[:, 0]
    Y = Xc[:, 1]
    Z = Xc[:, 2]
    # avoid divide by zero
    Z_safe = np.where(Z == 0, 1e-8, Z)

    fx = K[0, 0]
    fy = K[1, 1]
    cx = K[0, 2]
    cy = K[1, 2]

    u = fx * (X / Z_safe) + cx
    v = fy * (Y / Z_safe) + cy

    proj = np.vstack((u, v)).T  # (n_obs,2)
    res = (proj - obs_xy).ravel()  # (2*n_obs,)
    return res

def build_jacobian_sparsity(num_cameras, num_points, obs_cam_idx, obs_point_idx):
    n_obs = len(obs_cam_idx)
    m = 2 * n_obs
    n = 6 * num_cameras + 3 * num_points
    J = lil_matrix((m, n), dtype=int)

    for i in range(n_obs):
        cam = obs_cam_idx[i]
        p = obs_point_idx[i]
        row_x = 2 * i
        row_y = 2 * i + 1
        # camera columns: 6 * cam  .. 6*cam+5
        cam_col = 6 * cam
        J[row_x, cam_col:cam_col + 6] = 1
        J[row_y, cam_col:cam_col + 6] = 1
        # point columns: 6*num_cameras + 3*p .. +2
        pt_col = 6 * num_cameras + 3 * p
        J[row_x, pt_col:pt_col + 3] = 1
        J[row_y, pt_col:pt_col + 3] = 1

    return J

def run_bundle_adjustment(camera_poses, points_3d, point_observations, K,
                               max_nfev=200, verbose=2, xtol=1e-10, ftol=1e-10):

    cam_ids = sorted(camera_poses.keys())
    num_cameras = len(cam_ids)
    num_points = len(points_3d)

    # Build observation arrays
    obs_cam_idx, obs_point_idx, obs_xy = build_observation_arrays(point_observations, keypoints, cam_ids)
    n_obs = len(obs_cam_idx)
    print(f"Observations: {n_obs}, cameras: {num_cameras}, points: {num_points}")

    # Pack initial parameters
    x0 = pack_parameters(camera_poses, points_3d, cam_ids)

    # Build jacobian sparsity pattern
    J = build_jacobian_sparsity(num_cameras, num_points, obs_cam_idx, obs_point_idx)

    # Closure wrapper for least_squares (to avoid passing many args repeatedly)
    def fun(params):
        return residuals_vectorized(params, K, num_cameras, num_points, obs_cam_idx, obs_point_idx, obs_xy)

    # Run solver (TRF uses sparse Jacobian)
    res = least_squares(
        fun,
        x0,
        jac_sparsity=J,
        method='trf',
        verbose=verbose,
        max_nfev=max_nfev,
        xtol=xtol,
        ftol=ftol,
        # xtol_abs=1e-16
    )

    # Unpack and update camera poses & points
    camera_params, point_params = unpack_parameters(res.x, num_cameras, num_points)

    for i, cam_idx in enumerate(cam_ids):
        rvec = camera_params[i][:3]
        t = camera_params[i][3:].reshape(3, 1)
        R, _ = cv2.Rodrigues(rvec)
        camera_poses[cam_idx] = (R, t)

    for i in range(num_points):
        points_3d[i] = point_params[i]

    print("✓ Bundle adjustment complete!")
    return res

In [None]:
# Filter outliers before BA
points_3d_filtered, point_observations_filtered = filter_outlier_points(
    points_3d, point_observations, camera_poses, keypoints, K, threshold=8.0
)

# Update with filtered data
points_3d[:] = points_3d_filtered
point_observations[:] = point_observations_filtered

# Run BA with conservative settings
res = run_bundle_adjustment(
    camera_poses,
    points_3d,
    point_observations,
    K,
    max_nfev=50,  # Reduced from 200
    verbose=2
)

In [None]:

def plot_reconstruction(camera_poses, points_3d):
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection="3d")
    ax.set_title("Bundle Adjustment Result (Cameras + 3D Points)")

    # Convert list of points to array
    pts = np.array(points_3d)

    # Plot 3D points
    ax.scatter(pts[:,0], pts[:,1], pts[:,2], s=1, c="blue", alpha=0.5)

    # Plot cameras
    for cam_id, (R, t) in camera_poses.items():
        C = -R.T @ t   # camera center in world coords
        C = C.reshape(3)

        # Plot camera center
        ax.scatter(C[0], C[1], C[2], c="red", s=40)

        # Plot camera orientation
        z_dir = R.T @ np.array([0, 0, 1]) * 0.2
        ax.plot(
            [C[0], C[0] + z_dir[0]],
            [C[1], C[1] + z_dir[1]],
            [C[2], C[2] + z_dir[2]],
            color="red"
        )

        ax.text(C[0], C[1], C[2], f"Cam {cam_id}", color="red")

    ax.set_xlabel("X")
    ax.set_ylabel("Y")
    ax.set_zlabel("Z")
    ax.view_init(elev=15, azim=70)
    plt.tight_layout()
    plt.show()

plot_reconstruction(camera_poses, points_3d)

Exporting data

In [None]:
def export_cameras_to_json(camera_poses, keypoints, image_folder, output_file="cameras.json"):

    cameras_data = {
        "cameras": []
    }

    for cam_idx in sorted(camera_poses.keys()):
        R, t = camera_poses[cam_idx]

        # Camera center in world coordinates: C = -R^T * t
        C = -R.T @ t
        C = C.flatten()

        # Image filename
        img_filename = f"{cam_idx + 1}.jpg"

        camera_info = {
            "id": int(cam_idx),
            "image": img_filename,
            "rotation": R.tolist(),
            "translation": t.flatten().tolist(),
            "center": C.tolist(),
            "num_keypoints": len(keypoints[cam_idx])
        }

        cameras_data["cameras"].append(camera_info)

    with open(output_file, 'w') as f:
        json.dump(cameras_data, f, indent=2)

    print(f"✓ Exported {len(cameras_data['cameras'])} cameras to {output_file}")
    return cameras_data

def create_view_graph(camera_poses, point_observations, threshold=10):
    cam_ids = sorted(camera_poses.keys())
    view_graph = {cam_id: [] for cam_id in cam_ids}

    # Count shared points between camera pairs
    shared_points = {}

    for obs in point_observations:
        cams_seeing_point = [cam_id for cam_id, kp_id in obs]

        # Create edges between all cameras seeing this point
        for i, cam1 in enumerate(cams_seeing_point):
            for cam2 in cams_seeing_point[i+1:]:
                if cam1 in cam_ids and cam2 in cam_ids:
                    pair = tuple(sorted([cam1, cam2]))
                    shared_points[pair] = shared_points.get(pair, 0) + 1

    # Build adjacency list
    for (cam1, cam2), count in shared_points.items():
        if count >= threshold:
            if cam2 not in view_graph[cam1]:
                view_graph[cam1].append(cam2)
            if cam1 not in view_graph[cam2]:
                view_graph[cam2].append(cam1)

    # Export view graph
    view_graph_data = {
        "nodes": [{"id": cam_id} for cam_id in cam_ids],
        "edges": []
    }

    for cam1, neighbors in view_graph.items():
        for cam2 in neighbors:
            if cam1 < cam2:  # Avoid duplicates
                view_graph_data["edges"].append({
                    "source": int(cam1),
                    "target": int(cam2),
                    "weight": int(shared_points.get(tuple(sorted([cam1, cam2])), 0))
                })

    with open("view_graph.json", 'w') as f:
        json.dump(view_graph_data, f, indent=2)

    print(f"✓ Created view graph with {len(view_graph_data['edges'])} edges")
    return view_graph

In [None]:
export_cameras_to_json(camera_poses, keypoints, jpg_images, "cameras.json")
view_graph = create_view_graph(camera_poses, point_observations)

In [None]:
def extract_point_colors(points_3d, point_observations, camera_poses, keypoints, image_folder, K):

    print("\n" + "="*60)
    print("EXTRACTING POINT COLORS FROM IMAGES")
    print("="*60)

    colors = []

    for pt_idx, pt3d in enumerate(points_3d):
        observations = point_observations[pt_idx]
        point_colors = []

        # Sample colors from multiple views for robustness
        for cam_idx, kp_idx in observations[:3]:  # Use up to 3 views
            if cam_idx not in camera_poses:
                continue

            R, t = camera_poses[cam_idx]

            # Project 3D point to image
            pt3d_cam = R @ pt3d + t.flatten()

            # Skip if behind camera
            if pt3d_cam[2] <= 0:
                continue

            # Project to pixel coordinates
            pt2d_h = K @ pt3d_cam
            u = int(pt2d_h[0] / pt2d_h[2])
            v = int(pt2d_h[1] / pt2d_h[2])

            # Load image (cache this in practice)
            img_path = os.path.join(image_folder, f"{cam_idx + 1}.jpg")
            img = cv2.imread(img_path)

            if img is None:
                continue

            h, w = img.shape[:2]

            # Check bounds
            if 0 <= u < w and 0 <= v < h:
                # Get color (OpenCV uses BGR, convert to RGB)
                color_bgr = img[v, u]
                color_rgb = color_bgr[[2, 1, 0]]  # BGR to RGB
                point_colors.append(color_rgb)

        # Average colors from multiple views
        if len(point_colors) > 0:
            avg_color = np.mean(point_colors, axis=0).astype(np.uint8)
            colors.append(avg_color)
        else:
            # Default gray color if no valid projection
            colors.append(np.array([128, 128, 128], dtype=np.uint8))

        if (pt_idx + 1) % 1000 == 0:
            print(f"  Processed {pt_idx + 1}/{len(points_3d)} points...")

    colors = np.array(colors)
    print(f"✓ Extracted colors for {len(colors)} points")
    return colors

def export_colored_ply(points_3d, colors, output_file="output/reconstruction_colored.ply"):

    points = np.array(points_3d)
    colors = np.array(colors, dtype=np.uint8)

    assert len(points) == len(colors), "Points and colors must have same length"

    with open(output_file, 'w') as f:
        # Header
        f.write('ply\n')
        f.write('format ascii 1.0\n')
        f.write(f'element vertex {len(points)}\n')
        f.write('property float x\n')
        f.write('property float y\n')
        f.write('property float z\n')
        f.write('property uchar red\n')
        f.write('property uchar green\n')
        f.write('property uchar blue\n')
        f.write('end_header\n')

        # Data
        for pt, color in zip(points, colors):
            f.write(f'{pt[0]} {pt[1]} {pt[2]} {color[0]} {color[1]} {color[2]}\n')

    print(f"✓ Saved colored point cloud to {output_file}")

def add_colors_to_reconstruction(camera_poses, points_3d, point_observations,
                                 keypoints, image_folder, K,
                                 output_file="reconstruction_colored.ply"):
    # Extract colors
    colors = extract_point_colors(
        points_3d,
        point_observations,
        camera_poses,
        keypoints,
        image_folder,
        K
    )

    # Export colored PLY
    export_colored_ply(points_3d, colors, output_file)

    print("\n" + "="*60)
    print("✓ COLORED POINT CLOUD READY!")
    print(f"  Use {output_file} in your viewer")
    print("="*60)

    return colors

In [None]:
# Add colors to your reconstruction
colors = add_colors_to_reconstruction(
    camera_poses,
    points_3d,
    point_observations,
    keypoints,
    jpg_images,
    K,
    "wall_A.ply"
)

Running the whole pipeline for second wall

In [None]:
jpg_images = "/content/drive/MyDrive/cv/wall2"
display_images(12)
preprocessed = preprocess_images(jpg_images, 12)
sift = cv2.SIFT_create()
keypoints, descriptors = extract_features(preprocessed)
good_matches_list = match_consecutive_images(descriptors)
all_pts1, all_pts2 = convert_keypoints_to_arrays(keypoints, good_matches_list)
K = compute_intrinsic_matrix(jpg_images)
essential_matrices, masks = compute_essential_matrices(all_pts1, all_pts2, K)
rotations, translations = recover_poses(all_pts1, all_pts2, masks, essential_matrices, K)
R, t, pts3D_filtered = triangulate_cheiralitycheck_ply(all_pts1, all_pts2, masks, essential_matrices, K)
best_pair_idx = find_best_initial_pair()
R_init, t_init, pts3D_init, keypoint_to_3d = initialize_two_view(best_pair_idx)
camera_poses, points_3d, point_observations = initialize_global_state(best_pair_idx, R_init, t_init, pts3D_init, keypoint_to_3d)
run_full_reconstruction(best_pair_idx, camera_poses, points_3d, point_observations, keypoint_to_3d)
visualize_and_save(points_3d, camera_poses)

# Filter outliers before BA
points_3d_filtered, point_observations_filtered = filter_outlier_points(
    points_3d, point_observations, camera_poses, keypoints, K, threshold=8.0
)

# Update with filtered data
points_3d[:] = points_3d_filtered
point_observations[:] = point_observations_filtered

# Run BA with conservative settings
res = run_bundle_adjustment(
    camera_poses,
    points_3d,
    point_observations,
    K,
    max_nfev=50,  # Reduced from 200
    verbose=2
)
plot_reconstruction(camera_poses, points_3d)
export_cameras_to_json(camera_poses, keypoints, jpg_images, "cameras_B.json")
view_graph = create_view_graph(camera_poses, point_observations)
colors = add_colors_to_reconstruction(
    camera_poses,
    points_3d,
    point_observations,
    keypoints,
    jpg_images,
    K,
    "wall_B.ply"
)

Merging point clouds and combining cameras

In [None]:
def merge_and_save():
    pcd_A = o3d.io.read_point_cloud("wall_A.ply")
    pcd_B = o3d.io.read_point_cloud("wall_B.ply")

    # Scaling
    bbox_A = pcd_A.get_axis_aligned_bounding_box()
    bbox_B = pcd_B.get_axis_aligned_bounding_box()
    height_A = bbox_A.get_max_bound()[2] - bbox_A.get_min_bound()[2]
    height_B = bbox_B.get_max_bound()[2] - bbox_B.get_min_bound()[2]

    scale_factor = height_A / height_B
    pcd_B.scale(scale_factor, center=bbox_B.get_center())

    rx = np.radians(5)
    Rx = np.array([
        [1, 0, 0],
        [0, np.cos(rx), -np.sin(rx)],
        [0, np.sin(rx),  np.cos(rx)]
    ])

    rz = np.radians(-5)
    Rz = np.array([
        [np.cos(rz), -np.sin(rz), 0],
        [np.sin(rz),  np.cos(rz), 0],
        [0,           0,          1]
    ])

    ry = np.radians(90)
    Ry = np.array([
        [np.cos(ry), 0, np.sin(ry)],
        [0,          1, 0         ],
        [-np.sin(ry),0, np.cos(ry)]
    ])

    # Combined rotation: Y then X then Z
    R = Ry @ Rx @ Rz

    t = np.array([10.0, 1.0, -12.0])

    T = np.eye(4)
    T[:3, :3] = R
    T[:3, 3] = t

    # Apply and save
    pcd_B.transform(T)
    combined = pcd_A + pcd_B

    o3d.io.write_point_cloud("merged_room.ply", combined, write_ascii=True)
    np.save("transformation.npy", T)

    return T, scale_factor


def transform_camera_pose(R_old, t_old, T):
    R_transform = T[:3, :3]
    t_transform = T[:3, 3].reshape(3, 1)

    # Convert translation to camera center
    t_old = np.array(t_old).reshape(3, 1)
    C_old = -R_old.T @ t_old

    # Apply transformation
    R_new = R_transform @ R_old
    C_new = R_transform @ C_old + t_transform

    t_new = -R_new @ C_new
    return R_new, t_new.flatten()


def transform_cameras(T, scale_factor):

    with open("cameras_B.json", "r") as f:
        cameras = json.load(f)

    for cam in cameras["cameras"]:
        R_old = np.array(cam["rotation"])
        t_old = np.array(cam["translation"])

        # First scale the translation
        t_old_scaled = t_old * scale_factor

        # Then apply transformation
        R_new, t_new = transform_camera_pose(R_old, t_old_scaled, T)

        cam["rotation"] = R_new.tolist()
        cam["translation"] = t_new.tolist()

        # Update center as well
        C_new = -R_new.T @ t_new.reshape(3, 1)
        cam["center"] = C_new.flatten().tolist()

    with open("cameras_wall_B_transformed.json", "w") as f:
        json.dump(cameras, f, indent=2)


def combine_cameras():

    with open("cameras.json", "r") as f:
        cameras_A = json.load(f)

    with open("cameras_wall_B_transformed.json", "r") as f:
        cameras_B = json.load(f)

    all_cameras = {
        "cameras": cameras_A["cameras"] + cameras_B["cameras"]
    }

    with open("all_cameras.json", "w") as f:
        json.dump(all_cameras, f, indent=2)


# Main execution
print("Step 1: Merging point clouds...")
T, scale_factor = merge_and_save()

print("\nStep 2: Transforming cameras...")
transform_cameras(T, scale_factor)

print("\nStep 3: Combining all cameras...")
combine_cameras()

print("\n✓ Complete! Use merged_room.ply and all_cameras.json")