In [124]:
import torch

In [125]:
class IncorrectKernelSizeException(Exception):
    def __init__(self, kernel_size):
        self.kernel_size = kernel_size

    def __str__(self):
        return f"Incorrect kernel size: {self.kernel_size}. It must be odd."

In [126]:
class ResidualConvBlock(torch.nn.Module):
    def __init__(
        self,
        in_c,
        out_c,
        kernel_size: int = 3,
        activation = torch.nn.ReLU,
        *, # Только именованные параметры
        use_bias = True
    ):
        
        super().__init__()
        self.activation = activation()
        
        if kernel_size % 2 != 0:
            padding_size = (kernel_size - 1) // 2
        else:
            raise IncorrectKernelSizeException(kernel_size)
            
        # print(in_c, out_c, kernel_size, padding_size, '\n---------------------------------------')
        
        self.conv = torch.nn.Conv2d(
            in_channels = in_c,
            out_channels = out_c,
            kernel_size = kernel_size,
            padding = padding_size,
            padding_mode = 'zeros',
            bias = use_bias
        )

        if in_c == out_c:
            self.in_layer = torch.nn.Identity()
        else:
            self.in_layer = torch.nn.Conv2d(
                in_channels = in_c,
                out_channels = out_c,
                kernel_size = 1
            )

    def forward(self, x):
        return self.activation(self.in_layer(x) + self.conv(x))

In [127]:
class GlobalMaxPooling(torch.nn.Module):
    def __init__(self):
        super().__init__()
        
    def forward(self, x):
        return x.max(-1).values.max(-1).values

In [128]:
class GlobalAvgPolling(torch.nn.Module):
    def __init__(self):
        super().__init__()

    #TODO: to make this AvgPooling module

    def forward(self, x):
        pass

In [129]:
from collections import OrderedDict
class ConvNetwork(torch.nn.Module):
    def __init__(
        self,
        classes_num: int, # Количество выходных значений. Количество клаасов для предсказания
        conv_params: list[tuple], # Список кортежей следующего вида: (кол-во блоков между пулингами, число каналов на входе/выходе блоков, размер ядра, функция активации)
        linear_params: list[tuple] = [], # Список кортежей следующего вида: (кол-во нейронов на i-ом слое, функция активации на i-ом слое). Начинаем с 2-ого слоя, заканчиваем предпоследним слоем (отсчёт с 1). (P.S. Кол-во нейронов и функция активации на входном и выхдном слоях известны и так)
        use_Softmax: bool = False
    ):

        super().__init__()

        conv_layers = []
        linear_layers = []

        # Собираем свёрточную часть

        in_channels = 1
        for group_num, (blocks_amount, channels_num, kernel_size, activation) in enumerate(conv_params):
            if group_num == len(conv_params) - 1:
                linear_in = channels_num # Рассчитываем входной вектор линейного слоя
                
            for block_num in range(blocks_amount):
                
                if block_num != 0:
                    in_channels = channels_num
                    
                conv_layers.append(
                    (
                        f'ResidualBlock {group_num} {block_num}',
                        ResidualConvBlock(
                            in_c = in_channels,
                            out_c = channels_num,
                            kernel_size = kernel_size,
                            activation = activation
                        )
                    )
                )

            if group_num < len(conv_params) - 1:
                conv_layers.append(
                    (
                        f'Pooling {group_num}',
                        torch.nn.AvgPool2d(2)
                    )
                )

        # Собираем Global Max Pooling

        gm_pooling = [('Global Max Pooling', GlobalMaxPooling())]

        # Собираем линейную часть
        
        if len(linear_params) == 0:
            linear_layers.append(('Linear 0', torch.nn.Linear(linear_in, classes_num)))
            linear_layers.append(('Activation 0', torch.nn.Softmax(-1) if use_Softmax else torch.nn.LogSoftmax(-1)))

        else:
            (first_out, activation) = linear_params[0]
            linear_layers.append(('Linear 0', torch.nn.Linear(linear_in, first_out)))
            linear_layers.append(('Activation 0', activation()))
            
            for i in range(len(linear_params)):
                (in_size, _) = linear_params[i - 1]
                (out_size, activation) = linear_params[i]
                linear_layers.append((f'Linear {i + 1}', torch.nn.Linear(in_size, out_size)))
                linear_layers.append((f'Activation {i + 1}', activation()))

            (last_in, _) = linear_params[-1]
            linear_layers.append((f'Linear {len(linear_params)}', torch.nn.Linear(last_in, classes_num)))
            linear_layers.append((f'Activation {len(linear_params)}', torch.nn.Softmax(-1) if use_Softmax else torch.nn.LogSoftmax(-1)))

        #

        all_layers = conv_layers + gm_pooling + linear_layers
        self.final_model = torch.nn.Sequential(OrderedDict(all_layers))

    def forward(self, x): # bs x h x w
        x = x.unsqueeze(1) # bs x c x h x w
        return self.final_model(x)

