# Metric Learning

In [None]:
!pip install pytorch-metric-learning

In [None]:
!pip install faiss-gpu

Для обучения установим Lightning

In [None]:
!pip install lightning

# Task
Задачу с Kaggle. Цель: обучить модель отличать подлиные подписи:

[kaggle signature verification dataset](https://www.kaggle.com/robinreni/signature-verification-dataset)



<img src ="https://ml.gan4x4.ru/msu/dep-1.9/Exercises/EX11/signature_verification_dataset.png" width="600">

Датасет состоит из набора сканов подписей разложенным по папкам


> id1

> id1_forg

> id2

> id2_forg

> ...


в папке id1 содержатся сканы полинных подписей одного человека. В папке id1_forg содержатся сканы поддельных подписей того же человека.

В папках с перфиксом id2, id2_forg - настоящие и поддельные подписи другого человека и.т.д.

# Load the Data :

Фрагмент датасета достаточный для выполнения задания доступен по ссылке::  http://edunet.kea.su/repo/src/L11_Transfer_learning/sign_mini.zip


In [None]:
!wget https://ml.gan4x4.ru/msu/dep-1.9/datasets/sign_mini.zip
!unzip sign_mini.zip


### Create standart dataset
Класс датасета.

Для использования TripletLoss или CosineEmbeddingLoss
нам нужно получать тройки. Для ArcFace или CE with SoftMax нет.

In [None]:
from torchvision.datasets import ImageFolder
from torchvision import transforms

transform = transforms.Compose(
    [
        transforms.Resize((96, 96)),
        transforms.Grayscale(),
        transforms.ToTensor(),
        transforms.Normalize(0.9409, 0.1078),
    ]
)


train_dataset = ImageFolder("sign_data_mini/train", transform=transform)
val_dataset = ImageFolder("sign_data_mini/test", transform=transform)
print(train_dataset)
print("Classes count", len(train_dataset.classes))

Изображения

In [None]:
from torch.utils.data import DataLoader
import torch
from torchvision import utils
import matplotlib.pyplot as plt

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2)
dataiter = iter(val_dataloader)
batch = next(dataiter)  # img1, label
images, labels = batch

grid = utils.make_grid(images, nrow=8)

plt.axis("off")
plt.rcParams["figure.figsize"] = (20, 15)
plt.imshow(grid.permute(1, 2, 0).numpy())
plt.show()

Увеличим размер batch

In [None]:
train_dataloader = DataLoader(
    train_dataset, batch_size=128, shuffle=True, num_workers=2
)

# Model

In [None]:
from torchvision.models import resnet18
from torch import nn


def get_model():
    model = resnet18()
    model.conv1 = nn.Conv2d(1, 64, kernel_size=5, stride=2, padding=2)
    model.fc = nn.Linear(512, 64)
    # if return_embedding:
    #  model.fc = nn.Identity()
    return model

In [None]:
from torchsummary import summary

model = get_model()
# print(model)
summary(model, (1, 96, 96), device="cpu")

# ArcFace

##Lightning module

Все отличие от классического цикла обучени состоит в том что вместо меток классов модель возвращает эмбеддинг

In [None]:
class PMLMetricWrapper:
    embeddings = None
    labels = None

    def __init__(self):
        self.calc = AccuracyCalculator()
        self.reset()

    def compute(self):
        accuracies = self.calc.get_accuracy(self.embeddings, self.labels)
        return accuracies

    def update(self, embeddings, labels):
        self.labels = torch.cat((self.labels, labels.detach().cpu()), dim=0)
        self.embeddings = torch.cat((self.embeddings, embeddings.detach().cpu()))

    def reset(self):
        self.embeddings = torch.empty((0, 64))
        self.labels = torch.empty((0))

In [None]:
import lightning as L
from pytorch_metric_learning.utils.accuracy_calculator import AccuracyCalculator


class Lit(L.LightningModule):
    lr = 0.001

    def __init__(self, model, criterion):
        super().__init__()
        self.model = model
        self.criterion = criterion
        self.metrics = PMLMetricWrapper()

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.lr)
        return optimizer

    def training_step(self, batch, batch_idx):
        x, y = batch
        embeddings = self.model(x)
        loss = self.criterion(embeddings, y)
        self.log("loss", loss, prog_bar=True)
        return loss

    def on_validation_epoch_start(self):
        self.metrics.reset()

    def validation_step(self, batch, batch_idx):
        x, y = batch
        embeddings = self.model(x)
        self.metrics.update(embeddings, y)

    def on_validation_epoch_end(self):
        metrics = self.metrics.compute()
        self.log("mAP", metrics["mean_average_precision"], prog_bar=True)

        # return metrics

