# Face recognition learning pipeline for VGGface dataset

## Подключение библиотек

In [12]:
import torch.nn as nn
from torch.nn import functional as F
import pandas as pd
import matplotlib.pyplot as plt
import os
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torch
import numpy as np
import glob
import cv2
import segmentation_models_pytorch as smp
from sklearn.model_selection import train_test_split
import time
from torch.autograd import Variable
import albumentations as A
from albumentations.pytorch import ToTensorV2
import torch.onnx
from torchinfo import summary
from typing import Tuple
import torchvision.models as models
import os
from facenet_pytorch import MTCNN
import glob
import urllib
from multiprocessing import Pool
from torch.optim.lr_scheduler import ExponentialLR
from typing import List

## Используемые функции

In [3]:
def get_mean_std(research_data_loader: DataLoader) -> Tuple[tuple]:
    '''Функция получает на вход объект класса DataLoader для 
    вычисления mean и std для всего датасета поканально
    Входные параметры:
    research_data_loader: DataLoader - объект для загрузки изображений лиц
    Возвращаемые значения:
    (mean, std): Tuple[tuple] - кортеж, содержащий кортежи со значениями mean и std поканально
    ((mean_r, mean_g, mean_b), (std_r, std_g, std_b))'''
    
    nimages = 0
    mean = 0.
    std = 0.
    for batch in research_data_loader:
        # Приводим тензор из [B, C, W, H] к [B, C, W * H]
        batch = batch.view(batch.size(0), batch.size(1), -1)
        # Суммируем общее количество иобработанных изображений
        nimages += batch.size(0)
        # Вычисляем mean и std
        mean += batch.mean(2).sum(0)
        std += batch.std(2).sum(0)

    mean /= nimages
    std /= nimages
    return tuple(mean.cpu().numpy()), tuple(std.cpu().numpy())

In [4]:
def get_data_csv(faces_directory_path: str = None) -> pd.DataFrame:
    '''Функция получает на вход путь к директории с лицами
    и генерирует датафрейм, содержащий полные имена изображений 
    Входные параметры:
    faces_directory_path: str - путь к директории с лицами,
    Возвращаемые значения:
    pd.DataFrame: data - dataframe, содержащий полные имена изображений'''

    if not os.path.isdir(faces_directory_path):
        raise OSError('Directory is not exist')

    data = {}
    data['face_path'] = []
    data['face_path'] = list(glob.glob(faces_directory_path + "/*/*"))
    data = pd.DataFrame(data)
    data['person_name'] = data['face_path'].apply(lambda x: x.split('/')[-2])
    #data = data.sort_values(by='person_name')

    return data

In [5]:
def get_train_valid_test(source_df: pd.DataFrame, separate_feature: str = None, 
                         valid_size: float = 0.2, test_size: float = 0.2) -> pd.DataFrame:
    '''Функция разделяет source_df на три части с коэффициентами valid_size и test_size
    по уникальным значениям separate_feature так, чтобы в новых датафреймах
    не было строк с одинаковыми значениями из separate_feature
    Входные параметры:
    source_df: pd.DataFrame - датафрейм для разделения на train, valid и test
    separate_feature: str - поле, по которому датафрейм будет разделен
    valid_size: float - коэффициент разделения для valid
    test_size: float - коэффициент разделения для test
    Возвращаемые значения:
    pd.DataFrame: data_train - датафрейм для тренировки
    pd.DataFrame: data_valid - датафрейм для валидации
    pd.DataFrame: data_test - датафрейм для тестирования'''

    if (separate_feature != None) & (separate_feature in source_df.columns):
        train_faces, test_faces = train_test_split(source_df[separate_feature].unique(), 
                                                   test_size=(valid_size + test_size), random_state=42)
        valid_faces, test_faces = train_test_split(test_faces, 
                                                   test_size=(test_size/(valid_size + test_size)), random_state=42)
        
        data_train = source_df[np.isin(source_df[separate_feature].values, train_faces)]
        data_valid = source_df[np.isin(source_df[separate_feature].values, valid_faces)]
        data_test = source_df[np.isin(source_df[separate_feature].values, test_faces)]
        
        assert source_df.shape[0] == (data_train.shape[0] + data_valid.shape[0] + data_test.shape[0])
        assert np.isin(data_train[separate_feature].values, data_valid[separate_feature].values).sum() == 0
        assert np.isin(data_train[separate_feature].values, data_test[separate_feature].values).sum() == 0
        assert np.isin(data_valid[separate_feature].values, data_test[separate_feature].values).sum() == 0

    else:
        data_train, data_test = train_test_split(source_df, test_size=(valid_size + test_size))
        data_valid, data_test = train_test_split(data_test, test_size=(test_size/(valid_size + test_size)))


    return (data_train, data_valid, data_test)

## Используемые классы

