In [None]:
# Mount Google Drive inside the Colab VM so files stored there can be read.
from google.colab import drive

GDRIVE_MOUNT_POINT = "/content/gdrive"
drive.mount(GDRIVE_MOUNT_POINT)

Mounted at /content/gdrive


In [None]:
# Copy kaggle.json (presumably the Kaggle API token) from the mounted Drive into /content.
!cp /content/gdrive/MyDrive/kaggle.json /content

# Preparations
- Download the datasets for training and validation. We train on CASIA-WebFace (from Kaggle) and test on a self-collected dataset of faces of our friends and of friends of our friends (called the ThreeF dataset).

In [None]:
# %%capture
# Clone the ThreeF repository into ./ThreeF (the self-collected face dataset used for testing).
!git clone https://github.com/HarikNguyen/ThreeF.git

Cloning into 'ThreeF'...
remote: Enumerating objects: 1398, done.[K
remote: Total 1398 (delta 0), reused 0 (delta 0), pack-reused 1398[K
Receiving objects: 100% (1398/1398), 42.99 MiB | 15.31 MiB/s, done.
Resolving deltas: 100% (19/19), done.


In [None]:
import os
import pandas as pd
import numpy as np
# Point KAGGLE_CONFIG_DIR at /content, the directory kaggle.json was copied into.
os.environ['KAGGLE_CONFIG_DIR'] = "/content"

In [None]:
# Work from /content: the relative paths used later ("casia-webface", "ThreeF") are resolved from here.
%cd /content

/content


In [None]:
# Download the CASIA-WebFace archive from Kaggle into the current directory.
!kaggle datasets download -d ntl0601/casia-webface

Downloading casia-webface.zip to /content
100% 2.53G/2.53G [02:00<00:00, 24.2MB/s]
100% 2.53G/2.53G [02:00<00:00, 22.6MB/s]


In [None]:
%%capture
# Extract every .zip archive in the current directory and, if that succeeds, delete the archives.
!unzip \*.zip  && rm *.zip

# Create dataloaders

## Get image list in casia_webface dataset

In [None]:
# Helper: list the files stored in leaf directories of a directory tree
def get_leaf_paths(root_dir):
  """Return the paths of all files located in leaf directories under ``root_dir``.

  A leaf directory is one that contains no sub-directories. Files that sit
  next to sub-directories are not reported. Paths are built with
  ``os.path.join`` and come back in ``os.walk`` order.
  """
  return [
    os.path.join(folder, name)
    for folder, subfolders, names in os.walk(root_dir)
    if not subfolders
    for name in names
  ]

In [None]:
# Image list of the CASIA-WebFace dataset.
# casia-webface.txt is read as a header-less, whitespace-separated table;
# column 0 is used below as the person id and column 1 as the image path
# relative to /content.
casia_webface_df = pd.read_csv("/content/casia-webface.txt",
                 sep=r"\s+",  # raw string: "\s" is an invalid escape in a normal literal
                 header=None)
casia_webface_paths = "/content/" + casia_webface_df[1]

# Image list of the ThreeF dataset.
# ThreeF was obtained with `git clone`, so its tree also contains a .git
# directory; skip anything inside it so only dataset files are listed.
threeF_paths = [
  path for path in get_leaf_paths("/content/ThreeF")
  if ".git" not in path.split(os.sep)
]

In [None]:
# Show the value in column 0 (the person id) of the row at position 100000.
casia_webface_df.iloc[100000][0]

1336

In [None]:
# Number of distinct person ids (column 0) in CASIA-WebFace
person_ids = casia_webface_df[0]
num_o_pers = len(person_ids.unique())
num_o_pers

10537

In [None]:
# Largest, smallest and mean number of images per person id.
images_per_person = casia_webface_df[0].value_counts()
M = np.max(images_per_person)
m = np.min(images_per_person)
avg = np.average(images_per_person)
print(f"max: {M}", f"min: {m}", f"average: {avg}")

max: 786 min: 2 average: 43.69478978836481


In [None]:
# Turn the ThreeF path list into a NumPy array and show how many entries it has.
threeF_paths = np.asarray(threeF_paths)
threeF_paths.shape

(661,)

In [None]:
# Number of CASIA-WebFace image paths (one per row of casia-webface.txt).
casia_webface_paths.shape

(460412,)

# Define MobileNet model and Arcface loss

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torch.nn import init

## MobileNet

In [None]:
class Hswish(nn.Module):
  """Hard-swish activation: ``x * relu6(x + 3) / 6``."""

  def forward(self, x):
    # relu6 runs in place on the temporary ``x + 3``, so ``x`` itself is untouched.
    gate = F.relu6(x + 3, inplace=True)
    return x * gate / 6

class Hsigmoid(nn.Module):
  """Hard-sigmoid activation: ``relu6(x + 3) / 6``."""

  def forward(self, x):
    # relu6 runs in place on the temporary ``x + 3``, so ``x`` itself is untouched.
    shifted = F.relu6(x + 3, inplace=True)
    return shifted / 6

class SeModule(nn.Module):
  """Channel gate: global average pool -> 1x1 conv -> BN -> ReLU -> 1x1 conv -> hard-sigmoid.

  The input is multiplied by the resulting per-channel gate.

  Args:
    in_size: number of input (and output) channels.
    reduction: divisor for the hidden width, which is
      ``max(in_size // reduction, 8)``.
  """

  def __init__(self, in_size, reduction=4):
    super().__init__()
    hidden = max(in_size // reduction, 8)
    layers = [
      nn.AdaptiveAvgPool2d(1),
      nn.Conv2d(in_size, hidden, kernel_size=1, bias=False),
      nn.BatchNorm2d(hidden),
      nn.ReLU(inplace=True),
      nn.Conv2d(hidden, in_size, kernel_size=1, bias=False),
      nn.Hardsigmoid(),
    ]
    self.se = nn.Sequential(*layers)

  def forward(self, x):
    gate = self.se(x)
    return x * gate

class Block(nn.Module):
  '''Bottleneck unit: 1x1 expansion, depthwise convolution, 1x1 projection.

  The projected output is added to a shortcut of the input and the sum goes
  through a final activation. With ``stride == 1`` and
  ``in_size == out_size`` the shortcut is the input itself; for the other
  stride 1 / stride 2 combinations a small convolutional branch is built.

  Args:
    kernel_size: depthwise kernel size (padding is ``kernel_size // 2``).
    in_size: number of input channels.
    expand_size: number of channels after the 1x1 expansion.
    out_size: number of output channels.
    act: activation class, instantiated as ``act(inplace=True)``.
    se: truthy to place an ``SeModule`` after the depthwise stage.
    stride: stride of the depthwise convolution.
  '''

  def __init__(self, kernel_size, in_size, expand_size, out_size, act, se, stride):
    super().__init__()
    self.stride = stride

    # 1x1 expansion
    self.conv1 = nn.Conv2d(in_size, expand_size, kernel_size=1, bias=False)
    self.bn1 = nn.BatchNorm2d(expand_size)
    self.act1 = act(inplace=True)

    # depthwise convolution (one group per channel)
    self.conv2 = nn.Conv2d(expand_size, expand_size,
                           kernel_size=kernel_size,
                           stride=stride,
                           padding=kernel_size // 2,
                           groups=expand_size,
                           bias=False)
    self.bn2 = nn.BatchNorm2d(expand_size)
    self.act2 = act(inplace=True)
    if se:
      self.se = SeModule(expand_size)
    else:
      self.se = nn.Identity()

    # 1x1 projection
    self.conv3 = nn.Conv2d(expand_size, out_size, kernel_size=1, bias=False)
    self.bn3 = nn.BatchNorm2d(out_size)
    self.act3 = act(inplace=True)

    # Shortcut branch; stays None when the input can be added unchanged.
    same_width = in_size == out_size
    self.skip = None
    if stride == 1 and not same_width:
      # only the channel count changes: 1x1 conv + BN
      self.skip = nn.Sequential(
        nn.Conv2d(in_size, out_size, kernel_size=1, bias=False),
        nn.BatchNorm2d(out_size),
      )
    elif stride == 2 and not same_width:
      # resolution and channel count change: strided depthwise 3x3, then 1x1
      self.skip = nn.Sequential(
        nn.Conv2d(in_channels=in_size, out_channels=in_size,
                  kernel_size=3, groups=in_size,
                  stride=2, padding=1, bias=False),
        nn.BatchNorm2d(in_size),
        nn.Conv2d(in_size, out_size, kernel_size=1, bias=True),
        nn.BatchNorm2d(out_size),
      )
    elif stride == 2:
      # only the resolution changes: strided depthwise 3x3
      self.skip = nn.Sequential(
        nn.Conv2d(in_channels=in_size, out_channels=out_size,
                  kernel_size=3, groups=in_size,
                  stride=2, padding=1, bias=False),
        nn.BatchNorm2d(out_size),
      )

  def forward(self, x):
    out = self.conv1(x)
    out = self.act1(self.bn1(out))
    out = self.conv2(out)
    out = self.act2(self.bn2(out))
    out = self.se(out)
    out = self.bn3(self.conv3(out))

    shortcut = x if self.skip is None else self.skip(x)
    return self.act3(out + shortcut)

class MobileNetV3_Small(nn.Module):
  """Small MobileNetV3-style network: stem conv, 11 ``Block`` units, classifier head.

  Args:
    num_classes: output width of the last linear layer (``linear4``).
    act: activation class used in the stem, the later blocks and the head;
      instantiated as ``act(inplace=True)``.
  """

  def __init__(self, num_classes=1000, act=nn.Hardswish):
    super().__init__()

    # Stem
    self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1, bias=False)
    self.bn1 = nn.BatchNorm2d(16)
    self.hs1 = act(inplace=True)

    # One row per Block:
    # (kernel_size, in_size, expand_size, out_size, act, se, stride)
    bneck_cfg = [
      (3, 16, 16, 16, nn.ReLU, True, 2),
      (3, 16, 72, 24, nn.ReLU, False, 2),
      (3, 24, 88, 24, nn.ReLU, False, 1),
      (5, 24, 96, 40, act, True, 2),
      (5, 40, 240, 40, act, True, 1),
      (5, 40, 240, 40, act, True, 1),
      (5, 40, 120, 48, act, True, 1),
      (5, 48, 144, 48, act, True, 1),
      (5, 48, 288, 96, act, True, 2),
      (5, 96, 576, 96, act, True, 1),
      (5, 96, 576, 96, act, True, 1),
    ]
    self.bneck = nn.Sequential(*[Block(*cfg) for cfg in bneck_cfg])

    # Final 1x1 conv and global pooling
    self.conv2 = nn.Conv2d(96, 576, kernel_size=1, stride=1, padding=0, bias=False)
    self.bn2 = nn.BatchNorm2d(576)
    self.hs2 = act(inplace=True)
    self.gap = nn.AdaptiveAvgPool2d(1)

    # Head: 576 -> 1280 -> num_classes
    self.linear3 = nn.Linear(576, 1280, bias=False)
    self.bn3 = nn.BatchNorm1d(1280)
    self.hs3 = act(inplace=True)
    self.drop = nn.Dropout(0.2)
    self.linear4 = nn.Linear(1280, num_classes)
    self.init_params()

  def init_params(self):
    """Re-initialise every Conv2d, BatchNorm2d and Linear layer of the network."""
    for layer in self.modules():
      if isinstance(layer, nn.Conv2d):
        init.kaiming_normal_(layer.weight, mode='fan_out')
        if layer.bias is not None:
          init.zeros_(layer.bias)
      elif isinstance(layer, nn.BatchNorm2d):
        init.ones_(layer.weight)
        init.zeros_(layer.bias)
      elif isinstance(layer, nn.Linear):
        init.normal_(layer.weight, std=0.001)
        if layer.bias is not None:
          init.zeros_(layer.bias)

  def forward(self, x):
    # stem + bottleneck stack
    feat = self.conv1(x)
    feat = self.hs1(self.bn1(feat))
    feat = self.bneck(feat)

    # final conv, then pool to one vector per sample
    feat = self.hs2(self.bn2(self.conv2(feat)))
    feat = self.gap(feat).flatten(1)

    # head
    feat = self.linear3(feat)
    feat = self.hs3(self.bn3(feat))
    feat = self.drop(feat)
    return self.linear4(feat)

In [None]:
class ArcFaceLoss(nn.Module):
  """Additive angular margin (ArcFace) loss with its own learnable class centres.

  Embeddings and class-centre weights are L2-normalised, so their dot product
  is the cosine of the angle between them. The margin is added to the angle
  of the *target* class only; all logits are then multiplied by ``scale`` and
  fed to cross-entropy.

  ``self.weights`` is an ``nn.Parameter``: it must be handed to the optimizer
  together with the backbone parameters, otherwise the class centres never
  move away from their initial values.

  Args:
    embedding_size: dimensionality of the incoming embeddings.
    num_classes: number of classes (one centre per class).
    margin: angular margin in radians added to the target-class angle.
    scale: factor applied to the cosine logits before cross-entropy.
  """

  def __init__(self,
               embedding_size,
               num_classes,
               margin=0.5,
               scale=64):
    super(ArcFaceLoss, self).__init__()
    self.embedding_size = embedding_size
    self.num_classes = num_classes
    self.margin = margin
    self.scale = scale

    self.weights = nn.Parameter(torch.FloatTensor(num_classes, embedding_size))
    nn.init.xavier_uniform_(self.weights)

  def forward(self, embeddings, targets):
    """Return the mean ArcFace loss.

    Args:
      embeddings: float tensor of shape ``(batch, embedding_size)``.
      targets: integer (int64) class indices of shape ``(batch,)``.
    """
    # Normalize the input embeddings and weights
    embeddings = F.normalize(embeddings, p=2, dim=1)
    weights = F.normalize(self.weights, p=2, dim=1)

    # Cosine similarity between every embedding and every class centre.
    cos_theta = torch.matmul(embeddings, weights.t())
    # Stay strictly inside (-1, 1): the derivative of acos is infinite at +/-1.
    eps = 1e-7
    cos_theta = torch.clamp(cos_theta, -1.0 + eps, 1.0 - eps)
    theta = torch.acos(cos_theta)

    # Add the margin to the ground-truth class angle only; the other classes
    # keep their plain cosine logit.
    target_mask = F.one_hot(targets, num_classes=self.num_classes).to(theta.dtype)
    logits = self.scale * torch.cos(theta + self.margin * target_mask)

    # Compute the cross-entropy loss
    return F.cross_entropy(logits, targets)

In [None]:
# Preprocessing applied to every image: resize to 112x112, convert to a
# tensor, then normalise each channel (the mean/std values below are the
# widely used ImageNet statistics).
INPUT_SIZE = (112, 112)
CHANNEL_MEAN = [0.485, 0.456, 0.406]
CHANNEL_STD = [0.229, 0.224, 0.225]

preprocessing_steps = [
    transforms.Resize(INPUT_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=CHANNEL_MEAN, std=CHANNEL_STD),
]
transform = transforms.Compose(preprocessing_steps)

In [None]:
# Training set: ImageFolder over the "casia-webface" directory (path is
# relative to the current working directory), with the preprocessing above.
dataset_dir = "casia-webface"
dataset = ImageFolder(dataset_dir, transform=transform)

In [None]:
# Training loader: shuffled mini-batches of 1280 images.
batch_size, shuffle = 1280, True
dataloader = DataLoader(dataset, batch_size, shuffle)

In [None]:
# One class per sub-directory found by ImageFolder.
num_classes = len(dataset.class_to_idx)

In [None]:
# Backbone whose last linear layer outputs `num_classes` values.
model = MobileNetV3_Small(num_classes=num_classes)

In [None]:
# Define the loss function and the optimizer.
# The loss consumes the output of model.linear4, so that layer's width is the
# embedding size seen by ArcFaceLoss.
criterion = ArcFaceLoss(
    embedding_size=model.linear4.out_features,
    num_classes=num_classes,
    margin=0.5,
    scale=64,)

# ArcFaceLoss owns a learnable weight matrix (criterion.weights). It has to be
# optimised together with the backbone; with model.parameters() alone it would
# stay frozen at its random initialisation.
trainable_params = list(model.parameters()) + list(criterion.parameters())
optimizer = torch.optim.Adam(trainable_params, lr=0.001)
# Multiply the learning rate by 0.1 every 10 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Training Loop

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU, and move
# both the network and the loss module (it holds parameters) to that device.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model, criterion = model.to(device), criterion.to(device)

In [None]:
%%capture
# Switch the model to training mode; %%capture suppresses the cell's output.
model.train()

In [None]:
# Set PRE_TRAINED = True to skip the training and saving cells and instead
# load previously saved weights from "model.pth" in the evaluation section.
PRE_TRAINED = False

In [None]:
# Train unless pre-trained weights are going to be loaded instead.
if not PRE_TRAINED:
  EPOCH_NUM = 15
  for epoch in range(EPOCH_NUM):
    print(f"Epoch {epoch}/{EPOCH_NUM}")

    for batch_idx, (inputs, targets) in enumerate(dataloader):
      inputs = inputs.to(device)
      targets = targets.to(device)

      # Gradients accumulate by default, so clear them before each step.
      optimizer.zero_grad()

      # Forward pass: network output, then ArcFace loss against the labels.
      embeddings = model(inputs)
      loss = criterion(embeddings, targets)

      # Backward pass and parameter update.
      loss.backward()
      optimizer.step()

      # Report the loss of every 100th batch.
      if batch_idx % 100 == 0:
        print(f"Batch {batch_idx}/{len(dataloader)}, Loss: {loss.item()}")

    # One scheduler step per epoch.
    lr_scheduler.step()

Epoch 0/15
Batch 0/384, Loss: 9.400459289550781
Batch 100/384, Loss: 9.047263145446777
Batch 200/384, Loss: 8.981632232666016
Batch 300/384, Loss: 8.887411117553711
Epoch 1/15
Batch 0/384, Loss: 8.71939754486084
Batch 100/384, Loss: 8.656660079956055
Batch 200/384, Loss: 8.407463073730469
Batch 300/384, Loss: 8.265393257141113
Epoch 2/15
Batch 0/384, Loss: 7.946831703186035
Batch 100/384, Loss: 7.817373752593994
Batch 200/384, Loss: 7.585287570953369
Batch 300/384, Loss: 7.441148281097412
Epoch 3/15
Batch 0/384, Loss: 7.112764835357666
Batch 100/384, Loss: 6.907504081726074
Batch 200/384, Loss: 6.585480690002441
Batch 300/384, Loss: 6.525500297546387
Epoch 4/15
Batch 0/384, Loss: 6.133518695831299
Batch 100/384, Loss: 6.094368934631348
Batch 200/384, Loss: 5.8725786209106445
Batch 300/384, Loss: 5.688343524932861
Epoch 5/15
Batch 0/384, Loss: 5.373782157897949
Batch 100/384, Loss: 5.395934104919434
Batch 200/384, Loss: 5.149231910705566
Batch 300/384, Loss: 5.156480312347412
Epoch 6/15

In [None]:
if not PRE_TRAINED:
  # Save under "model.pth": this is the file name that the backup cell copies
  # to Google Drive and that the evaluation section loads when
  # PRE_TRAINED is True.
  file_path = 'model.pth'

  # Save only the parameters/buffers (state_dict), not the whole module object.
  torch.save(model.state_dict(), file_path)

In [None]:
# Copy /content/model.pth into the mounted Google Drive folder.
# Note: this runs regardless of PRE_TRAINED and expects /content/model.pth to exist.
!cp /content/model.pth /content/gdrive/MyDrive/

# Model Evaluation

In [None]:

if PRE_TRAINED:
  # map_location remaps the stored tensors onto the current device, so weights
  # saved on a GPU can also be loaded on a CPU-only runtime.
  model.load_state_dict(torch.load("model.pth", map_location=device))

# Remove last FC layer: with linear4 replaced by an identity, the model returns
# the 1280-dimensional output of the layer before it, used below as the embedding.
model.linear4 = nn.Identity()

In [None]:
%%capture
# Switch the model to evaluation mode; %%capture suppresses the cell's output.
model.eval()

In [None]:
# Create an instance of the ImageFolder dataset
!rm -rf ThreeF/.git
val_dataset_dir = "ThreeF"
val_dataset = ImageFolder(root=val_dataset_dir, transform=transform)
# Create a dataloader using the ImageFolder dataset
batch_size = len(val_dataset)
shuffle = True
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=shuffle)

In [None]:
# Number of images in the validation set.
len(val_dataset)

641

In [None]:
# Pair counters for the evaluation, all starting at zero, and the
# cosine-similarity threshold at or above which two faces are declared a match.
total_correct = total_wrong = total_samples = 0
threshold = 0.3

In [None]:
# Face-verification evaluation on the validation set: every unordered pair of
# validation images is predicted "same person" when the cosine similarity of
# their embeddings is >= threshold, and the prediction is compared with the
# labels. Everything is recomputed from scratch, so re-running this cell does
# not accumulate counts.

# 1) Embed each validation image exactly once. (Previously the images were read
#    from the training `dataset` and re-embedded for every pair.)
model.eval()
embedding_batches, label_batches = [], []
with torch.no_grad():
  for images, labels in DataLoader(val_dataset, batch_size=128, shuffle=False):
    batch_embeddings = model(images.to(device))
    # Unit-length rows turn the dot products below into cosine similarities.
    embedding_batches.append(F.normalize(batch_embeddings, p=2, dim=1).cpu())
    label_batches.append(labels)
val_embeddings = torch.cat(embedding_batches)
val_labels = torch.cat(label_batches)

# 2) Compare all pairs at once.
similarity = val_embeddings @ val_embeddings.t()
predicted_same = similarity >= threshold
actually_same = val_labels.unsqueeze(0) == val_labels.unsqueeze(1)
# Strict upper triangle: each pair (i, j) with i < j once, no self-pairs.
num_images = val_labels.shape[0]
pair_mask = torch.triu(torch.ones(num_images, num_images), diagonal=1).bool()

true_pos = int((predicted_same & actually_same & pair_mask).sum())
false_pos = int((predicted_same & ~actually_same & pair_mask).sum())
false_neg = int((~predicted_same & actually_same & pair_mask).sum())
true_neg = int((~predicted_same & ~actually_same & pair_mask).sum())

# Keep the original counters populated with their original meaning.
total_correct = true_pos
total_wrong = false_pos + false_neg
total_samples = true_pos + false_pos + false_neg + true_neg

# 3) Rates. TPR = TP / (TP + FN) over same-person pairs,
#    FPR = FP / (FP + TN) over different-person pairs.
num_positive_pairs = true_pos + false_neg
num_negative_pairs = false_pos + true_neg
TPR = true_pos / num_positive_pairs if num_positive_pairs else float("nan")
FPR = false_pos / num_negative_pairs if num_negative_pairs else float("nan")

print("False Positive Rate (FPR):", FPR)
print("True Positive Rate (TPR):", TPR)