In [1]:
import sys
sys.executable

%matplotlib inline

In [2]:
import os
import argparse
import statistics
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(rc={'figure.figsize':(16,10)})

import torch

from tqdm import tqdm
from scipy.spatial.transform import Rotation
from pytorch3d.transforms import matrix_to_quaternion

import poselib

from solver import Up2P, Solver
from SolverPipeline import P3PBindingWrapperPipeline
from SolverPipeline import SolverPipeline as SP

In [3]:
VSP = "dataset/StMarysChurch_matches"
seq = [3, 5, 13]
SHOW_SINGLE = False

In [4]:
class Config:
    ransac_thresh = 13.
    max_side_length = 320
    max_ransac_iters = 10000
    
conf = Config()

In [5]:
p3pwrapper = P3PBindingWrapperPipeline(
    ransac_conf = {
       # 'max_reproj_error': args.ransac_thresh
       'min_iterations': min(100, conf.max_ransac_iters),
       'max_iterations': conf.max_ransac_iters,
       'progressive_sampling': True,
       'max_prosac_iterations': conf.max_ransac_iters
    },
    
    bundle_adj_conf = {
        'loss_scale' : 1.0,
    }                                              
)

In [6]:
camera = {'model': 'SIMPLE_RADIAL', 'width': 320, 'height': 180, 'params': [277.4716064453125, 160.0, 90.0, 0.0]}
pts2D = [
    np.array([192.12533569,  19.14378548]),
    np.array([91.60398102, 26.73556519]),
    np.array([180.32232666,  33.99654388]),
    np.array([192.33743286,  37.74715424]),
    np.array([188.43441772,  41.1788559 ])
]

pts3D = [
    np.array([ 11.86180782, -14.56327057,  -0.92378181]),
    np.array([ 6.79015875, -9.56949902, -1.78533459]),
    np.array([11.95058823, -0.89410073, -0.36948705]), 
    np.array([ 12.17275715, -13.31939125,  -0.34633577]),
    np.array([ 7.56372643, -2.60536647, -2.24980545])
]

In [7]:
class Sampler:
    
    def __call__(self, pts: np.array, sample_size: int):
        n = len(pts)
        assert n > sample_size
        
        idcs = np.random.choice(n, sample_size)
        
        return pts[idcs], idcs
    
sampler = Sampler()

In [53]:
from typing import Tuple, Dict

class Camera:
    
    def __init__(self,
                 w: int,
                 h: int,
                 f: float,
                 cc: Tuple[int, int]
                ) -> None:
        self.f = f
        self.cx, self.cy = cc
        self.w = w
        self.h = h
        
    def pix2cam(self, x: np.ndarray):
        assert x.ndim == 2
        
        x = np.concatenate([x, np.ones(x.shape[0])[:, np.newaxis]], axis=1)
        x[:, 0] -= self.cx
        x[:, 1] -= self.cy
        x[:, :2] = x[:, :2] / self.f
        x /= np.linalg.norm(x)
        
        return x
    
    def cam2pix(self, x: np.ndarray):
        assert x.ndim == 2
            
        x[:, :2] /= x[:, 2]
        x[:, :2] = x[:, :2] * self.f
        x[:, 0] += self.cx
        x[:, 1] += self.cy
        
        return x
    
    @staticmethod
    def from_camera_dict(camera: Dict):
        w, h = camera["width"], camera["height"]
        params = camera["params"]
        f, cx, cy, _ = params
        
        return Camera(w, h, f, (cx, cy))

