In [2]:
import torch
import numpy
import sklearn
import pprint
import torchvision
import sklearn
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
import torch
import torchvision
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import torchvision.models as model
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import torch.utils.model_zoo as model_zoo
import tarfile
import shutil
from torch.nn import DataParallel
from pathlib import Path
from PIL import Image
import cv2
import os
from retinaface import RetinaFace
from deepface import DeepFace
import torch.nn as nn
from PIL import Image
import math
import time
import lightning as L
from models.deepheadpose.code.hopenet import Hopenet
from models.deepheadpose.code.utils import softmax_temperature
from typing import Any
from lightning.pytorch.utilities.types import TRAIN_DATALOADERS
from pytorch_lightning.utilities.types import STEP_OUTPUT
from pytorch_lightning.tuner import Tuner
from torch import Tensor
from models.resnet50 import iresnet50

In [None]:
!nvidia-smi

In [3]:
# Pick the compute device once; later cells move models and tensors to `device`.
# torch.cuda is a module (always truthy), so availability has to be queried explicitly.
# Falling back to the CPU keeps the notebook runnable on machines without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ones = torch.ones(1, device= device)
if device.type == "cuda":
    # `mps_device` and `x` are kept because the original cell defined them.
    mps_device = device
    x = torch.ones(1, device=mps_device)
    print (x)
else:
    print ("cuda device not found.")

tensor([1.], device='cuda:0')


In [None]:
class FocalLoss(torch.nn.Module):
    """Multi-class focal loss for class-index targets.

    For every sample the cross-entropy ``ce_i = -log(p_i)`` is scaled by the
    focusing factor ``(1 - p_i) ** gamma`` and the scaled losses are averaged.
    The factor is applied per sample; applying it to the already averaged
    cross-entropy (as the previous version did) only rescales the batch loss
    and does not down-weight easy examples.

    Args:
        weight: optional 1-D tensor of per-class weights. When given, the
            result is the weight-normalised mean, i.e. the same reduction
            ``torch.nn.CrossEntropyLoss(weight=...)`` uses, so ``gamma=0``
            reproduces weighted cross-entropy exactly.
        gamma: focusing exponent; ``0`` reduces the loss to cross-entropy.
        eps: stored for interface compatibility; not used by ``forward``.

    Note:
        Targets must be class indices; ``ignore_index`` and class-probability
        targets are not handled.
    """

    def __init__(self, weight=None, gamma=0, eps=1e-7):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.eps = eps
        # Kept as a sub-module so the class weights follow .to(device) and the
        # attribute layout of the original class is preserved.
        self.ce = torch.nn.CrossEntropyLoss(weight=weight)

    def forward(self, input, target):
        """Return the scalar focal loss for logits ``input`` and class indices ``target``."""
        # Unweighted per-sample cross-entropy, so exp(-ce) is the true p_t.
        ce = torch.nn.functional.cross_entropy(input, target, reduction="none")
        p = torch.exp(-ce)
        focal = (1 - p) ** self.gamma * ce

        class_weight = self.ce.weight
        if class_weight is None:
            return focal.mean()

        # Weighted mean: each sample contributes with the weight of its class.
        sample_weight = class_weight[target]
        return (focal * sample_weight).sum() / sample_weight.sum()

In [None]:
class ArcFaceLoss(torch.nn.Module):
    """Additive angular margin (ArcFace) classification head.

    The module holds one weight vector per class. ``forward`` returns the
    cosine between the L2-normalised feature and every L2-normalised class
    weight; when labels are given, the target-class logit is replaced by
    ``cos(theta + m)`` and all logits are multiplied by ``s``. The output is
    a logit matrix meant to be fed to a cross-entropy style loss.

    Args:
        in_features: size of each input embedding.
        out_features: number of classes.
        s: scale applied to the logits.
        m: angular margin in radians, added to the target-class angle.
    """
    def __init__(self, in_features, out_features, s=30.0, m=0.50):
        super(ArcFaceLoss, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        self.weight = torch.nn.Parameter(torch.empty(out_features, in_features))
        torch.nn.init.xavier_uniform_(self.weight)

        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        # Below this cosine, theta + m would exceed pi and cos(theta + m) stops
        # decreasing, so a linear penalty (cos_theta - mm) is used instead.
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, feature, label= None):
        """Return cosine logits, or margin-penalised scaled logits when ``label`` is given.

        Args:
            feature: 2-D tensor of embeddings, one row per sample.
            label: optional tensor of class indices, one per sample, on the
                same device as ``feature``.

        Returns:
            ``(batch, out_features)`` tensor: plain cosines when ``label`` is
            ``None``, otherwise the scaled logits with the angular margin
            applied to each sample's target class.
        """
        # Normalize embeddings and weights
        feature = F.normalize(feature)
        weights = F.normalize(self.weight)

        # Compute the logit
        cos_theta = F.linear(feature, weights)

        if (label is None):
            return cos_theta

        # Find the angle between the weight and feature. The cosine is clamped
        # strictly inside (-1, 1): acos has an infinite derivative at +-1, which
        # would turn the gradient into NaN for perfectly aligned vectors.
        theta = torch.acos(cos_theta.clamp(-1.0 + 1e-7, 1.0 - 1e-7))

        # Add angular margin penalty
        marginal_target_logit = torch.cos(theta + self.m)

        marginal_target_logit = torch.where(cos_theta > self.th, marginal_target_logit, cos_theta - self.mm)

        # One-hot encoding, allocated on the same device/dtype as the logits so
        # the module does not depend on a notebook-level `device` variable.
        one_hot = torch.zeros_like(cos_theta)
        one_hot.scatter_(1, label.view(-1, 1).long(), 1)

        # Margin logit for the ground-truth class, plain cosine everywhere else
        score = (one_hot * marginal_target_logit) + ((1.0 - one_hot) * cos_theta)

        # Rescale to s
        score = score * self.s

        return score


In [None]:
# Preprocessing pipelines: PIL image -> normalised float tensor.
def _to_normalised_tensor(*leading_steps):
    """Compose `leading_steps` followed by ToTensor and a per-channel Normalize."""
    return transforms.Compose([
        *leading_steps,
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])


# Training pipeline: random horizontal flip, then a random resized crop to 112x112.
transform_train = _to_normalised_tensor(
    transforms.RandomHorizontalFlip(),
    transforms.RandomResizedCrop((112,112)),
)

# Deterministic pipeline: plain resize to 112x112.
transform = _to_normalised_tensor(
    transforms.Resize((112, 112)),
)

# Pipeline used for the pose estimator input: resize to 224x224, then a 224 centre crop.
transform_pose = _to_normalised_tensor(
    transforms.Resize((224, 224)),
    transforms.CenterCrop(224),
)



