## Face Classification and Verification


**Project: Face Classification & Verification using Convolutional Neural Networks**

**Overview:** 
- Developed CNN-based architectures for face verification, focusing on constructing effective convolutional neural networks and generating discriminative, generalizable feature representations.
- Implemented a face classifier to extract and analyze facial features (like skin tone, hair color, nose size) from images, converting them into fixed-length feature vectors or face embeddings.
- Designed a verification system to compute the similarity between feature vectors of two images, determining if they represent the same person.

**Key Achievements:**
- Successfully implemented and trained a resnet50 model that can distinguish facial features with high accuracy (90% for recognition & 60% for verification), thereby enhancing the performance of face verification tasks.
- Developed and optimized a system capable of effectively comparing feature vectors to generate a similarity score, crucial for accurate identity verification.

# Preliminaries

In [None]:
!nvidia-smi # to see what GPU you have

In [None]:
!pip install wandb torchsummary twilio timm --quiet

In [None]:
import torch
from torchsummary import summary
import torchvision
import os
import gc
from tqdm import tqdm
from PIL import Image
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
import glob
import wandb
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Device: ", device)

In [None]:
from google.colab import drive # Link your drive if you are a colab user
drive.mount('/content/drive') # Models in this project take a long time to get trained and make sure to save it here

# Download Data from Kaggle

In [None]:

!pip install --upgrade --force-reinstall --no-deps kaggle 

!mkdir /root/.kaggle

with open("/root/.kaggle/kaggle.json", "w+") as f:
    f.write('{"username":"username","key":"key"}') 
    # Put your kaggle username & key here

!chmod 600 /root/.kaggle/kaggle.json

# Configs

In [None]:
config = {
    'batch_size': 128, # Increase this if your GPU can handle it
    'lr': 0.1, # change to 0.01 after 100 epochs, then 0.003 after 100 epochs, then 0.001 for another 100 epochs
    'epochs': 120, 
}

# Classification Dataset

Transformations: 


*   Increased the size of images, since it makes features easily recognizable.
*   Added randomCrop, RandomHorizontalFlip, RandAugment, and finally normalized



In [None]:
DATA_DIR = '/content/data/11-785-f22-hw2p2-classification/'
TRAIN_DIR = os.path.join(DATA_DIR, "classification/train") 
VAL_DIR = os.path.join(DATA_DIR, "classification/dev")
TEST_DIR = os.path.join(DATA_DIR, "classification/test")

# Transforms using torchvision - Refer https://pytorch.org/vision/stable/transforms.html

train_transforms = torchvision.transforms.Compose([
    torchvision.transforms.Resize((256, 256)),
    torchvision.transforms.RandomCrop((224, 224)),
    torchvision.transforms.RandomHorizontalFlip(p=0.5),
    # torchvision.transforms.RandAugment(),  # uncomment after training for the 1st 100 epochs
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=0, std=1),
])
# Most torchvision transforms are done on PIL images. So you convert it into a tensor at the end with ToTensor()
# But there are some transforms which are performed after ToTensor() : e.g - Normalization
# Normalization Tip - Do not blindly use normalization that is not suitable for this dataset

val_transforms = torchvision.transforms.Compose([
    torchvision.transforms.Resize((256, 256)),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=0, std=1),
])


train_dataset = torchvision.datasets.ImageFolder(TRAIN_DIR, transform = train_transforms)
val_dataset = torchvision.datasets.ImageFolder(VAL_DIR, transform = val_transforms)


# Create data loaders
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size = config['batch_size'], 
                                           shuffle = True,num_workers = 4, pin_memory = True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size = config['batch_size'], 
                                         shuffle = False, num_workers = 2)

Visualized my images after transformations to view the changes.

In [None]:
import matplotlib.pyplot as plt

fig, axs = plt.subplots(3, 6, figsize=(20,11))
fig.suptitle('Random pictures from train dataset', fontsize=20)
for ax in axs.flatten():
    n = np.random.randint(len(train_dataset))
    img = train_dataset[n][0]
    img = img.numpy().transpose((1, 2, 0))
    mean = np.array([0.4802, 0.4481, 0.3975])
    std = np.array([0.2302, 0.2265, 0.2262])
    img = std * img + mean
    img = np.clip(img, 0, 1)
    # ax.set_title(encoder_labels.inverse_transform([train_dataset[n][1]])[0])
    ax.imshow(img)

