## We will compare the effect of the choice of loss on the training of the facenet model

Import the necessary libraries

In [1]:
import numpy as np

from torchvision.datasets import ImageFolder
from torchvision import transforms

from facenet_pytorch import MTCNN, fixed_image_standardization, training, extract_face
from facenet_pytorch import InceptionResnetV1, training
from torch.optim import Adam
import utils

from torch.utils.data import DataLoader, SubsetRandomSampler, SequentialSamplerC
from losses.triplet_loss import TripletLoss

import tqdm
import torch

data_dir = 'lfw_cropped'
device = 'cuda' if torch.cuda.is_available() else 'cpu'
batch_size = 32

## Detect the faces in the LFW dataset and save them in a new folder
- The LFW dataset contains images of celebrities in different poses and lighting conditions
- We will use the MTCNN face detector to detect the faces in the images and save them in a new folder
- We will use the saved images to train the facenet model

In [2]:
# create dataset and data loaders from cropped images output from MTCNN
trans = transforms.Compose([
    np.float32,
    transforms.ToTensor(),
    fixed_image_standardization
])

triplet_dataset = TripletsDataset(csv_file='lfw_cropped_annots.csv')

embed_loader = DataLoader(
    triplet_dataset,
    num_workers=4,
    pin_memory=True,
    batch_size=batch_size,
    sampler=SequentialSampler(triplet_dataset)
)

In [6]:
# Create an inception resnet (in train mode):
resnet = InceptionResnetV1(
    classify=False,
    num_classes=len(triplet_dataset.class_to_idx)
    ).to(device)

# Train the model for 5 epochs with triplet loss
optimizer = torch.optim.AdamW(resnet.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [5, 10])

# Define the triplet loss function
triplet_loss = TripletLoss(margin=14).to(device)

loss_fn = triplet_loss
metrics = {
    'fps': training.BatchTimer(),
    'acc': training.accuracy
}

### Define a Dataset class for the contrastive loss function

In [14]:
from torch.utils.data import Dataset
import pandas as pd
from PIL import Image


