# load libraries

In [1]:
#load libraries
import os
import numpy as np
import cv2
import json
import skimage.io as io
import shutil
import os
from matplotlib import pyplot as plt
import seaborn as sns
import csv


import torch
import torchvision # torch package for vision related things
import torch.nn.functional as F  # Parameterless functions, like (some) activation functions
import torchvision.datasets as datasets  # Standard datasets
import torchvision.transforms as transforms  # Transformations we can perform on our dataset for augmentation
from torch import optim  # For optimizers like SGD, Adam, etc.
from torch import nn  # All neural network modules, nn.Linear, nn.Conv2d, BatchNorm, Loss functions
from torch.utils.data import DataLoader  # Gives easier dataset managment by creating mini batches etc.
from tqdm import tqdm  # For nice progress bar!
from random import sample
import copy

# Load Image Data

In [8]:
import os
import pandas as pd 
import torch
from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms as transforms

class trainDataset(Dataset):
    '''
    csv_file contain the image name and the label (x,y) of each image
        first column: name of the image
        second column: label (the x,y coordinations) of the image
    img_dir: the path to where the train or test images are stored
    flag_resize (bool): a flag, if True, then do the resize. Resize the image into suqare.
    
    '''
    def __init__(self, csv_file, img_dir, flag_resize=True, img_size=224, transform=None, cmap='RGB'):
        self.annotations = pd.read_csv(csv_file) #read csv as dataframe
        self.img_dir = img_dir
        self.img_names = self.annotations['id'].to_numpy() #array of object, size: (7500,)
        self.flag_resize = flag_resize
        self.img_size = img_size
        self.transform = transform
        self.cmap = cmap
    
    
    def __len__(self):
        return len(self.img_names)
        

    '''
    must have __getitem__ function
    to return specific image and the target/label of that image
    '''
    def __getitem__(self, index):
        img_filename = self.img_names[index] + ".jpg" #output will look like this ('IMG2760_5.jpg',)
        img_path = os.path.join(self.img_dir, img_filename)
        image = Image.open(img_path).convert(self.cmap) # w, h, channel #image raed by PIL is channel last. shape(490, 680, 3)
        #image = io.imread(img_path)
        geo_label_x = torch.tensor(self.annotations.iloc[index,1])
        geo_label_y = torch.tensor(self.annotations.iloc[index,2])
        
        
        #Resize PIL image to square
        if self.flag_resize:
            ori_img_width = image.size[0]
            ori_img_height = image.size[1]
            image = image.resize((self.img_size,self.img_size),resample=Image.LANCZOS) #LANCZOS is the best
      
        
        
        if self.transform: #transforms.ToTensor() convert PIL image into a tensor with shape (channel, height, width)
            image = self.transform(image) #after convert to tensor it is channel first (shape: 3,490,680)



        return (img_filename, image, (geo_label_x, geo_label_y))
    
    '''
    This function is used to get the id, x, and y of spdcific image in the dataset by its image file name
    '''
    def get_item_by_imgName(self, imgName):
        dataframe = self.annotations
        imgID = imgName.strip('.jpg')
        out = dataframe[dataframe['id'] == imgID] #out is a dataframe that contain 3 columns: 'id', 'x', 'y'
        ID = out.iloc[0,0]
        assert ID==imgID
        x = out.iloc[0,1]
        y = out.iloc[0,2]
        return (ID, x, y)
        
        
        
'''
This dataset is used for test. Because during the test time, we don't have the label (x,y)
'''
class testDataset(Dataset):
    '''
    csv_file contain the image name and the label (x,y) of each image
        first column: name of the image
        second column: label (the x,y coordinations) of the image
    img_dir: the path to where the train or test images are stored
    flag_resize (bool): a flag, if True, then do the resize. Resize the image into suqare.
    
    '''
    def __init__(self, csv_file, img_dir, flag_resize=True, img_size=224, transform=None, cmap = 'RGB'):
        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.img_names = self.annotations['id'].to_numpy() #array of object, size: (7500,)
        self.flag_resize = flag_resize
        self.img_size = img_size
        self.transform = transform
        self.cmap = cmap
    
    '''
    must have __len__ function
    '''
    def __len__(self):
        return len(self.img_names)
        

    '''
    must have __getitem__ function
    to return specific image and the target/label of that image
    '''
    def __getitem__(self, index):
        img_filename = self.img_names[index] + ".jpg" #output will look like this ('IMG2760_5.jpg',)
        img_path = os.path.join(self.img_dir, img_filename)
        image = Image.open(img_path).convert(self.cmap) # w, h, channel #image raed by PIL is channel last. shape(490, 680, 3)
        #image = io.imread(img_path)
        #geo_label_x = torch.tensor(self.annotations.iloc[index,1])
        #geo_label_y = torch.tensor(self.annotations.iloc[index,2])
        
        
        #Resize PIL image to square
        if self.flag_resize:
            ori_img_width = image.size[0]
            ori_img_height = image.size[1]
            image = image.resize((self.img_size,self.img_size),resample=Image.LANCZOS) #LANCZOS is the best
        
        if self.transform: #transforms.ToTensor() convert PIL image into a tensor with shape (channel, height, width)
            image = self.transform(image) #after convert to tensor it is channel first (shape: 3,490,680)

        return (img_filename, image)
    

