In [1]:
# General utilities
import os
from tqdm import tqdm
from time import time
from fastprogress import progress_bar
import gc
import math
import numpy as np
from IPython.display import clear_output
from collections import defaultdict
from copy import deepcopy
import matplotlib.pyplot as plt
import concurrent.futures

# CV/ML
import cv2
import torch
import torch.nn.functional as F
import kornia as K
import kornia.feature as KF
from PIL import Image
import timm
from timm.data import resolve_data_config
from timm.data.transforms_factory import create_transform

# 3D reconstruction
import pycolmap

import sys
import sqlite3
import numpy as np

import os, argparse, h5py, warnings
import numpy as np
from tqdm import tqdm
from PIL import Image, ExifTags

print("Kornia version", K.__version__)
print("Pycolmap version", pycolmap.__version__)

caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io_plugins.so: undefined symbol: _ZN3tsl6StatusC1EN10tensorflow5error4CodeESt17basic_string_viewIcSt11char_traitsIcEENS_14SourceLocationE']
caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so: undefined symbol: _ZTVN10tensorflow13GcsFileSystemE']


Kornia version 0.6.12
Pycolmap version 0.4.0


# Configs

In [2]:
NUM_CORES = 2
SRC = "/kaggle/input/image-matching-challenge-2023"
MODEL_DIR = "/kaggle/input/kornia-local-feature-weights/"
HARDNET_PT = "/kaggle/input/hardnet8v2/hardnet8v2.pt"

MODEL_DICT = {
    "Keynet": {"enable": True, "resize_long_edge_to": 1600},
    "GFTT":   {"enable": True, "resize_long_edge_to": 1600},
    "DoG":    {"enable": True, "resize_long_edge_to": 1600},
    "Harris": {"enable": True, "resize_long_edge_to": 1600},
}

MATCH_FILTER_RATIO = 0.01

device = torch.device("cuda")
print(torch.cuda.is_available())

True


# datadict setup

In [3]:
data_dict = {}

with open(f"{SRC}/sample_submission.csv", "r") as f:
    for i, l in enumerate(f):
        if l and i > 0:
            image, dataset, scene, _, _ = l.strip().split(",")
            if dataset not in data_dict:
                data_dict[dataset] = {}
            if scene not in data_dict[dataset]:
                data_dict[dataset][scene] = []
            data_dict[dataset][scene].append(image)

all_scenes = []
scene_len = []

for dataset in data_dict:
    for scene in data_dict[dataset]:
        print(f"{dataset} / {scene} -> {len(data_dict[dataset][scene])} images")
        all_scenes.append((dataset, scene))
        scene_len.append(len(data_dict[dataset][scene]))

all_scenes = [x for _, x in sorted(zip(scene_len, all_scenes), reverse=True)]

2cfa01ab573141e4 / 2fa124afd1f74f38 -> 3 images
Reconstruction order: 
 --2cfa01ab573141e4 / 2fa124afd1f74f38


# Image loading and Resizing

In [5]:
def load_torch_image(fname, device=torch.device("cpu")):
    img = K.image_to_tensor(cv2.imread(fname), False).float() / 255.0
    img = K.color.bgr_to_rgb(img.to(device))
    return img

def resize_torch_image(
    timg, resize_long_edge_to=None
):
    h, w = timg.shape[2:]
    raw_size = torch.tensor(timg.shape[2:])

    scale = float(resize_long_edge_to) / float(max(raw_size[0], raw_size[1]))
    scale = min(scale, 1)

    h_resized = int(h * scale)
    w_resized = int(w * scale)

    scale_h = h_resized / h
    scale_w = w_resized / w

    timg_resized = K.geometry.resize(timg, (h_resized, w_resized), antialias = True)
    return timg_resized, scale_h, scale_w

# Colmap

In [7]:
MAX_IMAGE_ID = 2**31 - 1

CREATE_CAMERAS_TABLE = """CREATE TABLE IF NOT EXISTS cameras (
    camera_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    model INTEGER NOT NULL,
    width INTEGER NOT NULL,
    height INTEGER NOT NULL,
    params BLOB,
    prior_focal_length INTEGER NOT NULL)"""

