## Kinship Classification using Face Transformer Features

**Basic Idea**: Use the features extracted by a pretrained Face Transformer to classify whether the two faces in an image pair are kin or non-kin. <br>
**Face Transformer Repository**: https://github.com/zhongyy/Face-Transformer

In [None]:
import torch
from torch import nn, optim
import sys
import os
import torchvision.transforms as transforms
import numpy as np
from PIL import Image
from tqdm import tqdm
from pathlib import Path
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.optim.lr_scheduler import LinearLR
import pandas as pd

# ViT_face / ViTs_face come from the Face-Transformer repository; download it so that `vit_pytorch` is importable
from vit_pytorch.vit_face.vit_face import ViT_face
from vit_pytorch.vit_face.vits_face import ViTs_face

# ---- User configuration -------------------------------------------------
# ROOT is the only value that has to be edited; every other location below
# is derived from it.
ROOT = "/home/UG/c200140/recognizing-faces-in-the-wild"

# Training images, laid out as <family>/<member>/<image>
IMAGE_PATH = f"{ROOT}/train"

# One feature folder per backbone; each stores the extracted feature vectors
SENET_FEATURE_PATH = f"{ROOT}/train-senet-vgg2-features"
RESNET_FEATURE_PATH = f"{ROOT}/train-resnet-vgg2-features"
VIT_FACE_FEATURE_PATH = f"{ROOT}/train-vit-face-features"

# CSV sheets listing the training / validation pairs of every validation split
TRAIN_FOLDER = f"{ROOT}/Excel/Train (New)"
TEST_FOLDER = f"{ROOT}/Excel/Test (New)"

# Location of sample_submission.csv (the list of test pairs)
TEST_FILE_PATH = f"{ROOT}/sample_submission.csv"
# ---- End of user configuration ------------------------------------------

# Run on the GPU when one is visible, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device}")

## Step 1: Convert Train Images to Vision Transformer Output Vector and Save Them
This eliminates the need to run the train images through the transformer every time.

In [None]:
### Creating Folders of Each Family and its Members just like the train folder
### The folder will now store the extracted features instead of the actual images. 
def create_folders_for_feature(model_name):
    """Mirror the train folder layout inside a model's feature folder.

    For every ``<family>/<member>`` directory found under ``IMAGE_PATH`` a
    directory with the same relative path is created under the feature folder
    that belongs to ``model_name``. Those folders later hold the extracted
    feature files instead of the images.

    Folders that already exist are left untouched, so the function can be
    re-run safely (for example after new families were added).

    Args:
        model_name: ``"senet"``, ``"resnet"`` or ``"vit-face"``. The spelling
            ``"vit_face"`` used by ``build_dataset_row`` is accepted as well.
            Any other name is ignored and nothing is created.
    """
    feature_roots = {
        "senet": SENET_FEATURE_PATH,
        "resnet": RESNET_FEATURE_PATH,
        "vit-face": VIT_FACE_FEATURE_PATH,
        "vit_face": VIT_FACE_FEATURE_PATH,
    }
    path = feature_roots.get(model_name)
    if path is None:
        return

    for family in os.listdir(IMAGE_PATH):
        # makedirs also creates the feature root itself on the first run, and
        # exist_ok keeps one pre-existing folder from aborting the whole walk.
        os.makedirs(f"{path}/{family}", exist_ok=True)
        for member in os.listdir(f"{IMAGE_PATH}/{family}"):
            os.makedirs(f"{path}/{family}/{member}", exist_ok=True)

# Prepare the ViT-face feature folders that Step 1 writes its .npy files into
create_folders_for_feature("vit-face")

In [None]:
# Pretrained ViTs_face backbone used as a frozen feature extractor.
# The hyper-parameters must match the ones the checkpoint was trained with,
# otherwise load_state_dict below rejects the weights.
model = ViTs_face(
    loss_type='CosFace',
    GPU_ID=device,
    num_class=93431,
    image_size=112,
    patch_size=8,
    ac_patch_size=12,
    pad=4,
    dim=512,
    depth=20,
    heads=8,
    mlp_dim=2048,
    dropout=0.1,
    emb_dropout=0.1
)

