# Import

In [None]:
!git clone https://github.com/Edouard99/Idemia.git

In [None]:
import torchvision.transforms as transforms
import torchvision.datasets as dset
import torch
import torch.nn
import numpy as np 
import struct
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from Idemia.facenet_model import *
from Idemia.utils import *
import torch.optim as optim
import tqdm
import os
from google.colab import drive
from google.colab import files
drive.mount("/content/gdrive", force_remount=True)

In [3]:
!cp /content/gdrive/MyDrive/Idemia/db_train.raw /content/
!cp /content/gdrive/MyDrive/Idemia/db_val.raw /content/
!cp /content/gdrive/MyDrive/Idemia/label_train.txt /content/
!mkdir /content/results/

In [4]:
with open('./db_train.raw', 'rb') as f:
    data = np.fromfile(f, dtype=np.dtype(np.uint8))
    data = data.reshape((111430,56, 56, 3))
with open('./label_train.txt', 'rb') as f:
    label=f.read().splitlines()
    for k,elem in enumerate(label):
        label[k]=int(elem)
    label=np.array(label)

In [6]:
data_0=data[np.where(label==0)[0]]
data_1=data[np.where(label==1)[0]]
data_0_t=data_0[:int(len(data_0)*0.7)]
data_1_t=data_1[:int(len(data_1)*0.7)]
data_0_v=data_0[int(len(data_0)*0.7):]
data_1_v=data_1[int(len(data_1)*0.7):]
train_ds=dataset(data_0_t,data_1_t)
val_ds=dataset(data_0_v,data_1_v)
dataloader_t = torch.utils.data.DataLoader(train_ds,batch_size=32,shuffle=True,drop_last=False)
dataloader_v = torch.utils.data.DataLoader(val_ds,batch_size=32,shuffle=True,drop_last=False)

# Model

In [14]:
device= torch.device("cuda:0")

In [15]:
facenet=FaceNet(0.2,0.7,True).to(device)

In [16]:
optimizer = optim.Adam(facenet.parameters(), lr=3e-4, betas=(0.9, 0.999))
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.5,verbose=True)
alpha1=0.2
alpha2=0.2
num_epochs=20
threshold=0.5

In [None]:
training(num_epochs,facenet,optimizer,scheduler,dataloader_t,dataloader_v,device,alpha1,alpha2,threshold,"/content/results/")

# Data visu & function

In [None]:
class dataset(Dataset):
    def __init__(self, data_0, data_1):
        self.samples = []
        self.transform=transforms.Compose([  transforms.RandomHorizontalFlip(p=0.5),
                                transforms.RandomAffine(20,(0.12,0.12),(0.8,1.2),interpolation=transforms.InterpolationMode.NEAREST,fill=0),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                                ])
        for img in data_0:
            img=transforms.ToTensor()(img)
            self.samples.append((img,0))
        for img in data_1:
            img=transforms.ToTensor()(img)
            self.samples.append((img,1))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, id):
        (img,label)=self.samples[id]
        img=self.transform(img)
        return (img,label)

In [17]:
def lossfunction(x,y):
    num_0=torch.where(y==0)[0].shape[0]
    num_1=x.shape[0]-num_0
    w0=0.
    w1=0.
    if num_0!=0:
        w0=torch.divide(x.shape[0],(2*num_0),).float()
    if num_1!=0:
        w1=torch.divide(x.shape[0],(2*num_1),).float()
    w=torch.ones_like(x).float()
    w[torch.where(y==0)[0]]=w0
    w[torch.where(y==1)[0]]=w1
    return torch.nn.BCELoss(w)(x.float(),y.float())

