## Лекция 6. Свёрточная нейронная сеть в PyTorch

In [179]:
# Hand-assemble a small convolutional network out of the ResidualBlock written earlier
from collections import OrderedDict

# Three residual blocks that widen the channels 3 -> 8 -> 16 -> 32, all with 3x3 kernels
conv_layers = [
    (
        f"residual_block_{block_no}",
        ResidualBlock(
            input_c=c_in,
            output_c=c_out,
            kernel_size=3,
            activation=torch.nn.ReLU,
        ),
    )
    for block_no, (c_in, c_out) in enumerate([(3, 8), (8, 16), (16, 32)], start=1)
]

# The 2x2 max pooling sits between the second and the third residual block
conv_layers.insert(2, ("pooling", torch.nn.MaxPool2d(kernel_size=2)))

conv_model = torch.nn.Sequential(OrderedDict(conv_layers))
conv_model

Sequential(
  (residual_block_1): ResidualBlock(
    (conv): Conv2d(3, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (correct_channels): Conv2d(3, 8, kernel_size=(1, 1), stride=(1, 1))
    (activation): ReLU()
  )
  (residual_block_2): ResidualBlock(
    (conv): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (correct_channels): Conv2d(8, 16, kernel_size=(1, 1), stride=(1, 1))
    (activation): ReLU()
  )
  (pooling): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (residual_block_3): ResidualBlock(
    (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (correct_channels): Conv2d(16, 32, kernel_size=(1, 1), stride=(1, 1))
    (activation): ReLU()
  )
)

In [216]:
# torch's .max() along a dimension - the building block for MaxPooling
h = torch.rand(2, 3)

# Show the tensor, the (values, indices) result of a row-wise max,
# the shape of the reduced values, and each row on its own
print(h, h.max(dim=1), h.max(dim=1).values.shape, h[0], h[1], sep="\n\n")

tensor([[0.8826, 0.5636, 0.5012],
        [0.1230, 0.8688, 0.1818]])

torch.return_types.max(
values=tensor([0.8826, 0.8688]),
indices=tensor([0, 1]))

torch.Size([2])

tensor([0.8826, 0.5636, 0.5012])

tensor([0.1230, 0.8688, 0.1818])


In [186]:
# Now hand-assemble the classifier head: 32 -> 16 -> 10 with a softmax on top
lin_layers = [
    ("linear_1", torch.nn.Linear(32, 16)),
    ("linear_activation_1", torch.nn.ReLU()),
    ("linear_2", torch.nn.Linear(16, 10)),
    ("softmax", torch.nn.Softmax(dim=-1)),  # normalise over the class dimension
]

lin_model = torch.nn.Sequential(OrderedDict(lin_layers))
lin_model

Sequential(
  (linear_1): Linear(in_features=32, out_features=16, bias=True)
  (linear_activation_1): ReLU()
  (linear_2): Linear(in_features=16, out_features=10, bias=True)
  (softmax): Softmax(dim=-1)
)

In [192]:
# Walk one random batch through the whole pipeline, printing the shape at each stage
x = torch.rand(2, 3, 32, 32)
print(f"Изначальный размер {x.shape}")

# Convolutional part
x = conv_model(x)
print(f"После свёрток {x.shape}")

# GlobalMaxPooling: reduce the width axis first, then the height axis
x = x.max(dim=-1).values
x = x.max(dim=-1).values
print(f"После глобального пулинга {x.shape}")

# Classifier part
x = lin_model(x)
print(f"После классификатора {x.shape}\n")

# The softmax outputs of every sample should sum to one
print(x, x.sum(dim=1), sep="\n\n")

Изначальный размер torch.Size([2, 3, 32, 32])
После свёрток torch.Size([2, 32, 16, 16])
После глобального пулинга torch.Size([2, 32])
После классификатора torch.Size([2, 10])

tensor([[0.0997, 0.0971, 0.0913, 0.0944, 0.1034, 0.1081, 0.0774, 0.1059, 0.1198,
         0.1029],
        [0.0996, 0.0975, 0.0907, 0.0939, 0.1030, 0.1084, 0.0774, 0.1064, 0.1200,
         0.1030]], grad_fn=<SoftmaxBackward0>)

tensor([1., 1.], grad_fn=<SumBackward1>)


In [108]:
# The same idea in a more general form, wrapped into a torch.nn.Module
IMG_SIZE = 32


class ResidualModel(torch.nn.Module):
    """Image classifier: groups of residual blocks followed by a linear head.

    The convolutional part is a sequence of groups. Every group is a run of
    ``ResidualBlock`` modules with the same channel count and kernel size, and
    every group except the last one is followed by ``AvgPool2d(2)``. The head
    is a stack of ``Linear`` layers separated by the activation and terminated
    by ``LogSoftmax``, so ``forward`` returns log-probabilities.

    Args:
        out_size: Number of classes (width of the last linear layer).
        residual_sizes: Sequence of triples
            ``(num_residual_blocks, out_channels, kernel_size)``, one per group.
            May be empty, in which case there is no convolutional part.
        linear_sizes: Hidden sizes of the linear layers in the classifier head.
        activation: Module class / factory. It is called with no arguments to
            create the head activations and handed as-is to ``ResidualBlock``.
        flatten: If true, the feature map is flattened before the head;
            otherwise a global max pooling over the spatial axes is used.
        in_channels: Number of channels of the input image (default 3).
        img_size: Side of the (square) input image. Only needed to size the
            head when ``flatten`` is true. ``None`` means "use the module-level
            ``IMG_SIZE``", looked up at construction time.

    Note:
        The flattened feature size is computed as
        ``img_size // 2 ** number_of_poolings``, i.e. it assumes that
        ``ResidualBlock`` keeps the spatial size unchanged - TODO confirm
        against the ResidualBlock definition.
    """

    def __init__(
        self,
        out_size,
        residual_sizes,
        linear_sizes,
        activation=torch.nn.ReLU,
        flatten=True,
        in_channels=3,
        img_size=None,
    ):
        super().__init__()
        self.flatten = flatten

        if img_size is None:
            # Resolve the global lazily so that rebinding IMG_SIZE still takes effect
            img_size = IMG_SIZE

        # --- Convolutional part -------------------------------------------
        conv_layers = []
        num_groups = len(residual_sizes)

        current_c = in_channels  # channels of the tensor that reaches the next layer
        for group_index, (num_residual, out_channels, kernel_size) in enumerate(residual_sizes):
            for residual_index in range(num_residual):
                conv_layers.append((
                    f"residual_{group_index}_{residual_index}",
                    ResidualBlock(
                        # Only the first block of a group changes the channel count
                        input_c=current_c if residual_index == 0 else out_channels,
                        output_c=out_channels,
                        kernel_size=kernel_size,
                        activation=activation,
                    ),
                ))
            if num_residual > 0:
                # A group without blocks does not touch the tensor, so it must
                # not change the tracked channel count either
                current_c = out_channels
            if group_index < num_groups - 1:
                # Every group except the last one is followed by a 2x downsampling
                conv_layers.append((f"pooling_{group_index}", torch.nn.AvgPool2d(2)))

        self.conv_part = torch.nn.Sequential(OrderedDict(conv_layers))

        # --- Classifier part ----------------------------------------------
        out_c = current_c
        num_poolings = max(num_groups - 1, 0)
        final_size = img_size // 2 ** num_poolings
        clf_in_size = (out_c * final_size ** 2) if flatten else out_c

        layer_sizes = [clf_in_size, *linear_sizes, out_size]
        last_index = len(layer_sizes) - 2  # index of the output linear layer

        clf_layers = []
        for i, (from_size, to_size) in enumerate(zip(layer_sizes[:-1], layer_sizes[1:])):
            clf_layers.append((f"linear_{i}", torch.nn.Linear(from_size, to_size)))
            if i < last_index:
                clf_layers.append((f"act_{i}", activation()))
            else:
                clf_layers.append(("log_softmax", torch.nn.LogSoftmax(-1)))

        self.clf_part = torch.nn.Sequential(OrderedDict(clf_layers))

    def forward(self, x):  # input (bs x h x w x c_in)
        """Return per-class log-probabilities of shape (bs x out_size)."""
        x = x.permute((0, 3, 1, 2))  # (bs x c_in x h x w)

        x = self.conv_part(x)  # bs x c_out x h' x w'

        if self.flatten:
            # Flatten
            x = x.reshape(x.shape[0], -1)  # bs x (c_out * h' * w')
        else:
            # GlobalMaxPooling over both spatial axes
            x = x.max(-1).values.max(-1).values  # bs x c_out

        x = self.clf_part(x)  # bs x out_size

        return x


def print_num_params(model):
    """Print a table of the model's parameters and their total element count.

    One row is printed per entry of ``model.named_parameters()``: the parameter
    name, its shape as a list, and its number of elements. The last line is the
    sum over all rows. No filtering (e.g. by ``requires_grad``) is applied.
    Nothing is returned.
    """
    row_template = "{:<46} ~ {:<14} ~ {}"
    element_counts = []
    for param_name, tensor in model.named_parameters():
        n_elements = tensor.numel()
        element_counts.append(n_elements)
        print(row_template.format(param_name, str(list(tensor.shape)), n_elements))
    print(f"Общее число параметров ~ {sum(element_counts)}")


# Flatten-based variant: four groups of two residual blocks each, all with 3x3
# kernels, with 8, 16, 32 and 32 channels respectively
model = ResidualModel(
    residual_sizes=[(2, channels, 3) for channels in (8, 16, 32, 32)],
    linear_sizes=[32],
    out_size=10,
    flatten=True,
    activation=torch.nn.ReLU,
)

model

ResidualModel(
  (conv_part): Sequential(
    (residual_0_0): ResidualBlock(
      (conv): Conv2d(3, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (correct_channels): Conv2d(3, 8, kernel_size=(1, 1), stride=(1, 1))
      (activation): ReLU()
    )
    (residual_0_1): ResidualBlock(
      (conv): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (correct_channels): Identity()
      (activation): ReLU()
    )
    (pooling_0): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (residual_1_0): ResidualBlock(
      (conv): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (correct_channels): Conv2d(8, 16, kernel_size=(1, 1), stride=(1, 1))
      (activation): ReLU()
    )
    (residual_1_1): ResidualBlock(
      (conv): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (correct_channels): Identity()
      (activation): ReLU()
    )
    (pooling_1): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (residual_2_0): Re

In [109]:
# Smoke test: a random batch of 12 inputs laid out as (batch, height, width, channels)
x = torch.rand(12, 32, 32, 3)

# Only the output shape is of interest here
model(x).shape

torch.Size([12, 10])

In [110]:
# Per-parameter size table for the current `model`
print_num_params(model)

conv_part.residual_0_0.conv.weight             ~ [8, 3, 3, 3]   ~ 216
conv_part.residual_0_0.conv.bias               ~ [8]            ~ 8
conv_part.residual_0_0.correct_channels.weight ~ [8, 3, 1, 1]   ~ 24
conv_part.residual_0_0.correct_channels.bias   ~ [8]            ~ 8
conv_part.residual_0_1.conv.weight             ~ [8, 8, 3, 3]   ~ 576
conv_part.residual_0_1.conv.bias               ~ [8]            ~ 8
conv_part.residual_1_0.conv.weight             ~ [16, 8, 3, 3]  ~ 1152
conv_part.residual_1_0.conv.bias               ~ [16]           ~ 16
conv_part.residual_1_0.correct_channels.weight ~ [16, 8, 1, 1]  ~ 128
conv_part.residual_1_0.correct_channels.bias   ~ [16]           ~ 16
conv_part.residual_1_1.conv.weight             ~ [16, 16, 3, 3] ~ 2304
conv_part.residual_1_1.conv.bias               ~ [16]           ~ 16
conv_part.residual_2_0.conv.weight             ~ [32, 16, 3, 3] ~ 4608
conv_part.residual_2_0.conv.bias               ~ [32]           ~ 32
conv_part.residual_2_0.corre

In [114]:
# Same architecture as the flatten-based one, but with global pooling
# (flatten=False) in front of the classifier
model = ResidualModel(
    residual_sizes=[(2, channels, 3) for channels in (8, 16, 32, 32)],
    linear_sizes=[32],
    out_size=10,
    flatten=False,
    activation=torch.nn.ReLU,
)

model

ResidualModel(
  (conv_part): Sequential(
    (residual_0_0): ResidualBlock(
      (conv): Conv2d(3, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (correct_channels): Conv2d(3, 8, kernel_size=(1, 1), stride=(1, 1))
      (activation): ReLU()
    )
    (residual_0_1): ResidualBlock(
      (conv): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (correct_channels): Identity()
      (activation): ReLU()
    )
    (pooling_0): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (residual_1_0): ResidualBlock(
      (conv): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (correct_channels): Conv2d(8, 16, kernel_size=(1, 1), stride=(1, 1))
      (activation): ReLU()
    )
    (residual_1_1): ResidualBlock(
      (conv): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (correct_channels): Identity()
      (activation): ReLU()
    )
    (pooling_1): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (residual_2_0): Re

In [119]:
# Smoke test: a random batch of 12 inputs laid out as (batch, height, width, channels)
x = torch.rand(12, 32, 32, 3)

# Only the output shape is of interest here
model(x).shape

torch.Size([12, 10])

In [116]:
# Per-parameter size table for the current `model`
print_num_params(model)

conv_part.residual_0_0.conv.weight             ~ [8, 3, 3, 3]   ~ 216
conv_part.residual_0_0.conv.bias               ~ [8]            ~ 8
conv_part.residual_0_0.correct_channels.weight ~ [8, 3, 1, 1]   ~ 24
conv_part.residual_0_0.correct_channels.bias   ~ [8]            ~ 8
conv_part.residual_0_1.conv.weight             ~ [8, 8, 3, 3]   ~ 576
conv_part.residual_0_1.conv.bias               ~ [8]            ~ 8
conv_part.residual_1_0.conv.weight             ~ [16, 8, 3, 3]  ~ 1152
conv_part.residual_1_0.conv.bias               ~ [16]           ~ 16
conv_part.residual_1_0.correct_channels.weight ~ [16, 8, 1, 1]  ~ 128
conv_part.residual_1_0.correct_channels.bias   ~ [16]           ~ 16
conv_part.residual_1_1.conv.weight             ~ [16, 16, 3, 3] ~ 2304
conv_part.residual_1_1.conv.bias               ~ [16]           ~ 16
conv_part.residual_2_0.conv.weight             ~ [32, 16, 3, 3] ~ 4608
conv_part.residual_2_0.conv.bias               ~ [32]           ~ 32
conv_part.residual_2_0.corre