In [11]:
import insightface
from insightface.app.common import Face
from insightface.model_zoo import model_zoo
from neural_nets import MMFace, MMFaceClassifier, insightface_model, InsightFaceClassifier, IntermediateFusionClassifier
from utils import load_model
import numpy as np
# REQUIRED FOR CUDA TO BE USED
import torch

# Run the PyTorch models on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_subjects = 21

# InsightFace (buffalo_l) ONNX models: face detector and face-recognition backbone.
det_model = model_zoo.get_model("../models/buffalo_l/det_10g.onnx")
# The detector's prepare() reads the keyword `det_thresh`; the former spelling
# `det_thres` was not a recognised option, so the threshold was never applied.
det_model.prepare(ctx_id=0, input_size=(480, 640), det_thresh=0.5)
rec_model = model_zoo.get_model("../models/buffalo_l/w600k_r50.onnx")

# Radar model with 2 outputs, restored from the liveness checkpoint.
# map_location keeps the checkpoint loadable when `device` falls back to the CPU.
mmface_model = MMFace(2).to(device)
mmface_model.load_state_dict(torch.load("models/mmFace-liveness-99.6.pt", map_location=device)["model_state_dict"])
mmface_model.eval()
# TODO: MAKE READY FOR FEATURE EXTRACTION LAST LAYER

# Subject classifier operating on the RGB face embeddings.
insightface_classifier = InsightFaceClassifier(num_subjects).to(device)
insightface_classifier.load_state_dict(torch.load("models/insightface_classifier.pt", map_location=device)["model_state_dict"])
insightface_classifier.eval()

# TODO: TRAIN (ONCE PAPER DATASET COLLECTED)
# insightface_classifier_liveness = InsightFaceClassifier(2).to(device)
# insightface_classifier_liveness.eval()
#
# mmface_classifier_liveness = MMFaceClassifier_Liveness().to(device)
# mmface_classifier_liveness.load_state_dict(torch.load("models/mmFace-21_classifier.pt")["model_state_dict"])
# mmface_classifier_liveness.eval()

# TODO: NEED DATASET FOR THIS
# Fusion classifier built with input size 512*2; no weights are loaded for it here.
intermediate_fusion_clf = IntermediateFusionClassifier(512*2, num_subjects).to(device)

# TODO: LOOP THROUGH DATASET IN ORDER AND RECOGNISE AND CLASSIFY FOR EVALUATION (LOAD DATA FOR EACH SUBJECT)
def recognise(rgb_input, radar_input):
    """Compute the RGB face embedding and the radar model output for one sample.

    The colour input has its last axis reversed and is handed to
    ``insightface_model`` together with the detector, the ``Face`` class and
    the recognition model. The radar input is passed through ``mmface_model``.
    Both calls run with gradient tracking disabled.

    Returns:
        Tuple ``(rgb_emb, radar_emb)`` with the two model outputs.
    """
    # Reverse the last axis (presumably RGB -> BGR channel order; confirm
    # against what insightface_model expects).
    channel_reversed = rgb_input[..., ::-1]

    with torch.no_grad():
        face_embedding = insightface_model(channel_reversed, det_model, Face, rec_model)
        # TODO: EXTRACT EMBEDDING FROM MODEL.FC2
        radar_output = mmface_model(radar_input)

    return face_embedding, radar_output

def classify_no_radar(rgb_emb):
    """Predict subject identity and liveness from the RGB face embedding only.

    Args:
        rgb_emb: Face embedding; passed unchanged to both classifiers.

    Returns:
        Tuple ``(subject, liveness)`` holding the raw outputs of
        ``insightface_classifier`` and ``insightface_classifier_liveness``.

    Raises:
        NameError: If ``insightface_classifier_liveness`` has not been created
            (for example while its construction is still commented out in the
            notebook). The check happens before any model is run so the failure
            is immediate and explains what is missing.
    """
    try:
        liveness_classifier = insightface_classifier_liveness
    except NameError as err:
        raise NameError(
            "insightface_classifier_liveness is not defined: train and load the "
            "RGB liveness classifier before calling classify_no_radar()"
        ) from err

    with torch.no_grad():
        subject = insightface_classifier(rgb_emb)
        liveness = liveness_classifier(rgb_emb)

    return subject, liveness

