In [1]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torchvision.models as models
from torchvision import transforms
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter()
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd
from PIL import Image

import matplotlib.pyplot as plt 
import matplotlib.image as mpimg 


import os 
import shutil
from datetime import datetime  
from tqdm import tqdm
import random
import itertools

# Preprocessing

Function for creating augmented train, validation and test folders

In [9]:
def create_dataset_folders_augmented(lfw_dataset_cropped_augmented):
    # Configuration
    var_dataset_cropped_folder = lfw_dataset_cropped_augmented
    var_dataset_split_folder = 'cropped_split'
    var_temp_folder = './temp'
    var_val_split = 0.2
    var_test_split = 0.1

    # Create folders if they dont exist   
 
    if not os.path.exists(var_temp_folder):
        os.makedirs(var_temp_folder)
    if not os.path.exists(var_dataset_cropped_folder):
        os.makedirs(var_dataset_cropped_folder) 
    if not os.path.exists(var_dataset_split_folder):
        os.makedirs(var_dataset_split_folder)  
    if not os.path.exists(var_dataset_split_folder + '/train'):
        os.makedirs(var_dataset_split_folder+'/train')   
    if not os.path.exists(var_dataset_split_folder + '/validation'):
        os.makedirs(var_dataset_split_folder+'/validation') 
    if not os.path.exists(var_dataset_split_folder + '/test'):
        os.makedirs(var_dataset_split_folder+'/test')
    
    # Split persons in Train, Val and Test set  
    names = os.listdir(var_dataset_cropped_folder)   
    n_samples = len(names) 

    shuffled_indices = np.random.permutation(n_samples)
    testset_inds = shuffled_indices[:int(n_samples * var_test_split)]
    validationset_inds = shuffled_indices[int(n_samples * var_test_split) : int(n_samples * var_test_split) + int(n_samples * var_val_split)]
    trainingset_inds = shuffled_indices[int(n_samples * var_test_split) + int(n_samples * var_val_split) :] 


    for i in testset_inds: 
        shutil.copytree(f'{var_dataset_cropped_folder}/{names[i]}', f'{var_dataset_split_folder}/test/{names[i]}') 
    for i in trainingset_inds: 
        shutil.copytree(f'{var_dataset_cropped_folder}/{names[i]}', f'{var_dataset_split_folder}/train/{names[i]}') 
    for i in validationset_inds: 
        shutil.copytree(f'{var_dataset_cropped_folder}/{names[i]}', f'{var_dataset_split_folder}/validation/{names[i]}') 

In [10]:
"""
Create the splitted dataset folders
parameter: Dataset folder with cropped and augmented LFW images
"""

create_dataset_folders_augmented('../augmentation/lfw_cropped')

Function for creating contrastive pairs and csv files

In [11]:
def create_pairs(base_augmented, csv_target):
    base_augmented = base_augmented
    folders_augmented = os.listdir(base_augmented)
    with open(csv_target, "w", encoding = "utf-8") as handle:
        dataset = []
        for folder in folders_augmented:
            joined = os.path.join(base_augmented, folder)
            positive = os.listdir(joined)
            
            if len(os.listdir(joined)) == 1:
                other = os.path.join(base_augmented, random.choice(folders_augmented))
                negative = os.path.join(other, random.choice(os.listdir(other)))
                first = os.path.join(base_augmented + "/" + folder, random.choice(positive))
                handle.write(",".join([first, negative, "0"]))
                handle.write("\n")
                continue


            c = list(itertools.combinations(positive, 2))
            unq = list(set(c))[0:40]
            
            for val1, val2 in unq:
                #choose negative
                other = os.path.join(base_augmented, random.choice(folders_augmented))
                while other == joined:
                    other = os.path.join(base_augmented, random.choice(folders_augmented))
                negative = os.path.join(other, random.choice(os.listdir(other)))      
                first = os.path.join(base_augmented + "/" + folder, val1)
                second = os.path.join(base_augmented+ "/" + folder, val2)
                handle.write(",".join([first, second, "1"]))
                handle.write("\n")
                handle.write(",".join([first, negative, "0"]))
                handle.write("\n")

