In [None]:
!pip install mediapy

In [None]:
# Get some utility functions from https://github.com/cvg/Hierarchical-Localization/

%cd /kaggle/working/
!rm -rf /kaggle/working/Hierarchical-Localization
!git clone --quiet --recursive https://github.com/cvg/Hierarchical-Localization/
%cd /kaggle/working/Hierarchical-Localization
!pip install -e .

from hloc import extract_features, match_features, reconstruction, visualization, pairs_from_exhaustive
from hloc.visualization import plot_images, read_image
from hloc.utils import viz_3d

%cd /kaggle/working/

In [None]:
import os
import pycolmap
import numpy as np
import mediapy as media
import cv2
from glob import glob
from pathlib import Path
from time import time

In [None]:
dataset = 'urban'
scene = 'kyiv-puppet-theater'
src = f'/kaggle/input/image-matching-challenge-2023/train/{dataset}/{scene}'
#src = f'/kaggle/input/image-matching-challenge-2023'
images = [cv2.cvtColor(cv2.imread(im), cv2.COLOR_BGR2RGB) for im in glob(f'{src}/images/*')]

In [None]:
media.show_images(images, height=300, columns=4)

## Visualizing a Colmap reconstruction

In [None]:
# Visualize a ground-truth 3D reconstruction (points and cameras). Note that some of these might be very large!
# You can also visualize them outside a notebook with the colmap gui: https://colmap.github.io/

rec_gt = pycolmap.Reconstruction(f'{src}/sfm')

fig = viz_3d.init_figure()
viz_3d.plot_cameras(fig, rec_gt, color='rgba(50,255,50, 0.5)', name="Ground Truth", size=10)
viz_3d.plot_reconstruction(fig, rec_gt, cameras = False, color='rgba(255,50,255, 0.5)', name="Ground Truth", cs=5)
fig.show()

## Creating a reconstruction with SIFT and Colmap

In [None]:
tgt = f'/kaggle/working/{dataset}-{scene}'

# Generate a simple reconstruction with SIFT (https://en.wikipedia.org/wiki/Scale-invariant_feature_transform).
if not os.path.isdir(tgt):
    os.makedirs(f'{tgt}/bundle')
    os.system(f'cp -r {src}/images {tgt}/images')

    database_path = f'{tgt}/database.db'

    sift_opt = pycolmap.SiftExtractionOptions()
    sift_opt.max_image_size = 1500  # Extract features at low resolution could significantly reduce the overall accuracy
    sift_opt.max_num_features = 8192  # Generally more features is better, even if behond a certain number it doesn't help incresing accuracy
    sift_opt.upright = True  # rotation invariance
    device = 'cpu'

    t = time()
    pycolmap.extract_features(database_path, f'{tgt}/images', sift_options=sift_opt, verbose=True)
    print(len(os.listdir(f'{tgt}/images')))
    print('TIMINGS --- Feature extraction', time() - t)

    t = time()
    matching_opt = pycolmap.SiftMatchingOptions()
    matching_opt.max_ratio = 0.85 # Ratio threshold significantly influence the performance of the feature extraction method. It varies depending on the local feature but also on the image type 
#     matching_opt.max_distance = 0.7
    matching_opt.cross_check = True
    matching_opt.max_error = 1.0 # The ransac error threshold could help to exclude less accurate tie points
    pycolmap.match_exhaustive(database_path, sift_options=matching_opt, device=device, verbose=True)
    print('TIMINGS --- Feature matching', time() - t)

    t = time()
    mapper_options = pycolmap.IncrementalMapperOptions()
    mapper_options.extract_colors = False
    mapper_options.min_model_size = 3

    # Sometimes you want to impose the first image pair for initialize the incremental reconstruction
    mapper_options.init_image_id1 = -1 
    mapper_options.init_image_id2 = -1

    # Choose which interior will be refined during BA
    mapper_options.ba_refine_focal_length = True
    mapper_options.ba_refine_principal_point = True
    mapper_options.ba_refine_extra_params = True

    maps = pycolmap.incremental_mapping(database_path=database_path, image_path=f'{tgt}/images', output_path=f'{tgt}/bundle', options=mapper_options)
    print('TIMINGS --- Mapping', time() - t)

In [None]:
maps

In [None]:
# Inspect the largest map generated by colmap. Note that it might produce more than one reconstruction.
# Don't expect great results! SIFT might work well on some scenes, but not others.

best_index = None
best_num_reg_images = 0
for idx in maps:
    if maps[idx].num_reg_images() > best_num_reg_images:
        best_index = idx
        best_num_reg_images = maps[idx].num_reg_images()

