In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import random_split
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import math
import matplotlib.pyplot as plt 
import numpy as np
import plotly.figure_factory as ff
import cv2
import numpy as np
import random

import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from PIL import Image
from matplotlib.pyplot import imshow
from IPython.display import display
from random import choice
from random import randint
from pathlib import Path

In [None]:
# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda:0")
else:
  device = torch.device("cpu")
print(device)

cuda:0


In [None]:
# Mount Google Drive at /content/gdrive/; later cells read and write under '/content/gdrive/My Drive/'.
from google.colab import drive 
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [None]:
!git clone https://github.com/mihaidusmanu/d2-net.git
% cd /content/d2-net/hpatches_sequences/
!bash ./download.sh

In [None]:
# Unpack the three evaluation archives stored on Google Drive into /content.
!tar xvzf /content/gdrive/My\ Drive/OxfordRelease.tar.gz -C /content
!tar xvzf /content/gdrive/My\ Drive/Viewpoints.tar.gz -C /content
!tar xvzf /content/gdrive/My\ Drive/WebcamRelease.tar.gz -C /content

In [None]:
# (min, max) rotation-angle buckets; cur_theta_idx picks the bucket used by the cells below.
theta_ranges = [
    (0, 15),
    (15, 45),
    (45, 90),
    (90, 180),
]
cur_theta_idx = 0

In [None]:
# Create the Drive output folders (plain patches plus one rotated-patch folder per theta_max) unless they already exist.
!if [ -d '/content/gdrive/My Drive/Patches' ]; then echo "Directory already exist" ; else mkdir '/content/gdrive/My Drive/Patches' && echo "Directory created"; fi
!if [ -d '/content/gdrive/My Drive/RotatedPatches_15' ]; then echo "Directory already exist" ; else mkdir '/content/gdrive/My Drive/RotatedPatches_15' && echo "Directory created"; fi
!if [ -d '/content/gdrive/My Drive/RotatedPatches_45' ]; then echo "Directory already exist" ; else mkdir '/content/gdrive/My Drive/RotatedPatches_45' && echo "Directory created"; fi
!if [ -d '/content/gdrive/My Drive/RotatedPatches_90' ]; then echo "Directory already exist" ; else mkdir '/content/gdrive/My Drive/RotatedPatches_90' && echo "Directory created"; fi
!if [ -d '/content/gdrive/My Drive/RotatedPatches_180' ]; then echo "Directory already exist" ; else mkdir '/content/gdrive/My Drive/RotatedPatches_180' && echo "Directory created"; fi

In [None]:
# Folder holding the un-rotated patches, one '<idx>.jpg' per keypoint.
patches_folder_path = '/content/gdrive/My Drive/Patches/'
# Prefixes completed with str(theta_max): 'RotatedPatches_<max>/' and 'thetas_<max>.npy'.
rotated_patches_folder_path = '/content/gdrive/My Drive/RotatedPatches_'
thetas_folder_path = '/content/gdrive/My Drive/thetas_'
# Saved (image path, keypoints) array.
image_keypoints_path = '/content/gdrive/My Drive/image_keypoints.npy'

In [None]:
def create_keypoints(nfeatures, save):
  """Detect ORB keypoints on every image named in the HPatches image list.

  Args:
    nfeatures: passed to ``cv2.ORB_create`` (upper bound on keypoints kept per image).
    save: when truthy, the result is also stored at ``image_keypoints_path``.

  Returns:
    A list of ``(image path, keypoints_xy)`` tuples, where ``keypoints_xy`` is the output of
    ``cv2.KeyPoint_convert`` for that image.

  Raises:
    FileNotFoundError: if OpenCV cannot read one of the listed images.
  """
  orb = cv2.ORB_create(nfeatures)
  image_keypoints = []

  # `with` closes the list file even if detection fails part-way through.
  with open('/content/d2-net/image_list_hpatches_sequences.txt', 'r') as f:
    for line in f:
      # strip() removes the line ending without eating a character when the last line has none.
      relative_path = line.strip()
      if not relative_path:
        continue

      img_path = '/content/d2-net/' + relative_path

      src = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
      if src is None:
        # cv2.imread signals failure by returning None; fail with the offending path.
        raise FileNotFoundError("could not read image: " + img_path)

      keypoint, _ = orb.detectAndCompute(src, None)
      keypoints_xy = cv2.KeyPoint_convert(keypoint)

      image_keypoints.append((img_path, keypoints_xy))

  if save:
    # Save the (image path, keypoints) pairs. The rows are ragged (a str next to a
    # variable-length array), so the object array is filled cell by cell; np.array() on such
    # input raises on recent NumPy versions.
    np_image_keypoints = np.empty((len(image_keypoints), 2), dtype=object)
    for row, (img_path, keypoints_xy) in enumerate(image_keypoints):
      np_image_keypoints[row, 0] = img_path
      np_image_keypoints[row, 1] = keypoints_xy
    np.save(image_keypoints_path, np_image_keypoints)

  return image_keypoints