# Feature Extraction from ViT

In [3]:
# Load Image Data
import os
import pandas as pd 
import torch
from torch.utils.data import Dataset
#from skimage import io
from PIL import Image
import torchvision.transforms as transforms

class myDataset(Dataset):
    def __init__(self, csv_file, img_dir, flag_resize=True, img_size=224, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.img_names = self.annotations['id'].to_numpy() #array of object, size: (7500,)
        self.flag_resize = flag_resize
        self.img_size = img_size
        self.transform = transform
    
    def __len__(self):
        return len(self.img_names)
        
    def __getitem__(self, index):
        img_filename = self.img_names[index] + ".jpg"
        img_path = os.path.join(self.img_dir, img_filename)
        image = Image.open(img_path).convert('RGB') # w, h, channel #image raed by PIL is channel last. shape(490, 680, 3)
        #image = io.imread(img_path)
        
#         geo_label_x = torch.tensor(self.annotations.iloc[index,1])
#         geo_label_y = torch.tensor(self.annotations.iloc[index,2])
        
        #Resize PIL image to square
        if self.flag_resize:
            ori_img_width = image.size[0]
            ori_img_height = image.size[1]
            image = image.resize((self.img_size,self.img_size),resample=Image.LANCZOS) #LANCZOS is the best
        
        if self.transform: #transforms.ToTensor() convert PIL image into a tensor with shape (channel, height, width)
            image = self.transform(image) #after convert to tensor it is channel first (shape: 3,490,680)

        return (img_filename, image) # (geo_label_x, geo_label_y)) 
    

In [4]:
'''
This file is the main module of the vision transformer
The code is extracted from 
https://github.com/jankrepl/mildlyoverfitted/blob/master/github_adventures/vision_transformer/custom.py

'''

import torch
import torch.nn as nn

class PatchEmbed(nn.Module):
    def __init__(self, img_size, patch_size, in_chans=3, embed_dim=1024):
        super().__init__()
        self.img_size = img_size #assume square image, image_size = the length of the image (int)
        self.patch_size = patch_size #assume square, patch_size = the length of the patch (int)
        self.n_patches = (img_size // patch_size) ** 2 # total number of patches (flatten)

        #kernel will exactly fall into the patches and never overlap.
        self.proj = nn.Conv2d(
                in_chans,
                embed_dim,
                kernel_size=patch_size,
                stride=patch_size,
        )

    def forward(self, x):
        # feed x to proj layer
        x = self.proj(
                x
            )  # (n_samples, embed_dim, n_patches ** 0.5, n_patches ** 0.5) #64 patch -> 8x8
        x = x.flatten(2)  # (n_samples, embed_dim, n_patches)
        x = x.transpose(1, 2)  # (n_samples, n_patches, embed_dim) #swap the axis

        return x

# Below code are just copied from NLP

class Attention(nn.Module):
    def __init__(self, dim, n_heads=12, qkv_bias=True, attn_p=0., proj_p=0.):
        super().__init__()
        self.n_heads = n_heads
        self.dim = dim
        self.head_dim = dim // n_heads
        self.scale = self.head_dim ** -0.5 #come from "attention is all you need paper" to prevent too large head to dominant

        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias) #linear mapping, take the embedding token and mapping/split to query, key, value
        #input size = (?,?,dim) output size = (?,?,3*dim) #linear mapping applied to the last dimension

        self.attn_drop = nn.Dropout(attn_p)
        self.proj = nn.Linear(dim, dim) #take the concatenate heads #input size = dim #output size = dim

        self.proj_drop = nn.Dropout(proj_p)

    def forward(self, x):
        n_samples, n_tokens, dim = x.shape

        if dim != self.dim:
            raise ValueError

        qkv = self.qkv(x)  # (n_samples, n_patches + 1, 3 * dim)
        
        # create extra dimension for heads and q,k,v
        qkv = qkv.reshape(
                n_samples, n_tokens, 3, self.n_heads, self.head_dim
        )  # (n_smaples, n_patches + 1, 3, n_heads, head_dim)
        
        # change the order
        qkv = qkv.permute(
                2, 0, 3, 1, 4
        )  # (3, n_samples, n_heads, n_patches + 1, head_dim) 

        q, k, v = qkv[0], qkv[1], qkv[2]
        k_t = k.transpose(-2, -1)  # (n_samples, n_heads, head_dim, n_patches + 1) 
        dp = (
           q @ k_t #matrix multiplication
        ) * self.scale # (n_samples, n_heads, n_patches + 1, n_patches + 1)
        
        #apply softmax to get discrete probabilities that some up to 1. This can be use as the weight.
        attn = dp.softmax(dim=-1)  # (n_samples, n_heads, n_patches + 1, n_patches + 1) 
        attn = self.attn_drop(attn) #dropout

        weighted_avg = attn @ v  # (n_samples, n_heads, n_patches +1, head_dim) #compute the weighted average
        weighted_avg = weighted_avg.transpose(
                1, 2
        )  # (n_samples, n_patches + 1, n_heads, head_dim)
        weighted_avg = weighted_avg.flatten(2)  # (n_samples, n_patches + 1, dim) #flatten(2), start flatten from axis 2
        # dim = n_heards x head_dim

        x = self.proj(weighted_avg)  # (n_samples, n_patches + 1, dim)
        x = self.proj_drop(x)  # (n_samples, n_patches + 1, dim) #dropout

        return x


class MLP(nn.Module):
    def __init__(self, in_features, hidden_features, out_features, p=0.):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = nn.GELU()
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(p)

    def forward(self, x):
        x = self.fc1(x) # (n_samples, n_patches + 1, hidden_features)
        x = self.act(x)  # (n_samples, n_patches + 1, hidden_features)
        x = self.drop(x)  # (n_samples, n_patches + 1, hidden_features)
        x = self.fc2(x)  # (n_samples, n_patches + 1, hidden_features)
        x = self.drop(x)  # (n_samples, n_patches + 1, hidden_features) 

        return x


class Block(nn.Module):

    def __init__(self, dim, n_heads, mlp_ratio=4.0, qkv_bias=True, p=0., attn_p=0.):
        super().__init__()
        self.norm1 = nn.LayerNorm(dim, eps=1e-6) #Applies Layer Normalization over a mini-batch of inputs

        self.attn = Attention(
                dim,
                n_heads=n_heads,
                qkv_bias=qkv_bias,
                attn_p=attn_p,
                proj_p=p
        )
        self.norm2 = nn.LayerNorm(dim, eps=1e-6)
        hidden_features = int(dim * mlp_ratio)
        #remove the mlp layer
        self.mlp = MLP(
                in_features=dim,
                hidden_features=hidden_features,
                out_features=dim,
        )

    def forward(self, x):

        x = x + self.attn(self.norm1(x)) #residule (add x to output)
        x = x + self.mlp(self.norm2(x)) #residule (add x to output)

        return x


class VisionTransformer(nn.Module):
    def __init__(
            self,
            img_size=384,
            patch_size=16,
            in_chans=3,
            n_classes=1, #output dimension
            embed_dim=1024,
            depth=12,
            n_heads=12,
            mlp_ratio=4.,
            qkv_bias=True,
            p=0.,
            attn_p=0.,
    ):
        super().__init__()

        self.patch_embed = PatchEmbed(
                img_size=img_size,
                patch_size=patch_size,
                in_chans=in_chans,
                embed_dim=embed_dim,
        )
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) #3D
        #nn.Parameter is a kind of Tensor that is to be considered a module parameter.

        # determine where is exactly the location of the given patch in the image
        self.pos_embed = nn.Parameter(
                torch.zeros(1, 1 + self.patch_embed.n_patches, embed_dim) #1+ for CLS token
        )
        self.pos_drop = nn.Dropout(p=p)

        self.blocks = nn.ModuleList(
            [
                Block(
                    dim=embed_dim,
                    n_heads=n_heads,
                    mlp_ratio=mlp_ratio,
                    qkv_bias=qkv_bias,
                    p=p,
                    attn_p=attn_p,
                )
                for _ in range(depth)
            ]
        )

        self.norm = nn.LayerNorm(embed_dim, eps=1e-6) #normalize over the lasr axis (which size is embed_dim)
        self.projEmb = nn.Linear(embed_dim, n_classes) #self.head
        #self.sig = nn.Sigmoid() #convert to [0,1]
        

    def forward(self, x):
        """Run the forward pass.

        Parameters
        ----------
        x : torch.Tensor
            Shape `(n_samples, in_chans, img_size, img_size)`.

        Returns
        -------
        logits : torch.Tensor
            Logits over all the classes - `(n_samples, n_classes)`.
        """
        n_samples = x.shape[0]
        x = self.patch_embed(x) #turn input image x into patch embedding


        # learnable cks token
        cls_token = self.cls_token.expand(
                n_samples, -1, -1
        )  # (n_samples, 1, embed_dim)

        # prepend to the patch embedding
        x = torch.cat((cls_token, x), dim=1)  # (n_samples, 1 + n_patches, embed_dim)
        # add the learnable location embedding 
        x = x + self.pos_embed  # (n_samples, 1 + n_patches, embed_dim)
        x = self.pos_drop(x)

        for block in self.blocks:
            x = block(x) # (n_samples, 1 + n_patches, embed_dim)?

        x = self.norm(x) # (n_samples, 1 + n_patches, embed_dim)?

        
        # for classification task
        cls_token_final = x[:, 0] #only take the CLS token to represent the whole image 
        
        x = cls_token_final #shape [batch_size, 768(embed_dim)]
        
        return x
    