In [None]:
def make_folders(img_dir):
    """Sort the ``.jpg`` files lying directly in ``img_dir`` into one sub-folder per person.

    The person name is everything before the last underscore of the file name
    (``Some_Name_3.jpg`` -> ``Some_Name``). Each image is re-saved through PIL
    as ``img_dir/<person>/<file name>``; the source file is left in place.
    Files without an underscore carry no person name and are skipped (the
    previous version re-encoded them onto themselves).

    Args:
        img_dir: directory holding the flat list of images.
    """
    for filename in os.listdir(img_dir):

        # Only .jpg images are handled
        if not filename.endswith('.jpg'):
            continue

        # No underscore -> nothing to derive a folder name from
        if '_' not in filename:
            continue

        # Get the person's name from the filename (drop the trailing "_<index>.jpg")
        name = filename.rsplit('_', 1)[0]

        # Create a directory for the person if it doesn't exist
        person_dir = os.path.join(img_dir, name)
        os.makedirs(person_dir, exist_ok=True)

        # Re-save the image in the person's directory; the context manager
        # closes the source file handle once the copy is written.
        with Image.open(os.path.join(img_dir, filename)) as img:
            img.save(os.path.join(person_dir, filename))
make_folders("./cplfw/aligned images/")

In [None]:
# Set your dataset directory path here
def _copy_folder_contents(src_dir, dst_dir):
    """Copy every entry of ``src_dir`` into ``dst_dir`` (created if missing)."""
    os.makedirs(dst_dir, exist_ok=True)
    for img_file in os.listdir(src_dir):
        shutil.copy(os.path.join(src_dir, img_file), os.path.join(dst_dir, img_file))


def MakeFoldersTrainTest(dataset_dir, dist_dir, train_ratio=0.8, seed=None):
    """Split the identity folders of ``dataset_dir`` into a train and a test set.

    The sub-directories of ``dataset_dir`` are shuffled, the first
    ``int(n * train_ratio)`` of them are copied to ``dist_dir/HELEN_train``
    and the remaining ones to ``dist_dir/HELEN_test``. The split is done per
    folder, so all images of one identity end up on the same side.

    Args:
        dataset_dir: directory holding one sub-directory per identity.
        dist_dir: directory in which ``HELEN_train`` / ``HELEN_test`` are created.
        train_ratio: fraction of identity folders assigned to the train set.
        seed: optional seed; when given the split is reproducible, otherwise
            the module-level ``random`` state is used as before.

    Raises:
        ValueError: if ``train_ratio`` is outside ``[0, 1]``.
    """
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError("train_ratio must be between 0 and 1, got {}".format(train_ratio))

    # Get the list of subdirectories in the dataset directory
    subdirs = [name for name in os.listdir(dataset_dir) if os.path.isdir(os.path.join(dataset_dir, name))]

    # Shuffle the list of subdirectories
    if seed is None:
        random.shuffle(subdirs)
    else:
        # os.listdir order is arbitrary; sort first so the seed fully determines the split
        subdirs.sort()
        random.Random(seed).shuffle(subdirs)

    # Split the subdirectories into train and test sets
    split_at = int(len(subdirs) * train_ratio)
    splits = {
        "HELEN_train": subdirs[:split_at],
        "HELEN_test": subdirs[split_at:],
    }

    # Copy the images of each identity into its split directory
    for split_name, split_subdirs in splits.items():
        split_dir = os.path.join(dist_dir, split_name)
        os.makedirs(split_dir, exist_ok=True)
        for subdir in split_subdirs:
            _copy_folder_contents(os.path.join(dataset_dir, subdir), os.path.join(split_dir, subdir))
# Split the identity folders found in HELEN_detected; the HELEN_train / HELEN_test
# folders are created inside ./data/300WLPA_2d/.
dataset_dir = "./data/300WLPA_2d/HELEN_detected/"
MakeFoldersTrainTest(dataset_dir=dataset_dir, dist_dir= './data/300WLPA_2d/')


In [None]:
import pickle

# Load the pickled object stored in helen_train.pkl and show it as the cell output.
# NOTE(review): pickle.load can execute arbitrary code contained in the file —
# only open pickles that come from a trusted source.
with open('data/pickles/helen_train.pkl', 'rb') as f:
    content = pickle.load(f)
    
content

In [None]:
import pickle

# Open and load the .pkl file, then show the loaded object as the cell output.
# NOTE(review): pickle.load can execute arbitrary code contained in the file —
# only open pickles that come from a trusted source.
with open('./test_sets/query_galleries_HELEN.pkl', 'rb') as f:
    data = pickle.load(f)

data
# Disabled inspection code. NOTE(review): it suggests `data` maps a pose bin to a dict
# with 'gallery' and 'query' entries — confirm against the code that wrote the file.
# print(data.keys())
# # Print the loaded data
# for key, value in data.items():
#     print('Pose Bin: {}, Gallery Length: {}, Query Length: {}'.format(key, len(value['gallery']), len(value['query'])))


In [None]:
def Transfer_Folders(source_path, dest_path):
    """Crop and align the face of every image in the leaf folders under ``source_path``.

    For each directory without sub-directories, a folder of the same name is
    created in ``dest_path``. Every file is passed to RetinaFace; the first
    extracted face (converted to RGB) is saved under the original file name.
    When no face is returned, the unmodified image is saved instead. Each
    saved image is also displayed with matplotlib.

    Args:
        source_path: root directory that is walked recursively.
        dest_path: directory receiving one sub-folder per leaf folder.
    """
    for root, dirs, files in os.walk(source_path, topdown= True):
        # Only leaf directories hold images
        if len(dirs) != 0:
            continue

        # Folder name of the leaf directory (robust to trailing separators)
        ID = os.path.basename(os.path.normpath(root))
        print(ID)
        file_path = os.path.join(dest_path, ID)
        os.makedirs(file_path, exist_ok= True)

        for file in files:
            img_path = os.path.join(root, file)
            save_path = os.path.join(file_path, file)
            face = RetinaFace.extract_faces(img_path= img_path, align= True, allow_upscaling= False)
            if len(face) > 0:
                img = Image.fromarray(face[0]).convert("RGB")
                status = "Detected"
            else:
                img = Image.open(img_path)
                status = "Not Detected or already cropped"

            plt.imshow(img)
            # plt.show takes no image argument; the figure drawn by imshow is shown
            plt.show()
            print(status)
            img.save(save_path)
            print(save_path)
# Transfer_Folders()

In [None]:
def convertImages(root_dir):
    """Convert every ``.pgm`` file below ``root_dir`` to a ``.jpeg`` stored next to it.

    The name of every file encountered is printed. The ``.pgm`` source is
    deleted only after the converted image has been written, so a failed
    conversion never loses the original.

    Args:
        root_dir: directory that is walked recursively.
    """
    for root, dirs, files in os.walk(root_dir, topdown= True):

        for file in files:
            print(file)
            # Decide on the extension itself, not on "pgm" appearing anywhere in the name
            stem, extension = os.path.splitext(file)
            if extension.lower() != ".pgm":
                continue

            file_path = os.path.join(root, file)
            # Only the file name changes; directory names containing ".pgm" stay intact
            converted_path = os.path.join(root, stem + ".jpeg")
            with Image.open(file_path) as img:
                img.save(converted_path)
            os.remove(file_path)