In [None]:
# You can do this with ImageFolder as well, but it requires some tweaking
class ClassificationTestDataset(torch.utils.data.Dataset):

    def __init__(self, data_dir, transforms):
        self.data_dir   = data_dir
        self.transforms = transforms

        # This one-liner basically generates a sorted list of full paths to each image in the test directory
        self.img_paths  = list(map(lambda fname: os.path.join(self.data_dir, fname), sorted(os.listdir(self.data_dir))))

    def __len__(self):
        return len(self.img_paths)
    
    def __getitem__(self, idx):
        return self.transforms(Image.open(self.img_paths[idx]))

In [None]:
test_dataset = ClassificationTestDataset(TEST_DIR, transforms = val_transforms)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size = config['batch_size'], shuffle = False,
                         drop_last = False, num_workers = 2)

In [None]:
print("Number of classes: ", len(train_dataset.classes))
print("No. of train images: ", train_dataset.__len__())
print("Shape of image: ", train_dataset[0][0].shape)
print("Batch size: ", config['batch_size'])
print("Train batches: ", train_loader.__len__())
print("Val batches: ", val_loader.__len__())

# CNN Architecture

The Architecture used in Resnet50
Implementation referenced from :

*   https://arxiv.org/abs/1512.03385v1
*   https://blog.paperspace.com/writing-resnet-from-scratch-in-pytorch/
* https://github.com/pytorch/vision/blob/main/torchvision/models/resnet.py



In [None]:
import torch
import torch.nn as nn

class block(nn.Module):
    def __init__(
        self, in_channels, intermediate_channels, identity_downsample=None, stride=1):
        super(block, self).__init__()
        self.expansion = 4
        
        self.conv1 = nn.Conv2d(in_channels, intermediate_channels, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(intermediate_channels)
        
        self.conv2 = nn.Conv2d(
            intermediate_channels,
            intermediate_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False)
        self.bn2 = nn.BatchNorm2d(intermediate_channels)
              
        self.conv3 = nn.Conv2d(
            intermediate_channels,
            intermediate_channels * self.expansion,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=False)
        self.bn3 = nn.BatchNorm2d(intermediate_channels * self.expansion)
        self.relu = nn.ReLU()

        self.identity_downsample = identity_downsample
        self.stride = stride

    def forward(self, x):
      
        identity = x.clone()

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)

        if self.identity_downsample is not None:

            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x


class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Essentially the entire ResNet architecture are in these 4 lines below
        self.layer1 = self._make_layer(
            block, layers[0], intermediate_channels=64, stride=1
        )
        self.layer2 = self._make_layer(
            block, layers[1], intermediate_channels=128, stride=2
        )
        self.layer3 = self._make_layer(
            block, layers[2], intermediate_channels=256, stride=2
        )
        self.layer4 = self._make_layer(
            block, layers[3], intermediate_channels=512, stride=2
        )

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)


    def _make_layer(self, block, num_residual_blocks, intermediate_channels, stride): #intermediate_channels : Output channels
        identity_downsample = None
        layers = []
        if stride != 1 or self.in_channels != intermediate_channels * 4:
            identity_downsample = nn.Sequential(
                nn.Conv2d(
                    self.in_channels,
                    intermediate_channels * 4,
                    kernel_size=1,
                    stride=stride,
                    bias=False
                ),
                nn.BatchNorm2d(intermediate_channels * 4),
            )

        layers.append(block(self.in_channels, intermediate_channels, identity_downsample, stride)
        
        )

        # The expansion size is always 4 for ResNet 50,101,152
        self.in_channels = intermediate_channels * 4

        # For example for first resnet layer: 256 will be mapped to 64 as intermediate layer,
        # then finally back to 256. Hence no identity downsample is needed, since stride = 1,
        # and also same amount of channels.
        for i in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, intermediate_channels))

        return nn.Sequential(*layers)

    def forward(self, x, return_feats = False):

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)


        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        feats = self.avgpool(x)
        feats = feats.reshape(feats.shape[0], -1)
        out = self.fc(feats)

        if return_feats:
            return feats
        else:
            return out

#### ResNet50

model = ResNet(block, [3, 4, 6, 3],3,7000).to(device)
summary(model, (3, 224, 224))

# Setup everything for training

In [None]:
from torch.optim import lr_scheduler

criterion = torch.nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.SGD(model.parameters(), lr=config['lr'], momentum=0.9, weight_decay=1e-4)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=20, factor=0.1)
scaler = torch.cuda.amp.GradScaler()

# Let's train!

