# Face recognition siamese model

## Imports

In [None]:
import os

import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import Subset
from torch.utils.data import random_split

from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda, Compose, RandomHorizontalFlip, Resize

from torchdata.datapipes.iter import Zipper, IterableWrapper

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

## Data pre-processing

In [None]:
# Dataset layout: a single root folder with one sub-folder per image role.
ROOT_PATH = os.path.join('data')
ANC_PATH = os.path.join(ROOT_PATH, 'anchor')
POS_PATH = os.path.join(ROOT_PATH, 'positive')
NEG_PATH = os.path.join(ROOT_PATH, 'negative')

In [None]:
# Per-image preprocessing, applied in this order: random horizontal mirror,
# conversion to a tensor, then rescale to 105x105 (the input size the
# embedding network documents).
img_transforms = Compose(
    transforms=[
        RandomHorizontalFlip(),
        ToTensor(),
        Resize(size=(105, 105)),
    ]
)

# One dataset over everything below ROOT_PATH; the transform pipeline above is
# attached to it.
full_dataset: datasets.ImageFolder = datasets.ImageFolder(
    root=ROOT_PATH,
    transform=img_transforms,
)

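Each entry of `full_dataset` pairs an image with an integer label: `dataset = [(img, label), ...]`. The label identifies the sub-folder the image was loaded from.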

Labels (`ImageFolder` numbers the sub-folders in alphabetical order):
- anchor = 0
- negative = 1
- positive = 2

In [None]:
# Class index of every image in full_dataset, built once and reused for all
# three masks below.
all_targets: torch.Tensor = torch.tensor(full_dataset.targets)

# boolean masks (one entry per image) marking where the anchor, negative, and
# positive images are in the dataset
is_anchor: torch.Tensor = all_targets == 0
is_negative: torch.Tensor = all_targets == 1
is_positive: torch.Tensor = all_targets == 2

# extract the anchor, negative, and positive img indices as flat 1-D tensors
anchor_indices: torch.Tensor = is_anchor.nonzero().flatten()
negative_indices: torch.Tensor = is_negative.nonzero().flatten()
positive_indices: torch.Tensor = is_positive.nonzero().flatten()

# create the anchor, negative, and positive datasets as views onto full_dataset
anchor_dataset: Subset = Subset(full_dataset, anchor_indices)
negative_dataset: Subset = Subset(full_dataset, negative_indices)
positive_dataset: Subset = Subset(full_dataset, positive_indices)

Now, the datasets are [(img, label), (img, label), (img, label), ...]. We need them to be [img, img, img, ...] only, no label needed since they are already split by label.

In [None]:
# WARNING: this takes about 2 minutes to run, please only run it once.
# Every item of the three datasets is indexed with [0] to keep just the image
# and drop the label. The names are rebound from Subset to list, so a second
# run would index into the images themselves instead.
anchor_dataset: list = [pair[0] for pair in anchor_dataset]
negative_dataset: list = [pair[0] for pair in negative_dataset]
positive_dataset: list = [pair[0] for pair in positive_dataset]

In [None]:
# Pair every anchor with a positive image (pair label 1) and with a negative
# image (pair label 0). The builtin zip() yields the same
# (anchor, other, label) tuples as a datapipe Zipper and, like it, stops at the
# shortest input, so the result has at most len(anchor_dataset) pairs.
# Each label is a 0-d float tensor taken from torch.ones / torch.zeros.
zipped_pos_dataset: list = list(
    zip(anchor_dataset, positive_dataset, torch.ones(len(anchor_dataset)))
)
zipped_neg_dataset: list = list(
    zip(anchor_dataset, negative_dataset, torch.zeros(len(anchor_dataset)))
)

Now the `zipped_pos_dataset` has the format: `[(anchor, positive, 1), (anchor, positive, 1), ...]`. Here the 1 is the label of the pair, which signifies that the image is positive, meaning from the same person as anchor. So the `zipped_neg_dataset` has 0s instead of 1s and negative images instead of positive.

In [None]:
# Merge the positive pairs and the negative pairs into one list, then shuffle
# that list in place so the two kinds of pair are interleaved.
final_dataset: list = [*zipped_pos_dataset, *zipped_neg_dataset]
np.random.shuffle(final_dataset)

## Data loading

In [135]:
batch_size = 4

# 80-20 split between training and testing. The test set takes whatever is
# left after truncating the training size, so the two sizes always add up to
# len(final_dataset).
n_train = int(len(final_dataset) * 0.8)
n_test = len(final_dataset) - n_train
train_set, test_set = random_split(final_dataset, [n_train, n_test])

# one loader per split, both batching batch_size pairs at a time
train_dataloader: DataLoader = DataLoader(train_set, batch_size=batch_size)
test_dataloader: DataLoader = DataLoader(test_set, batch_size=batch_size)

len(dataloader) = number of batches = number of pairs in that loader's split (train or test) / batch_size, rounded up

Data is organized inside `dataloader` as follows: [anchors, pos/neg imgs, label]
- img is of shape [3, 105, 105]; since there are _batch_size_ (currently 4) images in a batch, the shape of anc/pos/neg imgs is [4, 3, 105, 105]
- label is either 0 or 1: 0 for negative, 1 for positive

Note: run the next block to confirm

In [None]:
# Fetch one batch through the public iterator protocol instead of the
# DataLoader's private _get_iterator() hook.
first_batch = next(iter(train_dataloader))

# N = batch size, C = color channels, H = height, W = width
print("Shape of data [N, C, H, W]: ", first_batch[0].shape)
# first_batch[2] holds the pair labels, so report the shape and the dtype of
# that same tensor (index 1 is the second image batch, not the labels)
print("Shape of labels: ", first_batch[2].shape, first_batch[2].dtype)

### Visualizing the data for debugging

Here is an example of how to visualize an image

In [None]:
# The first image of the batch lives at first_batch[0][0].
# plt.imshow needs a different tensor ordering, so the axes are reordered
# with permute before plotting.
sample_image = first_batch[0][0]
plt.imshow(sample_image.permute(1, 2, 0))

# How to access different batches:
# it = iter(train_dataloader)
# first_batch = next(it)
# second_batch = next(it)

## Model

### Embedding layer

In [None]:
# Pick the training device: the GPU when CUDA is usable, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

In [116]:
class EmbeddingNetwork(nn.Module):
    """Convolutional encoder that maps an image to a flat feature vector.

    Four convolution stages, each followed by a ReLU. The first three stages
    end with 2x2 max pooling; the last one ends with a flatten.
    """

    def __init__(self):
        super().__init__()

        # stage 1: 3 -> 64 channels, 10x10 kernel
        self.l1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=10, padding=1)
        self.a1 = nn.ReLU()
        self.p1 = nn.MaxPool2d(kernel_size=2)

        # stage 2: 64 -> 128 channels, 7x7 kernel
        self.l2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=7, padding=1)
        self.a2 = nn.ReLU()
        self.p2 = nn.MaxPool2d(kernel_size=2)

        # stage 3: 128 -> 128 channels, 4x4 kernel
        self.l3 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=4, padding=1)
        self.a3 = nn.ReLU()
        self.p3 = nn.MaxPool2d(kernel_size=2)

        # stage 4: 128 -> 256 channels, 4x4 kernel, flattened instead of pooled
        self.l4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=4, padding=1)
        self.a4 = nn.ReLU()
        self.p4 = nn.Flatten()

    def forward(self, x):
        """Pass the input tensor through the embedding network.

        Args:
            x: batch of images, shape [N, 3, H, W]; 105x105 pixels is the
                documented input size.

        Returns:
            torch.Tensor: flattened features, one row per image. For 105x105
            inputs the row length is 256 * 9 * 9 = 20736.
        """
        # Apply the layers strictly in definition order.
        pipeline = (
            self.l1, self.a1, self.p1,
            self.l2, self.a2, self.p2,
            self.l3, self.a3, self.p3,
            self.l4, self.a4, self.p4,
        )
        for layer in pipeline:
            x = layer(x)
        return x

### Siamese network

In [122]:
class SiameseNetwork(nn.Module):
    """Siamese classifier built on one shared embedding network.

    Both input images go through the same EmbeddingNetwork. The element-wise
    absolute difference of the two embeddings is then mapped by two linear
    layers to two scores: class 0 (negative) and class 1 (positive).
    """

    def __init__(self):
        super().__init__()

        # single embedding network, reused for both inputs
        self.embedding_layer = EmbeddingNetwork()

        # fully connected head; in_features=20736 has to match the length of
        # the flattened embedding
        self.feature_vector = nn.Linear(in_features=20736, out_features=4096)
        # 2 classes: 0 (negative) and 1 (positive)
        self.classification_layer = nn.Linear(in_features=4096, out_features=2)

    def forward(self, anchor, db_image):
        """Pass a pair of image batches through the siamese network.

        Args:
            anchor (torch.Tensor): input image (from webcam), 3 channels, 105x105 pixels
            db_image (torch.Tensor): target image (from database), 3 channels, 105x105 pixels

        Returns:
            torch.Tensor: two raw class scores per pair (index 0 = negative,
            index 1 = positive).
        """
        # embed both images with the shared network
        anchor_embedding = self.embedding_layer(anchor)
        db_embedding = self.embedding_layer(db_image)

        # element-wise absolute difference between the two embeddings
        dist = (anchor_embedding - db_embedding).abs()

        # fully connected head: feature vector, then the two class scores
        features = self.feature_vector(dist)
        return self.classification_layer(features)

### Loss and optimizer

In [123]:
# Build the network, move its parameters to the selected device, and print
# the layer structure.
model = SiameseNetwork()
model = model.to(device)
print(model)

SiameseNetwork(
  (embedding_layer): EmbeddingNetwork(
    (l1): Conv2d(3, 64, kernel_size=(10, 10), stride=(1, 1), padding=(1, 1))
    (a1): ReLU()
    (p1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (l2): Conv2d(64, 128, kernel_size=(7, 7), stride=(1, 1), padding=(1, 1))
    (a2): ReLU()
    (p2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (l3): Conv2d(128, 128, kernel_size=(4, 4), stride=(1, 1), padding=(1, 1))
    (a3): ReLU()
    (p3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (l4): Conv2d(128, 256, kernel_size=(4, 4), stride=(1, 1), padding=(1, 1))
    (a4): ReLU()
    (p4): Flatten(start_dim=1, end_dim=-1)
  )
  (feature_vector): Linear(in_features=20736, out_features=4096, bias=True)
  (classification_layer): Linear(in_features=4096, out_features=2, bias=True)
)


In [127]:
# Cross-entropy loss over the two class scores, optimized with Adam at a
# learning rate of 1e-4.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)

## Training & testing

In [128]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one training pass over ``dataloader``.

    Args:
        dataloader: iterable of ``(X, Y, z)`` batches with a ``dataset``
            attribute; ``X`` and ``Y`` are the two model inputs and ``z``
            the pair labels.
        model: module called as ``model(X, Y)``.
        loss_fn: callable taking ``(pred, z)`` and returning the loss tensor.
        optimizer: optimizer that updates the parameters of ``model``.

    Prints the current loss every 100 batches. Uses the module-level
    ``device`` to place every batch.
    """
    # Total number of samples, used only for the progress print-out
    size = len(dataloader.dataset)
    # Put the model in training mode
    model.train()
    for batch, (X, Y, z) in enumerate(dataloader):
        X, Y = X.to(device), Y.to(device)
        # Cast the labels to integer class indices while keeping them on
        # `device`. Casting with .type(torch.LongTensor) would move them back
        # to the CPU (torch.LongTensor is the CPU tensor type) and make the
        # loss fail with a device mismatch when training on CUDA.
        z = z.to(device).long()

        pred = model(X, Y)
        loss = loss_fn(pred, z)

        # Backpropagation
        # Reset the gradients left over from the previous step
        optimizer.zero_grad()
        # Compute the gradient of the loss with respect to the model parameters
        loss.backward()
        # Update the model parameters
        optimizer.step()

        # Print the loss every 100 batches
        if batch % 100 == 0:
            loss_value, current = loss.item(), batch * len(X)
            print(f"loss: {loss_value:>7f}  [{current:>5d}/{size:>5d}]")

def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and mean loss.

    Args:
        dataloader: iterable of ``(X, Y, z)`` batches with a ``dataset``
            attribute; ``X`` and ``Y`` are the two model inputs and ``z``
            the pair labels.
        model: module called as ``model(X, Y)``.
        loss_fn: callable taking ``(pred, z)`` and returning the loss tensor.

    Accuracy is the share of samples whose highest-scoring class equals the
    label; the loss is averaged over batches. Uses the module-level
    ``device`` to place every batch.
    """
    # Number of samples (accuracy denominator) and of batches (loss denominator)
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # Put the model in evaluation mode
    model.eval()
    test_loss, correct = 0, 0
    # No gradients are needed for evaluation
    with torch.no_grad():
        for X, Y, z in dataloader:
            X, Y = X.to(device), Y.to(device)
            # Integer class indices kept on `device`; .type(torch.LongTensor)
            # would move the labels back to the CPU and break CUDA runs.
            z = z.to(device).long()
            pred = model(X, Y)
            test_loss += loss_fn(pred, z).item()
            # Count the samples of this batch whose predicted class is right
            correct += (pred.argmax(1) == z).type(torch.float).sum().item()
    # Compute the average loss and accuracy
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [129]:
# Alternate one training pass and one evaluation pass per epoch.
epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    # Train the model
    train(train_dataloader, model, loss_fn, optimizer)
    # Test the model
    test(test_dataloader, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 0.703034  [    0/  223]
loss: 1.996190  [  100/  223]
loss: 0.240953  [  200/  223]
Test Error: 
 Accuracy: 91.1%, Avg loss: 0.298442 

Epoch 2
-------------------------------
loss: 0.347941  [    0/  223]
loss: 1.526834  [  100/  223]
loss: 0.267948  [  200/  223]
Test Error: 
 Accuracy: 83.9%, Avg loss: 0.333829 

Epoch 3
-------------------------------
loss: 0.598308  [    0/  223]
loss: 0.019489  [  100/  223]
loss: 0.052453  [  200/  223]
Test Error: 
 Accuracy: 89.3%, Avg loss: 0.238684 

Epoch 4
-------------------------------
loss: 0.210626  [    0/  223]
loss: 0.008171  [  100/  223]
loss: 0.032453  [  200/  223]
Test Error: 
 Accuracy: 82.1%, Avg loss: 0.343465 

Epoch 5
-------------------------------
loss: 0.014554  [    0/  223]
loss: 0.382451  [  100/  223]
loss: 0.423024  [  200/  223]
Test Error: 
 Accuracy: 87.5%, Avg loss: 0.232769 

Done!


In [130]:
# Write the model's state dict (not the module object itself) to model.pth;
# the next cell loads it back.
torch.save(obj=model.state_dict(), f="model.pth")
print("Saved PyTorch Model State to model.pth")

Saved PyTorch Model State to model.pth


In [141]:
# Rebuild the network (it stays on the CPU here) and restore the weights from
# model.pth. map_location keeps the load working when the weights were saved
# from a GPU.
model = SiameseNetwork()
model.load_state_dict(torch.load("model.pth", map_location="cpu"))

model.eval()

# First batch of the test set: anchors, paired images, pair labels
x, y, z = next(iter(test_dataloader))

print(x.shape, y.shape, z.shape)

# Predict each pair on its own and print it next to ITS OWN label. Iterating
# the batch itself also copes with a batch shorter than batch_size.
with torch.no_grad():
    for anchor_img, other_img, actual in zip(x, y, z):
        # unsqueeze(0) adds the batch dimension back for a single pair
        pred = model(anchor_img.unsqueeze(0), other_img.unsqueeze(0))
        predicted = pred[0].argmax(0)
        print(f'Predicted: "{predicted}", Actual: "{actual}"')

torch.Size([4, 3, 105, 105]) torch.Size([4, 3, 105, 105]) torch.Size([4])
Predicted: "0", Actual: "0.0"
Predicted: "1", Actual: "0.0"
Predicted: "0", Actual: "0.0"
Predicted: "0", Actual: "0.0"
