In [None]:
# Import necessary packages

%matplotlib inline
import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,Dataset
import matplotlib.pyplot as plt
import torchvision.utils
import numpy as np
import random
from PIL import Image
import torch
from torch.autograd import Variable
import PIL.ImageOps    
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import zipfile
import torchvision.models as models
import pandas as pd
import numpy as np

In [None]:
#Mount Driver
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#Methods to display image for visualization

def imshow(img,text=None,should_save=False):
    npimg = img.numpy()
    plt.axis("off")
    if text:
        plt.text(75, 8, text, style='italic',fontweight='bold',
            bbox={'facecolor':'white', 'alpha':0.8, 'pad':10})
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()    

def show_plot(iteration,loss):
    plt.plot(iteration,loss)
    plt.title('Number of Epochs vs Loss ')
    plt.xlabel('Number of Epochs')
    plt.ylabel('Loss')
    plt.show()

In [None]:
#Contrastive loss

class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss function.
    Based on: http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    """

    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                      (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))

        return loss_contrastive

In [None]:
#SpatialPyramid_pooling

import math
def spatial_pyramid_pool(previous_conv, num_sample, previous_conv_size, out_pool_size_row=[5, 4, 3, 2, 1, 3, 1], out_pool_size_height=[5, 4, 3, 2, 1, 2, 3]):
    '''
    previous_conv: a tensor vector of previous convolution layer
    num_sample: an int number of image in the batch
    previous_conv_size: an int vector [height, width] of the matrix features size of previous convolution layer
    out_pool_size: a int vector of expected output size of max pooling layer
    
    returns: a tensor vector with shape [1 x n] is the concentration of multi-level pooling
    '''    
    # print(previous_conv.size())
    for i in range(len(out_pool_size_row)):
        # print(previous_conv_size)
        h_wid = int(math.ceil(previous_conv_size[0] / out_pool_size_height[i]))
        w_wid = int(math.ceil(previous_conv_size[1] / out_pool_size_row[i]))
        h_pad = (h_wid*out_pool_size_height[i] - previous_conv_size[0] + 1)/2
        w_pad = (w_wid*out_pool_size_row[i] - previous_conv_size[1] + 1)/2
        maxpool = nn.MaxPool2d((h_wid, w_wid), stride=(h_wid, w_wid), padding=(int(math.floor(h_pad)), int(math.floor(w_pad))))
        x = maxpool(previous_conv)
        # print(x.size())
        if(i == 0):
            # print("\X size:",x.size())
            spp = x.view(x.size()[0], x.size()[1],-1, 1)
            # print("\nspp size:",spp.size())
        else:
            spp = torch.cat((spp,x.view(x.size()[0], x.size()[1],-1, 1)), 2)
            # print("\nsize:",spp.size())
    # print("\nsize:",spp.size())
     
    return spp

In [None]:
#SiameseNetwork Architecture
# # spatial_out_size = [  #spatial pyramid out put for 7 level-pyramid
# #  [5,5], 
# #  [4,4], 
# #  [3, 3], 
# #  [2, 2], 
# #  [1, 1],
# #  [3, 2],
# #  [1, 3]
# # ]
# out_spatial_size_row =    [5, 4, 3, 2, 1]
# out_spatial_size_height = [5, 4, 3, 2, 1]

class CNN1(nn.Module):
  def __init__(self):
    super().__init__()
    # self.output_num_row = out_spatial_size_row
    # self.output_num_height = out_spatial_size_height

    self.pad = nn.ZeroPad2d(1)
    self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1)#, padding=1
    self.conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=1)#, padding=1
    self.max_pool1 = nn.MaxPool2d(kernel_size=2, stride=2) #, padding=0

    self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1)#, padding=1
    self.conv4 = nn.Conv2d(128, 128, kernel_size=3, stride=1)#, padding=1
    self.conv5 = nn.Conv2d(128, 256, kernel_size=3, stride=1)#, padding=1
    self.conv6 = nn.Conv2d(256, 256, kernel_size=3, stride=1)#, padding=1

    self.conv7 = nn.Conv2d(256, 256, kernel_size=3, stride=1)#, padding=1
    self.conv8 = nn.Conv2d(256, 512, kernel_size=3, stride=1)#, padding=1
    self.conv9 = nn.Conv2d(512, 512, kernel_size=3, stride=1)#, padding=1
    self.conv10 = nn.Conv2d(512, 512, kernel_size=3, stride=1)#, padding=1

    self.conv11 = nn.Conv2d(512, 512, kernel_size=3, stride=1)#, padding=1
    self.conv12 = nn.Conv2d(512, 512, kernel_size=3, stride=1)#, padding=1
    self.conv13 = nn.Conv2d(512, 512, kernel_size=3, stride=1)#, padding=1
    #upto here the output is of size = (512, 14, 14)

    #spatial dimesionality reduction 
    self.conv14 = nn.Conv2d(512, 64, kernel_size=3, padding=1, stride=1)
    self.conv15 = nn.Conv2d(64, 64, kernel_size=1, stride=1) #something here
    self.fc1 = nn.Sequential(
        nn.Linear(64*64*1, 500),
        nn.ReLU(inplace=True),

        nn.Linear(500, 500),
        nn.ReLU(inplace=True),

        nn.Linear(500, 50)
    )
  def forward_once(self, x):
      output = self.pad(F.relu(self.conv1(self.pad(x)))) #conv1
      output = self.pad(self.max_pool1(F.relu(self.conv2(output)))) #conv2
      output = self.pad(F.relu(self.conv3(output))) #conv3
      output = self.pad(self.max_pool1(F.relu(self.conv4(output)))) #conv4
      
      output = self.pad(F.relu(self.conv5(output)))#conv5
      output = self.pad(F.relu(self.conv6(output))) #conv 6
      output = self.pad(self.max_pool1(F.relu(self.conv7(output)))) #conv7
      output = self.pad(F.relu(self.conv8(output))) #conv 8
      output = self.pad(F.relu(self.conv9(output))) #conv 9
      output = self.pad(self.max_pool1(F.relu(self.conv10(output)))) #conv10
      output = self.pad(F.relu(self.conv11(output))) #conv 11
      output = self.pad(F.relu(self.conv12(output))) #conv 12
      output = F.relu(self.conv13(output)) #conv13
      
      

      output = spatial_pyramid_pool(output, 1, [int(output.size(2)), int(output.size(3))])
      
      #spatial dimesionality reduction
      output = F.relu(self.conv14(output)) #conv14
      output = F.relu(self.conv15(output)) #conv15

      #size upto here (-1, 64, 64, 1)

      output = output.view(output.size()[0], 1, output.size()[1], output.size()[2])
      #size upto here (-1, 1, 64, 64)
      
      output = output.view(output.size()[0], -1)
      #to check how it works
      output = self.fc1(output)
      return output

  #return a size of (2*64*64, 1-d) for each image
  def forward(self, input1, input2): #add here ''
      output1 = self.forward_once(input1)
      # ssp1 = spatial_pyramid_pool(output1, 1, [int(output1.size(2)), int(output1.size(3))], self.out_spatial_size)
      output2 = self.forward_once(input2)
      # ssp2 = spatial_pyramid_pool(output2, 1, [int(output2.size(2)), int(output2.size(3))], self.out_spatial_size)
      return output1, output2 


In [None]:
# Save and Load Models
# to save and load the model
def save_models(cnn1_model, name):
  # torch.save(cnn1_model, "SavedModel/"+name+".pt")
  torch.save(cnn1_model, "/content/drive/MyDrive/SavedModel/FinalModelSaved/"+name+".pt")
  
def load_models( name):
  # cnn1_model=torch.load("SavedModel/"+name+".pt")
  cnn1_model=torch.load("/content/drive/MyDrive/SavedModel/FinalModelSaved/"+name+".pt")
  return cnn1_model

In [None]:
#load Keras Weight into PyTorch model

# pip install deepface

#calling the dependencies
from deepface import DeepFace
import cv2
import matplotlib.pyplot as plt
  #calling VGGFace
model_name = "VGG-Face"
model = DeepFace.build_model(model_name)

weights = model.get_weights() #load parameters from vggface

# Create model CNN1
cnn1_model = CNN1().cuda()
cnn1_model

# from torchsummary import summary
# summary(cnn1_model, (3, 224, 224), (3, 224, 224))

# Create model CNN2
# cnn2_model = CNN2().cuda()
# cnn2_model

# from torchsummary import summary
# summary(cnn2_model, (2, 64, 64))

def allocate_keras_vggface_weight_to_pyt():
  conv_layers = [cnn1_model.conv1, cnn1_model.conv2, cnn1_model.conv3, cnn1_model.conv4, 
               cnn1_model.conv5, cnn1_model.conv6, cnn1_model.conv7, cnn1_model.conv8, 
               cnn1_model.conv9, cnn1_model.conv10, cnn1_model.conv11, cnn1_model.conv12, 
               cnn1_model.conv13]
  index_count = 0
  for conv in conv_layers:
    conv.weight.data = torch.from_numpy(np.transpose(weights[index_count])).cuda()
    index_count += 1
    conv.bias.data = torch.from_numpy(weights[index_count]).cuda()
    index_count += 1
allocate_keras_vggface_weight_to_pyt()

In [None]:
# Dataset Visualization

#set path to the dataset 
class Config():
    training_dir = "/content/drive/MyDrive/FinalDataSet/AllDatasets/Training"
    testing_dir = "/content/drive/MyDrive/FinalDataSet/AllDatasets/Testing"
    # training_dir = "/content/drive/MyDrive/training"
    # testing_dir = "/content/drive/MyDrive/testing"
    train_batch_size = 32
    train_number_epochs = 400

Config.training_dir

#custom dataset 
class SiameseNetworkDataset(Dataset):
    
    def __init__(self,imageFolderDataset,transform=None,should_invert=False):
        self.imageFolderDataset = imageFolderDataset    
        self.transform = transform
        self.should_invert = should_invert
        
    def __getitem__(self,index):
        img0_tuple = random.choice(self.imageFolderDataset.imgs)
        #we need to make sure approx 50% of images are in the same class
        should_get_same_class = random.randint(0,1) 
        if should_get_same_class:
            while True:
                #keep looping till the same class image is found
                img1_tuple = random.choice(self.imageFolderDataset.imgs) 
                if img0_tuple[1]==img1_tuple[1]:
                    break
        else:
            while True:
                #keep looping till a different class image is found
                
                img1_tuple = random.choice(self.imageFolderDataset.imgs) 
                if img0_tuple[1] !=img1_tuple[1]:
                    break

        img0 = Image.open(img0_tuple[0])
        img1 = Image.open(img1_tuple[0])
        img0 = img0.convert("RGB")
        img1 = img1.convert("RGB")
        
        if self.should_invert:
            img0 = PIL.ImageOps.invert(img0)
            img1 = PIL.ImageOps.invert(img1)

        if self.transform is not None:
            img0 = self.transform(img0)
            img1 = self.transform(img1)
        
        return img0, img1 , torch.from_numpy(np.array([int(img1_tuple[1]!=img0_tuple[1])],dtype=np.float32))
    
    def __len__(self):
        return len(self.imageFolderDataset.imgs)

folder_dataset = dset.ImageFolder(root=Config.training_dir)

siamese_dataset = SiameseNetworkDataset(imageFolderDataset=folder_dataset,
                                        transform=transforms.Compose([transforms.Resize((224,224)),
                                                                      transforms.ToTensor()
                                                                      ])
                                       ,should_invert=False)

visual_dataloader = DataLoader(siamese_dataset,
                        shuffle=True,
                        num_workers=2,
                        batch_size=32)
dataiter = iter(visual_dataloader)


example_batch = next(dataiter)
concatenated = torch.cat((example_batch[0],example_batch[1]),0)
imshow(torchvision.utils.make_grid(concatenated))
print(example_batch[2].numpy())


example_batch[1].size(), example_batch[0].size() # [batch_size, #channels, width, height]