In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, ToPILImage
from matplotlib import pyplot as plt

In [None]:
#загрузка тренировчного и тестового наборов данных.
#Для загрузки используется библиотека фраемворка pytorch
#под названием torchvision. В данном случае мы загружаем
#черно белые изображения одежды из 10 категорий. Подробнее:
#https://github.com/zalandoresearch/fashion-mnist
training_data = datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download test data from open datasets.
test_data = datasets.MNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

In [None]:
#Этот класс копирует поведение встроенного класса Dataset 
#библиотеки pytorch. В данном случае data - имитация какого-то
#набора данных. Волшебные методы __len__ и __getitem__ позволяют
#расширить функционал класса python, делая итеарацию по набору
#данных проще и удобнее.
class test:
  def __init__(self, data):
      self.data = data
  
  def __len__(self,):
      #вызывается в момент len(экземпляр класса test)
      return len(self.data)

  def __getitem__(self, key):
  #   #вызывается в момент экземпляр класса test [index]
      return self.data[key]

a = [1,2,3,4,5]
check = test(a)
check[2]

In [None]:
test_data

In [None]:
#каждый элемент данных, содержащийся training_data (аналогично для test_data)
#представляет из себя кортеж (tuple). Перывый элемент кортежа - изображение, 
#второй - метка класса. Далее мы нарисуем изображение, чтобы убедиться, что 
#метка класса и изображение совпадают.
image, label = test_data[5]
label

In [None]:
# 1 - соответсует метке брюк
image = ToPILImage(mode='L')(image)
plt.imshow(image, cmap = 'gray')

In [None]:
#batch_size - размер пакета, показывает какого количество данных за раз будет
#подаваться нейронной сети на вход. Если batch_size = 64, то за раз нейросеть
#будет получать 64 изображения. Этот параметр задается в самом начале и не меняется
#на всем этапе обучения. Является гиперпараметром. Влияет на точность.
batch_size = 64

#Создание экземпляра класса DataLoader, который будет автоматически собирать 
#данные в пакет нужного нам размера. По сути расширение Dataset. Далее мы будем
#использовать именно DataLoader. Можно посмотреть какого размера данные (тензоры)
#на выходе DataLoader.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

In [None]:
#выбор устройства для обучения. Чтобы поменять на gpu (видеокарту) для увеличения
#скорости нужно перейти в кладку "Среда выполнения" (сверху) и выбрать параметр
#"сменить среду выполнения"
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

#Создание класса нейронной сети. Он наследуется от nn.Module (класс Pytorch)
#В этом классе вам нужно определять слои нейронной сети. Метод forward - исполь
#зуется для прямого прохода сети (смотри презентацию), т.е. считает предсказание.

#nn.Sequential используется для объединения нескольких блоков нейронной сети в один

class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 1024),
            nn.Sigmoid(),
            nn.Linear(1024, 512),
            nn.Sigmoid(),
            nn.Linear(512, 512),
            nn.Sigmoid(),
            nn.Linear(512, 10)
        )
    #предсказание
    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model_fc = NeuralNetwork().to(device)
print(model_fc)

In [None]:
#проверка, что нейронная сеть работает
for X, y in test_dataloader:
    X = X.to(device)
    y_pred = model_fc(X)
    print(y_pred.shape)
    break

In [None]:
#выбор функции потерь, которой мы будем считать ошибку предсказания. Для клас-
#сификации обычно используется CrossEntropyLoss, но можно использовать и MSE.
loss_fn = nn.CrossEntropyLoss()
#Выбор оптимайзера. Он занимается обновлением параметров. Сейчас не вижу смысла
#дотошно разбирать как он работает. Он, в отличии от нас на лекции, более хитро
#обновляет веса модели, а раз он обновляет веса именно ему нужно передать параметр
#скорости обучения lr!
optimizer = torch.optim.Adam(model_fc.parameters(), lr=2e-3)

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    #перевод модели в режим тренировки. Это нужно указывать, так как некоторые
    #слои сети по-разному считаются на тренировке и на тесте.
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        #извлекаем данные из dataloader и ФИЗИЧЕСКИ перекладываем их на то же
        #устройство, на котором находится модель. 
        X, y = X.to(device), y.to(device)

        #делаем предсказание
        pred = model(X)
        #считаем ошибку
        loss = loss_fn(pred, y)

        #зануляем градиенты с предыдущего шага
        optimizer.zero_grad()
        
        #считаем градиенты
        loss.backward()
        #обновляем веса модели
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    #для предсказания на тестовых данных всегда используейте with torch.no_grad()
    #чтобы pytorch не считал градиенты.
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
#собираем все вместе. Каждая эпоха состоит из 2х циклов - обучения и теста. 
epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model_fc, loss_fn, optimizer)
    test(test_dataloader, model_fc, loss_fn)
