# VO Pipeline
_Vision Algorithms for Mobile Robotics | Fall 2025_<br><br>
_David Jensen, Alessandro Pirini, Matteo Rubini, Alessandro Ferranti_

## Notes on writing code
For now, try to make each block a function; see below for format.

In [None]:
def this_is_a_function(state, params, whatever_else_function_needs):
    # update state in place and return only things that not included in the state
    return None

# you can then call your function below it to debug, process data for the next step
this_is_a_function(None, None, None)

This way debugging still works easily, but then we can have a couple main blocks at the very end that handle everything nicely.<br>
For the blocks in the _Operation_ section, probably pass in the state dictionary and parameter class and then whatever else might be relevant

Also, when you need to add a parameter (ex confidence for RANSAC), you have options. If it is a global paramter that is the same for all datasets, add it [here](#paramaters-for-all-datasets). If it could change based on the dataset, add it [here](#paramaters-for-specific-datasets). After adding the parameter value in the appropriant place, add where required [here](#paramaters)

Another cool thing is `Jupyter Variables`: click on it in the top toolbar, and it shows name, type, size, and value for all variables. Nice for debugging.

## Imports

### Libraries

In [None]:
import os
from glob import glob

from typing import Tuple, Dict

import cv2
import numpy as np
import scipy

import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
from matplotlib.patches import FancyArrowPatch

### Data
_Ensure that all datasets have been downloaded and unzipped into their respective folders_

In [None]:
# Dataset -> 0: KITTI, 1: Malaga, 2: Parking, 3: Own Dataset
DATASET = 2

In [None]:
# Define dataset paths
# (Set these variables before running)
kitti_path = "kitti/kitti05/kitti"
malaga_path = "malaga/malaga-urban-dataset-extract-07"
parking_path = "parking/parking"
# own_dataset_path = "/path/to/own_dataset"

if DATASET == 0:
    assert 'kitti_path' in locals(), "You must define kitti_path"
    img_dir = os.path.join(kitti_path, '05/image_0')
    images = glob(os.path.join(img_dir, '*.png'))
    last_frame = 4540
    K = np.array([
        [7.18856e+02, 0, 6.071928e+02],
        [0, 7.18856e+02, 1.852157e+02],
        [0, 0, 1]
    ])
    ground_truth = np.loadtxt(os.path.join(kitti_path, 'poses', '05.txt'))
    ground_truth = ground_truth[:, [-9, -1]]  # same as MATLAB(:, [end-8 end])

elif DATASET == 1:
    assert 'malaga_path' in locals(), "You must define malaga_path"
    img_dir = os.path.join(malaga_path, 'malaga-urban-dataset-extract-07_rectified_800x600_Images')
    images = sorted(glob(os.path.join(img_dir, '*.png')))
    last_frame = len(images)
    K = np.array([
        [621.18428, 0, 404.0076],
        [0, 621.18428, 309.05989],
        [0, 0, 1]
    ])
    ground_truth = None
    
elif DATASET == 2:
    assert 'parking_path' in locals(), "You must define parking_path"
    img_dir = os.path.join(kitti_path, '05/image_0')
    images = glob(os.path.join(img_dir, '*.png'))
    last_frame = 598
    K = np.loadtxt(os.path.join(parking_path, 'K.txt'), delimiter=",", usecols=(0, 1, 2))
    ground_truth = np.loadtxt(os.path.join(parking_path, 'poses.txt'))
    ground_truth = ground_truth[:, [-9, -1]]
    
elif DATASET == 3:
    # Own Dataset
    # TODO: define your own dataset and load K obtained from calibration of own camera
    assert 'own_dataset_path' in locals(), "You must define own_dataset_path"

else:
    raise ValueError("Invalid dataset index")

## Parameters

### Paramaters for all datasets

In [None]:
# Paramaters for Shi-Tomasi corners
feature_params = dict( maxCorners = 10,
                       qualityLevel = 0.3,
                       minDistance = 7,
                       blockSize = 7 )

# Parameters for LKT
lk_params = dict( winSize  = (15, 15),
                  maxLevel = 2,
                  criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# min dist in pxl from a new feature to the nearest existing feature for the new feature to be added
new_feature_min_dist = 5


### Paramaters for specific datasets

In [None]:

# Next keyframe to use for bootstrapping
KITTI_BS_KF = 5
MALAGA_BS_KF = 5
PARKING_BS_KF = 5
CUSTOM_BS_KF = 5

# Number of rows and columns to divide image into for feature detection and number of features to track in each cell
KITTI_ST_ROWS, KITTI_ST_COLS, KITTI_NUM_FEATURES = 2, 4, 20
MALAGA_ST_ROWS, MALAGA_ST_COLS, MALAGA_NUM_FEATURES = 2, 4, 20
PARKING_ST_ROWS, PARKING_ST_COLS, PARKING_NUM_FEATURES = 2, 4, 20
CUSTOM_ST_ROWS, CUSTOM_ST_COLS, CUSTOM_NUM_FEATURES = 2, 4, 20

### Instantiate params class

#### Generate masks for feature tracking

In [None]:
def get_feature_masks(img_path, rows, cols) -> list[np.ndarray]:
    # get image shape
    img = cv2.imread(img_path)
    H, W = img.shape[:2]

    # get boundries of the cells
    row_boundries = np.linspace(0, H, rows + 1, dtype=int)
    col_boundries = np.linspace(0, W, cols + 1, dtype=int)

    # create masks left to right, top to bottom
    masks = []
    for row in range(rows):
        for col in range(cols):
            mask = np.zeros((H, W), dtype="uint8")
            r_s, r_e = row_boundries[[row, row + 1]]
            c_s, c_e = col_boundries[[col, col + 1]]
            mask[r_s:r_e, c_s:c_e] = 255
            masks.append(mask)
            
            # visulaization
            # vis = np.zeros_like(img)
            # vis[mask] = img[mask]
            # cv2.imshow("masked", vis)
            # cv2.waitKey(0)
            # cv2.destroyAllWindows()

    return masks

#### Paramaters

In [None]:
class VO_Params():
    bs_kf_1: str # path to first keyframe used for bootstrapping dataset
    bs_kf_2: str # path to second keyframe used for bootstrapping dataset
    feature_masks: list[np.ndarray] # mask image into regions for feature tracking 
    shi_tomasi_params: dict
    klt_params: dict
    k: np.ndarray # camera intrinsics matrix
    start_idx: int # index of the frame to start continous operation at (2nd bootstrap keyframe index)
    new_feature_min_dist: float # min dist in pxl from a new feature to the nearest existing feature for the new feature to be added
    # ADD NEW PARAMS HERE

    def __init__(self, bs_kf_1, bs_kf_2, feature_masks, shi_tomasi_params, klt_params, k, start_idx, new_feature_min_dist):
        self.bs_kf_1 = bs_kf_1
        self.bs_kf_2 = bs_kf_2
        self.feature_masks = feature_masks
        self.shi_tomasi_params = shi_tomasi_params
        self.klt_params = klt_params
        self.k = k
        self.start_idx = start_idx
        self.new_feature_min_dist = new_feature_min_dist
        # ADD NEW PARAMS HERE

if DATASET == 0:
    assert 'kitti_path' in locals(), "You must define kitti_path"
    bs_kf_1 = images[0]
    bs_kf_2 = images[KITTI_BS_KF]
    start_idx = KITTI_BS_KF
    feature_masks = get_feature_masks(bs_kf_1, KITTI_ST_ROWS, KITTI_ST_COLS)
    # ADD NEW PARAMS HERE

elif DATASET == 1:
    assert 'malaga_path' in locals(), "You must define malaga_path"
    bs_kf_1 = images[0]
    bs_kf_2 = images[MALAGA_BS_KF]
    start_idx = MALAGA_BS_KF
    feature_masks = get_feature_masks(bs_kf_1, MALAGA_ST_ROWS, MALAGA_ST_COLS)
    # ADD NEW PARAMS HERE

elif DATASET == 2:
    assert 'parking_path' in locals(), "You must define parking_path"
    bs_kf_1 = images[0]
    bs_kf_2 = images[PARKING_BS_KF]
    start_idx = PARKING_BS_KF
    feature_masks = get_feature_masks(bs_kf_1, PARKING_ST_ROWS, PARKING_ST_COLS)
    # ADD NEW PARAMS HERE

elif DATASET == 3:
    # Own Dataset
    # TODO: define your own dataset and load K obtained from calibration of own camera
    assert 'own_dataset_path' in locals(), "You must define own_dataset_path"

else:
    raise ValueError("Invalid dataset index")

# ADD NEW PARAMS HERE TO THE INIT
params = VO_Params(bs_kf_1, bs_kf_2, feature_masks, feature_params, lk_params, K, start_idx, new_feature_min_dist)

# Initialization
- Select two keyframes with large enough baseline
- Use indirect (feature-based) or direct (KLT) method to establish keypoint corrispondences between frames
- Estimate relative pose and triangulate points to bootstrap point cloud (5-pt RANSAC)
- Initialize VO pipeline with inlier keypoints and their associated landmarks

### Initialization,Feature extraction

Detect a set of 2D keypoints in the first bootstrap keyframe (bs_kf_1) that are well distributed across the image.

Implementation:
- We use `cv2.goodFeaturesToTrack` (Shi–Tomasi) to detect corners.
- To avoid clustering of keypoints in high-texture areas, we apply Shi–Tomasi separately in multiple image regions using `params.feature_masks` and then concatenate the results.

Output:
- `st_corners`: array of shape `(N, 1, 2)` (float32), suitable for KLT tracking with `cv2.calcOpticalFlowPyrLK`.


In [None]:
def extractFeatures(img_grayscale, params):
    """
    Step 1 (Initialization): detect Shi–Tomasi corners on a grid using feature masks.

    Args:
        img_grayscale (np.ndarray): bootstrap keyframe 1 in grayscale (H x W).
        params (VO_Params): contains feature_masks and shi_tomasi_params.

    Returns:
        st_corners (np.ndarray): (N, 1, 2) float32 corners for KLT tracking.
    """
    st_corners = np.empty((0, 1, 2), dtype=np.float32)
    for n, mask in enumerate(params.feature_masks):
        features = cv2.goodFeaturesToTrack(img_grayscale, mask=mask, **params.shi_tomasi_params)
        # If no corners are found in this region, skip it
        if features is None: 
            print(f"No features found for mask {n+1}!")
            continue
        # Warn if very few features were found in this region (not necessarily an error)
        if features.shape[0] < 10:
            print(f"Only {features.shape[0]} features found for mask {n+1}!")
        st_corners = np.vstack((st_corners, features))
    return st_corners


### Initialization, KLT tracking across intermediate frames

Establish 2D–2D correspondences between the two bootstrap keyframes (bs_kf_1 → bs_kf_2).

Implementation:
- Tracking the detected keypoints using KLT optical flow (`cv2.calcOpticalFlowPyrLK`).
- Instead of tracking directly from keyframe 1 to keyframe 2 in one shot, we track *frame-by-frame* across the intermediate frames,more stable when the motion between keyframes is larger.
- We maintain a boolean mask `still_detected` that keeps track of keypoints successfully tracked at every frame.

Output:
- `points[still_detected]`: tracked keypoints in `bs_kf_2`
- `initial_points`: initial deteced keypoints in `bs_kf_1`
- `still_detected`: boolean mask (relative to the original set) indicating which keypoints survived the entire bootstrap tracking.



In [None]:
def trackForwardBootstrap(images,params,st_corners_kf_1):
    """
    Module 2-Initialization: track keypoints from bs_kf_1 to bs_kf_2 with KLT across intermediate frames.

    Args:
        images (list[str]): full dataset sequence (paths).
        params (VO_Params): contains bs_kf_1, bs_kf_2, klt_params.
        st_corners_kf_1 (np.ndarray): (N,1,2) keypoints in bs_kf_1.

    Returns:
        points[still_detected] (np.ndarray): (M,1,2) tracked points in bs_kf_2.
        initial_points (np.ndarray): (N,1,2) points in bs_kf_1.
        still_detected (np.ndarray): (N,) boolean mask of points visible i all frames.
    """
    img_bs_kf_1_index=images.index(params.bs_kf_1)
    img_bs_kf_2_index=images.index(params.bs_kf_2)
    still_detected=np.ones(st_corners_kf_1.shape[0],dtype=bool)
    points = st_corners_kf_1.copy()
    initial_points = st_corners_kf_1.copy()
    #Track keypoints frame-by-frame from first bs frame to second bs frame
    for i in range(img_bs_kf_1_index, img_bs_kf_2_index):
        current_image=cv2.imread(images[i],cv2.IMREAD_GRAYSCALE)
        next_image=cv2.imread(images[i+1],cv2.IMREAD_GRAYSCALE)
        nextPts,status,error=cv2.calcOpticalFlowPyrLK(current_image,next_image,points, None, **params.klt_params)
        points=nextPts
        status=status.flatten()
        still_detected=still_detected & (status==1)

    # Keep only points that were successfully tracked throughout
    return points[still_detected], initial_points, still_detected

### Transformation
TODO: find transformation between the two frames using cv2.findHomography()

### Initialization, Relative pose estimation with RANSAC

Estimate the relative camera motion between the two bootstrap keyframes using
2D–2D correspondences obtained after KLT tracking.

Since the tracked correspondences still contain outliers, we  estimate the epipolar geometry using RANSAC.

Procedure:
1. Estimate the Fundamental matrix using `cv2.findFundamentalMat` with RANSAC.
2. Convert the Fundamental matrix into the Essential matrix using camera intrinsics.
3. Recover the relative rotation and translation (up to scale) using `cv2.recoverPose`.
4. Keep only inlier correspondences.

The resulting relative pose (R, t) is returned together with inliers bollean mask.

In [None]:
def ransacHomography(points1,points2):
    """
    Estimate relative pose between two keyframes using RANSAC.

    Args:
        points1, points2: corresponding 2D points (N,2) or (N,1,2)

    Returns:
        R (3x3): relative rotation matrix
        t (3x1): relative translation vector (up to scale)
        inliers (N,): boolean mask of inlier correspondences
    """
    #F mat using ransac
    fundamental_matrix, inliers =cv2.findFundamentalMat(points1,points2,cv2.FM_RANSAC,ransacReprojThreshold=1.0)

    #using boolean vector
    inliers = inliers.ravel().astype(bool)

    #compute the essential matrix
    E= K.T@ fundamental_matrix@K

    #recover the relative camera pose
    _,R,t,mask_pose=cv2.recoverPose(E,points1[inliers],points2[inliers],K)

    return R, t, inliers

In [None]:
#check point 1 and point 2 and point 3
#read bootstrap keyframes
img_bs_kf_1 = cv2.imread(params.bs_kf_1,cv2.IMREAD_GRAYSCALE)
img_bs_kf_2 = cv2.imread(params.bs_kf_2,cv2.IMREAD_GRAYSCALE)

#1)extract features
st_corners_kf_1 = extract_features(img_bs_kf_1, params)

#2)KLT interframe tracking
st_corners_kf_2, st_corners_kf_1,still_detected =feature_tracking_klt(images, params, st_corners_kf_1)

print("Initial corners:",st_corners_kf_1.shape[0])
print("Tracked corners:",st_corners_kf_2.shape[0])


# 3)estimate relative pose with RANSAC
R, t, inliers = estimate_relative_pose(st_corners_kf_1[still_detected], st_corners_kf_2)

print("Inliers after RANSAC:", inliers.sum(), "/", len(inliers))
print("R:\n", R)
print("t (up to scale):\n", t.T)

# Visual test: inliers (green) vs outliers (red) on keyframe 1
vis = cv2.cvtColor(img_bs_kf_1, cv2.COLOR_GRAY2BGR)
pts1 = st_corners_kf_1[still_detected].reshape(-1, 2)

for i, (x, y) in enumerate(pts1):
    color = (0, 255, 0) if inliers[i] else (0, 0, 255)
    cv2.circle(vis, (int(x), int(y)), 2, color, -1)


import matplotlib.pyplot as plt

plt.figure(figsize=(6,4))
plt.imshow(vis[..., ::-1])  # BGR -> RGB
plt.title("Step3 RANSAC: inliers (green) / outliers (red)")
plt.axis("off")
plt.show()

In [None]:
print("Inlier ratio:", inliers.sum() / len(inliers))
print("||t||:", np.linalg.norm(t))


### Triangulate points to get point cloud
TODO: find 3d points using triangulatePoints(); the projection matrices are $K*[R|T]$ where $[R|T]$ is $[I|0]$ for the first image and the homography from above for the second image

In [None]:
def bootstrapPointCloud(params: VO_Params, H: np.ndarray, points_1: np.ndarray, points_2: np.ndarray) -> np.ndarray:
    """Bootstrap the initial 3D point cloud using least squares assuming the first frame is the origin

    Args:
        params (VO_Params): params object for the dataset being used
        H (np.ndarray): homographic transformation from bootstrap keyframe 1 to 2
        points_1 (np.ndarray): keypoints detected in bootstrap keyframe 1
        points_2 (np.ndarray): keypoints tracked in bootstrap keyframe 2

    Returns:
        np.ndarray: [3 x k] array of triangulated points
    """
    # projection matrices
    proj_1 = params.k @ np.hstack([np.eye(3), np.zeros((3,1))])
    proj_2 = params.k @ H

    # triangulate homogeneous coordinates using DLT
    points_homo = cv2.triangulatePoints(proj_1, proj_2, points_1, points_2)

    # convert back to 3D
    points_3d = (points_homo[:3, :]/points_homo[3, :])

    return points_3d
#pts = bootstrapPointCloud(params, np.hstack([np.eye(3), np.ones((3,1))]), H_bootstrap, st_corners_kf_1, st_corners_kf_2)

### Build state to initialize the algorithm
The state is $(P^i, X^i, C^i, F^i, \Tau^i)$ where<br>
$P^i$ is a `[k x 1 x 2]` matrix of initial features' pixel in the second keyframe of the dataset<br>
$X^i$ is a `[3 x k]` matrix of the 3D cooridinates of the corrisponding landmarks<br>
$C^i$ is a `[m x 1 x 2]` matrix of current locations of candidate keypoints (empty to start so `m=0`)<br>
$F^i$ is a `[m x 1 x 2]` matrix of initial observation of candidate keypoints (empty to start so `m=0`)<br>
$\Tau^i$ is a `[m x 12]` matrix of the camera pose during the initial observation (empty at the start so `m=0`)<br>
This can be stored in a dict $S = \{P: [k \times 1 \times 2], X: [3 \times k], ...\}$

In [None]:
def bootstrapState(P_i: np.ndarray, X_i: np.ndarray) -> dict[str : np.ndarray]:
    """
        Builds the initial state taking the 2D keypoints and their respective 3D points generated by the bootstrap
        
        Args:
            P_i: kx1x2 matrix of 2D keypoints found from the first two frames
            X_i: 3xk matrix containing the 3D projections of the keypoints
        
        Returns:
            dictionary of dictionaries, where every variable of interest is a key and its value is the matrix (e.g. S["P"] returns a kx1x2 matrix of keypoints)  
    """
    S : dict[str : np.ndarray] = {}
    assert P_i.shape()[0] == X_i.shape()[1], "2D keypoints number of rows should match the 3D keypoints number of columns"
    
    S["P"] = P_i    
    S["X"] = X_i    
    S["C"] = np.empty(0,1,2)
    S["F"] = np.empty_like(S["C"])
    S["T"] = np.empty(0,12)

    return S

# init_state = bootstrapState(P_i = st_corners_kf_1, X_i = pts) # might be wrong, double check on sunday

# Operation
- Match keypoints in current image to existing landmarks
    - Extract keypoints (Harris)
    - Track (KLT)
- Estimate pose
    - Estimate pose and handle outliers (P3P plus RANSAC)
- Add new landmarks as needed by triangulating new features
    - Keep track of candidate landmarks
        - Keypoint itself
        - Observation when first seen
        - Pose when first seen
    - Only add when they have been tracked for long enough and baselineis large enough
    - Discard if track fails<br>

_NOTE: this starts at the frame after the second keyframe (`bs_kf_2`) and goes until the last frame in the dataset_


### Track keypoints forward one frame
Here we want to use cv2.calcOpticalFlowPyrLK() with previous frame, current frame, $P^i$ and $C^i$,  - see use in initialization for example and then update $P^i$ and $X^i$ as well as $C^i$, $F^i$ and $\Tau^i$ based on the features that were successfully tracked (ie remove any features that were not tracked)

In [None]:
def trackForward(params: VO_Params, state: dict[str:np.ndarray], img_1: np.ndarray, img_2: np.ndarray) -> dict[str:np.ndarray]:
    """
    Track 2D keypoints from img_1 to img_2 using KLT optical flow

    Args:
        params (VO_Params): parameters for the VO pipeline
        img_1 (np.ndarray): first image (grayscale)
        img_2 (np.ndarray): second image (grayscale)
        points_1 (np.ndarray): keypoints in img_1 to be tracked
    Returns:
        np.ndarray: tracked keypoints in img_2
    """
    # NOTE: might make sense to replace assert with ifs since maybe even if we have no points nor candidates we still want to return them empty again
    # FIRST WE TRACK "ESTABILISHED KEYPOINTS"
    points_2D = state["P"]
    assert points_2D.shape[0] > 0, "There are no keypoints here, we can't track them forward"    

    current_points, status, _ = cv2.calcOpticalFlowPyrLK(prevImg=img_1, nextImg=img_2, prevPts=points_2D, nextPts=None, **params.klt_params)
    status = status.flatten()   # we are going to use them as booleans so maybe we should cast them with astype (?)
    
    # update the state with the current points
    state["P"] = current_points[status]
    state["X"] = state["X"][:, status]  # only get the ones with "true" status and slice them as 3xk
    

    # THEN WE TRACK CANDIDATES
    candidates = state["C"]
    assert candidates.shape[0] > 0, "No candidates found, something went wrong"
    
    current_cands, status_cands, _ = cv2.calcOpticalFlowPyrLK(prevImg=img_1, nextImg=img_2, prevPts=points_2D, nextPts=None, **params.klt_params)
    status_cands = status.flatten() # same as above
    
    state["C"] = current_cands[status_cands]
    # initial observations, still have some doubts on these two
    state["F"] = state["F"][status_cands]
    state["T"] = state["T"][status_cands]

    return state      

### Estimate pose
Here we want to use cv2.solvePnPRansac() with updated $P^i$, $X^i$, $K$, to find pose of camera at current position and then update $P^i$ and $X^i$ with the inliers at the end

In [None]:
def estimatePose(params: VO_Params, state: dict[str:np.ndarray]) -> tuple[dict[str:np.ndarray], np.ndarray]:
    """
    Estimate camera pose using PnP RANSAC and update state to keep only inliers
    
    Args:
        params (VO_Params): parameters for the VO pipeline
        state (dict): current state that also contains 2D keypoints and 3D points
    
    Returns:
        tuple[dict, np.ndarray]: updated state with only inliers for P and X and camera pose as 3x4 matrix
    """
    
    pts_2D = state["P"]
    pts_3D = state["X"]
    pts_3D = pts_3D.T # according to documentation we need them as kx3 not 3xk
    K = params.k
    
    success, r_vec, t_vec, inliers_idx =  cv2.solvePnPRansac(objectPoints=pts_3D, imagePoints=pts_2D, cameraMatrix=K, distCoeffs=None, flags=cv2.SOLVEPNP_P3P)  # using P3P as suggested in the statement
    
    if not success:
        print("Pose estimation failed")
        return (np.empty(), {}) # maybe we could raise an error instead of returning this
    
    # r_vec needs to be converted into a 3x3
    R, _ = cv2.Rodrigues(r_vec)    
    camera_pose = np.hstack(R, t_vec)
    
    # now, inliers are the indices in pts_2d and pts_3d corresponding to the inliers; it is a 2D since openCV returns it as such, so we need to convert it in a 1D array to use np features
    inliers_idx = inliers_idx.flatten()
    new_state = state.copy()
    # by doing the following thing the idea is that we are only keeping the inliers
    new_state["P"] = state["P"][inliers_idx]
    new_state["X"] = state["X"][:, inliers_idx] # slice this since we want it as a 3xk
    
    return (new_state, camera_pose)


### Try triangulating
TODO: Check the angle between the the first observation ($X^i$ and $\Tau^i$) of candidate keypoints and the current observation ($C^i$ and current pose) - triangulate features using cv2.triangulatePoints() if angle is above threshold

### Find new features
TODO: use cv2.goodFeaturesToTrack() - see use in initialization for example

In [None]:
print("tstt")

### Add new features
TODO: if the feature is not already being tracked as an actual keypoint or candidate keypoint, update $C^i$ and $X^i$ with keypoint location and $\Tau^i$ with the current camera pose

In [None]:
def keypoints2set(keypoints: np.ndarray) -> set:
    """Convert numpy keypoint list [k x 1 x 2] to a set of keypoints

    Args:
        keypoints (np.ndarray): keypoint array

    Returns:
        set: keypoint set
    """
    return set([(row[0][0], row[0][1]) for row in keypoints.tolist()])

def set2keypoints(keypoint_set: set) -> np.ndarray:
    """Convert keypoint set (u, v) to a numpy array [k x 1 x 2]

    Args:
        keypoint_set (set): keypoint set

    Returns:
        np.ndarray: keypoint array with shape [k x 1 x 2]
    """
    return np.array([[[keypoint[0], keypoint[1]]] for keypoint in keypoint_set])

def addNewFeatures(params: VO_Params, S: dict, potential_candidate_features: np.ndarray, cur_pose: np.ndarray) -> dict:
    """Given an array of features, update S with featurees that are not already being tracked

    Args:
        params (VO_Params): params object for the dataset being used
        S (dict): state
        potential_candidate_features (np.ndarray): features extracted from current frame
        cur_pose (np.ndarray): pose of current frame

    Returns:
        dict: updated state
    """
    # setup
    cur = np.vstack((S["P"], S["C"]))
    new = potential_candidate_features
    n_new = new.shape[0]
    n_cur = cur.shape[0]

    # extend new features to a tensor where every row corrisponds to a new feature,
    # every column will be used to compare to existing features, and depth holds the x, y coords
    new_extended = new.repeat(n_cur, axis=1)

    # reshape current features so it can be subtracted from every column in the tensor of potential new points
    cur_reshaped = cur.reshape((1, n_cur, 2))

    # calculate the distances between every point pair (rows corrispond to new features, cols to cur features)
    dists = np.linalg.norm(new_extended - cur_reshaped, axis=2)

    # for every new point, find the distance to the closest current point
    min_dists = np.max(dists, axis=1)

    # create a mask, keeping only features that are greater than a minimum distance from any already tracked feature
    new_features_mask = np.where(min_dists > params.new_feature_min_dist, True, False)

    # mask the potential new features so only unique ones are kept
    new_features = new[new_features_mask]

    # append new features to current points, first observed points, and first observed camera pose
    S["C"] = np.vstack((S["C"], new_features))
    S["X"] = np.vstack((S["X"], new_features))
    S["T"] = np.vstack(S["T"], cur_pose.flatten()[None, :].repeat(len(new_features)))

    return S


# Plotting
_Thanks to our good friend ChatGPT_

In [None]:
plt.ion()

def initTrajectoryPlot(gt_path: np.ndarray, arrow_len: float = 0.3) -> Dict[str, object]:
    """
    Initialize trajectory plot with ground truth and placeholders
    for estimated trajectory and camera orientation.

    Args:
        gt_path (np.ndarray): Ground truth path [n x 2]
        arrow_len (float): Length of orientation arrow (meters)

    Returns:
        dict: plot state dictionary
    """
    fig, ax = plt.subplots(figsize=(8, 6))

    if DATASET in [0, 2]:
        # ---- Ground truth (time-colored) ----
        x_gt, y_gt = gt_path[:, 0], gt_path[:, 1]
        t = np.arange(len(gt_path))

        points = np.stack((x_gt, y_gt), axis=1).reshape(-1, 1, 2)
        segments = np.concatenate((points[:-1], points[1:]), axis=1)

        norm = mpl.colors.Normalize(vmin=t.min(), vmax=t.max())
        lc = LineCollection(segments, cmap="viridis", norm=norm, linewidth=2.5)
        lc.set_array(t)

        ax.add_collection(lc)
        cbar = fig.colorbar(lc, ax=ax)
        cbar.set_label("Time step (GT)")

    # ---- Estimated trajectory ----
    est_line, = ax.plot([], [], "r-", linewidth=2, label="VO estimate")
    est_point, = ax.plot([], [], "ro", markersize=5)

    # ---- Orientation arrow ----
    heading_arrow = FancyArrowPatch(
        (0.0, 0.0),
        (0.0, 0.0),
        arrowstyle="->",
        linewidth=2,
        color="red",
        mutation_scale=15,
    )
    ax.add_patch(heading_arrow)

    # ---- Formatting ----
    ax.set_title("Ground Truth vs VO Estimated Trajectory")
    ax.set_xlabel("x [m]")
    ax.set_ylabel("y [m]")
    ax.axis("equal")
    ax.grid(True, linestyle="--", alpha=0.5)
    ax.legend()

    ax.autoscale()
    plt.tight_layout()
    plt.show()

    return {
        "fig": fig,
        "ax": ax,
        "est_line": est_line,
        "est_point": est_point,
        "heading_arrow": heading_arrow,
        "arrow_len": arrow_len,
    }

In [None]:
def updateTrajectoryPlot(plot_state: Dict[str, object], est_path: np.ndarray, theta: float) -> None:
    """
    Update estimated trajectory and camera orientation arrow.

    Args:
        plot_state (dict): Plot state returned by initTrajectoryPlot
        est_path (np.ndarray): Estimated path [k x 2]
        theta (float): Current yaw angle (radians)
    """
    x = est_path[:, 0]
    y = est_path[:, 1]

    # Update trajectory
    plot_state["est_line"].set_data(x, y)
    plot_state["est_point"].set_data(x[-1], y[-1])

    # Update orientation arrow
    x0, y0 = x[-1], y[-1]
    L = plot_state["arrow_len"]
    x1 = x0 + L * np.cos(theta)
    y1 = y0 + L * np.sin(theta)

    plot_state["heading_arrow"].set_positions((x0, y0), (x1, y1))

    plt.pause(0.001)

# Real-Time Checking

## Bootstrapping

In [None]:
# extract features from the first image of the dataset
bootstrap_features_kf_1 = extractFeatures(params, params.bs_kf_1)

# track extracted features forward to the next keyframe in the dataset
bootstrap_tracked_features_kf_1, bootstrap_tracked_features_kf_2 = trackForwardBootstrap(params, bootstrap_features_kf_1)

# calculate the homographic transformation between the first two keyframes
homography, ransac_features_kf_1, ransac_features_kf_2 = ransacHomography(params, bootstrap_tracked_features_kf_1, bootstrap_tracked_features_kf_2)

# triangulate features from the first two keyframes to generate initial 3D point cloud
bootstrap_point_cloud = bootstrapPointCloud(params, homography, ransac_features_kf_1, bootstrap_tracked_features_kf_2)

# generate initial state
S = bootstrapState(P_i=bootstrap_tracked_features_kf_2, X_i=bootstrap_point_cloud)

## Looping

In [None]:
# ploting setup
plot_state = initTrajectoryPlot(ground_truth)
est_path = []
theta = 0.0

# initialize previous image
last_image = cv2.imread(images[params.start_idx], cv2.IMREAD_GRAYSCALE)

for i in range(params.start_idx + 1, last_frame + 1):

    # read in next image
    image_path = images[i]
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        print(f"Warning: could not read {image_path}")
        continue

    # track keypoints forward one frame
    S = trackForward(params, S, last_image, image)

    # estimate pose, only keeping inliers from PnP with RANSAC
    S, pose = estimatePose(params, S)

    # attempt triangulating candidate keypoints, only adding ones with sufficient baseline
    S = attemptTriangulating(params, S, pose)

    # find features in current frame
    potential_candidate_features = extractFeatures(params, image)

    # find which features are not currently tracked and add them as candidate features
    S = addNewFeatures(params, S, potential_candidate_features, pose)

    # plot current pose
    est_path.append(pose[:2, 3])
    theta = scipy.spatial.transform.Rotation.from_matrix(pose[:3, :3]).as_euler("xyz")[2]
    updateTrajectoryPlot(plot_state, np.asarray(est_path), theta)

    # update last image
    last_image = image

    # pause for 0.01 seconds
    cv2.waitKey(10)

cv2.destroyAllWindows()