In [1]:
# Connect to drive
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [2]:
import cv2 as cv
import numpy as np
import skimage.measure
from pathlib import Path
import pandas as pd
import re
import torch
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from google.colab.patches import cv2_imshow
import torchvision.transforms.functional as fn
from sklearn.model_selection import train_test_split
import random
import torch.nn.functional as F

In [3]:
data_path = '/content/drive/MyDrive/Project/Interview/Nan_junior/Data/'
image_size = 50

# Load template

In [4]:
data_dir = Path(data_path + 'templates')
paths_train = list((data_dir).glob('*.png'))[0:]

hero_names = []
hero_templates = []

for path in paths_train:
	x = re.search(r'[\]][\w\.\_]+Original', str(path))
	hero_name = re.sub(r'[0-9]','',x.group()[1:-9]).lower()
	hero_names.append(hero_name)

	img = cv.imread(str(path))
	img = cv.resize(img, [image_size,image_size], interpolation = cv.INTER_AREA)
	hero_templates.append(img)



# Data augmentation

In [5]:
def padding(img):
  resized_img = cv.resize(img, [img.shape[0]-10,img.shape[1]-10], interpolation = cv.INTER_AREA)
  top, bottom, left, right = 5, 5, 5, 5
  padded_img = cv.copyMakeBorder(resized_img, top, bottom, left, right, cv.BORDER_CONSTANT, value=[0, 0, 0])
  return padded_img

def crop_circle(img):
  # print(img)
  hh, ww = img.shape[:2]
  hc, wc = hh//2, ww//2
  radius = hh//2

  mask = np.zeros(img.shape[:2], dtype="uint8")
  # mask2 = np.zeros_like(img)
  mask = cv.circle(mask, (hc,wc), radius, (255,255), -1)
  cropped_img = cv.bitwise_and(img, img, mask=mask)
  return cropped_img