CREATE_IMAGES_TABLE = """CREATE TABLE IF NOT EXISTS images (
    image_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    name TEXT NOT NULL UNIQUE,
    camera_id INTEGER NOT NULL,
    prior_qw REAL,
    prior_qx REAL,
    prior_qy REAL,
    prior_qz REAL,
    prior_tx REAL,
    prior_ty REAL,
    prior_tz REAL,
    CONSTRAINT image_id_check CHECK(image_id >= 0 and image_id < {}),
    FOREIGN KEY(camera_id) REFERENCES cameras(camera_id))
""".format(
    MAX_IMAGE_ID
)

CREATE_TWO_VIEW_GEOMETRIES_TABLE = """
CREATE TABLE IF NOT EXISTS two_view_geometries (
    pair_id INTEGER PRIMARY KEY NOT NULL,
    rows INTEGER NOT NULL,
    cols INTEGER NOT NULL,
    data BLOB,
    config INTEGER NOT NULL,
    F BLOB,
    E BLOB,
    H BLOB)
"""

CREATE_KEYPOINTS_TABLE = """CREATE TABLE IF NOT EXISTS keypoints (
    image_id INTEGER PRIMARY KEY NOT NULL,
    rows INTEGER NOT NULL,
    cols INTEGER NOT NULL,
    data BLOB,
    FOREIGN KEY(image_id) REFERENCES images(image_id) ON DELETE CASCADE)
"""

CREATE_MATCHES_TABLE = """CREATE TABLE IF NOT EXISTS matches (
    pair_id INTEGER PRIMARY KEY NOT NULL,
    rows INTEGER NOT NULL,
    cols INTEGER NOT NULL,
    data BLOB)"""

CREATE_NAME_INDEX = "CREATE UNIQUE INDEX IF NOT EXISTS index_name ON images(name)"

CREATE_ALL = "; ".join(
    [
        CREATE_CAMERAS_TABLE,
        CREATE_IMAGES_TABLE,
        CREATE_KEYPOINTS_TABLE,
        CREATE_MATCHES_TABLE,
        CREATE_TWO_VIEW_GEOMETRIES_TABLE,
        CREATE_NAME_INDEX,
    ]
)

def image_ids_to_pair_id(image_id1, image_id2):
    if image_id1 > image_id2:
        image_id1, image_id2 = image_id2, image_id1
    return image_id1 * MAX_IMAGE_ID + image_id2

def array_to_blob(array):
    return array.tostring()


class COLMAPDatabase(sqlite3.Connection):
    @staticmethod
    def connect(database_path):
        return sqlite3.connect(database_path, factory=COLMAPDatabase)

    def __init__(self, *args, **kwargs):
        super(COLMAPDatabase, self).__init__(*args, **kwargs)
        self.create_tables = lambda: self.executescript(CREATE_ALL)
        self.create_cameras_table = lambda: self.executescript(CREATE_CAMERAS_TABLE)
        self.create_images_table = lambda: self.executescript(CREATE_IMAGES_TABLE)
        self.create_two_view_geometries_table = lambda: self.executescript(CREATE_TWO_VIEW_GEOMETRIES_TABLE)
        self.create_keypoints_table = lambda: self.executescript(CREATE_KEYPOINTS_TABLE)
        self.create_matches_table = lambda: self.executescript(CREATE_MATCHES_TABLE)
        self.create_name_index = lambda: self.executescript(CREATE_NAME_INDEX)

    def add_camera(
        self, model, width, height, params, prior_focal_length=False, camera_id=None
    ):
        params = np.asarray(params, np.float64)
        cursor = self.execute(
            "INSERT INTO cameras VALUES (?, ?, ?, ?, ?, ?)",
            (
                camera_id,
                model,
                width,
                height,
                array_to_blob(params),
                prior_focal_length,
            ),
        )
        return cursor.lastrowid

    def add_image(
        self, name, camera_id, prior_q=np.zeros(4), prior_t=np.zeros(3), image_id=None
    ):
        cursor = self.execute(
            "INSERT INTO images VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                image_id,
                name,
                camera_id,
                prior_q[0],
                prior_q[1],
                prior_q[2],
                prior_q[3],
                prior_t[0],
                prior_t[1],
                prior_t[2],
            ),
        )
        return cursor.lastrowid

    def add_keypoints(self, image_id, keypoints):
        assert len(keypoints.shape) == 2
        assert keypoints.shape[1] in [2, 4, 6]

        keypoints = np.asarray(keypoints, np.float32)
        self.execute(
            "INSERT INTO keypoints VALUES (?, ?, ?, ?)",
            (image_id,) + keypoints.shape + (array_to_blob(keypoints),),
        )

    def add_matches(self, image_id1, image_id2, matches):
        assert len(matches.shape) == 2
        assert matches.shape[1] == 2

        if image_id1 > image_id2:
            matches = matches[:, ::-1]

        pair_id = image_ids_to_pair_id(image_id1, image_id2)
        matches = np.asarray(matches, np.uint32)
        self.execute(
            "INSERT INTO matches VALUES (?, ?, ?, ?)",
            (pair_id,) + matches.shape + (array_to_blob(matches),),
        )

    def add_two_view_geometry(
        self,
        image_id1,
        image_id2,
        matches,
        F=np.eye(3),
        E=np.eye(3),
        H=np.eye(3),
        config=2,
    ):
        assert len(matches.shape) == 2
        assert matches.shape[1] == 2

        if image_id1 > image_id2:
            matches = matches[:, ::-1]

        pair_id = image_ids_to_pair_id(image_id1, image_id2)
        matches = np.asarray(matches, np.uint32)
        F = np.asarray(F, dtype=np.float64)
        E = np.asarray(E, dtype=np.float64)
        H = np.asarray(H, dtype=np.float64)
        self.execute(
            "INSERT INTO two_view_geometries VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (pair_id,)
            + matches.shape
            + (
                array_to_blob(matches),
                config,
                array_to_blob(F),
                array_to_blob(E),
                array_to_blob(H),
            ),
        )