In [5]:
# load ground true locations
train_dic = {}
with open('train.csv', 'r') as file:
    reader = csv.reader(file)
    first = True
    for row in reader:
        if first:
            first = False
            continue
        train_dic[row[0]+".jpg"] = {'x':row[1], 'y':row[2]}
print(f'total train image is {len(train_dic)}')
        
test_dic = {}
with open('test.csv', 'r') as file:
    reader = csv.reader(file)
    first = True
    for row in reader:
        if first:
            first = False
            continue
        test_dic[row[0]+'.jpg'] = [] 
print(f'total test image is {len(test_dic)}')

total train image is 7500
total test image is 1200


In [6]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [7]:
%%capture
# load pre-trained model

import numpy as np
import timm
import torch

model_name = "vit_base_patch32_224" 
model_official = timm.create_model(model_name, pretrained=True).to(device)
model_official.eval() #switch to eval mode
print(type(model_official))

custom_config = {
        "img_size": 224, #384
        "in_chans": 3,
        "patch_size": 32,
        "embed_dim": 768,#1024 for large #768 for base #192 for tiny
        "depth": 12, #24 for large #12 for base #12 for tiny
        "n_heads": 12, #16 for large #12 for base #3 for tiny, #6 for small
        "qkv_bias": True,
        "mlp_ratio": 4,
        "n_classes": 1
}

