In [None]:
# Import all the necessary Library 
import torchvision
import torch.utils.data as utils
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,Dataset
from torch.autograd import Variable
import matplotlib.pyplot as plt
import torchvision.utils
import numpy as np
import copy
from torch.optim import lr_scheduler
import os
from PIL import Image
import torch
from torch.autograd import Variable
import PIL.ImageOps    
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import pandas as pd 
import random

In [None]:
torch.cuda.is_available()

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def imshow(img,should_save=False):
    npimg = img.numpy()
    plt.axis("off")
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()    

def show_plot(iteration,loss):
    plt.plot(iteration,loss)
    plt.show()

In [None]:
class Config():
    
    training_dir = "/home/kwanzo/Programming/Deep Learning/FB-ISC/Images/Augmented_Images/output"
    train_batch_size = 10
    train_number_epochs = 20

In [None]:
#Directory names list used later
sets = [os.listdir()[i] for i in range(len(os.listdir())) if 'set' in os.listdir()[i]]

#list of images
images = []
for root, dirnames, f in os.walk('/home/kwanzo/Programming/Deep Learning/FB-ISC/Images/Augmented_Images/output'):
    images.append(f)

#First two elements are irrelevant
images = images[2:]

In [None]:
def df_columns():
    #Building the first column of the dataframe of image pairs
    image_names = sorted(images[0])
    lst1 = []
    flag = 0
    for i in range(len(image_names)):
        
        while flag < 3:
            lst1.append('set1/' + image_names[i])
            flag += 1
            
        while flag < 5:
            lst1.append(random.choice(sets[1:]) + '/' + image_names[i])
            flag += 1
            
        flag = 0

    #2nd column
    lst2 = []
    flag = 0
    for i in range(len(image_names)):
        while flag < 3:
            lst2.append(random.choice(sets) + '/' + image_names[i])
            flag += 1
            
        while flag < 5:
            lst2.append(random.choice(sets) + '/' + image_names[random.randint(0, 3499)])
            flag += 1
            
        flag = 0
    
    
    #Labels
    labels = []
    for i in range(len(lst1)):
        if lst1[i][4:] == lst2[i][4:]:
            labels.append(1)
        else:
            labels.append(0)
    
    return lst1, lst2, labels

In [None]:
#inintializing the columns and labels
column1, column2, labels = df_columns()


In [None]:
#Dataframe of images
df = pd.DataFrame({'Image1': column1,
                  'Image2': column2,
                  'Label': labels})
df

In [None]:
class SiameseNetworkDataset():
    
    def __init__(self,df=None,training_dir=None,transform=None):
        # used to prepare the labels and images path
        self.training_df=df
        self.training_dir = training_dir    
        self.transform = transform
        
    def __getitem__(self,index):
        
        # getting the image path
        image1_path=os.path.join(self.training_dir,self.training_df.iat[index,0])
        image2_path=os.path.join(self.training_dir,self.training_df.iat[index,1])
        
        
        # Loading the image
        img0 = Image.open(image1_path)
        img1 = Image.open(image2_path)
        img0 = img0.convert("RGB")
        img1 = img1.convert("RGB")
        
        # Apply image transformations
        if self.transform is not None:
            img0 = self.transform(img0)
            img1 = self.transform(img1)
        
        return img0, img1, torch.from_numpy(np.array([int(self.training_df.iat[index,2])], dtype=np.float32))
        
    def __len__(self):
        return len(self.training_df)

In [None]:
siamese_dataset = SiameseNetworkDataset(df,training_dir,
                                        transform=transforms.Compose([transforms.Resize((105,105)),
                                                                      transforms.ToTensor()
                                                                      ])
                                       )

In [None]:
# Viewing the sample of images and to check whether its loading properly

vis_dataloader = DataLoader(siamese_dataset,
                        shuffle=True,
                        batch_size=8,
                        pin_memory = True)
dataiter = iter(vis_dataloader)


example_batch = next(dataiter)
concatenated = torch.cat((example_batch[0], example_batch[1]), 0)
imshow(torchvision.utils.make_grid(concatenated))

print(example_batch[2].shape)


In [1]:
#Checking different pretrained models and their layers

# mod = torchvision.models.resnet18(pretrained = True)
# feat_lst = nn.Sequential(*(list(mod.children())))
# feat_lst

In [None]:

# class model_pretrained(nn.Module):
    