# Image Focal Length, Camera Model, Adding Key Points and Matches. Add to Database.

In [8]:
def get_focal(image_path):
    """
      Obtain the focal length of the image.

        :param image_path: The path of the image.
        :return: Return the focal length and a boolean value indicating whether the focal length comes from Exif information.
    """
    image = Image.open(image_path)
    max_size = max(image.size)
    exif = image.getexif()
    exif_ifd = exif.get_ifd(0x8769)
    exif.update(exif_ifd)

    focal = None
    is_from_exif = False
    if exif is not None:
        focal_35mm = None
        for tag, value in exif.items():
            focal_35mm = None
            if ExifTags.TAGS.get(tag, None) == "FocalLengthIn35mmFilm":
                focal_35mm = float(value)
                is_from_exif = True
                break

        if focal_35mm is not None:
            focal = focal_35mm / 35.0 * max_size

    if focal is None:
        FOCAL_PRIOR = 1.2
        focal = FOCAL_PRIOR * max_size

    return focal, is_from_exif


def create_camera(db, image_path, camera_model):
    """
    Create a camera model.

    :param db: The database object.
    :param image_path: The path of the image.
    :param camera_model: The camera model type, could be "simple-pinhole", "pinhole", "simple-radial" or "opencv".
    :return: Return the ID of the created camera model.
    """
    image = Image.open(image_path)
    width, height = image.size

    focal, is_from_exif = get_focal(image_path)

    if camera_model == "simple-pinhole":
        model = 0  # simple pinhole
        param_arr = np.array([focal, width / 2, height / 2])
    if camera_model == "pinhole":
        model = 1  # pinhole
        param_arr = np.array([focal, focal, width / 2, height / 2])
    elif camera_model == "simple-radial":
        model = 2  # simple radial
        param_arr = np.array([focal, width / 2, height / 2, 0.1])
    elif camera_model == "opencv":
        model = 4  # opencv
        param_arr = np.array([focal, focal, width / 2, height / 2, 0.0, 0.0, 0.0, 0.0])

    return db.add_camera(
        model, width, height, param_arr, prior_focal_length=is_from_exif
    )


