# 02. Домашнее задание. Обучение Vision Transformer


В этом задании вам предлагается дописать реализацию ViT и обучить его на датасете CIFAR-10. Хотя эта модель гораздо больше тех CNN, что мы изучали (и будем изучать), ресурсов colab-а гарантированно хватит, чтобы закончить обучение. Так же рекомендуется просмотреть Lab_5, Lab_3 и Lab_6 , поскольку это задание во многом основано на них.

**Ищите комментарии "Write your code here" для быстрого обнаружения мест, где вы должны что-то дописать!**

![](https://hashtelegraph.com/wp-content/uploads/2024/08/shooting-sparrows-with-a-cannon--1024x576.jpg)

## 1. Patch Embeddings

Отличительной особенностью ViT является эмбеддинги изображений. Чтобы представить изображение в виде входного вектора для трансформера, оно разбивается на фрагменты (patches) заданного размера. В нашей имплементации это будет сделано с помощью обучаемой свертки.

In [1]:
import math
import torch
from torch import nn


class PatchEmbeddings(nn.Module):
    """
    Преобразуйте изображение в патчи, а затем спроецируйте их в векторное пространство.
    """

    def __init__(self, config):
        super().__init__()
        self.image_size = config["image_size"]
        self.patch_size = config["patch_size"]
        self.num_channels = config["num_channels"]
        self.hidden_size = config["hidden_size"]
        # Количество фрагментов, исходя из размера изображения и размера фрагмента.
        self.num_patches = (self.image_size // self.patch_size) ** 2
        # Write your code here +
        # Допишите нужные параметры. Выходом свертки явлются self.num_patches фрагментов размером self.hidden_size
        # Вы можете менять параметры out_channels, kernel_size, stride
        self.projection = nn.Conv2d(
            self.num_channels,
            self.hidden_size,
            kernel_size=self.patch_size,
            stride=self.patch_size,
        )

    def forward(self, x):
        # (batch_size, num_channels, image_size, image_size) -> (batch_size, num_patches, hidden_size)
        batch_size = x.shape[0]
        x = self.projection(x)
        x = x.flatten(2).transpose(1, 2)
        assert x.shape[0] == batch_size
        assert x.shape[1] == self.num_patches
        assert x.shape[2] == self.hidden_size
        return x

In [2]:
class Embeddings(nn.Module):
    """
    Combine the patch embeddings with the class token and position embeddings.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.patch_embeddings = PatchEmbeddings(config)
        # Создаем обучаемый [CLS] токен
        # Подобно BERT, токен [CLS] добавляется в начало входной последовательности.
        # Токен используется для классификации
        self.cls_token = nn.Parameter(torch.randn(1, 1, config["hidden_size"]))
        self.position_embeddings = nn.Parameter(
            torch.randn(1, self.patch_embeddings.num_patches + 1, config["hidden_size"])
        )
        self.dropout = nn.Dropout(config["hidden_dropout_prob"])

    def forward(self, x):
        x = self.patch_embeddings(x)
        batch_size, _, _ = x.size()
        # Расширяем токен [CLS] до размера батча
        # (1, 1, hidden_size) -> (batch_size, 1, hidden_size)
        cls_tokens = self.cls_token.expand(batch_size, -1, -1)
        # Присоединяем токен [CLS] к началу входной последовательности.
        # Длина последовательности (num_patches + 1)
        x = torch.cat((cls_tokens, x), dim=1)
        x = x + self.position_embeddings
        x = self.dropout(x)
        return x

## 2. Attention head

Допишите реализацию головы трансформера. Формулы подсчеты attention score можно найти в Лекции 5, посвященной трансформерам

In [3]:
from torch.nn.functional import softmax


class AttentionHead(nn.Module):
    """
    Голова трансформера
    """

    def __init__(self, hidden_size, attention_head_size, dropout):
        super().__init__()
        self.hidden_size = hidden_size
        self.attention_head_size = attention_head_size

        self.query = nn.Linear(hidden_size, attention_head_size, bias=True)
        self.key = nn.Linear(hidden_size, attention_head_size, bias=True)
        self.value = nn.Linear(hidden_size, attention_head_size, bias=True)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x перемножается на query, key, value матрицы.
        # Результат перемножения используется для для подсчета attention output
        Q = self.query(x)
        K = self.key(x)
        V = self.value(x)
        # Write your code here +
        # Посчитайте attention_output по формуле: softmax(Q*K.T/sqrt(head_size)) * V
        # Опционально, на softmax(Q*K.T/sqrt(head_size)) можно добавить dropout
        attention_output = softmax(
            torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.attention_head_size),
            dim=-1,
        )
        attention_output = self.dropout(attention_output)
        attention_output = torch.matmul(attention_output, V)

        return attention_output


class MultiHeadAttention(nn.Module):
    """
    Multi-head attention module
    """

    def __init__(self, config):
        super().__init__()
        self.hidden_size = config["hidden_size"]
        self.num_attention_heads = config["num_attention_heads"]
        # Размерность для одной головы
        self.attention_head_size = self.hidden_size // self.num_attention_heads
        self.all_head_size = self.num_attention_heads * self.attention_head_size
        # Write your code here +
        # Создайте несколько голов и добавьте их в self.heads
        self.heads = nn.ModuleList(
            [
                AttentionHead(
                    self.hidden_size,
                    self.attention_head_size,
                    config["attention_probs_dropout_prob"],
                )
                for _ in range(self.num_attention_heads)
            ]
        )
        # Финальная линейная проекция
        self.output_projection = nn.Linear(self.all_head_size, self.hidden_size)
        self.output_dropout = nn.Dropout(config["hidden_dropout_prob"])

    def forward(self, x):
        # Считаем параллельно в каждой голове, потом конкатенируем результат
        attention_outputs = [head(x) for head in self.heads]
        attention_output = torch.cat(
            [attention_output for attention_output in attention_outputs], dim=-1
        )
        attention_output = self.output_projection(attention_output)
        attention_output = self.output_dropout(attention_output)
        return attention_output

In [4]:
class MLP(nn.Module):
    """
    Многослойный персептрон
    """

    def __init__(self, config):
        super().__init__()
        self.dense_1 = nn.Linear(config["hidden_size"], config["intermediate_size"])
        self.activation = nn.GELU()
        self.dense_2 = nn.Linear(config["intermediate_size"], config["hidden_size"])
        self.dropout = nn.Dropout(config["hidden_dropout_prob"])

    def forward(self, x):
        x = self.dense_1(x)
        x = self.activation(x)
        x = self.dense_2(x)
        x = self.dropout(x)
        return x

## 3. Encoder

Схема энкодера трансформера, используемого в ViT

![](https://theaisummer.com/static/aa65d942973255da238052d8cdfa4fcd/7d4ec/the-transformer-block-vit.png)


In [5]:
class Block(nn.Module):
    """
    Блок энкодера, как на рисунке сверху
    """

    def __init__(self, config):
        super().__init__()
        self.attention = MultiHeadAttention(config)
        self.layernorm_1 = nn.LayerNorm(config["hidden_size"])
        self.mlp = MLP(config)
        self.layernorm_2 = nn.LayerNorm(config["hidden_size"])

    def forward(self, x):
        # Write your code here +
        # x должен пройти по последовательности LayerNorm, MultiHeadAttention и
        # MLP слоев, как на рисунке сверху
        norm_x = self.layernorm_1(x)
        attention_out = self.attention(norm_x)
        x = x + attention_out

        norm_x = self.layernorm_2(x)
        mlp_out = self.mlp(norm_x)
        x = x + mlp_out

        return x


class Encoder(nn.Module):
    """
    Энкодер, состоящий из config["num_blocks"] блоков
    """

    def __init__(self, config):
        super().__init__()
        self.blocks = nn.ModuleList(
            [Block(config) for _ in range(config["num_blocks"])]
        )
        # Write your code here +
        # Создайте список из config["num_blocks"] Block-ов
        ...

    def forward(self, x):
        for block in self.blocks:
            x = block(x)
        return x

In [6]:
class ViTForClassfication(nn.Module):
    """
    Vision transformer. Состоит из PatchEmbedder-а, энкодера и линейного слоя
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        # 32 для CIFAR
        self.image_size = config["image_size"]
        self.hidden_size = config["hidden_size"]
        # 10 для CIFAR
        self.num_classes = config["num_classes"]

        self.embedding = Embeddings(config)
        self.encoder = Encoder(config)
        self.classifier = nn.Linear(self.hidden_size, self.num_classes)

    def forward(self, x):
        embedding_output = self.embedding(x)
        encoder_output = self.encoder(embedding_output)
        # Рассчываем логиты как выходные данные токена [CLS]
        logits = self.classifier(encoder_output[:, 0, :])

        return logits

## 4. Подготовка датасета

Эта часть мало чем отличается от Lab_3, поэтому можете свериться с ней

In [None]:
# Import libraries
import torch
import torchvision
import torchvision.transforms as transforms


def prepare_data(
    batch_size=4, num_workers=2, train_sample_size=None, test_sample_size=None
):
    # Write your code here +
    # Допишите нужные трансформации для обучающего датасета
    train_transform = transforms.Compose(
        [
            transforms.Resize((32, 32)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]
    )

    trainset = torchvision.datasets.CIFAR10(
        root="./data", train=True, download=True, transform=train_transform
    )
    if train_sample_size is not None:
        # Randomly sample a subset of the training set
        indices = torch.randperm(len(trainset))[:train_sample_size]
        trainset = torch.utils.data.Subset(trainset, indices)

    trainloader = torch.utils.data.DataLoader(
        trainset, batch_size=batch_size, shuffle=True, num_workers=num_workers
    )

    test_transform = transforms.Compose(
        [
            transforms.Resize((32, 32)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]
    )

    testset = torchvision.datasets.CIFAR10(
        root="./data", train=False, download=True, transform=test_transform
    )
    if test_sample_size is not None:
        indices = torch.randperm(len(testset))[:test_sample_size]
        testset = torch.utils.data.Subset(testset, indices)

    testloader = torch.utils.data.DataLoader(
        testset, batch_size=batch_size, shuffle=False, num_workers=num_workers
    )

    return trainloader, testloader

# Finally, обучение!

In [8]:
batch_size = 128
epochs = 10
lr = 1e-2

import torch
from torch import nn, optim

device = "cuda" if torch.cuda.is_available() else "cpu"

# Можете по-экспериментировать с параметрами для улучшения обучения
config = {
    "patch_size": 4,  # Input image size: 32x32 -> 8x8 patches
    "hidden_size": 48,
    "num_blocks": 4,
    "num_attention_heads": 4,
    "intermediate_size": 4 * 48,  # 4 * hidden_size
    "hidden_dropout_prob": 0.0,
    "attention_probs_dropout_prob": 0.0,
    "initializer_range": 0.02,
    "image_size": 32,
    "num_classes": 10,  # num_classes of CIFAR10
    "num_channels": 3,
}

assert config["hidden_size"] % config["num_attention_heads"] == 0
assert config["intermediate_size"] == 4 * config["hidden_size"]
assert config["image_size"] % config["patch_size"] == 0


class Trainer:
    """
    Класс обучения
    """

    def __init__(self, model, optimizer, loss_fn, device):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.device = device

    def train(self, trainloader, testloader, epochs):
        for i in range(epochs):
            self.model.train()
            train_loss = 0
            for batch in trainloader:
                # Move the batch to the device
                batch = [t.to(self.device) for t in batch]
                images, labels = batch
                # Zero the gradients
                self.optimizer.zero_grad()
                # Calculate the loss
                loss = self.loss_fn(self.model(images), labels)
                # Backpropagate the loss
                loss.backward()
                # Update the model's parameters
                self.optimizer.step()
                train_loss += loss.item() * len(images)
            accuracy, test_loss = self.evaluate(testloader)
            print(
                f"Epoch: {i+1}, Train loss: {train_loss / len(trainloader.dataset):.4f}, Test loss: {test_loss:.4f}, Accuracy: {accuracy:.4f}"
            )

    @torch.no_grad()
    def evaluate(self, testloader):
        self.model.eval()
        total_loss = 0
        correct = 0
        with torch.no_grad():
            for batch in testloader:
                # Move the batch to the device
                batch = [t.to(self.device) for t in batch]
                images, labels = batch

                # Get predictions
                logits = self.model(images)

                # Calculate the loss
                loss = self.loss_fn(logits, labels)
                total_loss += loss.item() * len(images)

                # Calculate the accuracy
                predictions = torch.argmax(logits, dim=1)
                correct += torch.sum(predictions == labels).item()
        accuracy = correct / len(testloader.dataset)
        avg_loss = total_loss / len(testloader.dataset)
        return accuracy, avg_loss


def main():
    # Training parameters
    # Load the CIFAR10 dataset
    trainloader, testloader = prepare_data(batch_size=batch_size)
    # Create the model, optimizer, loss function and trainer
    model = ViTForClassfication(config)
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-2)
    loss_fn = nn.CrossEntropyLoss()
    trainer = Trainer(model, optimizer, loss_fn, device=device)
    trainer.train(trainloader, testloader, epochs)


if __name__ == "__main__":
    main()

100%|██████████| 170M/170M [00:10<00:00, 16.1MB/s] 


Epoch: 1, Train loss: 1.7524, Test loss: 1.5415, Accuracy: 0.4374
Epoch: 2, Train loss: 1.4571, Test loss: 1.3485, Accuracy: 0.5074
Epoch: 3, Train loss: 1.3401, Test loss: 1.2912, Accuracy: 0.5272
Epoch: 4, Train loss: 1.2640, Test loss: 1.2533, Accuracy: 0.5455
Epoch: 5, Train loss: 1.2157, Test loss: 1.2567, Accuracy: 0.5468
Epoch: 6, Train loss: 1.1663, Test loss: 1.2361, Accuracy: 0.5615
Epoch: 7, Train loss: 1.1250, Test loss: 1.0990, Accuracy: 0.6052
Epoch: 8, Train loss: 1.0950, Test loss: 1.0420, Accuracy: 0.6245
Epoch: 9, Train loss: 1.0591, Test loss: 1.0592, Accuracy: 0.6164
Epoch: 10, Train loss: 1.0274, Test loss: 1.1105, Accuracy: 0.6032
