In [None]:
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image 
from skimage import io,transform
from sklearn.model_selection import train_test_split
from torch.optim import lr_scheduler
# from torch.utils.data import Dataset
from torch.utils.data import Dataset,DataLoader
from torchvision import transforms, utils,models
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
# The device string must be exactly "cuda:0"; a trailing colon ("cuda:0:") is
# rejected by torch.device as an invalid device string.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

directory_name ="./pins-face-recognition/"


# Paths of every sub-directory below the dataset root. os.walk yields the root
# itself first, hence the [1:].
all_directory_path = [x[0] for x in os.walk(directory_name)][1:]

# Vocabulary creation: one class per celebrity folder.
# Only real directories are kept, so stray files in the dataset root are not
# counted as classes (and do not crash the os.listdir call further down).
dirs = sorted(
    entry for entry in os.listdir(directory_name)
    if os.path.isdir(os.path.join(directory_name, entry))
)
name_to_classid = {d:i for i,d in enumerate(dirs)}
classid_to_name = {v:k for k,v in name_to_classid.items()}
num_classes = len(name_to_classid)
print("number of classes: "+str(num_classes))

# class id -> sorted list of "<class folder>/<file name>" paths, relative to
# directory_name.
img_paths = {c:[directory + "/" + img
                for img in sorted(os.listdir(os.path.join(directory_name, directory)))]
             for directory,c in name_to_classid.items()}
# Flatten the per-class lists into one list of image paths.
all_images_path = [path for img_list in img_paths.values() for path in img_list]
# Bidirectional mapping between an image path and its integer position.
path_to_id = {path:index for index,path in enumerate(all_images_path)}
id_to_path = {index:path for path,index in path_to_id.items()}

#get total images
def totalImageCount(directory):
    """Count the files located directly inside the given directories.

    Args:
        directory: an iterable of directory paths (e.g. ``all_directory_path``)
            or a single directory path given as a string.

    Returns:
        int: total number of files found. Files in nested sub-directories are
        not counted, and a path that cannot be walked contributes 0.
    """
    if isinstance(directory, str):
        directory = [directory]
    total_images = 0
    for path in directory:
        # The first triple produced by os.walk describes `path` itself; the
        # default keeps a missing directory from raising.
        _, _, filenames = next(os.walk(path), (path, [], []))
        total_images += len(filenames)
    return total_images
# Report how many folders were discovered and how many files they contain.
n_directories = len(all_directory_path)
print("total directory is ", n_directories)
n_images = totalImageCount(all_directory_path)
print("total images are ", n_images)

In [None]:
#show sample image in all celebrity directory
def show_sample_images(n_samples=5,directory_name=directory_name):
    """Plot the first `n_samples` images of the first `n_samples` class folders.

    Folders and files are visited in sorted order so the figure is the same on
    every run, and entries of `directory_name` that are not directories are
    skipped.

    Args:
        n_samples: number of folders to show and number of images per folder.
        directory_name: dataset root containing one sub-folder per class.
    """
    class_folders = sorted(
        entry for entry in os.listdir(directory_name)
        if os.path.isdir(os.path.join(directory_name, entry))
    )
    for each in class_folders[:n_samples]:
        current_folder = os.path.join(directory_name, each)
        plt.figure()
        # One title per figure; it does not need to be reset for every image.
        plt.suptitle(each)
        file_names = sorted(os.listdir(current_folder))[:n_samples]
        for position, file in enumerate(file_names, start=1):
            img = io.imread(os.path.join(current_folder, file))
            plt.subplot(1, n_samples, position)
            plt.imshow(img)
show_sample_images()

In [None]:
#show sample images from single celebrity directory
def show_sample_from_directory(directory_index):
    """Show up to 16 images of one class in a 4x4 grid.

    Args:
        directory_index: class id, a key of `classid_to_name` / `img_paths`.

    Classes with fewer than 16 images leave the remaining axes empty instead
    of raising an IndexError.
    """
    print(classid_to_name[directory_index],"images")
    _, axes = plt.subplots(4, 4, figsize=(12,12))
    dir_images = img_paths[directory_index]

    # zip stops at the shorter of (16 axes, available images).
    for axis, relative_path in zip(axes.flatten(), dir_images):
        image_path = os.path.join(directory_name, str(relative_path))
        axis.imshow(io.imread(image_path))