In [6]:
class VGGface2Data(object):
    '''Класс для предобработки данных датасета VGGFace2'''
    def __init__(self, images_directory: str, faces_directory: str):
        '''Конструктор класса VGGface2Data
        Входные параметры:
        images_directory: str - директория, в которой будут храниться скачанные изображения людей
        faces_directory: str - директория, в которой будут храниться лица, выделенный из изображений
        Возвращаемые значения:
        объект класса VGGface1Data'''
        
        if ((not os.path.isdir(images_directory)) or (not os.path.isdir(faces_directory))):
            raise OSError('Directory is not exist')
    
        self.images_directory = images_directory
        self.faces_directory = faces_directory


    def get_faces_from_mtcnn(self, out_face_size: tuple = (160, 160), in_faces_size_treashold: tuple = (150, 150), 
                        face_prob_treashold: float = 0.99, face_ratio_treashold: float = 1.5):
        '''Метод получения из изображений с людьми в images_directory изображений лиц и сохранения их
        в faces_directory с помощью детектора лиц mtcnn
        Входные параметры:
        images_directory: str - директория с изображениями людей (включает в себя поддиректории - каждая для
        отдельного человека)
        faces_directory: str - директория с изображениями лиц людей (включает в себя поддиректории - каждая для
        отдельного человека)
        out_face_size: tuple - размер, к которому будут приведены изображения лиц перед сохранением в faces_directory
        in_faces_size_treashold: tuple - порог размера лица на исходном изображении, по которому будут фильтроваться
        изображения лиц перед сохранением в faces_directory
        face_prob_treashold: float - порог уверенности сети в обнаружении лица на исходном изображении, 
        по которому будут фильтроваться изображения лиц перед сохранением в faces_directory
        face_ratio_treashold: float - порог соотношения сторон изображения лица, по которому будут фильтроваться 
        изображения лиц перед сохранением в faces_directory'''

        persons_list = os.listdir(self.images_directory)
        mtcnn = MTCNN(keep_all=True, device='cuda:0')
        for person in persons_list:
            os.mkdir(self.faces_directory + person)
            images_list = list(glob.glob(self.images_directory + person + "/*"))
            for image in images_list:
                try:
                    image_numpy = cv2.imread(image)
                    rgb_image_numpy = cv2.cvtColor(image_numpy, cv2.COLOR_BGR2RGB)
                    boxes, probs = mtcnn.detect(rgb_image_numpy, landmarks=False)
                    # Если на изображении несколько лиц или сеть не уверена в обнаружении лица, 
                    # не обрабатываем это изображение
                    if (len(boxes) != 1) or (probs[0] < face_prob_treashold):
                        continue
                    box = boxes[0]
                    left, top, right, bottom = (int(box[0])), (int(box[1])), (int(box[2])), (int(box[3]))
                    face_numpy = image_numpy[top:bottom, left:right, :]

                    # Если размер лица на исходном изображении мал, то не обрабатываем это изображение
                    # (избавляемся от шакалистых лиц)
                    if ((face_numpy.shape[0] < in_faces_size_treashold[0]) or 
                                        (face_numpy.shape[1] < in_faces_size_treashold[1])):
                        continue

                    # Бывает, сеть принимает за лицо рандомные прямоугольники, избавляемся от них
                    if ((face_numpy.shape[0]/face_numpy.shape[1] > face_ratio_treashold) or 
                                    (face_numpy.shape[1]/face_numpy.shape[0] > face_ratio_treashold)):
                        continue

                    face_numpy = cv2.resize(face_numpy, out_face_size, interpolation = cv2.INTER_LINEAR).astype('float')

                    cv2.imwrite(self.faces_directory + person + '/' + image.split('/')[-1], face_numpy)

                except Exception as e:
                    print(e)

In [7]:
class VGGface1Data(VGGface2Data):
    '''Класс для загрузки и предобработки данных датасета VGGFace1 на основе класса VGGface2Data
    так как в новом классе нам нужен метод get_faces_from_mtcnn'''
    def __init__(self, urls_directory: str, images_directory: str, faces_directory: str):
        '''Конструктор класса VGGface1Data
        Входные параметры:
        urls_directory: str - директория, в которой находятся текстовые файлы (для каждого человека свой),
        в которых хранятся url изображений соответствующих людей
        images_directory: str - директория, в которой будут храниться скачанные изображения людей
        faces_directory: str - директория, в которой будут храниться лица, выделенный из изображений
        Возвращаемые значения:
        объект класса VGGface1Data'''
        
        VGGface2Data.__init__(self, images_directory, faces_directory)
        
        if not os.path.isdir(urls_directory):
            raise OSError('Directory is not exist')
                             
        self.urls_directory = urls_directory

        
    @staticmethod
    def __checkurl(params: list):
        '''Статический метод для запроса изображения по url и сохранения его в целевую директорию.
        Используется при вызове функции map модуля multiprocessing
        Входные параметры:
        params[0]: str - url, по которому находится изображение человека
        params[1]: str - директория, в которую необходимо сохранить скачанное изображение человека
        params[2]: str - имя человека, для которого скачивается изображение
        params[3]: str - индекс изображения для данного человека
        params[4]: str - координаты bbox'а лица на изображении данного человека'''
        
        image_url = params[0]
        images_directory = params[1]
        person_name = params[2]
        index = params[3]
        bbox = params[4]

        try:
            print(f'Trying download {image_url}')
            with urllib.request.urlopen(image_url, timeout=5) as f:
                image = f.read()
            with open(images_directory + person_name + '/' + str(index) + ' ' + str(bbox) + '.jpg', 'wb') as binary_file:
                binary_file.write(image)
        except Exception as e:
            print(e)
              
                
    def download(self, total_persons: int = 200, total_images_for_person: int = 200, processes: int = 20):
        '''Метод скачивания изображений людей в self.images_directory с помощью запросов к url,
        которые хранятся в self.urls_directory и сагрегированы по txt файлам (для каждого человека свой файл,
        хранящий 1000 url к соответствующим изображениям)
        Входные параметры:
        total_persons: int - общее количесво людей, для которых будут скачиваться изображения
        total_images_for_person: int - общее количество запросов, которые будут выполнены для каждого человека
        processes: int - количество процессов для загрузки изображений
        Возвращаемые значения:
        total_time: float - время, затраченное на загрузку и сохранение изображений'''

        total_persons = int(total_persons)
        if total_persons < 1:
            total_persons = 1
        if total_persons > 1000:
            total_persons = 1000

        total_images_for_person = int(total_images_for_person)
        if total_images_for_person < 1:
            total_images_for_person = 1
        if total_images_for_person > 1000:
            total_images_for_person = 1000

        persons_list = list(glob.glob(self.urls_directory + "*"))

        start_time = time.time()

        for i in range(total_persons):
            # Создаем папку с именем человека, в которой будем хранить его изображения
            person_name = persons_list[i].split('.')[0].split('/')[-1]
            os.mkdir(self.images_directory + person_name)
            # Считываем содержимое текстового файла и парсим его
            file_with_urls = open(persons_list[i], "r")
            
            info = file_with_urls.readlines()
            urls = list(map(lambda x: x.split(' ')[1], info))
            indexes = list(map(lambda x: x.split(' ')[0], info))
            bboxes = list(map(lambda x: x.split(' ')[2:], info))
            bboxes = list(map(lambda x: ' '.join(x), bboxes))

            urls = urls[: total_images_for_person]
            indexes = indexes[: total_images_for_person]
            bboxes = bboxes[: total_images_for_person]

            # Список из одинаковых элементов необходим для правильной работы функции p.map
            images_directory_arr = [self.images_directory] * total_images_for_person
            person_name_arr = [person_name] * total_images_for_person

            p = Pool(processes=processes)
            result = p.map(VGGface1Data.__checkurl, list(zip(urls, images_directory_arr, person_name_arr, 
                                                           indexes, bboxes)))

            file_with_urls.close

        stop_time = time.time()
        total_time = stop_time - start_time

        return total_time

    
    def get_faces_from_bbox(self, out_face_size: tuple = (160, 160), in_faces_size_treashold: tuple = (150, 150), 
                  face_ratio_treashold: float = 1.5):
        '''Метод получения из изображений с людьми в images_directory изображений лиц с использованием
        известных координат bbox и сохранения изображений лиц в faces_directory
        Входные параметры:
        out_face_size: tuple - размер, к которому будут приведены изображения лиц перед сохранением в faces_directory
        in_faces_size_treashold: tuple - порог размера лица на исходном изображении, по которому будут фильтроваться
        изображения лиц перед сохранением в faces_directory
        face_ratio_treashold: float - порог соотношения сторон изображения лица, по которому будут фильтроваться 
        изображения лиц перед сохранением в faces_directory'''

        persons_list = os.listdir(self.images_directory)
        for person in persons_list:
            os.mkdir(self.faces_directory + person)
            images_list = list(glob.glob(self.images_directory + person + "/*"))
            for image in images_list:
                try:
                    image_numpy = cv2.imread(image)
                    rgb_image_numpy = cv2.cvtColor(image_numpy, cv2.COLOR_BGR2RGB)
                    left = int(float(image.split(' ')[1]))
                    top = int(float(image.split(' ')[2]))
                    right = int(float(image.split(' ')[3]))
                    bottom = int(float(image.split(' ')[4]))

                    face_numpy = image_numpy[top:bottom, left:right, :]

                    # Если размер лица на исходном изображении мал, то не обрабатываем это изображение
                    # (избавляемся от шакалистых лиц)
                    if ((face_numpy.shape[0] < in_faces_size_treashold[0]) or 
                                        (face_numpy.shape[1] < in_faces_size_treashold[1])):
                        continue

                    # Бывает, сеть принимает за лицо рандомные прямоугольники, избавляемся от них
                    if ((face_numpy.shape[0]/face_numpy.shape[1] > face_ratio_treashold) or 
                                    (face_numpy.shape[1]/face_numpy.shape[0] > face_ratio_treashold)):
                        continue 

                    face_numpy = cv2.resize(face_numpy, out_face_size, interpolation = cv2.INTER_LINEAR).astype('float')

                    cv2.imwrite(self.faces_directory + person + '/' + image.split('/')[-1], face_numpy)

                except Exception as e:
                    print(e)