print("Done!")

In [None]:
img, label = test_data[5]
y = model_fc(img)

In [None]:
torch.save(model_fc.state_dict(), 'model.pt')

In [None]:
#Для визуализации результатов
def plot(model, testdataset, imgs = 9):
  fig = plt.figure(figsize=(16,16))
  for i in range(imgs):
      a = fig.add_subplot(3, 3, i + 1)
      image, label = testdataset[i]
      predict = torch.argmax(model(image))
      image = ToPILImage(mode='L')(image)
      plt.imshow(image, cmap = 'gray')
      a.set_title(str(predict == label))
  plt.show()
plot(model_fc, test_data)

### Метрики ###

In [None]:
from sklearn.metrics import recall_score
from sklearn.metrics import precision_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

In [None]:
predicts = []
real_labels = []
model_fc.eval()
with torch.no_grad():
    for X, y in test_dataloader:
        X = X.to(device)
        pred = model_fc(X)
        predicts.extend(pred.argmax(1).tolist())
        real_labels.extend(y.tolist())

In [None]:
predicts[:5], real_labels[:5]

In [None]:
precision_score(predicts, real_labels, average=None)

In [None]:
recall_score(predicts, real_labels, average=None)

In [None]:
cm = confusion_matrix(predicts, real_labels, labels=range(10))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=range(10))
disp.plot()

In [None]:

class ConvNeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.layer1 = self.create_block(1, 16, 3)
        self.layer2 = self.create_block(16, 32, 3)
        self.layer3 = self.create_block(32, 64, 3)
        self.classificator = nn.Linear(64, 10)


    def create_block(self, in_channels, out_channels, kernel_size):
        layer = nn.Sequential(nn.Conv2d(in_channels, out_channels, kernel_size = kernel_size),
                              nn.MaxPool2d(kernel_size = 2, stride = 2),
                              nn.ReLU())
        return layer
    #предсказание
    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.flatten(x)
        x = self.classificator(x)
        return x


conv_model = ConvNeuralNetwork().to(device)

for X, y in test_dataloader:
    X = X.to(device)
    y_pred = conv_model(X)
    print(y_pred.shape)
    break

In [None]:
#выбор функции потерь, которой мы будем считать ошибку предсказания. Для клас-
#сификации обычно используется CrossEntropyLoss, но можно использовать и MSE.
loss_fn = nn.CrossEntropyLoss()
#Выбор оптимайзера. Он занимается обновлением параметров. Сейчас не вижу смысла
#дотошно разбирать как он работает. Он, в отличии от нас на лекции, более хитро
#обновляет веса модели, а раз он обновляет веса именно ему нужно передать параметр
#скорости обучения lr!
optimizer = torch.optim.Adam(conv_model.parameters(), lr=2e-3)

epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, conv_model, loss_fn, optimizer)
    test(test_dataloader, conv_model, loss_fn)
print("Done!")

In [None]:
image, _ = test_data[5]

In [None]:
image_ = ToPILImage(mode='L')(image)
plt.imshow(image_, cmap = 'gray')

In [None]:
one = image[:,:,10:21]

In [None]:
image_ = ToPILImage(mode='L')(one)
plt.imshow(image_, cmap = 'gray')

In [None]:
background = torch.zeros(1, 28, 28)
background[:,:, 4:15] = one
image_ = ToPILImage(mode='L')(background)
plt.imshow(image_, cmap = 'gray')

In [None]:
#загрузить сохраненную модель
weights = torch.load('your_model.pt',  map_location=torch.device('cpu'))
fc = NeuralNetwork()
fc.load_state_dict(weights)

In [None]:
# подайте батч модели, чтобы получить предсказания.
# Сравните ответы!
batch = torch.stack([image,background], 0)

### Задание ###
**Вам нужно улучшить качество сверточной нейронной сети. Качество будем смотреть по Confusion Matrix (смотри раздел с метриками). Сохраните веса модели (файл <your_model_name.pt>)!**
___
> Как можно улучшить качество?

1.   Добавить слой BatchNorm2d (слой нормализации добавляют перед функцией активации)
2.   Сделать модель глубже
3.   Сделать слой классификатора более глубоким.

Это творческое задание, не бойтесь пробовать !

