<a href="https://colab.research.google.com/github/CassieHuang22/CIS581-Final-Project-Masked-Facial-Recognition/blob/main/facial_recognition_masked.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Training on Masked Faces

This notebook trains our model on our dataset of masked faces and creates graphs of our training results.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
cd /content/drive/MyDrive/CIS5810/Final Project/Model Weights Masked

/content/drive/MyDrive/CIS5810/Final Project/Model Weights Masked


In [None]:
!pip install facenet-pytorch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting facenet-pytorch
  Downloading facenet_pytorch-2.5.2-py3-none-any.whl (1.9 MB)
[K     |████████████████████████████████| 1.9 MB 15.3 MB/s 
Installing collected packages: facenet-pytorch
Successfully installed facenet-pytorch-2.5.2


In [None]:
import torchvision
import torch
from torchvision import transforms
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
from torch.utils.data import Dataset

In [None]:
import os
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from PIL import Image

from torchvision.datasets.utils import check_integrity, download_and_extract_archive, download_url, verify_str_arg
from torchvision.datasets import VisionDataset
from torchvision import transforms

## Create Masked Dataset

Because we modified the original LFW dataset, we have to create our own custom dataset class. We adapted torchvision's LFW code by disabling the `_check_integrity` call in the constructor and by skipping image pairs whose files are missing.

In [None]:
class _LFW(VisionDataset):
    """Common base class for the LFW dataset views used in this notebook.

    It resolves the archive and annotation file names that belong to the
    requested ``image_set``, ``view`` and ``split``, and offers download and
    path helpers. ``self.data`` starts out empty and is meant to be filled by
    subclasses.
    """

    base_folder = "lfw-py"
    download_url_prefix = "http://vis-www.cs.umass.edu/lfw/"

    # image_set -> (extracted directory name, archive file name, archive md5)
    file_dict = {
        "original": ("lfw", "lfw.tgz", "a17d05bd522c52d84eca14327a23d494"),
        "funneled": ("lfw_funneled", "lfw-funneled.tgz", "1b42dfed7d15c9b2dd63d5e5840c86ad"),
        "deepfunneled": ("lfw-deepfunneled", "lfw-deepfunneled.tgz", "68331da3eb755a505a502b5aacb3c201"),
    }
    # annotation file name -> md5
    checksums = {
        "pairs.txt": "9f1ba174e4e1c508ff7cdf10ac338a7d",
        "pairsDevTest.txt": "5132f7440eb68cf58910c8a45a2ac10b",
        "pairsDevTrain.txt": "4f27cbf15b2da4a85c1907eb4181ad21",
        "people.txt": "450f0863dd89e85e73936a6d71a3474b",
        "peopleDevTest.txt": "e4bf5be0a43b5dcd9dc5ccfcb8fb19c5",
        "peopleDevTrain.txt": "54eaac34beb6d042ed3a7d883e247a21",
        "lfw-names.txt": "a6d0a479bd074669f656265a6e693f6d",
    }
    # split -> infix used to build the annotation file name
    annot_file = {"10fold": "", "train": "DevTrain", "test": "DevTest"}
    names = "lfw-names.txt"

    def __init__(
        self,
        root: str,
        split: str,
        image_set: str,
        view: str,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
        download: bool = False,
    ) -> None:
        """Validate the arguments and derive the file names for this view.

        Args:
            root: Directory that contains (or will contain) ``lfw-py``.
            split: One of ``10fold``, ``train`` or ``test``.
            image_set: One of the keys of ``file_dict``.
            view: ``people`` or ``pairs``.
            transform: Optional callable applied to loaded images.
            target_transform: Optional callable applied to targets.
            download: When true, ``download()`` is called during construction.
        """
        dataset_root = os.path.join(root, self.base_folder)
        super().__init__(dataset_root, transform=transform, target_transform=target_transform)

        self.image_set = verify_str_arg(image_set.lower(), "image_set", self.file_dict.keys())
        archive_entry = self.file_dict[self.image_set]
        images_subdir = archive_entry[0]
        self.filename = archive_entry[1]
        self.md5 = archive_entry[2]

        self.view = verify_str_arg(view.lower(), "view", ["people", "pairs"])
        self.split = verify_str_arg(split.lower(), "split", ["10fold", "train", "test"])
        split_infix = self.annot_file[self.split]
        self.labels_file = f"{self.view}{split_infix}.txt"
        self.data: List[Any] = []

        if download:
            self.download()

        # Unlike the torchvision original, no integrity check is run here:
        # _check_integrity() is intentionally not called from the constructor.

        self.images_dir = os.path.join(self.root, images_subdir)

    def _loader(self, path: str) -> Image.Image:
        """Open the image at ``path`` and return it converted to RGB."""
        with open(path, "rb") as handle:
            return Image.open(handle).convert("RGB")

    def _check_integrity(self) -> bool:
        """Return True when the archive and annotation files match their md5 sums.

        For the ``people`` view the names file is verified as well.
        """
        archive_ok = check_integrity(os.path.join(self.root, self.filename), self.md5)
        labels_ok = check_integrity(
            os.path.join(self.root, self.labels_file), self.checksums[self.labels_file]
        )
        if archive_ok and labels_ok:
            if self.view != "people":
                return True
            return check_integrity(os.path.join(self.root, self.names), self.checksums[self.names])
        return False

    def download(self) -> None:
        """Fetch the archive and annotation files unless they already verify."""
        if self._check_integrity():
            print("Files already downloaded and verified")
            return

        archive_url = f"{self.download_url_prefix}{self.filename}"
        download_and_extract_archive(archive_url, self.root, filename=self.filename, md5=self.md5)

        # The labels file is always needed; the names file only for the people view.
        extra_files = [self.labels_file]
        if self.view == "people":
            extra_files.append(self.names)
        for extra in extra_files:
            download_url(f"{self.download_url_prefix}{extra}", self.root)

    def _get_path(self, identity: str, no: Union[int, str]) -> str:
        """Build the path of image number ``no`` of ``identity`` inside ``images_dir``."""
        image_name = f"{identity}_{int(no):04d}.jpg"
        return os.path.join(self.images_dir, identity, image_name)

    def extra_repr(self) -> str:
        """Return the alignment and split lines shown in the dataset repr."""
        return "\n".join([f"Alignment: {self.image_set}", f"Split: {self.split}"])

    def __len__(self) -> int:
        """Number of entries currently held in ``self.data``."""
        return len(self.data)


In [None]:
import os

import numpy as np


def _load_path_cache(cache_file):
  """Return the paths stored in `cache_file` as a list, or an empty list if the file is missing.

  The cache files are only written by a later cell, so on a first run they
  do not exist yet; starting from an empty cache keeps the notebook runnable.
  """
  if os.path.isfile(cache_file):
    return list(np.load(cache_file))
  return []


# Paths already known to exist / not to exist on disk (consulted by file_exists).
cache_present = _load_path_cache('./cache_present.npy')
cache_absent = _load_path_cache('./cache_absent.npy')

In [None]:
def file_exists(path):
  """Return whether `path` is an existing file, memoising the answer.

  The module-level lists `cache_present` and `cache_absent` are consulted
  first; a path found in neither is checked on disk and then recorded in the
  matching list.
  """
  if path in cache_present:
    return True
  if path in cache_absent:
    return False

  # Unknown path: ask the file system once and remember the outcome.
  found = os.path.isfile(path)
  bucket = cache_present if found else cache_absent
  bucket.append(path)
  return found

In [None]:
# The following code saves our file lists so they can be loaded later and help train our model faster.
import numpy as np
cache_present_np = np.array(cache_present)
cache_absent_np = np.array(cache_absent)
np.save('./cache_present', cache_present_np)
# Each list goes to its own file: the absent-path cache must hold the absent paths,
# not a second copy of the present ones.
np.save('./cache_absent', cache_absent_np)

In [None]:
class LFWPairs(_LFW):
    """`LFW <http://vis-www.cs.umass.edu/lfw/>`_ Dataset.

    Pairs whose image files are not both found by ``file_exists`` are skipped,
    so the dataset may contain fewer pairs than the annotation file lists.

    Args:
        root (string): Root directory of dataset where directory
            ``lfw-py`` exists or will be saved to if download is set to True.
        split (string, optional): The image split to use. Can be one of ``train``, ``test``,
            ``10fold``. Defaults to ``10fold``.
        image_set (str, optional): Type of image funneling to use, ``original``, ``funneled`` or
            ``deepfunneled``. Defaults to ``funneled``.
        transform (callable, optional): A function/transform that takes in a PIL image
            and returns a transformed version. E.g, ``transforms.RandomRotation``
        target_transform (callable, optional): A function/transform that takes in the
            target and transforms it.
        download (bool, optional): If true, downloads the dataset from the internet and
            puts it in root directory. If dataset is already downloaded, it is not
            downloaded again.

    """

    def __init__(
        self,
        root: str,
        split: str = "10fold",
        image_set: str = "funneled",
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
        download: bool = False,
    ) -> None:
        super().__init__(root, split, image_set, "pairs", transform, target_transform, download)

        self.pair_names, self.data, self.targets = self._get_pairs(self.images_dir)

    def _get_pairs(self, images_dir: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]], List[int]]:
        """Parse the pairs annotation file into aligned name, path and target lists.

        Only pairs for which both image files exist are kept, and the three
        returned lists are kept index-aligned: ``pair_names[i]`` and
        ``targets[i]`` always describe ``data[i]``.

        Args:
            images_dir: Unused; paths are built from ``self.images_dir`` via ``_get_path``.

        Returns:
            ``(pair_names, data, targets)`` where ``pair_names`` holds the two
            identity names, ``data`` the two image paths, and ``targets`` is
            ``1`` for a matched pair and ``0`` for an unmatched pair.
        """
        pair_names, data, targets = [], [], []
        with open(os.path.join(self.root, self.labels_file)) as f:
            lines = f.readlines()

        # The header line is "<n_folds>\t<n_pairs>" for 10fold and "<n_pairs>" otherwise.
        if self.split == "10fold":
            n_folds, n_pairs = (int(field) for field in lines[0].split("\t"))
        else:
            n_folds, n_pairs = 1, int(lines[0])

        start = 1
        for _ in range(n_folds):
            # Each fold lists n_pairs matched pairs followed by n_pairs unmatched pairs.
            matched_pairs = [line.strip().split("\t") for line in lines[start : start + n_pairs]]
            unmatched_pairs = [line.strip().split("\t") for line in lines[start + n_pairs : start + (2 * n_pairs)]]
            start += 2 * n_pairs

            # Matched line: name, idx1, idx2.  Unmatched line: name1, idx1, name2, idx2.
            candidates = [
                ((pair[0], pair[0]), self._get_path(pair[0], pair[1]), self._get_path(pair[0], pair[2]), 1)
                for pair in matched_pairs
            ] + [
                ((pair[0], pair[2]), self._get_path(pair[0], pair[1]), self._get_path(pair[2], pair[3]), 0)
                for pair in unmatched_pairs
            ]

            for names, img1, img2, same in candidates:
                # Record the names only for pairs that are actually kept, so
                # pair_names stays aligned with data and targets.
                if file_exists(img1) and file_exists(img2):
                    pair_names.append(names)
                    data.append((img1, img2))
                    targets.append(same)

        return pair_names, data, targets

    def __getitem__(self, index: int) -> Tuple[Any, Any, int]:
        """
        Args:
            index (int): Index

        Returns:
            tuple: (image1, image2, target) where target is `0` for different identities and `1` for same identities.
        """
        img1, img2 = self.data[index]
        img1, img2 = self._loader(img1), self._loader(img2)
        target = self.targets[index]

        if self.transform is not None:
            img1, img2 = self.transform(img1), self.transform(img2)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img1, img2, target