def load_keypoints():
  """Return the (image path, keypoints) array stored at ``image_keypoints_path``.

  The file is read with ``allow_pickle=True``, so it must come from a trusted source.
  """
  return np.load(image_keypoints_path, allow_pickle=True)

In [None]:
def create_patches(image_keypoints, save):
  """Crop a 32x32 patch centred on each keypoint of each image.

  Args:
    image_keypoints: iterable of ``(image path, keypoints)`` pairs; each keypoint unpacks to ``x, y``.
    save: when truthy, every patch is also written to ``patches_folder_path`` as ``<idx>.jpg``.

  Returns:
    List of PIL images, in keypoint order. Keypoints whose window would leave the image are skipped.
  """
  patches = []

  idx = 0
  for image_idx, (img_path, keypoints) in enumerate(image_keypoints):
    if image_idx % 50 == 0:
      print(image_idx)

    # The context manager releases the file handle once all crops of this image are taken;
    # crop() returns independent images, so the patches stay valid afterwards.
    with Image.open(img_path) as cur_img:
      for x, y in keypoints:
        if x - 16 < 0 or y - 16 < 0 or x + 16 >= cur_img.width or y + 16 >= cur_img.height:
          print("Invalid cropping range")
          # Skip only this keypoint; `break` would discard every remaining keypoint of the image.
          continue
        patch = cur_img.crop((x - 16, y - 16, x + 16, y + 16)) # left, upper, right, lower
        if save:
          patch.save(patches_folder_path + str(idx) + '.jpg')
        patches.append(patch)

        idx += 1

  return patches

def load_patches():
  """Load '0.jpg', '1.jpg', ... from ``patches_folder_path`` until the first missing index.

  Returns:
    List of fully loaded PIL images, in index order.
  """
  patches = []
  idx = 0
  while True:
    image_path = Path(patches_folder_path + str(idx) + '.jpg')
    if not image_path.is_file():
      break

    # Image.open is lazy and keeps the file open; copy() reads the pixels so the handle can be
    # closed right away instead of holding one descriptor per patch.
    with Image.open(image_path) as image:
      patches.append(image.copy())

    idx += 1

  return patches

In [None]:
def create_rotated_patches_thetas(theta_range, image_keypoints, save):
  """Rotate each image about each keypoint by a random angle and crop the 32x32 window.

  Args:
    theta_range: ``(theta_min, theta_max)``. For ``theta_min == 0`` the angle is drawn from
      ``[-theta_max, theta_max]``; otherwise from ``[theta_min + 1, theta_max]`` or its negative mirror.
    image_keypoints: iterable of ``(image path, keypoints)`` pairs; each keypoint unpacks to ``x, y``.
    save: when truthy, patches go to ``rotated_patches_folder_path + str(theta_max) + '/'`` and
      the angles to ``thetas_folder_path + str(theta_max) + '.npy'``.

  Returns:
    ``(rotated_patches, thetas)``: two lists with one entry per kept keypoint.
  """
  theta_min, theta_max = theta_range

  rotated_patches = []
  thetas = []

  idx = 0
  for image_idx, (img_path, keypoints) in enumerate(image_keypoints):
    if image_idx % 50 == 0:
      print(image_idx)

    # rotate() and crop() both return new images, so the source can be closed after the loop
    # and no per-keypoint copy of the full image is needed.
    with Image.open(img_path) as cur_img:
      for x, y in keypoints:
        # Skip windows that leave the image (same test as in create_patches) instead of
        # emitting black-padded crops.
        if x - 16 < 0 or y - 16 < 0 or x + 16 >= cur_img.width or y + 16 >= cur_img.height:
          continue

        if theta_min == 0:
          theta = randint(-theta_max, theta_max)
        else:
          theta = choice([randint(theta_min + 1,theta_max),randint(-theta_max, -theta_min - 1)])
        thetas.append(theta)

        rotated_patch = cur_img.rotate(theta, center=(x, y)).crop((x - 16, y - 16, x + 16, y + 16))
        if save:
          rotated_patch.save(rotated_patches_folder_path + str(theta_max) + '/' + str(idx) + '.jpg')
        rotated_patches.append(rotated_patch)

        idx += 1

  if save:
    np_thetas = np.array(thetas)
    np.save(thetas_folder_path + str(theta_max) + '.npy', np_thetas)

  return rotated_patches, thetas

