In [1]:
import numpy as np
import pandas as pd
import cv2
import os
import collections
import matplotlib as mpl
from tqdm import tqdm
from typing import List, Tuple, Dict, Any
import gc

# Get statistics for Fundamental Matrix Estimation using FLANN and Ratio Test

In [60]:
#####################
### FUNCTIONS
#####################

def normalize_descriptors(desc:np.ndarray) -> np.ndarray:
    """Scale every descriptor (row) of `desc` to unit L2 length.

    Parameters
    ----------
    desc : np.ndarray
        2-D array with one descriptor per row.

    Returns
    -------
    np.ndarray
        Array of the same shape as `desc`. Rows whose L2 norm is zero are
        returned unchanged (all zeros) instead of being turned into NaN/inf
        by a division by zero.
    """
    # keepdims=True yields an (N, 1) column so the division broadcasts per row.
    norms = np.linalg.norm(desc, axis=1, ord=2, keepdims=True)

    # Replace zero norms in place by 1 so that 0 / 1 == 0; the in-place
    # assignment keeps the dtype of `norms` (and thus of the result).
    norms[norms == 0] = 1

    return desc / norms

def compute_distances_kpts_to_epilines(points_i, points_j, F:np.ndarray) -> np.ndarray:
    """Distances of matched points to their corresponding epipolar lines.

    Parameters
    ----------
    points_i, points_j : np.ndarray
        Two arrays of matching points, both of shape [N x 2]. Row k of
        `points_i` is matched with row k of `points_j`.
    F : np.ndarray or None
        Fundamental matrix that is handed to `cv2.computeCorrespondEpilines`.

    Returns
    -------
    np.ndarray
        Array of absolute distances with shape [N x 2]. The first column
        holds the distances of the points in image I to the epipolar lines
        in image I, the second column the same for image J. If `F` is None
        every entry is `np.inf`; for N == 0 an empty [0 x 2] array is
        returned.
    """
    assert points_i.shape[1] == 2
    assert points_j.shape[1] == 2
    assert points_i.shape == points_j.shape

    num_points = points_i.shape[0]

    if F is None:
        return np.full((num_points, 2), np.inf)

    # Nothing to measure: keep the documented [N x 2] shape and do not hand
    # empty arrays to OpenCV.
    if num_points == 0:
        return np.empty((0, 2))

    # Epipolar lines in image I of the points in image J
    lines_i = cv2.computeCorrespondEpilines(points_j.reshape(-1, 1, 2), 2, F).reshape(-1, 3)

    # Epipolar lines in image J of the points in image I
    lines_j = cv2.computeCorrespondEpilines(points_i.reshape(-1, 1, 2), 1, F).reshape(-1, 3)

    def _point_line_distance(points, lines):
        # Distance of (x, y) to the line a*x + b*y + c = 0, for all rows at once.
        a, b, c = lines[:, 0], lines[:, 1], lines[:, 2]
        return np.abs(a * points[:, 0] + b * points[:, 1] + c) / np.sqrt(a * a + b * b)

    return np.column_stack((
        _point_line_distance(points_i, lines_i),
        _point_line_distance(points_j, lines_j),
    ))

def save_stats(
    path_output:str,
    fout_name,
    collection_name:str,
    df:pd.DataFrame,
    fast_eval:bool=False) -> None:
    """Store `df` as a CSV file named `fout_name` inside `path_output`.

    The directory `path_output` is created first if it does not exist yet.
    The file is written UTF-8 encoded and without the index column.
    `collection_name` and `fast_eval` are accepted but not used in the body.
    """
    directory_missing = not os.path.exists(path_output)
    if directory_missing:
        os.makedirs(path_output, exist_ok=True)

    target_file = os.path.join(path_output, fout_name)
    df.to_csv(target_file, index=False, encoding='utf-8')

