# Automatic Map Initialization
The main goal of this is to compute the relative pose between 2 frames to triangulate an initial set of map pts. 

## 1. Find initial correspondences
There's several steps they use to determine if two frames are suitible to generate the intial map points. 

Their first main "test" is a method where they use where they extract only the fine ORB features, and then search for matches between their current frame $F_c$ and reference frame $F_r$. If there aren't enough matches, we reset the reference frame. 

In terms of their code, they do a clever method of tracking only a window of ORB features and testing from that. 

Again, note that this is part of a set of tests to determine if two frames are suitible to generate an intial set of map points. If at any point it fails, it will reject it. 

In [4]:
import cv2 
import numpy as np
from typing import Optional

PATH_TO_IMG1 = "./parallax_1.jpg"
PATH_TO_IMG2 = "./parallax_2.jpg"

In [None]:
class ORBCorrespondence:
    def __init__(
            self,
            nn_ratio: float = 0.6, 
            check_orientation: bool = True,
            window_size: int = 10, 
            min_matches: int = 100
            ) -> None:
        """
        Initializes ORB correspondances for SLAM map initialization.

        :param nn_ratio: Nearest neighbor for Lowe's ratio test.
        :param check_orientation: Check for orientation consistency (generally good).
        :param window_size: Search window in pixels (px).
        :param min_matches: Minimum ORB matches required. 
        """
        self.nn_ratio = nn_ratio
        self.check_orientation = check_orientation
        self.window_size = window_size
        self.min_matches = min_matches

        # as mentioned in the paper, just use orb at the finest scale
        self.orb = cv2.ORB.create(
            nfeatures=1000,
            scaleFactor=1.2,
            nlevels=1, # finest level param
            edgeThreshold=31,
            firstLevel=0,
            WTA_K=2,
            scoreType=cv2.ORB_HARRIS_SCORE,
            patchSize=31,
            fastThreshold=20
        )
    
    def extract_orb_features(
            self, 
            img: np.ndarray
            ) -> tuple[list[cv2.KeyPoint], np.ndarray]:
        """
        Does what you think it does. 
        (extract ORB features lol)
        
        :param img: Input grayscale img.

        :return keypoints: List of detected keypoints. 
        :return descriptors: Feature descriptors (BRIEF) (n x 32 uint8 array).
        """

        kp, des = self.orb.detectAndCompute(img, None)

        if des is None:
            return [], np.array([])
        
        return kp, des
    
    def hamming_distance(
            self, 
            desc1: np.ndarray, 
            desc2: np.ndarray
        ) -> int:
        """
        Computes the Hamming distance between 2 ORB descriptors. 

        :param desc1: ORB descriptor 1
        :param desc2: ORB descriptor 2

        :return: Hamming distance (0 - 256)
        """
        return np.sum(np.unpackbits(desc1 ^ desc2))
    
    def search_for_initialization(
            self,
            frame_ref: np.ndarray,
            frame_curr: np.ndarray,
            prev_matched: Optional[list[tuple[float, float]]] = None
        ) -> tuple[list[int], list[tuple[float, float]], int]:
        """
        Finds correspondances between reference and current frame.

        :param frame_ref: Reference frame
        :param frame_curr: Current frame
        :param prev_matched: Previous matched points for guided search

        :return matches_12: Match indices (ref_idx -> curr_idx, -1 if none)
        :return matched_points: Updated matched point positions
        :return num_matches: Number of successful matches
        """
        # get ORB feat
        kp_ref, desc_ref   = self.extract_orb_features(frame_ref)
        kp_curr, desc_curr = self.extract_orb_features(frame_curr)

        if len(kp_ref) == 0 or len(kp_curr) == 0:
            return [], [], 0
        
        # init outputs
        matches_12 = [-1] * len(kp_ref)
        matched_pts = []

        # now match keypts to each other within a window
        pts_ref = np.array([kp.pt for kp in kp_ref])
        pts_curr = np.array([kp.pt for kp in kp_curr])

        for i, (kp_r, desc_r) in enumerate(zip(kp_ref, desc_ref)):
            # search window
            if prev_matched and i < len(prev_matched):
                # use previous match if exists
                center_x, center_y = prev_matched[i].x, prev_matched[i].y
            else:
                # use current kp as center of window
                center_x, center_y = kp_r.pt
            
            # find potetial matches in window
            candidates = []
            for j, (kp_c, desc_c) in enumerate(zip(kp_curr, desc_curr)):
                dx = kp_c.pt[0] - center_x
                dy = kp_c.pt[1] - center_y

                # within window
                if abs(dx) <= self.window_size and abs(dy) <= self.window_size:
                    dist = self.hamming_distance(desc_r, desc_c)
                    candidates.append((j, dist))
            
            if len(candidates) < 2: # not enough
                continue

            candidates.sort(key=lambda x: x[1]) # sort by dist
            
            # Lowe's ratio test 
            best_dist_1 = candidates[0][1]
            best_dist_2 = candidates[1][1]

            if best_dist_1 < self.nn_ratio * best_dist_2:
                best_idx = candidates[0][0]

                # orientation consistency
                if self.check_orientation:
                    angle_diff = abs(kp_ref[i].angle - kp_curr[best_idx].angle)
                    angle_diff = min(angle_diff, 360 - angle_diff) # wraparound

                    if angle_diff > 30: # if more than 30 break
                        continue
                
                # store match
                matches_12[i] = best_idx
                matched_pts.append((kp_curr[best_idx].pt[0], kp_curr[best_idx].pt[1]))
        
        # successful matches seen
        num_matches = sum(1 for match in matches_12 if match != -1)

        return matches_12, matched_pts, num_matches
    
    def get_matched_keypoints(
            self, 
            kp_ref: list[cv2.KeyPoint],
            kp_curr: list[cv2.KeyPoint],
            matches_12: list[int]
        ) -> tuple[np.ndarray, np.ndarray]:
        """
        Extract the matched kp coordinates (np.ndarray)

        
        """
        pts_ref = []
        pts_curr = []

        for i, match_idx in enumerate(matches_12):
            if match_idx != -1:
                pts_ref.append(kp_ref[i].pt)
                pts_curr.append(kp_curr[match_idx].pt)
        
        return np.array(pts_ref), np.array(pts_curr)

    def visualize_matches(
            self,
            img_ref, img_curr,
            kp_ref, kp_curr,
            matches_12
        ) -> np.ndarray:
        good_matches = []
        for i, match_idx in enumerate(matches_12):
            if match_idx != -1:
                good_matches.append(cv2.DMatch(i, match_idx, 0))

        # actual drawing
        img_matches = cv2.drawMatches(
            img_ref, kp_ref,
            img_curr, kp_curr,
            good_matches, None,
            flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS
        )

        return img_matches