def add_kpts_matches(db, img_dir, kpts, matches, fms=None):
    """
    Add keypoints and matches.

    :param db: The database object.
    :param img_dir: The directory of the image.
    :param kpts: The dictionary of keypoints, with the filename as the key and keypoints as the value.
    :param matches: The dictionary of matches, with the filename as the key and matches as the value.
    :param fms: Fundamental matrices, optional parameter.
    """
    fname_to_id = {}

    for filename in tqdm(kpts):
        path = os.path.join(img_dir, filename)
        camera_model = "simple-radial"
        camera_id = create_camera(db, path, camera_model)
        image_id = db.add_image(filename, camera_id)
        fname_to_id[filename] = image_id
        db.add_keypoints(image_id, kpts[filename])

        n_keys = len(matches)
        n_total = (n_keys * (n_keys - 1)) // 2

    added = set()
    with tqdm(total=n_total) as pbar:
        for key1 in matches:
            for key2 in matches[key1]:
                id_1 = fname_to_id[key1]
                id_2 = fname_to_id[key2]
                pair_id = image_ids_to_pair_id(id_1, id_2)
                if pair_id in added:
                    warnings.warn(f"Pair {pair_id} ({id_1}, {id_2}) already added!")
                    continue
                db.add_matches(id_1, id_2, matches[key1][key2])
                added.add(pair_id)
                pbar.update(1)

                if fms is not None:
                    db.add_two_view_geometry(id_1, id_2, matches[key1][key2], fms[key1][key2], np.eye(3), np.eye(3))

    db.commit()


# MyModel

In [10]:
class MyModel(KF.LocalFeature):
    """
    The model integrates detector + AffNet + HardNet descriptor.
    """

    def __init__(
        self,
        num_features: int = 5000,
        device=torch.device("cpu"),
        detector = "keynet"
    ):
        detector_options = ["keynet", "GFTT", "Hessian", "Harris", "DoG"]
        if detector not in detector_options:
            raise ValueError("Detector must be one of {}".format(detector_options))

        ori_module = (KF.LAFOrienter(angle_detector=KF.OriNet(False)).eval())
        ori_module.angle_detector.load_state_dict(torch.load(os.path.join(MODEL_DIR, "OriNet.pth"))["state_dict"])

        if detector == "keynet":
            detector = KF.KeyNetDetector(
                False,
                num_features=num_features,
                ori_module=ori_module,
                aff_module=KF.LAFAffNetShapeEstimator(False).eval(),
            ).to(device)

            detector.model.load_state_dict(torch.load(os.path.join(MODEL_DIR, "keynet_pytorch.pth"))["state_dict"])
        
        elif detector == "GFTT":
            detector = KF.MultiResolutionDetector(
                KF.CornerGFTT(),
                num_features=num_features,
                ori_module=ori_module,
                aff_module=KF.LAFAffNetShapeEstimator(False).eval(),
            ).to(device)

        elif detector == "Harris":
            detector = KF.MultiResolutionDetector(
                KF.CornerHarris(0.04),
                num_features=num_features,
                ori_module=ori_module,
                aff_module=KF.LAFAffNetShapeEstimator(False).eval(),
            ).to(device)

        elif detector == "DoG":
            detector = KF.MultiResolutionDetector(
                KF.BlobDoGSingle(),
                num_features=num_features,
                ori_module=ori_module,
                aff_module=KF.LAFAffNetShapeEstimator(False).eval(),
            ).to(device)

        detector.aff.load_state_dict(torch.load(os.path.join(MODEL_DIR, "AffNet.pth"))["state_dict"])

        hardnet8 = KF.HardNet8(False).eval()
        hardnet8.load_state_dict(torch.load(HARDNET_PT))

        descriptor = KF.LAFDescriptor(
            hardnet8, patch_size=32, grayscale_descriptor=True
        ).to(device)
        super().__init__(detector, descriptor)

# ModelDetector

In [12]:
class ModelDetector:
    def __init__(
        self,
        model,
        device=torch.device("cuda"),
        resize_long_edge_to=600,
    ):
        self.model = model
        self.device = device
        self.resize_long_edge_to = resize_long_edge_to
        print("Longer edge will be resized to", self.resize_long_edge_to)

    def detect_features(self, img_fnames): 
        """
        Perform feature detection on the given list of images and return the detection results.

        :param img_fnames: List containing image file names
        :return: Feature data in the form of a dictionary.
        """
        f_lafs = dict()
        f_kpts = dict()
        f_descs = dict()
        f_raw_size = dict()
        print("Detecting features")
        for img_path in tqdm(img_fnames):
            img_fname = img_path.split("/")[-1]
            key = img_fname
            with torch.inference_mode():
                timg = load_torch_image(img_path, device=device)
                raw_size = torch.tensor(timg.shape[2:])
                timg_resized, h_scale, w_scale = resize_torch_image(
                    timg, self.resize_long_edge_to
                )
                lafs, _, descs = self.model(K.color.rgb_to_grayscale(timg_resized))  
                
                # Recover scale
                lafs[:, :, 0, :] *= 1 / w_scale
                lafs[:, :, 1, :] *= 1 / h_scale
                desc_dim = descs.shape[-1]
                # Move keypoints to cpu for later colmap operations
                kpts = KF.get_laf_center(lafs).reshape(-1, 2).detach().cpu().numpy()
                descs = descs.reshape(-1, desc_dim).detach()
                f_lafs[key] = lafs.detach()
                f_kpts[key] = kpts
                f_descs[key] = descs
                f_raw_size[key] = raw_size
        gc.collect()  
        torch.cuda.empty_cache()
        return f_lafs, f_kpts, f_descs, f_raw_size