model_custom = VisionTransformer(**custom_config).to(device)
model_custom.eval() #set to evaluation mode

In [8]:
# check if the pre-trained model is correctly loaded
#this function count the number of learnable parameters
def get_n_params(module):
    return sum(p.numel() for p in module.parameters() if p.requires_grad)

# compare whethere 2 tensor are equal
def assert_tensors_equal(t1, t2):
    a1, a2 = t1.detach().numpy(), t2.detach().numpy()

    np.testing.assert_allclose(a1, a2)
#==============================================================================

# copy the pretrained weighted into our customer model
for n_o, p_o in model_official.named_parameters():
    for n_c, p_c in model_custom.named_parameters():
        if n_o == n_c:
            print(n_o)
            p_c.data[:] = p_o.data


cls_token
pos_embed
patch_embed.proj.weight
patch_embed.proj.bias
blocks.0.norm1.weight
blocks.0.norm1.bias
blocks.0.attn.qkv.weight
blocks.0.attn.qkv.bias
blocks.0.attn.proj.weight
blocks.0.attn.proj.bias
blocks.0.norm2.weight
blocks.0.norm2.bias
blocks.0.mlp.fc1.weight
blocks.0.mlp.fc1.bias
blocks.0.mlp.fc2.weight
blocks.0.mlp.fc2.bias
blocks.1.norm1.weight
blocks.1.norm1.bias
blocks.1.attn.qkv.weight
blocks.1.attn.qkv.bias
blocks.1.attn.proj.weight
blocks.1.attn.proj.bias
blocks.1.norm2.weight
blocks.1.norm2.bias
blocks.1.mlp.fc1.weight
blocks.1.mlp.fc1.bias
blocks.1.mlp.fc2.weight
blocks.1.mlp.fc2.bias
blocks.2.norm1.weight
blocks.2.norm1.bias
blocks.2.attn.qkv.weight
blocks.2.attn.qkv.bias
blocks.2.attn.proj.weight
blocks.2.attn.proj.bias
blocks.2.norm2.weight
blocks.2.norm2.bias
blocks.2.mlp.fc1.weight
blocks.2.mlp.fc1.bias
blocks.2.mlp.fc2.weight
blocks.2.mlp.fc2.bias
blocks.3.norm1.weight
blocks.3.norm1.bias
blocks.3.attn.qkv.weight
blocks.3.attn.qkv.bias
blocks.3.attn.proj.wei