In [15]:
img_ref = cv2.imread(PATH_TO_IMG1)
img_curr = cv2.imread(PATH_TO_IMG2)

finder = ORBCorrespondence(
    min_matches=3,
    check_orientation=False,
    window_size=4000
)

matches_12, matched_pts, num_matches = finder.search_for_initialization(
    img_ref, img_curr
)

print(f"Found {num_matches} matches")

if num_matches > finder.min_matches:
    print("Enough matches found")

    # orb features
    kp_ref, _ = finder.extract_orb_features(img_ref)
    kp_curr, _ = finder.extract_orb_features(img_curr)

    # matched coords (to be used in the rest of testing)
    pts_ref, pts_curr = finder.get_matched_keypoints(kp_ref, kp_curr, matches_12)

    img_matches = finder.visualize_matches(
        img_ref, img_curr, 
        kp_ref, kp_curr, 
        matches_12
    )
    cv2.imshow("Initial correspondences", img_matches)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
else:
    print(f"Not enough matches ({num_matches}/{finder.min_matches})")


Found 5 matches
Enough matches found


## 2. Parallel computation of the two models
The two models chosen is the computation of a homography $H_{cr}$ and a fundamental matrix $F_{cr}$. This is done using the normalized DLT and 8-point algorithms respectively, both within a RANSAC scheme. 

