<a href="https://colab.research.google.com/github/der-himmel/obr-iz/blob/main/%D0%9F%D1%80%D0%B0%D0%BA%D1%82%D0%B8%D1%87%D0%B5%D1%81%D0%BA%D0%BE%D0%B5_%D0%B7%D0%B0%D0%B4%D0%B0%D0%BD%D0%B8%D0%B5_%E2%84%966.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Пишем свою собственную, маленькую, да удаленькую ResNet

In [1]:
from pathlib import Path
import numpy as np
import pandas as pd
from PIL import Image
import torch
from torch import nn
from torchvision import transforms
import gc
from functools import partial
from tqdm import tqdm
from matplotlib import pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Сверточный блок

![](https://github.com/ViktorAnchutin/miniresnet/blob/main/rdc_images/3%20resnet%20paper.png?raw=true)

In [2]:
class ConvBlock(nn.Module):
    def __init__(
            self,
            input_channels: int,
            output_channels: int,
            kernel_size: int = 3,
            padding: int = 1,
            activation: bool = True,
        ):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Conv2d(
                input_channels,
                output_channels,
                stride=1,
                kernel_size=kernel_size,
                padding=padding
            ),
            nn.BatchNorm2d(output_channels),
            nn.ReLU() if activation else nn.Identity()
        )

    def forward(self, x):
        return self.layers(x)

example_of_block = ConvBlock(3, 32)
example_of_block(torch.rand(64, 3, 244, 244)).shape

torch.Size([64, 32, 244, 244])

## Residual модуль