In [12]:
"""
Create the csv files with contrastive loss pairs
parameter: train, validation or test folder 
"""

create_pairs("cropped_split/train", "lfw_train_augmented.csv")
create_pairs("cropped_split/validation", "lfw_val_augmented.csv")
create_pairs("cropped_split/test", "lfw_test_augmented.csv")

Dataset class

In [16]:
# Create Datatset class
class LFWDataset(Dataset):
    """ Labeled Faces in the Wild dataset."""
    
    # define the constructor of this dataset object
    def __init__(self, csv):
        self.mean = [0.6158, 0.4637, 0.3757]
        self.std = [0.2124, 0.1863, 0.1812]
        
        # read csv file  
        with open(csv, 'r') as f:
            lines = f.readlines()
            self.img1_list = [
                i.split(",")[0] for i in lines
            ]
            self.img2_list = [
                i.split(",")[1] for i in lines
            ]       
            self.label_list = [int(i.split(",")[2].replace("\n","")) for i in lines]
            tmp1 = []
            tmp2 = []
            labels = []
            deleted = 0
            for i in range(len(self.img1_list)):
                if self.img1_list[i] != self.img2_list[i]:
                    tmp1.append(self.img1_list[i])
                    tmp2.append(self.img2_list[i])
                    labels.append(self.label_list[i])
                else:
                    deleted+=1
            print(deleted)
                
    def __getitem__(self, index):
        # gets two images and corresponding label, e.g. 0 for different and 1 for same person
        img1_path = self.img1_list[index]
        img2_path = self.img2_list[index]
        label = self.label_list[index]
        label=int(label)
        
        img1 = Image.open(img1_path)
        img2 = Image.open(img2_path)
       
        
        transform = transforms.Compose(
            [transforms.Resize([int(224), int(224)]),
             transforms.ToTensor(),
             transforms.Normalize(self.mean, self.std)]
        )
        img1 = transform(img1)
        img2 = transform(img2)
        
        return img1.float(), img1_path, img2.float(), img2_path, label
    
    
    def __len__(self):
        return len(self.label_list)
        

Function for creating train, validation and test datasets

In [21]:
def create_datasets(train_csv, val_csv, test_csv):
    train_csv = train_csv
    validation_csv = val_csv
    test_csv = test_csv
    
    lfw_dataset_train = LFWDataset(train_csv)
    lfw_dataset_val = LFWDataset(validation_csv)
    lfw_dataset_test = LFWDataset(test_csv)
    
    return lfw_dataset_train, lfw_dataset_val, lfw_dataset_test
    

In [None]:
"""
Create train, validation and test datasets
parameter: train, validation and test csv
"""

lfw_dataset_train, lfw_dataset_val, lfw_dataset_test = create_datasets("lfw_train_augmented.csv","lfw_val_augmented.csv","lfw_test_augmented.csv")

# Face Embedding Model

SiameseClassification class: Model parameters can be set in function "set_model_parameters"

In [24]:
"""
Siamese Model Class

    :param resnet_model: torch model 
    :param dimension: embedding vector dimensio
    :param output: output size of torch model
"""
class SiameseClassification(nn.Module):
    def __init__(self, resnet_model, dimension, output):
        super(SiameseClassification, self).__init__()
        self.base = resnet_model
        self.base.fc = nn.Linear(output, dimension)
        self.classifier = nn.Linear(dimension, 1)
        self.sigmoid = nn.Sigmoid()
        print(self.base)
    def forward(self, x, y):
        out1 = self.base(x)
        out2 = self.base(y)
        distance = torch.abs(out1-out2)
        out = self.sigmoid(self.classifier(distance))
        return out, out1, out2
  

# DataLoader

Function for creating train, validation and test dataloader