# map_location keeps the load working when `device` fell back to the CPU but
# the checkpoint was saved from a GPU.
model.load_state_dict(torch.load(f"{ROOT}/ViT-Face/vits_face.pth", map_location=device))

model.to(device)
# Inference mode: the model is only used to extract features
model.eval()

In [None]:
# Preprocessing for every train image: resize to the 112 x 112 input size
# ViT-face accepts, then turn the PIL image into a Torch tensor.
transform = transforms.Compose([
    transforms.Resize((112, 112)),
    transforms.PILToTensor(),
])

family_names = os.listdir(IMAGE_PATH)

# Walk <family>/<member>/<image> and store one feature file per image.
# Gradients are not needed for extraction, so they are switched off.
with torch.no_grad():
    for family in tqdm(family_names, total = len(family_names)):
        family_dir = f"{IMAGE_PATH}/{family}"
        for member in os.listdir(family_dir):
            member_dir = f"{family_dir}/{member}"
            for img_name in os.listdir(member_dir):
                # Image -> float tensor with a leading batch dimension of 1
                pil_image = Image.open(f"{member_dir}/{img_name}").convert('RGB')
                image = transform(pil_image).float().reshape(1, 3, 112, 112).to(device)

                # Model output flattened into a one-dimensional feature vector
                img_feature = model(image).flatten().cpu().numpy()

                # Same relative location as the image, with a .npy suffix
                feature_file = f"{VIT_FACE_FEATURE_PATH}/{family}/{member}/{Path(img_name).stem}.npy"
                np.save(feature_file, img_feature)

## Step 2: Training Neural Network Classifier

#### Functions for Building Training and Validation Dataset from Transformer Output in Step 1

In [None]:
def build_dataset_row(path1, path2, model_name):
    """Load the stored features of two images and stack them into one pair.

    Args:
        path1: Location of the first image's feature file, relative to the
            model's feature folder and without the ``.npy`` suffix.
        path2: Same as ``path1`` for the second image.
        model_name: ``"senet"``, ``"resnet"`` or ``"vit_face"``. The spelling
            ``"vit-face"`` used by ``create_folders_for_feature`` is accepted
            as well.

    Returns:
        ``numpy.ndarray`` of shape ``(2, feature_size)`` where row 0 belongs
        to ``path1`` and row 1 to ``path2``. ``feature_size`` is 2048 for
        senet / resnet and 512 for vit_face.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names, or
            (raised by ``reshape``) if a stored feature does not have the
            expected number of elements.
    """
    if model_name == "senet":
        feature_dir, feature_size = SENET_FEATURE_PATH, 2048
    elif model_name == "resnet":
        feature_dir, feature_size = RESNET_FEATURE_PATH, 2048
    elif model_name in ("vit_face", "vit-face"):
        feature_dir, feature_size = VIT_FACE_FEATURE_PATH, 512
    else:
        # Fail with a clear message instead of an UnboundLocalError further down
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected 'senet', 'resnet' or 'vit_face'"
        )

    # reshape doubles as a sanity check on the stored feature length
    feature1 = np.load(f"{feature_dir}/{path1}.npy").reshape(1, feature_size)
    feature2 = np.load(f"{feature_dir}/{path2}.npy").reshape(1, feature_size)

    return np.concatenate([feature1, feature2], axis = 0)

def _load_pair_features(pairs_df, model_name, feature_size):
    """Turn a labelled pair table into a feature array and a label array.

    The first two columns of ``pairs_df`` are read by position as the two
    feature paths of a pair; the ``"Label"`` column supplies the target.

    Returns:
        Tuple ``(features, labels)`` of float32 arrays with shapes
        ``(len(pairs_df), 2, feature_size)`` and ``(len(pairs_df),)``.
    """
    pair_count = len(pairs_df)
    # Pre-allocating and filling by position is faster than concatenating in
    # a loop. The size comes from the table itself so every row is filled.
    features = np.empty([pair_count, 2, feature_size], dtype=np.float32)

    path_pairs = zip(pairs_df.iloc[:, 0], pairs_df.iloc[:, 1])
    for position, (path1, path2) in tqdm(enumerate(path_pairs), total = pair_count, desc="Progress"):
        features[position] = build_dataset_row(path1, path2, model_name)

    labels = pairs_df["Label"].to_numpy(dtype=np.float32)
    return features, labels


