## Контрастное обучение с SimCLR
[SimCLR](https://arxiv.org/abs/2002.05709) (Simple Contrastive Learning Representation): self-supervised модель, которая используется для получения осмысленных представлений изображений

<!-- <img src="../images/simclr_im1.png" alt="drawing" width="600"/> -->



## Contrastive learning framework

<!-- <img src="../images/simclr_im2.png" alt="drawing" width="700"/> -->

## Обучение на CIFAR-10
Обучим модель извлечения признаков изображений на наборе данных [CIFAR-10](https://www.cs.toronto.edu/~kriz/cifar.html). Для обучения эмбеддингов будем использовать контрастную функцию потерь.

Из набора данных выбираем $N$ изображений и для каждого из них получаем 2 избражения, используя 2 случаных преобразования (кроп, изменение цвета) исходного изображения. Пару изображений, полученных от одного изображения будем называть положительной парой, иначе отрицательной. Теперь мы имеем $2N$ изображений

Для каждой пары положительной пары $(i,j)$ определим функцию потерь, которая вынуждает модель выдавать близкие по метрике эмбеддинги для положительных пар, и далёкие для отрицательных.

$$
l_{i,j} = -\log\frac{\exp(\text{sim}(\textbf{z}_i, \textbf{z}_j)/\tau)}{\sum_{k=1}^{2N}\mathbb{1}_{[k\neq i]}\exp(\text{sim}(\textbf{z}_i,\textbf{z}_k)/\tau)}
$$

$$
\text{sim}(\textbf{u}, \textbf{v}) = \textbf{u}^T\textbf{v}/\left\lVert\textbf{u}\right\rVert \left\lVert\textbf{v}\right\rVert
$$

Итоговая функция потерь:

$$
\mathcal{L} = \frac{1}{2N}\sum_{k=1}^N[l(2k-1,2k) + l(2k, 2k - 1)]
$$


In [1]:
import torchvision
import torch
from torchsummary import summary
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F
from torch import nn
from torchvision import transforms

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# a = torch.Tensor([[1, 2, 3],
# [-2, 1, 2],
# [-3, 3, 1]])

# a.diagonal(dim1=-1, dim2=-2).zero_()
# a

### Задание модели

В качестве бекбона будем использовать модификацию [ResNet-50](https://arxiv.org/abs/1512.03385)

Так как разрешение изображений в наборе данных CIFAR-10, меньше чем в на наборе данных [ImageNet](https://www.image-net.org/), мы заменим первый свёрточный слой с ядром $(7\times 7)$ и страйдом $2$, на свёрточный слой с ядром размера $(3\times3)$ и страйдом $1$  
Также мы удалим первый maxpolling слой

In [3]:
def get_color_distortion(s=0.5):
    # s is the strength of color distortion.
    color_jitter = transforms.ColorJitter(0.8 * s, 0.8 * s, 0.8 * s, 0.2 * s)
    rnd_color_jitter = transforms.RandomApply([color_jitter], p=0.8)
    rnd_gray = transforms.RandomGrayscale(p=0.2)
    color_distort = transforms.Compose([rnd_color_jitter, rnd_gray])
    return color_distort

In [4]:
resnet50 = torchvision.models.resnet50(pretrained=False).to(device)
list(resnet50.children())[:4]

[Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False),
 BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
 ReLU(inplace=True),
 MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)]

In [5]:
new_modules = [
    nn.Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=1),
    nn.BatchNorm2d(64),
    nn.ReLU(inplace=True),
] + list(resnet50.children())[4:-1]
resnet50_cifar = torch.nn.Sequential(*new_modules).to(device)

# summary(resnet50_cifar, (3, 32, 32))

In [6]:
class SimCLR(nn.Module):
    def __init__(
        self,
        base_encoder: torch.nn.Module,
        projection_dim=128,
        temp=0.5,
    ) -> None:
        super().__init__()
        self.projection_dim = projection_dim
        self.temp = temp

        # define transforms
        self.transformations = transforms.Compose(
            [
                transforms.RandomResizedCrop(size=(32, 32)),
                transforms.RandomHorizontalFlip(),
                get_color_distortion(s=0.5),
            ]
        )

        # define base_encoder x -> h
        self.base_encoder = base_encoder
        self.latent_dim = 2048

        # define projection head h -> z

        self.projection_head = nn.Sequential(
            nn.Linear(self.latent_dim, self.projection_dim, bias=False),
            nn.BatchNorm1d(self.projection_dim),
            nn.ReLU(inplace=True),
            nn.Linear(self.projection_dim, self.projection_dim, bias=False),
            nn.BatchNorm1d(self.projection_dim, affine=False),
        )

    def forward(self, images: torch.Tensor) -> torch.Tensor:
        """
        Takes batch of augmented images, and computes contrastive loss
        """

        batch_size = images[0].shape[0]

        first_view, second_view = images

        # compute embeddings
        first_h = self.base_encoder(first_view)
        second_h = self.base_encoder(second_view)

        first_z = self.projection_head(torch.squeeze(first_h))
        second_z = self.projection_head(torch.squeeze(second_h))

        # normalize
        first_z, second_z = F.normalize(first_z), F.normalize(second_z)

        # compute similarities

        view_cat = torch.cat([first_z, second_z], dim=0)  # 2N x d
        s = view_cat @ view_cat.T  # 2N x 2N

        s = s / self.temp

        # Mask out same-sample terms

        s[torch.arange(2 * batch_size), torch.arange(2 * batch_size)] = -float("inf")

        # compute loss
        targets = torch.cat(
            (
                torch.arange(batch_size, 2 * batch_size),
                torch.arange(0, batch_size),
            ),
            dim=0,
        )
        targets = targets.to(s.get_device()).long()

        loss = F.cross_entropy(s, targets, reduction="sum")

        loss = loss / (2 * batch_size)

        return loss, first_h

### 1) Загрузка данных

In [7]:
def get_transform(train):
    if train:
        transform = transforms.Compose(
            [
                transforms.RandomResizedCrop(32),
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.RandomApply(
                    [transforms.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8
                ),
                transforms.RandomGrayscale(p=0.2),
                transforms.ToTensor(),
                transforms.Normalize(
                    [0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]
                ),
            ]
        )
    else:
        transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize(
                    [0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]
                ),
            ]
        )
    return transform


