# Face Recognition

In this notebook we create and train a model for the problem of face recognition. The first step is to build and visualize the dataset, which is the CelebA dataset. Next, we pretrain a model to classify those faces. Finally, we use that pretrained model as the starting point for a siamese neural network that solves the face recognition problem.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import PIL.Image as Image
from sklearn.

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision import models, transforms

from glob import glob

from tqdm.notebook import tqdm

import os
import sys
import copy

## First, let's create our dataset and visualize the images

In [None]:
class CelebADataset(Dataset):
    """CelebA face images paired with zero-based identity labels.

    Image files are listed from ``<data_path>/img_align_celeba/`` and the
    identities are read from ``<data_path>/identity_CelebA.csv``. The sorted
    file list is split into a train part (the first ``train_size`` files)
    and a test part (every remaining file).
    """

    DEFAULT_DATA_PATH = "/media/carlo/16CA983262CDA30A/Datasets/CelebA/"

    def __init__(self, train=True, trans=None, data_path=DEFAULT_DATA_PATH, train_size=180000):
        """
        :param train: bool
            If true, use the first ``train_size`` images, otherwise the remaining ones
        :param trans: callable
            Transformation applied to every loaded image. Defaults to ``transforms.ToTensor()``
        :param data_path: str
            Folder that contains ``img_align_celeba/`` and ``identity_CelebA.csv``
        :param train_size: int
            Number of images (in sorted file order) assigned to the train split
        """
        images_path = os.path.join(data_path, "img_align_celeba", "*")
        identities_table = pd.read_csv(os.path.join(data_path, "identity_CelebA.csv")).values

        # create a dictionary to look up the identity of an image by its file name
        self.identities = {image_name: label for image_name, label in identities_table}

        # glob returns the files in arbitrary order, so sort them to make the
        # train/test split identical on every run and on every machine
        images = sorted(glob(images_path))

        if train:
            self.images = images[:train_size]
        else:
            self.images = images[train_size:]

        if trans is not None:
            self.transforms = trans
        else:
            self.transforms = transforms.ToTensor()

    def __getitem__(self, index):
        """Return the transformed image at ``index`` and its zero-based identity as an int64 tensor."""
        image_path = self.images[index]
        # force three channels so that the 3-channel normalization also works
        # for files that are not stored as RGB
        image = Image.open(image_path).convert("RGB")
        image = self.transforms(image)

        image_name = os.path.basename(image_path)

        # the labels are base 1, that is why the -1
        return image, torch.tensor(self.identities[image_name] - 1, dtype=torch.int64)

    def __len__(self):
        """Return the number of images in this split."""
        return len(self.images)

Now, let's define our train and test transforms. For training, we are going to use data augmentation to reduce overfitting.

In [None]:
# Size (in pixels) every image is resized to before it reaches the model
IMG_HEIGHT = 300
IMG_WIDTH = 300

