# Visual Odometry
This notebook apply stereo depth estimation and multiple view geometry to track vehicle position through a sequencee of the images from kitti dataset.

In [2]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from dotenv import load_dotenv
from dataclasses import dataclass

_ = load_dotenv()

In [None]:
from typing import Iterator, Optional


class DatasetHandler(object):
    def __init__(self) -> None:
        self.l_dr = os.getenv("IMG_L_DIR")
        self.r_dr = os.getenv("IMG_R_DIR")
        self.l_ls = sorted(os.listdir(self.l_dr), key=lambda x: int(x.split('.')[0]))
        self.r_ls = sorted(os.listdir(self.r_dr), key=lambda x: int(x.split('.')[0]))
        self._reset_frame()
        
        # Ground truth
        p = pd.read_csv(os.getenv("POSES_FILE"), delimiter=' ', header=None)
        self.gt: np.ndarray = p.values.reshape((-1, 3, 4))

        # Camera intrinsic matrix, and lidar transform matrix
        c = pd.read_csv(os.getenv("CALIB_FILE"), delimiter=' ', header=None, index_col=0)
        self.K0, self.K1, self.Tr = map(
            lambda x: np.array(c.loc[x, :]).reshape((3, 4)), ('P0:', 'P1:', 'Tr:')
        )

        self.frame_num: int = len(self.l_ls)
        self.frame_idx: int = 1
        self.l_image_1: cv2.Mat = next(self.l_images)


    def frame(self) -> tuple:
        pass

    def next_frame(self) -> None:
        self.l_image_0 = self.l_image_1
        self.r_image_0 = next(self.r_images)
        self.l_image_1 = next(self.l_images)

        self.lidar_t = next(self.lidar)
    
    def stop(self) -> bool:
        return self.frame_num == self.frame_idx
    
    def imsize(self) -> tuple[int, int]:
        return self.l_image_1.shape[0], self.l_image_1.shape[1]

    def _reset_frame(self) -> None:
        self.l_images: Iterator[cv2.Mat] = (
            cv2.imread(os.path.join(self.l_dr, file)) 
            for file in self.l_ls
        )
        self.r_images: Iterator[cv2.Mat] = (
            cv2.imread(os.path.join(self.r_dr, file)) 
            for file in self.r_ls
        )
        self.lidar: Iterator[np.ndarray] = (
            np.fromfile(os.path.join("", file))
            for file in self._ls
        )

In [None]:
@dataclass
class Config(object):
    lidar_correct: bool
    stereo_matcher: Optional[str] = None

    P0: np.ndarray
    P1: np.ndarray
    Tr: np.ndarray

    feature_detector: str = 'orb'
    feature_matcher: str = 'bf'
    matching_threshold: Optional[float] = None