In [130]:
from torchvision import datasets

download_dir = '.\dataset'

train_data = datasets.MNIST(root=download_dir, download=True, train=True)
val_data = datasets.MNIST(root=download_dir, download=True, train=False)

x_train = train_data.data
y_train = train_data.targets

x_val = val_data.data
y_val = val_data.targets

x_train = x_train / 255.
x_val = x_val / 255.

  download_dir = '.\dataset'


In [140]:
model = ConvNetwork(
    classes_num=10,
    conv_params=[(3, 8, 3, torch.nn.ReLU), (3, 16, 3, torch.nn.ReLU), (3, 32, 3, torch.nn.ReLU)],
    linear_params=[(32, torch.nn.ReLU), (32, torch.nn.ReLU), (32, torch.nn.ReLU)]
) 

print(model)
# in out kenel padding

ConvNetwork(
  (final_model): Sequential(
    (ResidualBlock 0 0): ResidualConvBlock(
      (activation): ReLU()
      (conv): Conv2d(1, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (in_layer): Conv2d(1, 8, kernel_size=(1, 1), stride=(1, 1))
    )
    (ResidualBlock 0 1): ResidualConvBlock(
      (activation): ReLU()
      (conv): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (in_layer): Identity()
    )
    (ResidualBlock 0 2): ResidualConvBlock(
      (activation): ReLU()
      (conv): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (in_layer): Identity()
    )
    (Pooling 0): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (ResidualBlock 1 0): ResidualConvBlock(
      (activation): ReLU()
      (conv): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (in_layer): Conv2d(8, 16, kernel_size=(1, 1), stride=(1, 1))
    )
    (ResidualBlock 1 1): ResidualConvBlock(
      (activation): ReLU()
      (conv)

In [None]:
optimizer = torch.optim.SGD(
    params=model.parameters(),
    lr=.001
)

EPOCHS_NUM = 10000
BATCH_SIZE = 1000
loss_fn = torch.nn.NLLLoss()
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.5)

for epoch in range(1, EPOCHS_NUM + 1):
    optimizer.zero_grad()

    # TODO: Доделать обучение

    batch_pos = torch.randint(low=0, high=y_train.shape[0], size=[BATCH_SIZE])
    pred = model(x_train[batch_pos])
    loss = loss_fn(pred, y_train[batch_pos])

    loss.backward()
    optimizer.step()

    if epoch % 100 == 0:
        print(f'{epoch}. loss: {loss.item()}')

    if epoch % 500 == 0:
        scheduler.step()

100. loss: 2.3025805950164795
200. loss: 2.3022401332855225
300. loss: 2.3036396503448486
400. loss: 2.3051130771636963
500. loss: 2.3072171211242676


In [None]:
x = torch.tensor([[1, 2, 3],
                  [4, 5, 6]])
print(x.shape)
x.unsqueeze_(1)
print(x)
print(x.shape)
x = x[:, None, :]
print(x)