#####################
### DATAFRAME
#####################
"""
collection_name:str             Name of the collection
set_name:str                    Name of the set
kpts_threshold:int              Number of used features
descriptor_name:str             Name of descriptor
detector_name:str               Name of detector
matching_method:str             Name of matching method
desc_distance_threshold:float   Maximal distance of two descriptors to match for the nn2w method (used as the ratio threshold of the ratio test in this notebook)

max_num_matches:int             Maximal number of possible matches
num_matches:int                 Actual number of matches
matchability:float              Ratio of num_matches and max_num_matches
accuracy:float                  Mean of 1 - (score of match / desc_distance_threshold)
mse_matching:float              Mean squared error of matched descriptors.

max_num_inliers:int             Maximal number of inliers for F-matrix
num_inliers:int                 Actual number of inliers for F-matrix
inlier_ratio:float              Ratio between num_inliers and max_num_inliers
avg_distance:float              Mean distance between keypoints and corresponding epipolar line
mse_estimation:float            Mean squared error
"""

# Column order of the result table; one row is written per evaluated
# combination in the main loop.
column_names = [
    # Identification of the evaluated combination
    'collection_name',
    'set_name',
    'kpts_threshold',
    'descriptor_name',
    'detector_name',
    'matching_method',
    'desc_distance_threshold',
    # Descriptor matching statistics
    'max_num_matches',
    'num_matches',
    'matchability',
    'accuracy',
    'mse_matching',
    # Fundamental matrix estimation statistics
    'max_num_inliers',
    'num_inliers',
    'inlier_ratio',
    'avg_distance',
    'mse_estimation',
]

# Empty result table that only carries the column layout.
df = pd.DataFrame(columns=column_names)

#####################
### SETTINGS
#####################

# Input locations: images and precomputed outputs live below `root_dir`.
root_dir = '/home/mizzade/Workspace/diplom/code'
image_dir, data_dir = (os.path.join(root_dir, sub) for sub in ('data', 'outputs'))

# Where the resulting statistics CSV is written.
output_dir = '/home/mizzade/Workspace/diplom/outputs/eval_matching_pipeline'

# File names of the image pair and of the keypoint/descriptor CSV files.
file_scheme = '_10000.csv'
iname1, iname2 = '1.png', '2.png'
fname1, fname2 = '1' + file_scheme, '2' + file_scheme
matching_method = 'flann_ratio'

# Collection that is evaluated and its data/image folders.
collection_name = 'eisert'
collection_path_data = os.path.join(data_dir, collection_name)
collection_path_img = os.path.join(image_dir, collection_name)

# Parameter grid: number of keypoints to use and ratio-test thresholds.
kpts_thresholds = [1000, 5000, 10000]
desc_distance_thresholds = [0.7]

# Base name of the output file; the main section appends the extension.
fout_name = 'descriptor_matching_%s_fr_fmatrix' % collection_name
    
#####################
### MAIN
#####################
# NOTE: To avoid memory errors, handle the number of descriptors, detectors,
# keypoint thresholds etc. with care.

# A unique fundamental matrix needs at least 8 point correspondences
# (8-point algorithm); with fewer matches the estimation is skipped.
MIN_FMATRIX_MATCHES = 8


def _ratio_test_matches(matcher, d1, d2, ratio):
    """Match `d1` against `d2` and filter the matches with Lowe's ratio test.

    Returns an [N x 3] float array with, per accepted match, the index of
    the descriptor in `d1`, the index of the descriptor in `d2` and the
    distance between the two. The array is empty ([0 x 3]) if nothing can
    be or was matched.
    """
    # The ratio test needs two neighbours per query descriptor.
    if len(d1) == 0 or len(d2) < 2:
        return np.empty((0, 3))

    accepted = []
    for candidates in matcher.knnMatch(d1, d2, 2):
        # Skip queries for which fewer than two neighbours were returned;
        # the ratio cannot be evaluated for them.
        if len(candidates) < 2:
            continue
        first_match, second_match = candidates
        if first_match.distance < ratio * second_match.distance:
            accepted.append((first_match.queryIdx,
                             first_match.trainIdx,
                             first_match.distance))

    return np.array(accepted, dtype=np.float64).reshape(-1, 3)


