In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from torchvision import transforms, models
import torch.optim as optim
import os
import random
import cv2
import numpy as np
import glob
from PIL import Image


from deformable_conv import DeformableConv2d

In [2]:
'''
Makeup_Cosmetic - Makeup_Co_5
Makeup_Impersonation - Makeup_Im_1
Makeup_Obfuscation - Makeup_Ob_1
Mannequin - Mask_Mann_1
Mask_HalfMask - Mask_Half_1
Mask_PaperMask - Mask_Paper_1
Mask_TransparentMask - Mask_Trans_51
Paper - Paper_1
Partial_Eye - Partial_Eye_3
Partial_FunnyeyeGlasses - Partial_Funnyeye_1
Partial_Mouth - Partial_Mouth_1
Partial_PaperGlasses - Partial_Paperglass_1
Replay - Replay_2
Silicone - Mask_Silicone_4
'''

attack_to_cls = {
    'Makeup_Co': 1,
    'Makeup_Im': 2,
    'Makeup_Ob': 3,
    'Mask_Mann': 4,
    'Mask_Half': 5,
    'Mask_Paper': 6,
    'Mask_Trans': 7,
    'Paper': 8,
    'Partial_Eye': 9,
    'Partial_Funnyeye': 10,
    'Partial_Mouth': 11,
    'Partial_Paperglass': 12,
    'Replay': 13,
    'Mask_Silicone': 14
}

In [3]:
class SiWMv2Dataset(Dataset):
    def __init__(self, live_path, spoof_path, live_ref, spoof_ref):
        self.transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor(),transforms.Normalize(mean=[0.5], std=[0.5])])
        self.live_path = live_path
        self.spoof_path = spoof_path
        self.images = []
        self.labels = []
        self.sub_levels = []
        self.real = 0
        self.attack = 1
        
        with open(live_ref, 'r') as f:
            content = f.read()
        live_dirs = content.split('\n')
        
        with open(spoof_ref, 'r') as f:
            content = f.read()
        spoof_dirs = content.split('\n')
        
        for ldir in live_dirs:
            if ldir != '' and ldir != ' ':
                ldir_path = os.path.join(self.live_path, ldir)
                if os.path.exists(ldir_path):
                    lcontent = os.listdir(ldir_path)
                    lcontent = [os.path.join(ldir_path, f) for f in lcontent if f.endswith('png')]
                    self.images.extend(lcontent)
                    self.labels.extend([self.real] * len(lcontent))
                    self.sub_levels.extend([self.real] * len(lcontent))
                else:
                    print(ldir_path)
        
        for sdir in spoof_dirs:
            if sdir != '' and sdir != ' ':
                sdir_path = os.path.join(self.spoof_path, sdir)
                scontent = os.listdir(sdir_path)
                scontent = [os.path.join(sdir_path, f) for f in scontent if f.endswith('png')]
                self.images.extend(scontent)
                self.labels.extend([self.attack] * len(scontent))
                attacklabel = attack_to_cls['_'.join(os.path.basename(sdir).split('_')[:-1])]
                self.sub_levels.extend([attacklabel] * len(scontent))
        
        self.data_len = len(self.images)
        

    def __len__(self): # returns the total number of samples in the dataset
        return self.data_len

    def __getitem__(self, idx): # loads and returns a sample from the dataset at the given index
        imgpath = self.images[idx]
        img = Image.open(imgpath).convert('RGB')
        img = self.transform(img)
        return img, self.labels[idx], self.sub_levels[idx]

In [4]:
device = torch.device(0)

class_label_real = 0
class_label_attack = 1

real_dir = r'/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/Live/'
spoof_dir = r'/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/Spoof_root/'

# Protocol 1

train_spoof_ref = r'/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/trainlist_all.txt'
train_live_ref = r'/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/trainlist_live.txt'
test_spoof_ref = r'/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/testlist_all.txt'
test_live_ref =r'/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/testlist_live.txt'

