In [None]:
%load_ext autoreload
%autoreload 2

from mast3r.model import AsymmetricMASt3R
from mast3r.fast_nn import fast_reciprocal_NNs
import os
import numpy as np
import trimesh
import copy
from scipy.spatial.transform import Rotation
import tempfile
import pandas as pd
import dataclasses
from pathlib import Path
import gc
import shutil
from copy import deepcopy
import torch
import glob
import matplotlib.pyplot as pl
import imageio.v2 as iio
import time
from boq.boq_infer import get_trained_boq, boq_sort_topk
import json

from hloc import (
    extract_features,
    match_features,
    reconstruction,
    visualization,
    pairs_from_exhaustive,
)
from hloc.visualization import plot_images, read_image
from hloc.utils import viz_3d
from transformers import AutoImageProcessor, AutoModel
import pycolmap

In [None]:
@dataclasses.dataclass
class Prediction:
    image_id: str | None  # A unique identifier for the row -- unused otherwise. Used only on the hidden test set.
    dataset: str
    filename: str
    cluster_index: int | None = None
    rotation: np.ndarray | None = None
    translation: np.ndarray | None = None

# Run mode:
#   is_train = True  -> run the notebook on the training data.
#   is_train = False -> submit an entry to the competition (the real test data is hidden,
#                       and different from what you see in the "test" folder).
is_train = False
data_dir = 'data/image-matching-challenge-2025'

# Every intermediate artifact is written below this directory.
workdir = Path('result/')
workdir.mkdir(parents=True, exist_ok=True)

# The table listing the images to process depends on the run mode.
sample_submission_csv = os.path.join(
    data_dir,
    'train_labels.csv' if is_train else 'sample_submission.csv',
)

# Group the rows by dataset: {dataset name -> list of Prediction, in file order}.
samples = {}
competition_data = pd.read_csv(sample_submission_csv)
for _, row in competition_data.iterrows():
    # Note: For the test data, the "scene" column has no meaning, and the
    # rotation_matrix and translation_vector columns are random.
    samples.setdefault(row.dataset, []).append(
        Prediction(
            # The image_id column is only read when not running on the training table.
            image_id=None if is_train else row.image_id,
            dataset=row.dataset,
            filename=row.image,
        )
    )

for dataset, dataset_predictions in samples.items():
    print(f'Dataset "{dataset}" -> num_images={len(dataset_predictions)}')

# Debugging knobs.
max_images = None  # Per-dataset cap on the number of images. None disables the cap.
datasets_to_process = None  # None means "process every dataset".

if is_train:
    # For a quick run, additionally set e.g. max_images = 5 here.

    # Note: When running on the training dataset, the notebook will hit the time limit
    # and die. Use this filter to run on a few specific datasets.
    datasets_to_process = (
        # New data.
        [
            'amy_gardens',
            'ETs',
            'fbk_vineyard',
            'stairs',
        ]
        # Data from IMC 2023 and 2024.
        + [
            'imc2024_dioscuri_baalshamin',
            'imc2023_theather_imc2024_church',
            'imc2023_heritage',
            'imc2023_haiper',
            'imc2024_lizard_pond',
            'pt_stpeters_stpauls',
            'pt_brandenburg_british_buckingham',
            'pt_piazzasanmarco_grandplace',
            'pt_sacrecoeur_trevi_tajmahal',
        ]
    )

