In [1]:
#environment configuration：when kaggle GPU running，closeInternet
#relative link：
#https://www.kaggle.com/competitions/image-matching-challenge-2022
#https://www.kaggle.com/datasets/gufanmingmie/dkm-dependecies
#https://www.kaggle.com/datasets/ammarali32/kornia-loftr

# ***Install Libs***
### loadkornia&kornia_moons

In [2]:
dry_run = False
!pip install ../input/kornia-loftr/kornia-0.6.4-py2.py3-none-any.whl
!pip install ../input/kornia-loftr/kornia_moons-0.1.9-py3-none-any.whl

# ***Import dependencies***

In [3]:
#load basic operating framwork
import os
import numpy as np
import cv2
import csv
from glob import glob
import torch
import matplotlib.pyplot as plt
from PIL import Image
import kornia
from kornia_moons.feature import *
import kornia as K
import kornia.feature as KF
import gc
import sys, os, csv
sys.path.append('../input/dkm-dependecies/DKM/')

In [4]:
#load dkm
!mkdir -p pretrained/checkpoints
!cp ../input/dkm-dependecies/pretrained/dkm.pth pretrained/checkpoints/dkm_base_v11.pth

!pip install -f ../input/dkm-dependecies/wheels --no-index einops
!cp -r ../input/dkm-dependecies/DKM/ /kaggle/working/DKM/
!cd /kaggle/working/DKM/; pip install -f ../input/dkm-dependecies/wheels -e . 

# ***Model***

In [5]:
#loftr load
device = torch.device('cuda')#use GPU
matcher = KF.LoFTR(pretrained=None)# load LoFTR structure
matcher.load_state_dict(torch.load("../input/kornia-loftr/loftr_outdoor.ckpt")['state_dict'])#load weight
matcher = matcher.to(device).eval()#trasnfer to GPU

IMG_MAX_SIZE = [1280, 840]#use two sacles to TTA（Test-time augment）infer
DO_FLIP = True #decide image flip
MAX_NUM_PAIRS = 8000 #max matching points

##load dkm
torch.hub.set_dir('/kaggle/working/pretrained/')
from dkm import dkm_base
dkm_model = dkm_base(pretrained=True, version="v11").to(device).eval()

## *Utils*

In [6]:
src = '/kaggle/input/image-matching-challenge-2022/'

#load test info
test_samples = []
with open(f'{src}/test.csv') as f:
    reader = csv.reader(f, delimiter=',')
    for i, row in enumerate(reader):
        # Skip header.
        if i == 0:
            continue
        test_samples += [row]

#flattern matrix
def FlattenMatrix(M, num_digits=8):
    '''Convenience function to write CSV files.'''
    
    return ' '.join([f'{v:.{num_digits}e}' for v in M.flatten()])