show_sample_from_directory(40)

In [None]:
# Load the table that lists every image and its label. PinterestDataset below
# reads the image path from the first column and the class from the second,
# and filters on columns named "images" and "class".
# NOTE(review): dataset.csv is not produced by any visible cell - confirm where
# it comes from so the notebook can be re-run from a fresh checkout.
dataset = pd.read_csv("dataset.csv")
dataset.head()

In [None]:
#split dataset
# A fixed random_state makes the split reproducible across kernel restarts.
# Stratifying on the label keeps every class present in both splits, which the
# triplet sampling in PinterestDataset relies on to find a positive example.
dataset_train,dataset_validation = train_test_split(
    dataset,
    test_size=0.2,
    random_state=42,
    stratify=dataset["class"],
)

#check the distribution of classes 
dataset_validation.groupby("class").count().head(13)

In [None]:
# Size of each split.
n_train_rows = len(dataset_train)
print("number of training set ", n_train_rows)
n_validation_rows = len(dataset_validation)
print("number of development set", n_validation_rows)

# Class-id -> folder-name lookup, displayed for reference when reading labels.
classid_to_name

In [None]:
#Data Preprocessing

class PinterestDataset(Dataset):
    '''
    Triplet dataset built from a dataframe of image paths and class labels.

    dataframe : pandas DataFrame whose first column holds the image path and
        whose second column holds the class; the columns must be named
        "images" and "class".
    transform : callable applied to each loaded image, or a falsy value to
        return the images as loaded.

    Every item is a dict with the keys "anchor", "positive_image" and
    "negative_image". The positive is another image of the anchor's class and
    the negative is an image of any other class; both are drawn at random on
    every access.
    '''
    def __init__(self,dataframe,transform):
        self.data = dataframe
        self.transform = transform

    def _sample_triplet_paths(self, idx):
        """Return (anchor_path, positive_path, negative_path) for row `idx`.

        Raises:
            ValueError: if the dataframe contains no row of another class.
        """
        anchor_path = str(self.data.iloc[idx,0])
        class_value = self.data.iloc[idx,1]

        # Positive candidates: same class, excluding the anchor image itself.
        same_class = self.data[self.data["class"]==class_value]
        positives = same_class[same_class["images"]!=anchor_path]
        if len(positives) == 0:
            # The anchor is the only image of its class in this split. Reuse it
            # as the positive rather than failing on an empty sample().
            positive_path = anchor_path
        else:
            positive_path = str(positives.sample(n=1).iloc[0,0])

        # Negative candidates: every row of a different class.
        negatives = self.data[self.data["class"]!=class_value]
        if len(negatives) == 0:
            raise ValueError(
                "cannot sample a negative image: the dataframe only contains class %r"
                % (class_value,))
        negative_path = str(negatives.sample(n=1).iloc[0,0])
        return anchor_path, positive_path, negative_path

    def __getitem__(self,idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        triplet_paths = self._sample_triplet_paths(idx)
        images = [io.imread(path) for path in triplet_paths]
        if self.transform:
            images = [self.transform(image) for image in images]
        anchor_image, positive_image, negative_image = images
        return {
            "anchor":anchor_image,
            "positive_image":positive_image,
            "negative_image":negative_image
        }

    def __len__(self):
        return len(self.data)

In [None]:
#Transformation Custom Loader Class

class Rescale(object):
    """Resize an image to a fixed size with three channels.

    Args:
        output_size (int or tuple): an int produces a square
            ``output_size x output_size`` image (the aspect ratio is NOT
            preserved); a ``(height, width)`` pair sets both edges.
    """

    def __init__(self, output_size):
        self.output_size = output_size

    def _target_hw(self):
        """Return the requested (height, width) as integers."""
        if isinstance(self.output_size, (tuple, list)):
            new_h, new_w = self.output_size
        else:
            new_h = new_w = self.output_size
        return int(new_h), int(new_w)

    def __call__(self, sample):
        image = sample
        # Drop any channel beyond the third (e.g. alpha). Otherwise the resize
        # below would interpolate along the channel axis and blend it into the
        # colour channels.
        if image.ndim == 3 and image.shape[2] > 3:
            image = image[..., :3]
        new_h, new_w = self._target_hw()
        return transform.resize(image, (new_h, new_w, 3))

In [None]:
class ToTensor(object):
    """Reorder the axes of an image from H x W x C to C x H x W.

    Despite the name, no torch.Tensor is built here: the result is whatever
    ``sample.transpose`` returns for the given input.
    """

    def __call__(self, sample):
        # numpy image: H x W x C  ->  torch layout: C x H x W
        return sample.transpose(2, 0, 1)

In [None]:
class Normalize(object):
    """Shift and scale a sample: ``(sample - mean) / std``."""

    def __init__(self,mean,std):
        self.mean, self.std = mean, std

    def __call__(self,sample):
        centered = sample - self.mean
        return centered / self.std

In [None]:
# Both splits get the same preprocessing: resize to 224x224, move the channel
# axis first, then apply (x - 0.5) / 0.5.
train_transform = transforms.Compose([Rescale(224), ToTensor(), Normalize(0.5, 0.5)])
validation_transform = transforms.Compose([Rescale(224), ToTensor(), Normalize(0.5, 0.5)])

transformed_dataset_train = PinterestDataset(dataset_train, transform=train_transform)
transformed_dataset_validation = PinterestDataset(dataset_validation, transform=validation_transform)

In [None]:
# for i in range(8000):
#     a=transformed_dataset_train[i]

# plt.imshow(a["anchor"][0])

# Batches of 32 triplets, reshuffled on every pass, for both splits.
loader_options = dict(batch_size=32, shuffle=True)
train_dataloader = DataLoader(transformed_dataset_train, **loader_options)
validation_dataloader = DataLoader(transformed_dataset_validation, **loader_options)

def im_convert(tensor):
    """Turn a normalised C x H x W array back into a displayable H x W x C image.

    The input is transposed to channel-last, multiplied by 0.5, shifted by 0.5
    (the inverse of ``(x - 0.5) / 0.5``) and clipped to the range [0, 1].
    """
    channel_last = tensor.transpose(1, 2, 0)
    scale = np.array((0.5, 0.5, 0.5))
    offset = np.array((0.5, 0.5, 0.5))
    restored = channel_last * scale + offset
    return restored.clip(0, 1)
# plt.imshow((sample["image"]))

In [None]:
i= 898
# Every lookup in the dataset draws a new random positive/negative and reads
# three images from disk. Fetch the triplet once so the three pictures shown
# below belong to the same draw and no image is loaded needlessly.
sample_triplet = transformed_dataset_train[i]
sample_anchor= im_convert(sample_triplet["anchor"])
sample_pos= im_convert(sample_triplet["positive_image"])
sample_neg= im_convert(sample_triplet["negative_image"])

In [None]:
# Only the last expression of a cell is displayed, so the shape of the
# transformed anchor is printed explicitly.
print(transformed_dataset_train[i]["anchor"].shape)

plt.imshow(sample_anchor)

In [None]:
# Positive example: an image sampled from the same class as the anchor.
plt.imshow(sample_pos)

In [None]:
# Negative example: an image sampled from a different class than the anchor.
plt.imshow(sample_neg)

# for i,j in enumerate(train_dataloader):
#     print(j["anchor"].shape)
#     break

In [None]:
#Visualise Sampling batch

test=DataLoader(transformed_dataset_train,batch_size=5,shuffle=True)
# Iterators are advanced with the built-in next(); a `.next()` method is not
# part of the Python 3 iterator protocol.
test_batch = next(iter(test))

In [None]:
def img_convert(tensor):
    """Convert a normalised C x H x W torch tensor into an H x W x C image array.

    The tensor is copied to host memory as a NumPy array, transposed to
    channel-last, rescaled with ``x * 0.5 + 0.5`` and clipped to [0, 1].
    """
    as_array = tensor.cpu().clone().detach().numpy()
    channel_last = as_array.transpose(1, 2, 0)
    half = np.array((0.5, 0.5, 0.5))
    restored = channel_last * half + half
    return restored.clip(0, 1)
# plt.imshow((sample["image"]))
def visualize_samples(dictionary, n_samples=5):
    """Draw one figure per entry of a batch, showing its first images in a row.

    Args:
        dictionary: mapping from a name (used as the figure title) to a batch
            of C x H x W image tensors.
        n_samples: maximum number of images drawn per figure, and the number
            of columns in each row.
    """
    for name, batch in dictionary.items():
        plt.figure()
        plt.suptitle(name)
        for position, image in enumerate(batch[:n_samples], start=1):
            plt.subplot(1, n_samples, position)
            plt.imshow(img_convert(image))

visualize_samples(test_batch)

In [None]:
# Start from a ResNet-18 loaded with pretrained weights.
resnet = models.resnet18(pretrained=True)

# Freeze every existing parameter. The replacement head created below is a new
# layer, so its parameters keep requires_grad=True.
for backbone_param in resnet.parameters():
    backbone_param.requires_grad_(False)

# Swap the final fully connected layer for one that outputs 128 values.
n_inputs = resnet.fc.in_features
last_layer = nn.Linear(n_inputs, 128)
resnet.fc = last_layer

resnet.to(device)

In [None]:
#Model Creation

class EmbeddingNet(nn.Module):
    """Thin wrapper that exposes a network as an embedding function.

    Args:
        pretrained_net: module that maps an input batch to its embeddings.
    """

    def __init__(self,pretrained_net):
        super().__init__()
        self.resnet = pretrained_net

    def forward(self, x):
        # Delegate straight to the wrapped network.
        return self.resnet(x)

    def get_embedding(self, x):
        """Return the embedding of `x`; same computation as `forward`."""
        return self.forward(x)

class TripletNet(nn.Module):
    '''
    Applies one shared embedding network to the three members of a triplet.

    embedding_net : module that maps a batch to its embeddings; the same
        instance (and therefore the same weights) is used for all three inputs.
    '''
    def __init__(self, embedding_net):
        super().__init__()
        self.embedding_net = embedding_net

    def forward(self, x1, x2, x3):
        # Embed the inputs one after another, in argument order.
        return tuple(self.embedding_net(branch) for branch in (x1, x2, x3))

    def get_embedding(self, x):
        """Embed a single batch with the shared network."""
        return self.embedding_net(x)

# Wrap the modified ResNet as the embedding branch, then share that branch
# across the anchor / positive / negative inputs.
embedding_net = EmbeddingNet(resnet)
model = TripletNet(embedding_net)

In [None]:
#Creating Triple Loss Class

class TripletLoss(nn.Module):
    """
    Triplet loss on squared Euclidean distances.

    Input:
        margin : (float) minimum gap required between the anchor-negative and
            the anchor-positive distance before a triplet stops contributing.

    Takes embeddings of an anchor sample, a positive sample and a negative
    sample, each of shape (batch, features), and returns the mean over the
    batch of ``max(0, d(a, p) - d(a, n) + margin)``.
    """

    def __init__(self, margin):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        distance_positive = ((anchor - positive)**2).sum(dim=1)
        distance_negative = ((anchor - negative)**2).sum(dim=1)
        # Element-wise hinge. The built-in max() would try to turn a whole
        # tensor comparison into a single bool and fail for batches larger
        # than one.
        losses = F.relu(distance_positive - distance_negative + self.margin)

        return losses.mean()

# Margin passed to TripletLoss.
margin = 0.5
loss_fn = TripletLoss(margin)
# Learning rate handed to the optimizer in the next cell.
lr = 1e-3

In [None]:
# Adam over all model parameters; the ResNet backbone parameters were frozen
# earlier (requires_grad = False).
optimizer = optim.Adam(model.parameters(), lr=lr)
# StepLR with step_size=8 and gamma=0.1.
# NOTE(review): train() below is called with 5 epochs, fewer than the step
# size of 8 - confirm the decay is meant to stay inactive for this run.
scheduler = lr_scheduler.StepLR(optimizer, 8, gamma=0.1, last_epoch=-1)

In [None]:
# epochs = 5
# running_loss_history = []
# val_running_loss_history=[]
# losses=[]
# total_loss=0
# for e in range(epochs):
#     running_loss=0.0
#     val_running_loss=0.0
#     for i,batched_data in enumerate(train_dataloader):
#         print(i)
#         input_anchor= batched_data["anchor"].to(device)
#         input_positive = batched_data["positive_image"].to(device)
#         input_negative = batched_data["negative_image"].to(device)
#         optimizer.zero_grad()
#         outputs = model(input_anchor.float(),input_positive.float(),input_negative.float())
#         if type(outputs) not in (tuple, list):
#             outputs = (outputs,)
#         loss_inputs = outputs
#         loss_outputs = loss_fn(*loss_inputs)
#         loss = loss_outputs[0] if type(loss_outputs) in (tuple, list) else loss_outputs
#         losses.append(loss.item())
#         total_loss += loss.item()
#         loss.backward()
#         optimizer.step()
# #         loss = loss_fn(input_anchor.float(),input_positive.float(),input_negative.float())
# #         optimizer.zero_grad()
# #         loss.backward()
# #         optimizer.step()
#         running_loss+=loss.item()
#         print(loss.item())
#     else:
#         pass
# #         with torch.no_grad():
# #             for i,batched_val in enumerate(validation_dataloader):
# #                 val_input_anchor = batched_val["anchor"].to(device)
# #                 val_input_positive = batched_val["positive_image"].to(device)
# #                 val_input_negative = batched_val["negative_image"].to(device)
               
# #                 val_outputs = model(val_input_anchor.float(),val_input_positive.float(),val_input_negative.float())
# #                 val_loss = loss_fn(val_input_anchor.float(),val_input_positive.float(),val_input_negative.float())
# #                 val_running_loss+=val_loss.item()
# # #         print(val_loss.item())
    
# #   epoch_loss
# #     epoch_loss = running_loss/len(train_dataloader.dataset)
  
# #     running_loss_history.append(epoch_loss)
    
# #     val_epoch_loss = val_running_loss/len(validation_dataloader.dataset)
# #     val_running_loss_history.append(val_epoch_loss)
# #     print('epoch :', (e+1))
# #     print('training loss: {:.4f}'.format(epoch_loss))
# #     print('validation loss: {:.4f}'.format(val_epoch_loss))

In [None]:
def train_one_epoch(train_loader, model, loss_fn, optimizer,printer):
    """Run one optimisation pass over `train_loader`.

    Args:
        train_loader: iterable of batches; each batch is a dict with the keys
            "anchor", "positive_image" and "negative_image". It must support
            len() and expose `.dataset` when progress printing is enabled.
        model: network called as ``model(anchor, positive, negative)``.
        loss_fn: callable applied to the unpacked model outputs.
        optimizer: optimizer updating the model parameters.
        printer: print a progress line every `printer` batches; a falsy value
            disables printing.

    Returns:
        float: mean loss per batch (0.0 when the loader yields no batch).
    """
    model.train()
    interval_losses = []
    total_loss = 0.0
    n_batches = 0
    for batch_idx, batched_data in enumerate(train_loader):

        input_anchor = batched_data["anchor"].to(device).float()
        input_positive = batched_data["positive_image"].to(device).float()
        input_negative = batched_data["negative_image"].to(device).float()
        optimizer.zero_grad()
        outputs = model(input_anchor,input_positive,input_negative)

        if type(outputs) not in (tuple, list):
            outputs = (outputs,)
        loss_outputs = loss_fn(*outputs)
        loss = loss_outputs[0] if type(loss_outputs) in (tuple, list) else loss_outputs
        loss_value = loss.item()
        interval_losses.append(loss_value)
        total_loss += loss_value
        n_batches += 1
        loss.backward()
        optimizer.step()

        if printer and batch_idx % printer == 0:
            # len(batch["anchor"]) is the batch size; indexing [0] first would
            # measure a single image (its channel count) instead.
            message = 'Train: [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                batch_idx * len(batched_data["anchor"]), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), np.mean(interval_losses))

            print(message)
            # The printed loss is the mean since the previous progress line.
            interval_losses = []
    if n_batches == 0:
        return 0.0
    return total_loss / n_batches

# train_one_epoch(train_dataloader,model,loss_fn,optimizer,5)

In [None]:
def test_one_epoch(val_loader, model, loss_fn):
    with torch.no_grad():
        model.eval()
        val_loss = 0
        for batch_idx,val_batched_data in enumerate(val_loader):
            print(batch_idx)
            val_anchor=val_batched_data["anchor"].to(device).float()
            val_positive = val_batched_data["positive_image"].to(device).float()
            val_negative =val_batched_data["negative_image"].to(device).float()
            outputs = model(val_anchor,val_positive,val_negative)

            if type(outputs) not in (tuple, list):
                outputs = (outputs,)
            loss_inputs = outputs
            loss_outputs = loss_fn(*loss_inputs)
            loss = loss_outputs[0] if type(loss_outputs) in (tuple, list) else loss_outputs
            val_loss += loss.item()

           

    return val_loss

# test_one_epoch(validation_dataloader,model,loss_fn)

In [None]:
def train(train_loader, val_loader, model, loss_fn, optimizer, scheduler, n_epochs, printer,start_epoch=0):
    """Train and validate the model for epochs ``start_epoch .. n_epochs - 1``.

    Args:
        train_loader, val_loader: batch iterables for the two splits.
        model, loss_fn, optimizer: forwarded to `train_one_epoch` and
            `test_one_epoch`.
        scheduler: learning-rate scheduler; stepped once per epoch.
        n_epochs: total number of epochs.
        printer: progress interval (in batches) forwarded to `train_one_epoch`.
        start_epoch: first epoch to run; the scheduler is advanced this many
            times beforehand so a resumed run continues on the same schedule.

    A summary with the average training and validation loss is printed after
    every epoch.
    """
    # Fast-forward the learning-rate schedule when resuming.
    for _ in range(0, start_epoch):
        scheduler.step()

    for epoch in range(start_epoch, n_epochs):
        # Train stage
        train_loss = train_one_epoch(train_loader, model, loss_fn, optimizer, printer)

        message = 'Epoch: {}/{}. Train set: Average loss: {:.4f}'.format(epoch + 1, n_epochs, train_loss)

        # test_one_epoch returns the sum of batch losses; average it here.
        val_loss = test_one_epoch(val_loader, model, loss_fn)
        val_loss /= max(len(val_loader), 1)

        message += '\nEpoch: {}/{}. Validation set: Average loss: {:.4f}'.format(epoch + 1, n_epochs,
                                                                                 val_loss)

        print(message)

        # Step the scheduler after this epoch's optimizer updates, so the
        # initial learning rate is used for a full epoch.
        scheduler.step()

# Train for 5 epochs, printing a progress line every 2 batches.
train(train_dataloader,validation_dataloader,model,loss_fn,optimizer,scheduler,5,2)

In [None]:
# Persist only the parameters (state_dict), not the module object itself.
# Note that the file name contains a space.
torch.save(model.state_dict(), "saved_model v1.pth")

In [None]:
#Exploring Validation

# Take one batch from the validation loader. Iterators are advanced with the
# built-in next(); `.next()` is not part of the Python 3 iterator protocol.
sample_batch = next(iter(validation_dataloader))

In [None]:
sample_anchor = sample_batch["anchor"].float()
sample_positive = sample_batch["positive_image"].float()
sample_negative = sample_batch["negative_image"].float()

#predict for validation batch
# The model lives on `device`, so the inputs are moved there before the forward
# pass. The embeddings are brought back to the CPU so they can be converted to
# NumPy in the following cells. No gradients are needed for inspection.
model.eval()
with torch.no_grad():
    #get embedding for anchor
    sample_anchor_emb = model.get_embedding(sample_anchor.to(device)).cpu()

    #get embedding for positive
    sample_positive_emb = model.get_embedding(sample_positive.to(device)).cpu()

    #get embedding for negative
    sample_negative_emb = model.get_embedding(sample_negative.to(device)).cpu()

from numpy import dot
from numpy.linalg import norm
def cosine_similarity(a,b):
    """Return the cosine of the angle between vectors `a` and `b`.

    Computed as ``dot(a, b) / (norm(a) * norm(b))``.
    """
    scale = np.linalg.norm(a) * np.linalg.norm(b)
    return np.dot(a, b) / scale

i=31
# Cosine similarity (not a distance: larger means more alike) between the
# anchor and negative embeddings at index i. Index 31 is the last item of a
# 32-sample batch, the batch size used for validation_dataloader.
cosine_similarity(sample_anchor_emb[i].detach().numpy(),sample_negative_emb[i].detach().numpy())

In [None]:
# Cosine similarity (not a distance: larger means more alike) between the
# anchor and positive embeddings at the same index i.
cosine_similarity(sample_anchor_emb[i].detach().numpy(),sample_positive_emb[i].detach().numpy())

In [None]:
# Compare every anchor of the sampled batch with its positive and its negative.
# The values are cosine similarities (larger = more alike) even though the
# column labels say "dist". Iterating over the real batch length includes the
# last pair and also works for batches smaller than 32.
n_pairs = len(sample_anchor_emb)
dist_pos = [
    cosine_similarity(sample_anchor_emb[k].detach().cpu().numpy(),
                      sample_positive_emb[k].detach().cpu().numpy())
    for k in range(n_pairs)
]
dist_neg = [
    cosine_similarity(sample_anchor_emb[k].detach().cpu().numpy(),
                      sample_negative_emb[k].detach().cpu().numpy())
    for k in range(n_pairs)
]
df = pd.DataFrame()
df["dist between positive and anchor"]= dist_pos
df["dist between negative and anchor"]= dist_neg

In [None]:
#single batch validation
# Display the per-pair similarity table for the single validation batch.
df

In [None]:
def show_grid(image_array,n_row=1,n_col=4,vector_a=None,vector_b=None):
    """Show the first images of a batch in an `n_row` x `n_col` grid.

    Args:
        image_array: batch of normalised C x H x W image tensors.
        n_row, n_col: grid shape; at most ``n_row * n_col`` images are drawn.
        vector_a, vector_b: optional embedding batches. When given, each image
            is titled with the cosine similarity of the two embeddings at the
            same index.

    Raises:
        ValueError: if only one of `vector_a` / `vector_b` is provided.
    """
    if (vector_a is None) != (vector_b is None):
        raise ValueError("vector_a and vector_b must be given together")

    # squeeze=False always returns a 2-D array of axes, even for a 1x1 grid.
    _, axes = plt.subplots(n_row, n_col, figsize=(12,12), squeeze=False)

    # zip stops at the shorter of (grid cells, images), so a batch smaller than
    # the grid leaves the remaining axes empty instead of raising.
    for position, (axis, image) in enumerate(zip(axes.flatten(), image_array)):
        axis.imshow(img_convert(image))
        if vector_a is not None:
            similarity = cosine_similarity(vector_a[position].detach().cpu().numpy(),
                                           vector_b[position].detach().cpu().numpy())
            axis.title.set_text(str(similarity))
    
#     f.title(str(aseth_value))
       
# First four anchor images of the sampled validation batch (default 1x4 grid).
print("validation sample Anchor images are")
show_grid(sample_anchor)

In [None]:
# Matching positive images; each title is the cosine similarity between the
# anchor embedding and the positive embedding.
print("sample validation positive images ")
show_grid(sample_positive,vector_a=sample_anchor_emb,vector_b=sample_positive_emb)

In [None]:
# Matching negative images; each title is the cosine similarity between the
# anchor embedding and the negative embedding.
print("sample validation negative images are")
show_grid(sample_negative,vector_a=sample_anchor_emb,vector_b=sample_negative_emb)