In [8]:
class ResearchDataset(Dataset):
    '''Класс для создания датасета для вычисления среднего и дисперсии по каналам изображений лиц'''
    def __init__(self, data_info: pd.DataFrame, device: str):
        '''Входные параметры:
        data_info: pd.DataFrame - датафрейм с адресами изображений и масок
        device: str - имя устройства, на котором будут обрабатываться данные
        Возвращаемые значения:
        объект класса CustomDatasetForTrain'''
        
        # Подаем подготовленный датафрейм
        self.data_info = data_info
        # Количество изображений
        self.data_len = len(self.data_info.index)
        self.device = device

    def __getitem__(self, index: int):
        '''Входные параметры:
        index: int - индекс для обращения к элементам датафрейма self.data_info
        Возвращаемые значения:
        image_tensor: torch.Tensor - тензорное представление изображения лица'''
        
        image_numpy = cv2.imread(self.data_info['face_path'][index])
        image_numpy = cv2.cvtColor(image_numpy, cv2.COLOR_BGR2RGB).astype('float')/255.0
        image_tensor = torch.from_numpy(image_numpy)
        image_tensor = image_tensor.to(self.device).float()
        image_tensor = image_tensor.permute(2, 0, 1)

        return image_tensor

    def __len__(self):
        return self.data_len

In [9]:
class CustomDataset(Dataset):
    '''Класс для создания тренировочных и валидационных датасетов'''
    def __init__(self, data_info: pd.DataFrame, device: str, transform: object):
        '''Входные параметры:
        data_info: pd.DataFrame - датафрейм с адресами изображений и масок
        device: str - имя устройства, на котором будут обрабатываться данные
        transform: object - список трансформации, которым будут подвергнуты изображения и маски
        Возвращаемые значения:
        объект класса CustomDataset'''
        
        # Подаем подготовленный датафрейм
        self.data_info = data_info
        self.face_path_arr = self.data_info.iloc[:,0]
        self.person_name_arr = self.data_info.iloc[:,1]
        # Количество изображений
        self.data_len = len(self.data_info.index)
        # Устройство, на котором будут находиться выходные тензоры
        self.device = device
        # Сохраняем преобразования данных
        self.transform = transform

    def __getitem__(self, index: int):
        '''Входные параметры:
        index: int - индекс для обращения к элементам датафрейма data_info
        Возвращаемые значения:
        Tuple[torch.Tensor] - кортеж из тензорных представлений изображений лиц (якоря, позитивного
        и негативного примеров)'''
        
        anchor_image = cv2.imread(self.data_info['face_path'][index])
        anchor_person_name = self.data_info['person_name'][index]
        positive_indices = self.data_info[self.data_info['person_name'] == anchor_person_name].index.values
        negative_indices = self.data_info[self.data_info['person_name'] != anchor_person_name].index.values
        
        while True:
            positive_index = np.random.choice(positive_indices)
            if positive_index != index:
                break
        
        positive_index = np.random.choice(positive_indices)
        negative_index = np.random.choice(negative_indices)
        
        positive_image = cv2.imread(self.face_path_arr[positive_index])
        negative_image = cv2.imread(self.face_path_arr[negative_index])
        
        anchor_image = cv2.cvtColor(anchor_image, cv2.COLOR_BGR2RGB).astype('float')/255.0
        positive_image = cv2.cvtColor(positive_image, cv2.COLOR_BGR2RGB).astype('float')/255.0
        negative_image = cv2.cvtColor(negative_image, cv2.COLOR_BGR2RGB).astype('float')/255.0
        
        tr_anchor_image = self.transform(image=anchor_image)['image'].to(self.device).float()
        tr_positive_image = self.transform(image=positive_image)['image'].to(self.device).float()
        tr_negative_image = self.transform(image=negative_image)['image'].to(self.device).float()

        return tr_anchor_image, tr_positive_image, tr_negative_image

    def __len__(self):
        return self.data_len