#     def __init__(self):
#         super(model_pretrained, self).__init__()
#         self.original_model = torchvision.models.resnet18(pretrained=True)
#         for self.param in self.original_model.parameters():
#             self.param.requires_grad = False
#         self.num_ftrs = self.original_model.fc.in_features
#         self.original_model.fc = nn.Linear(self.num_ftrs, 2)
# #         self.features = nn.Sequential(
# #             # stop at conv4
# #             *list(self.original_model.children())[:-7]
# #         )
        
# #       Defining the fully connected layers
#         self.fc1 = nn.Sequential(
#             nn.Linear(12288, 1024),
#             nn.ReLU(inplace=True),
#             nn.Dropout2d(p=0.5),
            
#             nn.Linear(1024, 128),
#             nn.ReLU(inplace=True),
            
#             nn.Linear(128, 2)
#         )



#     def forward_once(self, x):
#     # Forward pass 
#         output = self.features(x)
#         output = output.view(output.size()[0], -1)
#         output = self.fc1(output)
#         return output

#     def forward(self, inp1, inp2):
#         inp1 = self.forward_once(inp1)
#         inp2 = self.forward_once(inp2)
#         return inp1, inp2


In [None]:
def model_pretrained():
    model_conv = torchvision.models.resnet18(pretrained=True)
    for param in model_conv.parameters():
        param.requires_grad = False

    # Parameters of newly constructed modules have requires_grad=True by default
    num_ftrs = model_conv.fc.in_features
    
    # Output size = 2
    model_conv.fc = nn.Linear(num_ftrs, 2)

    model_conv = model_conv.to(device)
    
    return model_conv

In [None]:
#Loss function 

class ContrastiveLoss(torch.nn.Module):

    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                      (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))


        return loss_contrastive

In [None]:
# Load the dataset as pytorch tensors using dataloader
train_dataloader = DataLoader(siamese_dataset,
                        shuffle=True,
                        num_workers=8,
                        batch_size=Config.train_batch_size,
                             pin_memory=True)

In [None]:
# Declare Siamese Networkt
net = model_pretrained()
# Decalre Loss Function
criterion = ContrastiveLoss()
# Declare Optimizer
optimizer = optim.RMSprop(net.parameters(), lr=1e-3, alpha=0.99, eps=1e-8, weight_decay=0.0005, momentum=0.9)


In [None]:
def train():
    counter = []
    loss_history = [] 
    iteration_number= 0
    
    for epoch in range(0,Config.train_number_epochs):
        for i, data in enumerate(train_dataloader,0):
            img0, img1 , label = data
            img0, img1 , label = Variable(img0.cuda()), Variable(img1.cuda()), Variable(label.cuda())
            optimizer.zero_grad()
            output1,output2 = net(img0), net(img1)
            loss_contrastive = criterion(output1,output2,label)
            loss_contrastive.backward()
            optimizer.step()
            if i % 500 == 0 :
                print("Epoch number {}\n Current loss {}\n".format(epoch,loss_contrastive.item()))
                iteration_number += 500
                counter.append(iteration_number)
                loss_history.append(loss_contrastive.item())
    return net

In [None]:
device = torch.device('cuda')
# Train the model
model = train()
torch.save(model.state_dict(), training_dir + "/model.pt")
print("Model Saved Successfully")

In [None]:
# Load the saved model

model = model_pretrained().to(device)
model.load_state_dict(torch.load(training_dir + "/model.pt"))

In [None]:
# Load the test dataset
test_dataset = SiameseNetworkDataset(df,training_dir,
                                        transform=transforms.Compose([transforms.Resize((105,105)),
                                                                      transforms.ToTensor()
                                                                      ])
                                       )

test_dataloader = DataLoader(test_dataset,num_workers=6,batch_size=1,shuffle=True)

In [2]:
def eval(model, test_loader):
    with torch.no_grad():
        model.eval()
        correct = 0
        count = 0
        margin = 2

        for mainImg, testImg, label in test_dataloader:
            pred = None
            
            mainImg = mainImg.to(device)
            # determine which category an image belongs to
        
            testImg = testImg.to(device)
            output1, output2 = model(mainImg, testImg)
            
            #Euclidean Distance
            result = F.pairwise_distance(output1.cuda(), output2.cuda())
            if result <= margin and label.item() == 1:
                correct += 1
            
            count += 1
            
            if count % 1000 == 0:
                print("Accuracy = {}".format(100*(correct/count)))

In [None]:
eval(model, test_dataloader)