fjasdl

In [None]:
def normalizePts(pts):
    """
    Normalizes the points and returns the transformation matrix as well. 

    Parameters
    ----------
    pts: 2D numpy.array (dtype=float)
        Coordinates of N points in the left and right view stored in arrays of shape (2,N)

    Returns
    -------
    pts_normalized: 2D numpy.array (dtype=float)
        Normalized coordinates for the points, centered at the origin w/ mean squared dist = 2 (px)
    T: 2D numpy.array (dtype=float)
        Transformation matrix for the point (to be undone later in 8pt alg).
    """
    _, n = pts.shape

    # Compute the centroid
    centroid = np.mean(pts, axis=1)

    # Shift to origin
    pts_centered = pts - centroid[:, np.newaxis]

    # Compute avg dist from origin
    scale = np.mean(np.linalg.norm(pts_centered, axis=0)) / np.sqrt(2) # 2 px

    # Create T
    T = np.linalg.inv(np.array([
        [scale, 0, centroid[0]], 
        [0, scale, centroid[1]], 
        [0, 0, 1]]))
    
    # Apply T
    pts_normalized = np.dot(T, np.vstack((pts, np.ones(n))))

    return pts_normalized, T

In [None]:
def computeFitError(pts2L, pts2R, F):
    """
    Computes the quality of the fit measured as the average of (x^T*F*x')^2 over all corresponding points x,x'.

    Parameters
    ----------
    pts2L : 2D numpy.array (dtype=float)
    pts2R : 2D numpy.array (dtype=float)
        Coordinates of N points in the left and right view stored in arrays of shape (2,N)

    F : 2D numpy.array (dtype=float)
        Fundamental matrix of shape (3,3)
    
    Returns
    -------
    fiterror : float
        The quality of the fit measured as the average of (x^T*F*x')^2 over all corresponding points x,x'
    """
    _, n = pts2L.shape
    fiterror = 0

    for i in range(n):
        x = np.array([ # conver to homogeneous corods
            pts2L[0, i], pts2L[1, i], 1 
        ])
        x_ = np.array([
            pts2R[0, i], pts2R[1, i], 1
        ])

        fiterror += (x.T @ F @ x_) ** 2

    return fiterror / n

In [None]:
def fitFudamental(pts2L,pts2R):
    """
    Estimate fundamental matrix from point correspondences in two views 
    
    Parameters
    ----------
    pts2L : 2D numpy.array (dtype=float)
    pts2R : 2D numpy.array (dtype=float)
        Coordinates of N points in the left and right view stored in arrays of shape (2,N)


    Returns
    -------
    F : 2D numpy.array (dtype=float)
        Fundamental matrix of shape (3,3)
        
    fiterror : float
        The quality of the fit measured as the average of (x^T*F*x')^2 over all corresponding points x,x'
    
    """    
    _, n = pts2L.shape
    # needs at least 8 pts
    assert(n >= 8)

    ### NORMALIZE DATA
    # Center data at origin, scale it
    # keep transformations for last part
    norm_pts2L, T_L = normalizePts(pts2L)
    norm_pts2R, T_R = normalizePts(pts2R)

    ### EIGHT-POINT ALGORITHM
    # for every pair of points (u, v, 1) and (u', v', 1)...
    #   add a row add flattened outer product
    #   = (uu', uv', u, vu', vv', v, u', v', 1)
    #   then use the trick learned from normal E
    #   where we only take the last vector of Vt to solve
    A = np.array([
        [
            p1[0] * p2[0], p1[0] * p2[1], p1[0],    # uu', uv', u
            p1[1] * p2[0], p1[1] * p2[1], p1[1],    # vu', vv', v
            p2[0], p2[1], 1                         # u', v', 1
        ] for p1, p2 in zip(norm_pts2L.T, norm_pts2R.T)
    ])
    _, _, Vt = np.linalg.svd(A)
    F = Vt[-1].reshape(3, 3)


    ### ENFORCE RANK-2 CONSTRAINT
    # Just take svd and throw out smallest singular value
    U, S, Vt = np.linalg.svd(F)
    S[2] = 0     # remember singular values are sorted
    F = U @ np.diag(S) @ Vt

    ### TRANSFORM F BACK TO ORIGNAL UNITS
    # If T and T' are normalizing transformations, then fix by
    # T.T @ F @ T'
    F = T_R.T @ F @ T_L
    
    # print(F)
    ### COMPUTE FIT ERROR
    fiterror = computeFitError(pts2L, pts2R, F)
    
    return (F, fiterror)