# Laf Matcher

In [None]:
def get_unique_idxs(A, dim=0):
    """
    Extracts the unique elements from the PyTorch tensor, and returns the indices of their first occurrence in the original tensor.

    Parameters:
    A (torch.Tensor): The input PyTorch tensor.
    dim (int, optional): The dimension of operation. Default is 0.

    Returns:
    first_indices (torch.Tensor): The tensor of indices where unique elements first occur in the original tensor.
    """

    unique, idx, counts = torch.unique(
        A, dim=dim, sorted=True, return_inverse=True, return_counts=True
    )

    _, ind_sorted = torch.sort(idx, stable=True)

    cum_sum = counts.cumsum(0)

    cum_sum = torch.cat((torch.tensor([0], device=cum_sum.device), cum_sum[:-1]))

    first_indices = ind_sorted[cum_sum]
    
    return first_indices


In [13]:
class LafMatcher:
    def __init__(self, min_matches=15, device="cuda"):
'''
        Initialize LafMatcher class

        min_matches: Minimum number of matches
        device: Computing device, default is "cuda"
        matcher: Matching method, default is "adalam"
        Initialize adalam configuration
'''
        self.adalam_config = KF.adalam.get_adalam_default_config()
        self.adalam_config["force_seed_mnn"] = True
        self.adalam_config["search_expansion"] = 16
        self.adalam_config["ransac_iters"] = 256
        self.adalam_config["device"] = device
        self.min_matches = min_matches
        self.matcher = "adalam"

    def match(self, img_fnames, f_lafs, f_kpts, f_descs, f_raw_size):
        """
        Execute the matching operation.
        img_fnames: List of image filenames.
        f_lafs: Intrinsic affine features of the image.
        f_kpts: Keypoints of the image.
        f_descs: Descriptors of the image.
        f_raw_size: Original size of the image.
        get_roi: Whether to compute the Region of Interest (ROI), default is False.
        """

        num_imgs = len(img_fnames)
        print("Matching to get index pairs")
        pair_count = 0
        f_matches = defaultdict(dict)
        f_rois = defaultdict(dict)

        for idx1 in tqdm(range(num_imgs - 1)):
            for idx2 in range(idx1 + 1, num_imgs):
                fname1, fname2 = img_fnames[idx1], img_fnames[idx2]
                key1, key2 = fname1.split("/")[-1], fname2.split("/")[-1]
                lafs1 = f_lafs[key1]
                lafs2 = f_lafs[key2]
                desc1 = f_descs[key1]
                desc2 = f_descs[key2]
                
                hw1, hw2 = f_raw_size[key1], f_raw_size[key2]

                # Match using adalam
                # dists: Each element in the tensor is a distance, representing the distance between the i-th feature point of the first image and the j-th feature point of the second image.
                # idxs: Each element in the tensor is an index pair (i, j), indicating that the i-th feature point of the first image is matched with the j-th feature point of the second image.
                dists, idxs = KF.match_adalam(
                    desc1,
                    desc2,
                    lafs1,
                    lafs2,  
                    hw1=hw1,
                    hw2=hw2,
                    config=self.adalam_config,
                )

                if dists.mean().cpu().numpy() < 0.5:
                    first_indices = get_unique_idxs(idxs[:, 1])
                    idxs = idxs[first_indices]
                    dists = dists[first_indices]
                    n_matches = len(idxs)

                    if n_matches >= self.min_matches:
                        pair_count += 1
                        f_matches[key1][key2] = (
                            idxs.detach().cpu().numpy().reshape(-1, 2)
                        )

        print(f" Get {pair_count} from {int(num_imgs * (num_imgs-1)/2)} possible pairs")
        torch.cuda.empty_cache()
        gc.collect()
        return f_kpts, f_matches

