In [1]:
import numpy as np
import kornia as K
import kornia.feature as KF
import torch
import torch.nn as nn
import torchvision
from PIL import Image
import skimage.transform
import PIL.Image as pil
import tqdm
import os
os.sys.path.append("/home/data/workspace/heqi/monogastroendo")
from utils import *

# Dataset locations for the SimCol split lists and the extracted frames.
# `fpath` keeps a "{}" placeholder that is filled with the split name
# ("train", "val" or "test") via str.format.
_SPLITS_DIR = "/home/data/workspace/heqi/matchingloss/splits/simcol_complete"
_SPLIT_FILE_TEMPLATE = "{}_files.txt"

fpath = os.path.join(_SPLITS_DIR, _SPLIT_FILE_TEMPLATE)
data_path = "/home/data/workspace/heqi/matchingloss/data/simcol_complete/imgs"
img_ext = ".png"

def pil_loader(path):
    """Read the image file at ``path`` and return it converted to RGB.

    The file is opened explicitly and handed to Pillow as a file object so
    that it is closed when the ``with`` block exits, which avoids a
    ResourceWarning (https://github.com/python-pillow/Pillow/issues/835).
    """
    with open(path, 'rb') as handle, Image.open(handle) as picture:
        rgb_picture = picture.convert('RGB')
    return rgb_picture
        
def get_image_path(folder, frame_index_str):
    """Build the path of one frame: ``data_path/folder/<frame_index_str><img_ext>``."""
    file_name = f"{frame_index_str}{img_ext}"
    return os.path.join(data_path, folder, file_name)

def get_color(folder, frame_index_str, do_flip):
    """Load one frame as an RGB PIL image, mirrored left-right when ``do_flip`` is truthy."""
    frame = pil_loader(get_image_path(folder, frame_index_str))
    return frame.transpose(pil.FLIP_LEFT_RIGHT) if do_flip else frame

# Shared preprocessing transforms and the CUDA device used by the cells below.
# Every frame is resized to 352x352 with Lanczos interpolation and then
# converted to a tensor with torchvision's ToTensor.
resize = torchvision.transforms.Resize((352, 352), interpolation=torchvision.transforms.InterpolationMode.LANCZOS)
to_tensor = torchvision.transforms.ToTensor()
# Make GPU index 1 the current CUDA device before creating the generic
# "cuda" device handle.
# NOTE(review): the GPU index is hard-coded; presumably this fails on a
# machine with fewer than two GPUs — confirm and consider making it configurable.
torch.cuda.set_device(1)
device = torch.device("cuda")

class LoFTR(nn.Module):
    """Wrapper around ``kornia.feature.LoFTR`` that matches a pair of RGB images.

    Both inputs are converted to grayscale before matching, and the matcher
    is run with gradient tracking disabled.
    """

    def __init__(self, pretrained='indoor'):
        super().__init__()
        self.matcher = KF.LoFTR(pretrained=pretrained)

    def forward(self, src0, srcx):
        """Return the correspondences kornia's LoFTR finds between ``src0`` and ``srcx``."""
        # LoFTR works on grayscale images only.
        gray0 = K.color.rgb_to_grayscale(src0)
        grayx = K.color.rgb_to_grayscale(srcx)
        matcher_input = {"image0": gray0, "image1": grayx}
        with torch.no_grad():
            return self.matcher(matcher_input)
##########################################################

# load data
# `readlines` is provided by the star-import of the project-local `utils`
# module. Each entry is later split on whitespace into a folder name
# followed by three frame indices.
train_filenames = readlines(fpath.format("train"))
val_filenames = readlines(fpath.format("val"))
test_filenames = readlines(fpath.format("test"))

# load matcher
# Reuse a matcher left over from an earlier execution of this cell in the
# same kernel, so that re-running the cell does not rebuild the model.
try:
    matcher
    print("matcher loaded")
except NameError:
    print("load matcher")
    matcher = LoFTR(pretrained="indoor")
    matcher.to(device)


def _matches_to_numpy(raw_matches):
    """Drop ``batch_indexes`` and move the match tensors to NumPy arrays.

    Args:
        raw_matches: dict returned by the matcher; it must contain the keys
            ``batch_indexes``, ``keypoints0``, ``keypoints1`` and ``confidence``.

    Returns:
        The same dict, without ``batch_indexes`` and with the three remaining
        match entries converted to CPU NumPy arrays.
    """
    del raw_matches['batch_indexes']
    for key in ('keypoints0', 'keypoints1', 'confidence'):
        raw_matches[key] = raw_matches[key].detach().cpu().numpy()
    return raw_matches