def _fmatrix_stats(pts1, pts2):
    """Estimate the fundamental matrix with RANSAC and rate the inliers.

    `pts1` and `pts2` are matching keypoints ([N x 2], float32). Returns the
    tuple (num_inliers, inlier_ratio, mean_dist, mse_estimation). If no
    estimation is possible the result is (0, 0, nan, nan).
    """
    num_points = len(pts1)
    if num_points < MIN_FMATRIX_MATCHES:
        return 0, 0, np.nan, np.nan

    # ransacReprojThreshold: Maximal distance in pixels from point to epipolar
    # line to be considered an inlier.
    # confidence: Confidence value that the fundamental matrix is correct.
    F, mask = cv2.findFundamentalMat(
        pts1,
        pts2,
        method=cv2.FM_RANSAC,
        ransacReprojThreshold=3,
        confidence=.99)

    if mask is None:
        return 0, 0, np.nan, np.nan

    inlier_mask = mask.ravel() == 1
    num_inliers = int(inlier_mask.sum())
    inlier_ratio = num_inliers / num_points
    if num_inliers == 0:
        return 0, inlier_ratio, np.nan, np.nan

    # Absolute distances of the inlier points to the corresponding epipolar
    # lines (both images), flattened into one vector.
    d_errors = compute_distances_kpts_to_epilines(
        pts1[inlier_mask], pts2[inlier_mask], F)
    d_errors = np.asarray(d_errors, dtype=np.float64).reshape(-1)

    mean_dist = float(np.mean(d_errors))
    # Mean of the squared distances, as the column 'mse_estimation' is
    # documented ("Mean squared error").
    mse_estimation = float(np.mean(d_errors ** 2))

    return num_inliers, inlier_ratio, mean_dist, mse_estimation


set_names = sorted(x for x in os.listdir(collection_path_img)
                   if os.path.isdir(os.path.join(collection_path_img, x)))

fast_eval = True
verbose = False

fout_name = fout_name + '_fast.csv' if fast_eval else fout_name + '.csv'

# In fast mode only the first keypoint threshold is evaluated. A separate
# name is used so the configured list is not overwritten.
active_kpts_thresholds = kpts_thresholds[:1] if fast_eval else kpts_thresholds

# Create descriptor matcher using L2-Norms.
matcher = cv2.DescriptorMatcher_create(cv2.DescriptorMatcher_FLANNBASED)

# One dict per evaluated combination; turned into a DataFrame once at the
# end instead of growing a DataFrame row by row.
records = []