# Training pipeline: resize, apply exactly one randomly chosen augmentation,
# convert to a tensor and normalize each channel
train_transforms = transforms.Compose([
    transforms.Resize(size=(IMG_HEIGHT, IMG_WIDTH)),
    transforms.RandomChoice([
        transforms.ColorJitter(),
        transforms.RandomResizedCrop(size=(IMG_HEIGHT, IMG_WIDTH), scale=(0.6, 1.0)),
        transforms.RandomGrayscale(),
        transforms.RandomRotation(degrees=20),
        transforms.RandomHorizontalFlip(),
    ]),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Test pipeline: same resize and normalization, but no random augmentation
test_transforms = transforms.Compose([
    transforms.Resize(size=(IMG_HEIGHT, IMG_WIDTH)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

train_dataset = CelebADataset(train=True, trans=train_transforms)
test_dataset = CelebADataset(train=False, trans=test_transforms)

Now, let's plot some images to see the results

In [None]:
# Show a 4x4 grid with the training samples at indices 1..16
columns = 4
rows = 4
fig = plt.figure(figsize=(8, 8))
for i in range(1, rows * columns + 1):
    # channels-first tensor -> channels-last array, as expected by imshow
    img = train_dataset[i][0].numpy().transpose(1, 2, 0)
    # the normalized values fall outside [0, 1]; clip them so they can be displayed
    img = img.clip(0, 1)
    fig.add_subplot(rows, columns, i)
    plt.imshow(img)
plt.show()

The colors look off because the images are normalized and then clipped to the [0, 1] range for display.

Let's create a virtual data loader that splits training into shorter "virtual" epochs, since a full pass over such a large dataset takes too long.

In [None]:
class VirtualDataLoader:
    """Iterate over a data loader in "virtual" epochs of a fixed number of steps.

    Each pass over this object yields ``steps_per_epoch`` batches. The position
    in the wrapped loader is kept between passes, and the wrapped loader is
    restarted whenever it runs out of batches.
    """

    def __init__(self, data_loader, steps_per_epoch: int = 1000):
        self.steps_per_epoch = steps_per_epoch
        self.current_step = 0
        self.data_loader = data_loader
        self.iterator = iter(self.data_loader)

    def __iter__(self):
        return self

    def __next__(self):
        # The virtual epoch is over: reset the counter for the next pass
        if self.current_step >= self.steps_per_epoch:
            self.current_step = 0
            raise StopIteration

        self.current_step += 1
        try:
            batch = next(self.iterator)
        except StopIteration:
            # The wrapped loader is exhausted: start it over and keep going
            self.iterator = iter(self.data_loader)
            batch = next(self.iterator)
        return batch

    def __len__(self):
        return self.steps_per_epoch

And now, create the data loaders

In [None]:
# Sanity check: total number of images across the train and test splits
len(train_dataset) + len(test_dataset)

In [None]:
# Number of samples per batch and number of batches in one virtual epoch
BATCH_SIZE = 25
TRAIN_STEPS_PER_EPOCH = 25
TEST_STEPS_PER_EPOCH = 25

# The training loader shuffles the samples; the test loader keeps the dataset order
raw_train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=8)
raw_test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, num_workers=8)

# Wrap both loaders so that one "epoch" is a fixed number of batches
train_loader = VirtualDataLoader(data_loader=raw_train_loader, steps_per_epoch=TRAIN_STEPS_PER_EPOCH)
test_loader = VirtualDataLoader(data_loader=raw_test_loader, steps_per_epoch=TEST_STEPS_PER_EPOCH)

## Pretraining

In [None]:
# Get the number of labels
# The dataset treats identities as base 1 (it subtracts 1 when building the
# targets), so the largest identity value is used as the number of classes.
# NOTE(review): assumes the identity values are contiguous - confirm
labels = max(train_dataset.identities.values())
labels

Now, let's create the model to pretrain

In [None]:
class EfficientNet(nn.Module):
    """EfficientNet-B3 with its classification head replaced to predict ``labels`` classes."""

    def __init__(self, labels):
        """
        :param labels: int
            Number of output classes of the new classification head
        """
        super().__init__()
        self.base_model = models.efficientnet_b3(pretrained=True)

        # Take the input size of the new head from the layer it replaces
        # instead of hard-coding it
        in_features = self.base_model.classifier[-1].in_features
        self.base_model.classifier = nn.Sequential(
            nn.Dropout(p=0.3, inplace=True),
            nn.Linear(in_features=in_features, out_features=labels, bias=True)
        )

    def forward(self, x):
        """Return the class scores computed by the wrapped model for the batch ``x``."""
        return self.base_model(x)

In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Build the classifier and move its parameters to the selected device
model = EfficientNet(labels)
model = model.to(device)

Let's define our train function

In [None]:
def train(model: torch.nn.Module, train_set, test_set, epochs_, step, learning_rate, model_name: str, path_to_save: str,
          start_epoch_=0, adam: bool = True, patience: int = 5):
    """
    Train a model with Adam or RMSProp, evaluating it on the test set after every epoch.

    The loss is whatever the ``step`` function returns. After every epoch the model weights and the
    learning rate scheduler state are saved in ``path_to_save``, a line is appended to
    ``train_{model_name}.log``, and at the end the weights with the lowest test loss are saved as
    ``best-{model_name}.pt``.
    :param model: torch.nn.Module
        The Pytorch model to train
    :param train_set: DataLoader
        The data to train the model
    :param test_set: DataLoader
        The data to test the efficiency of the model
    :param epochs_: int
        The number of epochs to train. If -1, it will train until the early stopping stops the training
    :param step: function
        A function that receives the model and a data sample from the dataset and returns the loss
    :param learning_rate: float
        Starting learning rate used during the train phase. It is multiplied by 0.1 every 5 epochs
    :param model_name: str
        Specify the model name, just to save the files correctly
    :param path_to_save: str
        The path to save the model. It is created if it does not exist
    :param start_epoch_: int
        To continue the training. Indicates in what epoch the current train will start.
        Only the scheduler state is restored here; the model is used as it is received
    :param adam: bool
        If true, uses Adam as optimizer, if false uses RMSProp
    :param patience: int
        The number of consecutive epochs without improvement that stops the training. A value that is
        never reached (for example a negative one) disables the early stopping
    :return: (list, list)
        The train and test losses, one mean value per epoch
    """
    losses_ = []
    test_losses_ = []
    best_score = float("inf")
    best_model = None
    not_improved = 0
    until_converge = False

    # Make sure the checkpoints and the log have somewhere to be written
    os.makedirs(path_to_save, exist_ok=True)

    if adam:
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    else:
        optimizer = torch.optim.RMSprop(model.parameters(), lr=learning_rate)

    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

    if start_epoch_ != 0:
        # NOTE(review): the scheduler state is written below as lr_scheduler_{epoch + 1}.pt but is
        # read here from lr_scheduler_{start_epoch_ - 1}.pt - confirm this is the intended file
        lr_scheduler.load_state_dict(torch.load(f"{path_to_save}/lr_scheduler_{start_epoch_ - 1}.pt"))

    if epochs_ == -1:
        epochs_ = start_epoch_ + 1
        until_converge = True

    # to scale and train in fp16
    scaler = torch.cuda.amp.GradScaler()

    model.train()

    epoch = start_epoch_
    while epoch < epochs_:
        sys.stdout.flush()
        epoch_loss = 0

        for sample in tqdm(train_set, total=len(train_set), desc='Train'):
            optimizer.zero_grad()

            with torch.cuda.amp.autocast():
                current_loss = step(model, *sample)

            scaler.scale(current_loss).backward()

            epoch_loss += current_loss.item()

            scaler.step(optimizer)
            scaler.update()

        # Advance the learning rate schedule once per epoch; without this call the
        # scheduler is created and saved but the learning rate never decays
        lr_scheduler.step()

        # Test step: evaluation mode disables dropout and stops the batch norm
        # statistics from being updated with test data
        sys.stdout.flush()
        test_loss = 0
        model.eval()
        with torch.no_grad():
            for sample in tqdm(test_set, total=len(test_set), desc="Test"):
                test_loss += step(model, *sample).item()
        model.train()

        epoch_loss /= len(train_set)
        test_loss /= len(test_set)
        losses_.append(epoch_loss)
        test_losses_.append(test_loss)

        torch.save(model.state_dict(), f'{path_to_save}/{model_name}_{epoch + 1}.pt')
        torch.save(lr_scheduler.state_dict(), f'{path_to_save}/lr_scheduler_{epoch + 1}.pt')

        # Get the best model
        if test_loss < best_score:
            best_score = test_loss
            best_model = copy.deepcopy(model.state_dict())
            not_improved = 0
        else:
            not_improved += 1

        if until_converge:
            # Keep the loop alive: only the early stopping can end the training
            epochs_ += 1
            epoch_label = f'{epoch + 1}'
        else:
            epoch_label = f'{epoch + 1}/{epochs_}'

        # Rounded values for the console, full precision for the log file
        message = f'Epoch: {epoch_label}, Loss: {epoch_loss:.4f}, ' + \
                  f'Test loss: {test_loss:.4f}, Epochs w/o improvement: {not_improved}\n'
        full_message = f'Epoch: {epoch_label}, Loss: {epoch_loss}, ' + \
                       f'Test loss: {test_loss}, Epochs w/o improvement: {not_improved}\n'
        sys.stdout.flush()
        sys.stdout.write(message)

        with open(f'{path_to_save}/train_{model_name}.log', 'a') as f:
            f.write(full_message)

        # Early stopping
        if not_improved == patience:
            break

        epoch += 1

    # Nothing to save when no epoch was run (e.g. start_epoch_ >= epochs_)
    if best_model is not None:
        torch.save(best_model, f'{path_to_save}/best-{model_name}.pt')
    return losses_, test_losses_

In [None]:
def get_accuracy(labels, predictions):
    """
    Compute the fraction of samples whose predicted class matches its label.
    :param labels: torch.Tensor
        The ground-truth class index of every sample
    :param predictions: torch.Tensor
        Either the per-class scores with one row per sample (the class with the highest score is
        taken as the prediction) or the predicted class indices with the same shape as ``labels``
    :return: float
        The accuracy in the range [0, 1], or 0.0 when there are no samples
    """
    # Avoid the NaN that the mean of an empty tensor would produce
    if labels.numel() == 0:
        return 0.0

    if predictions.dim() > labels.dim():
        predicted = predictions.argmax(dim=1)
    else:
        predicted = predictions

    predicted = predicted.to(labels.device)
    return (predicted == labels).float().mean().item()

And implement a step of the face classification training process

In [None]:
def classification_train_step(model, images, labels):
    """
    Run the model on one batch and return its classification loss.
    :param model: torch.nn.Module
        The model that produces the class scores
    :param images: torch.Tensor
        The batch of images, moved to the global ``device`` before the forward pass
    :param labels: torch.Tensor
        The target class of every image in the batch
    :return: torch.Tensor
        The cross entropy between the model output and the labels
    """
    inputs, targets = images.to(device), labels.to(device)
    scores = model(inputs)
    return F.cross_entropy(scores, targets)

In [None]:
# Pretrain the classifier for 20 epochs and keep the per-epoch mean losses
losses, test_losses = train(
    model=model,
    train_set=train_loader,
    test_set=test_loader,
    epochs_=20,
    step=classification_train_step,
    learning_rate=0.001,
    model_name="b3_classification",
    path_to_save="./models",
    patience=-1  # the count of epochs without improvement never equals -1, so early stopping is off
)

In [None]:
# Display the mean train loss of every epoch
losses
# NOTE(review): train() returns two flat lists with one float per epoch, so the
# indexing used in the disabled plot below (losses[0], test_losses[:, 0]) does
# not match that format and needs to be adapted before re-enabling it
# plt.figure(figsize=(8, 8))
# plt.plot(losses[0], losses[1], c='red', label='train loss')
# plt.plot(test_losses[:, 0], test_losses[:, 1], c='blue', label='test loss')
# plt.xlabel('Steps')
# plt.ylabel('Loss')
# plt.legend()
# plt.show()