if best_num_reg_images > 0:
    print(f'Looking at reconstruction #{best_index} with {best_num_reg_images} registered images')

    fig = viz_3d.init_figure()
    viz_3d.plot_reconstruction(fig, maps[best_index], color='rgba(0,0,255,0.5)', name="Reconstruction", cs=5, cameras=False)
    viz_3d.plot_reconstruction(fig, maps[best_index], color='rgba(255,0,0,0.5)', name="Reconstruction", cs=5, points=False)
    fig.show()
else:
    print('No reconstruction. :(')

## Writing the "submission"

In [None]:
# Generate a "submission" from colmap.
# Note that to get a functional submission you'll have to process every scene.

def arr_to_str(a):
    return ';'.join([str(x) for x in a.reshape(-1)])

# Get a list of images on this dataset.
all_images = []
with open('/kaggle/input/image-matching-challenge-2023/train/train_labels.csv', 'r') as f:
    for i, l in enumerate(f):
        if i > 0 and l:
            _dataset, _scene, image_path, _, _ = l.strip().split(',')
            if dataset == _dataset and scene == _scene:
                all_images.append(image_path)

if best_num_reg_images > 0:
    rec = maps[best_index]
    
    with open('submission.csv', 'w') as f:
        f.write('image_path,dataset,scene,rotation_matrix,translation_vector\n')
        registered_images = []
        for imgidx, img in rec.images.items():
            R = img.rotmat()
            T = img.tvec
            img_name  = img.name
            f.write(f'{dataset}/{scene}/images/{img_name},{dataset},{scene},{arr_to_str(R)},{arr_to_str(T)}\n')
            registered_images.append(img_name)
        
        # Fill out unregistered images with zeros or the score function will fail.
        for img_name in all_images:
            if img_name not in registered_images:
                R = np.zeros(9)
                T = np.zeros(3)
                f.write(f'{img_name},{dataset},{scene},{arr_to_str(R)},{arr_to_str(T)}\n')

    !cat submission.csv

# Better features than SIFT?

Let's try one of the IMC2020 top-solutions - the DISK local feature extractor.

We will have to write code to extract features, and then import them into the colmap database. 