In [None]:
def train(model, dataloader, optimizer, criterion):
    
    model.train()

    # Progress Bar 
    batch_bar = tqdm(total=len(dataloader), dynamic_ncols=True, leave=False, position=0, desc='Train', ncols=5) 
    
    num_correct = 0
    total_loss = 0

    for i, (images, labels) in enumerate(dataloader):
        
        optimizer.zero_grad() # Zero gradients

        images, labels = images.to(device), labels.to(device)
        
        with torch.cuda.amp.autocast(): # This implements mixed precision. Thats it! 
            outputs = model(images)
            loss = criterion(outputs, labels)

        # Update no. of correct predictions & loss as we iterate
        num_correct += int((torch.argmax(outputs, axis=1) == labels).sum())
        total_loss += float(loss.item())
        
        
        # tqdm lets you add some details so you can monitor training as you train.
        batch_bar.set_postfix(
            acc="{:.04f}%".format(100 * num_correct / (config['batch_size']*(i + 1))),
            loss="{:.04f}".format(float(total_loss / (i + 1))),
            num_correct=num_correct,
            lr="{:.04f}".format(float(optimizer.param_groups[0]['lr'])))
        
        scaler.scale(loss).backward() # This is a replacement for loss.backward()
        scaler.step(optimizer) # This is a replacement for optimizer.step()
        scaler.update() 
       

        batch_bar.update() # Update tqdm bar

    batch_bar.close() # You need this to close the tqdm bar

    acc = 100 * num_correct / (config['batch_size']* len(dataloader))
    total_loss = float(total_loss / len(dataloader))

    return acc, total_loss

In [None]:
def validate(model, dataloader, criterion):
  
    model.eval()
    batch_bar = tqdm(total=len(dataloader), dynamic_ncols=True, position=0, leave=False, desc='Val', ncols=5)

    num_correct = 0.0
    total_loss = 0.0

    for i, (images, labels) in enumerate(dataloader):
        
        # Move images to device
        images, labels = images.to(device), labels.to(device)
        
        # Get model outputs
        with torch.inference_mode():
            outputs = model(images)
            loss = criterion(outputs, labels)

        num_correct += int((torch.argmax(outputs, axis=1) == labels).sum())
        total_loss += float(loss.item())

        batch_bar.set_postfix(
            acc="{:.04f}%".format(100 * num_correct / (config['batch_size']*(i + 1))),
            loss="{:.04f}".format(float(total_loss / (i + 1))),
            num_correct=num_correct)

        batch_bar.update()
        
    batch_bar.close()
    acc = 100 * num_correct / (config['batch_size']* len(dataloader))
    total_loss = float(total_loss / len(dataloader))
    return acc, total_loss

In [None]:
gc.collect() # These commands help you when you face CUDA OOM error
torch.cuda.empty_cache()

# Wandb

In [None]:
import os
os.environ['WANDB_API_KEY']='API_KEY'
wandb.login() #API Key is in your wandb account, under settings (wandb.ai/settings)

In [None]:
# Create your wandb run
run = wandb.init(
    name = "resnet50", ## Wandb creates random run names if you skip this field
    reinit = True, ### Allows reinitalizing runs when you re-run this cell
    # run_id = ### Insert specific run id here if you want to resume a previous run
    # resume = "must" ### You need this to resume previous runs, but comment out reinit = True when using this
    project = "hw2p2-ablations", ### Project should be created in your wandb account 
    config = config ### Wandb Config for your run
)

# Experiments

In [None]:
best_valacc = 0.0

for epoch in range(config['epochs']):

    curr_lr = float(optimizer.param_groups[0]['lr'])

    train_acc, train_loss = train(model, train_loader, optimizer, criterion)
    
    print("\nEpoch {}/{}: \nTrain Acc {:.04f}%\t Train Loss {:.04f}\t Learning Rate {}".format(
        epoch + 1,
        config['epochs'],
        train_acc,
        train_loss,
        curr_lr))
    
    val_acc, val_loss = validate(model, val_loader, criterion)


    print("Val Acc {:.04f}%\t Val Loss {:.04f}".format(val_acc, val_loss))

    wandb.log({"train_loss":train_loss, 'train_Acc': train_acc, 'validation_Acc':val_acc, 
               'validation_loss': val_loss, "learning_Rate": curr_lr})


    # #Save model in drive location if val_acc is better than best recorded val_acc
    if val_acc >= best_valacc:
        print("Saving model")
        torch.save({'model_state_dict':model.state_dict(),
                  'optimizer_state_dict':optimizer.state_dict(),
                  # 'scheduler_state_dict':scheduler.state_dict(),
                  'val_acc': val_acc, 
                  'epoch': epoch}, f'./checkpoint_res50_{best_valacc}.pth',
                  _use_new_zipfile_serialization=False)
        best_valacc = val_acc
        wandb.save(f'checkpoint_res50_{best_valacc}.pth')
      # You may find it interesting to exlplore Wandb Artifcats to version your models