# Model Setup

In [16]:
if MODEL_DICT["Keynet"]["enable"]:
    keynet_model = (
        MyModel(num_features=8000, device=device, detector="keynet")
        .to(device)
        .eval()
    )
    keynet_detector = ModelDetector(keynet_model, resize_long_edge_to=MODEL_DICT["Keynet"]["resize_long_edge_to"])

if MODEL_DICT["GFTT"]["enable"]:
    gftt_model = (
        MyModel(num_features=8000, device=device, detector="GFTT")
        .to(device)
        .eval()
    )
    gftt_detector = ModelDetector(gftt_model, resize_long_edge_to=MODEL_DICT["GFTT"]["resize_long_edge_to"])

if MODEL_DICT["DoG"]["enable"]:
    DoG_model = (
        MyModel(num_features=8000, device=device, detector="DoG")
        .to(device)
        .eval()
    )
    DoG_detector = ModelDetector(DoG_model, resize_long_edge_to=MODEL_DICT["DoG"]["resize_long_edge_to"])

if MODEL_DICT["Harris"]["enable"]:
    harris_model = (
        MyModel(num_features=8000, device=device, detector="Harris")
        .to(device)
        .eval()
    )
    harris_detector = ModelDetector(harris_model, resize_long_edge_to=MODEL_DICT["Harris"]["resize_long_edge_to"])

laf_matcher = LafMatcher(device=device)

  aff_module=KF.LAFAffNetShapeEstimator(False).eval(),


Init AffNetHardNetDetector
Longer edge will be resized to 1600


  aff_module=KF.LAFAffNetShapeEstimator(False).eval(),


Init AffNetHardNetDetector
Longer edge will be resized to 1600


  aff_module=KF.LAFAffNetShapeEstimator(False).eval(),


Init AffNetHardNetDetector
Longer edge will be resized to 1600


  aff_module=KF.LAFAffNetShapeEstimator(False).eval(),


Init AffNetHardNetDetector
Longer edge will be resized to 1600


# Merge keypoints and matches， Compute fundamental matrix， Filter matching points.

In [14]:
def merge_kpts_matches(kpts, matches, new_kpts, new_matches):
    """
    Merge keypoints (kpts) and matches.

    Parameters:
    kpts: dict, original keypoints.
    matches: dict, original matching points.
    new_kpts: dict, new keypoints.
    new_matches: dict, new matching points.

    Returns:
    tuple: A tuple containing merged keypoints and matching points.
    """
    prev_len = dict()

    for new_key in new_kpts:
        if new_key in kpts:
            old_len = len(kpts[new_key])
            kpts[new_key] = np.concatenate([kpts[new_key], new_kpts[new_key]], axis=0)
        else:
            old_len = 0
            kpts[new_key] = new_kpts[new_key]
        prev_len[new_key] = old_len

    for new_key1 in new_matches:
        for new_key2 in new_matches[new_key1]:
            old_len1 = prev_len[new_key1]
            old_len2 = prev_len[new_key2]
            new_match = new_matches[new_key1][new_key2] + [old_len1, old_len2]

            if new_key1 in matches and new_key2 in matches[new_key1]:
                matches[new_key1][new_key2] = np.concatenate(
                    [
                        matches[new_key1][new_key2],
                        new_match,
                    ],
                    axis=0,
                )
            else:
                if new_key1 not in matches:
                    matches[new_key1] = dict()
                matches[new_key1][new_key2] = new_match
    return kpts, matches


def get_fms(kpts, matches):
    """
    Compute the fundamental matrix (Fundamental Matrix) based on the input keypoints and matching points.
    The fundamental matrix is a 3x3 matrix that represents the geometric relationship between two cameras (or two viewpoints) (the two cameras capture two different viewpoints of the same scene).
    """

    fms = defaultdict(dict)
    
    print("Get Fundamental Matrix")

    for key1 in tqdm(matches):
        for key2 in matches[key1]:

            match = matches[key1][key2]

            mkpts1 = kpts[key1][match[:, 0]]
            mkpts2 = kpts[key2][match[:, 1]]

            Fm, inliers = cv2.findFundamentalMat(mkpts1, mkpts2, cv2.USAC_MAGSAC, 5, 0.9999, 50000)
            new_match = match[inliers.ravel() == 1]
            matches[key1][key2] = new_match
            fms[key1][key2] = Fm

    return kpts, matches, fms