# For every selected dataset: extract features, match all image pairs, run the
# reconstruction, and copy the resulting poses into the Prediction records.
for dataset, predictions in samples.items():
    # An empty/None filter means "process everything".
    if datasets_to_process and dataset not in datasets_to_process:
        print(f'Skipping "{dataset}"')
        continue

    images_dir = os.path.join(data_dir, 'train' if is_train else 'test', dataset)
    if not os.path.exists(images_dir):
        print(f'Images dir "{images_dir}" does not exist. Skipping "{dataset}"')
        continue

    images_dir = Path(images_dir)

    print(f'Images dir: {images_dir}')

    image_names = [p.filename for p in predictions]
    if max_images is not None:
        image_names = image_names[:max_images]

    print(f'\nProcessing dataset "{dataset}": {len(image_names)} images')

    # Maps an image file name back to its position in `predictions`.
    filename_to_index = {p.filename: idx for idx, p in enumerate(predictions)}

    # Per-dataset output locations.
    dataset_dir = workdir / dataset
    sfm_pairs = dataset_dir / "pairs-sfm.txt"
    loc_pairs = dataset_dir / "pairs-loc.txt"  # Defined for completeness; not consumed in this cell.
    sfm_dir = dataset_dir / "sfm"
    features = dataset_dir / "features.h5"
    matches = dataset_dir / "matches.h5"

    feature_conf = extract_features.confs["disk"]
    matcher_conf = match_features.confs["disk+lightglue"]

    extract_features.main(feature_conf, images_dir, image_list=image_names, feature_path=features)
    pairs_from_exhaustive.main(sfm_pairs, image_list=image_names)
    match_features.main(matcher_conf, sfm_pairs, features=features, matches=matches)

    # By default colmap does not generate a reconstruction if less than 10 images are registered.
    # Lower it to 5.
    mapper_options = {"min_model_size": 5, "max_num_models": 45}
    # NOTE(review): this relies on a `reconstruction.main` that returns a (max_map, maps)
    # pair, where `maps` is iterated below as {map index -> reconstruction} -- confirm
    # against the hloc version in use.
    max_map, maps = reconstruction.main(
        sfm_dir, images_dir, sfm_pairs, features, matches,
        image_list=image_names, min_match_score=0.1, mapper_options=mapper_options,
    )
    gc.collect()
    time.sleep(1)  # NOTE(review): the purpose of this pause is not evident from this cell.

    # Copy the poses into the predictions. The same image may be present in more than
    # one map; in that case the map visited last determines its cluster and pose, but
    # the image must be counted only once in the summary.
    registered_names = set()
    for map_index, cur_map in maps.items():
        for image in cur_map.images.values():
            prediction = predictions[filename_to_index[image.name]]
            prediction.cluster_index = map_index
            prediction.rotation = deepcopy(image.cam_from_world.rotation.matrix())
            prediction.translation = deepcopy(image.cam_from_world.translation)
            registered_names.add(image.name)
    registered = len(registered_names)
    mapping_result_str = f'Dataset "{dataset}" -> Registered {registered} / {len(image_names)} images with {len(maps)} clusters'
    print(mapping_result_str)


In [None]:
# Must Create a submission file.

array_to_str = lambda array: ';'.join([f"{x:.09f}" for x in array])
none_to_str = lambda n: ';'.join(['nan'] * n)

submission_file = 'result/submission.csv'
with open(submission_file, 'w') as f:
    if is_train:
        f.write('dataset,scene,image,rotation_matrix,translation_vector\n')
        for dataset in samples:
            for prediction in samples[dataset]:
                cluster_name = 'outliers' if prediction.cluster_index is None else f'cluster{prediction.cluster_index}'
                rotation = none_to_str(9) if prediction.rotation is None else array_to_str(prediction.rotation.flatten())
                translation = none_to_str(3) if prediction.translation is None else array_to_str(prediction.translation)
                f.write(f'{prediction.dataset},{cluster_name},{prediction.filename},{rotation},{translation}\n')
    else:
        f.write('image_id,dataset,scene,image,rotation_matrix,translation_vector\n')
        for dataset in samples:
            for prediction in samples[dataset]:
                cluster_name = 'outliers' if prediction.cluster_index is None else f'cluster{prediction.cluster_index}'
                rotation = none_to_str(9) if prediction.rotation is None else array_to_str(prediction.rotation.flatten())
                translation = none_to_str(3) if prediction.translation is None else array_to_str(prediction.translation)
                f.write(f'{prediction.image_id},{prediction.dataset},{cluster_name},{prediction.filename},{rotation},{translation}\n')

!head {submission_file}

In [None]:
# Compute results only when running on the training set.
# Do not do this when submitting a notebook for scoring. All you have to do is save your submission to /kaggle/working/submission.csv.
#
# The run mode is taken from the configuration cell: `is_train` is deliberately NOT
# reassigned here, so a submission run never tries to score a test-format CSV against
# the training labels, and re-running other cells afterwards sees the same mode.
if is_train:
    # Imported here because the scoring module is only needed for training runs.
    import metric
    final_score, dataset_scores = metric.score(
        gt_csv=os.path.join(data_dir, 'train_labels.csv'),
        user_csv=submission_file,
        thresholds_csv=os.path.join(data_dir, 'train_thresholds.csv'),
        mask_csv=None,  # No mask file is passed when scoring on the training set.
        inl_cf=0,
        strict_cf=-1,
        verbose=True,
    )