In [10]:
class TripletLoss(nn.Module):
    '''Класс для вычисления Triplet loss для набора изображенй в формате torch.Tensor'''
    def __init__(self, margin: float, device: str):
        super(TripletLoss, self).__init__()
        self.margin = margin
        self.device = device

    def forward(self, anchor_embs: torch.Tensor, positive_embs: torch.Tensor, negative_embs: torch.Tensor) -> float:
        '''Входные параметры:
        anchor_embs: torch.Tensor - тензор предсказанных эмбэддингов для якорей
        positive_embs: torch.Tensor - тензор предсказанных эмбэддингов для позитивных примеров
        negative_embs: torch.Tensor - тензор предсказанных эмбэддингов для негативных примеров
        все тензоры имеют размерность B х E, где B - размер батча, E - размер вектора эмбэддинга
        Возвращаемые значения:
        score: float - значение Triplet loss для наборов предсказанных эмбэддингов'''
        
        anchor_positive_distances = torch.sum((anchor_embs - positive_embs)**2, dim=1)
        anchor_negative_distances = torch.sum((anchor_embs - negative_embs)**2, dim=1)
        
        score = torch.max(torch.tensor([0.]).to(self.device), 
                        anchor_positive_distances - anchor_negative_distances + 
                        torch.tensor([self.margin]).to(self.device))
        
        score = torch.mean(score)
        
        return score

In [50]:
class RecognitionMetric(nn.Module):
    '''Класс для вычисления метрики для задачи face recognition для набора изображенй в формате torch.Tensor'''
    def __init__(self):
        super(RecognitionMetric, self).__init__()

        
    def forward(self, anchor_embs: torch.Tensor, positive_embs: torch.Tensor, 
                negative_embs: torch.Tensor) -> List[float]:
        '''Входные параметры:
        anchor_embs: torch.Tensor - тензор предсказанных эмбэддингов для якорей
        positive_embs: torch.Tensor - тензор предсказанных эмбэддингов для позитивных примеров
        negative_embs: torch.Tensor - тензор предсказанных эмбэддингов для негативных примеров
        все тензоры имеют размерность B х E, где B - размер батча, E - размер вектора эмбэддинга
        Возвращаемые значения:
        score: List[float] - список, содержащий два значения:
            1 - среднее по батчу l2 расстояние от anchor_embs до positive_embs
            2 - среднее по батчу l2 расстояние от anchor_embs до negative_embs'''
        
        with torch.no_grad():
            # для каждого триплета в батче получаем l2 расстояния от якоря до позитивного и от 
            # якоря до негативного элемента
            anchor_positive_distances = (torch.sum((anchor_embs - positive_embs)**2, dim=1))**0.5
            anchor_negative_distances = (torch.sum((anchor_embs - negative_embs)**2, dim=1))**0.5
            # Проводим усреднения по батчу
        return (torch.mean(anchor_positive_distances).item(), torch.mean(anchor_negative_distances).item())