In [25]:
def dataloader(batch_size, lfw_dataset_train, lfw_dataset_val, lfw_dataset_test):
    batch_size = batch_size
    train_loader = torch.utils.data.DataLoader(dataset=lfw_dataset_train,
                                           batch_size=batch_size,
                                           num_workers=4,
                                           shuffle=True, sampler=None,
                                           collate_fn=None)

    val_loader = torch.utils.data.DataLoader(dataset=lfw_dataset_val,
                                           batch_size=batch_size,
                                           num_workers=4,
                                           shuffle=True, sampler=None,
                                           collate_fn=None)
    test_loader = torch.utils.data.DataLoader(dataset=lfw_dataset_test,
                                                   batch_size=1,
                                                   num_workers=4,
                                                   shuffle=False, sampler=None,
                                                   collate_fn=None)
    
    return train_loader, val_loader, test_loader

In [26]:
"""
Create the dataloader

    :param batch_size: default 64
    :param lfw_dataset_train: train dataset 
    :param lfw_dataset_val: validation dataset
    :param lfw_dataset_test: test dataset
"""

train_loader, val_loader, test_loader = dataloader(64,lfw_dataset_train, lfw_dataset_val, lfw_dataset_test)

# Loss Function

In [27]:
# Loss function
class ContrastiveLoss(nn.Module):
    """
    Contrastive loss
    Takes embeddings of two samples and a target label == 1 if samples are from the same class and label == 0 otherwise
    """

    def __init__(self, margin):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
        self.eps = 1e-9

    def forward(self, output1, output2, target, size_average=True):
        distances = (output2 - output1).pow(2).sum(1)  # squared distances
        losses = 0.5 * (target.float() * distances +
                        (1 + -1 * target).float() * F.relu(self.margin - (distances + self.eps).sqrt()).pow(2))
        return losses.mean() if size_average else losses.sum()



# Training

In [28]:
class TensorBoardLogger: 
    
    def __init__(self, runname):   
        path = 'logs/{}/{}'.format(datetime.today().strftime('%Y-%m-%d'), runname)
        if not os.path.exists(path):
            os.makedirs(path) 
        self.writer = SummaryWriter(path) 
    
    def log_loss(self, mode, loss, epoch): 
        self.writer.add_scalar(f"loss/{mode}", loss, epoch)  
        self.writer.close()
    
    def log_acc(self, mode, acc, epoch): 
        self.writer.add_scalar(f"acc/{mode}", acc, epoch) 
        self.writer.close()
        
    def log_lossdecay(self, mode, loss_decay, epoch): 
        self.writer.add_scalar(f"loss_decay/{mode}", loss_decay, epoch) 
        self.writer.close()

Function for setting model parameter

In [45]:
def set_model_parameter(learning_rate, model, dimension, decay_rate):
    lr = learning_rate

    resnet_model = model
    dimension = dimension
    if resnet_model == models.resnet18() or resnet_model == models.resnet34():
        ouput = 512
    else:
        output = 2048
    model = SiameseClassification(resnet_model, dimension, output)
    model = model.cuda()
    
    
    optimizer = torch.optim.Adam(model.parameters(), lr)
    my_lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=decay_rate)
    
    return optimizer, my_lr_scheduler, model

In [46]:
"""
Set training parameters

    :param learning rate: default: 3e-4 
    :param torchvision model: default models.resnet101()
    :param embedding vector dimension: default: 256
    :param learning rate decay: default 0.95
"""

optimizer, my_lr_scheduler, model = set_model_parameter(3e-4, models.resnet101(), 256, 0.95)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

Function for training a model