def reduce_resolution(img, scale_factor, iter):
  height, width, _ = img.shape

  low_resolution_img = cv.resize(img, [width, height], interpolation = cv.INTER_LINEAR)
  for i in range(iter):
    low_resolution_img = cv.resize(low_resolution_img, [width//scale_factor, height//scale_factor], interpolation = cv.INTER_LINEAR)
    low_resolution_img = cv.resize(low_resolution_img, [width, height], interpolation = cv.INTER_LINEAR)
  return low_resolution_img

def change_brightness(img, value=-40):
	hsv = cv.cvtColor(img, cv.COLOR_BGR2HSV)
	h, s, v = cv.split(hsv)
	v = cv.add(v,value)
	v[v > 255] = 255
	v[v < 0] = 0
	final_hsv = cv.merge((h, s, v))
	img = cv.cvtColor(final_hsv, cv.COLOR_HSV2BGR)
	return img

In [6]:
hero_dataset = []

for i in range(len(hero_templates)):
  hero_images = []
  hero_images.append(crop_circle(hero_templates[i]))

  hero_images.append(change_brightness(crop_circle(hero_templates[i])))

  hero_images.append(crop_circle(reduce_resolution(hero_templates[i],4,3)))

  hero_images.append(crop_circle(reduce_resolution(hero_templates[i],4,5)))

  hero_images.append(crop_circle(reduce_resolution(hero_templates[i],5,5)))

  hero_images.append(crop_circle(reduce_resolution(hero_templates[i],7,4)))

  hero_images.append(change_brightness(crop_circle(reduce_resolution(hero_templates[i],4,3))))

  hero_images.append(change_brightness(crop_circle(reduce_resolution(hero_templates[i],4,5))))

  hero_images.append(change_brightness(crop_circle(reduce_resolution(hero_templates[i],5,5))))

  hero_images.append(change_brightness(crop_circle(reduce_resolution(hero_templates[i],7,4))))

  #####
  hero_dataset.append(hero_images)



In [7]:
print(len(hero_dataset))
print(len(hero_dataset[0]))
print(len(hero_dataset[1]))

79
10
10


# Custom dataset

In [8]:
#preprocessing and loading the dataset
class SiameseDataset(Dataset):
    def __init__(self, training_data=None, setSize = 7110):
        # used to prepare the labels and images path
        self.train_data = training_data
        self.size = setSize

    def __len__(self):
      return self.size # 10 * len(self.train_data)

    def __getitem__(self,index):
      img1 = None
      img2 = None
      label = None
      
      if index % 2 == 0: #select them same character for both images
        character = random.choice([*range(len(self.train_data))])
        i1 = random.choice([*range(len(self.train_data[character]))])
        i2 = random.choice([*range(len(self.train_data[character]))])

        while i1 == i2:
          i2 = random.choice([*range(len(self.train_data[character]))])

        img1 = self.train_data[character][i1]
        img2 = self.train_data[character][i2]
        label = 1.0
      else:
        character1 = random.choice([*range(len(self.train_data))])
        character2 = random.choice([*range(len(self.train_data))])

        while character1 == character2:
          character2 = random.choice([*range(len(self.train_data))])
        i1 = random.choice([*range(len(self.train_data[character1]))])
        i2 = random.choice([*range(len(self.train_data[character2]))])
        img1 = self.train_data[character1][i1]
        img2 = self.train_data[character1][i2]
        label = 0.0


      img1 = np.stack([img1[:,:,0], img1[:,:,1], img1[:,:,2]])
      img2 = np.stack([img2[:,:,0], img2[:,:,1], img2[:,:,2]])

      img1 = torch.tensor(img1, dtype=torch.float32)
      img2 = torch.tensor(img2, dtype=torch.float32)
      label = torch.from_numpy(np.array([label], dtype=np.float32)) 

      # img1 = img1 / 255.0
      # img2 = img2 / 255.0

      return img1, img2, label

In [9]:
print(len(hero_dataset))
dataset = SiameseDataset(hero_dataset)
train_dataset, val_dataset = torch.utils.data.random_split(dataset,[0.8,0.2])


train_dataloader = DataLoader(train_dataset, shuffle = True, batch_size = 64)
val_dataloader = DataLoader(val_dataset, shuffle = True, batch_size = 64)
print(len(train_dataloader))
print(len(val_dataloader))

79
89
23


In [10]:
# img1, img2, l = dataset[0]
# img1[1, 60:68, 60:68]

# Model

In [11]:
class SiameseNetwork(torch.nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        # Setting up the Sequential of CNN Layers
        self.cnn = torch.nn.Sequential(
            torch.nn.Conv2d(3, 96, kernel_size=11, stride=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(3, stride=2),
            torch.nn.Dropout2d(p=0.3),
            
            torch.nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(3, stride=2),
            torch.nn.Dropout2d(p=0.3),

            torch.nn.Conv2d(256, 384 , kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(3, stride=2),
            # torch.nn.Dropout2d(p=0.3),
        )
        # Defining the fully connected layers
        self.fc = torch.nn.Sequential(
            torch.nn.Linear(6144, 256),
            # torch.nn.ReLU(),
            # torch.nn.Dropout2d(p=0.5),
            
            # torch.nn.Linear(1024, 128),
            # torch.nn.ReLU(),
            # torch.nn.Linear(128,16)
            )
        
    def forward_once(self, x):
        # Forward pass 
        output = self.cnn(x)
        output = output.view(output.size()[0], -1)
        output = self.fc(output)
        return output

    def forward(self, input1, input2):
        # forward pass of input 1
        feat1 = self.forward_once(input1)
        # forward pass of input 2
        feat2 = self.forward_once(input2)
        return feat1, feat2

# Loss function

In [12]:
class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss function.
    """

    def __init__(self, margin = 3.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, x0, x1, y):
        # euclidian distance
        diff = x0 - x1   # (batch, embedd size)
        # print("diff:", diff.shape)

        dist_sq = torch.mean(torch.pow(diff, 2), 1)   # (batch, )
        # print("dist_sq:", dist_sq.shape)

        dist = torch.sqrt(dist_sq)
        # print("dist:", dist.shape)

        mdist = self.margin - dist
        # print("mdist:", mdist.shape)

        dist = torch.clamp(mdist, min=0.0)
        # print("new_dist:", dist.shape)
        
        y = torch.squeeze(y)
        # print(y.shape)
        loss = torch.mean(y * dist_sq + (1 - y) * torch.pow(dist, 2))
        # loss = torch.sum(loss) / x0.size()[0]
        # print("loss:", loss.shape)
        return loss

# Config

In [13]:
batch_size = 64
num_epoch = 10

# Training

In [14]:
# Declare Siamese Network
net = SiameseNetwork().cuda()
# Decalre Loss Function
criterion = ContrastiveLoss()
# Declare Optimizer
optimizer = torch.optim.Adam(net.parameters(), lr=1e-5, weight_decay=0.0005)

#train the model
def train():
    train_loss = 0.0
    valid_loss = 0.0
    train_loss_list = []
    valid_loss_list = []
    best_valid_loss = float('Inf')
    global_step = 0
    global_step_list = []
    valid_period = len(val_dataloader)
    # loss=[] 
    # counter=[]
    # iteration_number = 0
    net.train()
    for epoch in range(1, num_epoch + 1):
        for i, data in enumerate(train_dataloader,0):
            img0, img1 , label = data
            img0, img1 , label = img0.cuda(), img1.cuda() , label.cuda()
            optimizer.zero_grad()

            output1, output2 = net(img0, img1)
            loss_contrastive = criterion(output1, output2, label)
            loss_contrastive.backward()
            optimizer.step()  
            
            # print(loss_contrastive.item())
            train_loss += loss_contrastive.item()
            global_step += 1

            if global_step % valid_period == 0:
              net.eval()
              with torch.no_grad():
                for _, data in enumerate(val_dataloader):
                  img0, img1 , label = data
                  img0, img1 , label = img0.cuda(), img1.cuda() , label.cuda()
                  output1, output2 = net(img0, img1)
                  loss_contrastive = criterion(output1, output2, label)
                  valid_loss += loss_contrastive.item()

              train_loss = train_loss / valid_period
              valid_loss = valid_loss / valid_period
              train_loss_list.append(train_loss)
              valid_loss_list.append(valid_loss)
              global_step_list.append(global_step)

              # Print sumary
              print('Epoch [{}/{}], global_step [{}/{}], train loss {:.4f}, valid loss {:.4f}'\
                    .format(epoch, num_epoch, global_step, num_epoch * len(train_dataloader), train_loss, valid_loss))
              # Save check point if model is better
              if best_valid_loss > valid_loss:
                best_valid_loss = valid_loss
                torch.save(net.state_dict(), data_path + "weights/model.pt")
                
              train_loss, valid_loss = 0.0, 0.0
              net.train()

        # print("Epoch {}\n Current loss {}\n".format(epoch, loss_contrastive.item()))
        # iteration_number += 10
        # counter.append(iteration_number)
        # loss.append(loss_contrastive.item())
    # show_plot(counter, loss)   
    return net

#set the device to cuda
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = train()
# torch.save(model.state_dict(), data_path + "weights/model.pt")
print("Training Finished")

Epoch [1/10], global_step [23/890], train loss 15.7359, valid loss 3.0470
Epoch [1/10], global_step [46/890], train loss 6.6836, valid loss 3.4824
Epoch [1/10], global_step [69/890], train loss 4.0181, valid loss 3.5375
Epoch [2/10], global_step [92/890], train loss 3.3881, valid loss 3.6453
Epoch [2/10], global_step [115/890], train loss 2.7773, valid loss 3.7310
Epoch [2/10], global_step [138/890], train loss 2.6211, valid loss 3.7973
Epoch [2/10], global_step [161/890], train loss 2.4875, valid loss 3.8239
Epoch [3/10], global_step [184/890], train loss 2.4745, valid loss 3.7891
Epoch [3/10], global_step [207/890], train loss 2.4808, valid loss 3.8192
Epoch [3/10], global_step [230/890], train loss 2.3972, valid loss 3.8715
Epoch [3/10], global_step [253/890], train loss 2.4015, valid loss 3.9386
Epoch [4/10], global_step [276/890], train loss 2.4240, valid loss 3.8931
Epoch [4/10], global_step [299/890], train loss 2.3464, valid loss 3.9655
Epoch [4/10], global_step [322/890], trai

# Test

In [15]:
def generate_test_image_pairs(images_dataset, labels_dataset, image):
   
    pair_images = []
    pair_labels = []

    for i, hero in enumerate(images_dataset):
      r = random.choice([*range(len(hero))])
      test_image = hero[r]
      pair_images.append((image, test_image))
      pair_labels.append(labels_dataset[i])
    return np.array(pair_images), np.array(pair_labels)


def process_test_image(img):
  hh, ww = img.shape[0:2]
  crop = img[0 : img.shape[0], 0 : ww//2]
  crop = cv.bilateralFilter(crop, 11, 75, 75)

  gray_img = cv.cvtColor(crop, cv.COLOR_BGR2GRAY)
  circles_img = cv.HoughCircles(gray_img,  cv.HOUGH_GRADIENT,1.0, ww,
                              param1=50, param2=10, minRadius=hh//5, maxRadius=hh//2+5)
  if circles_img is not None:
    circles_img = np.uint16(np.around(circles_img))
    for i in circles_img[0,:]:
      # Crop square of character's avatar
      r = i[2]
      if i[0] <= r:
        left = 0
      else:
        left = i[0]-r
      right = i[0] + r

      if i[1] <= r:
        top = 0
      else:
        top = i[1]-r
      bottom = i[1] + r

      crop = img[top:bottom, left:right]
      # crop = cv.detailEnhance(crop, sigma_s=10, sigma_r=0.15)
      # kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
      # crop = cv.filter2D(crop, -1, kernel)
      
      # Crop circle and convert to size 128x128
      square_hh, square_ww = crop.shape[:2]
      square_hc, square_wc = square_hh//2, square_hh//2
      radius = square_hh//2
      
      mask = np.zeros(crop.shape[:2], dtype="uint8")
      mask = cv.circle(mask, (square_hc, square_wc), radius, (255,255), -1)
      cropped_img = cv.bitwise_and(crop, crop, mask=mask)
      cropped_img = cv.resize(cropped_img, [image_size,image_size], interpolation = cv.INTER_CUBIC)
    return cropped_img
    
  else:
    crop = img[0 : img.shape[0], 0 : ww//3]
    cropped_img = cv.resize(crop, [image_size,image_size], interpolation = cv.INTER_CUBIC)
    return cropped_img

In [16]:
data_dir = Path(data_path + 'test_images')
paths_train = list((data_dir).glob('*.jpg'))[0:]

test_images = []
test_labels = []

with open(data_path + 'test.txt','r') as f:
  test_samples = f.readlines()

test_data_path = data_path + 'test_images/'

for sample in test_samples:
  file_name, label = sample.split('\t')
  label = re.sub(r'[0-9]','',label).lower()
  test_labels.append(label.strip('\n'))
  
  # print(file_name)
  img = cv.imread(test_data_path + file_name)
  test_images.append(process_test_image(img))
  
print(len(test_images))

98


In [17]:
#preprocessing and loading the dataset
class TestDataset(Dataset):
    def __init__(self, training_data=None, testing_data = None):
        # used to prepare the labels and images path
        self.train_data = training_data
        self.test_data = testing_data

    def __len__(self):
      return len(self.train_data) * len(self.test_data)

    def __getitem__(self,index):
      test_index = index // len(self.train_data)
      train_index = index % len(self.train_data)
      img1 = self.test_data[test_index]

      r = random.choice([*range(len(self.train_data[train_index]))])
      img2 = self.train_data[train_index][r]

      img1 = np.stack([img1[:,:,0], img1[:,:,1], img1[:,:,2]])
      img2 = np.stack([img2[:,:,0], img2[:,:,1], img2[:,:,2]])

      img1 = torch.tensor(img1, dtype=torch.float32)
      img2 = torch.tensor(img2, dtype=torch.float32)

      # img1 = img1 / 255.0
      # img2 = img2 / 255.0

      return img1, img2

In [18]:
test_dataset = TestDataset(hero_dataset, test_images)
test_dataloader = DataLoader(test_dataset, shuffle = False, num_workers = 8, batch_size = len(hero_dataset))
print(len(test_dataloader))

98




In [19]:
acc = 0

# model = SiameseNetwork()
# model.load_state_dict(torch.load(data_path + "weights/model.pt"))
model.cuda()
model.eval()
with torch.no_grad():
  for label_index, data in enumerate(test_dataloader):
    img0, img1 = data
    img0, img1 = img0.cuda(), img1.cuda()
    output1, output2 = model(img0, img1)
    euclidean_distance = F.pairwise_distance(output1, output2)
    predict_index = torch.argmin(euclidean_distance).item()

    if test_labels[label_index] == hero_names[predict_index]:
      acc+=1

print('Accuracy:', acc, '/', len(test_images))

Accuracy: 80 / 98
