In [None]:
import os
import cv2
import numpy as np
from glob import glob
from os.path import join
import time
import math
import operator

In [None]:
"""
KNN Implementation
"""
def eucledianDistance(desc1, desc2):
    distance = 0
    for x in range(len(desc1)):
        distance += pow(desc1[x] - desc2[x], 2)

    return math.sqrt(distance)


# Eucledian distance matcher, custom/better version
def EUBetter(desc1, desc2, distances, iteration):
    desc1 = np.array(desc1)
    desc2 = np.array(desc2)
    
    distance = desc1 - desc2
    distance = np.power(distance, 2)
    distance = np.sum(distance, axis = 1)
    distance = np.sqrt(distance)
    distance_indices = np.argsort(distance)
    
    k_match = []
    for i in range(2):
        k_match.append(cv2.DMatch(iteration, distance_indices[i], distance[distance_indices[i]]))
    
    return k_match

def KNNMatch(desc1, desc2, k =2): # query, # train
    matches = []
    for i in range(len(desc1)):
        distances = []
        
        t1 = time.time()
        k_match = EUBetter(desc1[i], desc2, distances, i)
        matches.append(k_match)
    
    return matches

In [None]:
import open3d as o3d

class SfMSolver(object):
    
    def __init__(self, img_pattern, intrinsic, output_dir, downscale = 1):
        """   
        img_pattern: regex pattern used by glob to read the files
        """
        self.img_pattern = img_pattern
        self.K_orig = self.intrinsic_orig = intrinsic.copy()
        self.output_dir = output_dir
        self.downscale = downscale
        self.rescale_intrinsic()
    
    def rescale_intrinsic(self):
        """ 
        if we downscale the image, the intrinsic matrix also need to be changed"""
        
        start = time.time()
        # scale focal length and principal points w.r.t image processing
        if self.downscale > 1:
            self.K = self.K_orig.copy()
            self.K[0, 0] /= float(self.downscale)
            self.K[1, 1] /= float(self.downscale)
            self.K[0, 2] /= float(self.downscale)
            self.K[1, 2] /= float(self.downscale)
            self.intrinsic = self.K
        else:
            self.K = self.intrinsic = self.K_orig.copy()
        elapsed = time.time() - start

    def load_images(self):
        """  
        Loads a set of images to self.imgs list
        """
        start = time.time()
        self.img_paths = sorted(glob(self.img_pattern))
        self.imgs = []
        for idx, img_path in enumerate(self.img_paths):
            try:
                img = cv2.imread(img_path)
                if self.downscale > 1:
                    img = cv2.resized(img, (0, 0), 
                                      fx = 1/float(self.downscale), fy= 1/float(self.downscale),
                                      interpolation = cv2.INTER_LINEAR)
            except Exception as e:
                print("Error loading img: %s" % (img_path))
            
            if img is not None:
                self.imgs.append(img)
                print("loaded img %d size=(%d, %d): %s" % 
                      (idx, img.shape[0], img.shape[1], img_path))
        elapsed = time.time() - start
        
        
    def visualize_matches(self, img1, img2,
                          kp1, kp2, good, 
                          mask = None, save_path = None):
        start = time.time()
        draw_params = dict(matchColor = (0, 255, 0),
                           singlePointColor = None,
                           flags = 2)
        if mask is not None:
            if not isinstance(mask, list):
                matchesMask = mask.ravel().tolist()
            else:
                matchesMask = mask
            draw_matches['matchesMask'] = matchesMask
        img_matches = cv2.drawMatches(
            img1, kp1, img2, kp2, good, None, **draw_params
        )
        cv2.imwrite(save_path, img_matches)
        elapsed = time.time() - start
        
    
    def drawlines(self, img1, img2, lines, pts1, pts2, line_num = None):
        """ 
        Draw line connecting points in two images"""
        start = time.time()
        if img1.ndim ==2:  # Gray
            img1 = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
            img2 = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
            r, c = img1.shape
        else:  # 3
            r, c, _ = img1.shape
        if line_num is not None:
            draw_list = np.random.choice(
                pts1.shape[0], line_num, replace = False
            )
        else:
            draw_list = np.arange(pts1.shape[0])
        
        for ifx, (r, pt1, pt2) in enumerate(zip(lines, pts1, pts2)):
            if idx not in list(draw_list):
                continue
            color = tuple(np.random.randint(0, 255, 3).tolist())
            x0, y0 = map(int, [0, -r[2]/r[1]])
            x1, y1 = map(int, [c, -(r[2]+r[0]*c)/r[1]])
            
            pt1_int = (int(tuple(pt1.ravel())[0]), int(tuple(pt1.ravel())[1]))
            pt2_int = (int(tuple(pt2.ravel())[0]), int(tuple(pt2.ravel())[1]))
            
            img1 = cv2.line(img1, (int(x0), int(y0)), (int(x1), int(y1)), color, 1)
            img1 = cv2.circle(img1, pt1_int, 5, color, -1)
            img2 = cv2.circle(img2, pt2_int, 5, color, -1)
        
        elapsed = time.time() - start
        
        return img1, img2
    
    
    def visualize_epipolar_lines(self, img1, img2, p1, p2, E, save_path):
        start = time.time()
        # get fundamental matrix
        F, mask_fdm = cv2.findFundamentalMat(p1, p2, cv2.RANSAC)
        p1_selected = p1[mask_fdm.ravel() == 1]
        p2_selected = p2[mask_fdm.ravel() == 1]
        
        # draw lines
        lines1 = cv2.computeCorrespondEpilines(
            p2_selected.reshape(-1, 1, 2), 2, F).reshape(-1, 3)
        img5, _ = self.drawlines(
            img1, img2, lines1, p1_selected, p2_selected, 100
        )
        
        lines2= cv2.computeCorrespondEpilines(
            p1_selected.reshape(-1, 1, 2), 1, F).reshape(-1, 3)
        
        img3, _ = self.drawlines(
            img2, img1, lines2, p2_selected, p1_selected, 100
        )
        canvas = np.concatenate((img5, img3), axis = 1)
        cv2.imwrite(save_path, canvas)
        elapsed = time.time() - start
    
    def write_simple_obj(self, mesh_v, mesh_f, filepath, verbose = False):
        """ 
        Saves 3D points which can be read in meshlab
        """
        start = time.time()
        with open(filepath, 'w') as fp:
            for v in mesh_v:
                fp.write('v %f %f %f\n' % (v[0], v[1], v[2]))
            if mesh_f is not None:
                for f in mesh_f+1: # Faces are 1-based, not 0-based in obj files
                    fp.write('f %d %d %d\n' % (f[0], f[1], f[2]))
        if verbose:
            print('mesh saved to: ', filepath)
        elapsed = time.time()
    
    
    def write_simple_pcd(self, point_3d, filepath, verbose = False):
        """ 
        Saves 3D points which can be read in read in meshlab
        """
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utilty.Vector3dVector(point_3d)
        pcd.colors = o3d.utility.Vector3dVector(np.zeros_like(point_3d))
        
        o3d.io.write_point_cloud(filepath, pcd)
    
    
    def detect_and_match_feature(self, img1, img2):
        start = time.time()
        sift = cv2.xfeatures2d.SIFT_create() # create SIFT object
        kp1, desc1 = sift.detectAndCompute(img1, None) # Detect keypoints and find descriptors of first image
        start1 = time.time()
        kp2, desc2 = sift.detectAndCompute(img2, None)
        print(np.shape(desc1))
        print(np.shape(desc2))
        
        
        start2 = time.time()
        start4 = time.time()
        matches = KNNMatch(desc1, desc2, k = 2)
        
        elapsed2 = time.time() - start4
        print("Time for detect and match feature description {}".format(elapsed2))
        
        matches_good = []
        
        start3 = time.time()
        for m, n in matches:
            if m.distance < 0.7 * n.distance: # Peform ratio test to select good feature matches
                matches_good.append(m)
        elapsed3 = time.time() - start3
        
        p1 = np.float32([kp1[m.queryIdx].pt for m in matches_good]).reshape(-1, 1, 2)  # descriptors that pass ratio test
        p2 = np.float32([kp2[m.trainIdx].pt for m in matches_good]).reshape(-1, 1, 2)
        elapsed = time.time() - start
        
        return p1, p2, matches_good, kp1, kp2
    
    def compute_essential(self, p1, p2):
        start = time.time()
        E, mask = cv2.findEssentialMat(p1, p2, self.intrinsic)  # Find essential matrix
        
        elapsed = time.time() - start
        
        return E, mask
    
    
    def compute_pose(self, p1, p2, E):
        start = time.time()
        retval, R, trans, mask = cv2.recoverPose(E, p1, p2, self.intrinsic)  # recovered relative rotation and translation given essential matrix and p1, p2
        
        elapsed = time.time() - start
        return R, trans
    
    def triangulate(self, p1, p2, R, trans, mask):
        
        start = time.time()
        matchesMask = mask.ravel().tolist() # Use mask to remove outliers
        p1 = p1[np.array(matchesMask)== 1, : , :]
        p2 = p2[np.array(matchesMask)== 1, :, :]
        
        P1 = cv2.undistortPoints(p1, self.intrinsic, None) # Convert image coords to normalized coords for first image
        P2 = cv2.undistortPoints(p2, self.intrinsic, None)
        
        I = np.identity(3) # Rotation of first camera. Identity as origin is at first camera
        z = np.zeros((3, 1)) # Translation of first camera. Zero as origin is at first camera
        
        projMatr1 = np.concatenate((I, z), axis = 1) # Calculate matrix of extrinsic parameters( [R t]) of first camera
        
        projMatr2 = np.concatenate((R, trans), axis = 1) # Calculate matrix of extrinsic parameter ([ R t]) of second camera
        
        points_4d_hom = cv2.triangulatePoints(projMatr1, projMatr2, P1, P2) # Homogeneous coordinate
        
        points_4d = points_4d_hom / np.tile(points_4d_hom[-1, :], (4, 1))
        points_3d = points_4d[:3, :].T # Take first three coordinates (3D points)
        
        elapsed = time.time()
        
        return points_3d
        
    def run(self):
        
        self.load_images()
        
        # pair processing
        
        # step1 and 2 : detect and match feature
        p1, p2, matches_good, kp1, kp2 = self.detect_and_match_feature(
            self.imgs[1], self.imgs[2]
        )
        
        self.visualize_matche(
            self.imgs[1], self.imgs[2], kp1, kp2, matches_good,
            save_path = join(self.output_dir, 'sift_match_01_7.png')
        )
        
        # step3: compute essential matrix
        E, mask = self.compute_essential(p1, p2)
        
        self.visualize_matches(
            self.imgs[1], self.imgs[2], kp1, kp2, matches_good, mask = mask,
            save_path=join(self.output_dir, 'inlier_match_01_7.png')
        )
        
        self.visualize_epipolar_lines(
            self.imgs[1], self.imgs[2], p1, p2, E,
            save_path = join(self.output_dir, 'epipolar_lines_01_7.png')
        )
        
        # Step 4: recover pose
        R, trans = self.compute_pose(p1, p2, E)
        # step 5: Triangulation
        point_3d = self.triangulate(p1, p2, R, trans, mask)
        
        self.write_simple_pcd(point_3d, join(self.output_dir, 'temple1.pcd'))
        
    
def safe_mkdir(file_dir):
    if not os.path.exists(file_dir):
        os.mkdir(file_dir)

def intrinsic_reader(txt_file):
    with open(txt_file) as f:
        lines = f.readlines()
    
    return np.array(
        [l.strip(' ') for l in lines], 
        dtype = np.float32
    )
    
            

In [None]:
def main():
    
    img_pattern = ""
    intrinsic = ""
    output_dir = './output'
    safe_mkdir(output_dir)
    
    sfm_solver = SfMSolver(img_pattern, intrinsic, output_dir, downscale = 2)
    sfm_solver.run()

if __name__ == '__main__':
    main()

### Visualize

In [None]:
import open3d as o3d

pcd_path = "output/temple1.pcd"
pcd = o3d.io.read_point_cloud(pcd_path)

o3d.visualization.draw_geometries([pcd])