In [58]:
class ModelWrapper(nn.Module):
    '''Класс, реализующий функционал для работы с нейронной сетью для распознавания лиц на основе VGGface'''
    def __init__(self, model: object):
        '''Конструктор класса
        Входные параметры:
        model: nn.Module - последовательность слоев или модель, через которую будут проходить данные
        Возвращаемые значения: 
        объект класса ModelWrapper'''
        
        super(ModelWrapper, self).__init__()
        self.model = model

        
    def forward(self, input_data):
        '''Метод прямого прохода через объект класса
        Входные параметры:
        input_data: torch.Tensor - тензорное представление изображения лица
        Возвращаемые значения: 
        output_data: torch.Tensor - эмбэддинг лица в тензорном формате'''
        
        output_data = self.model(input_data)
        return output_data
    
    
    def fit(self, criterion: object, metric: object, optimizer: object, scheduler: object,
                  train_data_loader: DataLoader, valid_data_loader: DataLoader=None, 
                  epochs: int=1, verbose: int=50):
        '''Метод для обучения модели
        Входные параметры:
        criterion: object - объект для вычисления loss
        metric: object - объект для вычисления метрики качества
        optimizer: object - оптимизатор
        scheduler: object - 
        train_data_loader: DataLoader - загрузчик данных для обучения
        valid_data_loader: DataLoader - загрузчик данных для валидации
        epochs: int - количество эпох обучения
        verbose: int - вывод информации через каждые verbose итераций
        Возвращаемые значения:
        result: dict - словарь со значениями loss при тренировке, валидации и метрики при валидации 
        для каждой эпохи'''
        
        self.optimizer = optimizer
        epoch_train_losses = []
        epoch_valid_losses = []
        epoch_valid_metrics = []
        result = {}
        
        for epoch in range(epochs):
            self.model.train()
            time1 = time.time()
            running_loss = 0.0
            train_losses = []
            for batch_idx, data in enumerate(train_data_loader):
                # data - список из 3-х тезоров с размерностями [B, C, W, H]. Сделаем конкатенацию
                # в один тензор с размерностью [3*B, C, W, H] для одновременного прогона через сеть
                data = torch.cat(data, dim=0)
                data = Variable(data)       

                optimizer.zero_grad()
                outputs = self.model(data)
                # После прогона данных, разделяем результат на три соответствующих тензора
                batch_size = int(outputs.shape[0]/3)
                anchor_embs = outputs[:batch_size, :]
                positive_embs = outputs[batch_size : 2*batch_size, :]
                negative_embs = outputs[2*batch_size :, :]
                loss = criterion(anchor_embs, positive_embs, negative_embs)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                train_losses.append(loss.item())
                if (batch_idx+1) % verbose == 0:
                    print(f'Train Epoch: {epoch+1}, Loss: {(running_loss/verbose):.6f}, ', end="")
                    print(f'Learning rate: {scheduler.get_last_lr()[0]}')
                    time2 = time.time()
                    print(f'Spended time for {verbose} batches ({int((verbose*data.shape[0])/3)} triplets', end="") 
                    print(f' or {int(verbose*data.shape[0])} images) : {(time2-time1):.6f} sec')
                    
                    time1 = time.time()
                    running_loss = 0.0

            train_loss = np.mean(train_losses)
            scheduler.step()
            
            if valid_data_loader != None:
                valid_result = self.valid(criterion, metric, valid_data_loader)
                valid_loss = valid_result['valid_loss']
                valid_metric = valid_result['valid_metric']
            
                print('='*80)
                print(f'Epoch {epoch+1}, train loss: {(train_loss):.6f}, valid loss: {(valid_loss):.6f}, ', end="")
                print(f'valid anc pos dist: {(valid_metric[0]):.6f}, valid anc neg dist: {(valid_metric[1]):.6f}')
                print('='*80)
            else:
                print('='*80)
                print(f'Epoch {epoch+1}, train loss: {(train_loss):.6f}')
                print('='*80)
                valid_loss = None
                valid_metric = None
            
            epoch_train_losses.append(train_loss)
            epoch_valid_losses.append(valid_loss)
            epoch_valid_metrics.append(valid_metric)
        
        result['epoch_train_losses'] = epoch_train_losses
        result['epoch_valid_losses'] = epoch_valid_losses
        result['epoch_valid_metrics'] = epoch_valid_metrics
        
        return result
    
    
    def valid(self, criterion: object, metric: object, valid_data_loader: DataLoader):
        '''Метод для валидации модели
        Входные параметры:
        criterion: object - объект для вычисления loss
        metric: object - объект для вычисления метрики качества
        valid_data_loader: DataLoader - загрузчик данных для валидации
        Возвращаемые значения:
        result: dict - словарь со значениями loss и метрики при валидации'''
        
        self.model.eval()
        valid_metrics = []
        valid_losses = []
        result = {}
        for batch_idx, data in enumerate(valid_data_loader):
            data = torch.cat(data, dim=0)
            data = Variable(data)
            
            outputs = self.model(data)
                    
            batch_size = int(outputs.shape[0]/3)
            anchor_embs = outputs[:batch_size, :]
            positive_embs = outputs[batch_size : 2*batch_size, :]
            negative_embs = outputs[2*batch_size :, :]
            
            loss = criterion(anchor_embs, positive_embs, negative_embs)
            valid_losses.append(loss.item())
                    
            metric_value = metric(anchor_embs, positive_embs, negative_embs)
            valid_metrics.append(metric_value)
                
        valid_loss    = np.mean(valid_losses)
        valid_metric  = np.mean(valid_metrics, axis=0)
        result['valid_loss'] = valid_loss
        result['valid_metric'] = valid_metric
        return result

    
    def save(self, path_to_save: str = './model.pth'):
        '''Метод сохранения весов модели
        Входные параметры:
        path_to_save: str - директория для сохранения состояния модели'''
        
        torch.save(self.model.state_dict(), path_to_save)
    
    
    def trace_save(self, path_to_save: str = './model.pth'):
        '''Метод сохранения модели через torchscript
        Входные параметры:
        path_to_save: str - директория для сохранения модели'''
        
        example_forward_input = torch.rand(1, 3, 160, 160).to('cpu')
        if next(self.model.parameters()).is_cuda:
            example_forward_input= example_forward_input.to('cuda:0')
            
        traced_model = torch.jit.trace((self.model).eval(), example_forward_input)
        torch.jit.save(traced_model, path_to_save)
    
    
    def onnx_save(self, path_to_save: str = './face_recognition_model.onnx'):
        '''Метод сохранения модели в формате ONNX
        Входные параметры:
        path_to_save: str - директория для сохранения модели'''
        
        example_forward_input = torch.randn(1, 3, 160, 160, requires_grad=True).to('cpu')
        if next(self.model.parameters()).is_cuda:
            example_forward_input= example_forward_input.to('cuda:0')

        torch.onnx.export(self.model,
                          example_forward_input,
                          path_to_save,
                          export_params=True,
                          opset_version=10,
                          do_constant_folding=True,
                          input_names = ['input'],
                          output_names = ['output'],
                          dynamic_axes={'input' : {0 : 'batch_size'},    # Модель будет работать с произвольным
                                        'output' : {0 : 'batch_size'}})  # размером батча
    
    
    def load(self, path_to_model: str = './model.pth'):
        '''Метод загрузки весов модели
        Входные параметры:
        path_to_model: str - директория с сохраненными весами модели'''
        
        self.model.load_state_dict(torch.load(path_to_model))

In [15]:
# Собственная модель с нуля
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1)
        
        self.fc1 = nn.Linear(512, 256)
        self.fc2 = nn.Linear(256, 128)
        
        self.bn1 = nn.BatchNorm2d(32)
        self.bn2 = nn.BatchNorm2d(64)
        self.bn3 = nn.BatchNorm2d(128)
        self.bn4 = nn.BatchNorm2d(256)
        self.bn5 = nn.BatchNorm2d(512)
        
        self.alobal_average = nn.AvgPool2d(5)
        
        self.dropout1 = nn.Dropout(p=0.3)
        self.dropout2 = nn.Dropout(p=0.4)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.pool(F.relu(self.bn4(self.conv4(x))))
        x = self.pool(F.relu(self.bn5(self.conv5(x))))
        
        x = self.alobal_average(x)
        
        x = x.view(-1, 512)
        x = F.relu(self.dropout1(self.fc1(x)))
        x = self.fc2(x)
        return x

In [16]:
class Identity(nn.Module):
    '''Класс необходим для реализации transfer learning.
    Если создать объект этого класса и присвоить его последнему слою классификатора
    обученной нейросети, то классификатор перезапишется, сигнал будет проходить без изменений, 
    и в дальнейшем можно будет добавлять пользовательские слои'''
    
    def __init__(self):
        super(Identity, self).__init__()
        
        
    def forward(self, x):
        return x