In [9]:
# save the model
torch.save(model_custom, "model_custom_with_pretrained.pth")

In [10]:
img_transform = transforms.Compose([
        transforms.ToTensor(), #transform image from [0,1] PIL Image into a tensor
        transforms.Normalize(mean=torch.tensor([0.5000, 0.5000, 0.5000]), std=torch.tensor([0.5000, 0.5000, 0.5000])) #transform image into [-1,1] #(three value for there channel)
])

In [11]:
train_dataset = myDataset(csv_file = 'train.csv', img_dir = 'train_img', flag_resize=True, img_size = 224, transform = img_transform)
batch_size = 1
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=False)

test_dataset = myDataset(csv_file = 'test.csv', img_dir = 'test_img', flag_resize=True, img_size = 224, transform = img_transform)
batch_size = 1
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


In [9]:
# train_dataset is the whole 7500 image in "COMP90086_2021_Project_train"
# del_set and valid_set is for develop feature matching algorithm purpose!
# cmap="L" meas read image as grayscale
train_dataset_original_size = trainDataset(csv_file = 'train.csv', img_dir = 'train_img', flag_resize=False, 
                                           img_size = None, transform = transforms.ToTensor(), cmap='L') #return image is channel first, value between [0,1]
# divide training dataset into validation and develop dataset (10:90)
dev_set, valid_set = torch.utils.data.random_split(train_dataset_original_size, [6750, 750]) #10% for validation

test_dataset = testDataset(csv_file = 'test.csv', img_dir = 'test_img', flag_resize=False, img_size = None, transform = transforms.ToTensor(), cmap='L')


In [13]:
# save the images
all_image_names_in_valid_set = []
for img_name, _, _ in tqdm(valid_set):
    all_image_names_in_valid_set.append(img_name)

100%|██████████| 750/750 [00:15<00:00, 47.19it/s]


In [14]:
# save the images
all_image_names_in_dev_set = []
for img_name, _, _ in tqdm(dev_set):
    all_image_names_in_dev_set.append(img_name)

100%|██████████| 6750/6750 [02:30<00:00, 44.93it/s]


# Cosine Similarity of image features

In [15]:

pred_all = dict()
for batch_idx, (filename, img) in enumerate(tqdm(train_loader)):
    with torch.no_grad():
        img = img.to(device=device)
        out = model_custom(img)
        filename = filename[0]
        pred_all[filename] = out
        
pred_test = dict()
for batch_idx, (filename, img) in enumerate(tqdm(test_loader)):
    with torch.no_grad():
        img = img.to(device=device)
        out = model_custom(img)
        filename = filename[0]
        pred_test[filename] = out

 88%|████████▊ | 6635/7500 [06:21<00:49, 17.40it/s]


KeyboardInterrupt: 

In [17]:
# save the similarity into dictionary format
from tqdm import tqdm 

cos = nn.CosineSimilarity(dim=1, eps=1e-6)
sim_dic = dict()

for filename1, output1 in tqdm(pred_all.items()):
    filename1 = filename1[0]
    sim_dic[filename1] = dict()
    for filename2, output2 in pred_all.items():
        filename2 = filename2[0]
        if filename1 != filename2:
            sim = cos(output1,output2)
            sim_dic[filename1][filename2] = sim.item()
            
for key in sim_dic.keys():
    sim_dic[key] = dict(sorted(sim_dic[key].items(), key=lambda item: item[1], reverse=True))



cos = nn.CosineSimilarity(dim=1, eps=1e-6)
test_sim_dic = dict()

for filename1, output1 in tqdm(pred_test.items()):
    filename1 = filename1
    test_sim_dic[filename1] = dict()
    for filename2, output2 in pred_all.items():
        filename2 = filename2
        if filename1 != filename2:
            sim = cos(output1,output2)
            test_sim_dic[filename1][filename2] = sim.item()
            
for key in test_sim_dic.keys():
    test_sim_dic[key] = dict(sorted(test_sim_dic[key].items(), key=lambda item: item[1], reverse=True))
  

In [19]:
# export the similarity result into json file
with open('train_sim_dic.json', 'w') as file:
    file.write(json.dumps(sim_dic2))

with open('test_sim_dic.json', 'w') as file:
    file.write(json.dumps(test_sim_dic))

In [3]:
# load the similarity result from json file
f = open('train_sim_dic.json',)
sim_dic = json.load(f)

f = open('test_sim_dic.json',)
test_sim_dic = json.load(f)

# Build Method 1


## Feature (Affine Matrix) extraction for Linear Regression

In [29]:
# install kornia if not downloaded
# pip install kornia