# train_spoof_ref = r'protocol_1/train_spoof.txt'
# train_live_ref = r'protocol_1/train_live.txt'
# val_spoof_ref = r'protocol_1/val_spoof.txt'
# val_live_ref = r'protocol_1/val_live.txt'
# test_spoof_ref = r'/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/testlist_all.txt'
# test_live_ref =r'/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/testlist_live.txt'

# train_dataset = SiWMv2Dataset(real_dir, spoof_dir, train_live_ref, train_spoof_ref)
# val_dataset = SiWMv2Dataset(real_dir, spoof_dir, val_live_ref, val_spoof_ref)
test_dataset = SiWMv2Dataset(real_dir, spoof_dir, test_live_ref, test_spoof_ref)

# train_loader = DataLoader(train_dataset, batch_size  = 64, shuffle = True, pin_memory = True, num_workers = 8)
# val_loader = DataLoader(val_dataset, batch_size  = 64, shuffle = True, pin_memory = True, num_workers = 8)
test_loader = DataLoader(test_dataset, batch_size  = 64, shuffle = True, pin_memory = True, num_workers = 8)

# Print dataset sizes
print(f"Training set size: {len(test_dataset)}")
# print(f"Training set size: {len(val_dataset)}")

/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/Live/Live_893
/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/Live/Live_896
/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/Live/Live_899
/data/ssd2/shakeel-workspace/DOWNLOAD/SiW-Mv2_preprocessed/Live/Live_900
Training set size: 110019


In [5]:
# Load pre-trained ResNet18
# model = models.resnet18(pretrained=True)
# num_ftrs = model.fc.in_features
# model.fc = nn.Linear(num_ftrs, 2)

# Load pre-trained MobileNetV2
model = models.mobilenet_v2(pretrained=True)
# model.features[0] = DeformableConv2d(3, 32, 3, 2, 1)
model.features[-1] = nn.Sequential(
    DeformableConv2d(320, 1280, 3, 2),
    nn.BatchNorm2d(1280),
    nn.ReLU6()
)
model.classifier[1] = nn.Linear(in_features=1280, out_features=2) #default in_features =1280, out_features = 1000

# best_model_path = r'checkpoint_protocol_1_wo_val/best_95_0.000376204145941474.pth'
# best_model_path = r'checkpoint_protocol_1_wo_val/best_105_0.00012885049953524681.pth'
# best_model_path = r'checkpoint_protocol_1_wo_val/best_126_0.00012175165403143548.pth'
best_model_path = r'checkpoint_protocol_1_wo_val/best_142_4.0373697962606216e-07.pth'  #3.009
# best_model_path = r'checkpoint_protocol_1_wo_val_cont/best_2_0.0005171791127407008.pth'
# best_model_path = r'checkpoint_protocol_1_wo_val/best_32_0.0007601213937333313.pth'
# best_model_path = r'checkpoint_protocol_1_wo_val/best_135_0.00016639192170754796.pth'
# best_model_path = r'checkpoint_protocol_1_wo_val_cont/best_127_2.6473109944895285e-05.pth'
# best_model_path = r'checkpoint_protocol_1_wo_val_cont/best_188_5.549576379481903e-05.pth'
state_dict = torch.load(best_model_path,  map_location=device)
model.load_state_dict(state_dict, strict = True)
model = model.to(device)
print(model)