You can find examples here: [imc2023-kornia-starter-pack](https://github.com/ducha-aiki/imc2023-kornia-starter-pack.git).

In [None]:
%cd /kaggle/working/
!rm -rf /kaggle/working/imc2023-kornia-starter-pack
!git clone --quiet --recursive https://github.com/ducha-aiki/imc2023-kornia-starter-pack.git

import sys
sys.path.append('/kaggle/working/imc2023-kornia-starter-pack')

import kornia as K
import kornia.feature as KF
import torch
import h5py
from fastprogress import progress_bar


In [None]:
# We will need images in pytorch format
def load_torch_image(fname, device=torch.device('cpu')):
    img = K.image_to_tensor(cv2.imread(fname), False).float() / 255.
    img = K.color.bgr_to_rgb(img.to(device))
    return img

# Load DISK w/o internet connection through Kaggle Models voodoo: while not necessary yet, it will be helpful for offline submissions, which require turning off internet access.
def load_DISK(device=torch.device('cpu')):
    disk = KF.DISK().to(device)
    pretrained_dict = torch.load('/kaggle/input/disk/pytorch/depth-supervision/1/loftr_outdoor.ckpt', map_location=device)
    disk.load_state_dict(pretrained_dict['extractor'])
    disk.eval()
    return disk

In [None]:
def detect_features(img_fnames,
                    num_feats = 2048,
                    device=torch.device('cpu'),
                    feature_dir = '.featureout'):
    disk = load_DISK(device)
    if not os.path.isdir(feature_dir):
        os.makedirs(feature_dir)
    
    # We will save features to h5 files, and then will import them into colmap
    with h5py.File(f'{feature_dir}/keypoints.h5', mode='w') as f_kp, \
         h5py.File(f'{feature_dir}/descriptors.h5', mode='w') as f_desc:
        for img_path in progress_bar(img_fnames):
            img_fname = img_path.split('/')[-1]
            key = img_fname
            with torch.inference_mode():
                timg = load_torch_image(img_path, device=device)
                features = disk(timg, num_feats, pad_if_not_divisible=True)[0]
                kpts, descs = features.keypoints, features.descriptors
            
            # Convert to numpy to store in h5py
            kpts = kpts.reshape(-1, 2).detach().cpu().numpy()
            descs = descs.reshape(-1, 128).detach().cpu().numpy()
            f_kp[key] = kpts
            f_desc[key] = descs
    return

img_fnames =  [fname for fname in glob(f'{src}/images/*')]
feature_dir = 'disk_features'
device=torch.device('cuda')
detect_features(img_fnames, 5000, device=device, feature_dir=feature_dir)

In [None]:
%%capture
# kornia_moons for feature visualization
!pip install kornia_moons

In [None]:
# Let's visualize our detections
from kornia_moons.feature import visualize_LAF

image_index = 5
with h5py.File(f'{feature_dir}/keypoints.h5', mode='r') as f_kp:
    img1 = load_torch_image(img_fnames[image_index])
    key = img_fnames[image_index].split('/')[-1]
    lafs = KF.laf_from_center_scale_ori(torch.from_numpy(f_kp[key][...]).reshape(1,-1, 2))
    visualize_LAF(img1, lafs)


In [None]:
# Now we will match our features on GPU with kornia

def match_features(img_fnames,
                   index_pairs,
                   feature_dir = '.featureout',
                   device=torch.device('cpu'),
                   min_matches=15, verbose = True):
    with h5py.File(f'{feature_dir}/descriptors.h5', mode='r') as f_desc, \
         h5py.File(f'{feature_dir}/matches.h5', mode='w') as f_match:
        for pair_idx in progress_bar(index_pairs):
                    idx1, idx2 = pair_idx
                    fname1, fname2 = img_fnames[idx1], img_fnames[idx2]
                    key1, key2 = fname1.split('/')[-1], fname2.split('/')[-1]
                    desc1 = torch.from_numpy(f_desc[key1][...]).to(device)
                    desc2 = torch.from_numpy(f_desc[key2][...]).to(device)
                    # Matching with mutual nearest neighbor check and Lowe's threshold
                    dists, idxs = KF.match_smnn(desc1, desc2, 0.98)
                    if len(idxs)  == 0:
                        continue
                    n_matches = len(idxs)
                    if verbose:
                        print (f'{key1}-{key2}: {n_matches} matches')
                    group  = f_match.require_group(key1)
                    if n_matches >= min_matches:
                         group.create_dataset(key2, data=idxs.detach().cpu().numpy().reshape(-1, 2))
    return

# matching all to all
index_pairs = []
for i in range(len(img_fnames)):
    for j in range(i+1, len(img_fnames)):
        index_pairs.append((i,j))

match_features(img_fnames, index_pairs, device=torch.device('cuda'), feature_dir=feature_dir)

In [None]:
# Import into colmap

from h5_to_db import import_into_colmap

database_path = f'{tgt}/database_disk.db'
!rm -rf {database_path}
img_dir = f'{src}/images'

import_into_colmap(img_dir, database_path=database_path, feature_dir=feature_dir)

In [None]:
# Now we run colmap. First matching - because it is bundled with RANSAC.
# Don't worry, colmap would not match our features again, it will just estimate geometry.
output_path =  f'{tgt}/disk_reconstruction'
os.makedirs(output_path, exist_ok=True)
pycolmap.match_exhaustive(database_path)

# Then we run reconstruction, as before
maps = pycolmap.incremental_mapping(database_path, img_dir, output_path)

In [None]:
maps

In [None]:
# Inspect the largest map generated by DISK + colmap. 
# Note that it might produce more than one reconstruction.

best_index = None
best_num_reg_images = 0
for idx in maps:
    if maps[idx].num_reg_images() > best_num_reg_images:
        best_index = idx
        best_num_reg_images = maps[idx].num_reg_images()

if best_num_reg_images > 0:
    print(f'Looking at reconstruction #{best_index} with {best_num_reg_images} registered images')
    fig = viz_3d.init_figure()
    viz_3d.plot_reconstruction(fig, maps[best_index], color='rgba(0,0,255,0.5)', name="Reconstruction", cs=3, cameras=False)
    viz_3d.plot_reconstruction(fig, maps[best_index], color='rgba(255,0,0,0.5)', name="Reconstruction", cs=3, points=False)
    fig.show()
else:
    print('No reconstruction. :(')

In [None]:
# Now we will try to align the obtained reconstruction to the GT one with Helmert transform
common_fnames = []
for k, v in maps[best_index].images.items():
    common_fnames.append(v.name)

gt_positions = []
for img_fname in common_fnames:
    for k,v in rec_gt.images.items():
        if v.name == img_fname:
            break
    gt_positions.append(v.projection_center().reshape(3, 1))

min_common_images = 10
transform = maps[best_index].align_robust(common_fnames, gt_positions, 5)
print (transform)

In [None]:
# Visualize the alignment.
# Note that the ground truth will show more cameras than we provide you for the reconstruction.

print(rec_gt)
print()

fig = viz_3d.init_figure()
viz_3d.plot_reconstruction(fig, maps[best_index], color='rgba(0,255,255,0.5)', name="Reconstruction", cs=5)
viz_3d.plot_reconstruction(fig, rec_gt, points=False, color='rgba(0,100,0,0.1)', name="GT Reconstruction", cs=5)
fig.show()