for set_name in tqdm(set_names):
    if verbose:
        print(set_name)

    # 1. Open folder of set
    set_path_2_desc = os.path.join(collection_path_data, set_name, 'descriptors')
    desc_names = sorted(x for x in os.listdir(set_path_2_desc)
                        if os.path.isdir(os.path.join(set_path_2_desc, x)))

    descriptor_names = desc_names[:2] if fast_eval else desc_names
    for descriptor_name in descriptor_names:
        if verbose:
            print(descriptor_name)

        set_path_2_dets = os.path.join(set_path_2_desc, descriptor_name)

        det_names = sorted(x for x in os.listdir(set_path_2_dets)
                           if os.path.isdir(os.path.join(set_path_2_dets, x)))

        detector_names = det_names[:2] if fast_eval else det_names
        for detector_name in detector_names:
            if verbose:
                print('\t', detector_name)

            # 2. Open detector keypoints (first two columns only).
            kpts_path = os.path.join(collection_path_data, set_name, 'keypoints', detector_name)
            kpts1 = pd.read_csv(os.path.join(kpts_path, fname1), sep=',', comment='#', header=None, usecols=[0, 1]).values
            kpts2 = pd.read_csv(os.path.join(kpts_path, fname2), sep=',', comment='#', header=None, usecols=[0, 1]).values

            # 3. Open corresponding descriptors
            desc_path = os.path.join(collection_path_data, set_name, 'descriptors', descriptor_name, detector_name)
            desc1 = pd.read_csv(os.path.join(desc_path, fname1), sep=',', comment='#', header=None).values.astype(np.float32)
            desc2 = pd.read_csv(os.path.join(desc_path, fname2), sep=',', comment='#', header=None).values.astype(np.float32)
            desc1 = normalize_descriptors(desc1)
            desc2 = normalize_descriptors(desc2)

            for kpts_thresh in active_kpts_thresholds:
                # 4. Get the subset of descriptors and keypoints. Nothing
                # below writes into these slices, so no copies are needed.
                d1 = desc1[:kpts_thresh]
                d2 = desc2[:kpts_thresh]
                k1 = kpts1[:kpts_thresh]
                k2 = kpts2[:kpts_thresh]

                max_num_matches = min(len(d1), len(d2))

                for desc_dist in desc_distance_thresholds:
                    # Best two matches per descriptor, filtered with Lowe's
                    # ratio test; desc_dist is here the ratio threshold (0.7).
                    # Columns of res: id in d1, id in d2, distance.
                    res = _ratio_test_matches(matcher, d1, d2, desc_dist)
                    num_matches = len(res)
                    distances = res[:, 2]

                    matchability = num_matches / max_num_matches if max_num_matches else 0
                    accuracy = float(1.0 - np.mean(distances / desc_dist)) if num_matches else 0
                    # Mean of the squared match distances, as the column
                    # 'mse_matching' is documented ("Mean squared error").
                    mse_matching = float(np.mean(distances ** 2)) if num_matches else 0

                    max_num_inliers = num_matches

                    # Indices of kpts1 and kpts2 matches
                    idx1 = res[:, 0].astype(int)
                    idx2 = res[:, 1].astype(int)

                    # Get matching keypoints. Float32 for using cv2 functions.
                    hits1 = np.ascontiguousarray(k1[idx1], dtype=np.float32)
                    hits2 = np.ascontiguousarray(k2[idx2], dtype=np.float32)

                    num_inliers, inlier_ratio, mean_dist, mse_estimation = \
                        _fmatrix_stats(hits1, hits2)

                    records.append({
                        'collection_name': collection_name,
                        'set_name': set_name,
                        'kpts_threshold': kpts_thresh,
                        'descriptor_name': descriptor_name,
                        'detector_name': detector_name,
                        'matching_method': matching_method,
                        'desc_distance_threshold': desc_dist,
                        'max_num_matches': max_num_matches,
                        'num_matches': num_matches,
                        'matchability': matchability,
                        'accuracy': accuracy,
                        'mse_matching': mse_matching,
                        'max_num_inliers': max_num_inliers,
                        'num_inliers': num_inliers,
                        'inlier_ratio': inlier_ratio,
                        'avg_distance': mean_dist,
                        'mse_estimation': mse_estimation
                    })

                    # Free memory
                    del res, distances, idx1, idx2, hits1, hits2

                # Free memory
                del d1, d2, k1, k2
                gc.collect()

            # Free memory
            del kpts1, kpts2, desc1, desc2
            gc.collect()

# Build the result table once, in the documented column order.
df = pd.DataFrame(records, columns=column_names)

save_stats(
    output_dir,
    fout_name,
    collection_name,
    df,
    fast_eval=fast_eval)
















  0%|          | 0/13 [00:00<?, ?it/s][A[A[A[A[A[A[A[A[A[A[A[A[A[A[A














  8%|▊         | 1/13 [00:04<00:57,  4.77s/it][A[A[A[A[A[A[A[A[A[A[A[A[A[A[A














 15%|█▌        | 2/13 [00:09<00:51,  4.71s/it][A[A[A[A[A[A[A[A[A[A[A[A[A[A[A














 23%|██▎       | 3/13 [00:16<00:55,  5.52s/it][A[A[A[A[A[A[A[A[A[A[A[A[A[A[A














 31%|███       | 4/13 [00:24<00:54,  6.04s/it][A[A[A[A[A[A[A[A[A[A[A[A[A[A[A














 38%|███▊      | 5/13 [00:31<00:51,  6.47s/it][A[A[A[A[A[A[A[A[A[A[A[A[A[A[A














 46%|████▌     | 6/13 [00:38<00:45,  6.50s/it][A[A[A[A[A[A[A[A[A[A[A[A[A[A[A














 54%|█████▍    | 7/13 [00:44<00:38,  6.47s/it][A[A[A[A[A[A[A[A[A[A[A[A[A[A[A














 62%|██████▏   | 8/13 [00:50<00:31,  6.36s/it][A[A[A[A[A[A[A[A[A[A[A[A[A[A[A














 69%|██████▉   | 9/13 [00:54<00:22,  5.