In [None]:
pip install pytorch_metric_learning

In [None]:
pip install faiss-gpu

In [None]:
pip install pytorch_lightning

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import random
import json
import scipy.io
import time
import glob
import shutil
import yaml
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor

import torch
import torchvision
from torch.utils.data import DataLoader, Dataset, Subset, ConcatDataset, random_split
import torch.nn as nn
from torch.optim import Adadelta, Adam
from torch.optim.lr_scheduler import StepLR
from torch.utils.tensorboard import summary, writer, SummaryWriter
from torchvision.datasets import ImageFolder
import torchmetrics
from IPython.display import display
from ipywidgets import IntProgress


from torchvision import transforms
from torchvision.models import resnet18
import torch.functional as F

from PIL import Image 
from tqdm.auto import tqdm
import cv2
from sklearn.model_selection import train_test_split

from pytorch_metric_learning.losses import TripletMarginLoss
from pytorch_metric_learning.miners import BatchEasyHardMiner, TripletMarginMiner
from pytorch_metric_learning import distances, losses, miners, reducers, testers
from pytorch_metric_learning.utils.accuracy_calculator import AccuracyCalculator
import pytorch_lightning as pl
import faiss
from pytorch_metric_learning.utils.inference import CustomKNN
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks import LearningRateMonitor
from pytorch_lightning.loggers import TensorBoardLogger, CSVLogger

In [None]:
progress_bar = IntProgress(min=0, max=100, description='Обучение модели:')
display(progress_bar)

Seed

In [None]:
SEED = 42

torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
random.seed(SEED)
np.random.seed(SEED)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

Вспомогательная функция для отрисовки изображений

In [None]:
def plot_image(image, label):
	'''
	Вспомогательная функция для plotting'а тензора torch
	
	Параметры:
	-image: TorchTensor
	-label: str для написания заголовка
	'''
	img = image.squeeze(0).permute(1, 2, 0).numpy()
	fig = plt.figure(figsize = (4, 4))
	plt.imshow(img)
	plt.title(label)
	plt.tight_layout()
	plt.axis('off')
    

Дополнительные функции