In [17]:
# Постороение модели на основе mobilenet_v3_small
class MobileImagenet(nn.Module):
    def __init__(self):
        super().__init__()
        self.backbone = models.mobilenet_v3_small(pretrained=True)
        # Заменяем последние 2 полносвязных слоя на имеющие нужную размерность
        self.backbone.classifier[0] = nn.Linear(576, 256)
        self.backbone.classifier[3] = nn.Linear(256, 128)

        
    def get_layers_names(self):
         return dict(self.backbone.named_modules())

        
    def forward(self, x):
        x = self.backbone(x)
        return x

## Загрузка и обработка данных

VGGface2 test https://www.kaggle.com/greatgamedota/vggface2-test - изображения людей

VGGface https://www.robots.ox.ac.uk/~vgg/data/vgg_face/ - ссылки на изображения людей в txt файлах
(надо скачивать через request)

Для работы необходимо создать директории:

    Для VGGface1:
    - /home/dima/datasets/vgg_face_dataset/files/ - для хранения текстовых файлов со ссылками на 
    изображения VGGface1
    (должны изначально находиться в этой директории)
    - /home/dima/datasets/vgg_face_dataset/full_images/ - для хранения изображений VGGface1, которые будут 
    скачаны
    - /home/dima/datasets/vgg_face_dataset/faces_from_mtcnn/ - для хранения лиц, которые будут извлечены mtcnn
    - /home/dima/datasets/vgg_face_dataset/faces_from_bbox/ - для хранения лиц, которые будут извлечены через 
    известные bbox'ы
    
    Для VGGface2:
    - /home/dima/datasets/vgg_face2_dataset/faces/ - для хранения изображений VGGface2 
    (должны изначально находиться в этой директории)
    - /home/dima/datasets/vgg_face2_dataset/full_images/ - для хранения лиц, которые будут извлечены mtcnn

In [18]:
VGG1_URLS_DIRECTORY = "/home/dima/datasets/vgg_face_dataset/files/"
VGG1_IMAGES_DIRECTORY = '/home/dima/datasets/vgg_face_dataset/full_images/'
VGG1_FACES_DIRECTORY_MTCNN = '/home/dima/datasets/vgg_face_dataset/faces_from_mtcnn/'
VGG1_FACES_DIRECTORY_BBOX = '/home/dima/datasets/vgg_face_dataset/faces_from_bbox/'

VGG2_FACES_DIRECTORY = '/home/dima/datasets/vgg_face2_dataset/faces/'
VGG2_IMAGES_DIRECTORY = '/home/dima/datasets/vgg_face2_dataset/full_images/'

In [None]:
# Загрузка VGGface1
vggface1data = VGGface1Data(urls_directory=VGG1_URLS_DIRECTORY,
                            images_directory=VGG1_IMAGES_DIRECTORY, 
                            faces_directory=VGG1_FACES_DIRECTORY_BBOX)

total_time = vggface1data.download(total_persons=1000, total_images_for_person=400, processes=20)
total_time

In [None]:
# Получение лиц для VGGface1 на основе известных bbox
vggface1data.get_faces_from_bbox(out_face_size=(160, 160), in_faces_size_treashold=(100, 100), 
                  face_ratio_treashold=1.5)

# Меняем целевую директорию для лиц
vggface1data.faces_directory = VGG1_FACES_DIRECTORY_MTCNN

# Получение лиц для VGGface1 с использованием mtcnn
vggface1data.get_faces_from_mtcnn(out_face_size=(160, 160), in_faces_size_treashold=(100, 100), 
                        face_prob_treashold=0.99, face_ratio_treashold=1.5)

Выделение лиц для VGGface1 происходило двумя способами: с помощью предоставленных bbox'ов и через mtcnn.
В первом случае получился датасет из 122000 изображений лиц, во втором - из 77000. В обоих случаях
отфильтровывались те лица, исходный размер которых на изображениях был меньше 100х100.
Разница в размере произошла из-за того, что алгоритм выделения лиц с помощью mtcnn отбраковывал изображения, 
на которых присутствуют несколько человек, плюс еще есть дополнительные фильтры.
Лучше использовать первый метод, если нужно больше данных, но нужна чистка данных, так как иногда bbox'ы 
размечены неправильно. Если мало времени, используем второй метод.

In [None]:
# Получение лиц для VGGface2 с использованием mtcnn
vggface2data = VGGface2Data(images_directory=VGG2_IMAGES_DIRECTORY, faces_directory=VGG2_FACES_DIRECTORY)
vggface2data.get_faces_mtcnn(out_face_size=(160, 160),
                             in_faces_size_treashold=(100, 100), 
                             face_prob_treashold=0.99, 
                             face_ratio_treashold=1.5)

## Обучение модели

In [19]:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
device

'cuda:0'

In [20]:
torch.cuda.get_device_name(0)

'NVIDIA GeForce RTX 3060'

In [21]:
!nvidia-smi

Sun Dec  5 16:04:35 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.86       Driver Version: 470.86       CUDA Version: 11.4     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA GeForce ...  Off  | 00000000:01:00.0  On |                  N/A |
|  0%   44C    P8    10W / 170W |    485MiB / 12045MiB |     12%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+---------------------------------------------------------------------------

In [65]:
learning_rate = 0.001
num_epochs = 30
margin = 2
batch_size = 128

In [23]:
big_vggface1 = get_data_csv(VGG1_FACES_DIRECTORY_BBOX)
big_vggface1.shape

(122114, 2)

In [24]:
small_vggface1 = get_data_csv(VGG1_FACES_DIRECTORY_MTCNN)
small_vggface1.shape

(77199, 2)

In [25]:
vggface2 = get_data_csv(VGG2_FACES_DIRECTORY)
vggface2.shape

(57331, 2)

Эксперименты показали, что работать с vggface2 (в нашем случае только тестовая часть) не имеет смысла, так
как данных в нем относительно мало (а именно мало различных людей - 500), и даже легковесная модель быстро 
переобучается. Поэтому основная работа будет проведена с big_vggface1 и small_vggface1: с лицами, извлеченными 
из vggface1 на основе bbox и с использованием mtcnn соответственно - в обоих случаях разлиных людей 1000, а 
различия заключаются лишь в общем количестве изображений лиц людей.