In [None]:
# Training and test images go through the same deterministic pipeline:
# resize to 160x160, convert to a tensor, normalise each channel with mean 0.5 and std 0.5.
train_transforms = transforms.Compose([
    transforms.Resize(size=(160, 160)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])
test_transforms = transforms.Compose([
    transforms.Resize(size=(160, 160)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

In [None]:
def get_dataset_examples(self, n=10, test_dataset=False):
    """Return the paths of `n` randomly chosen input/output image pairs.

    Args:
        self: Object exposing `train_data_path` and `test_data_path`; each is
            expected to contain `inputs` and `outputs` sub-directories.
        n: Number of images to sample (without replacement).
        test_dataset: When true, sample from `test_data_path`; otherwise
            (the default) from `train_data_path`.

    Returns:
        `(inputs, outputs)`: two lists of paths with the same file names in
        the same order, one under `inputs` and one under `outputs`.

    Raises:
        ValueError: Propagated from `random.sample` when `n` exceeds the
            number of files in the `inputs` directory.
    """
    # Imported here because the notebook never imports `random` at the top level.
    import random

    data_path = self.test_data_path if test_dataset else self.train_data_path

    images = os.listdir(os.path.join(data_path, 'inputs'))
    images = random.sample(images, n)
    inputs = [os.path.join(data_path, 'inputs', img) for img in images]
    outputs = [os.path.join(data_path, 'outputs', img) for img in images]
    return inputs, outputs

In [None]:
# Training pairs, read from the dataset folder under the current working directory.
lfw_train = LFWPairs(root=".", split="train", transform=train_transforms)

In [None]:
# Test pairs, read from the dataset folder under the current working directory.
lfw_test = LFWPairs(root=".", split="test", transform=test_transforms)

## Training

In [None]:
from facenet_pytorch import InceptionResnetV1

In [None]:
class face_rec(torch.nn.Module):
  """Siamese face-verification model.

  Both images are embedded by the same pretrained InceptionResnetV1 backbone;
  the element-wise absolute difference of the two 512-d embeddings is passed
  through a linear layer and a sigmoid, giving one score in (0, 1) per pair.
  """

  def __init__(self):
    super().__init__()
    self.inception_resnet = InceptionResnetV1(pretrained='vggface2').eval()
    self.fc_layers = torch.nn.Sequential(torch.nn.Linear(512, 1), torch.nn.Sigmoid())

  def train(self, mode=True):
    """Set the training mode of the head while keeping the backbone in eval mode.

    Without this override, calling `.train()` on the model would also switch
    the pretrained backbone to train mode, undoing the `.eval()` applied in
    the constructor (its batch-norm layers would then update their running
    statistics during training).
    """
    super().train(mode)
    self.inception_resnet.eval()
    return self

  def forward(self, img1, img2):
    """Return the sigmoid score for each (img1, img2) pair, shaped (batch, 1)."""
    encoding_1 = self.inception_resnet(img1)
    encoding_2 = self.inception_resnet(img2)
    input_fc = torch.abs(encoding_1 - encoding_2)
    out = self.fc_layers(input_fc)
    return out

In [None]:
# Batches of 16 pairs; only the training loader is shuffled.
trainloader = torch.utils.data.DataLoader(dataset=lfw_train, batch_size=16, shuffle=True)
testloader = torch.utils.data.DataLoader(dataset=lfw_test, batch_size=16)

In [None]:
# Number of pairs in the training and test datasets.
N, val_N = len(lfw_train), len(lfw_test)

In [None]:
# Optimizer hyper-parameters: learning rate and weight decay.
lr, weight_decay = 1e-3, 1e-2

In [None]:
def train_model(face_model, criterion, optimizer, trainloader, testloader, epochs):
  """Train the classification head of `face_model` and track per-epoch metrics.

  The parameters of `face_model.inception_resnet` are frozen
  (`requires_grad = False`). After every epoch the model is evaluated on
  `testloader`; once more than a third of the epochs have run, the weights
  are saved each time the test accuracy reaches a new best.

  Args:
    face_model: Model with an `inception_resnet` sub-module, called as
      `face_model(img1, img2)` and returning one probability per pair.
    criterion: Loss taking `(flattened outputs, float labels)`.
    optimizer: Optimizer over the model parameters. The learning rate and
      weight decay of its first parameter group, read before training starts,
      are used in the checkpoint file name.
    trainloader: Iterable of `(img1, img2, labels)` training batches.
    testloader: Iterable of `(img1, img2, labels)` evaluation batches.
    epochs: Number of epochs to run.

  Returns:
    `(train_losses, validation_losses, validation_errors, epochs_list)`:
    per-epoch mean training loss, mean test loss, test error
    (1 - accuracy) and the zero-based epoch indices.

  Uses the notebook-level `device`. Losses are averaged over the number of
  samples actually seen, so the result does not depend on other globals.
  """
  train_losses = []
  validation_losses = []
  validation_errors = []
  epochs_list = []
  net = face_model.to(device)
  best_val_accuracy = 0

  # Name the checkpoint after the hyper-parameters the optimizer was built with.
  first_group = optimizer.param_groups[0]
  file_name = ("MASKED_weights_lr" + str(first_group['lr'])
               + "_wd" + str(first_group.get('weight_decay', 0))
               + "_epoch" + str(epochs))

  # Freeze model inception_resnet parameters
  for param in net.inception_resnet.parameters():
    param.requires_grad = False

  for epoch in range(epochs):
    # From the 21st epoch on (only reached when epochs > 20) the learning rate is set to 1e-3.
    if epoch == 20:
      for op_params in optimizer.param_groups:
        op_params['lr'] = 1e-3

    net.train()
    epoch_loss = 0.0
    train_seen = 0
    for img1, img2, labels in trainloader:
      images1 = img1.to(device)
      images2 = img2.to(device)
      labels = labels.to(device)
      outputs = net(images1, images2)
      loss = criterion(torch.flatten(outputs), labels.float())
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      # The criterion returns a batch mean; weight it by the batch size so the
      # epoch average is correct even when the last batch is smaller.
      batch_size = images1.shape[0]
      epoch_loss += loss.item() * batch_size
      train_seen += batch_size

    net.eval()
    correct = 0
    total = 0
    val_loss = 0.0
    with torch.no_grad():
      for test_img1, test_img2, labels in testloader:
        test_images1 = test_img1.to(device)
        test_images2 = test_img2.to(device)
        labels = labels.to(device)
        outputs = torch.flatten(net(test_images1, test_images2))
        val_loss += criterion(outputs, labels.float()).item() * test_images1.shape[0]
        # Threshold the probabilities at 0.5; .long() avoids re-wrapping a tensor in torch.tensor().
        predictions = (outputs > 0.5).long()
        correct += (predictions == labels).sum().item()
        total += labels.size(0)

    val_accuracy = correct / total
    train_loss = epoch_loss / max(train_seen, 1)
    train_losses.append(train_loss)
    validation_losses.append(val_loss / max(total, 1))
    validation_errors.append(1 - val_accuracy)
    epochs_list.append(epoch)

    # Skip checkpointing during the first third of training.
    if (epoch + 1) > epochs // 3 and val_accuracy > best_val_accuracy:
      best_val_accuracy = val_accuracy
      torch.save(net.state_dict(), file_name)
    print("Epoch: " + str(epoch + 1) + ", Epoch-loss: " + str(train_loss) + ", Accuracy-test: " + str(val_accuracy))

  return train_losses, validation_losses, validation_errors, epochs_list

In [None]:
# Model, loss (binary cross-entropy on the sigmoid output) and optimizer used for training.
facenet = face_rec()
criterion = torch.nn.BCELoss()
optimizer = torch.optim.Adam(params=facenet.parameters(), lr=lr, weight_decay=weight_decay)

  0%|          | 0.00/107M [00:00<?, ?B/s]

In [None]:
# Train for 20 epochs and keep the per-epoch metrics.
train_losses, validation_losses, validation_errors, epochs_list = train_model(
    face_model=facenet,
    criterion=criterion,
    optimizer=optimizer,
    trainloader=trainloader,
    testloader=testloader,
    epochs=20,
)

KeyboardInterrupt: ignored

## Evaluation

In [None]:
best_model = face_rec().to(device)
# map_location remaps the stored tensors onto the current device, so a checkpoint
# saved on a GPU runtime also loads on a CPU-only runtime.
best_model.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/CIS5810/Final Project/Model Weights Masked/MASKED_weights_lr0.001_wd0.01_epoch20",
        map_location=device,
    )
)
best_model.eval()

face_rec(
  (inception_resnet): InceptionResnetV1(
    (conv2d_1a): BasicConv2d(
      (conv): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
      (bn): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (conv2d_2a): BasicConv2d(
      (conv): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), bias=False)
      (bn): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (conv2d_2b): BasicConv2d(
      (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(64, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (maxpool_3a): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv2d_3b): BasicConv2d(
      (conv): Conv2d(64, 80, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn): BatchNorm2d(80, eps=0.001, momentum=0.1, affine=

In [None]:
def evaluate_model(net, loader=None):
  """Return the classification accuracy of `net` on a loader of image pairs.

  Args:
    net: `torch.nn.Module` called as `net(img1, img2)` and returning one
      probability per pair.
    loader: Iterable of `(img1, img2, labels)` batches. Defaults to the
      notebook-level `testloader`.

  Returns:
    Fraction of pairs whose thresholded prediction (probability > 0.5)
    equals the label.

  The model is evaluated in eval mode with gradients disabled, and its
  previous training mode is restored afterwards. Uses the notebook-level
  `device`.
  """
  if loader is None:
    loader = testloader

  was_training = net.training
  net.eval()
  correct = 0
  total = 0
  try:
    with torch.no_grad():
      for test_img1, test_img2, labels in loader:
        test_images1 = test_img1.to(device)
        test_images2 = test_img2.to(device)
        labels = labels.to(device)
        outputs = torch.flatten(net(test_images1, test_images2))
        # .long() on the boolean mask avoids the torch.tensor(tensor) copy-construct warning.
        predictions = (outputs > 0.5).long()
        correct += (predictions == labels).sum().item()
        total += labels.size(0)
  finally:
    net.train(was_training)
  return correct / total

In [None]:
# Accuracy of the reloaded best checkpoint on the test pairs.
print("Best accuracy:", evaluate_model(net=best_model))

  predictions = torch.flatten(torch.tensor(outputs.clone().detach() > 0.5, dtype=int))


Best accuracy: 0.8199195171026157


## Training Plots
The code below creates plots of our results for training.

In [None]:
import matplotlib.pyplot as plt
import matplotlib
import numpy as np

matplotlib.rcParams.update({'font.size': 20})


def _plot_curve(x_values, y_values, color, y_label, title):
    """Draw one metric against the epoch number as a line with markers and show it."""
    plt.figure(figsize=(15, 10))
    plt.scatter(x_values, y_values, color=color)
    plt.plot(x_values, y_values, color=color)
    plt.xticks(range(1, 21))
    plt.xlabel("Epochs")
    plt.ylabel(y_label)
    plt.title(title)

    plt.show()


# Per-epoch training loss values, entered by hand for the 20 epochs.
losses = [
    0.6911138793244896,
    0.6880981432176945,
    0.6859944732082555,
    0.6848479186185993,
    0.6823498958154118,
    0.681148484369861,
    0.6791600811146217,
    0.67881919519865,
    0.6773789730584253,
    0.6744122678588315,
    0.6752931060412888,
    0.6739502938841279,
    0.6743421871412619,
    0.6712695394259216,
    0.6713780253754612,
    0.6701562615131221,
    0.6695251382128222,
    0.6687141039119772,
    0.6680673688663872,
    0.6666260593604689,
]
epochs = list(range(1, 21))
_plot_curve(epochs, losses, 'red', "Loss Value", "Loss vs. Training Epochs")

# Per-epoch test accuracy values, entered by hand; plotted as error = 1 - accuracy.
accuracy = [
    0.5,
    0.5,
    0.5412474849094567,
    0.7665995975855131,
    0.7223340040241448,
    0.6408450704225352,
    0.5845070422535211,
    0.7676056338028169,
    0.6680080482897385,
    0.607645875251509,
    0.6901408450704225,
    0.7183098591549296,
    0.7132796780684104,
    0.6448692152917505,
    0.7816901408450704,
    0.8199195171026157,
    0.8158953722334004,
    0.7746478873239436,
    0.6297786720321932,
    0.7907444668008048,
]
epochs = list(range(1, 21))
_plot_curve(epochs, 1 - np.array(accuracy), 'purple', "Test Error", "Test Error vs. Training Epochs")