In [54]:
class DisplacementRefinerSolver(Solver):
    
    def __init__(self, min_sample_size: int = 100, models_to_evaluate: int = 3, verbose: bool = False):
        self.min_sample_size = min_sample_size
        self.models_to_evaluate = models_to_evaluate
        self.internal_solver = Up2P()
        self.verbose = verbose
        self.camera: Camera = None
    
    def get_sample_size(self) -> int:
        return self.min_sample_size
    
    def __call__(self, x, X, camera_dict):
        # assert x.shape == (self.min_sample_size, 3)
        # assert X.shape == (self.min_sample_size, 3)
        assert x.shape[0] == X.shape[0]
        
        self.camera = Camera.from_camera_dict(camera_dict)
        x = self.camera.pix2cam(x)
        
        idcs = np.random.choice(len(X), self.internal_solver.get_sample_size() + 1)
        xx, XX = x[idcs[:idcs.shape[0] - 1]], X[idcs[:idcs.shape[0] - 1]]
            
        solv_res = self.internal_solver(xx, XX)
            
        err, Rf, tf = None, None, None
        for sol in solv_res:
            R, t = sol
            R, t = R.detach().cpu().numpy(), t.detach().cpu().numpy()
         
            gts, projs = [], []
            for (xx, XX) in zip(x, X):
                proj = R.T @ (XX - t)
                proj = self.camera.cam2pix(proj[None, :])[0]
                
                gts.append(xx)
                projs.append(proj)
                    
            _, angles = self._get_rot_angles(np.stack(gts)[:, 2], np.stack(projs)[:, :2])
                
            deg_angles = [angle.as_euler("XYZ", degrees=True)[2] for angle in angles]

            if self.verbose:
                plt.hist(deg_angles, density=True, color='black', bins=np.arange(-180, 180, 5))
                plt.xticks(range(-180, 180, 5))
                plt.show()
                
            counts = np.bincount([angle + 180.0 for angle in deg_angles])
            prerotate_with = angles[np.argmax(counts)].as_matrix()
                
            Xs = np.array([prerotate_with.T @ XX for XX in X.copy()])
            iidcs = np.random.choice(len(X), self.internal_solver.get_sample_size() + 1)
            internal_x, internal_X = x[iidcs], Xs[iidcs]
                
            inner_solv_res = self.internal_solver(
                internal_x[:internal_x.shape[0] - 1],
                internal_X[:internal_X.shape[0] - 1],
            )
                
            ierr, IR, It = None, None, None
            for iR, it in inner_solv_res:
                rp = R.T @ (internal_X[internal_X.shape[0] - 1] - t)
                rp = self.camera.cam2pix(rp[None, :])[0]
                    
                cerr = np.linalg.norm(internal_x[internal_x.shape[0] - 1] - rp)
                if ierr is None or cerr < ierr:
                    ierr = cerr
                    IR, It = iR, it
                   
            R = prerotate_with @ IR.detach().cpu().numpy()
            
            rp = R.T @ (XX[idcs.shape[0] - 1] - t)
            rp[:2] /= rp[2]
            rp[:2] *= f
            rp[0] += cx
            rp[1] += cy
                        
            cerr = np.linalg.norm(xx[idcs.shape[0] - 1] - rp)
            if err is None or cerr < err:
                err = cerr
                Rf, tf = R, t
                
        pose = poselib.CameraPose()
        try:
            pose.q = matrix_to_quaternion(torch.tensor(Rf))
            pose.t = tf

            return pose
        except:
            return None
                
                    
    def _get_rot_angles(self, gt: np.array, proj: np.array):
        centers = []
        indexes = []
        angles = []
        for _ in range(1000):
            idcs = np.random.choice(len(gt), 2)
            indexes.append(idcs)

            gt1, proj1 = gt[idcs[0]], proj[idcs[0]]
            gt2, proj2 = gt[idcs[1]], proj[idcs[1]]

            c = self._get_center_of_rotation(gt1, proj1, gt2, proj2)
            centers.append(c)
            
        mean_c = np.array([np.median([elm[0] for elm in centers]), np.median([elm[1] for elm in centers])])

        for i in range(1000):
            idcs = indexes[i]

            gt1, proj1 = gt[idcs[0]], proj[idcs[0]]
            gt2, proj2 = gt[idcs[1]], proj[idcs[1]]

            try:
                angle = self._get_rotation(gt1, proj1, gt2, proj2, mean_c)
            except Exception as ex:
                continue

            angles.append(angle)

        return centers, angles            
            
    
    def _get_intersect(self, a1, a2, b1, b2):
        """ 
        Returns the point of intersection of the lines passing through a2,a1 and b2,b1.
        a1: [x, y] a point on the first line
        a2: [x, y] another point on the first line
        b1: [x, y] a point on the second line
        b2: [x, y] another point on the second line
        """
        s = np.vstack([a1,a2,b1,b2])        # s for stacked
        h = np.hstack((s, np.ones((4, 1)))) # h for homogeneous
        l1 = np.cross(h[0], h[1])           # get first line
        l2 = np.cross(h[2], h[3])           # get second line
        x, y, z = np.cross(l1, l2)          # point of intersection
        if z == 0:                          # lines are parallel
            return (float('inf'), float('inf'))
        return (x/z, y/z)
    
    def _get_norm_of_disp(self, gt, proj):
        vec = (proj - gt)
        return (-vec[1], vec[0])
    
    def _get_center_of_rotation(self, gt1, proj1, gt2, proj2):
        n1, n2 = self._get_norm_of_disp(gt1, proj1), self._get_norm_of_disp(gt2, proj2)

        first_center = (gt1 + proj1) / 2
        second_center = (gt2 + proj2) / 2

        c = self._get_intersect(
            first_center,
            first_center + n1,
            second_center,
            second_center + n2,
        )


        return c
    
    def _get_rotation(self, gt1, proj1, gt2, proj2, c):
        cgt1 = gt1 - c
        cproj1 = proj1 - c

        cgt2 = gt2 - c
        cproj2 = proj2 - c


        res = Rotation.align_vectors(
            a=np.array(
                [[*cgt1, 0],
                 [*cgt2, 0]]
            ),
            b=np.array(
                [
                    [*cproj1, 0],
                    [*cproj2, 0]
                ]
            )
        )

        return res[0]