In [84]:
'''
Input:
    img0: the source image. Must be a grayscale image, channel first
    img1: the target image to match. Must be a grayscale image, channel first
        The input image with the shape (1,h,w)
    threshold (float): a value in [0,1]. It is the threshold to filter bad matching points
'''
# load the pre-trained LoFTR for point matching and setup
from kornia.feature import LoFTR
loftr = LoFTR('indoor')

# get matching points by LoFTR model
def get_matched_points_by_loftr(img0, img1, threshold =0.5):
    # Add batch axis
    img0 = img0.unsqueeze(0)
    img1 = img1.unsqueeze(0)
    
    input = {"image0": img0, "image1": img1}
    out = loftr(input)
    # out is a dict with keys: dict_keys(['keypoints0', 'keypoints1', 'confidence', 'batch_indexes'])
    pts_img0 = out['keypoints0']
    pts_img1 = out['keypoints1']
    confidence = out['confidence']
    filter = (confidence>threshold)
    good_pts_img0 = pts_img0[filter]
    good_pts_img1 = pts_img1[filter]
    good_confidence = confidence[filter]
    return good_pts_img0, good_pts_img1, good_confidence

# get the top N similar image from the input dictionary
def find_top_similar_imgs(img, n_top, dic):
    cnt = 0
    top_imgs = []
    for img2 in dic[img].keys():
        top_imgs.append(img2)
        cnt += 1
        if cnt == n_top:
            break
    return top_imgs

  and should_run_async(code)


In [11]:
# load the image information into dictionary format
dev_set_dic = dict()
for dev in tqdm(dev_set):
    img1_name, img1, labels = dev
    dev_set_dic[img1_name] = {'value': img1, 'label': labels}
    
val_set_dic = dict()
for val in tqdm(valid_set):
    img1_name, img1, labels = val
    val_set_dic[img1_name] = {'value': img1, 'label': labels}

all_set_dic = dict()
for data in tqdm(train_dataset_original_size):
    img1_name, img1, labels = data
    all_set_dic[img1_name] = {'value': img1, 'label': labels}
    
test_set_dic = dict()
for data in tqdm(test_dataset):
    img1_name, img1 = data
    test_set_dic[img1_name] = {'value': img1}

100%|██████████| 7500/7500 [01:28<00:00, 84.55it/s]
100%|██████████| 1200/1200 [00:14<00:00, 85.11it/s]


In [198]:
from tqdm import tqdm
from numba import jit, cuda
import scipy.stats as ss
import tensorflow as tf
import time

# get affine matrix, and others from training images
for img1_name in tqdm(all_set_dic.keys()):
    max_num_pts = 0
    best_label, best_pts1, best_pts2, best_confidence = None, [], [], None
    Affine_M, Essential_M, R_est, t_est = None, None, None, None
    img1, label1 = all_set_dic[img1_name]['value'], all_set_dic[img1_name]['label']

    selected_imgs_10 = find_top_similar_imgs(img1_name, n_top = 10, dic = sim_dic)
    loftr_train_dic[img1_name] = dict()
    
    t = 0.7
    s, e = 0, 10 # search the top 10 matching images
    while True:
        for img2_name in selected_imgs_10[s:e]:   # params to avoid tqdm mutiple printing
            img2 = all_set_dic[img2_name]['value']
            pts1, pts2, confidence = get_matched_points_by_loftr(img1, img2, threshold = t)
            
            if len(pts1) > max_num_pts:
                max_num_pts = len(pts1)
                best_img_name = img2_name
                best_label = all_set_dic[img2_name]['label']
                best_pts1 = pts1
                best_pts2 = pts2
                best_confidence = confidence

        if (max_num_pts <=5 and t == 0.7):
            t = 0
#             s, e = 3, 10 
        else:
            break
    
    # if there is no matching point, record the image and save empty values
    if max_num_pts == 0 :
        train_skipped.append(img1_name)
        print('here', len(train_skipped))
        loftr_train_dic[img1_name][best_img_name] = {'Affine_M': Affine_M, 'Essential_M': Essential_M, 
                                                     'R_est': R_est, 't_est': t_est,
                                                     'pts1':best_pts1, 'pts2': best_pts2, 
                                                     'confidence': best_confidence}
    else:
        if len(best_pts1) >= 6:
            e_src = np.float32(best_pts1)
            e_dst = np.float32(best_pts2)
        
            Essential_M, mask = cv2.findEssentialMat(e_src, e_dst)  # occurs error with only 5 points!!!
            points, R_est, t_est, mask_pose = cv2.recoverPose(Essential_M, e_src, e_dst)  # R_est: Rotation & t_est: transformation

        if len(best_pts1) >= 3:
            best3_pts1, best3_pts2 = zip(*random.sample(list(zip(best_pts1, best_pts2)), 3))
            best3_pts1, best3_pts2 = torch.stack(best3_pts1), torch.stack(best3_pts2)

            a_src = np.float32(best3_pts1)
            a_dst = np.float32(best3_pts2)
            Affine_M = cv2.getAffineTransform(a_src, a_dst)
            
        loftr_train_dic[img1_name][best_img_name] = {'Affine_M': Affine_M, 'Essential_M': Essential_M, 
                                                     'R_est': R_est, 't_est': t_est,
                                                     'pts1':best_pts1, 'pts2': best_pts2, 
                                                     'confidence': best_confidence}


  and should_run_async(code)
 32%|███▏      | 2434/7500 [10:04:04<41:01:05, 29.15s/it]