![](https://github.com/ViktorAnchutin/miniresnet/blob/main/rdc_images/5%20resnet%20paper.png?raw=true)

In [3]:
class ResidualBlock(nn.Module):
    def __init__(self, input_channels, output_channels):
        super().__init__()
        self.mismatch = not input_channels == output_channels

        self.residual_conv = ConvBlock(
            input_channels,
            output_channels,
            kernel_size=1,
            padding=0,
            activation=False
        )

        self.convolutions = nn.Sequential(
            ConvBlock(input_channels, output_channels),
            ConvBlock(output_channels, output_channels, activation=False),
        )

        self.activation = nn.ReLU()

    def residual_connection(self, x):
        if self.mismatch:
            return self.convolutions(x) + self.residual_conv(x)
        else:
            return self.convolutions(x) + x

    def forward(self, x):
        return self.activation(self.residual_connection(x))

ResidualBlock(3,32)(torch.randn((64, 3, 244, 244))).shape, ResidualBlock(32,32)(torch.randn((64,32,244,244))).shape

(torch.Size([64, 32, 244, 244]), torch.Size([64, 32, 244, 244]))

## MaxPooling
![](https://github.com/ViktorAnchutin/miniresnet/blob/main/rdc_images/6%20max%20pooling.png?raw=true)

In [4]:
class MaxPoolBlock(nn.Module):
    def __init__(self, kernel_size: int = 2, stride: int = 2, padding: int = 0):
        """
        :param kernel_size: Размер ядра свёртки (по умолчанию 2).
        :param stride: Шаг ядра (по умолчанию 2).
        :param padding: Паддинг для границ (по умолчанию 0).
        """
        super(MaxPoolBlock, self).__init__()
        self.pool = nn.MaxPool2d(kernel_size=kernel_size, stride=stride, padding=padding)

    def forward(self, x):
        """
        Прямой проход через MaxPooling блок.
        :param x: Входной тензор.
        :return: Тензор с уменьшенными размерами.
        """
        return self.pool(x)

In [5]:
example_tenzor1 = torch.randn((64, 3, 144, 144))

example_tenzor2 = torch.randn((64, 3, 7, 7))

maxpool_block = MaxPoolBlock(kernel_size=2, stride=2)

output1 = maxpool_block(example_tenzor1)
output2 = maxpool_block(example_tenzor2)

print("Output 1 shape:", output1.shape)  #(64, 3, 72, 72)
print("Output 2 shape:", output2.shape)  #(64, 3, 3, 3)

Output 1 shape: torch.Size([64, 3, 72, 72])
Output 2 shape: torch.Size([64, 3, 3, 3])


In [6]:
nn.MaxPool2d(2)(torch.randn((64,3,144,144))).shape, nn.MaxPool2d(2)(torch.randn((64,3,7,7))).shape

(torch.Size([64, 3, 72, 72]), torch.Size([64, 3, 3, 3]))

## Global average pooling
![](https://github.com/ViktorAnchutin/miniresnet/blob/main/rdc_images/7%20gap.png?raw=true)

In [7]:
class GlobalAveragePooling(nn.Module):
    def __init__(self):
        super(GlobalAveragePooling, self).__init__()
        self.gap = nn.AdaptiveAvgPool2d((1, 1))  # Приводим размерность к (1, 1)

    def forward(self, x):
        return self.gap(x)  # Применяем Global Average Pooling

In [8]:
GAP_layer = GlobalAveragePooling()
input_tenzor = torch.randn(64, 128, 7, 7)


output_tenzor = GAP_layer(input_tenzor)

print("Input shape:", input_tenzor.shape)
print("Output shape after GAP:", output_tenzor.shape)

Input shape: torch.Size([64, 128, 7, 7])
Output shape after GAP: torch.Size([64, 128, 1, 1])


In [9]:
nn.AdaptiveAvgPool2d((1,1))(torch.randn((64,128,7,7))).shape, nn.AdaptiveAvgPool2d((1,1))(torch.randn((64,256,2,2))).shape

(torch.Size([64, 128, 1, 1]), torch.Size([64, 256, 1, 1]))

## Развертывание


![image.png](https://github.com/ViktorAnchutin/miniresnet/blob/main/rdc_images/8%20flatten.png?raw=true)

## Финальное отображение в таргет



![image.png](https://github.com/ViktorAnchutin/miniresnet/blob/main/rdc_images/9%20head.png?raw=true)

## Создаем модель

In [10]:
def get_backbone_layers(channels: list, maxpool_kernel: int = 2):
    layers = list()

    layers.extend([ConvBlock(3, channels[0]), nn.MaxPool2d(maxpool_kernel)])
    for in_channels, out_channels  in zip(channels[:-1], channels[1:]):
        layers.extend([
            ResidualBlock(in_channels, out_channels),
            nn.MaxPool2d(maxpool_kernel),
        ])

    layers.append(nn.AdaptiveAvgPool2d((1, 1)))

    return nn.Sequential(*layers)

In [11]:
class MiniResNet(nn.Module):
    def __init__(self, channels: list, maxpool_kernel: int):
        super().__init__()
        self.backbone_net = get_backbone_layers(channels, maxpool_kernel)

        self.output_head = nn.Sequential(
            nn.Linear(channels[-1], channels[-1]),
            nn.ReLU(),
            nn.Linear(channels[-1], 10)
        )

    def forward(self, x):
        x = self.backbone_net(x)

        x = x.view(x.size(0), -1)

        x = self.output_head(x)

        return x

MiniResNet([10, 16], 2)(torch.rand(1, 3, 32, 32)).shape

torch.Size([1, 10])

# Дальше знакомый код

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
import numpy as np

In [13]:
def calculate_metrics(loader, model):
    y_true = []
    y_pred = []
    model.eval()
    with torch.no_grad():
        for inputs, labels in loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            y_pred.extend(predicted.numpy())
            y_true.extend(labels.numpy())

    accuracy = accuracy_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred, average='macro')
    precision = precision_score(y_true, y_pred, average='macro')
    f1 = f1_score(y_true, y_pred, average='macro')
    return accuracy, recall, precision, f1

In [27]:
transform = transforms.Compose([transforms.RandomRotation(10), transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
valset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
valloader = DataLoader(valset, batch_size=64, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [29]:
model = MiniResNet([16, 32, 64, 256], 2)
criterion = nn.CrossEntropyLoss()

optimizer = optim.Adam(model.parameters(), lr=3e-4)

In [30]:
for epoch in range(10):
    running_loss = 0.0
    model.train()
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'Epoch {epoch + 1}, Loss: {running_loss / len(trainloader):.3f}')

    train_acc, train_rec, train_prec, train_f1 = calculate_metrics(trainloader, model)
    print(f'Training - Accuracy: {train_acc}, Recall: {train_rec}, Precision: {train_prec}, F1 Score: {train_f1}')

    val_acc, val_rec, val_prec, val_f1 = calculate_metrics(valloader, model)
    print(f'Validation - Accuracy: {val_acc}, Recall: {val_rec}, Precision: {val_prec}, F1 Score: {val_f1}')

print('Finished Training')

Epoch 1, Loss: 1.288
Training - Accuracy: 0.62422, Recall: 0.62422, Precision: 0.6557735796158332, F1 Score: 0.6197721679362413
Validation - Accuracy: 0.6106, Recall: 0.6106, Precision: 0.6374813084875226, F1 Score: 0.606309390288788
Epoch 2, Loss: 0.952
Training - Accuracy: 0.70638, Recall: 0.70638, Precision: 0.7292599261042527, F1 Score: 0.7065797505485533
Validation - Accuracy: 0.672, Recall: 0.6720000000000002, Precision: 0.6938143206850736, F1 Score: 0.6715271007117017
Epoch 3, Loss: 0.808
Training - Accuracy: 0.73072, Recall: 0.73072, Precision: 0.7597058513549259, F1 Score: 0.7310505690278281
Validation - Accuracy: 0.6866, Recall: 0.6866000000000001, Precision: 0.7143951119823637, F1 Score: 0.6862853407508734
Epoch 4, Loss: 0.711
Training - Accuracy: 0.76398, Recall: 0.7639800000000001, Precision: 0.7769215704346294, F1 Score: 0.762829602194433
Validation - Accuracy: 0.7146, Recall: 0.7146000000000001, Precision: 0.7276503139517109, F1 Score: 0.712002545553889
Epoch 5, Loss: 0.

KeyboardInterrupt: 