# Собственная архитектура НС

In [1]:
# установка дополнительных библиотек для работы с НС
!pip install -q kaggle torch torchvision torchmetrics ipywidgets

In [2]:
import gc
import os
import time
import random
import shutil
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from copy import deepcopy
# from tqdm import tqdm
from tqdm.notebook import tqdm


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
from torchvision import datasets, transforms
from torchmetrics import Accuracy

In [3]:
%matplotlib inline
warnings.filterwarnings("ignore")

# Подготовка данных

In [4]:
ROOT_DIR = './'
SPLIT_DATA_DIR = ROOT_DIR + 'data_prepared/'

TRAIN_DATA_DIR = SPLIT_DATA_DIR + 'train/'
VALID_DATA_DIR = SPLIT_DATA_DIR + 'valid/'
TEST_DATA_DIR = SPLIT_DATA_DIR + 'test/'

# TEST_DATA_FAKE_LABEL_COPY_DIR = TEST_DATA_COPY_DIR + 'all_classes/'
# SHORT_TRAIN_DATA_COPY_DIR = DATA_COPY_DIR + 'short_train/'

Подготовим разбиение на train и valid

In [5]:
def train_valid_split(
    num_val_images_per_class,
    num_test_images_per_class
):

    os.makedirs(VALID_DATA_DIR, exist_ok = True)

    classes = os.listdir(TRAIN_DATA_DIR)

    for class_name in classes:

        list_of_pics = os.listdir(TRAIN_DATA_DIR + class_name)

        os.mkdir(VALID_DATA_DIR + class_name)

        val_list_of_pics = np.random.choice(list_of_pics, size = num_val_images_per_class, replace=False)

        for pic in val_list_of_pics:
            shutil.move(TRAIN_DATA_DIR + class_name + '/' + pic, VALID_DATA_DIR + class_name + '/' + pic)
            
    os.makedirs(TEST_DATA_DIR, exist_ok = True)

    for class_name in classes:

        list_of_pics = os.listdir(TRAIN_DATA_DIR + class_name)

        os.mkdir(TEST_DATA_DIR + class_name)

        val_list_of_pics = np.random.choice(list_of_pics, size = num_val_images_per_class, replace=False)

        for pic in val_list_of_pics:
            shutil.move(TRAIN_DATA_DIR + class_name + '/' + pic, TEST_DATA_DIR + class_name + '/' + pic)
            
if not os.path.isdir(VALID_DATA_DIR):
    train_valid_split(100, 50)

Подготовим даталоадеры. Добавте в transform аугментацию.

Например, следующими функциями (где нужно)
- transforms.Normalize
- transforms.RandomHorizontalFlip
- transforms.RandomResizedCrop

In [6]:
DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'

In [7]:
torch.manual_seed(42)
BATCH_SIZE = 64
DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'

transform_train = transforms.Compose([transforms.ToTensor()])

transform_valid = transforms.Compose([transforms.ToTensor()])

transform_test = transforms.Compose([transforms.ToTensor()])

train_dataset = datasets.ImageFolder(TRAIN_DATA_DIR, transform=transform_train)
valid_dataset = datasets.ImageFolder(VALID_DATA_DIR, transform=transform_valid)
test_dataset = datasets.ImageFolder(TEST_DATA_DIR, transform = transform_test)

train_dataloader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size = BATCH_SIZE,
    shuffle = True
)

valid_dataloader = torch.utils.data.DataLoader(
    valid_dataset,
    batch_size = BATCH_SIZE,
    shuffle = False
)

test_dataloader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size = 1,
    shuffle = False
)

# Архитектура модели

Сначала собираем и учим автоэнкодер

In [12]:
class AutoEncoder(nn.Module):
    def __init__(self):
        super().__init__()

        def initialization(layer):
            if isinstance(layer, nn.Conv2d) or isinstance(layer, nn.ConvTranspose2d):
                nn.init.kaiming_normal_(layer.weight)
                layer.bias.data.fill_(0.)

        self.encoder = nn.Sequential(nn.Conv2d(3, 128, kernel_size=(3, 3), padding = 1, stride = 1),
                                     nn.ELU(),
                                     nn.MaxPool2d(kernel_size=(2, 2)),
                                     nn.Conv2d(128, 64, kernel_size=(3, 3), padding = 1, stride = 1),
                                     nn.ELU(),
                                     nn.MaxPool2d(kernel_size=(2, 2)),
                                     nn.Conv2d(64, 32, kernel_size=(3, 3), padding = 1, stride = 1),
                                     nn.ELU()
        )

        self.decoder = nn.Sequential(nn.ConvTranspose2d(32, 64, kernel_size=(3, 3),
                                                        padding = 1, stride = 1),
                                     nn.ELU(),
                                     nn.Upsample(scale_factor=2, mode='nearest'),
                                     nn.ConvTranspose2d(64, 128, kernel_size=(3, 3), 
                                                        padding = 1, stride = 1),
                                     nn.ELU(),
                                     nn.Upsample(scale_factor=2, mode='nearest'),
                                     nn.ConvTranspose2d(128, 3, kernel_size=(3, 3), 
                                                        padding = 1, stride = 1),
                                     nn.ELU()
        )

        self.apply(initialization)

    def forward(self, x):
        latent_code = self.encoder(x)
        reconstruction = self.decoder(latent_code)
        return reconstruction

