In [None]:
!python -m pip install --no-deps /kaggle/input/dependencies-imc/pycolmap/pycolmap-0.4.0-cp310-cp310-manylinux2014_x86_64.whl
!python -m pip install --no-deps /kaggle/input/dependencies-imc/safetensors/safetensors-0.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
!python -m pip install --no-index --find-links=/kaggle/input/dependencies-imc/transformers/ transformers > /dev/null
!python -m pip install  --no-deps /kaggle/input/imc2024-packages-lightglue-rerun-kornia/lightglue-0.0-py3-none-any.whl

!python -m pip install --no-index --find-links=/kaggle/input/dkm-dependencies/packages einops > /dev/null

In [None]:
!pip install --no-index /kaggle/input/imc2024-packages-lightglue-rerun-kornia/* --no-deps
!mkdir -p /root/.cache/torch/hub/checkpoints
!cp /kaggle/input/extractors-points-v2/depth-save.pth /root/.cache/torch/hub/checkpoints/
!cp /kaggle/input/extractors-points-v2/disk_lightglue.pth /root/.cache/torch/hub/checkpoints/disk_lightglue_v0-1_arxiv-pth
!cp /kaggle/input/extractors-points-v2/superpoint_v1.pth /root/.cache/torch/hub/checkpoints/
!cp /kaggle/input/aliked/pytorch/aliked-n16/1/* /root/.cache/torch/hub/checkpoints/
!cp /kaggle/input/lightglue/pytorch/aliked/1/* /root/.cache/torch/hub/checkpoints/
!cp /kaggle/input/lightglue/pytorch/aliked/1/aliked_lightglue.pth /root/.cache/torch/hub/checkpoints/aliked_lightglue_v0-1_arxiv-pth
!cp /kaggle/input/extractors-points-v2/superpoint_lightglue.pth /root/.cache/torch/hub/checkpoints/superpoint_lightglue_v0-1_arxiv-pth
!cp /kaggle/input/dkm-dependencies/DKMv3_outdoor.pth /root/.cache/torch/hub/checkpoints/

In [None]:
import sys
import os
from pathlib import Path
sys.path.append('/kaggle/input/colmap-db-import')
sys.path.append('/kaggle/input/utils-reconstruction')
sys.path.append('/kaggle/input/dkm-dependencies/DKM/')
from database import *
from h5_to_db import *
from utils_reconstruction import *
import gc
import torch
import kornia as K

In [None]:
class Config:
    base_path: Path = Path('/kaggle/input/image-matching-challenge-2024')
    feature_dir: Path = Path.cwd() / '.featureout'
        
    device: torch.device = K.utils.get_cuda_device_if_available(0)
    
    aliked_lightglue = True
    disk_lightglue = False
    superpoint_lightglue = False
    dkm = False
    
    model_name = '/kaggle/input/dinov2/pytorch/large/1'
    
    pair_matching_args = {
        'similarity_threshold': 0.30,
        'tolerance': 500,
        'min_matches': 50,
        'exhaustive_if_less': 50,
        'p': 2.0,
    }
    
    keypoint_detection_args = {
        'num_features': 4098,
        'resize_to': 1280,
        'detection_threshold' : 0.01,
    }
    
    keypoint_distances_args = {
        'min_matches': 100,
        'verbose': False,
    }
    
    colmap_mapper_options = {
        'min_model_size': 5, 
        'max_num_models': 2,
    }
    
    params_dkm = {
        'num_features' : 4096,
        'detection_threshold' : 0.01,
        'min_matches' : 100,
        'resize_to' : (540, 720),    
    }

In [None]:
def parse_sample_submission(
    base_path: Path,
) -> dict[dict[str, list[Path]]]:
    """Construct a dict describing the test data as 
    
    {'dataset': {'scene': [<image paths>]}}
    """
    data_dict = {}
    with open(base_path / 'sample_submission.csv', 'r') as f:
        for i, l in enumerate(f):
            if i == 0:
                print('header:', l)

            if l and i > 0:
                image_path, dataset, scene, _, _ = l.strip().split(',')
                if dataset not in data_dict:
                    data_dict[dataset] = {}
                if scene not in data_dict[dataset]:
                    data_dict[dataset][scene] = []
                data_dict[dataset][scene].append(Path(base_path / image_path))

    for dataset in data_dict:
        for scene in data_dict[dataset]:
            print(f'{dataset} / {scene} -> {len(data_dict[dataset][scene])} images')

    return data_dict

In [None]:
def create_submission(results: dict, data_dict: dict[dict[str, list[Path]]], base_path: Path) -> None:
    """Prepares a submission file."""
    
    with open('submission.csv', 'w') as f:
        f.write('image_path,dataset,scene,rotation_matrix,translation_vector\n')
        
        for dataset in data_dict:
            if dataset in results:
                res = results[dataset]
            else:
                res = {}

            for scene in data_dict[dataset]:
                if scene in res:
                    scene_res = res[scene]
                else:
                    scene_res = {'R':{}, 't':{}}
                    
                for image in data_dict[dataset][scene]:
                    if image in scene_res:
                        print(image)
                        R = scene_res[image]['R'].reshape(-1)
                        T = scene_res[image]['t'].reshape(-1)
                    else:
                        R = np.eye(3).reshape(-1)
                        T = np.zeros((3))
                    image_path = str(image.relative_to(base_path))
                    f.write(f'{image_path},{dataset},{scene},{arr_to_str(R)},{arr_to_str(T)}\n')

In [None]:
def run_from_config(config: Config) -> None:
    results = {}
    
    data_dict = parse_sample_submission(config.base_path)
    datasets = list(data_dict.keys())
    
    for dataset in datasets:
        if dataset not in results:
            results[dataset] = {}
            
        for scene in data_dict[dataset]:
            images_dir = data_dict[dataset][scene][0].parent
            results[dataset][scene] = {}
            image_paths = data_dict[dataset][scene]
            print (f'Got {len(image_paths)} images')
            
            try:
                feature_dir = config.feature_dir / f'{dataset}_{scene}'
                feature_dir.mkdir(parents=True, exist_ok=True)
                database_path = feature_dir / 'colmap.db'
                if database_path.exists():
                    database_path.unlink()
                
                pairs = GetPairs(config.model_name, image_paths, config.device)
                index_pairs = pairs.get_image_pairs(**config.pair_matching_args)
                torch.cuda.empty_cache()
                gc.collect()
                
                files_keypoints = []

                if config.dkm:
                    dkm = DKMReconstruction(image_paths, **config.params_dkm, device=device)
                    file_keypoints = f'{feature_dir}/matches_dkm.h5'
                    dkm.detect_dkm(index_pairs, feature_dir, file_keypoints)
                    
                    torch.cuda.empty_cache()
                    gc.collect()
                    files_keypoints.append(file_keypoints)
                
                
                if config.aliked_lightglue:
                    lga = LightglueReconstruction('aliked', image_paths, device=device)
                    file_keypoints = f'{feature_dir}/matches_lightglue_aliked.h5'
                    lga.detect_keypoints(feature_dir, **config.keypoint_detection_args)
                    lga.keypoint_distances(index_pairs, file_keypoints, feature_dir, **config.keypoint_distances_args)
                    
                    torch.cuda.empty_cache()
                    gc.collect()
                    files_keypoints.append(file_keypoints)
                
                if config.disk_lightglue:
                    lgd = LightglueReconstruction('disk', image_paths, device=device)
                    file_keypoints = f'{feature_dir}/matches_lightglue_disk.h5'
                    lgd.detect_keypoints(feature_dir, **config.keypoint_detection_args)
                    lgd.keypoint_distances(index_pairs, file_keypoints, feature_dir, **config.keypoint_distances_args)
                    
                    torch.cuda.empty_cache()
                    gc.collect()
                    files_keypoints.append(file_keypoints)
              
                if config.superpoint_lightglue:
                    lgs = LightglueReconstruction('superpoint', image_paths, device=device)
                    file_keypoints = f'{feature_dir}/matches_lightglue_superpoint.h5'
                    lgs.detect_keypoints(feature_dir, **config.keypoint_detection_args)
                    lgs.keypoint_distances(index_pairs, file_keypoints, feature_dir, **config.keypoint_distances_args)
                    
                    torch.cuda.empty_cache()
                    gc.collect()
                    files_keypoints.append(file_keypoints)
                
                merger = Merge(image_paths, index_pairs, files_keypoints, feature_dir = feature_dir)
                merger.keypoints_merger()
                torch.cuda.empty_cache()
                gc.collect()
                
                import_into_colmap(
                    images_dir, 
                    feature_dir, 
                    database_path,
                )
                
                output_path = feature_dir / 'colmap_rec_aliked'
                output_path.mkdir(parents=True, exist_ok=True)

                pycolmap.match_exhaustive(database_path)
                
                mapper_options = pycolmap.IncrementalPipelineOptions(**config.colmap_mapper_options)
                
                maps = pycolmap.incremental_mapping(
                    database_path=database_path, 
                    image_path=images_dir,
                    output_path=output_path, 
                    options=mapper_options,
                )

                clear_output(wait=False)
                images_registered  = 0
                best_idx = None
                
                if isinstance(maps, dict):
                    for idx1, rec in maps.items():
                        print(idx1, rec.summary())
                        try:
                            if len(rec.images) > images_registered:
                                images_registered = len(rec.images)
                                best_idx = idx1
                        except Exception:
                            continue
                
                if best_idx is not None:
                    for k, im in maps[best_idx].images.items():
                        key = config.base_path / 'test' / scene / 'images' / im.name
                        results[dataset][scene][key] = {}
                        results[dataset][scene][key]['R'] = deepcopy(im.cam_from_world.rotation.matrix())
                        results[dataset][scene][key]['t'] = deepcopy(np.array(im.cam_from_world.translation))
                        

                create_submission(results, data_dict, config.base_path)
                gc.collect()
            
            except Exception as e:
                print(e)

In [None]:
run_from_config(Config)