In [47]:
def trainer(epochs, run_tag, model, path, base_path):
    
    loss_func = nn.BCELoss()
    aux_loss = ContrastiveLoss(1)
    epochs = epochs
    per_epoch_history = []
    best = 99.0
    best_acc = 0.0
    tag = run_tag
    logger = TensorBoardLogger(run_tag)

    use_second_loss = False

    for epoch in range(epochs):
        train_bar = tqdm(train_loader, total = len(train_loader), position=0, leave=True)
        running_loss = 0.0
        loss_history = []
        model.train()
        running_acc = 0.0
        for i, data in enumerate(train_bar):
            image1s, img1_path, image2s, img2_path, labels = data
            image1s = image1s.cuda()
            image2s = image2s.cuda()
            labels = torch.tensor(labels, dtype =torch.float).cuda()
            optimizer.zero_grad()
            classout, f1, f2 = model.forward(image1s, image2s)
            first = aux_loss(f1, f2, labels)
            loss =  first
            if use_second_loss:
                second = loss_func(classout[:,0], labels)
                loss = 0.2*second + loss

            running_acc += ((classout[:,0] > 0.5) == labels).float().mean().item()
            loss.backward()
            optimizer.step()
            running_loss += loss.data.item()
            train_bar.set_description("Train: Epoch: %d, Loss: %.3f, Acc: %.3f, BestValAcc: %.2f" % (epoch,running_loss / (i+1), (running_acc / (i+1)), best_acc))
            train_bar.refresh()
        my_lr_scheduler.step()
        per_epoch_history.append(loss_history)
      
        logger.log_loss('train', running_loss / (i+1), epoch)
        logger.log_acc('train', running_acc / (i+1), epoch)
        
        
        
    #validate
        val_bar = tqdm(val_loader, total = len(val_loader), position=0, leave=True)     
        model.eval()
        running_acc = 0.0
        running_loss = 0.0
        per_person = []
        with torch.no_grad():
            for i, data in enumerate(val_bar):
                image1s, img1_path,  image2s, img2_path, labels = data
                image1s = image1s.cuda()
                image2s = image2s.cuda()
                labels = torch.tensor(labels, dtype =torch.float).cuda()
                classout, f1, f2 = model.forward(image1s, image2s)
                first = aux_loss(f1, f2, labels)
                loss =  first
                if use_second_loss:
                    second = loss_func(classout[:,0], labels)
                    loss = 0.2*second + loss
                running_loss += loss.data.item()
                running_acc += ((classout[:,0] > 0.5) == labels).float().mean().item()
                val_bar.set_description("Validation: Epoch: %d, Loss: %.3f, Acc: %.3f" % (epoch,running_loss / (i+1), (running_acc / (i+1))))
        running_acc /= len(val_loader)
        logger.log_loss('validation', running_loss / (i+1), epoch)  
        logger.log_acc('validation', best_acc, epoch)
                     
                        
        #save model
        best_acc = running_acc
        PATH = path
        BASE_PATH = base_path
        torch.save(model.state_dict(), PATH)
        torch.save(model.base.state_dict(), BASE_PATH)
        
        lr= np.asarray(my_lr_scheduler.get_lr())
        logger.log_lossdecay('train', lr , epoch)
    
    print("Training finished")
         

In [None]:
"""
Train a model

    :param number of epochs: default: 80
    :param model: spezified torchvision resnetmodel
    :param model path: default "resnet101.pth"
    :param model base path: default "base_resnet101.pth"
"""


trainer(80, 'resnet34', model, "resnet101.pth", "base_resnet101.pth")

# Load a model for evaluation

Function for testing the classification model

In [95]:
def set_model_for_evaluation(resnet_model, dimension, output, model_file_name):
    classification_model = SiameseClassification(resnet_model, dimension, output)
    if torch.cuda.is_available():
        classification_model.cuda()
    classification_model.load_state_dict(torch.load(model_file_name))
    return classification_model

In [96]:
"""
Load a model for evaulation

    :param model: model which was used for training
    :param embedding vector dimension: default 256
    :param output size of torch model: default 2048
    :param model path: default "resnet101.pth"
"""
classification_model = set_model_for_evaluation(models.resnet101(), 256, 2048, "resnet101.pth")

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