class VisualOdometry(object):
    def __init__(self, config: Config) -> None:
        self.config: Config = config

        ## camera coefficient
        self.P0 = self.config.P0
        self.P1 = self.config.P1
        self.Tr = self.config.Tr
        self.K0, self.R0, self.t0, *_ = cv2.decomposeProjectionMatrix(self.P0)
        self.K1, self.R1, self.t1, *_ = cv2.decomposeProjectionMatrix(self.P1)
        self.f = self.K0[0, 0]
        self.b = self.t1[0] - self.t0[0]
        self.ih, self.iw = 0, 0
        self.cx, self.cy = self.K0[0, 2], self.K0[1, 2]
        self.fx, self.fy = self.K0[0, 0], self.K0[1, 1]

        ## stereo matcher
        self._stereo_matcher: Optional[cv2.StereoBM | cv2.StereoSGBM] = None
        if self.config.stereo_matcher == 'sgbm':
            self._stereo_matcher = cv2.StereoSGBM(
                numDisparities=96,
                minDisparity=0,
                blockSize=11,
                P1 = 864, P2 = 3456,
                mode=cv2.STEREO_SGBM_MODE_SGBM_3WAY
            )
        elif self.config.stereo_matcher == 'bm':
            self._stereo_matcher = cv2.StereoBM(numDisparities=96, blockSize=11)

        ## feature detection and matching
        det_map: dict[str, cv2.ORB | cv2.SIFT] = {
            'orb': cv2.ORB.create,
            'sift': cv2.SIFT.create, 
        }
        self._feature_detector: cv2.SIFT | cv2.ORB = det_map[self.config.feature_detector]()

        if self.config.feature_matcher == 'bf':
            self._feature_matcher = cv2.BFMatcher.create(
                cv2.NORM_L2 if self.config.feature_detector == 'sift' else cv2.NORM_HAMMING2,
                crossCheck=False
            )
        else:
            self._feature_matcher = cv2.FlannBasedMatcher(
                dict(algorithm=1, trees=5), 
                dict(checks=50)
            )
        self.threshold = self.config.matching_threshold

    def update(
        self,
        image_l0: np.ndarray,
        image_r0: np.ndarray,
        image_l1: np.ndarray,
        point_clould: Optional[np.ndarray] = None
    ) -> None:
        
        depth: Optional[np.ndarray] = None
        if self._stereo_matcher is not None:
            depth = self._depth_estimate(image_l0, image_r0, point_clould)

        ## feature extraction and matching
        kp0, des0 = self._feature_detector.detectAndCompute(image_l0, None)
        kp1, des1 = self._feature_detector.detectAndCompute(image_l1, None)

        m = self._features_matching(des0, des1)
        R, t = self._motion_estimate(kp0, kp1, m, depth)

        
    def _depth_estimate(
        self, 
        image_l0: np.ndarray,
        image_r0: np.ndarray,
        point_clould: Optional[np.ndarray] = None
    ) -> np.ndarray:
        disp = self._stereo_matcher.compute(image_l0, image_r0).astype(np.float32) / 16
        disp = np.where((disp == 0.0) | (disp == -1.0), 0.1, disp)
        
        depth = (self.f * self.b) / disp
        if not point_clould or not self.config.lidar_correct:
            return depth
        
        ## lidar depth correction
        pcl = point_clould[point_clould[:, 0] > 0]
        pcl = np.hstack([pcl[:, :3], np.ones(pcl.shape[0]).reshape((-1, 1))])
        cam = self.Tr @ pcl.T
        cam = cam[:, cam[2] > 0]
        dep = cam[2].copy()
        cam /= cam[2]
        cam = np.vstack([cam, np.ones(cam.shape[1])])

        project = self.P0 @ cam
        pixel_coord = project.T.round(0)[:, :2].astype(np.int32)

        indices = np.where(
            (pixel_coord[:, 0] < self.iw) &
            (pixel_coord[:, 1] < self.ih) &
            (pixel_coord[:, 0] >= 0) &
            (pixel_coord[:, 1] >= 0)
        )
        pixel_coord = pixel_coord[indices]
        dep = dep[indices]

        depth_map = np.zeros((self.ih, self.iw))
        for i, (u, v) in enumerate(pixel_coord):
            if u < 0 or u >= self.iw or v < 0 or v > self.ih:
                continue
            depth_map[v, u] = depth[i]
    
        depth_map[depth_map == 0.0] = 3000

        indices = np.where(depth_map < 3000)
        depth[indices] = depth_map[indices]
    
        return depth

    def _features_matching(
        self,
        des1: np.ndarray,
        des2: np.ndarray,
    ) -> list[cv2.DMatch]:
        matches = self._feature_matcher.knnMatch(des1, des2, 2)
        matches = sorted(matches, key=lambda x: x[0].distance)

        return [
            m for m, n in matches 
            if not self.threshold or m.distance <= n.distance * self.threshold
        ]
    
    def _motion_estimate(
        self,
        kp1: list[cv2.KeyPoint],
        kp2: list[cv2.KeyPoint],
        matches: list[cv2.DMatch],
        depth: Optional[np.ndarray] = None,
        depth_max: int = 3000
    ) -> tuple[np.ndarray, np.ndarray]:
        R: np.ndarray = np.eye(3)
        t: np.ndarray = np.zeros((3, 1))

        pts_1 = np.float32([kp1[m.queryIdx].pt for m in matches])
        pts_2 = np.float32([kp2[m.trainIdx].pt for m in matches])

        if depth is None: # apply essential matrix decomposition, scale ambiguous
            E = cv2.findEssentialMat(pts_1, pts_2, self.K0)
            _, R, t, _ = cv2.recoverPose(E, pts_1, pts_2, self.K0)
            return R, t
        
        ## convert points in image1 into world coordinate with camera parameter and depth information,
        ## and apply PnP algorithm with RANSAC for robustness to outlier
        pts_w = np.zeros((0, 3))
        remove_idx: list[int] = []

        for i, (u, v) in enumerate(pts_1):
            z = depth[int(v), int[u]]
            if z > depth_max:
                remove_idx.append(i)
                continue

            x = z * (u - self.cx) / self.fx
            y = z * (u - self.cy) / self.fy
            pts_w = np.vstack([pts_w, np.array([x, y, z])])

        pts_2 = np.delete(pts_2, remove_idx, 0)
        _, R_vec, t, _ = cv2.solvePnPRansac(pts_w, pts_2, self.K0, None)
        R = cv2.Rodrigues(R_vec)[0]
        return R, t


In [None]:
from tqdm import tqdm


def pipeline(config) -> VisualOdometry:
    
    handler = DatasetHandler()
    h, w = handler.imsize()
    
    vo = VisualOdometry()
    for _ in tqdm(range(handler.frame_num - 1)):
        image0, image1, image2, pc = handler.next_frame()

        vo.update(image0, image1, image2, pc)