# Свёрточная нейросеть для классификации MNIST

## Библиотеки

In [None]:
import torch
import torchvision

from torchvision import datasets
from torchvision.datasets import ImageFolder
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import utils
from torchvision.transforms import v2
from torchvision import transforms 

from torch import nn

import os
import json
import numpy as np
import matplotlib.pyplot as plt

import struct
import sys
import random

from array import array
from tqdm import tqdm
from matplotlib.ticker import AutoMinorLocator, MultipleLocator
from PIL import Image


# Обязательная ячейка

In [None]:
# Root folder that the dataset paths and the checkpoint working directory are built from.
# NOTE(review): machine-specific absolute Windows path — adjust before running elsewhere.
path_to_all_data = r'C:\Users\user\Desktop\learn models'

## Трансформации

In [None]:
# Preprocessing pipeline passed to the image datasets as their `transform`.
transform = v2.Compose([
    v2.ToImage(),  # convert the loaded picture into a tensor image
    v2.Grayscale(), # keep a single grayscale channel
    v2.ToDtype(torch.float32, scale=True),  # cast to float32, rescaling the value range (scale=True)
    v2.Normalize(mean=(0.5, ), std=(0.5, ))  # normalize the one channel with mean 0.5 and std 0.5
])

## Загрузка изображений

In [None]:
# Folder-per-class image datasets for the MNIST training and testing folders.
# Path parts are joined one by one so no backslashes are embedded in the strings.
train_data = datasets.ImageFolder(
    root=os.path.join(path_to_all_data, 'MNIST', 'data', 'training'),
    transform=transform,
)
test_data = datasets.ImageFolder(
    root=os.path.join(path_to_all_data, 'MNIST', 'data', 'testing'),
    transform=transform,
)

# Hold out 30% of the training images for validation. The seeded generator makes
# the split identical on every run, so validation images never leak into the
# training subset when the notebook is re-run or training is resumed.
train_data, val_data = random_split(
    train_data,
    [0.7, 0.3],
    generator=torch.Generator().manual_seed(42),
)

In [None]:
# Display the class names found in the testing folder and their integer indices.
test_data.classes, test_data.class_to_idx

In [None]:
# Display the dataset summary (notebook rich output of the ImageFolder object).
test_data

In [None]:
# Sanity check on one sample: the dataset yields a (transformed image, class index) pair.
img, cls = test_data[200]
print(img.shape, cls)

In [None]:
# Mini-batch iterators (32 samples each) over the three splits; only the
# training split is reshuffled.
train_loader, val_loader, test_loader = (
    DataLoader(dataset=split, batch_size=32, shuffle=reshuffle)
    for split, reshuffle in (
        (train_data, True),
        (val_data, False),
        (test_data, False),
    )
)

In [None]:
# Preview the first training batch as an image grid and print its labels.
imgs, clss = next(iter(train_loader))
# make_grid returns a (C, H, W) tensor; imshow expects (H, W, C).
plt.imshow(utils.make_grid(imgs, normalize=True).permute(1, 2, 0).numpy())
# make_grid lays out 8 images per row by default, so print the labels in rows
# of 8 as well — stepping by 8 also covers a final, shorter row.
for i in range(0, len(clss), 8):
    print(clss[i:i + 8])


## GPU/CPU

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

## Модель

Формулы для расчёта карты признаков 

$$H_{out}=\Biggl\lfloor \frac{H_{in}+2\cdot\text{padding[0]}-\text{dilation[0]}\cdot(\text{kernel\_size[0]} - 1) - 1}{\text{stride[0]}} \Biggr\rfloor + 1$$

$$W_{out}=\Biggl\lfloor \frac{W_{in}+2\cdot\text{padding[1]}-\text{dilation[1]}\cdot(\text{kernel\_size[1]} - 1) - 1}{\text{stride[1]}} \Biggr\rfloor + 1$$

### Калькулятор для расчёта размеров карты признаков