In [27]:
def training(num_epochs,facenet,optimizer,scheduler,dataloader_t,dataloader_v,device,alpha1,alpha2,threshold,path):

    Loss_train_train=[]
    Loss_train_eval=[]
    Loss_val_eval=[]
    for epoch in tqdm.tqdm(range(num_epochs)):
        L_t_t=[]
        L_t_e=[]
        L_v_e=[]
        tp_t,fn_t,tn_t,fp_t=0,0,0,0
        tp_v,fn_v,tn_v,fp_v=0,0,0,0

        facenet.train()
        for i, dataj in (enumerate(dataloader_t, 0)):
            facenet.zero_grad()
            x=dataj[0].float().to(device)
            gt=dataj[1].float().to(device)
            y,aux1,aux2=facenet(x)
            loss=lossfunction(y.view(-1),gt)
            loss_aux1=lossfunction(aux1.view(-1),gt)
            loss_aux2=lossfunction(aux2.view(-1),gt)
            total_loss=(loss+alpha1*loss_aux1+alpha2*loss_aux2)/(1+alpha1+alpha2)
            total_loss.backward()
            optimizer.step()
            L_t_t.append([loss.item(),loss_aux1.item(),loss_aux2.item(),total_loss.item()])
            
        facenet.eval()
        for i, dataj in enumerate(dataloader_t, 0):
            x=dataj[0].float().to(device)
            yh=dataj[1].float().to(device)
            y=facenet(x).view(-1)
            loss=lossfunction(y,gt)
            L_t_e.append(loss.item())
            pred=y>threshold
            gt=yh
            tp_t+=torch.sum(((y>0.5)==gt)[torch.where((gt)==1)]).item()#TP
            fn_t+=torch.sum(((y>0.5)!=gt)[torch.where((gt)==1)]).item()#FN
            tn_t+=torch.sum(((y>0.5)==gt)[torch.where((gt)==0)]).item()#TN
            fp_t+=torch.sum(((y>0.5)!=gt)[torch.where((gt)==0)]).item()#FP
        for i, dataj in enumerate(dataloader_v, 0):
            x=dataj[0].float().to(device)
            gt=dataj[1].float().to(device)
            y=facenet(x).view(-1)
            loss=lossfunction(y,gt)
            L_v_e.append(loss.item())
            pred=y>threshold
            tp_v+=torch.sum(((y>0.5)==gt)[torch.where((gt)==1)]).item()#TP
            fn_v+=torch.sum(((y>0.5)!=gt)[torch.where((gt)==1)]).item()#FN
            tn_v+=torch.sum(((y>0.5)==gt)[torch.where((gt)==0)]).item()#TN
            fp_v+=torch.sum(((y>0.5)!=gt)[torch.where((gt)==0)]).item()#FP
        far_v=fp_v/(tn_v+fp_v)
        frr_v=fn_v/(tp_v+fn_v)
        far_t=fp_t/(tn_t+fp_t)
        frr_t=fn_t/(tp_t+fn_t)
        hter_t=0.5*(far_t+frr_t)
        hter_v=0.5*(far_v+frr_v)
        scheduler.step()
        err_t_t=np.mean(L_t_t,0)
        err_t_e=np.mean(L_t_e,0)
        err_v_e=np.mean(L_v_e,0)
        Loss_train_train.append(err_t_t)
        Loss_train_eval.append(err_t_e)
        Loss_val_eval.append(err_v_e)
        print("Training : Loss {} ||| Hter {} \t Validation : Loss {} ||| Hter {}".format(err_t_e,hter_t,err_v_e,hter_v))
        torch.save({
            'epoch': epoch,
            'model_state_dict': facenet.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss_train_train': Loss_train_train,
            'loss_train_eval': Loss_train_eval,
            'loss_val_eval':Loss_val_eval,
            'metrics_train':{'tp':tp_t,'fn':fn_t,'fp':fp_t,'tn':tn_t,'far':far_t,'frr':frr_t,'hter':hter_t},
            'metrics_val':{'tp':tp_v,'fn':fn_v,'fp':fp_v,'tn':tn_v,'far':far_v,'frr':frr_v,'hter':hter_v}
            }, path+"checkpoint_{}.pth".format(epoch))