In [26]:
big_train_df, big_valid_df, big_test_df = get_train_valid_test(source_df=big_vggface1, 
                                                               separate_feature='person_name')
big_train_df.reset_index(inplace=True, drop=True)
big_valid_df.reset_index(inplace=True, drop=True)
big_test_df.reset_index(inplace=True, drop=True)

print(f"Unique person names in train: {big_train_df['person_name'].nunique()}")
print(f"Unique person names in valid: {big_valid_df['person_name'].nunique()}")
print(f"Unique person names in test: {big_test_df['person_name'].nunique()}")
print(f"Total images in train: {big_train_df.shape[0]}")
print(f"Total images in valid: {big_valid_df.shape[0]}")
print(f"Total images in test: {big_test_df.shape[0]}")

Unique person names in train: 600
Unique person names in valid: 200
Unique person names in test: 200
Total images in train: 73006
Total images in valid: 24832
Total images in test: 24276


In [27]:
small_train_df, small_valid_df, small_test_df = get_train_valid_test(source_df=small_vggface1, 
                                                                     separate_feature='person_name')
small_train_df.reset_index(inplace=True, drop=True)
small_valid_df.reset_index(inplace=True, drop=True)
small_test_df.reset_index(inplace=True, drop=True)

print(f"Unique person names in train: {small_train_df['person_name'].nunique()}")
print(f"Unique person names in valid: {small_valid_df['person_name'].nunique()}")
print(f"Unique person names in test: {small_test_df['person_name'].nunique()}")
print(f"Total images in train: {small_train_df.shape[0]}")
print(f"Total images in valid: {small_valid_df.shape[0]}")
print(f"Total images in test: {small_test_df.shape[0]}")

Unique person names in train: 600
Unique person names in valid: 200
Unique person names in test: 200
Total images in train: 46117
Total images in valid: 15513
Total images in test: 15569


In [28]:
research_data = ResearchDataset(big_train_df, device)
research_data_loader = DataLoader(research_data, batch_size=batch_size, shuffle=True)
big_mean, big_std = get_mean_std(research_data_loader)

print(f'Mean channel values in train: {big_mean}')
print(f'Std channel values in train: {big_std}')

Mean channel values in train: (0.5904295, 0.44956186, 0.38371497)
Std channel values in train: (0.2431854, 0.20771153, 0.19577332)


In [29]:
research_data = ResearchDataset(small_train_df, device)
research_data_loader = DataLoader(research_data, batch_size=batch_size, shuffle=True)
small_mean, small_std = get_mean_std(research_data_loader)

print(f'Mean channel values in train: {small_mean}')
print(f'Std channel values in train: {small_std}')

Mean channel values in train: (0.633451, 0.47640446, 0.4034844)
Std channel values in train: (0.21870759, 0.18738167, 0.17562027)


Различия mean и std в двух наборах данных связаты с тем, что при извлечении лиц через mtcnn выделяются bbox'ы
меньшей площади и содержат меньше информации о фоне

In [30]:
big_train_transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.Rotate (limit=20, interpolation=1, border_mode=4, value=None, always_apply=True, p=1),
    A.Normalize(mean=big_mean, std=big_std, max_pixel_value=1.0),
    ToTensorV2(),])

big_valid_test_transform = A.Compose([
    A.Normalize(mean=big_mean, std=big_std, max_pixel_value=1.0),
    ToTensorV2(),])

small_train_transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.Rotate (limit=20, interpolation=1, border_mode=4, value=None, always_apply=True, p=1),
    A.Normalize(mean=small_mean, std=small_std, max_pixel_value=1.0),
    ToTensorV2(),])

small_valid_test_transform = A.Compose([
    A.Normalize(mean=small_mean, std=small_std, max_pixel_value=1.0),
    ToTensorV2(),])

In [31]:
big_train_data = CustomDataset(big_train_df, device, big_train_transform)
big_valid_data = CustomDataset(big_valid_df, device, big_valid_test_transform)
big_test_data = CustomDataset(big_test_df, device, big_valid_test_transform)

big_train_data_loader = DataLoader(big_train_data, batch_size=batch_size, shuffle=True)
big_valid_data_loader = DataLoader(big_valid_data, batch_size=batch_size, shuffle=True)
big_test_data_loader = DataLoader(big_test_data, batch_size=batch_size, shuffle=True)

small_train_data = CustomDataset(small_train_df, device, small_train_transform)
small_valid_data = CustomDataset(small_valid_df, device, small_valid_test_transform)
small_test_data = CustomDataset(small_test_df, device, small_valid_test_transform)

small_train_data_loader = DataLoader(small_train_data, batch_size=batch_size, shuffle=True)
small_valid_data_loader = DataLoader(small_valid_data, batch_size=batch_size, shuffle=True)
small_test_data_loader = DataLoader(small_test_data, batch_size=batch_size, shuffle=True)

In [66]:
# model from scratch
#model = Model().to(device)

# transfer learning with mobilenet v3 (imagenet)
model = MobileImagenet().to(device)
model_wrapper = ModelWrapper(model=model)
summary(model_wrapper.model, input_size=(32, 3, 160, 160))

Layer (type:depth-idx)                             Output Shape              Param #
MobileImagenet                                     --                        --
├─MobileNetV3: 1-1                                 [32, 128]                 --
│    └─Sequential: 2-1                             [32, 576, 5, 5]           --
│    │    └─ConvBNActivation: 3-1                  [32, 16, 80, 80]          464
│    │    └─InvertedResidual: 3-2                  [32, 16, 40, 40]          744
│    │    └─InvertedResidual: 3-3                  [32, 24, 20, 20]          3,864
│    │    └─InvertedResidual: 3-4                  [32, 24, 20, 20]          5,416
│    │    └─InvertedResidual: 3-5                  [32, 40, 10, 10]          13,736
│    │    └─InvertedResidual: 3-6                  [32, 40, 10, 10]          57,264
│    │    └─InvertedResidual: 3-7                  [32, 40, 10, 10]          57,264
│    │    └─InvertedResidual: 3-8                  [32, 48, 10, 10]          21,968
│    │    └