def _compute_split_correspondences(filenames):
    """Match the middle frame of every triplet against its two neighbours.

    Args:
        filenames: sequence of split-file lines of the form
            ``"<folder> <frame0> <frame1> <frame2>"``.

    Returns:
        dict with the keys ``"no_flip"`` and ``"do_flip"``. Each value holds one
        entry per input line, and each entry is a two-element list:
        ``[matches(frame1, frame0), matches(frame1, frame2)]``.
    """
    # `import tqdm` alone does not guarantee that the `tqdm.notebook`
    # submodule is loaded, so import it explicitly where it is used.
    from tqdm.notebook import tqdm as notebook_tqdm

    result = {"no_flip": [],
              "do_flip": []}
    for entry in notebook_tqdm(filenames):
        line = entry.split()
        folder, frame_ids = line[0], line[1:4]

        for do_flip in (False, True):
            frames = [
                to_tensor(resize(get_color(folder, frame_id, do_flip))).to(device)
                for frame_id in frame_ids
            ]
            # Add the batch dimension expected by the matcher.
            centre = frames[1][None, ...]
            # Call the module itself rather than .forward() so that the
            # regular nn.Module call path is used.
            correspondences = [
                _matches_to_numpy(matcher(centre, frames[0][None, ...])),
                _matches_to_numpy(matcher(centre, frames[2][None, ...])),
            ]
            result["do_flip" if do_flip else "no_flip"].append(correspondences)
    return result


# processing correspondence
# The splits are processed in the same order as before (val, train, test), so
# `matcher_result` ends up holding the test-split result, as later cells expect.
# np.save appends ".npy" and pickles the dict into a 0-d object array; read it
# back with np.load(..., allow_pickle=True).item().
# The "352x352" tag in the file names must match the size given to `resize`.
for split_name, split_filenames in (("val", val_filenames),
                                    ("train", train_filenames),
                                    ("test", test_filenames)):
    matcher_result = _compute_split_correspondences(split_filenames)
    np.save("{}_352x352".format(split_name), matcher_result)

load matcher


  0%|          | 0/7161 [00:00<?, ?it/s]

  0%|          | 0/21510 [00:00<?, ?it/s]

  0%|          | 0/8951 [00:00<?, ?it/s]

In [3]:
# Inspect the keypoints found in the middle frame for the first non-flipped
# sample, in its first pair (middle frame matched against the first frame).
# NOTE(review): the recorded output shows coordinates above 352 although frames
# are resized to 352x352, so it appears to come from an earlier run — confirm.
matcher_result['no_flip'][0][0]["keypoints0"]

array([[ 16.,  16.],
       [ 40.,  16.],
       [ 48.,  16.],
       ...,
       [368., 424.],
       [408., 424.],
       [416., 424.]], dtype=float32)

In [26]:
# Drop any cached copy so that the next cell reloads the file from disk.
# Guarded because on a fresh kernel (Restart & Run All) the name has not been
# defined yet, and an unguarded `del` would raise NameError.
try:
    del matcher_result_load
except NameError:
    pass

In [3]:
import numpy as np
# Load the saved correspondences only once per kernel session; later
# executions of this cell reuse the dict that is already in memory.
try:
    matcher_result_load
except NameError:
    # np.save stores a dict as a 0-d object array. `.item()` is the documented
    # way to unwrap it; `.all()` only returned the dict through object-array
    # reduction semantics.
    # NOTE(review): allow_pickle=True unpickles the file, which can execute
    # arbitrary code — only load files produced by this notebook.
    # NOTE(review): the visible cells write val/train/test_352x352.npy, not
    # "matcher_result.npy" — confirm which file is meant.
    matcher_result_load = np.load("matcher_result.npy", allow_pickle=True).item()
matcher_result_load['do_flip'][0][0]["keypoints0"]

array([[ 72.,  16.],
       [ 80.,  16.],
       [ 88.,  16.],
       ...,
       [400., 424.],
       [408., 424.],
       [424., 424.]], dtype=float32)

In [4]:
# Show both correspondence dicts (keypoints0 / keypoints1 / confidence) stored
# for the first flipped sample.
matcher_result_load['do_flip'][0]

[{'keypoints0': array([[ 72.,  16.],
         [ 80.,  16.],
         [ 88.,  16.],
         ...,
         [400., 424.],
         [408., 424.],
         [424., 424.]], dtype=float32),
  'keypoints1': array([[ 40.52599 ,  56.303223],
         [ 48.312782,  56.0391  ],
         [ 55.917706,  55.54209 ],
         ...,
         [375.33487 , 376.00107 ],
         [382.4657  , 375.67676 ],
         [392.83636 , 375.35797 ]], dtype=float32),
  'confidence': array([0.663439  , 0.99974006, 0.8829035 , ..., 1.        , 0.9891658 ,
         0.9999306 ], dtype=float32)},
 {'keypoints0': array([[112.,  16.],
         [128.,  16.],
         [136.,  16.],
         ...,
         [ 88., 424.],
         [ 96., 424.],
         [104., 424.]], dtype=float32),
  'keypoints1': array([[119.538445,  16.027597],
         [128.78128 ,  16.262442],
         [136.65668 ,  17.015198],
         ...,
         [ 56.229065, 414.63205 ],
         [ 64.1382  , 414.94138 ],
         [ 72.83633 , 414.48016 ]], dtype=float32