def select_matches(matches, keep_ratio = 0.01):
    """
    Select matching points.

    Parameters:
    matches: dict, original matching points.
    keep_ratio: float, proportion of matching points to retain, default is 0.01.

    Returns:
    dict: Retained matching points.
    """
    max_matches = defaultdict(int)
    old_matches_count = 0
    for key1 in matches:
        for key2 in matches[key1]:
            max_matches[key1] = max(max_matches[key1], len(matches[key1][key2]))
            max_matches[key2] = max(max_matches[key2], len(matches[key1][key2]))
            old_matches_count +=1

    new_matches_count = 0
    new_matches = defaultdict(dict)
    for key1 in matches:
        for key2 in matches[key1]:
            n_matches = len(matches[key1][key2])
            if n_matches > max_matches[key1] * keep_ratio or n_matches > max_matches[key2] * keep_ratio:
                new_matches[key1][key2] = matches[key1][key2]
                new_matches_count+=1

    return new_matches

# Feature detection and matching, results written to the database.

In [18]:
def generate_scene_db(dataset, scene):
    """
    Perform feature detection and matching on a 3D scene, then write the results to the database.

    Parameters:
    dataset -- Name of the dataset
    scene -- Name of the scene

    Returns:
    matching_time -- Time taken for feature detection and matching.
    """

    feature_det_start = time()

    img_dir = f"{SRC}/test/{dataset}/{scene}/images"
    if not os.path.exists(img_dir):
        print("Image dir does not exist:", img_dir)
        return

    img_fnames = [f"{SRC}/test/{x}" for x in data_dict[dataset][scene]]
    print(f"Got {len(img_fnames)} images")

    matches = dict()
    kpts = dict()

    if MODEL_DICT["Keynet"]["enable"]:
        f_lafs, f_kpts, f_descs, f_raw_size = keynet_detector.detect_features(img_fnames)
        keynet_kpts, keynet_matches = laf_matcher.match(
            img_fnames, f_lafs, f_kpts, f_descs, f_raw_size
        )
        kpts, matches = merge_kpts_matches(kpts, matches, keynet_kpts, keynet_matches)

    if MODEL_DICT["GFTT"]["enable"]:
        gftt_lafs, gftt_kpts, gftt_descs, gftt_raw_size = gftt_detector.detect_features(img_fnames)
        gftt_kpts, gftt_matches = laf_matcher.match(
            img_fnames, gftt_lafs, gftt_kpts, gftt_descs, gftt_raw_size
        )
        kpts, matches = merge_kpts_matches(kpts, matches, gftt_kpts, gftt_matches)

    if MODEL_DICT["DoG"]["enable"]:
        DoG_lafs, DoG_kpts, DoG_descs, DoG_raw_size = DoG_detector.detect_features(img_fnames)
        DoG_kpts, DoG_matches = laf_matcher.match(
            img_fnames, DoG_lafs, DoG_kpts, DoG_descs, DoG_raw_size
        )
        kpts, matches = merge_kpts_matches(kpts, matches, DoG_kpts, DoG_matches)

    if MODEL_DICT["Harris"]["enable"]:
        harris_lafs, harris_kpts, harris_descs, harris_raw_size = harris_detector.detect_features(img_fnames)
        harris_kpts, harris_matches = laf_matcher.match(
            img_fnames, harris_lafs, harris_kpts, harris_descs, harris_raw_size
        )
        kpts, matches = merge_kpts_matches(kpts, matches, harris_kpts, harris_matches)

    kpts, matches, fms = get_fms(kpts, matches)

    matches = select_matches(matches, MATCH_FILTER_RATIO)

    feature_dir = f"featureout/{dataset}_{scene}"
    os.makedirs(feature_dir, exist_ok=True)
    database_path = f"{feature_dir}/colmap.db"
    if os.path.isfile(database_path):
        os.remove(database_path)

    db = COLMAPDatabase.connect(database_path)
    db.create_tables()
    print("Add kpts and matches to database")
    add_kpts_matches(db, img_dir, kpts, matches, fms)
    feature_det_end = time()
    matching_time = feature_det_end - feature_det_start
    torch.cuda.empty_cache()
    gc.collect()

    return matching_time