In [13]:
for x_train, y_train in  train_dataloader:
    break
    
for x_valid, y_valid in  valid_dataloader:
    break

In [14]:
model = AutoEncoder()
criterion = torch.nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr = 0.0003)
                      
def run_train(model, criterion, optimizer, x_train, x_valid):
    for epoch in range(50):
        print(f'epoch: {epoch}')
        
        model.train()
        optimizer.zero_grad()
        output = model(x_train) 
        loss = criterion(output, x_train)
        loss.backward()
        print(f'loss_train: {loss.item()}')
        optimizer.step()

        model.eval()
        output = model(x_valid)
        loss = criterion(output, x_valid)
        print(f'loss_valid: {loss.item()}')

run_train(model, criterion, optimizer, x_train, x_valid) # переобучение на одном батче




### Вопросы:
* Зачем делать шаг переобучения на одном батче? 
* Мы используем MSE лосс - что это? Какие лоссы вы еще знаете для задачи регрессии? Опишите их применимость и особенности
* А что произойдет, если мы заменим лосс на бинарный (например на бинарную кросс энтропию)?


In [20]:
class FullModel(nn.Module):
    def __init__(self,ae_model, num_classes: int = 200) -> None:
        super(FullModel, self).__init__()

        self.features = nn.Sequential(
            ae_model.encoder
        )

        self.avgpool = nn.AvgPool2d((3, 3))

        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(32 * 5 * 5, 2024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(2024, 2024),
            nn.ReLU(),
            nn.Linear(2024, num_classes),
        )

    def _forward_impl(self, x):
        out = self.features(x)
        out = self.avgpool(out)
        out = torch.flatten(out, 1)
        out = self.classifier(out)

        return out
    
    def forward(self, x):
        return self._forward_impl(x)

 ### Вопрос:
 *  Как понять входную размерность слоя в classifier? (32 * 5 * 5 что значат эти цифры?)
 * Какую задачу пытается решить autoencoder? 
 * Опишите разницу между max_pool и avg_pool. Как вы думаете когда стоит использовать avg_pool, а когда max_pool?

# Процесс обучения

В классе реализованно итерирование по эпохам и бачам (через даталоадер)