#loftr
def load_torch_image(fname, device, resize=True, img_max_size=1280):
    img = cv2.imread(fname)
    img_sz = img.shape
    if img_max_size <=0:
        resize = False
    #change shape
    if resize:
        #use leargets scalescale
        scale = img_max_size / max(img.shape[0], img.shape[1]) 
    else:
        scale = 1
    w = int(img.shape[1] * scale)
    h = int(img.shape[0] * scale)
    #normalization，channel
    img = cv2.resize(img, ((w//8)*8, (h//8)*8))
    img = K.image_to_tensor(img, False).float() /255.
    img = K.color.bgr_to_rgb(img)
    return img.to(device), img_sz

#single match
def match_same_size(img_path0, img_path1, matcher, device=device, flip=False, img_max_size=1280):
    img0, source_img_shape = load_torch_image(img_path0, device, img_max_size=img_max_size)
    img1, target_img_shape = load_torch_image(img_path1, device, img_max_size=img_max_size)
    input_dict = {"image0": K.color.rgb_to_grayscale(img0).to(device), 
                  "image1": K.color.rgb_to_grayscale(img1).to(device)}
    #
    if flip:
        input_dict["image0"] = torch.cat((input_dict["image0"], torch.flip(input_dict["image0"], dims=[3])), dim=0)
        input_dict["image1"] = torch.cat((input_dict["image1"], torch.flip(input_dict["image1"], dims=[3])), dim=0)
        
    #infer    
    with torch.no_grad():
        correspondences = matcher(input_dict)
    
    # flip coordinate
    #extract important info
    # https://github.com/kornia/kornia/blob/master/kornia/feature/loftr/loftr.py
    keypoints0 = correspondences["keypoints0"].cpu().numpy()
    keypoints1 = correspondences["keypoints1"].cpu().numpy()
    confidence = correspondences["confidence"].cpu().numpy()
    batch_indexes = correspondences["batch_indexes"].cpu().numpy()
    if flip:
        select_fliped = (batch_indexes == 1)
        keypoints0[select_fliped, 0] = input_dict["image0"].shape[-1] - keypoints0[select_fliped, 0]
        keypoints1[select_fliped, 0] = input_dict["image1"].shape[-1] - keypoints1[select_fliped, 0]
    
    # scale
    #chaneg back to origin
    new_source_img_shape = input_dict["image0"].shape[-2:]
    new_target_img_shape = input_dict["image1"].shape[-2:]
    sy, sx = source_img_shape[0] / new_source_img_shape[0], source_img_shape[1] / new_source_img_shape[1]
    keypoints0[:, 0] = keypoints0[:, 0] * sx
    keypoints0[:, 1] = keypoints0[:, 1] * sy

    sy, sx = target_img_shape[0] / new_target_img_shape[0], target_img_shape[1] / new_target_img_shape[1]
    keypoints1[:, 0] = keypoints1[:, 0] * sx
    keypoints1[:, 1] = keypoints1[:, 1] * sy
    torch.cuda.empty_cache()
    return {
        "keypoints0": keypoints0,
        "keypoints1":  keypoints1,
        "confidence": confidence
    }
    
#put single pair into multiple pairs   
def match(img_path0, img_path1, matcher, device=device):
    mkpts0 = []
    mkpts1 = []
    confidence = []
    for img_size in IMG_MAX_SIZE:
        preds =  match_same_size(img_path0, img_path1, matcher, device=device, flip=DO_FLIP, img_max_size=img_size)
        mkpts0.append(preds["keypoints0"])
        mkpts1.append(preds["keypoints1"])
        confidence.append(preds["confidence"])
    # concatenate
    mkpts0 = np.concatenate(mkpts0, axis=0)
    mkpts1 = np.concatenate(mkpts1, axis=0)
    confidence = np.concatenate(confidence, axis=0)
    
    # filter pairs
    #filterMAX_NUM_PAIRS
    ind = np.argsort(-confidence)
    ind = ind[:MAX_NUM_PAIRS]
    mkpts0 = mkpts0[ind]
    mkpts1 = mkpts1[ind]
    confidence = confidence[ind]
    return mkpts0, mkpts1

# ***Inference***

In [7]:
F_dict = {}
import time
for i, row in enumerate(test_samples):
    #get info
    sample_id, batch_id, image_1_id, image_2_id = row
    
    #loftr model extrac keypoint-pairs 
    st = time.time()
    img_path0 = f'{src}/test_images/{batch_id}/{image_1_id}.png'
    img_path1 = f'{src}/test_images/{batch_id}/{image_2_id}.png'
    mkpts0, mkpts1 = match(img_path0, img_path1, matcher)
    torch.cuda.empty_cache()
    
    ##dkm
    img1 = cv2.imread(img_path0) 
    img2 = cv2.imread(img_path1)
        
    img1PIL = Image.fromarray(cv2.cvtColor(img1, cv2.COLOR_BGR2RGB))
    img2PIL = Image.fromarray(cv2.cvtColor(img2, cv2.COLOR_BGR2RGB))
    
    #dkm extrac keypoint-pairs 
    dense_matches, dense_certainty = dkm_model.match(img1PIL, img2PIL)
    dense_certainty = dense_certainty.sqrt()
    sparse_matches, sparse_certainty = dkm_model.sample(dense_matches, dense_certainty, 2000)
    #filter low score point
    sparse_matches = sparse_matches[sparse_certainty>0.5]
    dkm_p0 = sparse_matches[:, :2]
    dkm_p1 = sparse_matches[:, 2:]
    
    #rescale 
    h, w, c = img1.shape
    dkm_p0[:, 0] = ((dkm_p0[:, 0] + 1)/2) * w
    dkm_p0[:, 1] = ((dkm_p0[:, 1] + 1)/2) * h

    h, w, c = img2.shape
    dkm_p1[:, 0] = ((dkm_p1[:, 0] + 1)/2) * w
    dkm_p1[:, 1] = ((dkm_p1[:, 1] + 1)/2) * h
    
    #ensemble
    mkpts0 = np.concatenate([mkpts0, dkm_p0], axis=0)
    mkpts1 = np.concatenate([mkpts1, dkm_p1], axis=0)
    #最少需要8个点来得到基础矩阵（使用8点算法）
    if len(mkpts0) > 7:
        #使用cv2.findFundamentalMat计算基础矩阵，使用cv2.USAC_MAGSAC估算器，匹配误差在0.1845-0.999999，最大迭代次数为10000
        F, inliers = cv2.findFundamentalMat(mkpts0, mkpts1, cv2.USAC_MAGSAC, 0.1845, 0.999999, 10000)
        inliers = inliers > 0
        assert F.shape == (3, 3), 'Malformed F?'
        F_dict[sample_id] = F
    else:
        F_dict[sample_id] = np.zeros((3, 3))
        continue
    gc.collect()
    nd = time.time()    
    #画图相关函数
    if (i < 3):
        print("Running time: ", nd - st, " s")
        image_1 =load_torch_image(img_path0, device=device, resize=False)[0]
        image_2 = load_torch_image(img_path1, device=device,  resize=False)[0]
        draw_LAF_matches(
        KF.laf_from_center_scale_ori(torch.from_numpy(mkpts0).view(1,-1, 2),
                                    torch.ones(mkpts0.shape[0]).view(1,-1, 1, 1),
                                    torch.ones(mkpts0.shape[0]).view(1,-1, 1)),

        KF.laf_from_center_scale_ori(torch.from_numpy(mkpts1).view(1,-1, 2),
                                    torch.ones(mkpts1.shape[0]).view(1,-1, 1, 1),
                                    torch.ones(mkpts1.shape[0]).view(1,-1, 1)),
        torch.arange(mkpts0.shape[0]).view(-1,1).repeat(1,2),
        K.tensor_to_image(image_1),
        K.tensor_to_image(image_2),
        inliers,
        draw_dict={'inlier_color': (0.2, 1, 0.2),
                   'tentative_color': None, 
                   'feature_color': (0.2, 0.5, 1), 'vertical': False})
    
#结果文件保存
with open('submission.csv', 'w') as f:
    f.write('sample_id,fundamental_matrix\n')
    for sample_id, F in F_dict.items():
        f.write(f'{sample_id},{FlattenMatrix(F)}\n')