class SimCLRDataTransform(object):
    def __init__(self, transform):
        self.transform = transform

    def __call__(self, sample):
        xi = self.transform(sample)
        xj = self.transform(sample)
        return xi, xj

In [8]:
DATA_PATH = "../data"
BATCH_SIZE = 128
NUM_WORKERS = 12

N_EPOCH = 120
LR = 1.0
MOMENTUM = 0.9
WD = 1e-6


train_dset = torchvision.datasets.CIFAR10(
    DATA_PATH,
    train=True,
    transform=SimCLRDataTransform(get_transform(train=True)),
    download=True,
)
test_dset = torchvision.datasets.CIFAR10(
    DATA_PATH,
    train=False,
    transform=SimCLRDataTransform(get_transform(train=False)),
    download=True,
)

train_loader = torch.utils.data.DataLoader(
    train_dset,
    batch_size=BATCH_SIZE,
    num_workers=12,
    drop_last=True,
)

val_loader = torch.utils.data.DataLoader(
    test_dset,
    batch_size=BATCH_SIZE,
    num_workers=12,
    drop_last=True,
)

Files already downloaded and verified
Files already downloaded and verified


In [9]:
from lars import LARS

model = SimCLR(base_encoder=resnet50_cifar).to(device)
optimizer = LARS(
    model.parameters(),
    lr=LR,
    momentum=MOMENTUM,
    weight_decay=WD,
)

### Обучение

In [10]:
model.train()
with torch.autograd.set_detect_anomaly(True):
    for _ in range(N_EPOCH):
        for images, _ in train_loader:
            images = [x.to(device) for x in images]
            loss, hs = model(images)
            hs = hs.detach()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            print(loss.item())

	add_(Number alpha, Tensor other)
Consider using one of the following signatures instead:
	add_(Tensor other, *, Number alpha) (Triggered internally at  /opt/conda/conda-bld/pytorch_1640811803361/work/torch/csrc/utils/python_arg_parser.cpp:1050.)
  buf.mul_(momentum).add_(actual_lr, d_p + weight_decay * p.data)


5.55462646484375
5.543821334838867
5.524416923522949
5.528378963470459
5.5681352615356445
5.561792373657227
5.501705169677734
5.514983177185059
5.539113998413086
5.530882835388184
5.479883193969727
5.628942966461182
5.50087308883667
5.4649834632873535
5.430487155914307
5.562296390533447
5.500844955444336
5.498812198638916
5.427420139312744
5.500797748565674
5.530545234680176
5.453332901000977
5.541370391845703
5.436233997344971
5.358013153076172
5.432920455932617
5.321462631225586
5.326171875
5.42201042175293
5.273672580718994
5.380896091461182
5.31166934967041
5.2001519203186035
5.384452819824219
5.320104598999023
5.332322597503662
5.18975305557251
5.296821594238281
5.241923809051514
5.303590774536133
5.251042366027832
5.119968891143799
5.110445499420166
5.220890045166016
5.111337184906006
5.008529186248779
5.035027027130127
5.013930320739746
4.998182773590088
5.097278118133545
5.026236534118652
5.106514930725098
5.019130229949951
5.0167436599731445
5.000108242034912
5.056652545928955

KeyboardInterrupt: 