MobileNetV2(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU6(inplace=True)
    )
    (1): InvertedResidual(
      (conv): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
          (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU6(inplace=True)
        )
        (1): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (2): InvertedResidual(
      (conv): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 96, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(96, eps=

In [6]:
# Evaluate on the test set
test_correct = 0
test_total = 0

model.eval()
with torch.no_grad():
    
    test_cat_labels = torch.empty(0, dtype=torch.int64, device=device)
    test_cat_labels_sub = torch.empty(0, dtype=torch.int64, device = device)
    test_predicted_cat_labels = torch.empty(0, dtype=torch.int64, device=device)

    for test_images, test_labels, test_labels_sub in test_loader:
        test_images, test_labels = test_images.to(device), test_labels.to(device)
        test_labels_sub = test_labels_sub.to(device)
        test_model_op = model(test_images)
        _, test_predicted = torch.max(test_model_op, 1)
        test_correct += (test_predicted == test_labels).sum().item() 
        test_total += test_labels.size(0)

        test_cat_labels = torch.cat((test_cat_labels, test_labels))
        test_cat_labels_sub = torch.cat((test_cat_labels_sub, test_labels_sub))
        test_predicted_cat_labels = torch.cat((test_predicted_cat_labels, test_predicted))

    test_accuracy = test_correct / test_total * 100  
    print(f'Test Accuracy: {test_accuracy:.2f}%')

Test Accuracy: 95.16%


In [7]:
test_cat_labels_cpu = test_cat_labels.cpu()
test_cat_labels_sub = test_cat_labels_sub.cpu()
test_predicted_cat_labels_cpu = test_predicted_cat_labels.cpu()

In [8]:
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, balanced_accuracy_score

In [9]:
tn, fp, fn, tp = confusion_matrix(test_cat_labels_cpu, test_predicted_cat_labels_cpu).ravel()

print(f'TN: {tn}, FP: {fp}, FN: {fn}, TP: {tp}')

acc_score = accuracy_score(test_cat_labels_cpu, test_predicted_cat_labels_cpu)
prec_score = precision_score(test_cat_labels_cpu, test_predicted_cat_labels_cpu)
recall = recall_score(test_cat_labels_cpu, test_predicted_cat_labels_cpu)

Y_I_val =(tp/(tp+fn)) + (tn/(tn+fp)) - 1
sensitivity_val = tp / (tp + fn)
specificity_val = tn / (tn + fp)
f1score_val = 2 * tp / (2 * tp + fp + fn)
FAR = fp/(fp + tn)
FRR = fn/(fn + tp)
HTER_val = (FAR + FRR)/2
EER = (fp+fn)/(tn+fp+fn+tp)
val_bacc = balanced_accuracy_score(test_cat_labels_cpu, test_predicted_cat_labels_cpu)


print('Testing Results')
print(30*'-')
print('Acc:', acc_score, '\nSen:', sensitivity_val, '\nSpec:', specificity_val, '\nYI:', Y_I_val, '\nF1:', f1score_val, '\nPrec:', prec_score, '\nRecall:', recall, '\nHTER:', HTER_val, '\nEER:', EER, '\nBACC:', val_bacc)

TN: 55137, FP: 575, FN: 4749, TP: 49558
Testing Results
------------------------------
Acc: 0.9516083585562494 
Sen: 0.9125527095954481 
Spec: 0.9896790637564618 
YI: 0.9022317733519098 
F1: 0.949023362696285 
Prec: 0.9885305088464684 
Recall: 0.9125527095954481 
HTER: 0.04888411332404505 
EER: 0.048391641443750626 
BACC: 0.9511158866759549


In [10]:
test_predicted_cat_labels_cpu

tensor([0, 0, 0,  ..., 1, 0, 0])

In [11]:
class_label_real = 0
class_label_attack = 1

In [12]:
ayo = test_cat_labels_sub + 1

In [13]:
ayo[1000:2000]

tensor([ 1, 15,  1,  6,  9, 14,  1,  6,  1,  1,  9,  1, 13, 11,  1,  9,  1, 13,
         1, 15,  1,  9, 14,  1, 12,  9,  1,  1,  1, 14,  1, 15, 11,  1, 14,  1,
         2,  1,  1, 10,  3,  1,  1,  1,  8, 12,  1,  9,  1,  1,  3, 14,  9, 15,
         1, 11,  1,  1, 11,  1, 11,  1,  8,  3,  2, 11,  3, 11, 12,  8,  1,  1,
        14,  9,  1,  1,  2,  1, 14,  4, 14,  1,  1,  8,  1, 13,  8,  1,  6,  8,
         9, 11,  1,  1,  1,  1,  7,  1,  1,  1,  1, 10, 12,  9,  9,  6, 11,  1,
         1, 14,  1,  1, 10,  1,  8,  1,  1,  1,  9,  9,  1,  2,  1,  1,  1,  1,
         1,  1,  3,  1, 11,  1,  1,  1,  2, 13,  9,  3,  1,  2,  4,  1, 14,  1,
        13,  1,  1,  1,  3,  1,  1,  1,  1,  1,  1,  9, 11,  1,  2,  1,  1,  1,
         1, 11,  1,  9,  1,  1, 14, 11,  1,  1,  2,  3, 11,  1,  1, 11,  1,  1,
         1, 11,  3, 11, 10,  1,  1,  1, 13, 12,  1,  1,  1, 11,  1,  3,  1,  1,
         1, 14,  1,  1,  9, 12,  1,  3,  9,  1, 10,  5,  1, 11,  2,  1, 12,  8,
         1,  1,  3, 11,  1,  8,  1, 12, 

In [14]:
from metrics import calculate_metrics

# y_attack_types = np.load(r'normal_labels2.npy')
# pred = np.load(r'normal_predictions.npy')

pred_y = np.where(test_predicted_cat_labels_cpu == 1, 0, 1)
# rreal = np.where(test_cat_labels_cpu == 1, 0, 1)




apcer, bpcer, acer = calculate_metrics(ayo, pred_y)


print('APCER: ' + str(apcer))
print('BPCER: ' + str(bpcer))
print('ACER: ' + str(acer))

APCER: 0.049848526576700636
BPCER: 0.010320936243538196
ACER: 0.030084731410119414


In [15]:

def confusion_matrix(y_true, y_pred):
    '''
    calculates the confusion matrix

    parameters:
        y_true (list): A list with the true values as in,
                                0 - negative
                                1 - positive

        y_pred (list): A list with the predictions as in,
                                0 - negative
                                1 - positive
    returns:
        cm (list): A list with the values for TN, FP, FN, TP (true negatives, false positives, false negatives and false positives)
    '''
    cm = [0, 0, 0, 0] # tn, fp, fn, tp
    for y, y_hat in zip(y_true, y_pred):
        if(y == 0): # false
            if(y_hat == 0):
                cm[0] += 1 # tn
            else:
                cm[1] += 1 # fp
        elif(y == 1): # true
            if(y_hat == 0):
                cm[2] += 1 # fn
            else:
                cm[3] += 1 # tp
                
    return cm


def calculate_metrics(y_attack_types, y_pred, threshold=0.5):
    '''
    calculates the metrics APCER, BPCER and ACER as defined for the OULU dataset.

    parameters:
        y_attack_types (list): A list with the labels for the samples as in,
                                1 - live samples
                                2 - print attack 1
                                3 - print attack 2
                                4 - display attack 1
                                5 - display attack 2

        y_pred_display (list): A list with the predictions as in,
                                0 - attack
                                1 - live

                                or

                                [0-1] float binarized if threshold bellow

        pred_threshold (int) [0-1]: used to binarize predictions

    returns:
        apcer (float) [0-1] attack presentation classification error rate
        bpcer (float) [0-1] bona fide presentation classification error rate
        acer (float) [0-1] average classification error rate
        apcer_mean (float): mean of APCER values across print and display attacks
        apcer_variance (float): variance of APCER values across print and display attacks
        bpcer_mean (float): mean of BPCER values across print and display attacks
        bpcer_variance (float): variance of BPCER values across print and display attacks
        acer_mean (float): mean of ACER values across print and display attacks
        acer_variance (float): variance of ACER values across print and display attacks
    '''

    y_pred = [(0 if y >= threshold else 1) for y in y_pred] # binarize y_pred using threshold
    y_true = [(0 if y == 1 else 1) for y in y_attack_types] # get y_true from the y_attack_types

    # from now on 1=attack and 0=live

    print_attack_indices = [index for index, element in enumerate(y_attack_types) if element in [1,2,3]]
    display_attack_indices = [index for index, element in enumerate(y_attack_types) if element in [1,4,5]]

    y_true_print_attack = [y_true[i] for i in print_attack_indices]
    y_pred_print_attack = [y_pred[i] for i in print_attack_indices]
    y_true_display_attack = [y_true[i] for i in display_attack_indices]
    y_pred_display_attack = [y_pred[i] for i in display_attack_indices]

    # tn, fp, fn, tp
    cm_print_attack = confusion_matrix(y_true_print_attack, y_pred_print_attack) # confusion matrix for print attack
    cm_display_attack = confusion_matrix(y_true_display_attack, y_pred_display_attack) # confusion matrix for display attack

    try:
        apcer_print = cm_print_attack[2] / (cm_print_attack[3] + cm_print_attack[2]) # fn / (tp + fn)
    except:
        apcer_print = 0.0

    try:
        apcer_display = cm_display_attack[2] / (cm_display_attack[3] + cm_display_attack[2]) # fn / (tp + fn)
    except:
        apcer_display = 0.0

    try:
        bpcer_print = cm_print_attack[1] / (cm_print_attack[1] + cm_print_attack[0]) # fp/(fp + tn)
    except:
        bpcer_print = 0.0

    try:
        bpcer_display = cm_display_attack[1] / (cm_display_attack[1] + cm_display_attack[0]) # fp/(fp + tn)
    except:
        bpcer_display = 0.0

    apcer = max(apcer_print, apcer_display) # max of both apcer (print, display) attack presentation classification error rate
    bpcer = max(bpcer_print, bpcer_display) # max of both bpcer (print, display) bona fide presentation classification error rate

    acer = (apcer + bpcer) / 2 # average between apcer and bpcer

    apcer_values = [apcer_print, apcer_display]
    bpcer_values = [bpcer_print, bpcer_display]
    acer_values = [(apcer_print + bpcer_print) / 2, (apcer_display + bpcer_display) / 2]

    apcer_mean = sum(apcer_values) / len(apcer_values)
    apcer_variance = sum((x - apcer_mean) ** 2 for x in apcer_values) / len(apcer_values)

    bpcer_mean = sum(bpcer_values) / len(bpcer_values)
    bpcer_variance = sum((x - bpcer_mean) ** 2 for x in bpcer_values) / len(bpcer_values)

    acer_mean = sum(acer_values) / len(acer_values)
    acer_variance = sum((x - acer_mean) ** 2 for x in acer_values) / len(acer_values)

    return apcer, bpcer, acer, apcer_mean, apcer_variance, bpcer_mean, bpcer_variance, acer_mean, acer_variance


In [16]:
pred_y = np.where(test_predicted_cat_labels_cpu == 1, 0, 1)
# rreal = np.where(test_cat_labels_cpu == 1, 0, 1)




apcer, bpcer, acer, apcer_mean, apcer_variance, bpcer_mean, bpcer_variance, acer_mean, acer_variance = calculate_metrics(ayo, pred_y)


In [17]:
print('APCER: ' + str(apcer))
print('APCER mean: ' + str(apcer_mean))
print('APCER variance: ' + str(apcer_variance))


print('BPCER: ' + str(bpcer))
print('BPCER mean: ' + str(bpcer_mean))
print('BPCER variance: ' + str(bpcer_variance))

print('ACER: ' + str(acer))
print('ACER mean: ' + str(acer_mean))
print('ACER variance: ' + str(acer_variance))

APCER: 0.049848526576700636
APCER mean: 0.04420294403339718
APCER variance: 3.187260225325274e-05
BPCER: 0.010320936243538196
BPCER mean: 0.010320936243538196
BPCER variance: 0.0
ACER: 0.030084731410119414
ACER mean: 0.027261940138467687
ACER variance: 7.968150563313174e-06