# convertImages("./P1E")

In [None]:
def _free_destination(destination_folder_path, image_file):
    """Return a path in ``destination_folder_path`` for ``image_file`` that does not exist yet.

    If the plain name is taken, ``_01``, ``_02``, ... is appended to the base name.
    """
    destination_file_path = os.path.join(destination_folder_path, image_file)
    if not os.path.exists(destination_file_path):
        return destination_file_path

    base_name, extension = os.path.splitext(image_file)
    count = 1
    while True:
        candidate = os.path.join(destination_folder_path, f"{base_name}_{str(count).zfill(2)}{extension}")
        if not os.path.exists(candidate):
            return candidate
        count += 1


def Merge_Sets(main_folder, dest_folder):
    """Merge the per-camera identity folders of ``main_folder`` into one folder per identity.

    Expected layout: ``main_folder/<camera>/<identity>/*.jpeg``. Every
    ``.jpeg`` file is copied to ``main_folder/dest_folder/<identity>/``; when
    a file of the same name already exists there, the copy gets a ``_01``,
    ``_02``, ... suffix instead of overwriting it.

    Plain files next to the camera or identity folders are ignored, and the
    destination folder itself is never treated as a camera folder (it lives
    inside ``main_folder`` and would otherwise be re-merged on a second run).
    Folders are processed in sorted order so the suffix numbering is stable.

    Args:
        main_folder: directory holding one sub-directory per camera angle.
        dest_folder: name of the output directory, created inside ``main_folder``.
    """
    merged_root = os.path.join(main_folder, dest_folder)
    os.makedirs(merged_root, exist_ok= True)

    # Get the list of camera angle subfolders (directories only, destination excluded)
    camera_folder_paths = []
    for folder in sorted(os.listdir(main_folder)):
        camera_folder_path = os.path.join(main_folder, folder)
        if not os.path.isdir(camera_folder_path):
            continue
        if os.path.samefile(camera_folder_path, merged_root):
            continue
        camera_folder_paths.append(camera_folder_path)

    # Iterate over camera angle subfolders
    for camera_folder_path in camera_folder_paths:

        # Iterate over identity subfolders
        for identity_folder in sorted(os.listdir(camera_folder_path)):
            identity_folder_path = os.path.join(camera_folder_path, identity_folder)
            if not os.path.isdir(identity_folder_path):
                continue

            # Destination folder in which all images of this identity are merged
            destination_folder_path = os.path.join(merged_root, identity_folder)
            os.makedirs(destination_folder_path, exist_ok=True)

            # Copy each .jpeg of the identity, renaming on collision
            image_files = sorted(file for file in os.listdir(identity_folder_path) if file.endswith(".jpeg"))
            for image_file in image_files:
                source_file_path = os.path.join(identity_folder_path, image_file)
                destination_file_path = _free_destination(destination_folder_path, image_file)
                shutil.copy(source_file_path, destination_file_path)
# Merge_Sets("./P1E/", "P1E_Merge")   

In [None]:
# Print every directory visited while walking the aligned AFW set.
# The original loop body was a bare `print`, an expression that references the function
# without calling it, so the cell produced no output.
# NOTE(review): printing `root` is an assumption about what was meant to be shown.
source_path= './data/AFW/AFW_aligned'
for root, dirs, files in os.walk(source_path, topdown= True):
    print(root)

In [None]:
# Clean images by deploying RetinaFace
def DetectAndAlign(root_directory_path):
    """Replace every ``.jpg`` below ``root_directory_path`` by its detected, aligned face crop.

    Each image is passed to RetinaFace. When at least one face is returned,
    the first one is converted to RGB and written over the original file;
    otherwise the file is left untouched. A message is printed for each file.

    The crop is first written to a temporary file in the same folder and then
    moved over the original, so a failing save cannot destroy the source
    image (the previous version deleted the original before saving).

    Args:
        root_directory_path: directory that is walked recursively.
    """
    for root, dirs, files in os.walk(root_directory_path, topdown= True):
        # Go through each file and conduct facial detection and alignment
        for file in files:
            if not file.lower().endswith(".jpg"):
                continue

            file_path = os.path.join(root, file)
            face = RetinaFace.extract_faces(img_path= file_path, align= True, allow_upscaling= False)
            if len(face) == 0:
                print("Not Detected " + file )
                continue

            face = Image.fromarray(face[0])
            face = face.convert("RGB")
            print("Detected " + file)

            # Keep the original extension so PIL picks the same image format
            tmp_path = os.path.join(root, ".tmp_" + file)
            try:
                face.save(tmp_path)
                os.replace(tmp_path, file_path)
            finally:
                # Only present when save or replace failed
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)


dir_path = "./data/300WLPA_2d/AFW_not_detected/"
DetectAndAlign(dir_path)

In [None]:
# Lookup from a "<group>-<index>" code to an "<a>_<b>" angle string.
# NOTE(review): presumably "<pitch>_<yaw>" in degrees — a later cell parses file names
# in that order; confirm against the dataset documentation.
# NOTE(review): the dictionary is not referenced anywhere in this cell.
pose_map = {
    '2-1': '30_-90', '2-2': '30_-67.5', '2-3': '30_-45', '2-4': '30_-22.5', '2-5': '30_0', 
    '2-6': '30_22.5', '2-7': '30_45', '2-8': '30_67.5', '2-9': '30_90',
    '3-1': '15_-90', '3-2': '15_-75', '3-3': '15_-60', '3-4': '15_-45', '3-5': '15_-30',
    '3-6': '15_-15', '3-7': '15_0', '3-8': '15_15', '3-9': '15_30', '3-10': '15_45', 
    '3-11': '15_60', '3-12': '15_75', '3-13': '15_90',
    '4-1': '0_-90', '4-2': '0_-75', '4-3': '0_-60', '4-4': '0_-45', '4-5': '0_-30',
    '4-6': '0_-15', '4-7': '0_0', '4-8': '0_15', '4-9': '0_30', '4-10': '0_45', 
    '4-11': '0_60', '4-12': '0_75', '4-13': '0_90',
    '5-1': '-15_-90', '5-2': '-15_-75', '5-3': '-15_-60', '5-4': '-15_-45', '5-5': '-15_-30',
    '5-6': '-15_-15', '5-7': '-15_0', '5-8': '-15_15', '5-9': '-15_30', '5-10': '-15_45', 
    '5-11': '-15_60', '5-12': '-15_75', '5-13': '-15_90',
    '6-1': '-30_-90', '6-2': '-30_-67.5', '6-3': '-30_-45', '6-4': '-30_-22.5', '6-5': '-30_0',
    '6-6': '-30_22.5', '6-7': '-30_45', '6-8': '-30_67.5', '6-9': '-30_90'
}