In [None]:
def conv_output_size(size_in, kernel, stride=1, padding=0, dilation=1):
    """Return the output length of a convolution along one spatial axis.

    Implements floor((size_in + 2*padding - dilation*(kernel - 1) - 1) / stride) + 1.

    Args:
        size_in: input length along the axis.
        kernel: kernel length along the axis.
        stride: convolution stride.
        padding: padding added to each side of the axis.
        dilation: spacing between kernel elements (1 means a dense kernel).

    Returns:
        The output length as an int.
    """
    # Floor division matches the floor in the formula (int() would truncate toward zero).
    return (size_in + 2 * padding - dilation * (kernel - 1) - 1) // stride + 1


# Per-axis settings, index 0 = height and index 1 = width.
padding = [0, 0]
dilation = [1, 1]  # 1 is a dense kernel; 0 would cancel the kernel term and report 28 instead of 26
kernel_size = [3, 3]
stride = [1, 1]
H_in = 28
W_in = 28

H_out = conv_output_size(H_in, kernel_size[0], stride[0], padding[0], dilation[0])
W_out = conv_output_size(W_in, kernel_size[1], stride[1], padding[1], dilation[1])

print(f'H_out = {H_out}')
print(f'W_out = {W_out}')

In [None]:
class MyCnv(nn.Module):
    """Small convolutional classifier for 28x28 images.

    Two unpadded 3x3 convolutions with ReLU are followed by a fully connected
    head with dropout.

    Args:
        in_channels: number of channels of the input images.
        out: number of output logits (classes).
    """

    def __init__(self, in_channels, out):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, 32, (3, 3)),  # (in_channels, 28, 28) -> (32, 26, 26)
            nn.ReLU(),
            nn.Conv2d(32, 64, (3, 3)),  # (32, 26, 26) -> (64, 24, 24)
            nn.ReLU(),
        )
        self.flatten = nn.Flatten()  # (64, 24, 24) -> (64*24*24)
        self.fc = nn.Sequential(  # fully connected head
            nn.Linear(64 * 24 * 24, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            # Honour the requested class count instead of a hard-coded 10.
            nn.Linear(128, out),
        )

    def forward(self, x):
        """Return raw class logits of shape (batch, out) for a batch of images."""
        x = self.conv(x)
        x = self.flatten(x)
        return self.fc(x)

# x = x.flatten(start_dim=1, end_dim=-1) # equivalent alternative to nn.Flatten()(x)

# Single-channel input, 10 output classes; parameters live on the selected device.
model = MyCnv(1, 10).to(device)

# Free-form note that is stored under the 'info' key of saved checkpoints.
std_info = ''
# Smoke test: show the architecture and the output shape for a random batch of 16 images.
model, model(torch.rand([16, 1, 28, 28], dtype=torch.float32).to(device)).shape

## Класс ранней остановки

In [None]:
class EarlyStopping:
    """Signal when a monitored metric has stopped improving.

    Call the instance once per epoch with the current metric value. The first
    call only records the value. After that, every call that fails to improve
    on the best value by more than the threshold increments a counter; when
    the counter reaches ``patience`` the call returns ``True`` and the counter
    is reset.

    Args:
        mode: ``'min'`` when lower values are better, ``'max'`` when higher
            values are better.
        patience: number of consecutive non-improving calls that triggers a stop.
        treshold: minimum change that counts as an improvement; must be
            below 1.0. Whole numbers such as ``0`` are accepted.
        treshold_mode: ``'rel'`` scales the threshold by the magnitude of the
            best value, ``'abs'`` uses it as an absolute margin.

    Raises:
        ValueError: if an argument has an unsupported value or type.
    """

    def __init__(self, mode='min', patience=10, treshold=0.0001, treshold_mode='rel'):
        if mode not in {'min', 'max'}:
            raise ValueError(f'Параметр mode может принимать только значения max и min - {mode}')
        if treshold_mode not in {'rel', 'abs'}:
            # The message previously listed the values of ``mode`` by mistake.
            raise ValueError(f'Параметр treshold_mode может принимать только значения rel и abs - {treshold_mode}')
        if not isinstance(patience, int):
            raise ValueError(f'Параметр patience должен быть int - {type(patience)}, {patience}')
        # bool is an int subclass, so it is rejected explicitly.
        if isinstance(treshold, bool) or not isinstance(treshold, (int, float)):
            raise ValueError(f'Параметр treshold должен быть float - {type(treshold)}, {treshold}')
        if treshold >= 1.0:
            raise ValueError(f'Параметр treshold должен быть меньше 1.0 - {treshold}')

        self.mode = mode
        self.patience = patience
        self.treshold = float(treshold)
        self.treshold_mode = treshold_mode
        self.count = 0  # consecutive calls without improvement
        self.best = None  # best metric value seen so far

    def __call__(self, tracker_parameter):
        """Record a new metric value and return True when training should stop."""
        current = float(tracker_parameter)
        if self.best is None:
            # Nothing to compare against on the first call.
            self.best = current
            return False

        if self.changed_better(current, self.best):
            self.best = current
            self.count = 0
        else:
            self.count += 1

        if self.count >= self.patience:
            self.count = 0
            return True
        return False

    def changed_better(self, current, best):
        """Return True when ``current`` beats ``best`` by more than the threshold."""
        if self.treshold_mode == 'rel':
            # abs() keeps the margin's sign independent of the sign of ``best``;
            # otherwise a negative ``best`` would let worse values count as better.
            margin = abs(best) * self.treshold
        else:
            margin = self.treshold

        if self.mode == 'min':
            return current < best - margin
        return current > best + margin
            

## Гиперпараметры, шедулер

In [None]:
# Upper bound on the number of training epochs.
EPOCHS = 10

# Per-epoch history, appended to by the training loop.
train_loss = []
train_acc = []
val_loss = []
val_acc = []
lr_list = []

# Best validation loss seen so far and the relative improvement required
# before a new checkpoint is written.
best_loss = None
treshold = 1e-05

In [None]:
# Classification loss applied to the raw model outputs.
loss_model = nn.CrossEntropyLoss()
opt = torch.optim.Adam(model.parameters(), lr=0.0001)
# Stepped with the validation loss in the training loop (mode='min', patience=3).
lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, mode='min', patience=3)
# NOTE(review): the first call only records the value, so with EPOCHS = 10 the counter
# can reach 9 at most and patience=10 appears unable to trigger — confirm this is intended.
earlystopping = EarlyStopping(mode='min', patience=10)