here 1


100%|██████████| 7500/7500 [31:02:34<00:00, 14.90s/it]   


'Hailey'

In [247]:
# save the result into a dictionary and export it
for k1 in tqdm(loftr_train_dic.keys()):
    for k2 in loftr_train_dic[k1].keys():
        for k3 in loftr_train_dic[k1][k2].keys():
            a = loftr_train_dic[k1][k2][k3]
            if type(a) is not list and a is not None:
                loftr_train_dic[k1][k2][k3] = loftr_train_dic[k1][k2][k3].tolist()
                
with open('loftr_train_dic.json', 'w') as file:
    file.write(json.dumps(loftr_train_dic))
    
# load saved file if needed    
f = open('loftr_train_dic.json',)
loftr_train_dic = json.load(f)

  and should_run_async(code)
100%|██████████| 7500/7500 [00:12<00:00, 585.20it/s] 


In [29]:
# get the features and labels (difference of x, y coordinates) for linear regression model 
X, Y = [] , [] 
real_Y, match_Y = [], []
valid_keys = ['Affine_M','R_est',  't_est']
Affine_M_empty = [0, 0, 0, 0, 0, 0]
R_est_empty = [0, 0, 0,0, 0, 0,0, 0, 0]
t_est_empty = [0, 0, 0]
for k1 in loftr_train_dic.keys():
    for k2 in loftr_train_dic[k1].keys():
        x = []
        for k3 in valid_keys:
            a = loftr_train_dic[k1][k2][k3]
            if a != None:
                x.extend(np.array(a).reshape(-1))
            else:
                if k3 == 'Affine_M':
                    x.extend(Affine_M_empty)
                elif k3 == 'R_est':
                    x.extend(R_est_empty)
                elif k3 == 't_est':
                    x.extend(t_est_empty)
                else:
                    print(k3)
                    print(a)
        
    diff_x = all_set_dic[k2]['label'][0].item() - all_set_dic[k1]['label'][0].item()
    diff_y = all_set_dic[k2]['label'][1].item() - all_set_dic[k1]['label'][1].item()
    diff_xy = [diff_x, diff_y]
    X.append(x)
    Y.append(diff_xy)
    real_Y.append([all_set_dic[k1]['label'][0].item(), all_set_dic[k1]['label'][1].item()])
    match_Y.append([all_set_dic[k2]['label'][0].item(), all_set_dic[k2]['label'][1].item()])

In [250]:
# get affine matrix, and others from test images

for img1_name in tqdm(test_set_dic.keys()):
    max_num_pts = 0
    best_label, best_pts1, best_pts2, best_confidence = None, [], [], None
    Affine_M, Essential_M, R_est, t_est = None, None, None, None
    img1 = test_set_dic[img1_name]['value'] 

    selected_imgs_10 = find_top_similar_imgs(img1_name, n_top = 10, dic = test_sim_dic)
    loftr_test_dic[img1_name] = dict()
    
    t = 0.7
    s, e = 0, 10 # search the top 10 matching images
    while True:
        for img2_name in selected_imgs_10[s:e]:   # params to avoid tqdm mutiple printing
            img2 = all_set_dic[img2_name]['value']
            pts1, pts2, confidence = get_matched_points_by_loftr(img1, img2, threshold = t)
            
            if len(pts1) > max_num_pts:
                max_num_pts = len(pts1)
                best_img_name = img2_name
                best_label = all_set_dic[img2_name]['label']
                best_pts1 = pts1
                best_pts2 = pts2
                best_confidence = confidence

        if (max_num_pts <=5 and t == 0.7):
            t = 0