In [None]:
# Базовый transform
base_transform = transforms.Compose(
    [
        transforms.Resize((300, 300)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

mean=[0.485, 0.456, 0.406]
std=[0.229, 0.224, 0.225]

# Денормализация изображения для вывода через plt.imhsow()
def unnormalize(tensor, mean=mean, std=std):
    for t, m, s in zip(tensor, mean, std):
        t.mul_(s).add_(m)
    return tensor


Создание каталогов

In [None]:
DIR_PATH = '/kaggle/input/logodet3k/LogoDet-3K'

logo_dataset = pd.DataFrame(glob.glob(f"{DIR_PATH}/*/*/*"), columns=["file_path"])
logo_dataset["extension"] = logo_dataset["file_path"].apply(lambda x: x.split(".")[-1])
logo_dataset["logo_category"] = logo_dataset["file_path"].apply(lambda x: x.split(os.sep)[-3])
logo_dataset["logo_name"] = logo_dataset["file_path"].apply(lambda x: x.split(os.sep)[-2])
logo_dataset.head()

In [None]:
def filter_dataset(logo_dataset:pd.DataFrame = logo_dataset, 
                  q:float = 0.05):
    '''
    Функция для фильтрации (сокращения) исходного датасета.
    В этом есть смысл ввиду предположения избыточности полного набора данных 
    при решении задачи Object Detection c одним классом (logo)

    Параметры:
    -logo_dataset: исходный датасет с логотипами (pd.DataFrame, default: logo_dataset)
    -q: квантиль для подрезки распределения (убрать небольшое число логотипов) (float, default: 0.05)
    '''

    logo_dataset = logo_dataset[logo_dataset['logo_category'] != 'Medical']
    assert logo_dataset['extension'].value_counts()['jpg'] == logo_dataset['extension'].value_counts()['xml']
    logo_dataset = logo_dataset[logo_dataset['logo_category'] != 'Sports']
    assert logo_dataset['extension'].value_counts()['jpg'] == logo_dataset['extension'].value_counts()['xml']
    value_counts = logo_dataset['logo_name'].value_counts()
    threshold = np.quantile(value_counts.values, q = q)
    logo_names_to_remove = value_counts[value_counts < threshold].index
    logo_dataset = logo_dataset[~logo_dataset['logo_name'].isin(logo_names_to_remove)]
    assert logo_dataset['extension'].value_counts()['jpg'] == logo_dataset['extension'].value_counts()['xml']

    return logo_dataset
    
logo_dataset_copy = logo_dataset.copy()
logo_dataset_copy = filter_dataset(logo_dataset = logo_dataset_copy, 
                                  q = 0.25)
print(f"Dataset shape: {logo_dataset_copy.shape}")
print(f"No. of .jpg files {logo_dataset_copy['extension'].value_counts()['jpg']}")
print(f"No. of .xml files {logo_dataset_copy['extension'].value_counts()['xml']}")
logo_dataset_copy.head()
    

In [None]:
logo_dataset_copy["logo_name"].value_counts().reset_index().plot(
    x="logo_name", y="count", figsize=(10,5), title="Distribution of logo img counts")

plt.show()

Вспомогательные словари формата ```{'logo_name': label} и {label: 'logo_name'}``` 

In [None]:
classname2idx = {logo_name: idx for idx, logo_name in enumerate(sorted(logo_dataset_copy["logo_name"].unique()))}
idx2classname = {idx: logo_name for logo_name, idx in classname2idx.items()}

In [None]:
logo_dataset_copy['is_train'] = True
train_logo_dataset, test_logo_dataset = train_test_split(logo_dataset_copy, test_size=0.25, random_state=SEED)
test_logo_dataset['is_train'] = False
final_logo_dataset = pd.concat([train_logo_dataset, test_logo_dataset])
final_logo_dataset.reset_index(drop=True, inplace=True)
print(f"Logo dataset shape: {final_logo_dataset.shape}")
final_logo_dataset.head()

Необходимо закодировать метки для обучения

In [None]:
final_logo_dataset['label'] = final_logo_dataset['logo_name'].map(classname2idx)
print(f"Number of classes: {len(final_logo_dataset['label'].unique())}")
print(f"Final logo dataset shape: {final_logo_dataset.shape[0]}")
final_logo_dataset.head()

Элементы YOLO preprocessing могут быть полезны для получения общего каталога

In [None]:
example_xml_file = '/kaggle/input/logodet3k/LogoDet-3K/Clothes/2xist/1.xml'

def convert_to_YOLO_format(xml_path:str = example_xml_file, output_dir = None,
                          name2class_dict:dict = classname2idx):
    '''
    Функция по заданному файлу (.xml) создает файл c аннотациями для YOLO

    Параметры:
    -xml_path: путь к файлу формата .xml (str, default: example_xml_file)
    -output_dir: путь для записи YOLO аннотаций (default: None)
    -name2class_dict: словарь для получения классов-меток (dict, default: classname2idx)
    '''
    
    yolo_lines = []
    tree = ET.parse(xml_path)
    root = tree.getroot()
    image_width = float(root.find("size/width").text)
    image_height = float(root.find("size/height").text)
    depth = float(root.find("size/depth").text)

    for obj in root.findall('object'):
        class_name = obj.find('name').text
        bbox = obj.find('bndbox')
        xmin = float(bbox.find('xmin').text)
        ymin = float(bbox.find('ymin').text)
        xmax = float(bbox.find('xmax').text)
        ymax = float(bbox.find('ymax').text)
        x_center = (xmin + xmax) / 2 / image_width
        y_center = (ymin + ymax) / 2 / image_height
        width = (xmax - xmin) / image_width
        height = (ymax - ymin) / image_height
        class_index = classname2idx.get(class_name, 0)
        yolo_line = f"{class_index} {x_center} {y_center} {width} {height}"
        yolo_lines.append(yolo_line)
        
    if output_dir is not None:
        with open(output_dir, "w") as f:
            f.write("\n".join(yolo_lines))
            
    return yolo_lines

example = convert_to_YOLO_format(name2class_dict = {})
print(example)


Функция для построения рабочей директории

In [None]:
dataset_dst_dir = "/kaggle/working/logodet3k"
if os.path.exists(dataset_dst_dir):
    shutil.rmtree(dataset_dst_dir)
os.makedirs(f"{dataset_dst_dir}/train", exist_ok=True)
os.makedirs(f"{dataset_dst_dir}/val", exist_ok=True)

In [None]:
def copy_to_working_dir(sample, 
                    classnam2idx:dict = classname2idx):

    '''
    Функция для создания каталога обучения YOLO

    Параметры:
    -sample: строка данных из датафрейма (logo_dataset)
    -classname2idx: словарь меток (dict, default: classname2idx)
    '''
    train_folder = "train" if sample["is_train"] else "val"
    input_file = sample["file_path"]
    output_file = os.path.join(dataset_dst_dir, train_folder, "__".join(sample["file_path"].split(os.sep)[-3:]))
    
    # print(train_folder)
    # print(input_xml_file)
    # print(output_xml_file)
    
    if not os.path.exists(output_file):
        shutil.copy(input_file, output_file)
    if not os.path.exists(output_file.replace(".jpg", ".txt")):
        convert_to_YOLO_format(input_file.replace(".jpg", ".xml"), 
                            output_file.replace(".jpg", ".txt"), classname2idx)
    return True

first_row = final_logo_dataset.iloc[1][:]
# print(first_row)
copy_to_working_dir(first_row)


In [None]:
def create_yolo_data(logo_dataset:pd.DataFrame = final_logo_dataset):
    '''
    Функция создаст каталог для обучения модели YOLO

    Параметры:
    -logo_dataset: исходный датасет (pd.DataFrame, default: final_logo_dataset)
    '''

    copy_to_working_dir_results = []

    with ThreadPoolExecutor() as e:
        for _, row in tqdm(logo_dataset.iterrows()):
            status = e.submit(copy_to_working_dir, dict(row))
            copy_to_working_dir_results.append(status)

    copy_to_working_results = logo_dataset_copy.apply(lambda x: copy_to_working_dir(x), axis=1)
    print(copy_to_working_results.sum(), logo_dataset_copy.shape[0])

create_yolo_data()

Далее необходимо создать класс Dataset

In [None]:
from PIL import Image

class LogoDataset(Dataset):

        '''
        Простой класс датасета для получения изображений и меток. 

        Функции:
        def __init__(self, ...):
        Параметры:
        -root: путь к корню каталога (str, default: '/kaggle/working/logodet3k/train')
        -transform: torch transform (default: None)

        def __len__(self):
        Возвращает длину датасета

        def __getitem__(self, idx):
        Возвращает изображение-метку по индексу в датасете

        '''

        def __init__(self, root:str = '/kaggle/working/logodet3k/train', transform = None):
            self.root_dir = root
            self.transform = transform
            self.image_files = [f for f in os.listdir(root) if f.endswith('.jpg')]

        def __len__(self):
            return len(self.image_files)
            
        def __getitem__(self, idx):

            img_name = self.image_files[idx]
            img_path = os.path.join(self.root_dir, img_name)
            txt_path = os.path.join(self.root_dir, img_name.replace('.jpg', '.txt'))
            
            image = Image.open(img_path).convert('RGB')

            with open(txt_path, 'r') as file:
                line = file.readline().strip()
                class_index, x_center, y_center, width, height = map(float, line.split())

            img_width, img_height = image.size
            x_center = int(x_center * img_width)
            y_center = int(y_center * img_height)
            width = int(width * img_width)
            height = int(height * img_height)

            left = int(x_center - width / 2)
            top = int(y_center - height / 2)
            right = int(x_center + width / 2)
            bottom = int(y_center + height / 2)

            cropped_image = image.crop((left, top, right, bottom))

            if self.transform:
                cropped_image = self.transform(cropped_image)
            
            return cropped_image, int(class_index)


Получение датасетов

In [None]:
def get_datasets(train_path:str = '/kaggle/working/logodet3k/train',
                val_path:str = '/kaggle/working/logodet3k/val', 
                idx_to_visualize = None,
                transform = None,
                label2name:dict = idx2classname,
                dataset_type = LogoDataset):
    '''
    Функция для получения train и test датасетов
    
    Параметры:
    -train_path: путь к тренировочному набору файлов (str, default: '/kaggle/working/logodet3k/train')
    -val_path: путь к валидационному набору файлов (str, default: '/kaggle/working/logodet3k/val')
    -idx_to_visualize: индекс датасета для визуализации (default: None)
    -transform: torch transform (default: None)
    -label2name: словарь метка <-> название (dict, default: idx2classname)
    -dataset_type: тип датасета (default: LogoDataset)
    '''

    logo_train_dataset = dataset_type(train_path, transform)
    logo_val_dataset = dataset_type(val_path, transform)

    print(f"Размер тренировочного датасета: {len(logo_train_dataset)}")
    print(f"Размер валидационного датасета: {len(logo_val_dataset)}")

    if idx_to_visualize is not None:
        
        image, label = logo_train_dataset[idx_to_visualize]
        name = label2name[label]
        plot_image(image, name)

    return logo_train_dataset, logo_val_dataset

train_logo_dataset, val_logo_dataset = get_datasets(idx_to_visualize = 0,
                                                    transform = base_transform)
    

Работает!

Проверки размерностей

In [None]:
train_logo_dataset[0][0].size(), train_logo_dataset[1][0].size(), train_logo_dataset[2][0].size()

Создание dataloader'ов

In [None]:
def get_dataloaders(train_dataset = train_logo_dataset,
                   val_dataset = val_logo_dataset,
                   batch_size:int = 64,
                   shuffle_train:bool = True,
                   shuffle_test:bool = False):
    '''
    Функция получения тренировочного и тестового даталоадеров

    Параметры:
    -train_dataset: тренировочный датасет (default: train_logo_dataset)
    -val_dataset: валидационный датасет (default: val_logo_dataset)
    -batch_size: размер батча (int, default: 64)
    -shuffle_train: флаг для перемешивания train (bool, default: True)
    -shuffle_test: флаг для перемешивания test (bool, default: False)

    Возвращает:
    -Кортеж итераторов (train_loader, val_loader)
    '''

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle_train)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=shuffle_test)

    return train_loader, val_loader

train_loader, val_loader = get_dataloaders(batch_size = 312)


Проверка Даталоадеров

In [None]:
# for images, labels in tqdm(train_loader):
    # print(len(images))
    # print(labels)
    # break
    # pass
# print("Успешно!")

Теперь нужно создать класс ```pl.LightningModule```

In [None]:
class modelArcFaceLoss(pl.LightningModule):

	'''
	Класс модели с функцией потерь ArcFaceLoss (наследует методы из pl.LightningModule)
	'''

	def __init__(
			self,
			model=resnet18(pretrained=True), # базовая модель resnet18
			embedding_size=128,
			distance_metric=distances.CosineSimilarity(),
			reducer=reducers.ThresholdReducer(low=0),
			loss_fn=losses.ArcFaceLoss,  # Вот она ArcFace из pytorch_metric_learning
			arcface_margin=0.5,  # margin гиперпараметр
			arcface_scale=64,  #scale гиперпараметр
			optimizer=Adam, 
			optimizer_params={'lr': 0.001, 'weight_decay': 0.0001},
			class_dict=idx2classname,
			min_lr=1e-5,
			step_size=8,
			gamma=0.5
			):
		
		'''
		Конуструктор объекта класс def __init__(self, ...)

		Парамеры:
		-model: Базовая модель (default: resnet18(pretrained = True))
		-embedding_size: Размер эмбеддингов после сверточных слоев для решения задачи Metric Learning (default = 128)
		-distance_metric: Метрика подсчета расстояния между объектами (default: CosineSimilarity())
		-reduce: Функция редукции потерь, которая используется для фильтрации значений loss на основе порогового значения.
		Например, ThresholdReducer(low=0) игнорирует все значения потерь ниже 0.
		Это может повысить устойчивость к шуму в данных (default: ThresholdReducer(low=0))
		-loss_fn: функция потерь (default: ArcFaceLoss)
		-arcface_margin: Смещение угла в формуле функции потерь (default: 0.5)
		-arcface_scale: Масшатабирующий параметр в формуле функции потерь (default: 64)
		-optimizer: оптимизатор (default: Adam)
		-optimizer_params: параметры оптимизатора
		-class_dict: словарь Dict label->idx2classname
		-min_lr: минимальный шаг сходимости (тот предел, до которого уменьшается lr в процессе обучения)
		-step_size: число эпох, через которое экпоненциально уменьшаем шаг сходимости
		-gamma: уменьшающий множитель 

		Инициализирует всё необходимое

		'''

		super(modelArcFaceLoss, self).__init__()

		# Модель и её параметры (Архитектура + Функция потерь + Оптимизатор)
		self.backbone = model,
		self.backbone = self.backbone[0]
		self.embedding_size = embedding_size
		self.backbone.fc = nn.Linear(self.backbone.fc.in_features, self.embedding_size)
		self.fc = nn.Linear(self.embedding_size, self.embedding_size)
		self.distance = distance_metric
		self.reducer = reducer
		self.arcface_margin = arcface_margin
		self.arcface_scale = arcface_scale
		self.loss_fn = loss_fn(
			num_classes=len(class_dict),
			embedding_size=self.embedding_size,
			margin=self.arcface_margin,
			scale=self.arcface_scale
		)

		self.optimizer_params = optimizer_params
		self.optimizer = optimizer(self.parameters(), **self.optimizer_params)
		self.class_dict = class_dict

		# Если мы хотим еще параллельно решать задачу классификации на основе привычной CrossEntropy
		self.classifier_head = nn.Sequential(
			nn.ReLU(),
			nn.Linear(in_features=self.embedding_size, out_features=len(self.class_dict))
		)
		self.classif_loss = torch.nn.CrossEntropyLoss()
		self.save_hyperparameters()
		self.gamma = gamma
		self.step_size = step_size
		self.scheduler = StepLR(self.optimizer, step_size=self.step_size, gamma=self.gamma)
		self.min_lr = min_lr

		# Эмбеддинги для подсчета метрик в конце валидации
		self.val_embeddings = []
		self.val_labels = []

	def forward(self, input_x):
		'''
		forward модели после подачи batch_size:

		Параметры:
		-self
		-input_x: входой пакет картинок

		Возвращает эмбеддинг картинки
		'''

		# Прогон через CNN
		cnn_output = self.backbone(input_x)
		# Прогон через линейные слои
		embedding = self.fc(cnn_output)
		return embedding

	def training_step(self, batch, batch_idx):
			'''
			Часть train логики: подаем батч, разбиваем на (images, labels)
			Возвращем loss, по которому будет считаться градиент
			'''

			images, labels = batch
			embeddings = self(images)

			# ArcFace loss
			loss_arcface = self.loss_fn(embeddings, labels)
			final_loss = loss_arcface

			self.log('train_loss', final_loss, sync_dist=True)
			return final_loss

	def on_train_start(self):
		self.train()

	def validation_step(self, batch, batch_idx):
			
			'''
			Логика на валидации: подаем батч, считаем loss на валидации и записываем в tensor_board
			И добавляем эмбеддинги и метки для подсчёта метрик
			'''

			images, labels = batch
			embeddings = self(images)

			loss_arcface = self.loss_fn(embeddings, labels)

			final_loss = loss_arcface
			self.log('validation_loss', final_loss, sync_dist=True)

			self.val_embeddings.append(embeddings)
			self.val_labels.append(labels)

	def on_validation_epoch_end(self):
			
			'''
			Логика в конце валидации: считает ключевую метрики на валидации, а именно precision@1:
            
			-precision@1
			-Обнуляет массивы эмбеддингов и меток в конце
			'''

			all_embeddings = torch.cat(self.val_embeddings)
			all_labels = torch.cat(self.val_labels)

			accuracy_calculator = AccuracyCalculator(include=("precision_at_1",), k=1, knn_func=CustomKNN(
				distances.CosineSimilarity(), batch_size=64))

			metrics = accuracy_calculator.get_accuracy(all_embeddings, all_labels)
			precision_at_1 = metrics["precision_at_1"]
			self.log('precision_at_1_epoch', precision_at_1, sync_dist=True)

			self.val_embeddings = []
			self.val_labels = []

	def on_validation_start(self):
			self.eval()

	def configure_optimizers(self):
		'''
		Объявление оптимизатора и его фичей
		'''
		
		return {
			'optimizer': self.optimizer,
			'lr_scheduler': {
				'scheduler': self.scheduler,
				'interval': 'epoch',
				'frequency': 1,
				'reduce_on_plateau': False,
				'monitor': 'validation_loss',
			}
		}

	def lr_scheduler_step(self, scheduler, metric):
		'''
		Обновление шага сходимости
		'''

		scheduler.step()
		self._adjust_learning_rate()

	def _adjust_learning_rate(self):
		'''
		Проверка достижения предела learning_rate (self.min_lr)
		'''
		
		for param_group in self.optimizer.param_groups:
			param_group['lr'] = max(param_group['lr'], self.min_lr)



Инициализация модели и её гиперпараметров

In [None]:
'''
Блок инициализации модели и её фичей
'''

'''Объявляем device (на kaggle 'gpu')'''
device = 'gpu' if torch.cuda.is_available() else 'cpu'
print(device)

'''Инициализируем модель. Через 15 (step_size = 15) эпох уменьшаем learning_rate'''
model = modelArcFaceLoss(step_size=15)
pl_model = model

'''
Полезные утилиты:

-early_stopping: обрываем обучение, если loss на валидации не падает в течение 5 эпох
-lr_monitor: мониторинг шага сходимости (чтобы его тоже логировать в TensorBoard)
'''
early_stopping = EarlyStopping(monitor="validation_loss", mode="min", patience=5)
lr_monitor = LearningRateMonitor(logging_interval='step')


'''
Логирование по качеству в TensorBoard:

-checkpoint_callback_1: сохранять Топ-3 по лоссу на валидации
-checkpoint_callback_2: сохранять Топ-2 по метрике precision_at_1
'''
tensorboard_logger = TensorBoardLogger("tb_logs", name="cars196_model_arcface_lr_scheduler_70")
checkpoint_callback_1 = ModelCheckpoint(
    monitor='validation_loss',  
    mode='min', 
    save_top_k=3, 
    filename='best-checkpoint-arcfaceloss-{epoch:02d}-{validation_loss:.2f}'  
)

checkpoint_callback_2 = ModelCheckpoint(
    monitor='precision_at_1_epoch',  
    mode='max', 
    save_top_k=2,  
    filename='best-precision-arcfaceloss-{epoch:02d}-{precision_at_1_epoch:.2f}'  
)

'''
PyTorchLightning Module:

Ключевые параметры:
-max_epochs = 30: Максимум 30 эпох будем обучать
-min_epochs = 15: Минимум 15 эпох будем обучать
'''

trainer = pl.Trainer(
    max_epochs=30,
    min_epochs = 15,
    accelerator=device,
    devices=1,
    logger=tensorboard_logger,
    callbacks=[checkpoint_callback_1, checkpoint_callback_2, early_stopping, lr_monitor],
    log_every_n_steps=1,  
    enable_progress_bar=True  
)



Обучение

In [None]:
'''Блок обучения'''
trainer.fit(pl_model, train_loader, val_loader)