run.finish()

# Classification Task: Testing

In [None]:
def test(model,dataloader):
    model.eval()
    batch_bar = tqdm(total=len(dataloader), dynamic_ncols=True, position=0, leave=False, desc='Test')
    test_results = []
    for i, (images) in enumerate(dataloader):
        images = images.to(device)
        with torch.inference_mode():
            outputs = model(images)
        outputs = torch.argmax(outputs, axis=1).detach().cpu().numpy().tolist()
        test_results.extend(outputs)

        batch_bar.update()
    batch_bar.close()
    return test_results

In [None]:
test_results = test(model, test_loader)

# Verification Task: Validation

In this verification task, we are presented with a generalized scenario that involves:

Working with X unknown identities and Y known identities.
The objective is to match these X unknown identities to the Y known identities.
We have access to a verification dataset comprising 1000 known identities and 1000 unknown identities. The unknown identities are divided into two groups: 200 for development (dev) and 800 for testing. Our task is to compare the unknown identities against the 1000 known identities and accurately assign an identity to each image from the set of unknowns.

To achieve this, we will utilize or fine-tune our model, originally trained for classification, to compare images between known and unknown identities using a similarity metric. This will involve assigning labels to the unknown identities.

The performance of our model will be evaluated based on the quality of embeddings/features it generates on images or faces that it has not encountered during its training phase for classification

Similarity metric used : **CosineSimilarity**


No triplet loss was used.

In [None]:
known_regex = "/content/data/verification/known/*/*"
known_paths = [i.split('/')[-2] for i in sorted(glob.glob(known_regex))] 
# This obtains the list of known identities from the known folder

unknown_regex = "/content/data/verification/unknown_test/*" # Change the directory accordingly for the test set

# We load the images from known and unknown folders
unknown_images = [Image.open(p) for p in tqdm(sorted(glob.glob(unknown_regex)))]
known_images = [Image.open(p) for p in tqdm(sorted(glob.glob(known_regex)))]


transforms = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor()])

unknown_images = torch.stack([transforms(x) for x in unknown_images])
known_images  = torch.stack([transforms(y) for y in known_images ])
#Print your shapes here to understand what we have done


similarity_metric = torch.nn.CosineSimilarity(dim= 1, eps= 1e-6) 

In [None]:
def eval_verification(unknown_images, known_images, model, similarity, batch_size= config['batch_size'], mode='test'): 

    unknown_feats, known_feats = [], []

    batch_bar = tqdm(total=len(unknown_images)//batch_size, dynamic_ncols=True, position=0, leave=False, desc=mode)
    model.eval()

    # We load the images as batches for memory optimization and avoiding CUDA OOM errors
    for i in range(0, unknown_images.shape[0], batch_size):
        unknown_batch = unknown_images[i:i+batch_size] # Slice a given portion upto batch_size
        
        with torch.no_grad():
            unknown_feat = model(unknown_batch.float().to(device), return_feats=False) # Setting the return_features to False increased my accuracy by 1%       
        unknown_feats.append(unknown_feat)
        batch_bar.update()
    
    batch_bar.close()
    
    batch_bar = tqdm(total=len(known_images)//batch_size, dynamic_ncols=True, position=0, leave=False, desc=mode)
    
    for i in range(0, known_images.shape[0], batch_size):
        known_batch = known_images[i:i+batch_size] 
        with torch.no_grad():
              known_feat = model(known_batch.float().to(device), return_feats=False)
          
        known_feats.append(known_feat)
        batch_bar.update()

    batch_bar.close()

    # Concatenate all the batches
    unknown_feats = torch.cat(unknown_feats, dim=0)
    known_feats = torch.cat(known_feats, dim=0)

    similarity_values = torch.stack([similarity(unknown_feats, known_feature) for known_feature in known_feats])

    predictions = similarity_values.argmax(0).cpu().numpy()

    pred_id_strings = [known_paths[i] for i in predictions]
    
    if mode == 'val':
        true_ids = pd.read_csv('/content/data/verification/dev_identities.csv')['label'].tolist()
        accuracy = accuracy_score(pred_id_strings, true_ids)
        print("Verification Accuracy = {}".format(accuracy))
    
    return pred_id_strings

In [None]:
pred_id_strings = eval_verification(unknown_images, known_images, model, similarity_metric, config['batch_size'], mode='test')

In [None]:
with open("verification_results.csv", "w+") as f:
    f.write("id,label\n")
    for i in range(len(pred_id_strings)):
        f.write("{},{}\n".format(i, pred_id_strings[i]))