## Обучение

In [None]:
def update_checkpoint():
    """Collect the current training state into a dict ready for ``torch.save``.

    Reads notebook-level globals: ``train_data``, ``std_info``, ``model``,
    ``opt``, ``lr_scheduler``, the loss/accuracy/lr history lists,
    ``best_loss``, ``EPOCHS`` and the loop variable ``epoch``. It must
    therefore be called from inside the training loop.

    Returns:
        dict with the class mapping, model/optimizer/scheduler state dicts,
        loss and accuracy history, learning-rate history and epoch counters.
    """
    checkpoint = {
            # `train_data` is the Subset produced by random_split; the class
            # mapping lives on the ImageFolder it wraps. (The previous
            # `for_one_hot_vector` name is not defined in this notebook.)
            'class_to_idx': train_data.dataset.class_to_idx,
            'info': std_info,
            'state_model': model.state_dict(),
            'state_opt': opt.state_dict(),
            'state_lr_scheduler': lr_scheduler.state_dict(),
            'loss': {
                'train_loss': train_loss,
                'val_loss': val_loss,
                'best_loss': best_loss
            },
            'metric': {
                'train_acc': train_acc,
                'val_acc': val_acc
            },
            'lr': lr_list,
            'epoch': {
                'EPOCHS': EPOCHS,
                'save_epoch': epoch  # zero-based index of the epoch being saved
            }
        }
    return checkpoint