class ContrastiveDataset(Dataset):
    def __init__(self, csv_file, transform=None):
        """
        Args:
            csv_file (string): Path to the csv file with paths.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        annotations = pd.read_csv(csv_file)
        self.paths = annotations["path"].values
        self.labels = annotations["label"].values
        self.transform = transform
        self.class_to_idx = {cls: i for i, cls in enumerate(set(self.labels))}


    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        img_path = self.paths[index]
        img = Image.open(img_path) # np.array(Image.open(img_path))
        img = np.array(img)
        label = self.labels[index]

        return img, label


In [18]:
contrastive_loss = SupervisedContrastiveLoss()

dataset = ContrastiveDataset(csv_file='lfw_cropped_annots.csv')

loader = DataLoader(
    dataset,
    num_workers=4,
    pin_memory=True,
    batch_size=batch_size,
    shuffle=True
)

In [20]:
def train_net_contrastive_loss(model, loss_fn, optimizer, scheduler, num_epochs, dataloaders, dataset_sizes, device, fold):
    model.train()

    for epoch in tqdm.tqdm(range(num_epochs)):
        for batch in dataloaders:
            optimizer.zero_grad()
            img, label = batch
            img = img.to(device)
            # label = torch.Tensor(label).to(device)
            embedding = model(img)
            print(embedding)
            loss = loss_fn(embedding, label)
            loss.backward()
            optimizer.step()

        scheduler.step()
        print(f'Epoch: {epoch}, Loss: {loss.item()}')


train_net_contrastive_loss(model=resnet,
                           loss_fn=contrastive_loss,
                           optimizer=optimizer,
                           scheduler=scheduler,
                           num_epochs=10,
                           dataloaders=loader,
                           dataset_sizes=len(dataset),
                           device=device,
                           fold=0)

  0%|          | 0/10 [00:00<?, ?it/s]


RuntimeError: Given groups=1, weight of size [32, 3, 3, 3], expected input[32, 160, 160, 3] to have 3 channels, but got 160 channels instead

In [4]:

def train_net_triplet_loss(model, loader, optimizer, scheduler, epochs):
    model.train()
    
    for epoch in tqdm.tqdm(range(epochs)):
        for batch in loader:
            # model.zero_grad()  # what's the difference to optimizer.zero_grad()?
            anchor_embedding = model(batch["anchor"].to(device))
            positive_embedding = model(batch["positive"].to(device))
            negative_embedding = model(batch["negative"].to(device))
            loss = triplet_loss(anchor_embedding,
                                 positive_embedding,
                                 negative_embedding)
            loss.backward()
            optimizer.step()

        scheduler.step()
        print(f'Epoch: {epoch}, Loss: {loss.item()}')

            
# Creates once at the beginning of training
train_net_triplet_loss(model=resnet,
                       loader=embed_loader,
                       optimizer=optimizer,
                       scheduler=scheduler,
                       epochs=10)

 10%|█         | 1/10 [01:57<17:38, 117.63s/it]

Epoch: 0, Loss: 14.602161407470703


 20%|██        | 2/10 [03:50<15:18, 114.87s/it]

Epoch: 1, Loss: 12.94651985168457


 30%|███       | 3/10 [05:43<13:18, 114.04s/it]

Epoch: 2, Loss: 13.415044784545898


 40%|████      | 4/10 [07:37<11:23, 113.89s/it]

Epoch: 3, Loss: 13.542312622070312


 50%|█████     | 5/10 [09:31<09:29, 113.85s/it]

Epoch: 4, Loss: 13.208274841308594


 60%|██████    | 6/10 [11:25<07:35, 113.93s/it]

Epoch: 5, Loss: 13.303377151489258


 70%|███████   | 7/10 [13:19<05:42, 114.08s/it]

Epoch: 6, Loss: 13.221366882324219


 80%|████████  | 8/10 [15:14<03:48, 114.25s/it]

Epoch: 7, Loss: 13.000764846801758


 90%|█████████ | 9/10 [17:08<01:54, 114.40s/it]

Epoch: 8, Loss: 13.443133354187012


100%|██████████| 10/10 [19:03<00:00, 114.37s/it]

Epoch: 9, Loss: 13.470927238464355





In [6]:
# Save model
torch.save(resnet, 'resnet_10_epochs.pt')

del resnet
torch.cuda.empty_cache()

# Ignore for now

In [None]:
# Define the transforms to use for the LFW dataset
transform = transforms.Compose([
    transforms.Resize((96, 96)),
    transforms.ToTensor()
])


train_ds = ImageFolder(data_dir + 'train')

val_ds = ImageFolder(data_dir + 'val')


train_loader = DataLoader(train_ds,
                          batch_size,
                          shuffle=True,
                          num_workers=4,
                          pin_memory=True)

val_loader = DataLoader(val_ds,
                        batch_size,
                        num_workers=4,
                        pin_memory=True)


In [None]:
# Triplet loader
import random

# Define a custom collate function to create triplets of images
def triplet_collate_fn(batch):
    # Create a list to store the triplets
    triplets = []

    # Loop over the batch
    for i in range(len(batch)):
        # Select a random anchor image
        anchor_img, anchor_label = batch[0][i], batch[1][i]
        anchor_img = anchor_img.unsqueeze(0)

        # Select a positive image with the same class as the anchor image
        positive_imgs = [img
                         for img, label in zip(batch[0], batch[1])
                         if label == anchor_label and img is not anchor_img]
        positive_img = random.choice(positive_imgs)
        positive_img = positive_img.unsqueeze(0)

        # Select a negative image with a different class than the anchor image
        negative_imgs = [img for img, label in zip(batch[0], batch[1])
                         if label != anchor_label]
        negative_img = random.choice(negative_imgs)
        negative_img = negative_img.unsqueeze(0)

        # Add the triplet to the list
        triplet = (anchor_img, positive_img, negative_img)
        triplets.append(triplet)

    # Combine the triplets into a batch
    triplets = torch.cat(triplets, dim=0)

    return triplets

# Define the data loader for the LFW dataset with the triplet collate function
lfw_dataloader = DataLoader(train_ds,
                            batch_size=32,
                            shuffle=True,
                            num_workers=4,
                            collate_fn=triplet_collate_fn)

In [None]:
for batch in train_loader:
    # Create a list to store the triplets
    triplets = []

    # Loop over the batch
    for i in range(len(batch)):
        # Select a random anchor image
        anchor_img, anchor_label = batch[0][i], batch[1][i]
        anchor_img = anchor_img.unsqueeze(0)

        # Select a positive image with the same class as the anchor image
        positive_imgs = [img
                         for img, label in zip(batch[0], batch[1])
                         if label == anchor_label and img is not anchor_img]
        positive_img = random.choice(positive_imgs)
        positive_img = positive_img.unsqueeze(0)

        # Select a negative image with a different class than the anchor image
        negative_imgs = [img for img, label in zip(batch[0], batch[1])
                         if label != anchor_label]
        negative_img = random.choice(negative_imgs)
        negative_img = negative_img.unsqueeze(0)

        # Add the triplet to the list
        triplet = (anchor_img, positive_img, negative_img)
        triplets.append(triplet)

    break

triplets

## Define the training loop

In [None]:
# Define the triplet loss function
triplet_loss = TripletLoss(margin=0.2)

# Define the optimizer
optimizer = Adam(resnet18_model.parameters(), lr=0.001)

# Train the model on the LFW dataset with the triplet loss
for epoch in range(10):
    for i, (anchor, positive, negative) in enumerate(lfw_dataloader):
        optimizer.zero_grad()
        anchor_emb = resnet18_model(anchor)
        positive_emb = resnet18_model(positive)
        negative_emb = resnet18_model(negative)
        loss = triplet_loss(anchor_emb, positive_emb, negative_emb)
        loss.backward()
        optimizer.step()
        print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch+1, 10, i+1, len(train_loader), loss.item()))


In [None]:
batch