# Evaluation on Test_Loader

In [97]:
""" 
Performance measurements
"""  

class PerformanceMeasurement: 
    
    def __init__(self): 
        self.predictions = {}  
        self.thresholds = np.arange(0, 4, 0.01)   
    
    """ 
    Prediction inspired by faceguard pipeline 
    We calculate the distance between the anchor embedding and a 
    comparison embedding

    If the distance is larger than a threshold, we return that the 
    images are not the same (0) otherwise they are the same (1) 
    
    :param anchor_embedding: Embedding vector of the anchor 
    :param compare_embedding: Embedding vector to compare with the anchor 
    :param threshold: Highest distance until anchor and compare are the same  
    """
    def prediction(self, anchor_embedding, compare_embedding, threshold):  
        dist = np.linalg.norm(np.subtract(anchor_embedding.cpu().detach().numpy(), compare_embedding.cpu().detach().numpy()))   
        if dist > threshold: 
            return 0 
        else: 
            return 1 
    
    """ 
    Add new training run 
    :param anchor_embedding: Embedding vector of the anchor 
    :param anchor_embedding: Embedding vector of the same person as anchor  
    :param negative_embedding: Negative example 
    """
    def add_new_run(self, anchor_embedding, compare_embedding, ground_truth): 
        for threshold in self.thresholds: 
            #Create entry if it does not exist
            if threshold not in self.predictions: 
                self.predictions[threshold] = {}   
                self.predictions[threshold]['tp'] = 0 
                self.predictions[threshold]['fp'] = 0  
                self.predictions[threshold]['tn'] = 0  
                self.predictions[threshold]['fn'] = 0 

            # The positive prediction (so comparison between anchor and positive) should return 1
            pred = self.prediction(anchor_embedding, compare_embedding, threshold)  

            if pred == 0 and ground_truth == 0: 
                self.predictions[threshold]['tn'] += 1  
            elif pred == 1 and ground_truth == 0: 
                self.predictions[threshold]['fp'] += 1 

            elif pred == 1 and ground_truth == 1: 
                self.predictions[threshold]['tp'] += 1 
            elif pred == 0 and ground_truth == 1: 
                self.predictions[threshold]['fn'] += 1  
    
    """ 
    Simple accuracy helper function 
    """
    def calc_acc(self, tp, tn, fp, fn):  
        return (tp+tn)/(tp+tn+fp+fn) 
    
    
    """ 
    Function that gets called after all runs 
    Here we calculate the accuracy for all thresholds and return the highest accuracy 
    
    This also helps us set the threshold for the pipeline function 
    """
    def calc_total_acc(self):  
        max_acc = 0  
        max_t = 0  
        
        for threshold in self.predictions: 
            acc = self.calc_acc(self.predictions[threshold]['tp'], self.predictions[threshold]['tn'], self.predictions[threshold]['fp'], self.predictions[threshold]['fn']) 
            if acc > max_acc:  
                print(f"Trehshold {threshold} > Acc: {acc}") 
                max_acc = acc 
                max_t = threshold
        
        return max_acc

Function for evaulating the loaded model

In [98]:
def evaluate(test_loader): 
        test_casia_loader = test_loader
        eval_total_loss = 0  
        perf_measure = PerformanceMeasurement() 
        
        with torch.no_grad():
            for i, data in enumerate(test_loader):
                image1, img1_path, image2, img2_path, ground_truth = data
                image1 = image1.cuda()
                image2 = image2.cuda()
                ground_truth = torch.tensor(ground_truth, dtype =torch.float).cuda()

                classout, anchor_embedding, compare_embedding = classification_model.forward(image1, image2)    
                
                perf_measure.add_new_run(anchor_embedding, compare_embedding, ground_truth)   
                
        return perf_measure.calc_total_acc() 

In [None]:
"""
Evaulate the loaded model

    :param test_loader: test loader
"""
evaluate(test_loader)