In [22]:
class model_training():

    def __init__(self, lr, train_autoencode, trainloader, testloader, device, model):
        self.trainloader = trainloader
        self.testloader = testloader
        self.device = device
        self.train_autoencode = train_autoencode
        
        self.model = model
        if train_autoencode:
            self.loss_fn = torch.nn.MSELoss()
        else:
            self.loss_fn = torch.nn.CrossEntropyLoss()
            
        
        
        # добвляем возможность выбирать лосс, чтобы учить в одном классе обе модели
        
        self.opt = optim.Adam(
            model.parameters(),
            lr = lr,
            weight_decay = 0.0001
        )

        self.best_model = None
        self.best_epoch = None

        self.loss_train = []
        self.loss_test = []
        self.metric_train = []
        self.metric_test = []

    def accuracy(self, y_predicts, y_labels):
        acc = (y_predicts == y_labels).sum().item() / y_predicts.size(0)
        return acc

    def train_nn(self, trainloader, model, opt, loss_fn, device):
        """Функция итерирование по бачам для обучения"""
        model.train()
        running_loss_train = []
        running_acc_train = []
        if self.train_autoencode:
            for batch in tqdm(train_dataloader):
                inputs, labels = batch
                inputs = inputs.to(device)

                opt.zero_grad()
                outputs = model(inputs).to(device)
                

                loss = loss_fn(outputs, inputs)
                loss.backward()
                opt.step()

                running_loss_train.append(loss.item())
                running_acc_train.append(loss.item())
        else:
            for batch in tqdm(train_dataloader):
                inputs, labels = batch
                inputs = inputs.to(device)
                labels = labels.to(device)

                opt.zero_grad()
                outputs = model(inputs).to(device)
                y_pred = nn.Softmax(dim=1)(outputs).argmax(dim=1).to(device)

                loss = loss_fn(outputs, labels)
                loss.backward()
                opt.step()

                running_loss_train.append(loss.item())
                running_acc_train.append(self.accuracy(y_pred, labels))
            

 
            
        return model, running_loss_train, running_acc_train

    def eval_nn(self, testloader, model, loss_fn, device):
        """Функция итерирование по бачам для валидации"""
        model.eval()
        running_loss_test = []
        running_acc_test = []
        if self.train_autoencode:
            for batch in tqdm(valid_dataloader):
                inputs, labels = batch
                inputs = inputs.to(device)

                outputs = model(inputs).to(device)
                loss = loss_fn(outputs, inputs)

                running_loss_test.append(loss.item())
                running_acc_test.append(loss.item())
        else:
            
        
            for batch in tqdm(valid_dataloader):

                inputs, labels = batch
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs).to(device)
                y_pred = nn.Softmax(dim=1)(outputs).argmax(dim=1).to(device)
                loss = loss_fn(outputs, labels)

                running_loss_test.append(loss.item())
                running_acc_test.append(self.accuracy(y_pred, labels))

        return running_loss_test, running_acc_test

    def training_loop(self, max_epochs = 10):
        """Функция обучения по эпохам"""
        print('Начинаю обучение')
        start_training_time = time.time()

        for epoch in tqdm(range(max_epochs)):

            start_epoch_time = time.time()

            self.model, running_loss_train, running_acc_train = self.train_nn(
                self.trainloader,
                self.model,
                self.opt,
                self.loss_fn,
                self.device
            )

            running_loss_test, running_acc_test = self.eval_nn(
                self.testloader,
                self.model,
                self.loss_fn,
                self.device
            )

            self.loss_train.append(np.mean(running_loss_train))
            self.loss_test.append(np.mean(running_loss_test))

            self.metric_train.append(np.mean(running_acc_train))
            self.metric_test.append(np.mean(running_acc_test))

            if np.mean(running_acc_test) >= np.max(self.metric_test):
                self.best_model = deepcopy(self.model)
                self.best_epoch = epoch
                torch.save(main_loop.best_model, './models/custom_model.pt')

            duration_epoch = time.time() - start_epoch_time

            print(f"""EPOCH {epoch} :
            train_loss: {self.loss_train[-1]:.5f}
            test_loss: {self.loss_test[-1]:.5f}
            train_acc: {self.metric_train[-1]:.5f}
            test_acc: {self.metric_test[-1]:.5f}
            Эпоха заняла по времени {round(duration_epoch / 60, 2)} минут""")

        duration_total = time.time() - start_training_time
        print(f'Всего обучение заняло: {round(duration_total / 60, 2)} минут')
        print(f'Лучшее значение метрики было достингнуто на {self.best_epoch} эпохе')

        fig, axes = plt.subplots(nrows = 2, ncols = 2, figsize=(10, 8))

        axes[0, 0].plot(range(max_epochs), self.loss_train)
        axes[0, 0].set_title('loss_train')

        axes[0, 1].plot(range(max_epochs), self.loss_test)
        axes[0, 1].set_title('loss_test')

        axes[1, 0].plot(range(max_epochs), self.metric_train)
        axes[1, 0].set_title('metric_train')

        axes[1, 1].plot(range(max_epochs), self.metric_test)
        axes[1, 1].set_title('metric_test')

        plt.tight_layout()
        plt.show()

In [None]:
model_ae = AutoEncoder()
model_ae.to(DEVICE)
train_autoencode = True

main_loop_ae = model_training(0.01,train_autoencode, train_dataloader, valid_dataloader, DEVICE, model_ae)
main_loop_ae.training_loop(1)

In [None]:
model = FullModel(model_ae)
model.features.requires_grad_ = False
model.to(DEVICE)
train_autoencode = False
main_loop = model_training(0.01, train_autoencode, 
                           train_dataloader, 
                           valid_dataloader, 
                           DEVICE, model)
main_loop.training_loop(1)

 ### Вопрос:
 * Опишите разницу применения автоэнкодера и трансферлернинга. Когда автоэнкодер предпочтительнее?
 * Возможно ли переобучение когда мы обучаем автоэкодер? Надо ли снижать уровень регуляризации при построении (относительно обычной модели)
 * Что такое латентное пространство в автоэнкодере? Может ли оно быть больше чем исходная размерность объектов?
 * Можно ли строить автоэнкодеры для текстов?

# Оценка качества на тесте

In [14]:
best_model = torch.load('./models/custom_model.pt')

loss_fn = nn.CrossEntropyLoss()

def accuracy(y_predicts, y_labels):
    acc = (y_predicts == y_labels).sum().item() / y_predicts.size(0)
    return acc

best_model.eval()
running_loss_test = []
running_acc_test = []
        
for batch in tqdm(valid_dataloader):

    inputs, labels = batch
    inputs = inputs.to(DEVICE)
    labels = labels.to(DEVICE)

    outputs = best_model(inputs).to(DEVICE)
    y_pred = nn.Softmax(dim=1)(outputs).argmax(dim=1).to(DEVICE)
    loss = loss_fn(outputs, labels)

    running_loss_test.append(loss.item())
    running_acc_test.append(accuracy(y_pred, labels))
    
    
print(np.mean(running_loss_test))
print(np.mean(running_acc_test))

  0%|          | 0/313 [00:00<?, ?it/s]

5.301841595683235
0.0049920127795527154


# Подготовка и отправка на kaggle
Проскорте полученной моделью данные для сабмита и отправьте

In [13]:
# !kaggle competitions submit -c hse-summer-2023-cnn -f <ваши прогнозы>.csv -m "Message"