def load_rotated_patches(theta_range):
  """Load the rotated patches saved for ``theta_range`` until the first missing index.

  Args:
    theta_range: ``(theta_min, theta_max)``; only ``theta_max`` selects the folder.

  Returns:
    List of fully loaded PIL images, in index order.
  """
  _, theta_max = theta_range

  rotated_patches = []
  idx = 0
  while True:
    image_path = Path(rotated_patches_folder_path + str(theta_max) + '/' + str(idx) + '.jpg')
    if not image_path.is_file():
      break

    # Image.open is lazy and keeps the file open; copy() reads the pixels so the handle can be
    # closed right away instead of holding one descriptor per patch.
    with Image.open(image_path) as image:
      rotated_patches.append(image.copy())

    idx += 1

  return rotated_patches

def load_thetas(theta_range):
  """Load the angle array saved for ``theta_range`` (the file name is keyed by its upper bound)."""
  upper = theta_range[1]
  thetas_file = thetas_folder_path + str(upper) + '.npy'
  return np.load(thetas_file)

In [None]:
# Rebind the keypoint file location as a Path and report whether the file is already on disk.
image_keypoints_path = Path(image_keypoints_path)
print("image_keypoints already exists." if image_keypoints_path.is_file() else "no image_keypoints file")

no image_keypoints file


In [None]:
# Scene (sub-folder) names of each evaluation dataset.
Oxford_classes = [
    'bark', 'bikes', 'boat', 'graf', 'leuven', 'notredame', 'obama',
    'paintedladies', 'rushmore', 'trees', 'ubc', 'wall', 'yosemite',
]
Webcam_classes = [
    'Chamonix', 'Courbevoie', 'Frankfurt', 'Mexico', 'Panorama', 'StLouis',
]
Viewpoints_classes = [
    'chatnoir', 'duckhunt', 'mario', 'outside', 'posters',
]

# Parallel lists: test_classes_per_datasets[i] holds the scenes of test_dataset_names[i].
test_dataset_names = ['OxfordRelease', 'Viewpoints', 'WebcamRelease']
test_classes_per_datasets = [Oxford_classes, Viewpoints_classes, Webcam_classes]

In [None]:
def create_test_keypoints(nfeatures, test_dataset_name, test_classes):
  """Detect ORB keypoints on the test images of every class of one evaluation dataset.

  For each class, image names are read from ``<class>/test/test_imgs.txt`` and resolved inside
  ``<class>/test/image_color/``.

  Args:
    nfeatures: passed to ``cv2.ORB_create`` (upper bound on keypoints kept per image).
    test_dataset_name: dataset folder name under ``/content/``.
    test_classes: class (sub-folder) names of that dataset.

  Returns:
    A list of ``(image path, keypoints_xy)`` tuples.

  Raises:
    FileNotFoundError: if OpenCV cannot read one of the listed images.
  """
  print("test dataset:", test_dataset_name)
  orb = cv2.ORB_create(nfeatures)
  image_keypoints = []

  for test_class in test_classes:
    print("test_class:", test_class)
    path_prefix = '/content/' + test_dataset_name + '/' + test_class + '/test/'
    file_path = path_prefix + 'test_imgs.txt'

    # `with` closes the list file even if detection fails part-way through.
    with open(file_path, 'r') as f:
      for line in f:
        # strip() removes the line ending without eating a character when the last line has none.
        entry = line.strip()
        if not entry:
          continue

        # The first 11 characters of each entry are dropped before prepending 'image_color/'
        # (presumably a folder prefix in the list file; verify against test_imgs.txt).
        img_path = path_prefix + 'image_color/' + entry[11:]

        src = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if src is None:
          # cv2.imread signals failure by returning None; fail with the offending path.
          raise FileNotFoundError("could not read image: " + img_path)

        keypoint, _ = orb.detectAndCompute(src, None)
        keypoints_xy = cv2.KeyPoint_convert(keypoint)

        image_keypoints.append((img_path, keypoints_xy))

  print()

  return image_keypoints

def save_test_keypoints(test_image_keypoints, test_dataset_name):
  """Store the (image path, keypoints) pairs of one test dataset on Drive.

  Args:
    test_image_keypoints: sequence of ``(image path, keypoints)`` pairs.
    test_dataset_name: Drive sub-folder that receives ``test_image_keypoints.npy``.
  """
  # The rows are ragged (a str next to a variable-length array), so the object array is filled
  # cell by cell; np.array() on such input raises on recent NumPy versions.
  np_test_image_keypoints = np.empty((len(test_image_keypoints), 2), dtype=object)
  for row, (img_path, keypoints_xy) in enumerate(test_image_keypoints):
    np_test_image_keypoints[row, 0] = img_path
    np_test_image_keypoints[row, 1] = keypoints_xy
  np.save('/content/gdrive/My Drive/' + test_dataset_name + '/test_image_keypoints.npy', np_test_image_keypoints)