In [None]:
def fitFundamentalRANSAC(
        pts2L, pts2R, 
        thresh, 
        niter=256
        ):
    """
    Robust estimation of fundamental matrix from point correspondences in two views 
    
    Parameters
    ----------
    pts2L : 2D numpy.array (dtype=float)
    pts2R : 2D numpy.array (dtype=float)
            Coordinates of N points in the left and right view stored in arrays of shape (2,N)

    thresh : float
        Error threshold to decide whether a point is an inlier or outlier

    niter : int
        Number of RANSAC iterations to run
        
    Returns
    -------
    F : 2D numpy.array (dtype=float)
        Fundamental matrix of shape (3,3)
    
    inliers : numpy.array (dtype=int)
        Indices of the points which were determined to be inliers
        
    fiterror : float
        The quality of the fit measured as the average of (x^T*F*x')^2 over all corresponding points x,x'
    
    """    
    _, n = pts2L.shape
    assert(n >= 8)

    # store best inliers
    F = np.zeros((3, 3))
    inliers = np.array([])
    fiterror = float('inf')
    

    for _ in range(niter):
        # Find subset of points
        #   choose random subset of MINIMUM points (in this case, 8)
        idxs_min = np.random.choice(n, 8, replace=False)

        # generate F from sample
        F_est, _ = fitFudamental(pts2L[:, idxs_min], pts2R[:, idxs_min])

        # compute inliers 
        inliers_new = []
        for i in range(n):
            x = np.array([pts2L[0, i], pts2L[1, i], 1])
            x_ = np.array([pts2R[0, i], pts2R[1, i], 1])
            if abs(x.T @ F_est @ x_) < thresh:
                inliers_new.append(i)

        # if too few inliers, exit
        if len(inliers_new) < 8:
            continue
        
        # generate F with final inliers + score model
        F_new, fiterror_new = fitFudamental(pts2L[:, inliers_new], pts2R[:, inliers_new])

        # update final F + inliers (if better)
        if fiterror_new < fiterror:
            F = F_new
            inliers = inliers_new
            fiterror = fiterror_new
    
    return (F, inliers, fiterror)


In [None]:
def fitHomography(pts2L, pts2R):
    """
    Estimates a homography between 2 points using (normalized) DLT


    """

    _, n = pts2L.shape
    # need at least 4 pts
    assert(n >= 4)

    ### NORMALIZE DATA
    # Center data at origin, scale it
    # keep transformations for last part
    norm_pts2L, T_L = normalizePts(pts2L)
    norm_pts2R, T_R = normalizePts(pts2R)

    ### DLT ALGORITHM
    #   for every pair of points add 2 new rows
    #   [ 0, 0, 0, -pt, pt[1] * pt]
    #   [ -pt, [0, 0, 0], -pt[0] * pt]
    A = []
    for p1, p2 in zip(norm_pts2L.T, norm_pts2R.T):
        pass

    A = np.array(A)
    _, _, Vt = np.linalg.svd(A)
    H = Vt[-1].reshape(3, 3)

    ### TRANSFORM H BACK TO ORIGNAL UNITS
    # If T and T' are normalizing transformations, then fix by
    # T.T @ H @ T'
    H = T_R.T @ H @ T_L

    ### COMPUTE ERROR
    pass

    return H


The third step is to select a good model. Ideally one that the fundamental matrix has a good result on and the homography on a subset is ideal. 

The fourth step is to perform motion and structure from 