In [None]:
%%time
# Train for up to EPOCHS epochs: one optimisation pass over the training
# split, one evaluation pass over the validation split, then scheduler step,
# checkpointing of the best model and the early-stopping check.
os.chdir(os.path.join(path_to_all_data, 'MNIST'))
# torch.save does not create missing folders, so make sure the target exists.
os.makedirs('conv_model', exist_ok=True)

for epoch in range(EPOCHS):
    # ---------- training pass ----------
    model.train()
    train_loss_sum = 0.0
    train_batches = 0
    true_answer = 0
    train_loop = tqdm(train_loader, leave=False)
    for x, targets in train_loop:
        x = x.to(device)
        # CrossEntropyLoss accepts integer class indices directly, so no
        # one-hot encoding (and no hard-coded class count) is needed.
        targets = targets.reshape(-1).to(device=device, dtype=torch.long)

        pred = model(x)
        loss = loss_model(pred, targets)
        opt.zero_grad()
        loss.backward()
        opt.step()

        # Running sum instead of re-summing a growing list on every batch.
        train_loss_sum += loss.item()
        train_batches += 1
        mean_train_loss = train_loss_sum / train_batches
        true_answer += (pred.argmax(dim=1) == targets).sum().item()
        train_loop.set_description(f'Epoch [{epoch+1}/{EPOCHS}], train_loss={mean_train_loss:.4f}')

    running_train_acc = true_answer / len(train_data)
    train_loss.append(mean_train_loss)
    train_acc.append(running_train_acc)

    # ---------- validation pass ----------
    model.eval()
    with torch.no_grad():
        val_loss_sum = 0.0
        val_batches = 0
        true_answer = 0
        for x, targets in val_loader:
            x = x.to(device)
            targets = targets.reshape(-1).to(device=device, dtype=torch.long)

            pred = model(x)
            loss = loss_model(pred, targets)

            val_loss_sum += loss.item()
            val_batches += 1
            true_answer += (pred.argmax(dim=1) == targets).sum().item()

    mean_val_loss = val_loss_sum / val_batches
    running_val_acc = true_answer / len(val_data)
    val_loss.append(mean_val_loss)
    val_acc.append(running_val_acc)

    lr_scheduler.step(mean_val_loss)
    # Read the learning rate from the optimizer rather than the scheduler's private attribute.
    lr_list.append(opt.param_groups[0]['lr'])

    print(f'Epoch [{epoch+1}/{EPOCHS}], train_loss={mean_train_loss:.4f}, train_acc={running_train_acc:.4f}, val_loss={mean_val_loss:.4f}, val_acc={running_val_acc:.4f}')

    # Save on the very first epoch too; previously best_loss was initialised
    # with the current loss and the strict comparison below could not pass.
    if best_loss is None or mean_val_loss < best_loss - best_loss * treshold:
        best_loss = mean_val_loss

        checkpoint = update_checkpoint()
        checkpoint_name = f'model_{epoch+1}.pt'
        torch.save(checkpoint, os.path.join('conv_model', checkpoint_name))
        # Keep only the newest checkpoint; unrelated files in the folder are left alone.
        for stale in os.listdir('conv_model'):
            if stale != checkpoint_name and stale.startswith('model_') and stale.endswith('.pt'):
                os.remove(os.path.join('conv_model', stale))
        print(f'На эпохе {epoch+1}, сохранена модель со значением функции потерь на валидации - {mean_val_loss:.4f}', end='\n\n')

    if earlystopping(mean_val_loss):
        print(f"\033[31mОбучение остановленно на {epoch + 1} эпохе.\033[0m")
        break
else:
    # Runs only when the loop finished without an early stop.
    print(f'Достигнут лимит по эпохам - {EPOCHS}')

## Тест

In [None]:
# Evaluate the model currently held in memory on the test split.
model.eval()
with torch.no_grad():
    running_test_loss = []
    true_answer = 0
    for x, targets in test_loader:
        x = x.to(device)
        # Integer class indices are what CrossEntropyLoss expects; no one-hot needed.
        targets = targets.reshape(-1).to(device=device, dtype=torch.long)
        pred = model(x)
        loss = loss_model(pred, targets)
        running_test_loss.append(loss.item())
        true_answer += (pred.argmax(dim=1) == targets).sum().item()
    mean_test_loss = sum(running_test_loss) / len(running_test_loss)
    # Divide by the size of the test split (the validation split size was used before).
    running_test_acc = true_answer / len(test_data)