def load_test_keypoints(test_dataset_name):
  """Return the (image path, keypoints) array saved for ``test_dataset_name``.

  The file is read with ``allow_pickle=True``, so it must come from a trusted source.
  """
  keypoints_file = '/content/gdrive/My Drive/' + test_dataset_name + '/test_image_keypoints.npy'
  return np.load(keypoints_file, allow_pickle=True)

In [None]:
def create_test_patches(image_keypoints, test_dataset_name, save):
  """Crop a 32x32 patch centred on each keypoint of each test image.

  Args:
    image_keypoints: iterable of ``(image path, keypoints)`` pairs; each keypoint unpacks to ``x, y``.
    test_dataset_name: Drive sub-folder whose ``TestPatches/`` directory receives the files.
    save: when truthy, every patch is also written as ``<idx>.jpg``.

  Returns:
    List of PIL images, in keypoint order. Keypoints whose window would leave the image are skipped.
  """
  test_patches = []
  idx = 0

  for image_path, keypoints in image_keypoints:
    # The context manager releases the file handle once all crops of this image are taken;
    # crop() returns independent images, so the patches stay valid afterwards.
    with Image.open(image_path) as cur_img:
      for x, y in keypoints:
        if x - 16 < 0 or y - 16 < 0 or x + 16 >= cur_img.width or y + 16 >= cur_img.height:
          print("Invalid cropping range")
          # Skip only this keypoint; `break` would discard every remaining keypoint of the image.
          continue
        patch = cur_img.crop((x - 16, y - 16, x + 16, y + 16)) # left, upper, right, lower
        if save:
          patch.save('/content/gdrive/My Drive/' + test_dataset_name + '/TestPatches/' + str(idx) + '.jpg')
        test_patches.append(patch)

        idx += 1

  return test_patches

def load_test_patches(test_dataset_name):
  """Load '0.jpg', '1.jpg', ... from the dataset's ``TestPatches/`` folder until the first missing index.

  Returns:
    List of fully loaded PIL images, in index order (empty when the folder has no '0.jpg').
  """
  test_patches = []
  idx = 0
  while True:
    image_path = Path('/content/gdrive/My Drive/' + test_dataset_name + '/TestPatches/' + str(idx) + '.jpg')
    if not image_path.is_file():
      break

    # Image.open is lazy and keeps the file open; copy() reads the pixels so the handle can be
    # closed right away instead of holding one descriptor per patch.
    with Image.open(image_path) as image:
      test_patches.append(image.copy())

    idx += 1

  return test_patches

In [None]:
def create_test_rotated_patches_thetas(theta_range, image_keypoints, test_dataset_name, save):
  """Rotate each test image about each keypoint by a random angle and crop the 32x32 window.

  Args:
    theta_range: ``(theta_min, theta_max)``. For ``theta_min == 0`` the angle is drawn from
      ``[-theta_max, theta_max]``; otherwise from ``[theta_min + 1, theta_max]`` or its negative mirror.
    image_keypoints: iterable of ``(image path, keypoints)`` pairs; each keypoint unpacks to ``x, y``.
    test_dataset_name: Drive sub-folder that receives ``TestRotatedPatches_<max>/`` and
      ``test_thetas_<max>.npy``.
    save: when truthy, patches and angles are written to Drive.

  Returns:
    ``(test_rotated_patches, test_thetas)``: two lists with one entry per kept keypoint.
  """
  theta_min, theta_max = theta_range
  test_rotated_patches = []
  test_thetas = []
  idx = 0

  for image_path, keypoints in image_keypoints:
    # rotate() and crop() both return new images, so the source can be closed after the loop
    # and no per-keypoint copy of the full image is needed.
    with Image.open(image_path) as cur_img:
      for x, y in keypoints:
        # Skip windows that leave the image (same test as in create_test_patches) instead of
        # emitting black-padded crops.
        if x - 16 < 0 or y - 16 < 0 or x + 16 >= cur_img.width or y + 16 >= cur_img.height:
          continue

        if theta_min == 0:
          theta = randint(-theta_max, theta_max)
        else:
          theta = choice([randint(theta_min + 1,theta_max),randint(-theta_max, -theta_min - 1)])
        test_thetas.append(theta)

        rotated_patch = cur_img.rotate(theta, center=(x, y)).crop((x - 16, y - 16, x + 16, y + 16))
        if save:
          rotated_patch.save('/content/gdrive/My Drive/' + test_dataset_name + '/TestRotatedPatches_' + str(theta_max) + '/' + str(idx) + '.jpg')
        test_rotated_patches.append(rotated_patch)

        idx += 1

  if save:
    np_test_thetas = np.array(test_thetas)
    np.save('/content/gdrive/My Drive/' + test_dataset_name + '/test_thetas_' + str(theta_max) + '.npy', np_test_thetas)

  return test_rotated_patches, test_thetas