# Reconstruct 3D model.

In [19]:
def reconstruct_from_db(dataset, scene):
    """
    Reconstruct a 3D model from the database.

    Parameters:
    dataset (str): Name of the dataset.
    scene (str): Name of the scene.

    Returns:
    tuple: The reconstruction results and the time taken for reconstruction.
    """

    scene_result = {}
    reconst_start = time()

    img_dir = f"{SRC}/test/{dataset}/{scene}/images"
    if not os.path.exists(img_dir):
        print("Image dir does not exist:", img_dir)
        return

    database_path = f"featureout/{dataset}_{scene}/colmap.db"
    db = COLMAPDatabase.connect(database_path)
    output_path = f"featureout/{dataset}_{scene}/colmap_rec"
    os.makedirs(output_path, exist_ok=True)

    gc.collect()
    t = time()

    mapper_options = pycolmap.IncrementalMapperOptions()
    mapper_options.min_model_size = 3

    maps = pycolmap.incremental_mapping(
        database_path=database_path,
        image_path=img_dir,
        output_path=output_path,
        options=mapper_options,
    )
    print(maps)
    t = time() - t
    print(f"Reconstruction done in  {t:.4f} sec")


    imgs_registered = 0
    best_idx = None
    print("Looking for the best reconstruction")
    if isinstance(maps, dict):
        for idx1, rec in maps.items():
            if len(rec.images) > imgs_registered:
                imgs_registered = len(rec.images)
                best_idx = idx1

    if best_idx is not None:
        for k, im in maps[best_idx].images.items():
            key1 = f"{dataset}/{scene}/images/{im.name}"
            scene_result[key1] = {}
            scene_result[key1]["R"] = deepcopy(im.rotmat())
            scene_result[key1]["t"] = deepcopy(np.array(im.tvec))

    gc.collect()
    reconst_end = time()
    reconst_time = reconst_end - reconst_start
    return scene_result, reconst_time

# Main Loop

In [20]:
datasets = []
time_dict = dict()

for dataset in data_dict:
    datasets.append(dataset)
out_results = defaultdict(dict)
total_start = time()

with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_CORES) as executors:
    futures = defaultdict(dict)

    for dataset, scene in all_scenes:
        print(dataset, scene)
        time_dict["matching-" + scene] = generate_scene_db(dataset, scene)
        futures[dataset][scene] = executors.submit(reconstruct_from_db, dataset, scene)

    for dataset, scene in all_scenes:
        result = futures[dataset][scene].result()
        if result is not None:
            out_results[dataset][scene], time_dict["reconst-" + scene] = result
total_end = time()
time_dict["TOTAL"] = total_end - total_start

2cfa01ab573141e4 2fa124afd1f74f38
Image dir does not exist: /kaggle/input/image-matching-challenge-2023/test/2cfa01ab573141e4/2fa124afd1f74f38/images
Image dir does not exist: /kaggle/input/image-matching-challenge-2023/test/2cfa01ab573141e4/2fa124afd1f74f38/images


In [22]:
def arr_to_str(a):
    return ";".join([str(x) for x in a.reshape(-1)])

def create_submission(out_results, data_dict):
    with open("submission.csv", "w") as f:
        # 写入列名
        f.write("image_path,dataset,scene,rotation_matrix,translation_vector\n")
        for dataset in data_dict:
            if dataset in out_results:
                res = out_results[dataset]
            else:
                res = {}
            for scene in data_dict[dataset]:
                if scene in res:
                    scene_res = res[scene]
                else:
                    scene_res = {"R": {}, "t": {}}
                for image in data_dict[dataset][scene]:
                    if image in scene_res:
                        print(image)
                        R = scene_res[image]["R"].reshape(-1)
                        T = scene_res[image]["t"].reshape(-1)
                    else:
                        R = np.eye(3).reshape(-1)
                        T = np.zeros((3))
                    f.write(
                        f"{image},{dataset},{scene},{arr_to_str(R)},{arr_to_str(T)}\n"
                    )

create_submission(out_results, data_dict)