In [67]:
#criterion = TripletLoss(margin=margin, device=device)
optimizer = torch.optim.Adam(model_wrapper.parameters(), lr=learning_rate)
criterion = nn.TripletMarginLoss(margin=margin, p=2)
metric = RecognitionMetric()
scheduler = ExponentialLR(optimizer, gamma=0.5)

In [68]:
result_train = model_wrapper.fit(criterion,
             metric,
             optimizer,
             scheduler,
             small_train_data_loader,
             small_valid_data_loader,
             epochs=3)

Train Epoch: 1, Loss: 0.096635, Learning rate: 0.001
Spended time for 50 batches (6400 triplets or 19200 images) : 62.337253 sec
Train Epoch: 1, Loss: 0.064593, Learning rate: 0.001
Spended time for 50 batches (6400 triplets or 19200 images) : 61.905559 sec
Train Epoch: 1, Loss: 0.060135, Learning rate: 0.001
Spended time for 50 batches (6400 triplets or 19200 images) : 61.832221 sec
Train Epoch: 1, Loss: 0.054912, Learning rate: 0.001
Spended time for 50 batches (6400 triplets or 19200 images) : 61.732972 sec
Train Epoch: 1, Loss: 0.051116, Learning rate: 0.001
Spended time for 50 batches (6400 triplets or 19200 images) : 61.713237 sec
Train Epoch: 1, Loss: 0.051301, Learning rate: 0.001
Spended time for 50 batches (6400 triplets or 19200 images) : 61.858773 sec
Train Epoch: 1, Loss: 0.044811, Learning rate: 0.001
Spended time for 50 batches (6400 triplets or 19200 images) : 61.821240 sec
Epoch 1, train loss: 0.060029, valid loss: 0.046099, valid anc pos dist: 1.113252, valid anc neg 

In [69]:
result_train

{'epoch_train_losses': [0.06002866575324139,
  0.03498224838742589,
  0.02612304136703962],
 'epoch_valid_losses': [0.04609856699578098,
  0.030498537471609525,
  0.025793155838475853],
 'epoch_valid_metrics': [array([1.11325152, 2.01408657]),
  array([1.04788802, 2.07806055]),
  array([1.00210186, 1.99182223])]}

In [70]:
result_valid = model_wrapper.valid(criterion, metric, small_valid_data_loader)
result_valid

{'valid_loss': 0.027044421481731973,
 'valid_metric': array([1.00375848, 1.98308984])}

In [71]:
result_test = model_wrapper.valid(criterion, metric, small_test_data_loader)
result_test

{'valid_loss': 0.02837662829361001,
 'valid_metric': array([1.01532354, 1.96890503])}

In [156]:
model_wrapper.save(path_to_save='./mobile_emb128_0.26test_margin2_122000own')

In [47]:
model_wrapper = ModelWrapper(model=model)
model_wrapper.load(path_to_model = './mobile_emb128_0.26test_margin2_122000own')

## Выводы

1. За 3 эпохи на VGGface2 (27000 лиц изначально больших 150х150) выходим на 0.25 на тесте. За 3 эпохи на VGGface2 (57000 лиц изначально больших 100х100) выходим на 0.25 на 
   тесте. Модель переобучается из-за малого количаства различных людей (500).

2. За 3 эпохи на VGGface1 (15000) на valid и test быстро уходим в переобучение, так как данных мало.

3. За 4 эпохи на VGGface1 (77000 лиц выделенных mtcnn, изначально больших 100х100) выходим на 0.17 на тесте.
   Прогресс за счет увеличения данных (количества различных человек и общее количетсво изображений).

4. Увеличение размерности эмбэддинга со 128 до 256 приводило к ухудшению качества.
    
5. Лучший результат при эмбэддинг = 128, VGGface1 (77000 лиц выделенных mtcnn изначально больших 100х100), 
   0.14 на test за 3 эпохи для margin = 1.

6. Увеличение margin вдвое приводит к увеличению вдвое минимального лосса, но также вдвое увеличивает
   расстояние между эмбэддингами для различных людей. Этот параметр лучше калибровать непосредственно в 
   приложении по распознаванию.

7. На малом (77000) и большом (122000) VGGface1 минимальный loss на test равен примерно 0.14-0.15 для margin = 1
   и 0.26-0.28 для margin = 2 на 4-5 эпохах. Дальше - переобучение. Для большего датасета качество немного лучше
   (на 0.01-0.02). Большее влияние на точность оказывает увеличение датасета за счет добавления новых людей
   (сравниваем с VGGface2), чем за счет добавления новых изображений уже существующих людей.
    
8. При обучении собственной модели ошибка уменьшается в несколько раз медленнее (по эпохам) при большем 
   количестве параметров, и минимальное значение метрики получается в несколько раз большим, чем при работе 
   с готовой архитектурой (непредобученной), поэтому целесообразно использовать готовые архитектуры, 
   так как они более оптимальны. 
    
9. Готовая архитектура с предтренированными весами обучается быстрее, поэтому нужно меньше эпох для
   достижения нужной метрики, следовательно, переобучение начнет проявляться при более низком значении 
   метрики.
       
10. Чем меньше замороженных весов в предобученной модели, тем быстрее модель обучается, поэтому оптимально
    не замораживать никакие веса.

In [None]:
# Примеры кода заморозкой и разморозкой весов модели
# Заморозить все веса модели
for child in model_wrapper.model.children():
    for param in child.parameters():
        param.requires_grad = False

# Разморозить веса блока модели
for child in model_wrapper.model.backbone.features[12].children():
    for param in child.parameters():
        param.requires_grad = True

# Разморозить предпоследний полносвязный слой
for param in model_wrapper.model.backbone.classifier[0].parameters():
    param.requires_grad = True

# Разморозить последний полносвязный слой
for param in model_wrapper.model.backbone.classifier[3].parameters():
    param.requires_grad = True

# Разморозить все веса классификатора
for child in model_wrapper.model.backbone.classifier.children():
    for param in child.parameters():
        param.requires_grad = True