def load_data(val_set, model_name, feature_size):
    """Build the training and validation tensors of one validation split.

    Reads the kin / non-kin pair sheets of ``val_set`` from ``TRAIN_FOLDER``
    and ``TEST_FOLDER``, labels kin pairs 1 and non-kin pairs 0, loads the
    stored features of every pair and moves everything to ``device``.

    Args:
        val_set: Split identifier used in the CSV file names, e.g. ``"V00"``.
        model_name: Backbone whose stored features are loaded; passed on to
            ``build_dataset_row``.
        feature_size: Length of one feature vector for that backbone.

    Returns:
        Tuple ``(train_dataset, train_label, test_dataset, test_label)`` of
        float32 tensors on ``device``. The datasets have shape
        ``(pairs, 2, feature_size)`` and the labels shape ``(pairs,)``.
        Kin pairs come first, followed by the non-kin pairs.
    """
    KIN_CSV_FILE_TRAIN = TRAIN_FOLDER + f"/train-kin-pairs-{val_set}.csv"
    KIN_CSV_FILE_TEST = TEST_FOLDER + f"/test-kin-pairs-{val_set}.csv"
    NON_KIN_CSV_FILE_TRAIN = TRAIN_FOLDER + f"/train-non-kin-pairs-{val_set}.csv"
    NON_KIN_CSV_FILE_TEST = TEST_FOLDER + f"/test-non-kin-pairs-{val_set}.csv"

    # Loading Kinship and Non-Kinship Dataset
    kin_df_train = pd.read_csv(KIN_CSV_FILE_TRAIN)
    kin_df_train["Label"] = 1
    kin_df_test = pd.read_csv(KIN_CSV_FILE_TEST)
    kin_df_test["Label"] = 1

    non_kin_df_train = pd.read_csv(NON_KIN_CSV_FILE_TRAIN)
    non_kin_df_train["Label"] = 0
    non_kin_df_test = pd.read_csv(NON_KIN_CSV_FILE_TEST)
    non_kin_df_test["Label"] = 0

    train_df = pd.concat([kin_df_train, non_kin_df_train]).reset_index(drop=True)
    test_df = pd.concat([kin_df_test, non_kin_df_test]).reset_index(drop=True)

    print(f"Kinship Dataset Size (Train): {len(kin_df_train)}")
    print(f"Kinship Dataset Size (Test): {len(kin_df_test)}")
    print(f"Non-Kinship Dataset Size (Train): {len(non_kin_df_train)}")
    print(f"Non-Kinship Dataset Size (Test): {len(non_kin_df_test)}")

    print(f"Train Size: {len(train_df)}")
    print(f"Test Size: {len(test_df)}")

    # Arrays are sized from the concatenated tables, so the kin and non-kin
    # sheets do not have to contain the same number of pairs.
    train_dataset, train_label = _load_pair_features(train_df, model_name, feature_size)
    test_dataset, test_label = _load_pair_features(test_df, model_name, feature_size)

    train_dataset = torch.tensor(train_dataset, dtype = torch.float32).to(device)
    train_label = torch.tensor(train_label, dtype = torch.float32).to(device)
    test_dataset = torch.tensor(test_dataset, dtype = torch.float32).to(device)
    test_label = torch.tensor(test_label, dtype = torch.float32).to(device)

    return train_dataset, train_label, test_dataset, test_label

#### Functions for Training