def classify(rgb_emb, radar_emb):
    """Predict subject identity from the RGB embedding and liveness from radar.

    Args:
        rgb_emb: Face embedding; passed to ``insightface_classifier``.
        radar_emb: Radar model output; passed to ``mmface_classifier_liveness``.

    Returns:
        Tuple ``(subject, liveness)`` holding the raw outputs of the two
        classifiers.

    Raises:
        NameError: If ``mmface_classifier_liveness`` has not been created (for
            example while its construction is still commented out in the
            notebook). The check happens before any model is run so the failure
            is immediate and explains what is missing.
    """
    try:
        liveness_classifier = mmface_classifier_liveness
    except NameError as err:
        raise NameError(
            "mmface_classifier_liveness is not defined: train and load the "
            "radar liveness classifier before calling classify()"
        ) from err

    with torch.no_grad():
        subject = insightface_classifier(rgb_emb)
        liveness = liveness_classifier(radar_emb)

    return subject, liveness

Applied providers: ['CUDAExecutionProvider', 'CPUExecutionProvider'], with options: {'CUDAExecutionProvider': {'do_copy_in_default_stream': '1', 'cudnn_conv_algo_search': 'EXHAUSTIVE', 'device_id': '0', 'gpu_external_alloc': '0', 'enable_cuda_graph': '0', 'gpu_mem_limit': '18446744073709551615', 'gpu_external_free': '0', 'gpu_external_empty_cache': '0', 'arena_extend_strategy': 'kNextPowerOfTwo', 'cudnn_conv_use_max_workspace': '1', 'cudnn_conv1d_pad_to_nc1d': '0', 'tunable_op_enable': '0', 'tunable_op_tuning_enable': '0', 'enable_skip_layer_norm_strict_mode': '0'}, 'CPUExecutionProvider': {}}
Applied providers: ['CUDAExecutionProvider', 'CPUExecutionProvider'], with options: {'CUDAExecutionProvider': {'do_copy_in_default_stream': '1', 'cudnn_conv_algo_search': 'EXHAUSTIVE', 'device_id': '0', 'gpu_external_alloc': '0', 'enable_cuda_graph': '0', 'gpu_mem_limit': '18446744073709551615', 'gpu_external_free': '0', 'gpu_external_empty_cache': '0', 'arena_extend_strategy': 'kNextPowerOfTwo',

In [23]:
import matplotlib.pyplot as plt

# Root directory of the recordings; files are looked up under one
# sub-directory per subject id.
DATA_PATH = "../../Soli/soli_realsense/data"

def load_rgb(subject, experiment):
    """Load the colour recording of one experiment as a float32 array.

    Reads ``{DATA_PATH}/{subject}/{subject}-{experiment}_colour.npy`` and
    casts the stored array to ``np.float32``.
    """
    colour_file = f"{DATA_PATH}/{subject}/{subject}-{experiment}_colour.npy"
    frames = np.load(colour_file)
    return frames.astype(np.float32)

def recognise_rgb(rgb_input):
    """Return the ``insightface_model`` result for one colour input.

    The input's last axis is reversed before the call (presumably RGB -> BGR;
    confirm against what insightface_model expects).
    """
    channel_reversed = rgb_input[..., ::-1]
    return insightface_model(channel_reversed, det_model, Face, rec_model)

# Embed element 0 (presumably the first frame) of five recordings, given as
# (subject id, experiment id) pairs in the same order as the target names.
fake_0, fake_0_1, real_0, real_0_1, real_1 = (
    recognise_rgb(load_rgb(subject_id, experiment_id)[0])
    for subject_id, experiment_id in [(90, 0), (90, 5), (0, 0), (0, 1), (1, 0)]
)

In [27]:
# Dot-product similarity between embedding pairs, clamped to [0, 1].
# "good": two recordings of subject 90 (experiments 5 and 0).
# "bad":  subject 90 (experiment 0) against subject 1 (experiment 0).
raw_good = np.dot(fake_0_1, fake_0.T)
raw_bad = np.dot(fake_0, real_1.T)

score_good = np.clip(raw_good, 0., 1.)
score_bad = np.clip(raw_bad, 0., 1.)

print(score_good, score_bad)

0.3785049319267273 0.0


## mmFace Feature Extraction

### mmFace Dataset Load

In [None]:
from dataset_builder import normalise, load_dataset_DL, load_dataset
from torchvision.transforms import Compose, ToTensor
import os
import numpy as np

# Selection forwarded to load_dataset: subject ids 0-20, experiment ids 0-14,
# and the num_frames / batch_size settings below.
subjects = range(21)
experiments = list(range(15))
num_frames = 250
num_subjects = len(subjects)

# Release cached GPU memory before the dataset is loaded.
torch.cuda.empty_cache()
train, validation, test = load_dataset(
    os.path.relpath("../../Soli/soli_realsense/data"),
    subjects,
    experiments=experiments,
    num_frames=num_frames,
    batch_size=128,
)

In [None]:
import torch
import torch.nn as nn
from tqdm import tqdm
from neural_nets import MMFaceFE, MMFaceClassifier
from utils import load_model, load_history

# Training hyper-parameters shared by the cells below.
num_epochs = 1
learning_rate = 0.01

model = MMFaceFE().to(device)

# Loss + Optimiser
criterion = nn.CrossEntropyLoss()
optimiser = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=0.001, momentum=0.9)

# Resume from an existing checkpoint: load_model returns the epoch to continue
# from together with the stored loss / accuracy histories.
model_name = f"mmFaceFE-{num_subjects}.pt"
cur_epoch, loss_history, train_acc, val_acc = load_model(model_name, model, optimiser)

if len(loss_history) > 0:
    resume_summary = [model_name, f"\tEpoch: {cur_epoch}", f"\tLoss: {loss_history[-1]:.4f}"]
    # The accuracy histories can be empty while loss_history is not: the
    # checkpoint written by the feature-extractor training loop appends to
    # loss_history but stores train_acc / val_acc unchanged. Indexing [-1]
    # unconditionally would raise IndexError when resuming such a checkpoint.
    if len(train_acc) > 0:
        resume_summary.append(f"\tTrain Accuracy: {train_acc[-1]:.4f}")
    if len(val_acc) > 0:
        resume_summary.append(f"\tValidation Accuracy: {val_acc[-1]:.4f}")
    print("\n".join(resume_summary))

### Training

In [None]:
for epoch in range(cur_epoch, num_epochs):
    print(f"\nEpoch [{epoch}/{num_epochs-1}]:")
    # Re-read the histories from the checkpoint (if one exists) so the values
    # saved below extend what is already on disk.
    if os.path.exists(f"models/{model_name}"):
        loss_history, train_acc, val_acc = load_history(f"models/{model_name}")

    model.train()
    # Running Loss and Accuracy
    running_loss, running_acc, total = 0., 0., 0.

    for data, labels in tqdm(train):
        # Forward Pass
        outputs = model(data)
        # _, predicted = torch.max(outputs.data, 1)
        loss = criterion(outputs, labels)

        # Backward Pass and Optimise
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

        # Accumulate the batch loss. This was previously commented out, which
        # made the reported and checkpointed average loss always 0.0.
        running_loss += loss.item()
        # total += labels.size(0)
        # running_acc += (predicted == labels).sum().item()

        del data, labels, outputs
        torch.cuda.empty_cache()

    # Sum of the per-batch losses divided by len(train).
    avg_train_loss = running_loss/len(train)
    # avg_train_acc = 100*running_acc/total
    print(f"\tAverage Train Loss: {avg_train_loss:.4f}")
    # print(f"\tTrain Accuracy: {avg_train_acc:.4f}%")

    # Checkpoint after every epoch; "epoch" stores the next epoch to run.
    # Only the loss history grows here, the accuracy histories are stored as loaded.
    torch.save({"epoch": epoch+1,
                "model_state_dict": model.state_dict(),
                "optimiser_state_dict": optimiser.state_dict(),
                "loss_history": loss_history + [avg_train_loss],
                "train_acc": train_acc,
                "val_acc": val_acc},
                f"models/{model_name}")

    # # Validation
    # model.eval()
    # with torch.no_grad():
    #     correct = 0
    #     total = 0
    #     for data, labels in validation:
    #         outputs = model(data)
    #         _, predicted = torch.max(outputs.data, 1)
    #         total += labels.size(0)
    #         correct += (predicted == labels).sum().item()
    #         del data, labels, outputs

    #     avg_val_acc = 100*correct/total
    #     print(f"\tValidation Accuracy: {avg_val_acc:.4f}%")

    # model_checkpoint = torch.load(f"models/{model_name}")
    # model_checkpoint["val_acc"].append(avg_val_acc)
    # torch.save(model_checkpoint, f"models/{model_name}")

    # # Stop if overfitting
    # if avg_train_acc - avg_val_acc > 5:
    #     break

In [None]:
# TODO: SAVE EMBEDDINGS

## mmFace Classification

### mmFace Embeddings Load

In [None]:
# TODO: LOAD EMBEDDINGS

In [None]:
classifier = MMFaceClassifier(num_subjects).to(device)
criterion_clf = nn.CrossEntropyLoss()
optimiser_clf = torch.optim.SGD(classifier.parameters(), lr=learning_rate, weight_decay=0.001, momentum=0.9)

# Resume from an existing checkpoint: load_model returns the epoch to continue
# from together with the stored loss / accuracy histories.
classifier_name = f"mmFaceClassifier-{num_subjects}.pt"
cur_epoch, loss_history, train_acc, val_acc = load_model(classifier_name, classifier, optimiser_clf)

if len(loss_history) > 0:
    # Report the classifier checkpoint that was just loaded; this summary used
    # to print `model_name`, i.e. the feature-extractor checkpoint name.
    resume_summary = [classifier_name, f"\tEpoch: {cur_epoch}", f"\tLoss: {loss_history[-1]:.4f}"]
    # val_acc is appended to the checkpoint in a second save after validation,
    # so an interrupted epoch can leave it shorter than loss_history; guard the
    # [-1] lookups instead of raising IndexError.
    if len(train_acc) > 0:
        resume_summary.append(f"\tTrain Accuracy: {train_acc[-1]:.4f}")
    if len(val_acc) > 0:
        resume_summary.append(f"\tValidation Accuracy: {val_acc[-1]:.4f}")
    print("\n".join(resume_summary))

### Training

In [None]:
# Train the classifier from `cur_epoch` up to `num_epochs`, checkpointing after
# every epoch and validating on `validation`.
# NOTE(review): the loop iterates the `train` / `validation` objects returned by
# load_dataset; the "LOAD EMBEDDINGS" step is still a TODO, so confirm these
# batches are what the classifier is meant to consume.
for epoch in range(cur_epoch, num_epochs):
    print(f"\nEpoch [{epoch}/{num_epochs-1}]:")
    # Re-read the histories from the checkpoint (if one exists) so the values
    # saved below extend what is already on disk.
    if os.path.exists(f"models/{classifier_name}"):
        loss_history, train_acc, val_acc = load_history(f"models/{classifier_name}")

    classifier.train()
    # Running Loss and Accuracy
    running_loss, running_acc, total = 0., 0., 0.

    for data, labels in tqdm(train):
        # Forward Pass
        outputs = classifier(data)
        # Index of the largest output along dim 1 is taken as the predicted class.
        _, predicted = torch.max(outputs.data, 1)
        loss = criterion_clf(outputs, labels)

        # Backward Pass and Optimise
        optimiser_clf.zero_grad()
        loss.backward()
        optimiser_clf.step()

        # Accumulate batch loss, number of samples seen and number of correct predictions.
        running_loss += loss.item()
        total += labels.size(0)
        running_acc += (predicted == labels).sum().item()

        # Drop the batch references and release cached GPU memory after every step.
        del data, labels, outputs
        torch.cuda.empty_cache()
    
    # Loss is averaged over len(train); accuracy is a percentage of all samples seen.
    avg_train_loss = running_loss/len(train)
    avg_train_acc = 100*running_acc/total
    print(f"\tAverage Train Loss: {avg_train_loss:.4f}")
    print(f"\tTrain Accuracy: {avg_train_acc:.4f}%")

    # First save of the epoch: weights, optimiser state and the training
    # histories. "epoch" stores the next epoch to run; val_acc is written as
    # loaded and is extended by the second save after validation.
    torch.save({"epoch": epoch+1,
                "model_state_dict": classifier.state_dict(),
                "optimiser_state_dict": optimiser_clf.state_dict(),
                "loss_history": loss_history + [avg_train_loss],
                "train_acc": train_acc + [avg_train_acc],
                "val_acc": val_acc},
                f"models/{classifier_name}")
    
    # Validation
    classifier.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for data, labels in validation:
            outputs = classifier(data)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            del data, labels, outputs
        
        avg_val_acc = 100*correct/total
        print(f"\tValidation Accuracy: {avg_val_acc:.4f}%")

    # Second save of the epoch: reload the checkpoint written above, append the
    # validation accuracy and write it back.
    classifier_checkpoint = torch.load(f"models/{classifier_name}")
    classifier_checkpoint["val_acc"].append(avg_val_acc)
    torch.save(classifier_checkpoint, f"models/{classifier_name}")

    # Stop if overfitting
    # (train accuracy exceeds validation accuracy by more than 5 percentage points)
    if avg_train_acc - avg_val_acc > 5:
        break