# Names of the per-expression folders to delete.
# NOTE(review): "Suprised" is matched literally against folder names; the file names printed
# by a later cell contain "Surprise" — confirm the spelling used on disk.
expression_list = ['White', "Happy", "Suprised", "Normal", "WearGlasses"]

# Remove the expression folders below the training set. os.rmdir only deletes empty
# directories and raises OSError otherwise, so folders that still hold files are never lost.
# The walk is bottom-up: a directory is reported after its children, so deleting it cannot
# interfere with the traversal. The loop variable is not called `dir`, which would shadow
# the builtin for the rest of the notebook.
for root, dirs, files in os.walk("./data/M2FPA/Train", topdown= False):
    for dir_name in dirs:
        if dir_name in expression_list:
            dir_path = os.path.join(root, dir_name)
            os.rmdir(dir_path)
                


In [14]:
# Lookup from a "<group>-<index>" code to an "<a>_<b>" angle string.
# NOTE(review): presumably "<pitch>_<yaw>" in degrees — confirm against the dataset documentation.
# NOTE(review): the loop below never reads this dictionary.
pose_map = {
    '2-1': '30_-90', '2-2': '30_-67.5', '2-3': '30_-45', '2-4': '30_-22.5', '2-5': '30_0', 
    '2-6': '30_22.5', '2-7': '30_45', '2-8': '30_67.5', '2-9': '30_90',
    
    '3-1': '15_-90', '3-2': '15_-75', '3-3': '15_-60', '3-4': '15_-45', '3-5': '15_-30',
    '3-6': '15_-15', '3-7': '15_0', '3-8': '15_15', '3-9': '15_30', '3-10': '15_45', 
    '3-11': '15_60', '3-12': '15_75', '3-13': '15_90',
    
    '4-1': '0_-90', '4-2': '0_-75', '4-3': '0_-60', '4-4': '0_-45', '4-5': '0_-30',
    '4-6': '0_-15', '4-7': '0_0', '4-8': '0_15', '4-9': '0_30', '4-10': '0_45', 
    '4-11': '0_60', '4-12': '0_75', '4-13': '0_90',
    
    '5-1': '-15_-90', '5-2': '-15_-75', '5-3': '-15_-60', '5-4': '-15_-45', '5-5': '-15_-30',
    '5-6': '-15_-15', '5-7': '-15_0', '5-8': '-15_15', '5-9': '-15_30', '5-10': '-15_45', 
    '5-11': '-15_60', '5-12': '-15_75', '5-13': '-15_90',
    
    '6-1': '-30_-90', '6-2': '-30_-67.5', '6-3': '-30_-45', '6-4': '-30_-22.5', '6-5': '-30_0',
    '6-6': '-30_22.5', '6-7': '-30_45', '6-8': '-30_67.5', '6-9': '-30_90'
}

# Dry run: build a new file name for every "jpg" file and print it; nothing is renamed or written.
for root, dirs, files in os.walk("./data/M2FPA/Train", topdown= True):
        # Derive "<id>_<expression>_<camera code>" from the path of each image
        for file in files:
            if 'jpg' in file:
                file_path = os.path.join(root, file)
                # Name of the folder that directly contains the file
                expression = file_path.split('/')[-2]
                # NOTE(review): this is the FIRST path component, which is "." for paths that start
                # with "./" — probably not the identity folder. It also shadows the builtin `id`.
                id = file_path.split('/')[0]
                # Last character of the second-to-last "-" separated piece, followed by the last piece.
                # NOTE(review): str.strip('.jpg') removes any of the characters ".", "j", "p", "g" from
                # both ends, not the ".jpg" suffix; the result also has no "-" so it cannot be used
                # as a pose_map key as it stands.
                cam_angle = file.split('-')[-2][-1] + file.split('-')[-1].strip('.jpg')
                new_file_name = id + "_" + expression + '_' + cam_angle
                
                file_parts = file_path.split('/')
                # NOTE(review): only the first two path components are kept ("./data" for this
                # root), so this is not a path inside the training set. The value is unused.
                new_file_path = '/'.join(file_parts[:2]) + '/' + new_file_name + '.jpeg'
                print(new_file_name)

                


In [13]:

# Flip the sign of the pitch and yaw encoded in every "<a>_<b>_<pitch>_<yaw>.jpeg" file name
# below the training set and rename the file accordingly.
# NOTE: this cell is not idempotent — running it a second time flips the signs back.
for root, dirs, files in os.walk("./data/M2FPA/Train", topdown= True):
    for file in files:
        # Split off the extension properly; str.strip('.jpeg') removes characters, not a suffix
        stem, extension = os.path.splitext(file)
        if extension != ".jpeg":
            continue

        img_path = os.path.join(root, file)
        file_parts = stem.split('_')
        pitch = float(file_parts[-2]) * -1
        yaw = float(file_parts[-1]) * -1

        new_file_path = file_parts[0] + '_' + file_parts[1] + '_' + str(pitch) + '_' + str(yaw) + '.jpeg'
        new_file_path = os.path.join(root, new_file_path)
        print(new_file_path)
        # A single move replaces the copy + remove pair
        shutil.move(img_path, new_file_path)
                
                

./data/M2FPA/Train/0035/0035_WearGlasses_-30.0_67.5
./data/M2FPA/Train/0035/0035_Happy_-15.0_90.0
./data/M2FPA/Train/0035/0035_Surprise_30.0_67.5
./data/M2FPA/Train/0035/0035_WearGlasses_-0.0_30.0
./data/M2FPA/Train/0035/0035_Happy_-0.0_60.0
./data/M2FPA/Train/0035/0035_WearGlasses_-15.0_45.0
./data/M2FPA/Train/0035/0035_WearGlasses_-30.0_-45.0
./data/M2FPA/Train/0035/0035_Normal_15.0_60.0
./data/M2FPA/Train/0035/0035_Normal_-0.0_15.0
./data/M2FPA/Train/0035/0035_WearGlasses_-0.0_15.0
./data/M2FPA/Train/0035/0035_Happy_30.0_-22.5
./data/M2FPA/Train/0035/0035_Surprise_-0.0_30.0
./data/M2FPA/Train/0035/0035_Normal_15.0_75.0
./data/M2FPA/Train/0035/0035_Happy_-0.0_-90.0
./data/M2FPA/Train/0035/0035_Normal_15.0_-30.0
./data/M2FPA/Train/0035/0035_Surprise_15.0_75.0
./data/M2FPA/Train/0035/0035_WearGlasses_-30.0_-67.5
./data/M2FPA/Train/0035/0035_WearGlasses_30.0_-0.0
./data/M2FPA/Train/0035/0035_Surprise_15.0_-15.0
./data/M2FPA/Train/0035/0035_Surprise_-15.0_-90.0
./data/M2FPA/Train/0035/00