ref = DisplacementRefinerSolver(verbose=False)    

In [58]:
orientation_errors, pose_errors = [], []
for s in seq:
    p = f"{VSP}/seq{s}"
    for f in tqdm(os.listdir(p)):
        if f.split(".")[1] != "npy":
            continue
        pe, oe = process_file(ref, f"{p}/{f}", conf, camera_dict, gt_dict)
        print(pe, oe)
        pose_errors.append(pe)
        orientation_errors.append(oe)
        break
    break

  res = Rotation.align_vectors(
  0%|          | 0/99 [00:00<?, ?it/s]


UFuncTypeError: ufunc 'multiply' did not contain a loop with signature matching types (dtype('float64'), dtype('<U22')) -> None

In [34]:
res = ref(
    np.array(pts2D),
    np.array(pts3D),
    camera
)
res, type(res)

KeyError: 'width'

In [11]:
class UP2PSolverPipeline(SP):
    
    def __init__(self, num_models_to_eval: int = 100, verbose: bool = False):
        self.solver = Up2P()
        self.sampler = Sampler()
        self.num_models_to_eval = num_models_to_eval
        self.verbose = verbose
    
    def __call__(self, pts2D, pts3D, camera_dict):
        
        w, h = camera_dict["width"], camera_dict["height"]
        params = camera_dict["params"]
        f, cx, cy, _ = params
        
        pts2D = np.concatenate([pts2D, np.ones(pts2D.shape[0])[:, np.newaxis]], axis=1)
        pts2D[:, 0] -= cx
        pts2D[:, 1] -= cy
        pts2D[:2] /= f
        pts2D /= np.linalg.norm(pts2D)
        
        solution, sol_err = None, float("+inf")
        
        iterator = range(self.num_models_to_eval)
        
        for i in (tqdm(iterator) if self.verbose else iterator):
            try:
                # +1 for evaluation in here
                pts2d, idcs = self.sampler(pts2D, self.solver.get_sample_size() + 1)
                pts3d = pts3D[idcs]

                solv_res = self.solver(pts2d[:pts2d.shape[0] - 1], pts3d[:pts3d.shape[0] - 1])
                best_sol, err = None, float("+inf")
                for sol in solv_res:
                    R, t = sol
                    R, t = R.detach().cpu().numpy(), t.detach().cpu().numpy()
                    translated = R.T @ (pts3d[pts3d.shape[0] - 1] - t)
                    translated[:2] /= translated[2]
                    translated[:2] *= f
                    translated[0] += cx
                    translated[1] += cy
                    if np.linalg.norm(translated - pts2d[pts2d.shape[0] - 1]) < err:
                        err = np.linalg.norm(translated - pts2D[i])
                        best_sol = sol
            except:
                continue

            if err < sol_err:
                sol_err = err
                solution = best_sol 

        pose = poselib.CameraPose()
        pose.q = matrix_to_quaternion(solution[0])
        pose.t = solution[1]
        
        return pose
        
solv_pipe = UP2PSolverPipeline()

In [12]:
res = solv_pipe(
    np.array(pts2D),
    np.array(pts3D),
    camera
)
res, type(res)

([q: 0.0916511         0  0.995791         0, t:   11.9355   -14.397 -0.907716],
 poselib.CameraPose)

In [13]:
pose = p3pwrapper(
    np.array(pts2D),
    np.array(pts3D),
    camera
)
pose.t = - pose.R.T @ pose.t
pose

[q:  0.770395 -0.620412  0.145098 -0.022967, t:   12.4091   4.84072 -0.453523]

In [14]:
def process_file(executor: SP, path: str, conf, camera_dict, gts):
    data = np.load(path)
    pts2D = list(data[:, :2])
    pts3D = list(data[:, 2:])
    pp = "/".join(path.split("/")[-2:])
    pp = pp.replace("_matches", "")
    pp = pp.replace(".npy", ".png")
    camera_dict = camera_dict[pp]
    gt = gts[pp]
    c, r = gt[:3], gt[3:]

    pose = executor(np.array(pts2D), np.array(pts3D), camera_dict)

    pose.t = - pose.R.T @ pose.t          

    gt_pose = poselib.CameraPose()
    gt_pose.q = r / np.linalg.norm(r)

    rot_error = np.arccos((np.trace(np.matmul(gt_pose.R.transpose(), pose.R)) - 1.0) / 2.0) * 180.0 / np.pi

    if SHOW_SINGLE:
        print(np.trace(np.matmul(gt_pose.R.transpose(), pose.R)))
        print(" Position error: " + str(np.linalg.norm(c - pose.t)) + " orientation error: " + str(rot_error))
    if np.isnan(rot_error):
        return 1000000.0, 180.0
    else:
        return np.linalg.norm(c - pose.t), rot_error

In [15]:
def prepare_camera_dict(path: str, args):
    with open(path) as file:
        data = file.readlines()

    camera_dict = {}
    for _, line in enumerate(data):
        # image width, image height, focal length, x of pp, y of pp, radial distortion factor 
        path, cam_type, w, h, f, x, y, rd = line.split()
        scaling_factor = 320 / max(np.float32(w), np.float32(h))
  
        # camera = {'model': 'SIMPLE_PINHOLE', 'width': 1200, 'height': 800, 'params': [960, 600, 400]}
        camera_dict[path] = {
          'model': cam_type,
          'width': int(np.float32(w) * scaling_factor),
          'height': int(np.float32(h) * scaling_factor),
          'params': list(map(float, [np.float32(f) * scaling_factor,
                                 np.float32(x) * scaling_factor,
                                 np.float32(y) * scaling_factor,
                                 np.float32(rd)])),
        }
  
    return camera_dict

In [16]:
def prepare_gts(path: str):
    # ImageFile, Camera Position [X Y Z W P Q R]

    with open(path) as file:
        data = file.readlines()

    gts = {}
    for _, line in enumerate(data):
        try:
            # seq13/frame00158.png 25.317314 -0.228082 54.493720 0.374564 0.002123 0.915022 -0.149782
            path, x, y, z, w, p, q, r = line.split()
            rest = [x, y, z, w, p, q, r]
            rest = list(map(float, rest))
        except Exception as ex:
            # print(ex)
            continue
        gts[path] = rest

    return gts

In [17]:
camera_dict = prepare_camera_dict(
    "dataset/StMarysChurch_matches/st_marys_church_list_queries_with_intrinsics_simple_radial_sorted.txt",
    conf
)

gt_dict = prepare_gts(
    "dataset/StMarysChurch_matches/dataset_test.txt"
)

In [18]:
orientation_errors, pose_errors = [], []
for s in seq:
    p = f"{VSP}/seq{s}"
    for f in tqdm(os.listdir(p)):
        if f.split(".")[1] != "npy":
            continue
        pe, oe = process_file(p3pwrapper, f"{p}/{f}", conf, camera_dict, gt_dict)
        pose_errors.append(pe)
        orientation_errors.append(oe)

100%|██████████| 99/99 [00:00<00:00, 219.86it/s]
100%|██████████| 83/83 [00:00<00:00, 194.15it/s]
100%|██████████| 351/351 [00:02<00:00, 148.83it/s]


In [19]:
pos_errors = pose_errors
orient_errors = orientation_errors
print(" Couldn't localize " + str(orientation_errors.count(180.0)) + " out of " + str(len(orientation_errors)) + " images") 
print(" Median position error: " +  str(round(statistics.median(pos_errors),3)) + ", median orientation errors: " + str(round(statistics.median(orient_errors),2)))

med_pos = statistics.median(pos_errors)
med_orient = statistics.median(orient_errors)
counter = 0
for i in range(0, len(pose_errors)):
    if pose_errors[i] <= med_pos and orientation_errors[i] <= med_orient:
        counter += 1
print(" Percentage of poses within the median: " + str(100.0 * float(counter) / float(len(pose_errors))) + " % ")

 Couldn't localize 0 out of 530 images
 Median position error: 0.086, median orientation errors: 0.29
 Percentage of poses within the median: 41.132075471698116 % 


In [20]:
orientation_errors, pose_errors = [], []
for s in seq:
    p = f"{VSP}/seq{s}"
    for f in tqdm(os.listdir(p)):
        if f.split(".")[1] != "npy":
            continue
        pe, oe = process_file(solv_pipe, f"{p}/{f}", conf, camera_dict, gt_dict)
        pose_errors.append(pe)
        orientation_errors.append(oe)

100%|██████████| 99/99 [00:11<00:00,  8.99it/s]
100%|██████████| 83/83 [00:07<00:00, 11.42it/s]
100%|██████████| 351/351 [00:36<00:00,  9.62it/s]


In [21]:
pos_errors = pose_errors
orient_errors = orientation_errors
print(" Couldn't localize " + str(orientation_errors.count(180.0)) + " out of " + str(len(orientation_errors)) + " images") 
print(" Median position error: " +  str(round(statistics.median(pos_errors),3)) + ", median orientation errors: " + str(round(statistics.median(orient_errors),2)))

med_pos = statistics.median(pos_errors)
med_orient = statistics.median(orient_errors)
counter = 0
for i in range(0, len(pose_errors)):
    if pose_errors[i] <= med_pos and orientation_errors[i] <= med_orient:
        counter += 1
print(" Percentage of poses within the median: " + str(100.0 * float(counter) / float(len(pose_errors))) + " % ")

 Couldn't localize 0 out of 530 images
 Median position error: 31.759, median orientation errors: 97.03
 Percentage of poses within the median: 24.528301886792452 % 


In [42]:
orientation_errors, pose_errors = [], []
for s in seq[:-1]:
    p = f"{VSP}/seq{s}"
    for f in tqdm(os.listdir(p)):
        if f.split(".")[1] != "npy":
            continue
        try:
            pe, oe = process_file(ref, f"{p}/{f}", conf, camera_dict, gt_dict)
            pose_errors.append(pe)
            orientation_errors.append(oe)
        except:
            continue

100%|██████████| 99/99 [00:00<00:00, 663.99it/s]
100%|██████████| 83/83 [00:00<00:00, 452.56it/s]


In [24]:
pos_errors = pose_errors
orient_errors = orientation_errors
print(" Couldn't localize " + str(orientation_errors.count(180.0)) + " out of " + str(len(orientation_errors)) + " images") 
print(" Median position error: " +  str(round(statistics.median(pos_errors),3)) + ", median orientation errors: " + str(round(statistics.median(orient_errors),2)))

med_pos = statistics.median(pos_errors)
med_orient = statistics.median(orient_errors)
counter = 0
for i in range(0, len(pose_errors)):
    if pose_errors[i] <= med_pos and orientation_errors[i] <= med_orient:
        counter += 1
print(" Percentage of poses within the median: " + str(100.0 * float(counter) / float(len(pose_errors))) + " % ")

 Couldn't localize 0 out of 169 images
 Median position error: 27.328, median orientation errors: 129.93
 Percentage of poses within the median: 20.118343195266274 % 