#             s, e = 3, 10
        else:
            break
    

    if max_num_pts == 0 :
        test_skipped.append(img1_name)
        print('here', len(test_skipped))
        loftr_test_dic[img1_name][best_img_name] = {'Affine_M': Affine_M, 'Essential_M': Essential_M, 
                                                     'R_est': R_est, 't_est': t_est,
                                                     'pts1':best_pts1, 'pts2': best_pts2, 
                                                     'confidence': best_confidence}
    else:
        if len(best_pts1) >= 6:
            e_src = np.float32(best_pts1)
            e_dst = np.float32(best_pts2)
        
            Essential_M, mask = cv2.findEssentialMat(e_src, e_dst)  # occurs error with only 5 points!!!
            points, R_est, t_est, mask_pose = cv2.recoverPose(Essential_M, e_src, e_dst)  # R_est: Rotation & t_est: transformation

        if len(best_pts1) >= 3:
            best3_pts1, best3_pts2 = zip(*random.sample(list(zip(best_pts1, best_pts2)), 3))
            best3_pts1, best3_pts2 = torch.stack(best3_pts1), torch.stack(best3_pts2)

            a_src = np.float32(best3_pts1)
            a_dst = np.float32(best3_pts2)
            Affine_M = cv2.getAffineTransform(a_src, a_dst)
            
        loftr_test_dic[img1_name][best_img_name] = {'Affine_M': Affine_M, 'Essential_M': Essential_M, 
                                                     'R_est': R_est, 't_est': t_est,
                                                     'pts1':best_pts1, 'pts2': best_pts2, 
                                                     'confidence': best_confidence}
'''Hailey'''


  and should_run_async(code)
100%|██████████| 1200/1200 [5:09:25<00:00, 15.47s/it]  


'Hailey'

In [251]:
# save the result into a dictionary and export it
for k1 in tqdm(loftr_test_dic.keys()):
    for k2 in loftr_test_dic[k1].keys():
        for k3 in loftr_test_dic[k1][k2].keys():
            a = loftr_test_dic[k1][k2][k3]
            if type(a) is not list and a is not None:
                loftr_test_dic[k1][k2][k3] = loftr_test_dic[k1][k2][k3].tolist()

with open('loftr_test_dic.json', 'w') as file:
    file.write(json.dumps(loftr_test_dic))

  and should_run_async(code)
100%|██████████| 1200/1200 [00:00<00:00, 1449.18it/s]


## Build LR

In [25]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split

x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size = 0.1, random_state = 10)
# x_train, y_train = X, real_Y
lr = LinearRegression()
lr.fit(x_train,y_train)

scoring = "neg_mean_absolute_error"
results = cross_val_score(lr, x_train, y_train, scoring=scoring, n_jobs = 4)  # 2 n_jobs
print("Mean Absolute Error: ", results.mean()); print("Standard Deviation: ", results.std())


Mean Absolute Error:  -5.55179654706102
Standard Deviation:  0.45471223183850656


In [59]:
# MAE from all training data
train_pred = lr.predict(X)
train_result = []
for i in range(len(train_pred)):
    result_x = match_Y[i][0] + train_pred[i][0]
    result_y = match_Y[i][1] + train_pred[i][1]
    train_result.append([result_x, result_y])
    
total = 0
for i in range(len(train_result)):
    total += abs(train_result[i][0] - real_Y[i][0]) + abs(train_result[i][1] - real_Y[i][1])
print('MAE in train :',total/len(train_result) )    

MSE in train : 11.284431491847418


## Real prediction on test images for Kaggle

In [317]:
test_X= []  
test_result = []
valid_keys = ['Affine_M','R_est',  't_est']

for k1 in loftr_test_dic.keys():
    for k2 in loftr_test_dic[k1].keys():
        x = []
        for k3 in valid_keys:
            a = loftr_test_dic[k1][k2][k3]
            if a != None:
                x.extend(np.array(a).reshape(-1))
            else:
                if k3 == 'Affine_M':
                    x.extend(Affine_M_empty)
                elif k3 == 'R_est':
                    x.extend(R_est_empty)
                elif k3 == 't_est':
                    x.extend(t_est_empty)
                else:
                    print(k3)
                    print(a)
    test_X.append(x)
    test_pred = np.array(lr.predict([x])).reshape(-1)
    result_x = all_set_dic[k2]['label'][0].item() + test_pred[0]
    result_y = all_set_dic[k2]['label'][1].item() + test_pred[1]
    test_result.append([result_x, result_y])

  and should_run_async(code)


In [318]:
# export the results
header = ['id','x','y']
predicts = test_result

with open('new_result2.csv', 'w') as file:
    writer = csv.writer(file)
    writer.writerow(header)
    for imgfile, predict in zip(test_dic.keys(), predicts):
        imgfile = [imgfile.strip(".jpg")]
        imgfile.extend([predict[0], predict[1]])
        writer.writerow(imgfile)  

  and should_run_async(code)