In [None]:
# Reference: https://stackoverflow.com/questions/71998978/early-stopping-in-pytorch
class EarlyStopper_Checkpoint():
    """Track the best validation metric and signal when training should stop.

    Two metrics are supported: ``"val_loss"`` (lower is better) and
    ``"val_acc"`` (higher is better). ``check_metric`` only answers whether a
    value beats the best one seen so far; ``early_stop`` additionally updates
    the best value and the patience counter.

    Args:
        patience: Number of worsening epochs after which ``early_stop``
            returns ``True``.
        save_path: Stored on the instance for the caller's use; this class
            itself never writes to it.
        min_delta: Tolerance band around the best value. A result that is
            worse than the best by no more than ``min_delta`` is not counted
            against the patience.
        metric: ``"val_loss"`` or ``"val_acc"``.

    Raises:
        ValueError: If ``metric`` is not one of the supported names.
    """

    def __init__(self, patience=1, save_path = None, min_delta=0, metric = "val_loss"):
        if metric not in ("val_loss", "val_acc"):
            raise ValueError(f"Unsupported metric {metric!r}; expected 'val_loss' or 'val_acc'")
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.save_path = save_path
        self.metric = metric
        if metric == "val_loss":
            self.min_validation_loss = np.inf
        else:
            # -inf (not 0) so that the very first epoch always counts as the
            # best one seen, even when its accuracy is exactly 0.
            self.max_validation_accuracy = -np.inf

    def early_stop(self, metric):
        """Record ``metric`` and return ``True`` once patience is exhausted.

        An improvement resets the counter. A value that is worse than the
        best by more than ``min_delta`` increments it. Anything in between
        leaves the counter unchanged.
        """
        if self.check_metric(metric):
            if self.metric == "val_loss":
                self.min_validation_loss = metric
            else:
                self.max_validation_accuracy = metric
            self.counter = 0
            return False

        if self.metric == "val_loss":
            is_worse = metric > (self.min_validation_loss + self.min_delta)
        else:
            # Mirror image of the loss rule: accuracy is worse when it drops
            # BELOW the best value by more than the tolerance.
            is_worse = metric < (self.max_validation_accuracy - self.min_delta)

        if is_worse:
            self.counter += 1
            return self.counter >= self.patience
        return False

    def check_metric(self, metric):
        """Return ``True`` when ``metric`` beats the best value seen so far."""
        if self.metric == "val_loss":
            return bool(metric < self.min_validation_loss)
        return bool(metric > self.max_validation_accuracy)
    
