In [92]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import plotly.graph_objects as go
import cv2
import os
import pyntcloud
import pandas as pd
from scipy.optimize import least_squares
from scipy.sparse import lil_matrix

In [93]:
def load_images_from_folder(folder):
    images = []
    for filename in os.listdir(folder):
        img = cv2.imread(os.path.join(folder, filename))
        if img is not None:
            images.append(img)
    return images

In [94]:
def feature_extraction_set(images):
    sift = cv2.SIFT_create()

    kp, des = [], []
    for im in images:
        kp_tmp, des_tmp = sift.detectAndCompute(im, None) # This assumes the extraction method to be from the CV2 library
        kp.append(kp_tmp)
        des.append(des_tmp)
    return kp, des # Can't turn them into a np array since their shape can be inhomogeneous

In [95]:
def feature_matching_set(kp, des):
    # Initialize FLANN matching
    FLANN_INDEX_KDTREE = 0
    index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
    search_params = dict(checks=50)
    flann = cv2.FlannBasedMatcher(index_params, search_params)

    matches = {} # Dict for easier access to each match
    for i in range(len(kp)):
        for j in range(i+1, len(kp)): # Only match each image with the rest, we don't need the full matrix
            matches_tmp = flann.knnMatch(des[i], des[j], k=2)

            # Lowe's ratio test
            good_matches = [m for m, n in matches_tmp if m.distance < 0.7 * n.distance]

            if len(good_matches) >= 4:
                # RANSAC to find homography and get inlier's mask
                pts1 = np.float32([kp[i][m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
                pts2 = np.float32([kp[j][m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
                _, mask = cv2.findHomography(pts1, pts2, cv2.RANSAC, 5.0)

                inliers = [good_matches[k] for k in range(len(good_matches)) if mask[k]==1]
                matches[(i,j)] = inliers
            else:
                matches[(i,j)] = []
    
    return matches

In [96]:
def find_matches(matcher, keypoints, descriptors, lowes_ratio=0.7):
    matches = []
    n_imgs = len(keypoints)
    for i in range(n_imgs):
        matches.append([])
        for j in range(n_imgs):
            if j <= i: matches[i].append(None)
            else:
                match = []
                m = matcher.knnMatch(descriptors[i], descriptors[j], k=2)
                for k in range(len(m)):
                    try:
                        if m[k][0].distance < lowes_ratio*m[k][1].distance:
                            match.append(m[k][0])
                    except:
                        continue
                matches[i].append(match)
    return matches


def remove_outliers(matches, keypoints):
    for i in range(len(matches)):
        for j in range(len(matches[i])):
            if j <= i: continue
            if len(matches[i][j]) < 20:
                matches[i][j] = []
                continue
            kpts_i = []
            kpts_j = []
            for k in range(len(matches[i][j])):
                kpts_i.append(keypoints[i][matches[i][j][k].queryIdx].pt)
                kpts_j.append(keypoints[j][matches[i][j][k].trainIdx].pt)
            kpts_i = np.int32(kpts_i)
            kpts_j = np.int32(kpts_j)
            F, mask = cv2.findFundamentalMat(kpts_i, kpts_j, cv2.FM_RANSAC, ransacReprojThreshold=3)
            if np.linalg.det(F) > 1e-7: raise ValueError(f"Bad F_mat between images: {i}, {j}. Determinant: {np.linalg.det(F)}")
            matches[i][j] = np.array(matches[i][j])
            if mask is None:
                matches[i][j] = []
                continue
            matches[i][j] = matches[i][j][mask.ravel() == 1]
            matches[i][j] = list(matches[i][j])

            if len(matches[i][j]) < 20:
                matches[i][j] = []
                continue

    return matches

In [97]:
def adjacency_matrix(num_imgs, matches):
    num_img_pairs = 0
    num_pairs = 0
    pairs = []
    img_adjacency = np.zeros((num_imgs, num_imgs))
    for i in range(len(matches)):
        for j in range(len(matches[i])):
            if j <= i: continue
            num_pairs += 1
            if len(matches[i][j]) > 0:
                num_img_pairs += 1
                pairs.append((i,j))
                img_adjacency[i][j] = 1

    list_of_img_pairs = pairs
    return img_adjacency, list_of_img_pairs

In [98]:
import numpy as np
import random
import cv2

class Point3D_with_views:
    def __init__(self, point3d, source_2dpt_idxs):
        self.point3d = point3d
        self.source_2dpt_idxs = source_2dpt_idxs

def best_img_pair(img_adjacency, matches, keypoints, K, top_x_perc=0.2):
    num_matches = []

    for i in range(img_adjacency.shape[0]):
        for j in range(img_adjacency.shape[1]):
            if img_adjacency[i][j] == 1:
                num_matches.append(len(matches[i][j]))

    num_matches = sorted(num_matches, reverse=True)
    min_match_idx = int(len(num_matches)*top_x_perc)
    min_matches = num_matches[min_match_idx]
    best_R = 0
    best_pair = None

    for i in range(img_adjacency.shape[0]):
        for j in range(img_adjacency.shape[1]):
            if img_adjacency[i][j] == 1:
                if len(matches[i][j]) > min_matches:
                    kpts_i, kpts_j, _, _ = get_aligned_kpts(i, j, keypoints, matches)
                    E, _ = cv2.findEssentialMat(kpts_i, kpts_j, K, cv2.FM_RANSAC, 0.999, 1.0)
                    points, R1, _, _ = cv2.recoverPose(E, kpts_i, kpts_j, K)
                    rvec, _ = cv2.Rodrigues(R1)
                    rot_angle = abs(rvec[0]) +abs(rvec[1]) + abs(rvec[2])# sum rotation angles for each dimension
                    if (rot_angle > best_R or best_pair == None) and points == len(kpts_i): #Ensure recoverPose worked.
                        best_R = rot_angle
                        best_pair = (i,j)
    return best_pair

def get_aligned_kpts(i, j, keypoints, matches, mask=None):
    if mask is None:
        mask = np.ones(len(matches[i][j])) #if no mask is given, all matches used. This is helpful if we only want to triangulate certain matches.

    kpts_i, kpts_i_idxs, kpts_j, kpts_j_idxs = [], [], [], []
    for k in range(len(matches[i][j])):
        if mask[k] == 0: continue
        kpts_i.append(keypoints[i][matches[i][j][k].queryIdx].pt)
        kpts_i_idxs.append(matches[i][j][k].queryIdx)
        kpts_j.append(keypoints[j][matches[i][j][k].trainIdx].pt)
        kpts_j_idxs.append(matches[i][j][k].trainIdx)
    kpts_i = np.array(kpts_i)
    kpts_j = np.array(kpts_j)
    kpts_i = np.expand_dims(kpts_i, axis=1) #this seems to be required for cv2.undistortPoints and cv2.trangulatePoints to work
    kpts_j = np.expand_dims(kpts_j, axis=1)

    return kpts_i, kpts_j, kpts_i_idxs, kpts_j_idxs

def triangulate_points_and_reproject(R_l, t_l, R_r, t_r, K, points3d_with_views, img_idx1, img_idx2, kpts_i, kpts_j, kpts_i_idxs, kpts_j_idxs, reproject=True):
    print(f"Triangulating: {len(kpts_i)} points.")
    P_l = np.dot(K, np.hstack((R_l, t_l)))
    P_r = np.dot(K, np.hstack((R_r, t_r)))

    kpts_i = np.squeeze(kpts_i)
    kpts_i = kpts_i.transpose()
    kpts_i = kpts_i.reshape(2,-1)
    kpts_j = np.squeeze(kpts_j)
    kpts_j = kpts_j.transpose()
    kpts_j = kpts_j.reshape(2,-1)

    point_4d_hom = cv2.triangulatePoints(P_l, P_r, kpts_i, kpts_j)
    points_3D = cv2.convertPointsFromHomogeneous(point_4d_hom.transpose())
    for i in range(kpts_i.shape[1]):
        source_2dpt_idxs = {img_idx1:kpts_i_idxs[i], img_idx2:kpts_j_idxs[i]}
        pt = Point3D_with_views(points_3D[i], source_2dpt_idxs)
        points3d_with_views.append(pt)

    if reproject:
        kpts_i = kpts_i.transpose()
        kpts_j = kpts_j.transpose()
        rvec_l, _ = cv2.Rodrigues(R_l)
        rvec_r, _ = cv2.Rodrigues(R_r)
        projPoints_l, _ = cv2.projectPoints(points_3D, rvec_l, t_l, K, distCoeffs=np.array([]))
        projPoints_r, _ = cv2.projectPoints(points_3D, rvec_r, t_r, K, distCoeffs=np.array([]))
        delta_l , delta_r = [], []
        for i in range(len(projPoints_l)):
            delta_l.append(abs(projPoints_l[i][0][0] - kpts_i[i][0]))
            delta_l.append(abs(projPoints_l[i][0][1] - kpts_i[i][1]))
            delta_r.append(abs(projPoints_r[i][0][0] - kpts_j[i][0]))
            delta_r.append(abs(projPoints_r[i][0][1] - kpts_j[i][1]))
        avg_error_l = sum(delta_l)/len(delta_l)
        avg_error_r = sum(delta_r)/len(delta_r)
        errors = list(zip(delta_l, delta_r))
        return points3d_with_views, errors, avg_error_l, avg_error_r

    return points3d_with_views

def initialize_reconstruction(keypoints, matches, K, img_idx1, img_idx2):
    kpts_i, kpts_j, kpts_i_idxs, kpts_j_idxs = get_aligned_kpts(img_idx1, img_idx2, keypoints, matches)
    E, _ = cv2.findEssentialMat(kpts_i, kpts_j, K, cv2.FM_RANSAC, 0.999, 1.0)
    points, R1, t1, mask = cv2.recoverPose(E, kpts_i, kpts_j, K)
    assert abs(np.linalg.det(R1)) - 1 < 1e-7

    R0 = np.eye(3, 3)
    t0 = np.zeros((3, 1))

    points3d_with_views = []
    points3d_with_views = triangulate_points_and_reproject(
        R0, t0, R1, t1, K, points3d_with_views, img_idx1, img_idx2, kpts_i, kpts_j, kpts_i_idxs, kpts_j_idxs, reproject=False)

    return R0, t0, R1, t1, points3d_with_views

def get_idxs_in_correct_order(idx1, idx2):
    if idx1 < idx2: return idx1, idx2
    else: return idx2, idx1

def next_img_pair_to_grow_reconstruction(n_imgs, init_pair, resected_imgs, unresected_imgs, img_adjacency):
    if len(unresected_imgs) == 0: raise ValueError('Should not check next image to resect if all have been resected already!')
    straddle = False
    if init_pair[1] - init_pair[0] > n_imgs/2 : straddle = True #initial pair straddles "end" of the circle (ie if init pair is idxs (0, 49) for 50 images)

    init_arc = init_pair[1] - init_pair[0] + 1 # Number of images between and including initial pair

    #fill in images between initial pair
    if len(resected_imgs) < init_arc:
        if straddle == False: idx = resected_imgs[-2] + 1
        else: idx = resected_imgs[-1] + 1
        while True:
            if idx not in resected_imgs:
                prepend = True
                unresected_idx = idx
                resected_idx = random.choice(resected_imgs)
                return resected_idx, unresected_idx, prepend
            idx = idx + 1 % n_imgs

    extensions = len(resected_imgs) - init_arc # How many images have been resected after the initial arc
    if straddle == True: #smaller init_idx should be increased and larger decreased
        if extensions % 2 == 0:
            unresected_idx = (init_pair[0] + int(extensions/2) + 1) % n_imgs
            resected_idx = (unresected_idx - 1) % n_imgs
        else:
            unresected_idx = (init_pair[1] - int(extensions/2) - 1) % n_imgs
            resected_idx = (unresected_idx + 1) % n_imgs
    else:
        if extensions % 2 == 0:
            unresected_idx = (init_pair[1] + int(extensions/2) + 1) % n_imgs
            resected_idx = (unresected_idx - 1) % n_imgs
        else:
            unresected_idx = (init_pair[0] - int(extensions/2) - 1) % n_imgs
            resected_idx = (unresected_idx + 1) % n_imgs

    prepend = False
    return resected_idx, unresected_idx, prepend

def check_and_get_unresected_point(resected_kpt_idx, match, resected_idx, unresected_idx):
    if resected_idx < unresected_idx:
        if resected_kpt_idx == match.queryIdx:
            unresected_kpt_idx = match.trainIdx
            success = True
            return unresected_kpt_idx, success
        else:
            return None, False
    elif unresected_idx < resected_idx:
        if resected_kpt_idx == match.trainIdx:
            unresected_kpt_idx = match.queryIdx
            success = True
            return unresected_kpt_idx, success
        else:
            return None, False

def get_correspondences_for_pnp(resected_idx, unresected_idx, pts3d, matches, keypoints):
    idx1, idx2 = get_idxs_in_correct_order(resected_idx, unresected_idx)
    triangulation_status = np.ones(len(matches[idx1][idx2])) # if triangulation_status[x] = 1, then matches[x] used for triangulation
    pts3d_for_pnp = []
    pts2d_for_pnp = []
    for pt3d in pts3d:
        if resected_idx not in pt3d.source_2dpt_idxs: continue
        resected_kpt_idx = pt3d.source_2dpt_idxs[resected_idx]
        for k in range(len(matches[idx1][idx2])):
            unresected_kpt_idx, success = check_and_get_unresected_point(resected_kpt_idx, matches[idx1][idx2][k], resected_idx, unresected_idx)
            if not success: continue
            pt3d.source_2dpt_idxs[unresected_idx] = unresected_kpt_idx #Add new 2d/3d correspondences to 3D point object
            pts3d_for_pnp.append(pt3d.point3d)
            pts2d_for_pnp.append(keypoints[unresected_idx][unresected_kpt_idx].pt)
            triangulation_status[k] = 0

    return pts3d, pts3d_for_pnp, pts2d_for_pnp, triangulation_status

def do_pnp(pts3d_for_pnp, pts2d_for_pnp, K, iterations=200, reprojThresh=5):
    list_pts3d_for_pnp = pts3d_for_pnp
    list_pts2d_for_pnp = pts2d_for_pnp
    pts3d_for_pnp = np.squeeze(np.array(pts3d_for_pnp))
    pts2d_for_pnp = np.expand_dims(np.squeeze(np.array(pts2d_for_pnp)), axis=1)
    num_pts = len(pts3d_for_pnp)

    highest_inliers = 0
    for i in range(iterations):
        pt_idxs = np.random.choice(num_pts, 6, replace=False)
        pts3 = np.array([pts3d_for_pnp[pt_idxs[i]] for i in range(len(pt_idxs))])
        pts2 = np.array([pts2d_for_pnp[pt_idxs[i]] for i in range(len(pt_idxs))])
        _, rvec, tvec = cv2.solvePnP(pts3, pts2, K, distCoeffs=np.array([]), flags=cv2.SOLVEPNP_ITERATIVE)
        R, _ = cv2.Rodrigues(rvec)
        _, _, _, perc_inliers = test_reproj_pnp_points(list_pts3d_for_pnp, list_pts2d_for_pnp, R, tvec, K, rep_thresh=reprojThresh)
        if highest_inliers < perc_inliers:
            highest_inliers = perc_inliers
            best_R = R
            best_tvec = tvec
    R = best_R
    tvec = best_tvec
    print('rvec:', rvec,'\n\ntvec:', tvec)

    return R, tvec

def prep_for_reproj(img_idx, points3d_with_views, keypoints):
    points_3d = []
    points_2d = []
    pt3d_idxs = []
    i = 0
    for pt3d in points3d_with_views:
        if img_idx in pt3d.source_2dpt_idxs.keys():
            pt3d_idxs.append(i)
            points_3d.append(pt3d.point3d)
            kpt_idx = pt3d.source_2dpt_idxs[img_idx]
            points_2d.append(keypoints[img_idx][kpt_idx].pt)
        i += 1

    return np.array(points_3d), np.array(points_2d), pt3d_idxs

def calculate_reproj_errors(projPoints, points_2d):
    assert len(projPoints) == len(points_2d)
    delta = []
    for i in range(len(projPoints)):
        delta.append(abs(projPoints[i] - points_2d[i]))

    average_delta = sum(delta)/len(delta) # 2-vector, average error for x and y coord
    average_delta = (average_delta[0] + average_delta[1])/2 # average error overall

    return average_delta, delta

def get_reproj_errors(img_idx, points3d_with_views, R, t, K, keypoints, distCoeffs=np.array([])):
    points_3d, points_2d, pt3d_idxs = prep_for_reproj(img_idx, points3d_with_views, keypoints)
    rvec, _ = cv2.Rodrigues(R)
    projPoints, _ = cv2.projectPoints(points_3d, rvec, t, K, distCoeffs=distCoeffs)
    projPoints = np.squeeze(projPoints)
    avg_error, errors = calculate_reproj_errors(projPoints, points_2d)

    return points_3d, points_2d, avg_error, errors

def test_reproj_pnp_points(pts3d_for_pnp, pts2d_for_pnp, R_new, t_new, K, rep_thresh=5):
    errors = []
    projpts = []
    inliers = []
    for i in range(len(pts3d_for_pnp)):
        Xw = pts3d_for_pnp[i][0]
        Xr = np.dot(R_new, Xw).reshape(3,1)
        Xc = Xr + t_new
        x = np.dot(K, Xc)
        x /= x[2]
        errors.append([np.float64(x[0] - pts2d_for_pnp[i][0]), np.float64(x[1] - pts2d_for_pnp[i][1])])
        projpts.append(x)
        if abs(errors[-1][0]) > rep_thresh or abs(errors[-1][1]) > rep_thresh: inliers.append(0)
        else: inliers.append(1)
    a = 0
    for e in errors:
        a = a + abs(e[0]) + abs(e[1])
    avg_err = a/(2*len(errors))
    perc_inliers = sum(inliers)/len(inliers)

    return errors, projpts, avg_err, perc_inliers

In [99]:
def bundle_adjustment_sparsity(n_cameras, n_points, camera_indices, point_indices):
    m = camera_indices.size * 2
    n = n_cameras * 12 + n_points * 3
    A = lil_matrix((m, n), dtype=int)

    i = np.arange(camera_indices.size)
    for s in range(12):
        A[2 * i, camera_indices * 12 + s] = 1
        A[2 * i + 1, camera_indices * 12 + s] = 1

    for s in range(3):
        A[2 * i, n_cameras * 12 + point_indices * 3 + s] = 1
        A[2 * i + 1, n_cameras * 12 + point_indices * 3 + s] = 1

    return A

def project(points, camera_params, K):
    points_proj = []

    for idx in range(len(camera_params)): # idx applies to both points and cam_params, they are = length vectors
        R = camera_params[idx][:9].reshape(3,3)
        rvec, _ = cv2.Rodrigues(R)
        t = camera_params[idx][9:]
        pt = points[idx]
        pt = np.expand_dims(pt, axis=0)
        pt, _ = cv2.projectPoints(pt, rvec, t, K, distCoeffs=np.array([]))
        pt = np.squeeze(np.array(pt))
        points_proj.append(pt)

    return points_proj

def fun(params, n_cameras, n_points, camera_indices, point_indices, points_2d, K):
    camera_params = params[:n_cameras * 12].reshape((n_cameras, 12))
    points_3d = params[n_cameras * 12:].reshape((n_points, 3))
    points_proj = project(points_3d[point_indices], camera_params[camera_indices], K)
    return (points_proj - points_2d).ravel()

def do_BA(points3d_with_views, R_mats, t_vecs, resected_imgs, keypoints, K, ftol):
    point_indices = []
    points_2d = []
    camera_indices = []
    points_3d = []
    camera_params = []
    BA_cam_idxs = {} # maps from true cam indices to 'normalized' (i.e 11, 23, 31 maps to -> 0, 1, 2)
    cam_count = 0

    for r in resected_imgs:
        BA_cam_idxs[r] = cam_count
        camera_params.append(np.hstack((R_mats[r].ravel(), t_vecs[r].ravel())))
        cam_count += 1

    for pt3d_idx in range(len(points3d_with_views)):
        points_3d.append(points3d_with_views[pt3d_idx].point3d)
        for cam_idx, kpt_idx in points3d_with_views[pt3d_idx].source_2dpt_idxs.items():
            if cam_idx not in resected_imgs: continue
            point_indices.append(pt3d_idx)
            camera_indices.append(BA_cam_idxs[cam_idx])#append normalized cam idx
            points_2d.append(keypoints[cam_idx][kpt_idx].pt)
    if len(points_3d[0]) == 3: points_3d = np.expand_dims(points_3d, axis=0)

    point_indices = np.array(point_indices)
    points_2d = np.array(points_2d)
    camera_indices = np.array(camera_indices)
    points_3d = np.squeeze(points_3d)
    camera_params = np.array(camera_params)

    n_cameras = camera_params.shape[0]
    n_points = points_3d.shape[0]
    x0 = np.hstack((camera_params.ravel(), points_3d.ravel()))
    A = bundle_adjustment_sparsity(n_cameras, n_points, camera_indices, point_indices)

    res = least_squares(fun, x0, jac_sparsity=A, verbose=2, x_scale='jac', loss='linear', ftol=ftol, xtol=1e-12, method='trf',
                        args=(n_cameras, n_points, camera_indices, point_indices, points_2d, K))

    adjusted_camera_params = res.x[:n_cameras * 12].reshape(n_cameras, 12)
    adjusted_points_3d = res.x[n_cameras * 12:].reshape(n_points, 3)
    adjusted_R_mats = {}
    adjusted_t_vecs = {}
    for true_idx, norm_idx in BA_cam_idxs.items():
        adjusted_R_mats[true_idx] = adjusted_camera_params[norm_idx][:9].reshape(3,3)
        adjusted_t_vecs[true_idx] = adjusted_camera_params[norm_idx][9:].reshape(3,1)
    R_mats = adjusted_R_mats
    t_vecs = adjusted_t_vecs
    for pt3d_idx in range(len(points3d_with_views)):
        points3d_with_views[pt3d_idx].point3d = np.expand_dims(adjusted_points_3d[pt3d_idx], axis=0)

    return points3d_with_views, R_mats, t_vecs

In [100]:
def normalize(pts):
    x_mean = np.mean(pts[:, 0])
    y_mean = np.mean(pts[:, 1])
    sigma = np.mean(np.sqrt((pts[:, 0] - x_mean) ** 2 + (pts[:, 1] - y_mean) ** 2))
    M = np.sqrt(2) / sigma
    T = np.array([
        [M, 0, -M * x_mean],
        [0, M, -M * y_mean],
        [0, 0, 1]
    ])
    return T

def eight_point_algorithm(pts1, pts2):

    pts1_homo = np.vstack((pts1, np.ones(pts1.shape[1]))).T
    pts2_homo = np.vstack((pts2, np.ones(pts2.shape[1]))).T

    # Normalization
    T = normalize(pts1_homo)
    T_prime = normalize(pts2_homo)


    pts1_homo = (T @ pts1_homo.T).T
    pts2_homo = (T_prime @ pts2_homo.T).T

    # x2.T*F*x1=0
    # A*f=0, f is F flattened into a 1D array
    

    # Create A
    A = np.zeros((pts1.shape[1], 9))
    for i in range(pts1.shape[1]):
        A[i] = np.array([
            pts1_homo[i,0]*pts2_homo[i,0], pts1_homo[i,1]*pts2_homo[i,0], pts1_homo[i,2]*pts2_homo[i,0],
            pts1_homo[i,0]*pts2_homo[i,1], pts1_homo[i,1]*pts2_homo[i,1], pts1_homo[i,2]*pts2_homo[i,1],
            pts1_homo[i,0]*pts2_homo[i,2], pts1_homo[i,1]*pts2_homo[i,2], pts1_homo[i,2]*pts2_homo[i,2]
            ])
    
    # Solve Af=0 using svd
    U,S,Vt = np.linalg.svd(A)
    F = Vt[-1,:].reshape((3,3))

    # Enforce rank2 constraint
    U,S,Vt = np.linalg.svd(F)
    S[-1] = 0
    F = U @ np.diag(S) @ Vt

    F = T_prime.T @ F @ T
    return F

In [101]:
def essential_from_fundamental(K1, F, K2):
    return K1.T @ F @ K2

In [102]:
def pose_from_essential(E):
    U,_,Vt = np.linalg.svd(E)
    W = np.array([[0, -1, 0], [1, 0, 0], [0, 0, 1]])
    
    Rs = [U @ W @ Vt, U @ W.T @ Vt]
    for i in range(len(Rs)):
        if np.linalg.det(Rs[i]) < 0:
            Rs[i] = Rs[i] * -1

    # Array with all possible camera poses (extrinsics)
    RTs = np.array([
        np.hstack((Rs[0], U[:, 2, np.newaxis])),
        np.hstack((Rs[0], -U[:, 2, np.newaxis])),
        np.hstack((Rs[1], U[:, 2, np.newaxis])),
        np.hstack((Rs[1], -U[:, 2, np.newaxis])),
    ])

    return RTs

In [103]:
def linear_triangulation(K1, RT1, K2, RT2, pts1, pts2):
    # First, set all points to homogeneous
    pts1_homo = np.vstack((pts1, np.ones(pts1.shape[1])))
    pts2_homo = np.vstack((pts2, np.ones(pts2.shape[1])))

    # Calculate every projection matrix
    P1 = K1 @ RT1
    P2 = K2 @ RT2

    # Solve using svd
    pts3d = np.zeros((3, pts1.shape[1]))
    for i in range(pts1.shape[1]):
        A = np.array([pts1_homo[1,i]*P1[2,:] - P1[1,:],
            P1[0,:] - pts1_homo[0,i]*P1[2,:],
            pts2_homo[1,i]*P2[2,:] - P2[1,:],
            P2[0,:] - pts2_homo[0,i]*P2[2,:]])
        ATA = A.T @ A
        _, _, Vt = np.linalg.svd(ATA)
        pts3d[:, i] = Vt[-1, :3]/Vt[-1, -1]
    
    return pts3d

def linear_triangulation2(P1, P2, pts1, pts2):
    # First, set all points to homogeneous
    pts1_homo = np.vstack((pts1, np.ones(pts1.shape[1])))
    pts2_homo = np.vstack((pts2, np.ones(pts2.shape[1])))

    # Solve using svd
    pts3d = np.zeros((3, pts1.shape[1]))
    for i in range(pts1.shape[1]):
        A = np.array([pts1_homo[1,i]*P1[2,:] - P1[1,:],
                      P1[0,:] - pts1_homo[0,i]*P1[2,:],
                      pts2_homo[1,i]*P2[2,:] - P2[1,:],
                      P2[0,:] - pts2_homo[0,i]*P2[2,:]])
        ATA = A.T @ A
        _, _, Vt = np.linalg.svd(ATA)
        pts3d[:, i] = Vt[-1, :3]/Vt[-1, -1]
    
    return pts3d

In [104]:
def reprojection(P1, P2, pts3d):
    pts3d_homo = np.vstack((pts3d, np.ones(pts3d.shape[1])))
    pts2d1_homo = np.dot(P1, pts3d_homo)
    pts2d2_homo = np.dot(P2, pts3d_homo)
    return pts2d1_homo/pts2d1_homo[-1], pts2d2_homo/pts2d2_homo[-1]

def double_disambiguation(K1, RT1, K2, RT2s, pts1, pts2, pts3d):
    max_positive_z = 0
    min_error = np.finfo('float').max
    best_RT = None
    best_pts3d = None
    P1 = K1 @ RT1

    pts1_homo = np.vstack((pts1, np.ones(pts1.shape[1])))
    pts2_homo = np.vstack((pts2, np.ones(pts2.shape[1])))

    for i in range(RT2s.shape[0]):
        P2 = K2 @ RT2s[i]
        num_positive_z = np.sum(pts3d[i][2, :] > 0)
        re1_pts2, re2_pts2 = reprojection(P1, P2, pts3d[i])

        err1 = np.sum(np.square(re1_pts2 - pts1_homo))
        err2 = np.sum(np.square(re2_pts2 - pts2_homo))

        err = err1 + err2

        if num_positive_z >= max_positive_z and err < min_error:
            max_positive_z = num_positive_z
            min_error = err
            best_RT = RT2s[i]
            best_pts3d = pts3d[i]
    
    return best_RT, best_pts3d

In [105]:
def calculate_projection_matrix(K, pts3d, pts2d):
    _, rod, T, _ = cv2.solvePnPRansac(pts3d.T, pts2d.T, K, None)#, flags=cv2.SOLVEPNP_P3P)
    R = cv2.Rodrigues(rod)[0]
    if np.linalg.det(R) < 0:
        R = R * -1
    P = K @ np.hstack((R, T))
    return P

In [106]:
def plot_model(pts_cloud):
    # Assuming points_3D is your array of 3D points
    x = pts_cloud[0]
    y = pts_cloud[1]
    z = pts_cloud[2]

    fig = go.Figure(data=[go.Scatter3d(x=x, y=y, z=z,
                                    mode='markers',
                                    marker=dict(size=2, color=z, colorscale='Viridis'))])

    fig.update_layout(scene=dict(xaxis_title='X',
                                yaxis_title='Y',
                                zaxis_title='Z'))

    fig.show()

In [107]:
def two_images_sfm(im1, im2, K):
    # Feature extraction
    kp, des = feature_extraction_set([im1, im2])

    # Feature matching
    matches = feature_matching_set(kp, des)

    # Fundamental matrix
    pts1 = np.transpose([kp[0][m.queryIdx].pt for m in matches[(0,1)]])
    pts2 = np.transpose([kp[1][m.trainIdx].pt for m in matches[(0,1)]])

    F = eight_point_algorithm(pts1, pts2)

    # Essential matrix
    E = essential_from_fundamental(K, F, K) # In this case, the same intrinsic values apply to all images

    # Get camera extrinsics from Essential matrix
    RT2s = pose_from_essential(E)

    # Define RT for camera 1 (center at world origin and matching orientation)
    RT1 = np.hstack((np.eye(3), np.zeros((3, 1))))
    
    RT2 = RT2s[0]

    pts3d = np.array([linear_triangulation(K, RT1, K, RT2, pts1, pts2) for RT2 in RT2s])

    RT2, pts_cloud = double_disambiguation(K, RT1, K, RT2s, pts1, pts2, pts3d)

    plot_model(pts_cloud)

    # Initialize general Projection matrix list, RTs list and 3d points list

    return K @ RT1, K @ RT2, pts_cloud

In [108]:
# Load all images
images = np.array(load_images_from_folder('dinos'))[:-1]

In [109]:
# K = np.loadtxt('intrinsic_matrix.txt', dtype=float)

height, width = images.shape[1:3]
K = np.array([  # for dino
    [2360, 0, width / 2],
    [0, 2360, height / 2],
    [0, 0, 1]])

In [110]:
kp, des = feature_extraction_set(images)
matches = feature_matching_set(kp, des)

In [111]:
image_count = 0
pts_cloud = []
P_list = []

while image_count < images.shape[0]:
    P1, P2, pts3d = two_images_sfm(images[image_count], images[image_count+1], K)
    pts_cloud.append(pts3d)
    image_count += 2
    P_list.append([P1, P2])

    for i in range(image_count, images.shape[0]):
        idx1, idx2, idx3 = i-2, i-1, i
        image_count += 1
        
        # Get Common matches of 3 images
        common_matches1 = [m1 for m1 in matches[(idx1, idx3)] for m2 in matches[(idx2, idx3)] if m1.trainIdx == m2.trainIdx]
        common_matches2 = [m2 for m1 in matches[(idx1, idx3)] for m2 in matches[(idx2, idx3)] if m1.trainIdx == m2.trainIdx]
        
        if len(common_matches1) < 4:
            print('STOP')
            break

        # Triangulate common points
        common_pts1 = np.transpose([kp[idx1][m.queryIdx].pt for m in common_matches1])
        common_pts2 = np.transpose([kp[idx2][m.queryIdx].pt for m in common_matches2])
        common_pts3 = np.transpose([kp[idx3][m.trainIdx].pt for m in common_matches1])

        common_pts3d = linear_triangulation2(P_list[-1][-2], P_list[-1][-1], common_pts1, common_pts2)

        # Get Projection matrix
        P = calculate_projection_matrix(K, common_pts3d, common_pts3)

        P_list[-1].append(P)
        
        # Triangulate
        pts1_list = [np.transpose([kp[j][m.queryIdx].pt for m in matches[(j,idx3)]]) for j in range(idx1, idx3)]
        pts2_list = [np.transpose([kp[idx3][m.trainIdx].pt for m in matches[(j,idx3)]]) for j in range(idx1, idx3)]

        for j in range(2):
            pts3d = linear_triangulation2(P_list[-1][-3+j], P_list[-1][-1], pts1_list[j], pts2_list[j])
            pts_cloud[-1] = np.hstack((pts_cloud[-1], pts3d))

STOP


In [112]:
for pts in pts_cloud:
    plot_model(pts)

In [113]:
# Load all images
images = np.array(load_images_from_folder('dinos'))[:-1]
height, width = images.shape[1:3]
K = np.array([  # for dino
    [2360, 0, width / 2],
    [0, 2360, height / 2],
    [0, 0, 1]])

kp, des = feature_extraction_set(images)
matcher = cv2.BFMatcher(cv2.NORM_L1)
matches = find_matches(matcher, kp, des)
matches = remove_outliers(matches, kp)
img_adjacency, list_of_img_pairs = create_img_adjacency_matrix(len(images), matches)

In [114]:
best_pair = best_img_pair(img_adjacency, matches, kp, K, top_x_perc=0.2)
R0, t0, R1, t1, points3d_with_views = initialize_reconstruction(kp, matches, K, best_pair[0], best_pair[1])

R_mats = {best_pair[0]: R0, best_pair[1]: R1}
t_vecs = {best_pair[0]: t0, best_pair[1]: t1}

resected_imgs = [best_pair[0], best_pair[1]] 
unresected_imgs = [i for i in range(len(images)) if i not in resected_imgs] 
avg_err = 0

Triangulating: 453 points.


In [115]:
### This cell grows and refines the reconstruction 
BA_chkpts = [3,4,5,6] + [int(6*(1.34**i)) for i in range(25)]
while len(unresected_imgs) > 0:
    resected_idx, unresected_idx, prepend = next_img_pair_to_grow_reconstruction(len(images), best_pair, resected_imgs, unresected_imgs, img_adjacency)
    points3d_with_views, pts3d_for_pnp, pts2d_for_pnp, triangulation_status = get_correspondences_for_pnp(resected_idx, unresected_idx, points3d_with_views, matches, kp)
    if len(pts3d_for_pnp) < 12:
        print(f"{len(pts3d_for_pnp)} is too few correspondences for pnp. Skipping imgs resected:{resected_idx} and unresected:{unresected_idx}")
        print(f"Currently resected imgs: {resected_imgs}, unresected: {unresected_imgs}")
        continue

    R_res = R_mats[resected_idx]
    t_res = t_vecs[resected_idx]
    print(f"Unresected image: {unresected_idx}, resected: {resected_idx}")
    R_new, t_new = do_pnp(pts3d_for_pnp, pts2d_for_pnp, K)
    R_mats[unresected_idx] = R_new
    t_vecs[unresected_idx] = t_new
    if prepend == True: resected_imgs.insert(0, unresected_idx)
    else: resected_imgs.append(unresected_idx)
    unresected_imgs.remove(unresected_idx)
    pnp_errors, projpts, avg_err, perc_inliers = test_reproj_pnp_points(pts3d_for_pnp, pts2d_for_pnp, R_new, t_new, K)
    print(f"Average error of reprojecting points used to resect image {unresected_idx} back onto it is: {avg_err}")
    print(f"Fraction of Pnp inliers: {perc_inliers} num pts used in Pnp: {len(pnp_errors)}")
    
    if resected_idx < unresected_idx:
        kpts1, kpts2, kpts1_idxs, kpts2_idxs = get_aligned_kpts(resected_idx, unresected_idx, kp, matches, mask=triangulation_status)
        if np.sum(triangulation_status) > 0: #at least 1 point needs to be triangulated
            points3d_with_views, tri_errors, avg_tri_err_l, avg_tri_err_r = triangulate_points_and_reproject(R_res, t_res, R_new, t_new, K, points3d_with_views, resected_idx, unresected_idx, kpts1, kpts2, kpts1_idxs, kpts2_idxs, reproject=True)
    else:
        kpts1, kpts2, kpts1_idxs, kpts2_idxs = get_aligned_kpts(unresected_idx, resected_idx, kp, matches, mask=triangulation_status)
        if np.sum(triangulation_status) > 0: #at least 1 point needs to be triangulated
            points3d_with_views, tri_errors, avg_tri_err_l, avg_tri_err_r = triangulate_points_and_reproject(R_new, t_new, R_res, t_res, K, points3d_with_views, unresected_idx, resected_idx, kpts1, kpts2, kpts1_idxs, kpts2_idxs, reproject=True)
    if 0.8 < perc_inliers < 0.95 or 5 < avg_tri_err_l < 10 or 5 < avg_tri_err_r < 10: 
        #If % of inlers from Pnp is too low or triangulation error on either image is too high, bundle adjust
        points3d_with_views, R_mats, t_vecs = do_BA(points3d_with_views, R_mats, t_vecs, resected_imgs, kp, K, ftol=1e0)
        
    if len(resected_imgs) in BA_chkpts or len(unresected_imgs) == 0 or perc_inliers <= 0.8 or avg_tri_err_l >= 10 or avg_tri_err_r >= 10:
        #If % of inlers from Pnp is very low or triangulation error on either image is very high, bundle adjust with stricter tolerance
        points3d_with_views, R_mats, t_vecs = do_BA(points3d_with_views, R_mats, t_vecs, resected_imgs, kp, K, ftol=1e-1)
    
    av = 0
    for im in resected_imgs:
        p3d, p2d, avg_error, errors = get_reproj_errors(im, points3d_with_views, R_mats[im], t_vecs[im], K, kp, distCoeffs=np.array([]))
        print(f'Average reprojection error on image {im} is {avg_error} pixels')
        av += avg_error
    av = av/len(resected_imgs)
    print(f'Average reprojection error across all {len(resected_imgs)} resected images is {av} pixels')

Unresected image: 28, resected: 27
rvec: [[-70.93074461]
 [ 29.52146087]
 [-87.26459294]] 

tvec: [[-1.56303514]
 [-0.06422056]
 [ 0.44759153]]
Average error of reprojecting points used to resect image 28 back onto it is: 1.0923338695900682
Fraction of Pnp inliers: 1.0 num pts used in Pnp: 175
Triangulating: 157 points.
Average reprojection error for just-triangulated points on image 27 is: 0.1747849541501749 pixels.
Average reprojection error for just-triangulated points on image 28 is: 0.16497836917669037 pixels.
   Iteration     Total nfev        Cost      Cost reduction    Step norm     Optimality   
       0              1         4.3584e+02                                    1.36e+05    
       1              4         2.6538e+02      1.70e+02       8.37e-01       1.34e+05    
       2              6         1.5715e+02      1.08e+02       9.19e-02       5.03e+04    
       3              7         1.0911e+02      4.80e+01       9.93e-02       7.89e+04    
       4              8 

In [116]:
import plotly.graph_objects as go

def plot_model(points3d_with_views):
    # Extract the 3D points from points3d_with_views
    pts_cloud = np.array([pt3.point3d[0] for pt3 in points3d_with_views if np.abs(pt3.point3d[0]).sum() < 200])

    # Assuming points_3D is your array of 3D points
    x = pts_cloud[:, 0]
    y = pts_cloud[:, 1]
    z = pts_cloud[:, 2]

    fig = go.Figure(data=[go.Scatter3d(
        x=x,
        y=y,
        z=z,
        mode='markers',
        marker=dict(
            size=2,
            color=z,                # set color to an array/list of desired values
            colorscale='Viridis',   # choose a colorscale
            opacity=0.8
        )
    )])

    # tight layout
    fig.update_layout(margin=dict(l=0, r=0, b=0, t=0))
    fig.show()
plot_model(points3d_with_views)