In [1]:
# Mount Google Drive at /content/drive (Colab-only API); the dataset and weight
# paths used further down all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import torch
import torch.nn as nn
import os
import numpy as np
import pandas as pd

In [3]:
from torch.utils.data import Dataset
from torchvision import transforms
from sklearn.preprocessing import LabelEncoder
import torch
from PIL import Image

class CelebADataSet(Dataset):
    """Annotation table of the CelebA subset plus a random train/test/val split.

    The object holds the annotation table, the shuffled index split and a
    fitted ``LabelEncoder``.  The datasets that are actually handed to a
    ``DataLoader`` are created by ``get_train`` / ``get_test`` / ``get_val``.
    """

    def __init__(self, df=None, ptrain=80, pval=5,
                 path_to_im='/content/drive/MyDrive/DLS Face recognition/celebA_train_500/celebA_imgs',
                 path_to_annot='/content/drive/MyDrive/DLS Face recognition/celebA_train_500/celebA_anno.txt',
                 seed=None):
        """
        Args:
            df: optional two-column DataFrame (file name, class).  When it is
                ``None`` the table is read from ``path_to_annot``.
            ptrain: percentage of the samples assigned to the training part.
            pval: percentage of the samples assigned to the validation part;
                the test part receives what is left between the two.
            path_to_im: directory that contains the image files.
            path_to_annot: space-separated annotation file without a header.
            seed: optional seed for the shuffle.  ``None`` keeps the previous
                behaviour of drawing from NumPy's global random state.

        Raises:
            ValueError: if the percentages are negative or sum to more than 100.
        """
        super().__init__()

        if ptrain < 0 or pval < 0 or ptrain + pval > 100:
            raise ValueError(
                f"ptrain and pval must be non-negative and sum to at most 100, "
                f"got ptrain={ptrain}, pval={pval}")

        self._path_to_file_name = path_to_im
        self._path_to_annot = path_to_annot

        # Load the image annotations into a DataFrame.
        if df is None:
            self._table = pd.read_csv(self._path_to_annot, header=None, sep=' ')
        else:
            # Copy so that renaming the columns does not modify the caller's frame.
            self._table = df.copy()
        self._table.columns = ['File_name', 'Class']

        self._len = self._table.shape[0]

        # Shuffle all row positions once and cut the permutation in three
        # parts (similar to sklearn's train_test_split).
        rng = np.random if seed is None else np.random.RandomState(seed)
        idx = rng.choice(self._len, self._len, False)
        # Multiply before dividing: `len // 100 * p` would throw away up to 99
        # samples and yields empty train/test parts for tables under 100 rows.
        p1 = int(self._len * ptrain // 100)
        p2 = int(self._len * (100 - pval) // 100)
        self.train_idx, self.test_idx, self.val_idx = np.split(idx, [p1, p2])

        self.label_encoder = LabelEncoder()
        self.label_encoder.fit(self._table['Class'].values)

    class _Data(Dataset):
        """
        Dataset over one part of the split; this is what is passed to a DataLoader.
        """
        def __init__(self, upper, idx, train=True):
            super().__init__()
            self._upper = upper
            self.train = train

            self._table = self._upper._table.iloc[idx]
            # Encode every label once instead of calling the encoder per sample.
            self._labels = self._upper.label_encoder.transform(self._table['Class'].values)
            # Build both pipelines once; __getitem__ selects one by the current
            # value of self.train, so flipping the flag later still works.
            self._pipelines = self._build_pipelines()

        @staticmethod
        def _build_pipelines():
            """Return the augmentation (True) and plain evaluation (False) pipelines."""
            mean, std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]
            return {True: transforms.Compose([transforms.Resize(224, antialias=True),
                                              transforms.RandomRotation((0, 5)),
                                              transforms.ColorJitter(hue=0.1, saturation=0.1),
                                              transforms.RandomAdjustSharpness(1, p=0.5),
                                              transforms.RandomAutocontrast(p=0.5),
                                              transforms.RandomEqualize(p=0.5),
                                              transforms.RandomHorizontalFlip(p=0.5),
                                              transforms.ToTensor(),
                                              transforms.Normalize(mean, std)]),
                    False: transforms.Compose([transforms.Resize(224, antialias=True),
                                               transforms.ToTensor(),
                                               transforms.Normalize(mean, std)])}

        def __len__(self):
            return len(self._table)

        def load_sample(self, file):
            """Open ``file`` and return it as an RGB PIL image (the file is closed again)."""
            with Image.open(file) as image:
                # Normalize below expects exactly three channels.
                return image.convert('RGB')

        def __getitem__(self, index):
            path_to_img = os.path.join(self._upper._path_to_file_name,
                                       self._table['File_name'].iloc[index])
            image = self.load_sample(path_to_img)

            image = self._pipelines[self.train](image)
            label = self._labels[index]

            return image, label

    def get_train(self):
        """Training part, with augmentation enabled."""
        return self._Data(self, self.train_idx)

    def get_test(self):
        """Test part, without augmentation."""
        return self._Data(self, self.test_idx, False)

    def get_val(self):
        """Validation part, without augmentation."""
        return self._Data(self, self.val_idx, False)

    def get_train_and_test(self):
        """Convenience pair ``(train, test)``."""
        return self.get_train(), self.get_test()


In [4]:
def fit_epoch(model, train_loader, loss_fn, optimaizer, DEVICE):
    """Run one training pass over ``train_loader`` and report running statistics.

    Args:
        model: module called as ``model(images, labels)``; its output is fed
            to ``loss_fn`` and to ``argmax`` over dimension 1.
        train_loader: sized iterable of ``(images, labels)`` batches.
        loss_fn: callable ``loss_fn(outputs, labels)`` returning a scalar
            tensor (assumed to be the batch mean, since it is re-weighted by
            the batch size below).
        optimaizer: optimizer that owns the model parameters.
        DEVICE: device the batches are moved to.

    Returns:
        ``(loss, accuracy)`` averaged over all processed samples.

    Raises:
        ValueError: if the loader yields no samples.
    """
    loss_per_epoch = 0.0
    accuracy_per_epoch = 0
    processed = 0

    num_batch = len(train_loader)

    log_template = "batch: {n:d}/{all:d} train loss: {t_loss:0.4f} train acc {t_acc:0.4f}"

    model.train()
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(DEVICE)
        labels = labels.to(DEVICE)

        # forward and backward
        optimaizer.zero_grad()

        outputs = model(images, labels)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimaizer.step()

        # statistics (detached: they must not extend the autograd graph)
        batch_size = images.size(0)
        preds = torch.argmax(outputs.detach(), 1)
        loss_per_epoch += loss.item() * batch_size
        accuracy_per_epoch += torch.sum(preds == labels).item()
        processed += batch_size

        # No torch.cuda.empty_cache() here: calling it after every batch only
        # makes the caching allocator release and re-request the same memory.
        print(log_template.format(n=i+1, all=num_batch, t_loss=loss_per_epoch/processed, t_acc=accuracy_per_epoch/processed))

    if processed == 0:
        raise ValueError("train_loader did not yield any samples")

    torch.cuda.empty_cache()
    return loss_per_epoch / processed, accuracy_per_epoch / processed

In [7]:
def test_epoch(model, test_loader, loss_fn, DEVICE):
    loss_per_epoch = 0
    accuracy_per_epoch = 0
    processed = 0

    model.eval()
    for images, labels in test_loader:
        images = images.to(DEVICE)
        labels = labels.to(DEVICE)
        logits = []

        with torch.no_grad():
            outputs = model(images, labels)
            loss = loss_fn(outputs, labels)
            logits.append(outputs)

            #statistic
            preds = torch.argmax(outputs, 1)
            loss_per_epoch += loss.item() * images.size(0)
            accuracy_per_epoch += torch.sum(preds == labels.data).item()

            processed += images.size(0)
        torch.cuda.empty_cache()

    loss_per_epoch = loss_per_epoch / processed
    accuracy_per_epoch = accuracy_per_epoch / processed
    torch.cuda.empty_cache()
    return loss_per_epoch, accuracy_per_epoch

In [8]:
def train_model(model, train_loader, test_loader, num_epoch, loss_fn, optimaizer, scheduler, DEVICE):
    """Alternate a training pass and an evaluation pass for ``num_epoch`` epochs.

    After each epoch the scheduler is stepped and a summary line is printed.

    Returns:
        dict with the keys 'train loss', 'test loss', 'train accuracy' and
        'test accuracy'; each value is a list with one entry per epoch.
    """
    metric_names = ('train loss', 'test loss', 'train accuracy', 'test accuracy')
    history = {name: [] for name in metric_names}

    summary_line = "\nEpoch {ep:d} train loss: {t_loss:0.4f} test loss {v_loss:0.4f} train acc {t_acc:0.4f} test acc {v_acc:0.4f}"

    torch.cuda.empty_cache()

    epoch = 0
    while epoch < num_epoch:
        print(f'Epoch {epoch+1}/{num_epoch}')

        torch.cuda.empty_cache()

        fit_loss, fit_acc = fit_epoch(model, train_loader, loss_fn, optimaizer, DEVICE)
        eval_loss, eval_acc = test_epoch(model, test_loader, loss_fn, DEVICE)

        # Same order as metric_names.
        for name, value in zip(metric_names, (fit_loss, eval_loss, fit_acc, eval_acc)):
            history[name].append(value)

        scheduler.step()
        torch.cuda.empty_cache()

        print(summary_line.format(ep=epoch+1,
                                  t_loss=fit_loss, v_loss=eval_loss,
                                  t_acc=fit_acc, v_acc=eval_acc))
        epoch += 1

    torch.cuda.empty_cache()
    return history

In [9]:
import math
from torch.nn.functional import linear, normalize
class ArcFace(nn.Module):
    """Classification head with an additive angular margin on the target class.

    Embeddings and class weights are L2-normalised, so their product is
    cos(theta).  For the target class the logit becomes ``s * cos(theta + m)``;
    every other class keeps ``s * cos(theta)``.

    Args:
        in_feature: embedding size.
        out_feature: number of classes.
        s: scale applied to the cosine logits.
        m: angular margin (passed to ``math.cos`` / ``math.sin``, i.e. radians).
    """

    def __init__(self, in_feature, out_feature, s, m):
        super().__init__()
        self.s = s
        self.margin = m

        self.weights = nn.Parameter(torch.empty(out_feature, in_feature))
        nn.init.xavier_normal_(self.weights)

        # for cos(th + m) = cos(th)cos(m) - sin(th)sin(m)
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)

        # see (*): cos(th) > cos(pi - m) is the same as th + m < pi.  Outside
        # that range the margin formula is replaced by cos(th) - m * sin(m).
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, embending, labels=None):
        """Return scaled logits of shape ``(batch, out_feature)``.

        Args:
            embending: tensor of shape ``(batch, in_feature)``.
            labels: integer class indices of shape ``(batch,)``.  When
                ``None`` no margin is applied and plain ``s * cos(theta)``
                is returned.
        """
        cos_th = linear(normalize(embending), normalize(self.weights))
        if labels is None:
            return cos_th * self.s

        # Rounding can push 1 - cos^2 marginally below zero, which would give
        # NaN; the tiny positive floor also keeps the sqrt gradient finite.
        sin_th = torch.sqrt(torch.clamp(1.0 - cos_th * cos_th, min=1e-12))

        # cos(th + m) = cos(th)cos(m) - sin(th)sin(m)
        cos_th_m = cos_th * self.cos_m - sin_th * self.sin_m

        # see (*)
        cos_th_m = torch.where(cos_th > self.th, cos_th_m, cos_th - self.mm)

        # Allocate the mask on the input's device/dtype instead of a fixed 'cuda'.
        onehot = torch.zeros_like(cos_th)
        onehot.scatter_(1, labels.view(-1, 1).long(), 1)
        # target class gets cos(th + m), every other class keeps cos(th)
        outputs = onehot * cos_th_m + (1 - onehot) * cos_th
        return outputs * self.s

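(*) See https://habr.com/ru/companies/ntechlab/articles/531842/ [«Сравнение функций потерь» — "Comparison of loss functions"] for the `th` threshold and the `mm` correction marked with `# see (*)` in `ArcFace`.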

In [10]:
from torchvision.models import mobilenet_v3_large

def block(in_channel, out_channel):
    """Projection head: in-place ReLU, then BatchNorm1d, then a Linear layer.

    Args:
        in_channel: number of input features.
        out_channel: number of output features of the final Linear layer.
    """
    layers = [
        nn.ReLU(inplace=True),
        nn.BatchNorm1d(num_features=in_channel),
        nn.Linear(in_features=in_channel, out_features=out_channel),
    ]
    return nn.Sequential(*layers)

class ArcMobileNet(nn.Module):
    """MobileNetV3-Large backbone, a projection to the embedding size, and an ArcFace head.

    Args:
        embending_size: size of the embedding produced by the projection block.
        num_class: number of classes of the ArcFace head.
        s: ArcFace scale.
        m: ArcFace margin.
        weights: forwarded to ``mobilenet_v3_large`` (``None`` by default).
    """

    def __init__(self, embending_size, num_class, s, m, weights=None):
        super().__init__()

        backbone = mobilenet_v3_large(weights=weights)
        # Keep only the first module of the stock classifier; its
        # `out_features` defines the input width of the projection block.
        backbone.classifier = backbone.classifier[0]
        self.backbone = backbone

        backbone_width = self.backbone.classifier.out_features
        self.linear_block = block(backbone_width, embending_size)
        self.arcface = ArcFace(embending_size, num_class, s, m)

    def forward(self, image, labels=None):
        """Return ArcFace outputs when ``labels`` is given, otherwise the embedding."""
        # backbone features -> embedding of size `embending_size`
        features = self.linear_block(self.backbone(image))

        if labels is None:
            return features
        return self.arcface(features, labels)


In [21]:
from torch.utils.data import DataLoader
import multiprocessing

# One DataLoader worker per CPU core reported by the runtime.
NUM_WORKER = multiprocessing.cpu_count()
BATCH_SIZE = 64

# ptrain=85 with the default pval=5: roughly 85% of the samples go to train,
# about 10% to test, and the remaining samples form the validation part.
celebA = CelebADataSet(ptrain=85)
train, test = celebA.get_train_and_test()

# Only the training loader shuffles; the test loader keeps a fixed order.
train_loader = DataLoader(train, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKER)
test_loader = DataLoader(test, batch_size=BATCH_SIZE, num_workers=NUM_WORKER)

In [22]:
from torchvision.models import MobileNet_V3_Large_Weights

# Number of classes = number of distinct labels the encoder was fitted on.
NUM_CLASS = len(celebA.label_encoder.classes_)
# ArcFace logit scale; the trailing comment keeps an alternative formula that is not used.
s = 8 # math.sqrt(2) * math.log(NUM_CLASS - 1)
# ArcFace angular margin (fed to math.cos / math.sin inside ArcFace).
margin = 0.2

# MobileNetV3-Large initialised from the IMAGENET1K_V2 weights, 256-d embedding, ArcFace head.
arc_model = ArcMobileNet(embending_size=256, num_class=NUM_CLASS, s=s, m=margin, weights=MobileNet_V3_Large_Weights.IMAGENET1K_V2)

In [23]:
# Choose the device and move the model first, so that the optimizer below is
# constructed over the parameters in their final placement.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
arc_model = arc_model.to(DEVICE)

optimizer = torch.optim.AdamW(arc_model.parameters())
# Halve the learning rate every 3 scheduler steps (one step per epoch in train_model).
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.5)
loss_fn = torch.nn.CrossEntropyLoss()

In [25]:
torch.cuda.empty_cache()

# Train for NUM_EPOCH epochs; `history` receives the per-epoch train/test loss
# and accuracy lists returned by train_model.
NUM_EPOCH = 25
history = train_model(arc_model, train_loader, test_loader, NUM_EPOCH, loss_fn, optimizer, scheduler, DEVICE)

Epoch 1/25
batch: 1/160 train loss: 7.8696 train acc 0.0000
batch: 2/160 train loss: 7.8124 train acc 0.0000
batch: 3/160 train loss: 7.8044 train acc 0.0000
batch: 4/160 train loss: 7.8154 train acc 0.0000
batch: 5/160 train loss: 7.7847 train acc 0.0000
batch: 6/160 train loss: 7.7818 train acc 0.0000
batch: 7/160 train loss: 7.7662 train acc 0.0000
batch: 8/160 train loss: 7.7477 train acc 0.0000
batch: 9/160 train loss: 7.7410 train acc 0.0000
batch: 10/160 train loss: 7.7378 train acc 0.0000
batch: 11/160 train loss: 7.7362 train acc 0.0000
batch: 12/160 train loss: 7.7157 train acc 0.0000
batch: 13/160 train loss: 7.7110 train acc 0.0000
batch: 14/160 train loss: 7.7005 train acc 0.0000
batch: 15/160 train loss: 7.6933 train acc 0.0000
batch: 16/160 train loss: 7.6794 train acc 0.0000
batch: 17/160 train loss: 7.6812 train acc 0.0000
batch: 18/160 train loss: 7.6790 train acc 0.0000
batch: 19/160 train loss: 7.6718 train acc 0.0000
batch: 20/160 train loss: 7.6739 train acc 0.000

In [26]:
# Save the model weights (state_dict only, not the whole module object).
path_to_save_model_state = '/content/drive/MyDrive/DLS Face recognition/models weights/arcmobilenet_DLS_FR_weightes_v2.pth'
torch.save(arc_model.state_dict(), path_to_save_model_state)

In [None]:
# Prepares a single image before it is fed to the network.
def make_transforms(image):
    """Resize, convert to a tensor, normalise and add a leading batch axis.

    Args:
        image: the input image; a PIL image in any mode is converted to RGB
            first, anything else is passed to the transforms unchanged.

    Returns:
        Tensor of shape ``(1, C, H, W)``.
    """
    # Normalize below is configured for three channels, so grayscale / RGBA /
    # palette images have to be converted first.
    if isinstance(image, Image.Image) and image.mode != 'RGB':
        image = image.convert('RGB')

    transformations = transforms.Compose([transforms.Resize(224, antialias=True),
                                          transforms.ToTensor(),
                                          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

    image = transformations(image)
    # add the batch dimension
    return image.unsqueeze(0)

In [None]:
from sklearn.metrics.pairwise import cosine_distances

class IdentificationRateMetric:
    """Identification rate of an embedding model at fixed false-positive rates.

    Positive pairs are query images with the same label.  Negative pairs are
    query images with different labels plus every (distractor, query) pair.
    For a requested rate ``f`` the threshold is the ``int(f * n_negative)``-th
    smallest negative cosine distance, and the metric is the share of positive
    pairs whose distance is below that threshold.

    Args:
        query: DataFrame with ``File_name`` and ``Class`` columns.
        distraction: DataFrame with ``File_name`` and ``Class`` columns.
        transform: callable turning a PIL image into a model input batch.
        device: device used for inference.
        fpr: default false-positive rate(s), a number or an iterable of numbers.
    """

    def __init__(self, query, distraction, transform, device, fpr=(0.5, 0.2, 0.1, 0.05)):
        self.query = query
        self.distraction = distraction
        self.transform = transform
        self.fpr = np.atleast_1d(fpr).tolist()
        self.saved = False
        self.query_emb = None
        self.distraction_emb = None

        self.device = device
        self.path_query = '/content/drive/MyDrive/DLS Face recognition/celebA_ir/celebA_query'
        self.path_distraction = '/content/drive/MyDrive/DLS Face recognition/celebA_ir/celebA_distractors'

    def get_embendings(self, model, df, root):
        """Embed every image listed in ``df`` (files are looked up under ``root``).

        Returns:
            list of ``(embedding, label)`` tuples, the embedding being a flat
            1-D NumPy array.
        """
        model.eval()
        model.to(self.device)
        emb = []
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            for file_name, label in zip(df['File_name'], df['Class']):
                path = os.path.join(root, file_name)
                with Image.open(path) as img:
                    batch = self.transform(img.convert('RGB')).to(self.device)
                embending = model(batch).cpu().numpy().reshape(-1)
                emb.append((embending, label))
        return emb

    @staticmethod
    def _stack(pairs):
        """Stack the embeddings of ``(embedding, label)`` pairs into an (n, D) float64 matrix."""
        # ravel() accepts flat vectors as well as (D, 1) / (1, D) arrays.
        return np.stack([np.asarray(vec, dtype=np.float64).ravel() for vec, _ in pairs])

    @staticmethod
    def _cosine_distances(a, b):
        """Pairwise cosine distances between the rows of ``a`` and the rows of ``b``."""
        def unit_rows(x):
            norms = np.linalg.norm(x, axis=1, keepdims=True)
            norms[norms == 0] = 1.0  # leave all-zero rows as they are
            return x / norms
        return np.clip(1.0 - unit_rows(a) @ unit_rows(b).T, 0.0, 2.0)

    def get_metric(self, model, fpr=None, save_embendings=False, load_embendings=False):
        """Compute the identification rate for each requested false-positive rate.

        Args:
            model: embedding model; unused when cached embeddings are loaded.
            fpr: a number or an iterable of numbers; ``None`` uses ``self.fpr``.
            save_embendings: cache the embeddings on the instance.
            load_embendings: reuse cached embeddings if some were saved before.

        Returns:
            list of floats, one rate per requested false-positive rate.

        Raises:
            ValueError: if there is no query image, no positive pair or no
                negative pair.
        """
        if load_embendings and self.saved:
            query = self.query_emb
            distraction = self.distraction_emb
        else:
            query = self.get_embendings(model, self.query, self.path_query)
            distraction = self.get_embendings(model, self.distraction, self.path_distraction)

        if save_embendings:
            self.query_emb = query
            self.distraction_emb = distraction
            self.saved = True

        if len(query) == 0:
            raise ValueError("the query set is empty")

        query_vecs = self._stack(query)
        query_labels = np.asarray([label for _, label in query])

        # Cosine distance for every unordered query pair (i < j).  Whole
        # embedding vectors are compared; a (D, 1) column layout would make the
        # distance depend on the first component only.
        rows, cols = np.triu_indices(len(query), k=1)
        pair_dist = self._cosine_distances(query_vecs, query_vecs)[rows, cols]
        same_label = np.asarray(query_labels[rows] == query_labels[cols], dtype=bool)
        sim_face = pair_dist[same_label]
        diff_face = pair_dist[~same_label]

        # Cosine distance for every (distractor, query) pair.
        if len(distraction):
            dist = self._cosine_distances(self._stack(distraction), query_vecs).ravel()
        else:
            dist = np.empty(0)

        # Sort the negatives once; every threshold is read from this array.
        negatives = np.sort(np.concatenate([dist, diff_face]))
        if negatives.size == 0 or sim_face.size == 0:
            raise ValueError("at least one positive and one negative pair are required")

        FPR = self.fpr if fpr is None else np.atleast_1d(fpr).tolist()

        rate = []
        for fpr_ in FPR:
            # Clamp so that fpr_ == 1.0 selects the largest negative distance
            # instead of indexing past the end.
            n = min(max(int(negatives.size * fpr_), 0), negatives.size - 1)
            threshold = negatives[n]
            rate.append(float(np.count_nonzero(sim_face < threshold)) / sim_face.size)
        return rate

In [None]:
# Query annotations; the two columns are renamed to the File_name / Class
# names that IdentificationRateMetric reads.
# NOTE(review): read_csv is called with its default header handling, so the
# first row of the file is consumed as a header — confirm the CSV has one.
query_df = pd.read_csv('/content/drive/MyDrive/DLS Face recognition/celebA_ir/celebA_anno_query.csv')
query_df.columns = ['File_name', 'Class']

# Every file in the distractor folder is listed with the placeholder class -1.
path = os.listdir('/content/drive/MyDrive/DLS Face recognition/celebA_ir/celebA_distractors')
t = {'File_name': path, 'Class': -1}

distractors_df = pd.DataFrame(t)

In [None]:
# Metric over the query / distractor tables; make_transforms preprocesses each image.
id_rate = IdentificationRateMetric(query_df, distractors_df, make_transforms, DEVICE)

# This call computes the embeddings and, because save_embendings=True, caches
# them on id_rate for later calls.
rate = id_rate.get_metric(arc_model, save_embendings=True)

In [None]:
# get_metric iterates over `fpr`, so the rates are passed as one list: a bare
# float is not iterable, and a single call also avoids recomputing the pair
# distances once per rate.
rate = id_rate.get_metric(arc_model, load_embendings=id_rate.saved, fpr=[0.5, 0.2, 0.1, 0.05])
rate