class NN_Classifier(nn.Module):
    """Small fully connected head applied to a combined feature vector.

    Layer order: Dropout(0.5) -> Linear(input_size, hidden_size) -> GELU ->
    Dropout(0.5) -> Linear(hidden_size, output_size). No activation follows
    the last layer, so ``forward`` returns the raw output of that Linear.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        layers = [
            nn.Dropout(p = 0.5),
            nn.Linear(input_size, hidden_size),
            nn.GELU(),
            nn.Dropout(p = 0.5),
            nn.Linear(hidden_size, output_size),
        ]
        # The attribute name and the layer positions determine the keys of
        # state_dict(), which saved checkpoints rely on.
        self.linear_relu_stack = nn.Sequential(*layers)

    def forward(self, x):
        return self.linear_relu_stack(x)

def ema_loss(cur_loss, prev_loss):
    """Blend a new loss into a running value: 90% previous, 10% current."""
    keep_weight, new_weight = 0.9, 0.1
    return keep_weight * prev_loss + new_weight * cur_loss


def combine_function(x1, x2, function):
    """Merge the feature tensors of the two images of a pair into one input.

    Args:
        x1: Features of the first image of every pair.
        x2: Features of the second image, same shape as ``x1``.
        function: Name of the combination to apply:

            * ``"diff"``        -- ``x1 - x2``
            * ``"diff_square"`` -- signed square of the difference
            * ``"exp_div"``     -- ``exp(x1)/exp(x2) - exp(x2)/exp(x1)``
            * ``"exp_diff"``    -- ``exp(x1) - exp(x2)``
            * ``"concat"``      -- both tensors joined along the last axis

    Returns:
        Tensor with the shape of ``x1`` (the last axis is doubled for
        ``"concat"``).

    Raises:
        ValueError: If ``function`` is not one of the names listed above.
    """
    if function == "diff":
        return x1 - x2
    if function == "diff_square":
        delta = x1 - x2
        return torch.sign(delta) * delta**2
    if function == "exp_div":
        # exp(a)/exp(b) == exp(a - b). Working on the difference avoids the
        # inf/inf = NaN that the literal quotient produces once a feature is
        # large enough for exp() to overflow.
        delta = x1 - x2
        return torch.exp(delta) - torch.exp(-delta)
    if function == "exp_diff":
        return torch.exp(x1) - torch.exp(x2)
    if function == "concat":
        # torch.cat is available in every torch release; torch.concatenate is
        # only an alias added in later versions.
        return torch.cat([x1, x2], dim = -1)
    raise ValueError(f"Unknown combine function {function!r}")

#### Convert all test images into Transformer output to prevent repetition (Do this once only)

In [None]:
# Extract the ViT-face features of every test pair once and store them on
# disk, so the classifiers trained later can reuse them.
df_test = pd.read_csv(TEST_FILE_PATH)

# Resize to the 112 x 112 input size the model is built for (image_size=112),
# then convert the PIL image to a Torch tensor.
transform = transforms.Compose([
    transforms.Resize((112,112)),
    transforms.PILToTensor()
])

feature_extractor = ViTs_face(
        loss_type='CosFace',
        GPU_ID=device,
        num_class=93431,
        image_size=112,
        patch_size=8,
        ac_patch_size=12,
        pad=4,
        dim=512,
        depth=20,
        heads=8,
        mlp_dim=2048,
        dropout=0.1,
        emb_dropout=0.1
    ).to(device)

# map_location keeps the load working when `device` fell back to the CPU but
# the checkpoint was saved from a GPU.
feature_extractor.load_state_dict(torch.load(f"{ROOT}/ViT-Face/vits_face.pth", map_location=device))
feature_extractor.eval()

# One 512-dimensional feature row per test pair. The row count follows the
# submission file instead of being hard-coded.
num_test_pairs = len(df_test)
img1_features = torch.empty(size = [num_test_pairs, 512]).to(device)
img2_features = torch.empty(size = [num_test_pairs, 512]).to(device)

# np.save does not create missing folders
test_feature_dir = ROOT + "/test-vit-face-features"
os.makedirs(test_feature_dir, exist_ok=True)

# prevent gradient calculation for faster extraction
with torch.no_grad():
    # The first column holds "<image 1>-<image 2>"; it is read by position
    # because the column name is not relied upon anywhere else.
    pair_ids = df_test.iloc[:, 0]
    for index, pair_id in tqdm(enumerate(pair_ids), total = num_test_pairs, desc="Progress"):
        img1, img2 = pair_id.split("-")

        # Each image becomes a float tensor of shape (1, 3, 112, 112): batch x C x H x W
        img1 = transform(Image.open(ROOT + f"/test/{img1}").convert('RGB')).float().reshape(1,3, 112, 112).to(device)
        img2 = transform(Image.open(ROOT + f"/test/{img2}").convert('RGB')).float().reshape(1,3, 112, 112).to(device)

        # Extract Features
        img1_features[index] = feature_extractor(img1).flatten()
        img2_features[index] = feature_extractor(img2).flatten()

np.save(test_feature_dir + "/img1_features.npy", img1_features.cpu().numpy())
np.save(test_feature_dir + "/img2_features.npy", img2_features.cpu().numpy())

#### Train Neural Network Across Validation Sets and Functions

In [None]:
# Train one classifier per (validation split, combine function), keep the
# checkpoint with the best validation accuracy and use it to predict the
# test pairs.

# Checkpoints and predictions are written to AND read from these folders.
# Anchoring both on ROOT keeps saving and loading consistent regardless of
# the notebook's working directory.
CHECKPOINT_DIR = ROOT + "/Model Checkpoints/ViT Face/Without V09"
RESULT_DIR = ROOT + "/Test Results/ViT-Face/Without V09"
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
os.makedirs(RESULT_DIR, exist_ok=True)

# Labels of the ten equal-width probability bins printed after inference
BIN_LABELS = ["0 - 0.1", "0.1 - 0.2", "0.2 - 0.3", "0.3 - 0.4", "0.4 - 0.5",
              "0.5 - 0.6", "0.6 - 0.7", "0.7 - 0.8", "0.8 - 0.9", "0.9 - 1"]

# The stored test features are the same for every run, so load them once
img1_features = torch.tensor(np.load(ROOT + "/test-vit-face-features/img1_features.npy")).to(device)
img2_features = torch.tensor(np.load(ROOT + "/test-vit-face-features/img2_features.npy")).to(device)

for val_set in ["V00", "V01", "V02", "V03", "V04", "V05", "V06", "V07", "V08"]:
    # Load Dataset (load_data already returns tensors on `device`)
    _X_train, _y_train, _X_test, _y_test = load_data(val_set, "vit_face", 512)

    for function in ["diff", "diff_square", "exp_diff", "exp_div", "concat"]:
        X_train = combine_function(_X_train[:, 0, :], _X_train[:, 1, :], function)
        X_test = combine_function(_X_test[:, 0, :], _X_test[:, 1, :], function)

        # BCEWithLogitsLoss needs targets shaped like the (N, 1) model output
        y_train = _y_train.reshape(-1, 1)
        y_test = _y_test.reshape(-1, 1)

        model_name = "vit_face"

        print(f"X_Train Shape: {X_train.shape}")
        print(f"y_Train Shape: {y_train.shape}")
        print(f"X_Test Shape: {X_test.shape}")
        print(f"y_Test Shape: {y_test.shape}")

        # Counting by label value also works when one class is absent
        print(f"y_train Class Distribution")
        print(f"Kinship Pairs: {int((y_train == 1).sum())}")
        print(f"Non-Kinship Pairs: {int((y_train == 0).sum())}")
        print()
        print(f"y_test Class Distribution")
        print(f"Kinship Pairs: {int((y_test == 1).sum())}")
        print(f"Non-Kinship Pairs: {int((y_test == 0).sum())}")

        batch_size = 256

        dataloader_train = DataLoader(TensorDataset(X_train, y_train.float()),
                                      batch_size=batch_size,
                                      shuffle=True)

        # No shuffling for validation: the reported loss is a running average
        # over batches and would otherwise depend on a random batch order.
        dataloader_test = DataLoader(TensorDataset(X_test, y_test.float()),
                                     batch_size=batch_size,
                                     shuffle=False)

        # "concat" doubles the feature width, every other function keeps it
        classifier_width = 1024 if function == "concat" else 512
        model = NN_Classifier(classifier_width, classifier_width, 1).to(device)
        checkpoint_name = f"{model_name}_{function}_{val_set}"
        last_checkpoint_path = f"{CHECKPOINT_DIR}/{checkpoint_name}.pt"
        best_checkpoint_path = f"{CHECKPOINT_DIR}/{checkpoint_name}_best.pt"

        learning_rate = 1e-4
        no_of_epoch = 100

        optimizer = optim.Adam(model.parameters(), lr = learning_rate, weight_decay=3e-4)
        scheduler = LinearLR(optimizer, start_factor=1, end_factor = 0.2, total_iters=50)
        start_epoch = 0

        early_stopper = EarlyStopper_Checkpoint(patience = 5, save_path = best_checkpoint_path, metric = "val_acc")

        bce_loss_func = nn.BCEWithLogitsLoss(reduction='mean')

        for epoch in range(start_epoch, start_epoch + no_of_epoch):
            train_loss = 0.0
            correct = 0

            # Training
            model.train()
            for train_data, train_label in dataloader_train:
                optimizer.zero_grad()
                output = model(train_data)
                loss = bce_loss_func(output, train_label)
                loss.backward()
                optimizer.step()
                # .item() detaches the value; feeding the loss tensor into the
                # running average would keep every batch's graph referenced.
                train_loss = ema_loss(loss.item(), train_loss)

                y_pred = torch.sigmoid(output.detach())
                correct += ((y_pred > 0.5).float() == train_label).sum().item()

            train_accuracy = correct / len(dataloader_train.dataset) * 100
            checkpoint = {"epoch": epoch,
                          "model_state_dict": model.state_dict(),
                          'optimizer_state_dict': optimizer.state_dict(),
                          'scheduler_state_dict': scheduler.state_dict()}
            torch.save(checkpoint, last_checkpoint_path)
            print(f"Saving checkpoint to {checkpoint_name}.pt")

            # Validation
            correct = 0
            test_loss = 0.0
            model.eval()
            with torch.no_grad():
                for test_data, test_label in dataloader_test:
                    output = model(test_data)
                    loss = bce_loss_func(output, test_label)
                    y_pred = torch.sigmoid(output)
                    test_loss = ema_loss(loss.item(), test_loss)
                    correct += ((y_pred > 0.5).float() == test_label).sum().item()

            test_accuracy = correct / len(dataloader_test.dataset) * 100
            print(f"Epoch {epoch}: Train Loss: {train_loss: .5f}, Training Accuracy: {train_accuracy:.2f}%, Test Loss: {test_loss: .5f}, Testing Accuracy = {test_accuracy:.2f}%")

            scheduler.step()

            # check_metric must run before early_stop: early_stop updates the
            # stored best value, after which the same accuracy no longer
            # counts as an improvement.
            if early_stopper.check_metric(test_accuracy):
                checkpoint = {"epoch": epoch,
                              "test_loss": test_loss,
                              "test_acc": test_accuracy,
                              "model_state_dict": model.state_dict(),
                              'optimizer_state_dict': optimizer.state_dict(),
                              'scheduler_state_dict': scheduler.state_dict()}
                torch.save(checkpoint, best_checkpoint_path)
                print(f"Saving checkpoint to {checkpoint_name}_best.pt")

            if early_stopper.early_stop(test_accuracy):
                print("Stopped early due to no improvement in validation accuracy")
                break

        # Inference on the test pairs with the best checkpoint of this run
        print(checkpoint_name)
        model = NN_Classifier(classifier_width, classifier_width, 1).to(device)
        best_state = torch.load(best_checkpoint_path, map_location=device)
        model.load_state_dict(best_state["model_state_dict"])
        model.eval()

        # Loading the test dataset
        df_test = pd.read_csv(TEST_FILE_PATH)

        with torch.no_grad():
            X = combine_function(img1_features, img2_features, function)
            y_pred = torch.sigmoid(model(X))
        # The model emits one value per pair as an (N, 1) tensor; flatten it
        # into a plain 1-D array for the DataFrame column.
        df_test['is_related'] = y_pred.squeeze(1).cpu().numpy()

        # Print Distribution of Output: number of predictions per 0.1-wide bin
        bin_counts, _ = np.histogram(df_test['is_related'].to_numpy(), bins=np.linspace(0.0, 1.0, 11))
        for bin_label, bin_count in zip(BIN_LABELS, bin_counts):
            print(f"{bin_label} : {bin_count}")

        df_test.to_csv(f"{RESULT_DIR}/{checkpoint_name}.csv", index = False)

        # Drop the references so the next run starts from a clean state
        model = None
        optimizer = None
        scheduler = None

### Step 3: Creating an Ensemble via Averaging

In [None]:
# For every combine function, average the predictions of the classifiers
# trained on the individual validation splits.
ensemble_val_sets = ["V00", "V01", "V02", "V03", "V04", "V05", "V06", "V07", "V08"]

for function in ["diff", "diff_square", "exp_diff", "exp_div", "concat"]:
    # The sample submission only supplies the pair ids; its 'is_related'
    # column is overwritten, so its placeholder values cannot leak into the
    # average.
    df_test = pd.read_csv(TEST_FILE_PATH)
    fold_predictions = [
        pd.read_csv(ROOT + f"/Test Results/ViT-Face/Without V09/vit_face_{function}_{val_set}.csv")['is_related']
        for val_set in ensemble_val_sets
    ]
    # Dividing by the number of files actually read keeps the mean correct
    # when the list of validation splits changes.
    df_test['is_related'] = sum(fold_predictions) / len(fold_predictions)

    df_test.to_csv(ROOT + f"/Test Results/ViT-Face/Without V09/vit_face_{function}_avg.csv", index = False)
        

In [None]:
# Average the per-function ensembles into one final prediction file.
# The sample submission only supplies the pair ids; its 'is_related' column
# is overwritten, so its placeholder values cannot leak into the average.
df_test = pd.read_csv(TEST_FILE_PATH)
function_predictions = [
    pd.read_csv(ROOT + f"/Test Results/ViT-Face/Without V09/vit_face_{function}_avg.csv")['is_related']
    for function in ["diff", "diff_square", "exp_diff", "exp_div", "concat"]
]
# Dividing by the number of files actually read keeps the mean correct when
# the list of combine functions changes.
df_test['is_related'] = sum(function_predictions) / len(function_predictions)
df_test.to_csv(ROOT + f"/Test Results/ViT-Face/Without V09/vit_face_avg.csv", index = False)