def load_test_rotated_patches(theta_range, test_dataset_name):
  """Load the rotated test patches saved for ``theta_range`` until the first missing index.

  Args:
    theta_range: ``(theta_min, theta_max)``; only ``theta_max`` selects the folder.
    test_dataset_name: Drive sub-folder of the dataset.

  Returns:
    List of fully loaded PIL images, in index order (empty when there is no '0.jpg').
  """
  _, theta_max = theta_range

  test_rotated_patches = []
  idx = 0
  while True:
    image_path = Path('/content/gdrive/My Drive/' + test_dataset_name + '/TestRotatedPatches_' + str(theta_max) + '/' + str(idx) + '.jpg')
    if not image_path.is_file():
      break

    # Image.open is lazy and keeps the file open; copy() reads the pixels so the handle can be
    # closed right away instead of holding one descriptor per patch.
    with Image.open(image_path) as image:
      test_rotated_patches.append(image.copy())

    idx += 1

  return test_rotated_patches

def load_test_thetas(theta_range, test_dataset_name):
  """Load the angle array saved for ``theta_range`` of ``test_dataset_name`` (keyed by the upper bound)."""
  upper = theta_range[1]
  thetas_file = '/content/gdrive/My Drive/' + test_dataset_name + '/test_thetas_' + str(upper) + '.npy'
  return np.load(thetas_file)

In [None]:
class CustomDataset(Dataset):
  """Index-aligned triples of (patch, rotated patch, rotation angle)."""

  def __init__(self, patches, rotated_patches, thetas):
    """Store the three parallel sequences.

    Raises:
      ValueError: if the sequences differ in length. Printing and returning here would leave a
        half-built object that only fails later with an AttributeError.
    """
    if len(patches) != len(thetas) or len(patches) != len(rotated_patches):
      raise ValueError(
          "data length is different: %d patches, %d rotated patches, %d thetas"
          % (len(patches), len(rotated_patches), len(thetas)))

    self.patches = patches
    self.rotated_patches = rotated_patches
    self.thetas = thetas

  # Return the total number of samples.
  def __len__(self):
    return len(self.patches)

  # Return the sample at `idx`: both images converted to tensors, plus the stored angle as-is.
  def __getitem__(self, idx):
    to_tensor = transforms.ToTensor()
    patch = to_tensor(self.patches[idx])
    rotated_patch = to_tensor(self.rotated_patches[idx])
    theta = self.thetas[idx]
    return patch, rotated_patch, theta

In [None]:
def create_dataloader(image_keypoints, cur_theta_idx, batch_size, save):
  """Build an unshuffled DataLoader of (patch, rotated patch, theta) samples.

  Args:
    image_keypoints: ``(image path, keypoints)`` pairs to cut patches from.
    cur_theta_idx: index into ``theta_ranges`` selecting the rotation range.
    batch_size: batch size handed to the DataLoader.
    save: forwarded to both patch-creation helpers.
  """
  plain = create_patches(image_keypoints, save=save)
  theta_range = theta_ranges[cur_theta_idx]
  rotated, angles = create_rotated_patches_thetas(theta_range, image_keypoints, save=save)

  return DataLoader(
      CustomDataset(plain, rotated_patches=rotated, thetas=angles),
      batch_size,
      shuffle=False,
  )

In [None]:
def create_test_dataloader(cur_theta_idx, test_dataset_name, test_classes, batch_size, nfeatures, save):
  """Build an unshuffled DataLoader of (patch, rotated patch, theta) samples for one test dataset.

  Args:
    cur_theta_idx: index into ``theta_ranges`` selecting the rotation range.
    test_dataset_name: dataset folder name.
    test_classes: class names of that dataset.
    batch_size: batch size handed to the DataLoader.
    nfeatures: forwarded to the keypoint detector.
    save: forwarded to both patch-creation helpers.
  """
  keypoints = create_test_keypoints(nfeatures, test_dataset_name, test_classes)
  plain = create_test_patches(keypoints, test_dataset_name, save=save)
  theta_range = theta_ranges[cur_theta_idx]
  rotated, angles = create_test_rotated_patches_thetas(theta_range, keypoints, test_dataset_name, save=save)

  return DataLoader(CustomDataset(plain, rotated, angles), batch_size, shuffle=False)