print(f'mean_test_loss={mean_test_loss:.4f}, running_test_acc={running_test_acc:.4f}')

## Графики 

In [None]:
# Loss curves on the left axis, learning-rate schedule on a twin right axis.
fig, axs = plt.subplots(1, 1)
loss_lines = axs.plot(train_loss, label='loss_train') + axs.plot(val_loss, label='loss_val')
ax0 = axs.twinx()
lr_lines = ax0.plot(lr_list, color='green', label='lr')
# The lr line belongs to the twin axis, so pass all handles explicitly;
# a plain label list on `axs` silently drops the 'lr' entry.
axs.legend(handles=loss_lines + lr_lines)
axs.grid()


In [None]:
# Accuracy curves on the left axis, learning-rate schedule on a twin right axis.
fig, axs = plt.subplots(1, 1)
acc_lines = axs.plot(train_acc, label='acc_train') + axs.plot(val_acc, label='acc_val')
ax1 = axs.twinx()
lr_lines = ax1.plot(lr_list, color='green', label='lr')
# The lr line belongs to the twin axis, so pass all handles explicitly;
# a plain label list on `axs` silently drops the 'lr' entry.
axs.legend(handles=acc_lines + lr_lines)
axs.grid()

## Загрузка модели

In [None]:
# Fully connected network: 28*28 inputs -> 128 hidden units -> 10 outputs.
# NOTE(review): this is not the MyCnv architecture defined in this notebook; its state-dict
# keys and shapes differ, so a checkpoint written from MyCnv will not load into it —
# confirm which checkpoint is meant to be restored here.
load_model = nn.Sequential(
    nn.Linear(28*28, 128),
    nn.ReLU(),
    nn.Linear(128, 10)
).to(device)

In [None]:
# map_location remaps stored tensors onto the active device, so a checkpoint
# written on a GPU machine can still be opened on a CPU-only one.
# NOTE(review): the training loop writes checkpoints to 'conv_model', while this
# reads from 'model' — confirm the intended file.
load_model_state = torch.load(os.path.join('model', 'model_3.pt'), map_location=device)

In [None]:
# Dump the loaded checkpoint dictionary for inspection.
print(load_model_state)

In [None]:
# Display the checkpoint dict built during training for comparison.
# NOTE(review): `checkpoint` only exists if the training loop saved at least once in this session.
checkpoint

In [None]:
# Fresh loss, optimizer and scheduler objects bound to `load_model`.
load_loss_model = nn.CrossEntropyLoss()
# NOTE(review): lr=0.001 is presumably a placeholder that the optimizer state loaded from
# the checkpoint replaces — confirm.
load_opt = torch.optim.Adam(load_model.parameters(), lr=0.001)
load_lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(load_opt)

In [None]:
# Restore the model weights, optimizer state and scheduler state from the checkpoint dict.
load_model.load_state_dict(load_model_state['state_model'])
load_opt.load_state_dict(load_model_state['state_opt'])
load_lr_scheduler.load_state_dict(load_model_state['state_lr_scheduler'])

In [None]:
# Epoch counters stored with the checkpoint (this overwrites the global EPOCHS).
save_epoch = load_model_state['epoch']['save_epoch']
EPOCHS = load_model_state['epoch']['EPOCHS']

# Values stored under the 'loss' key.
load_train_loss = load_model_state['loss']['train_loss']
load_val_loss = load_model_state['loss']['val_loss']
load_best_loss = load_model_state['loss']['best_loss']

# Values stored under the 'metric' key.
load_train_acc, load_val_acc = (
    load_model_state['metric'][key] for key in ('train_acc', 'val_acc')
)

# Stored learning-rate history.
load_lr_list = load_model_state['lr']