## Loss init
Инициализируем  [ArcFaceLoss](https://kevinmusgrave.github.io/pytorch-metric-learning/losses/#arcfaceloss)

Для начала выясним длину эмбединга



In [None]:
from pytorch_metric_learning.losses import ArcFaceLoss

model = get_model()

out = model(torch.randn(1, 1, 96, 96))  # [1,64]
print(out.shape)

И создадим объект для loss

In [None]:
# to do it without Lightning you must add .to(device) for loss object
criterion = ArcFaceLoss(
    num_classes=len(train_dataset.classes),  # because of linear layer inside
    embedding_size=out.shape[1],
)

for name, p in criterion.named_parameters():
    print(name, p.shape)

##Train

Tensorboard

In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs

Code for train

In [None]:
from lightning.pytorch.loggers import TensorBoardLogger

model = get_model()
lit_model = Lit(model, criterion)
logger = TensorBoardLogger("lightning_logs", name="ArcFace")
trainer = L.Trainer(max_epochs=20, logger=logger, log_every_n_steps=5)
trainer.fit(
    model=lit_model, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader
)

In [None]:
# outliers, _ = loss_func.get_outliers(train_embeddings, train_labels.squeeze(1))
# print(f"There are {len(outliers)} outliers")

In [None]:
print(lit_model.metrics.compute())

# Check on forged

Нам интересно оценить как модель будет отличать именно поддельные подписи. Для того что бы получить все пары подписей придется написать свой код

Создание тренировочного и тестового датасета и даталоадера.
В отличие варианта для визуализации к трансформациям добавилась нормализация.

Этот блок можно модифицировать не требуется, но допускается.



In [None]:
import os
import random
from glob import glob
from itertools import product

from PIL import Image
from torch.utils.data import Dataset


class PairDataset(Dataset):
    def __init__(self, dir=None, transform=None):
        self.dir = dir
        self.transform = transform
        self.classes = {1: "Orginial", -1: "Forged"}  # Change if need
        self.data = self.get_pairs()
        self.targets = self.get_targets()
        self.cache = {}

    def get_pairs(self):
        pairs = []  # to store [orig, fake] or [orig,orig] pairs
        persons = self.load_data()
        for key in persons:
            all_pairs = product(
                persons[key]["orig"], persons[key]["orig"] + persons[key]["forg"]
            )
            # remove pairs with themselve
            without_self_comparsion = list(filter(lambda x: x[0] != x[1], all_pairs))
            pairs += without_self_comparsion
        return pairs

    def load_data(self):
        all_paths = glob(f"{self.dir}/**/*")  # get all files path
        persons = {}
        # Group files by ID and type
        for path in all_paths:
            id, tp = PairDataset.parse(path)
            if not id in persons:
                persons[id] = {"orig": [], "forg": []}
            persons[id][tp].append(path)
        return persons

    def get_targets(self):
        targets = []
        for pair in self.data:
            _, tp = PairDataset.parse(pair[1])
            label = -1 if tp == "forg" else 1
            targets.append(label)
        return targets

    @staticmethod
    def parse(path):
        folder = path.split(os.sep)[-2]
        id = folder.split("_")[0]
        tp = "forg" if "forg" in path else "orig"
        return id, tp

    def __getitem__(self, index):
        image0_path, image1_path = self.data[index]
        label = self.targets[index]

        # Loading the images
        img0 = self.load(image0_path)
        img1 = self.load(image1_path)

        return img0, img1, label

    def load(self, path):
        if path in self.cache:
            img = self.cache[path]
        else:
            img = Image.open(path)  # .convert("L")
            self.cache[path] = img
        # Apply image transformations
        if self.transform is not None:
            img = self.transform(img)
        return img

    def __len__(self):
        return len(self.data)

In [None]:
test_pair_dataset = PairDataset("/content/sign_data_mini/test", transform)

Создадим экземпляр датасета и убедимся что данные загружаются

In [None]:
from torch.utils.data import DataLoader
import torch
from torchvision import transforms, utils
import matplotlib.pyplot as plt

# Viewing the sample of images to check whether its loading properly
print('"1" - подписи настоящие, "-1" - подделка')


test_pair_dataloader = DataLoader(test_pair_dataset, batch_size=8, shuffle=False)
dataiter = iter(test_pair_dataloader)

example_batch = next(dataiter)  # img1, img2, label
# display the data
concatenated = torch.cat((example_batch[0], example_batch[1]), 0)
grid = utils.make_grid(concatenated)

plt.axis("off")
plt.rcParams["figure.figsize"] = (40, 20)
plt.imshow(grid.permute(1, 2, 0).numpy())
plt.show()

print(example_batch[2].numpy())

## Accuracy

Вспомогательный метод для подсчета accuracy с использованием порога и косинусного расстояния.

 Пара подписей считаются принадлежащими одному человеку, если косинусное расстояние между ними меньше порога по умолчанию равного 0.5. Иначе одна из подписей считается подделкой.



In [None]:
from sklearn.metrics import accuracy_score

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def validation(model, dl, threshold=0.5):
    model.eval()
    model.to(device)
    predicts = []
    gt = []
    sim = nn.CosineSimilarity()
    for i, data in enumerate(dl):
        x0, x1, label = data
        out1 = model(x0.to(device))
        out2 = model(x1.to(device))
        dist = sim(out1, out2)
        gt.append(label.flatten())
        dist[dist < threshold] = -1  # Forged
        dist[dist > threshold] = 1  # Original
        predicts.append(dist.flatten().detach().cpu())
    predicts = torch.cat(predicts).numpy().astype(float)
    gt = torch.cat(gt).cpu().squeeze().detach().numpy()
    return accuracy_score(predicts, gt)


validation(model, test_pair_dataloader)  # accuracy ~ 0.5 on untrained model

Можно предположить что наиболее сложной задачей является именно различение поддельных подписей от настоящих. А при обучении они не так часто попадают в один batch. Можно использовать код вроде того что написан выше что бы в batch попадали только подписи одного человека и подделки. Но это тоже не идеальная стратегия так как у других людей могут встретиться похожие подписи.

Более универсальным решением будет использование [Mainer](https://kevinmusgrave.github.io/pytorch-metric-learning/miners/)

#TritletLoss

### Labels
Так как у нас много классов

In [None]:
import numpy as np

_, labels = next(iter(train_dataloader))


def show_classes(labels):
    plt.figure(figsize=(20, 6))
    x, counts = labels.unique(return_counts=True)
    plt.bar(np.array(train_dataset.classes)[x], height=counts)

    # plt.show()


show_classes(labels)

## Sampler

Если брать случайные батчи то у большинства объектов не находиться позитивных примеров. Когда датасет будет содержать еще большее количество объектов то ситуация усугубится.

Поэтому Нужно формировать батчи(семплировать данные) так что бы в батче для каждого объекта были как положительные так и отрицательные примеры.


В Pytorch есть механизм для семплирования: https://pytorch.org/docs/stable/data.html#torch.utils.data.Sampler

Но он не поддерживает нужный нам алгоритм, поэтому воспользуемся сэмплером из библиотеки [PyTorch Metric Learning](https://kevinmusgrave.github.io/pytorch-metric-learning/samplers/)

Список сэмплеров:
https://kevinmusgrave.github.io/pytorch-metric-learning/samplers/


Воспользуемся [MPerClassSampler](https://kevinmusgrave.github.io/pytorch-metric-learning/samplers/#mperclasssampler) этот семплер формирует батчи в которых фиксированное количество объектов из каждого класса

In [None]:
from pytorch_metric_learning.samplers import MPerClassSampler


mpc_sampler = MPerClassSampler(
    labels=train_dataset.targets, m=3, length_before_new_iter=10000  # epoch end
)

train_dataloader_mpc_sampler = DataLoader(
    train_dataset, batch_size=32, sampler=mpc_sampler
)
images, labels = next(iter(train_dataloader_mpc_sampler))  # img1, label


show_classes(labels)

Вероятность того что batch попадут и поддельные и оригинальные подписи одного человека мала, увеличим размер batch.

In [None]:
train_dataloader_mpc_sampler = DataLoader(
    train_dataset, batch_size=512, sampler=mpc_sampler, num_workers=2
)

## Miner

Для использования TipletLoss нужно формировать тройки объектов

In [None]:
from pytorch_metric_learning.miners import TripletMarginMiner

miner = TripletMarginMiner(margin=0.2, type_of_triplets="all")
embeddings = model(images.to(device))
indices_tuple = miner(embeddings, labels)
print(len(indices_tuple))
print(indices_tuple[0].shape)
print(indices_tuple[1].shape)
print(indices_tuple[2].shape)

In [None]:
from pytorch_metric_learning.miners import TripletMarginMiner


class LitTriplet(Lit):
    def __init__(self, model, criterion):
        super().__init__(model, criterion)
        self.miner = TripletMarginMiner(margin=0.2, type_of_triplets="all")

    def training_step(self, batch, batch_idx):
        x, y = batch
        embeddings = self.model(x)
        indices = self.miner(embeddings, y)
        loss = self.criterion(embeddings, y, indices)
        self.log("loss", loss, prog_bar=True)
        return loss

## Configure triplet loss

In [None]:
from pytorch_metric_learning.losses import TripletMarginLoss

model = get_model()
criterion = TripletMarginLoss(margin=0.2)

## Train

In [None]:
lit = LitTriplet(model, criterion)
lit.lr = 1e-5
logger = TensorBoardLogger("lightning_logs", name="Triplet")
trainer = L.Trainer(max_epochs=10, logger=logger, log_every_n_steps=5)
trainer.fit(
    model=lit,
    train_dataloaders=train_dataloader_mpc_sampler,
    val_dataloaders=val_dataloader,
)

Check on Forged

In [None]:
validation(model, test_pair_dataloader)

# Offline pair selection

In [None]:
train_pair_dataset = PairDataset("/content/sign_data_mini/train", transform)
print(train_pair_dataset[0])
train_pair_dataloader = DataLoader(
    train_pair_dataset, batch_size=32, shuffle=True, num_workers=2
)

In [None]:
class LitCosine(Lit):
    def __init__(self, model, criterion):
        super().__init__(model, criterion)

    def training_step(self, batch, batch_idx):
        x0, x1, y = batch
        embeddings0 = self.model(x0)
        embeddings1 = self.model(x1)
        loss = self.criterion(embeddings0, embeddings1, y)
        self.log("loss", loss, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        pass

    def on_validation_epoch_end(self):
        pass

In [None]:
model = get_model()
criterion = nn.CosineEmbeddingLoss(margin=-0)
lit = LitCosine(model, criterion)
lit.lr = 1e-5
logger = TensorBoardLogger("lightning_logs", name="ForgCosine")
trainer = L.Trainer(max_epochs=3, logger=logger, log_every_n_steps=5)
trainer.fit(
    model=lit, train_dataloaders=train_pair_dataloader, val_dataloaders=val_dataloader
)

In [None]:
validation(model, test_pair_dataloader)

# Test

В данном блоке нужно описать структуру модели.

На вход модели поступают **два** одноканальных изображения а не одно. На выходе **два** вектора - признака (embeddings).

Размер embedding подберите самостоятельно.



Допускается использовать в качестве основы готовые модели из torchvision

Вспомогательный код для загрузки весов модели с диска

Вспомогательный код для вывода изображений

In [None]:
import numpy as np


def imshow(img, text=None, should_save=False):
    npimg = img.numpy()
    plt.axis("off")
    plt.rcParams["figure.figsize"] = (5, 10)
    if text:
        plt.text(
            75,
            8,
            text,
            style="italic",
            fontweight="bold",
            bbox={"facecolor": "white", "alpha": 0.8, "pad": 10},
        )
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

Визуализация результатов сравнения

In [None]:
vis_dataloader = DataLoader(test_pair_dataset, batch_size=32)

In [None]:
import torch.nn.functional as F

# Print the sample outputs to view its dissimilarity
for i, batch in enumerate(vis_dataloader):
    x0, x1, label_id = batch
    output1 = model(x0.to(device))
    output2 = model(x1.to(device))

    eucledian_distance = F.pairwise_distance(output1, output2)
    cosine_distance = F.cosine_similarity(output1, output2)
    cos_norm = (cosine_distance + 1) / 2

    for i in range(label_id.shape[0]):
        concatenated = torch.cat((x0[i][0], x1[i][0]), 1)
        label = test_pair_dataset.classes[label_id[i].item()]
        imshow(
            utils.make_grid(concatenated),
            "Dissimilarity(euc./cos./norm.): {:.2f}/{:.2f}/{:.2f} Label: {}".format(
                eucledian_distance[i].item(),
                cosine_distance[i].item(),
                cos_norm[i].item(),
                label,
            ),
        )

        print("Dissimilarity:")
        print(f"Euclidean dist:  {eucledian_distance[i].item():.2f} [0 .. inf ]")
        print(f"Cosine dist  {cosine_distance[i].item():.2f} [-1 .. 1]")
        print(f"Normalized cos. dist  {cos_norm[i].item():.2f} [0 .. 1]")
    break