In [None]:
# Create the (image path, keypoints) pairs with nfeatures=1000 and save them to image_keypoints_path.
image_keypoints = create_keypoints(1000, save=True)

In [None]:
# Cut and save the un-rotated patches, then report how many were produced.
patches = create_patches(image_keypoints, save=True)
print(len(patches))

In [None]:
# Build and save the rotated patches and their angles for theta_ranges[2].
# NOTE(review): the index is hard-coded to 2 here while other cells use cur_theta_idx; confirm this is intended.
rotated_patches, thetas = create_rotated_patches_thetas(theta_ranges[2], image_keypoints, save=True)
print(len(rotated_patches))
print(len(thetas))

In [None]:
# Read the saved (image path, keypoints) file back and print how many entries it holds.
image_keypoints = load_keypoints()
print(len(image_keypoints))

648


In [None]:
# create_dataloader indexes theta_ranges itself, so it takes the index (not the range tuple),
# and `save` is a required argument.
# NOTE(review): save=False avoids rewriting the patch files on Drive; switch to True if the
# patches for this range still need to be stored.
dataloader_theta_range_0 = create_dataloader(image_keypoints, cur_theta_idx, batch_size=128, save=False)

In [None]:
# One test DataLoader per evaluation dataset; the first entry of test_dataset_names is skipped.
test_dataloaders_0 = []
for i, (test_dataset_name, test_classes) in enumerate(zip(test_dataset_names, test_classes_per_datasets)):
  if i == 0:
    continue
  # create_test_dataloader indexes theta_ranges itself, so it takes the index (not the range
  # tuple); nfeatures and save are required arguments.
  # NOTE(review): nfeatures=1000 mirrors the create_keypoints(1000, ...) call used for the
  # training keypoints, and save=False avoids writing to Drive; confirm both values.
  test_dataloader = create_test_dataloader(cur_theta_idx, test_dataset_name, test_classes, batch_size=32, nfeatures=1000, save=False)
  test_dataloaders_0.append(test_dataloader)

test dataset: Viewpoints
test_class: chatnoir
test_class: duckhunt
test_class: mario
test_class: outside
test_class: posters

test dataset: WebcamRelease
test_class: Chamonix
test_class: Courbevoie
test_class: Frankfurt
test_class: Mexico
test_class: Panorama
test_class: StLouis