In [None]:

threshold=0.5
facenet.eval()
tp,fn,tn,fp=0,0,0,0
for dataj in enumerate(dataloader_t):
    x=dataj[0].float().to(device)
    yh=dataj[1].float().to(device)
    y=facenet(x).view(-1)
    pred=y>threshold
    gt=yh
    tp+=torch.sum(((y>0.5)==gt)[torch.where((gt)==1)]).item()#TP
    fn+=torch.sum(((y>0.5)!=gt)[torch.where((gt)==1)]).item()#FN
    tn+=torch.sum(((y>0.5)==gt)[torch.where((gt)==0)]).item()#TN
    fp+=torch.sum(((y>0.5)!=gt)[torch.where((gt)==0)]).item()#FP
far_t=fp/(tn+fp)
frr_t=fn/(tp+fn)
tp,fn,tn,fp=0,0,0,0
for dataj in enumerate(dataloader_v):
    x=dataj[0].float().to(device)
    yh=dataj[1].float().to(device)
    y=facenet(x).view(-1)
    pred=y>threshold
    gt=yh
    tp+=torch.sum(((y>0.5)==gt)[torch.where((gt)==1)]).item()#TP
    fn+=torch.sum(((y>0.5)!=gt)[torch.where((gt)==1)]).item()#FN
    tn+=torch.sum(((y>0.5)==gt)[torch.where((gt)==0)]).item()#TN
    fp+=torch.sum(((y>0.5)!=gt)[torch.where((gt)==0)]).item()#FP
far_v=fp/(tn+fp)
frr_v=fn/(tp+fn)

hter_t=0.5*(far_t+frr_t)
hter_v=0.5*(far_v+frr_v)
print(hter_t,hter_v)

In [None]:
facenet.eval()
tp=[]
fp=[]
tn=[]
fn=[]
threshold=0.5
for i, dataj in enumerate(dataloader_v, 0):
    x=dataj[0].float().to(device)
    yh=dataj[1].float().to(device)
    y=facenet(x).view(-1)
    pred=y>threshold
    gt=yh
    x_fn=x[torch.where(( (y>0.5)!=gt) & (gt==1))]
    x_fp=x[torch.where(( (y>0.5)!=gt) & (gt==0))]
    x_tp=x[torch.where(( (y>0.5)==gt) & (gt==1))]
    x_tn=x[torch.where(( (y>0.5)==gt) & (gt==0))]
    if x_fn.shape[0]!=0:
      fn.append(x_fn.cpu().numpy())
    if x_fp.shape[0]!=0:
      fp.append(x_fp.cpu().numpy())
    if x_tp.shape[0]!=0:
      tp.append(x_tp.cpu().numpy())
    if x_tn.shape[0]!=0:
      tn.append(x_tn.cpu().numpy())

In [None]:
nb_tp=0
nb_tn=0
nb_fp=0
nb_fn=0
for elem in tp:
  nb_tp+=elem.shape[0]
for elem in tn:
  nb_tn+=elem.shape[0]
for elem in fp:
  nb_fp+=elem.shape[0]
for elem in fn:
  nb_fn+=elem.shape[0]

In [None]:
far=nb_fp/(nb_fp+nb_tn)
frr=nb_fn/(nb_tp+nb_fn)
hter=0.5*(far+frr)
print(far, frr, hter)

In [None]:
transform_to_img=transforms.Compose([   transforms.Normalize([0,0,0],[2,2,2]),
                                        transforms.Normalize([-0.5,-0.5,-0.5],[1,1,1]), 
                                        transforms.ConvertImageDtype(torch.uint8)
                                    ])

In [None]:
for elem in tn[20:40]:
    img=elem[0]
    img=transform_to_img(torch.tensor(img))
    img=torch.transpose(img,0,1)
    img=torch.transpose(img,1,2)
    img=img.numpy()
    plt.imshow(img)
    plt.show()