In [None]:
# Delete every directory below LFPW_detected that is reported with neither files nor
# sub-directories, printing its path first. The tree is walked bottom-up.
for root, dirs, files in os.walk("./data/300WLPA_2d/LFPW_detected", topdown= False):
    if not files and not dirs:
        print(root)
        os.rmdir(root)

In [None]:
# Walk AFW_not_detected bottom-up and print the path of every file in a leaf directory.
# NOTE(review): this cell is unfinished — os.path.relpath() requires a path argument, so the
# last line raises TypeError for the first file found, and `dst_path` is never used.
for root, dirs, files in os.walk("./data/300WLPA_2d/AFW_not_detected", topdown= False):
        # Only directories without sub-directories are processed
        if len(dirs) == 0:
            for file in files:
                source_path = os.path.join(root, file)
                print(source_path)
                dst_path = os.path.join(os.path.relpath())

In [None]:
# Removes the depth files from the MAPIR Dataset
def Remove_Files(root_dir):
    """Delete every file below ``root_dir`` whose name contains an underscore.

    The list of file names of each visited directory is printed before its
    matching files are removed.
    """
    for folder, _subfolders, filenames in os.walk(root_dir, topdown= True):
        print(filenames)
        doomed = [os.path.join(folder, fname) for fname in filenames if "_" in fname]
        for path in doomed:
            os.remove(path)
# NOTE(review): Remove_Files deletes EVERY file whose name contains "_" below this folder.
# Confirm that this is intended for ./cplfw/ before running the cell; the deletion cannot be undone.
root_dir = "./cplfw/"
Remove_Files(root_dir)

In [None]:
# Counts images per each identity as well as the total number of images
def count_file(root_dir):
    """Print directory and file counts for every entry of ``root_dir``, then the overall file total.

    For each entry, the tree below it is walked; the number of visited
    directories is reported as "identities" and the number of files as "count".

    NOTE(review): the "identities" figure includes the entry itself (os.walk
    yields the top directory too), so it is one higher than the number of
    sub-folders when the entry holds one folder per identity — confirm the intent.
    """
    grand_total = 0
    for folder_name in os.listdir(root_dir):
        visited = list(os.walk(os.path.join(root_dir, folder_name), topdown= True))
        folder_count = len(visited)
        total_count = sum(len(files) for _root, _dirs, files in visited)
        grand_total += total_count
        print(f'{folder_name} identities: {folder_count}, count: {total_count}')
    print(f'Total images: {grand_total}')
# Report the counts for each sub-folder of the HELEN test bins directory.
count_file(root_dir="./data/300WLPA_2d/HELEN_test_bins_rad2degrees")