In [None]:
class Net(nn.Module):
  """CNN that regresses one orientation value (degrees) for a 3-channel 32x32 patch.

  Besides its layers, the module owns an SGD optimizer, a ReduceLROnPlateau scheduler and a
  checkpoint path, so ``train``, ``test`` and ``resume`` are self-contained.

  Attributes filled in by the methods:
    train_losses, train_errors: one 0-dim CPU tensor per finished epoch (epoch means).
    test_errors: per-sample angular errors of the most recent ``test`` call.
    start_epoch: epoch at which a resumed ``train`` call continues.
  """

  def __init__(self, theta_idx):
    """Build layers, optimizer and scheduler; ``theta_idx`` only names the checkpoint file."""
    super().__init__()
    # Four stages of two 3x3 convolutions each; channels grow 3 -> 32 -> 64 -> 128 -> 256.
    self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
    self.conv2 = nn.Conv2d(32, 32, 3, padding=1)
    self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
    self.conv4 = nn.Conv2d(64, 64, 3, padding=1)
    self.conv5 = nn.Conv2d(64, 128, 3, padding=1)
    self.conv6 = nn.Conv2d(128, 128, 3, padding=1)
    self.conv7 = nn.Conv2d(128, 256, 3, padding=1)
    self.conv8 = nn.Conv2d(256, 256, 3, padding=1)

    self.bn1 = nn.BatchNorm2d(32)
    self.bn2 = nn.BatchNorm2d(32)
    self.bn3 = nn.BatchNorm2d(64)
    self.bn4 = nn.BatchNorm2d(64)
    self.bn5 = nn.BatchNorm2d(128)
    self.bn6 = nn.BatchNorm2d(128)
    self.bn7 = nn.BatchNorm2d(256)
    self.bn8 = nn.BatchNorm2d(256)

    self.maxPool = nn.MaxPool2d(2, stride=2)

    # 2*2*256 is the flattened size after four 2x poolings of a 32x32 input (see forward).
    self.fc1 = nn.Linear(2*2*256, 2*2*256)
    self.fc2 = nn.Linear(2*2*256, 1)

    # Last value handed to the scheduler (mean training loss of the latest epoch).
    self.metric = 0
    self.optimizer = optim.SGD(self.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0001)
    self.scheduler = ReduceLROnPlateau(self.optimizer, factor=0.1, patience=10, min_lr=1e-7)
    self.epoch_num = 100

    self.start_epoch = 0
    self.train_losses = []
    self.train_errors = []
    self.test_errors = []

    self.path = '/content/gdrive/My Drive/state_dict_' + str(theta_idx) + '.pt'

  def forward(self, x):
    """Map a batch of patches to one predicted value per patch (output shape ``(batch, 1)``)."""
    x = F.relu(self.bn1(self.conv1(x)))
    x = F.relu(self.bn2(self.conv2(x)))
    x = self.maxPool(x)

    x = F.relu(self.bn3(self.conv3(x)))
    x = F.relu(self.bn4(self.conv4(x)))
    x = self.maxPool(x)

    x = F.relu(self.bn5(self.conv5(x)))
    x = F.relu(self.bn6(self.conv6(x)))
    x = self.maxPool(x)

    x = F.relu(self.bn7(self.conv7(x)))
    x = F.relu(self.bn8(self.conv8(x)))
    x = self.maxPool(x)

    x = x.view(-1, 2 * 2 * 256)
    x = self.fc1(x)
    x = self.fc2(x)
    return x

  @staticmethod
  def _angular_error(prediction, rotated_prediction, theta):
    """Wrapped distance between the predicted rotation and ``theta``, shape ``(batch, 1)``.

    ``theta`` is reshaped to a column first; subtracting a ``(batch,)`` vector from a
    ``(batch, 1)`` tensor would broadcast to ``(batch, batch)``.
    """
    theta = theta.reshape(-1, 1).to(prediction.dtype)
    delta = torch.abs((rotated_prediction - prediction) - theta)
    # NOTE(review): 360 - delta turns negative once delta exceeds 360; confirm whether a
    # modulo was intended.
    return torch.min(delta, 2 * 180 - delta)

  def criterion(self, prediction, rotated_prediction, theta):
    """Per-sample loss, shape ``(batch, 1)``.

    Sum of two range penalties (how far each prediction lies outside [-180, 180]) and 0.1 times
    the wrapped error between ``rotated_prediction - prediction`` and ``theta``.
    """
    Lr1 = torch.clamp(torch.abs(prediction) - 180, min=0)
    Lr2 = torch.clamp(torch.abs(rotated_prediction) - 180, min=0)
    Lp = self._angular_error(prediction, rotated_prediction, theta)
    L = Lr1 + Lr2 + 1e-1 * Lp

    return L

  def _save_checkpoint(self, epoch):
    """Write model, optimizer, scheduler and history to ``self.path``; ``epoch`` = epochs done."""
    torch.save({
        'epoch': epoch,
        'model_state_dict': self.state_dict(),
        'optimizer_state_dict': self.optimizer.state_dict(),
        'scheduler_state_dict': self.scheduler.state_dict(),
        'loss': self.train_losses,
        'error': self.train_errors
        }, self.path)

  def resume(self):
    """Restore model, optimizer, scheduler, history and start epoch from ``self.path``."""
    # map_location lets a checkpoint written on a GPU load wherever the model currently lives.
    model_device = next(self.parameters()).device
    checkpoint = torch.load(self.path, map_location=model_device)
    self.load_state_dict(checkpoint['model_state_dict'])
    self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
    self.start_epoch = checkpoint['epoch']
    self.train_losses = checkpoint['loss']
    self.train_errors = checkpoint['error']

  def train(self, dataloader=True, resume=False):
    """Run the optimisation loop, or toggle train/eval mode when given a bool.

    This method shadows ``nn.Module.train``, which ``nn.Module.eval()`` invokes as
    ``self.train(False)``. A bool argument is therefore forwarded to the base class so that
    ``eval()`` keeps working.

    Args:
      dataloader: iterable of ``(patch, rotated_patch, theta)`` batches, or a bool (mode switch).
      resume: when true, continue from the checkpoint at ``self.path`` instead of starting fresh.

    Returns:
      ``self`` for the bool (mode switch) form, otherwise ``None``.

    Raises:
      ValueError: if ``dataloader`` yields no samples.
    """
    if isinstance(dataloader, bool):
      return super().train(dataloader)

    if resume:
      self.resume()
    else:
      self.start_epoch = 0
      self.train_losses = []
      self.train_errors = []

    super().train(True)
    model_device = next(self.parameters()).device

    for epoch in range(self.start_epoch, self.epoch_num):
      loss_sum = 0.0
      error_sum = 0.0
      sample_count = 0

      for patch, rotated_patch, theta in dataloader:
        patch = patch.to(model_device)
        rotated_patch = rotated_patch.to(model_device)
        theta = theta.to(model_device)

        self.optimizer.zero_grad()
        prediction = self(patch)
        rotated_prediction = self(rotated_patch)

        loss = torch.mean(self.criterion(prediction, rotated_prediction, theta))
        loss.backward()
        self.optimizer.step()

        with torch.no_grad():
          error = torch.mean(self._angular_error(prediction, rotated_prediction, theta))

        # Accumulate plain floats so no autograd graph or GPU memory outlives the batch.
        batch_len = patch.size(0)
        loss_sum += loss.item() * batch_len
        error_sum += error.item() * batch_len
        sample_count += batch_len

      if sample_count == 0:
        raise ValueError("dataloader produced no samples")

      epoch_loss = loss_sum / sample_count
      epoch_error = error_sum / sample_count

      # ReduceLROnPlateau is stepped once per epoch with the monitored value; stepping it per
      # batch with a constant metric only decays the learning rate to min_lr.
      self.metric = epoch_loss
      self.scheduler.step(epoch_loss)

      # Kept as 0-dim CPU tensors so existing `.item()` callers continue to work.
      self.train_losses.append(torch.tensor(epoch_loss))
      self.train_errors.append(torch.tensor(epoch_error))

      finished = epoch + 1
      if finished % 50 == 0:
        self._save_checkpoint(finished)
      if finished % 100 == 0:
        print(finished, end=" ")

    self._save_checkpoint(self.epoch_num)

    print("train done.")

  def test(self, testloader):
    """Evaluate on ``testloader`` in eval mode and without gradients.

    Returns:
      List of per-sample losses (0-dim CPU tensors). The matching per-sample angular errors are
      stored in ``self.test_errors``.
    """
    test_losses = []
    test_errors = []
    model_device = next(self.parameters()).device

    was_training = self.training
    self.eval()  # BatchNorm must use its running statistics and must not update them here.
    try:
      with torch.no_grad():
        for patch, rotated_patch, theta in testloader:
          patch = patch.to(model_device)
          rotated_patch = rotated_patch.to(model_device)
          theta = theta.to(model_device)

          prediction = self(patch)
          rotated_prediction = self(rotated_patch)

          loss = self.criterion(prediction, rotated_prediction, theta)
          error = self._angular_error(prediction, rotated_prediction, theta)

          test_losses.extend(loss.flatten().cpu())
          test_errors.extend(error.flatten().cpu())
    finally:
      super().train(was_training)

    self.test_errors = test_errors
    return test_losses

  @staticmethod
  def _plot_series(values, label, xlabel, ylabel, title):
    """Draw one labelled line plot; values are converted to floats so tensors are accepted."""
    values = [float(v) for v in values]
    plt.plot(range(1, len(values) + 1), values, label=label)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend()
    plt.show()

  def plot_training_curve(self):
    """Plot the per-epoch training loss and training error."""
    self._plot_series(self.train_losses, "train loss", 'epoch num', 'loss', 'Train Loss per Epoch')
    self._plot_series(self.train_errors, "train errors", 'epoch num', 'error', 'Train Error per Epoch')

  def plot_test_curve(self, test_losses, test_errors=None):
    """Plot per-sample test loss and error.

    Args:
      test_losses: values returned by ``test``.
      test_errors: per-sample errors; defaults to ``self.test_errors`` from the latest ``test`` call.
    """
    if test_errors is None:
      test_errors = self.test_errors
    # The values are per sample, not per epoch, so the axis and title say so.
    self._plot_series(test_losses, "test loss", 'sample index', 'loss', 'Test Loss per Sample')
    self._plot_series(test_errors, "test error", 'sample index', 'error', 'Test Error per Sample')

In [None]:
# theta range 0
net = Net(cur_theta_idx)
net = net.to(device)
# The training loader built above is named dataloader_theta_range_0 (no plain `dataloader`
# exists), and Net.train is called with the loader only, which starts training from scratch.
net.train(dataloader_theta_range_0)
net.plot_training_curve()
print("Last train loss:", net.train_losses[-1].item())

In [None]:
# test
net = Net(cur_theta_idx)
net = net.to(device)
net.resume()
for test_dataloader in test_dataloaders_0:
  print("test start")
  test_losses = net.test(test_dataloader)
  # Plain floats are what matplotlib and np.mean need; the returned items are tensors.
  loss_values = [x.item() for x in test_losses]
  # Net.test returns only the per-sample losses, so the loss curve is drawn directly here.
  plt.plot(range(1, len(loss_values) + 1), loss_values, label="test loss")
  plt.xlabel('sample index')
  plt.ylabel('loss')
  plt.title('Test Loss per Sample')
  plt.legend()
  plt.show()
  print("Average test loss:", np.mean(loss_values))
  print()