In [1]:
import os
import numpy as np
import cv2
from tqdm import tqdm

from lib.visualization import plotting
from lib.visualization.video import play_trip

In [22]:
class VisualOdometry():
    def __init__(self, data_dir):
        self.K, self.P = self.load_calib(os.path.join(data_dir, 'calib.txt'))
        self.gt_poses = self.load_poses(os.path.join(data_dir, 'poses.txt'))
        self.images = self.load_images(os.path.join(data_dir, 'image_l'))
        self.orb = cv2.ORB_create(3000)
        FLANN_INDEX_KTREE = 6
        index_params = dict(algorithm=FLANN_INDEX_KTREE, table_number=6, key_size=12,
                           multi_probe_level=1)
        search_params = dict(checks=50)
        self.flann = cv2.FlannBasedMatcher(indexParams=index_params, 
                                          searchParams=search_params)
        
    @staticmethod
    def load_calib(filepath):
        with open(filepath, 'r') as f:
            params = np.fromstring(f.readline(), dtype=np.float64, sep=' ')
            P = np.reshape(params, (3, 4))
            K = P[0:3, 0:3]
        return K, P
    
    @staticmethod
    def load_poses(filepath):
        poses = []
        with open(filepath, 'r') as f:
            for line in f.readlines():
                T = np.fromstring(line, dtype=np.float64, sep=' ')
                T = T.reshape(3, 4)
                T = np.vstack((T, [0, 0, 0, 1]))
                poses.append(T)
        return poses
    
    @staticmethod
    def load_images(filepath):
        image_paths = [
            os.path.join(filepath, file) for file in sorted(os.listdir(filepath))
        ] 
        return [cv2.imread(path, cv2.IMREAD_GRAYSCALE) for path in image_paths]
    
    
    @staticmethod
    def transform_matrix(R, t):
        T = np.eye(4, dtype=np.float64)
        T[:3,:3] = R
        T[:3, 3] = t
        return T
    
    def get_matches(self, i):
        
        # Detects and computes keypoints 
        
        kp1, des1 = self.orb.detectAndCompute(self.images[i-1], None)
        kp2, des2 = self.orb.detectAndCompute(self.images[i], None)
        
        matches = self.flann.knnMatch(des1, des2, k=2)
        
        good = []
        try:
            for m, n in matches:
                if m.distance < 0.8 * n.distance:
                    good.append(m)
        except:
            pass
        
        draw_params = dict(
        matchColor=-1, singlePointColor=None, matchesMask=None,
        flags=2)
        
        img = cv2.drawMatches(self.images[i], kp1, self.images[i-1],
                             kp2, good, None, **draw_params)
        cv2.imshow("image", img)
        cv2.waitKey(200)
        
        # Get image points from the good matches
        
        q1 = np.float32([kp1[m.queryIdx].pt for m in good])
        q2 = np.float32([kp2[m.trainIdx].pt for m in good])
        
        return q1, q2
    
    def get_pose(self, q1, q2):
        
        # Calculates transformation matrix
        
        E, _ = cv2.findEssentialMat(q1, q2 ,self.K, threshold=1)
        
        R, t = self.decomp_essential_mat(E, q1, q2)
        
        transformation_matrix = self.transform_matrix(R, np.squeeze(t))
        
        return transformation_matrix
    
    def decomp_essential_mat(self, E, q1, q2):
        
        # Returns correct R and t matrices
        
        def sum_z_cal_relative_scale(R, t):
            T = self.transform_matrix(R, t)
            
            # Turn into projection matrix
            P = np.matmul(np.concatenate((self.K, np.zeros((3,1))), axis=1), T)
            
            # Triangulate to 3D 
            hom_Q1 = cv2.triangulatePoints(self.P, P, q1.T, q2.T)
            
            # Also seen from frame 2
            hom_Q2 = np.matmul(T, hom_Q1)
            
            # Un-homogenize
            uhom_Q1 = hom_Q1[:3, :] / hom_Q1[3, :]
            uhom_Q2 = hom_Q2[:3, :] / hom_Q2[3, :]
            
            # Find the number of points that has positive z coordinate in both cameras
            
            sum_pos_z_Q1 = sum(uhom_Q1[2, :] > 0)
            sum_pos_z_Q2 = sum(uhom_Q2[2, :] > 0)
            
            # Form point pairs and calculate the relative scale
            
            relative_scale = np.mean(np.linalg.norm(uhom_Q1.T[:-1] - uhom_Q1.T[1:], axis=-1)/
                                     np.linalg.norm(uhom_Q2.T[:-1] - uhom_Q2.T[1:], axis=-1))
            return sum_pos_z_Q1 + sum_pos_z_Q2, relative_scale
        
        R1, R2, t = cv2.decomposeEssentialMat(E)
        t = np.squeeze(t)
        
        pairs = [[R1, t], [R1, -t], [R2, t], [R2, -t]]
        
        z_sums = []
        relative_scales = []
        for R, t in pairs:
            z_sum, scale = sum_z_cal_relative_scale(R, t)
            z_sums.append(z_sum)
            relative_scales.append(scale)
        
        idx = np.argmax(z_sum)
        right_pair = pairs[idx]
        relative_scale = relative_scales[idx]
        R1, t = right_pair
        t = t * relative_scale
        
        return [R1, t]        
            

In [23]:
def main():
    data_dir = 'KITTI_sequence_1'
    vo = VisualOdometry(data_dir)
    
    play_trip(vo.images)
    
    gt_path = []
    est_path = []
    for i, gt_pose in enumerate(tqdm(vo.gt_poses, unit='pose')):
        if i == 0:
            cur_pose = gt_pose
        else:
            q1, q2 = vo.get_matches(i)
            trans = vo.get_pose(q1, q2)
            cur_pose = np.matmul(cur_pose, np.linalg.inv(trans))
        gt_path.append((gt_pose[0,3], gt_pose[2,3]))
        est_path.append((cur_pose[0,3], cur_pose[2,3]))
    plotting.visualize_paths(gt_path, est_path, "Visual Odometry",
                            file_out=os.path.basename(data_dir)+".html")
        

In [24]:
if __name__ == "__main__":
    main()

100%|████████████████████████████████████████████████████████████████████████████████| 51/51 [00:20<00:00,  2.49pose/s]