In [None]:
class LFWDataset(torch.utils.data.Dataset):
    """Image-folder dataset: one sub-directory of ``root_dir`` per class.

    Class folders are sorted by name and numbered ``0 .. n_classes - 1``.
    Plain files inside ``root_dir`` are ignored and do not consume a label,
    so the labels are contiguous and identical from run to run (the order
    returned by ``os.listdir`` is arbitrary).

    Args:
        root_dir: directory holding one sub-directory per class.
        transform: optional callable applied to the RGB PIL image.

    Attributes set by ``__init__``: ``data`` (list of image paths) and
    ``labels`` (list of class indices aligned with ``data``).
    """
    def __init__(self, root_dir, transform= None):
        self.root_dir = root_dir
        self.data = []
        self.labels = []
        self.transform = transform

        # Enumerate directories only, so stray files cannot leave gaps in the labels
        class_dirs = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        for label, subdir in enumerate(class_dirs):
            subdir_path = os.path.join(root_dir, subdir)
            for img_file in sorted(os.listdir(subdir_path)):
                self.data.append(os.path.join(subdir_path, img_file))
                self.labels.append(label)

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``; the image is loaded as RGB and transformed if requested."""
        img_path = self.data[idx]
        img = Image.open(img_path).convert("RGB")
        if (self.transform is not None):
            img = self.transform(img)
        label = self.labels[idx]
        return img, label

In [None]:
# AFW images as an ImageFolder dataset, preprocessed with the pose-estimator pipeline.
LPA_D = torchvision.datasets.ImageFolder(
    root= "./data/300WLPA_2d/AFW",
    transform= transform_pose,
)
# One class per sub-folder; ImageFolder lists each class name once.
num_classes = len(LPA_D.classes)

In [None]:
# Instantiate HopeNet
# NOTE(review): [3, 4, 6, 3] with the Bottleneck block is presumably the ResNet-50 layout and
# 66 the number of output bins per angle (later cells index 66 bins) — confirm in hopenet.py.
pose_estimator = Hopenet(torchvision.models.resnet.Bottleneck, [3, 4, 6, 3], 66)
# Load the stored weights directly onto the selected device
pose_estimator.load_state_dict(torch.load("./models/deepheadpose/hopenet_robust_alpha1.pkl", map_location= device))
pose_estimator.to(device)
# Evaluation mode; as the last expression this also displays the module as the cell output
pose_estimator.eval()

In [None]:
# Run the pose estimator on one sample (index 21) and show the image next to the raw outputs.
img, label = LPA_D[21]
print(LPA_D.samples[21])
img = torch.unsqueeze(img, dim=0).to(device)
print(img.shape)
# Inference only: no autograd graph is needed for the forward pass
with torch.no_grad():
    yaw, pitch, roll = pose_estimator(img)
img = torch.squeeze(img, dim= 0).to("cpu")
# The tensor is still normalised, so matplotlib clips values outside [0, 1]
plt.imshow(img.permute(1,2,0))
plt.show()
print('yaw: {}, pitch: {}, roll: {}'.format(yaw, pitch, roll))
print(yaw.shape)

In [None]:
 # Binned predictions
_, yaw_bpred = torch.max(yaw.data, 1)
_, pitch_bpred = torch.max(pitch.data, 1)
_, roll_bpred = torch.max(roll.data, 1)

print(yaw_bpred)

In [None]:
# Bin indices 0..65 as a float tensor on the compute device
idx_tensor = torch.arange(66, dtype=torch.float32).to(device)


def _bins_to_degrees(logits):
    """Softmax the logits, take the expected bin index of the first row and map it with *3 - 99."""
    probabilities = softmax_temperature(logits.data, 1)
    return torch.sum(probabilities.data[0] * idx_tensor) * 3 - 99


# Get continuous predictions in degrees.
yaw_predicted = _bins_to_degrees(yaw)
pitch_predicted = _bins_to_degrees(pitch)
roll_predicted = _bins_to_degrees(roll)

print(roll_predicted)
print(yaw_predicted)
print(pitch_predicted)

In [None]:
# Estimate the pitch and yaw of every image in LPA_D.
# img_poses maps image path -> (pitch, yaw, dataset index); pitch_list / yaw_list collect the
# same angles for plotting.
pitch_list = []
yaw_list = []

img_poses = {}

# Bin indices 0..65; loop-invariant, so built once
idx_tensor = torch.arange(66, dtype=torch.float32, device=device)

# Inference only: without no_grad every forward pass would build an autograd graph
with torch.no_grad():
    for idx, (img, label) in enumerate(LPA_D):
        img = torch.unsqueeze(img, dim=0).to(device)
        yaw, pitch, roll = pose_estimator(img)

        # Continuous predictions
        yaw_predicted = softmax_temperature(yaw.data, 1)
        pitch_predicted = softmax_temperature(pitch.data, 1)
        roll_predicted = softmax_temperature(roll.data, 1)

        # Get continuous predictions in degrees.
        yaw_predicted = torch.sum(yaw_predicted.data[0] * idx_tensor) * 3 - 99
        pitch_predicted = torch.sum(pitch_predicted.data[0] * idx_tensor) * 3 - 99
        roll_predicted = torch.sum(roll_predicted.data[0] * idx_tensor) * 3 - 99

        pitch_value = pitch_predicted.to("cpu").numpy()
        yaw_value = yaw_predicted.to("cpu").numpy()

        # Key by the image path. LPA_D[idx] would load and transform the image a second time
        # and use an (image tensor, label) tuple as the key, which cannot be looked up later.
        img_path = LPA_D.samples[idx][0]
        img_poses[img_path] = (pitch_value, yaw_value, idx)
        pitch_list.append(pitch_value)
        yaw_list.append(yaw_value)

        print("Pitch: {}, Yaw: {}".format(pitch_predicted, yaw_predicted))

In [None]:
# Overlaid histograms of the estimated pitch and yaw angles
fig, ax = plt.subplots()
ax.hist(pitch_list, bins=50, alpha=0.5, label='Pitch')
ax.hist(yaw_list, bins=50, alpha=0.5, label='yaw')

# Set labels and title
ax.set_xlabel('Angles')
ax.set_ylabel('Frequency')
ax.set_title('Distribution of Pitch and Yaw Angles')

# Add a legend
ax.legend()

# Display the plot (the original cell called plt.show() twice; the second call was redundant)
plt.show()

In [None]:
# Count, per identity label and in total, the images whose pitch and yaw both lie strictly
# inside (-25, 25).
# NOTE(review): labels are read from SurvFace_dataset_estimate, which is not defined in the
# cells above, while the indices stored in img_poses come from LPA_D — confirm they match.
count = 0
identity_count = {}
for img_path, (pitch, yaw, data_idx) in img_poses.items():
    img, label = SurvFace_dataset_estimate[data_idx]
    if -25 < pitch < 25 and -25 < yaw < 25:
        count += 1
        identity_count[label] = identity_count.get(label, 0) + 1

for label, val in identity_count.items():
    print("{}: {}".format(label, val))
print("Total images in pose bin {}".format(count))

In [None]:
# Get a set of images
def Get_Pose_Bin(img_path, dest_path, yaw_range, pitch_range):
    """Copy every ``.jpg`` image whose estimated head pose lies inside the given bin.

    Each leaf directory below ``img_path`` is treated as one identity. Every
    image is run through the notebook-level ``pose_estimator``; when both the
    estimated yaw and pitch lie strictly inside their ranges, the file is
    copied to ``dest_path/<identity folder name>/``.

    Args:
        img_path: root directory that is walked recursively.
        dest_path: directory receiving the selected images (created if missing).
        yaw_range: ``(min, max)`` in degrees, both bounds exclusive.
        pitch_range: ``(min, max)`` in degrees, both bounds exclusive.
    """
    # Define the ranges for the pitch and yaw pose bins
    yaw_min, yaw_max = yaw_range
    pitch_min, pitch_max = pitch_range

    # Make the destination directory if it does not exist yet
    os.makedirs(dest_path, exist_ok= True)

    # Bin indices 0..65; loop-invariant, so built once
    idx_tensor = torch.arange(66, dtype=torch.float32, device=device)

    for root, dirs, files in os.walk(img_path, topdown= True):
        # Only leaf directories hold images
        if len(dirs) != 0:
            continue

        ID = os.path.basename(os.path.normpath(root))
        for file in files:
            if not file.lower().endswith(".jpg"):
                continue

            # Open the image as RGB (the normalisation expects 3 channels) and convert to a tensor
            file_source_path = os.path.join(root, file)
            with Image.open(file_source_path) as image:
                img = transform_pose(image.convert("RGB"))
            # Same device as the pose estimator; the previous hard-coded "mps" failed elsewhere
            img = torch.unsqueeze(img, dim= 0).to(device)

            # Pass the tensor to the HopeNet pose estimator (inference only)
            with torch.no_grad():
                yaw, pitch, roll = pose_estimator(img)

            # Continuous predictions
            yaw_predicted = softmax_temperature(yaw.data, 1)
            pitch_predicted = softmax_temperature(pitch.data, 1)

            # Get continuous predictions in degrees.
            yaw_predicted = torch.sum(yaw_predicted.data[0] * idx_tensor) * 3 - 99
            pitch_predicted = torch.sum(pitch_predicted.data[0] * idx_tensor) * 3 - 99

            # Check if the image is within the desired bin range
            if (yaw_predicted > yaw_min) and (yaw_predicted < yaw_max) and (pitch_predicted > pitch_min) and (pitch_predicted < pitch_max):
                file_dest_path = os.path.join(dest_path, ID)
                print("Yaw: {}".format(yaw_predicted))
                print("Pitch: {}".format(pitch_predicted))

                os.makedirs(file_dest_path, exist_ok= True)
                shutil.copy(file_source_path, file_dest_path)

# The destination originally started with "/data/..." (file-system root); every other path in
# the notebook is relative to "./data/", so the missing "." is restored here.
Get_Pose_Bin("./data/300WLPA_2d/AFW_aligned/", "./data/300WLPA_2d/AFW_aligned_-30_15_-30_30/", yaw_range=(-30, 30), pitch_range= (-30, 15))


In [None]:
# Implement the CNN backbone: an iresnet50, restored from the fine-tuned weights when available.
Path_resnet = Path('./arcface_weights_kaggle/QMUL_weights/frontal/resnet50_weights_frontal_survface_epoch_330.pth') 
models = iresnet50(pretrained=True)
if (Path_resnet.is_file()):
    # requires_grad_ updates the parameters; assigning `.requires_grad = True` on a Module
    # only creates an unused attribute.
    models.fc.requires_grad_(True)
    # Map the stored tensors onto the device selected at the top of the notebook
    # (the previous hard-coded 'mps' only exists on Apple hardware).
    models.load_state_dict(torch.load(Path_resnet, map_location= device))
    print('weights loaded')
else:
    # No fine-tuned weights: freeze the backbone and train only the final fc layer
    for param in models.parameters():
        param.requires_grad_(False)
    models.fc.requires_grad_(True)

models.to(device)
models

In [None]:
# ArcFace classification head (512-d embeddings -> num_classes logits, s=64, m=0.5),
# restored from the stored weights when the file exists.
Path_arcface = Path('./arcface_weights_kaggle/QMUL_weights/frontal/arcface_weights_frontal_survface_epoch_330.pth')

metric_fc = ArcFaceLoss(512, num_classes, 64.0, 0.5)
# metric_fc = DataParallel(metric_fc)
# requires_grad_ updates the parameters; assigning `.requires_grad = True` on a Module only
# creates an unused attribute.
metric_fc.requires_grad_(True)
if (Path_arcface.is_file()):
    # Map the stored tensors onto the device selected at the top of the notebook
    # (the previous hard-coded "mps" only exists on Apple hardware).
    metric_fc.load_state_dict(torch.load(Path_arcface, map_location= device))

metric_fc.to(device)
print(metric_fc.state_dict())

# Analyze model parameters and layers
#summary(models, (3, 112, 112))

In [None]:
# Restore the backbone from the checkpoint at PATH.
# NOTE(review): PATH is an absolute, machine-specific location — consider a path relative to the project.
PATH = "/Users/carterung/CV_Research/arcface_weights_kaggle/QMUL_weights/frontal/frontal_epoch=249-step=16750.ckpt"
# The original cell loaded './' (a directory) and never used PATH.
checkpoint = torch.load(PATH, map_location= device)
# Lightning checkpoints keep the weights under "state_dict"; a file written with
# torch.save(model.state_dict()) is the weight mapping itself.
# NOTE(review): Lightning prefixes the keys with the attribute name the network had inside the
# LightningModule; strip that prefix here if load_state_dict reports unexpected keys.
state_dict = checkpoint["state_dict"] if "state_dict" in checkpoint else checkpoint
# load_state_dict returns a report of missing/unexpected keys, not the model, so `models`
# must not be rebound to its result.
load_result = models.load_state_dict(state_dict)

In [None]:
# Embed every test batch with the backbone and collect the embeddings with their labels.
models.eval()
embedding_list = []
label_list = []
with torch.no_grad():
    for img, label in data_loader_test:
        img = img.to(device)
        label = label.to(device)
        embeddings = models(img)
        embedding_list.append(embeddings)
        label_list.append(label)

    # Concatenate the per-batch results into one tensor each
    label_list = torch.cat(label_list)
    embedding_list = torch.cat(embedding_list)
    

In [None]:
# Group the embeddings by label: class_dict[label] is the list of that label's embeddings.
# Note: label_list is converted from a tensor to a plain Python list here.
class_dict = {}
label_list = label_list.to("cpu").numpy().tolist()
for label_value, embedding in zip(label_list, embedding_list):
    class_dict.setdefault(label_value, []).append(embedding)


In [None]:
# Turn each label's list of embeddings into a single stacked tensor
for label, embeddings_of_label in list(class_dict.items()):
    class_dict[label] = torch.stack(embeddings_of_label)

In [None]:
# Fill the query and gallery sets: for every label with more than one embedding, the first
# embedding becomes the query and all remaining ones go to the gallery.
query_set = {"embedding": [], "label": []}
gallery_set = {"embedding": [], "label": []}
for label, embeddings_of_label in class_dict.items():
    if len(embeddings_of_label) <= 1:
        continue

    query_set["embedding"].append(embeddings_of_label[0])
    query_set["label"].append(label)

    for gallery_embedding in embeddings_of_label[1:]:
        gallery_set["embedding"].append(gallery_embedding)
        gallery_set["label"].append(label)

In [None]:
# Stack the gallery embeddings into one tensor and re-key the gallery labels by position
gallery_set["embedding"] = torch.stack(gallery_set["embedding"])
label_dict = {
    position: gallery_set["label"][position]
    for position in range(len(gallery_set["label"]))
}
gallery_set["label"] = label_dict

# Sanity check of the sizes and shapes
print(len(gallery_set["label"]))
print(len(query_set["embedding"]))
print(gallery_set["embedding"].shape)
print(query_set["embedding"][0].shape)

In [None]:
# Probe images to test against the embedding gallery.
# top_k_accuracy[k-1] is the fraction of probes whose label appears among the k gallery
# entries with the highest cosine similarity (a cumulative match curve).
with torch.inference_mode():
    models.eval()
    num_gallery = len(gallery_set["label"])
    num_probes = len(query_set['embedding'])

    # first_hit_counts[r]: number of probes whose best-ranked correct gallery entry is at
    # 0-based rank r. Ranking the gallery once per probe replaces one topk call per k.
    first_hit_counts = [0] * num_gallery

    for probe_idx in range(num_probes):
        probe_emb = query_set['embedding'][probe_idx]
        probe_label = query_set['label'][probe_idx]
        cos_similarity = F.cosine_similarity(probe_emb, gallery_set['embedding'], dim= -1)

        ranking = torch.argsort(cos_similarity, descending=True).to("cpu").tolist()
        for rank, gallery_idx in enumerate(ranking):
            if gallery_set['label'][gallery_idx] == probe_label:
                first_hit_counts[rank] += 1
                break

    # A probe first matched at rank r counts as correct for every k > r
    correct_counts = []
    running_total = 0.0
    for hits in first_hit_counts:
        running_total += hits
        correct_counts.append(running_total)
    total_counts = [float(num_probes)] * num_gallery
    top_k_accuracy = [
        (correct / num_probes) if num_probes else 0.0
        for correct in correct_counts
    ]

# Plot the accuracy vs top-k rank
if len(top_k_accuracy) >= 5:
    print("Rank 1 Accuracy: {}, Rank 5 Accuracy: {}".format(top_k_accuracy[0], top_k_accuracy[4]))
elif top_k_accuracy:
    # Gallery smaller than 5 entries: rank 5 does not exist
    print("Rank 1 Accuracy: {}".format(top_k_accuracy[0]))
plt.plot(range(1, len(top_k_accuracy)+1), top_k_accuracy)
plt.xlabel('Top-k rank')
plt.ylabel('Accuracy')
plt.show()

In [None]:
models = torchvision.models.resnet50(pretrained=True)
# NOTE(review): this rebinds the global `models` to a torchvision ResNet-50,
# although the cell was labelled "Evaluate against my model" -- confirm that
# is intended. `pretrained=True` is also deprecated in newer torchvision
# releases in favour of the `weights=` argument.

# Cosine-similarity cut-off above which a pair is predicted "same person".
VERIFICATION_THRESHOLD = 0.313
# The model is evaluated on CPU in this cell. The original called
# `.to(device)` on the inputs but discarded the result, so inputs stayed on
# CPU as well; that is now explicit.
eval_device = torch.device("cpu")


def _detect_faces(image_path):
    """Return the aligned face crops RetinaFace finds in ``image_path``.

    If no face is detected, fall back to the whole image so the pair can still
    be scored instead of crashing the evaluation loop.
    """
    faces = RetinaFace.extract_faces(image_path, align= True, allow_upscaling= True)
    if len(faces) == 0:
        with Image.open(image_path) as whole_image:
            faces = [np.array(whole_image.convert("RGB"))]
    return faces


def _embed_faces(face_arrays):
    """Return ``(inputs, embeddings)`` for a list of face image arrays.

    Each face is converted to RGB, passed through the notebook's ``transform``,
    given a batch dimension, and embedded with the global ``models``.
    """
    inputs = []
    embeddings = []
    for face in face_arrays:
        face_tensor = transform(Image.fromarray(face).convert("RGB"))
        face_tensor = torch.unsqueeze(face_tensor, dim= 0).to(eval_device)
        inputs.append(face_tensor)
        embeddings.append(models(face_tensor))
    return inputs, embeddings


# Verify every LFW test pair: detect faces in both images, take the highest
# cosine similarity over all face combinations, and threshold it.
with torch.inference_mode():
    correct_count = 0.0
    models.eval()
    y_true = []
    y_pred = []
    cos_list_same = []
    cos_list_diff = []
    cos_list = []
    models.to(eval_device)

    for idx, (_pair_img1, _pair_img2, label) in enumerate(lfw_pairs_test):
        # Only the label is taken from the dataset item; the images are
        # re-read from disk so that face detection/alignment can run on them.
        img1_path, img2_path = lfw_pairs_test.data[idx]

        # Embed each detected face once per image (not once per face pair).
        inputs1, embeds1 = _embed_faces(_detect_faces(img1_path))
        inputs2, embeds2 = _embed_faces(_detect_faces(img2_path))

        # Find the most prominent match between the two images.
        max_cos_sim = None
        for img1, embed_1 in zip(inputs1, embeds1):
            for img2, embed_2 in zip(inputs2, embeds2):
                face_cos_sim = F.cosine_similarity(embed_1, embed_2)
                if max_cos_sim is None or face_cos_sim > max_cos_sim:
                    max_cos_sim = face_cos_sim
                    # Best-matching inputs, kept for visual inspection.
                    main_img1 = img1
                    main_img2 = img2

        cos_sim = max_cos_sim.to("cpu")
        score = cos_sim.numpy()[0]
        is_match = score > VERIFICATION_THRESHOLD

        cos_list.append(cos_sim)
        if (label == 1):
            cos_list_same.append(cos_sim)
        else:
            cos_list_diff.append(cos_sim)

        print('Cosine Similarity: {}, Prediction: {}, Ground Truth: {}'.format(score, is_match, label))
        correct_count += label == is_match

        # Add labels and predictions
        y_true.append(label)
        y_pred.append(is_match)

    print(correct_count / len(lfw_pairs_test))

In [None]:
# Confusion matrix of the thresholded predictions against the ground truth.
cm = confusion_matrix(y_true, y_pred)

# Render it; plot() hands back the display object, which is kept in cm_display.
cm_display = ConfusionMatrixDisplay(confusion_matrix=cm)
cm_display = cm_display.plot()

In [None]:
# Turn the stored similarity tensors into plain NumPy score arrays:
# same-label pairs, different-label pairs, and all pairs together.
verification_scores_same = np.array([sim.item() for sim in cos_list_same])
verification_scores_diff = np.array([sim.item() for sim in cos_list_diff])
verification_scores = np.array([sim.item() for sim in cos_list])

# Overlay the two score distributions, semi-transparent so the overlap shows.
for _scores, _legend in (
    (verification_scores_same, 'Same Person'),
    (verification_scores_diff, 'Different Person'),
):
    plt.hist(_scores, bins=50, alpha=0.5, label=_legend)

# Title, axis labels and legend
plt.title('Histograms of face verification scores')
plt.xlabel('Verification score')
plt.ylabel('Frequency')
plt.legend(loc='upper right')

# Show the plot
plt.show()

In [None]:
# Scatter each pair's cosine similarity against its ground-truth label
# (blue circle markers, no connecting line).
plt.plot(y_true, cos_list, 'bo', markersize=0.018)
plt.xlabel('Label')
plt.ylabel('Cos_similarity')
plt.show()

In [None]:
# Compare this notebook's model against DeepFace (ArcFace) on one image pair.
models.eval()
models.to(device)

# NOTE(review): absolute, machine-specific location -- point this at the local
# copy of the dataset (ideally via a shared config cell).
VERIFICATION_IMAGE_DIR = Path('/Users/carterung/CV_Research/QMUL-SurvFace/Face_Verification_Test_Set/verification_images')
PATHOne = str(VERIFICATION_IMAGE_DIR / '15_cam3_1.jpg')
PATHTwo = str(VERIFICATION_IMAGE_DIR / '25_cam3_1.jpg')
Carter_Pic1_Path = PATHOne
Carter_Pic2_Path = PATHTwo

# Load each image once; force RGB so grayscale/RGBA files still go through the
# transform, consistent with the LFW evaluation cell.
Img_T1 = Image.open(PATHOne).convert("RGB")
Img_T2 = Image.open(PATHTwo).convert("RGB")
Carter_Pic1 = transform(Img_T1)
Carter_Pic2 = transform(Img_T2)

# Show the transformed model inputs (channels moved last for imshow).
fig = plt.figure()
ax1 = fig.add_subplot(2, 2, 1)
ax1.imshow(Carter_Pic1.permute(1,2,0))
ax2 = fig.add_subplot(2, 2, 2)
ax2.imshow(Carter_Pic2.permute(1,2,0))
plt.show()

# Add a batch dimension and move the inputs to the model's device.
Carter_Pic1 = torch.unsqueeze(Carter_Pic1.to(device), dim= 0)
Carter_Pic2 = torch.unsqueeze(Carter_Pic2.to(device), dim= 0)

# Pure inference: disable autograd, as the other evaluation cells do.
with torch.inference_mode():
    emb_1 = models(Carter_Pic1)
    emb_2 = models(Carter_Pic2)
    cosine_sim = F.cosine_similarity(emb_1, emb_2)

# Face detection is skipped, so DeepFace embeds the images as given.
obj = DeepFace.verify(Carter_Pic1_Path, Carter_Pic2_Path
          , model_name = 'ArcFace', detector_backend = 'skip')
# The reported distance is turned into a similarity so both numbers are comparable.
print('DeepFace Result: {}'.format(1 - obj['distance']))
print('My Model: {}'.format(cosine_sim))

# Show the untransformed images for reference.
fig = plt.figure()
ax1 = fig.add_subplot(2, 2, 1)
ax1.imshow(Img_T1)
ax2 = fig.add_subplot(2, 2, 2)
ax2.imshow(Img_T2)
plt.show()