# Нейронные сети
__Суммарное количество баллов: 10__

Для начала вам предстоит реализовать свой собственный backpropagation и протестировать его на реальных данных, а затем научиться обучать нейронные сети при помощи библиотеки `PyTorch` и использовать это умение для классификации классического набора данных CIFAR10.

Обратите внимание, что использование PyTorch во всех заданиях кроме последнего запрещено. Автоматической проверки на его использование не будет, однако все посылки будут проверены вручную. 

In [1]:
import numpy as np
import copy
from sklearn.datasets import make_blobs, make_moons
from typing import List, NoReturn

In [2]:
import torch
from torch import nn
import torch.nn.functional as F


### Задание 1 (3 балла)
Нейронные сети состоят из слоев, поэтому для начала понадобится реализовать их. Пока нам понадобятся только три:

`Linear` - полносвязный слой, в котором `y = Wx + b`, где `y` - выход, `x` - вход, `W` - матрица весов, а `b` - смещение. 

`ReLU` - слой, соответствующий функции активации `y = max(0, x)`.


#### Методы
`forward(X)` - возвращает предсказанные для `X`. `X` может быть как вектором, так и батчем

`backward(d)` - считает градиент при помощи обратного распространения ошибки. Возвращает новое значение `d`

`update(alpha)` - обновляет веса (если необходимо) с заданой скоростью обучения

#### Оценка
Валидируется корректность работы каждого модуля отдельно. Ожидается, что выходы каждого модуля будут незначительно отличаться от ожидаемых выходов, а подсчет градиента и градиентный спуск будут работать корректно.

In [None]:
from task import ReLU, Linear

In [92]:
# Task 1
# https://sgugger.github.io/a-simple-neural-net-in-numpy.html

class Module:
    """
    Абстрактный класс. Его менять не нужно. Он описывает общий интерфейс взаимодествия со слоями нейронной сети.
    """
    def forward(self, x):
        pass
    
    def backward(self, d):
        pass
        
    def update(self, alpha):
        pass
    
    
class Linear(Module):
    """
    Линейный полносвязный слой.
    """
    def __init__(self, in_features: int, out_features: int):
        """
        Parameters
        ----------
        in_features : int
            Размер входа.
        out_features : int 
            Размер выхода.
    
        Notes
        -----
        W и b инициализируются случайно.
        """
        self.in_features = in_features
        self.out_features = out_features
        self.weights = np.random.normal(0, 1/np.sqrt(self.in_features + self.out_features), (in_features + 1, out_features))
        # веса инициализируются случайно из нормального с параметрами 0, 1/sqrt(in+out)

        # у каждой вершины набор из out_features различных наборов весов
        #self.weights = np.random.normal(0, 1/np.sqrt(self.in_features + self.out_features), (self.in_features, self.out_features, self.in_features + 1)) #bias inside
        # in_features - количество фич в датасете
        # out_features - количество фич на выходе
    
    def forward(self, x: np.ndarray) -> np.ndarray:
        """
        Возвращает y = Wx + b.

        Parameters
        ----------
        x : np.ndarray
            Входной вектор или батч.
            То есть, либо x вектор с in_features элементов,
            либо матрица размерности (batch_size, in_features).
    
        Return
        ------
        y : np.ndarray
            Выход после слоя.
            Либо вектор с out_features элементами,
            либо матрица размерности (batch_size, out_features)

        """

        self.x = np.insert(np.copy(x), x.shape[1], np.ones(len(x)), axis=1) #last column for bias вынести в общий класс
        #self.x = x
        print('lin layer, x and weights shape ', self.x.shape, self.weights.shape)
        res = self.x @ self.weights  
        print('lin layer res shape ', res.shape)
        #print(res.shape)     
        return res

    
    def backward(self, d: np.ndarray) -> np.ndarray:
        """
        Cчитает градиент при помощи обратного распространения ошибки.

        """
        print('d shape ', d.shape)
        print('lin layer backward weights ', self.weights.shape)
        self.grad_w = (np.matmul(self.x[:,:,None],d[:,None,:])).mean(axis=0) #dloss/dw_k
        #print(d)
        #print(self.weights)
        res = np.dot(d,self.weights.transpose()) #dloss/dx_k
        print('backward lin layer res ', res[:,:-1])
        return res[:,:-1]
        
    def update(self, alpha: float) -> NoReturn:
        """
        Обновляет W и b с заданной скоростью обучения.

        Parameters
        ----------
        alpha : float
            Скорость обучения.
        """
        #градиентный спуск с альфа коэф 
        # нужна производная по w
        print('grad descent, weights ', self.weights.shape)
        print('grad_w shape ', self.grad_w.shape)
        #print(alpha*self.grad_w)
        self.weights -= alpha*self.grad_w
    

class ReLU(Module):
    """
    Слой, соответствующий функции активации ReLU. Данная функция возвращает новый массив, в котором значения меньшие 0 заменены на 0.
    """
    def __init__(self):
        pass
    
    def forward(self, x: np.ndarray) -> np.ndarray:
        """
        Возвращает y = max(0, x).

        Parameters
        ----------
        x : np.ndarray
            Входной вектор или батч.
    
        Return
        ------
        y : np.ndarray
            Выход после слоя (той же размерности, что и вход).

        """
        self.x = x
        return np.where(x <= 0, 0, x)

        
    def backward(self, d) -> np.ndarray:
        """
        Cчитает градиент при помощи обратного распространения ошибки.

        Parameters
        ----------
        d : np.ndarray
            Градиент.
        Return
        ------
        np.ndarray
            Новое значение градиента.
        """
        print('relu res ', np.where(self.x <= 0, 0, d))
        return np.where(self.x <= 0, 0, d)
    

In [None]:
a = np.array([1, 2, -1])
print(np.where(a < 0, 0, a))

[1 2 0]


In [None]:

b = np.array([[1, 2, 3], [1, 4, 5], [0, 2, 4]])

in_features = 3
out_features = 2
weights = np.random.normal(0, 1/np.sqrt(in_features + out_features), (out_features, b.shape[1])) 
print('b ', b)
print('weights', weights)



print(b.shape, weights.shape)
res = np.zeros((out_features, b.shape[1])).T #выход слоя

cur = 0
for i in range(len(b)):
    #print(i.shape, weights.shape)
    cur = b[i] @ weights.T
    print(cur)
    res[i] = cur
    print(res)

print(res.T) # результат

# хотим: вектор размера (out_features, b.shape[1])

b  [[1 2 3]
 [1 4 5]
 [0 2 4]]
weights [[-1.03953021  0.72904043 -0.48184614]
 [ 0.81795713 -0.12874812 -0.3948837 ]]
(3, 3) (2, 3)
[-1.02698778 -0.6241902 ]
[[-1.02698778 -0.6241902 ]
 [ 0.          0.        ]
 [ 0.          0.        ]]
[-0.53259922 -1.67145384]
[[-1.02698778 -0.6241902 ]
 [-0.53259922 -1.67145384]
 [ 0.          0.        ]]
[-0.46930372 -1.83703103]
[[-1.02698778 -0.6241902 ]
 [-0.53259922 -1.67145384]
 [-0.46930372 -1.83703103]]
[[-1.02698778 -0.53259922 -0.46930372]
 [-0.6241902  -1.67145384 -1.83703103]]


In [None]:
a = np.array([1, 2, 3])
w = np.array([1, 2, 3])
b = np.array([[1, 2, 3], [1, 4, 5]])
print(a @ b.T)
print(a.shape, b.shape)

[14 24]
(3,) (2, 3)


### Задание 2 (2 балла)
Теперь сделаем саму нейронную сеть.

#### Методы
`fit(X, y)` - обучает нейронную сеть заданное число эпох. В каждой эпохе необходимо использовать [cross-entropy loss](https://ml-cheatsheet.readthedocs.io/en/latest/loss_functions.html#cross-entropy) для обучения, а так же производить обновления не по одному элементу, а используя батчи.

`predict_proba(X)` - предсказывает вероятности классов для элементов `X`

#### Параметры конструктора
`modules` - список, состоящий из ранее реализованных модулей и описывающий слои нейронной сети. В конец необходимо добавить `Softmax`

`epochs` - количество эпох обучения

`alpha` - скорость обучения

#### Оценка
Оценка производится на заданных ботом гиперпараметрах и архитектурах. Ожидается, что при подобранных заранее гиперпараметрах решение будет демонстрировать приемлемую точность.

Всего 20 тестов по 500 точек в обучающей выборке и по 100 точек в тестовой выборке c 20 эпохами обучения и 10 тестов по 1000 точек в обучающей выборке и 200 точек в тестовой выборке с 40 эпохами обучения. Количество признаков варьируется от 2 до 8. Количество классов не более 8 и не менее 2.

In [None]:
from task import MLPClassifier

In [95]:
# Task 2
import math
class MLPClassifier:
    def __init__(self, modules: List[Module], epochs: int = 40, alpha: float = 0.01, batch_size: int = 32):
        """
        Parameters
        ----------
        modules : List[Module]
            Cписок, состоящий из ранее реализованных модулей и 
            описывающий слои нейронной сети. 
            В конец необходимо добавить Softmax.
        epochs : int
            Количество эпох обучения.
        alpha : float
            Cкорость обучения.
        batch_size : int
            Размер батча, используемый в процессе обучения.
        """
        self.modules = modules #тут должен быть еще софтмакс
        self.epochs = epochs
        self.alpha = alpha
        self.batch_size = batch_size

    def forward(self, x):
        for layer in self.modules:
            print('forward, x shape ', x.shape)
            x = layer.forward(x)
        return x

    def backward(self, y):
        print('modules ', self.modules)
        for i in range(len(self.modules) - 1, -1, -1):
            print('take: ', i)
            y = self.modules[i].backward(y)
            if type(self.modules[i]) == Linear:
                self.modules[i].update(self.alpha)
                

    def cross_entropy(self, x, y):
        #print(x.shape, y.shape)
        #print(np.where(y[:,np.newaxis]==1, -np.log(x), 0))
        ce = (np.where(y[:,np.newaxis]==1,-np.log(x), 0)).sum(axis=1)
        print('cross entropy ', ce)
        print('cross entropy ', ce.shape)
        return ce

    def d_cross_entropy(self, x, y):
        dce = np.where(y[:,np.newaxis]==1,-1/x, 0)  
        print('d cross entropy ', dce)
        return dce  

    def softmax(self, x):
        #x = x[:,:-1] #not considering bias 
        return np.exp(x) / np.exp(x).sum(axis=1)[:,np.newaxis]
   
            
    def fit(self, X: np.ndarray, y: np.ndarray) -> NoReturn: #train 
        """
        Обучает нейронную сеть заданное число эпох. 
        В каждой эпохе необходимо использовать cross-entropy loss для обучения, 
        а так же производить обновления не по одному элементу, а используя батчи (иначе обучение будет нестабильным и полученные результаты будут плохими.

        Parameters
        ----------
        X : np.ndarray
            Данные для обучения.
        y : np.ndarray
            Вектор меток классов для данных.
        """
        X = np.hstack((np.copy(X), np.array(y)[:,np.newaxis]))
        for i in range(self.epochs):
            loss = 0
            
            np.random.shuffle(X)
            batches = np.array_split(X, math.ceil(len(X)/self.batch_size))
            for batch in batches:
                print('new batch, size ', batch.shape)
                #print(type(batch))
                batches_data = batch[:,:-1]
                batches_true = batch[:,-1]
                res = self.forward(batches_data)
                print('res shape ', res.shape)
                res_softmax = self.softmax(res)
                print('res softmax ', res_softmax)
                #print('res prediction ', np.argmax(res_softmax, axis=1))
                loss += self.cross_entropy(res_softmax, batches_true).sum()
                print('loss ', loss)
                print('loss shape ', loss.shape)
                #d_loss = self.d_cross_entropy(batches_data, batches_true)
                d_loss = self.d_cross_entropy(res_softmax, batches_true)
                print('d_loss shape ', d_loss.shape)
                res_backward = self.backward(d_loss)
                
                


        
    def predict_proba(self, X: np.ndarray) -> np.ndarray:


        """
        Предсказывает вероятности классов для элементов X.

        Parameters
        ----------
        X : np.ndarray
            Данные для предсказания.
        
        Return
        ------
        np.ndarray
            Предсказанные вероятности классов для всех элементов X.
            Размерность (X.shape[0], n_classes)
        
        """
        #тут добавить софтмакс
        res = self.forward(X)
        res_softmax = self.softmax(res)
        return res_softmax
        
    def predict(self, X) -> np.ndarray:
        """
        Предсказывает метки классов для элементов X.

        Parameters
        ----------
        X : np.ndarray
            Данные для предсказания.
        
        Return
        ------
        np.ndarray
            Вектор предсказанных классов
        
        """
        p = self.predict_proba(X)
        return np.argmax(p, axis=1)

In [15]:
a = np.array([[1, 2], [3, 4]])
y = [5, 6]
print(np.hstack((a, np.array(y)[:,np.newaxis])))

[[1 2 5]
 [3 4 6]]


In [None]:
import numpy as np
a = np.array([[1, 2], [3, 4]])
print(np.arange(len(a)))
d = np.arange(len(a))
np.random.shuffle(d)
print(d)
print(a[d])
print(a[0])
print(a[[1, 0]])

[0 1]
[1 0]
[[3 4]
 [1 2]]
[1 2]
[[3 4]
 [1 2]]


In [None]:
x = np.array([[1, 2, 3, 4, 5], [2, 3, 4, 5, 6]])
print(np.exp(x) / np.exp(x).sum(axis=1)[:,np.newaxis])

[[0.01165623 0.03168492 0.08612854 0.23412166 0.63640865]
 [0.01165623 0.03168492 0.08612854 0.23412166 0.63640865]]


In [None]:
a = np.array([[1, 2, 3], [4, 5, 6]])
print(np.sum(a, axis=1))


[ 6 15]


In [None]:
a = np.array([1, 2, 3, 4, 5])
b = np.array([11, 12, 13, 14, 15])
print(np.where(a % 2 == 0, b, a))

[ 1 12  3 14  5]


In [None]:
import math
b = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [3, 4, 5], [4, 5, 6]])
c = np.array_split(b, math.ceil(len(b)/2))
print(c)

[array([[1, 2, 3],
       [4, 5, 6]]), array([[7, 8, 9],
       [3, 4, 5]]), array([[4, 5, 6]])]


In [96]:
p = MLPClassifier([
    Linear(4, 8),
    ReLU(),
    Linear(8, 8),
    ReLU(),
    Linear(8, 2)
])

X = np.random.randn(50, 4)
y = [(0 if x[0] > x[2]**2 or x[3]**3 > 0.5 else 1) for x in X]
p.fit(X, y)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
 0.         1.38902753 1.38676797 1.39627163 1.38634307 1.38716681
 1.38730386 0.         0.         0.         0.         0.
 0.        ]
cross entropy  (25,)
loss  20.842478430434408
loss shape  ()
d cross entropy  [[ 0.          0.        ]
 [-1.78779954 -2.26935845]
 [-1.8912191  -2.12205853]
 [ 0.          0.        ]
 [-1.81319249 -2.22972115]
 [ 0.          0.        ]
 [-1.95410476 -2.04810293]
 [-1.92508511 -2.08098162]
 [-2.00330773 -1.99670317]
 [-1.93701042 -2.06722399]
 [-1.93537876 -2.06908564]
 [-1.95494842 -2.04717698]
 [ 0.          0.        ]
 [-1.90069975 -2.11024789]
 [-2.04448812 -1.95740677]
 [-1.81878098 -2.22132783]
 [-1.98613801 -2.01405685]
 [-1.94263252 -2.0608588 ]
 [-1.93842662 -2.06561342]
 [ 0.          0.        ]
 [ 0.          0.        ]
 [ 0.          0.        ]
 [ 0.          0.        ]
 [ 0.          0.        ]
 [ 0.          0.        ]]
d_loss shape  (25, 2)
modules  [<__main__.

### Задание 3 (2 балла)
Протестируем наше решение на синтетических данных. Необходимо подобрать гиперпараметры, при которых качество полученных классификаторов будет достаточным.

Первый датасет - датасет moons. В каждом тесте у данных всего два признака, классов также два.

Второй датасет - датасет blobs. В каждом тесте у данных по два признака, классов три.


Обратите внимание, что датасеты могут отличаться от приведенных ниже по количеству точек, уровню шума и положению центроидов. Количество классов и признаков остается неизменным.

Обратите внимание, что классификатор будет обучаться ботом под каждый датасет отдельно. Обучать самостоятельно в файле `task.py` классификатор не нужно.

Количество датасетов каждого типа равно 5. Количество точек в обучающей выборке не менее 1000, количество точек в тестовой выборке не менее 200.

#### Оценка
Средняя точность на датасетах moons больше 0.85 - +1 балл

Средняя точность на датасетах blobs больше 0.85 - +1 балл

In [None]:
from task import classifier_moons, classifier_blobs

In [None]:
# Task 3

classifier_moons = MLPClassifier() # Нужно указать гиперпараметры
classifier_blobs = MLPClassifier() # Нужно указать гиперпараметры



In [None]:
X, y = make_moons(400, noise=0.075)
X_test, y_test = make_moons(400, noise=0.075)
classifier_moons.fit(X, y)
print("Accuracy", np.mean(classifier_moons.predict(X_test) == y_test))

In [None]:
X, y = make_blobs(400, 2, centers=[[0, 0], [2.5, 2.5], [-2.5, 3]])
X_test, y_test = make_blobs(400, 2, centers=[[0, 0], [2.5, 2.5], [-2.5, 3]])
classifier_blobs.fit(X, y)
print("Accuracy", np.mean(classifier_blobs.predict(X_test) == y_test))

## PyTorch

Для выполнения следующего задания понадобится PyTorch. [Инструкция по установке](https://pytorch.org/get-started/locally/)

Если у вас нет GPU, то можно использовать [Google Colab](https://colab.research.google.com/) или обучать сеть на CPU.

In [None]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch
from tqdm import tqdm
from torch import nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

t = transforms.ToTensor()

cifar_train = datasets.CIFAR10("datasets/cifar10", download=True, train=True, transform=t)
train_loader = DataLoader(cifar_train, batch_size=1024, shuffle=True, pin_memory=torch.cuda.is_available())
cifar_test = datasets.CIFAR10("datasets/cifar10", download=True, train=False, transform=t)
test_loader = DataLoader(cifar_test, batch_size=1024, shuffle=False, pin_memory=torch.cuda.is_available())

### Задание 4 (3 балла)
А теперь поработам с настоящими нейронными сетями и настоящими данными. Необходимо реализовать сверточную нейронную сеть, которая будет классифицировать изображения из датасета CIFAR10. Имплементируйте класс `Model` и функцию `calculate_loss`. 

Обратите внимание, что `Model` должна считать в конце `softmax`, т.к. мы решаем задачу классификации. Соответствеено, функция `calculate_loss` считает cross-entropy.

Для успешного выполнения задания необходимо, чтобы `accuracy`, `mean precision` и `mean recall` были больше 0.5

__Можно пользоваться всем содержимым библиотеки PyTorch.__

In [None]:
from task import TorchModel, calculate_loss

In [None]:
# Task 4
# size of images: 32x32
class TorchModel(nn.Module):
    def __init__(self):
        super().__init__()
        #self.conv1 = nn.Conv2d(3, 6, 5)
        #self.pool = nn.MaxPool2d(2, 2)
        #self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)
        self.relu = nn.ReLU()

        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    
    def load_model(self):
        """
        Используйте torch.load, чтобы загрузить обученную модель
        Учтите, что файлы решения находятся не в корне директории, поэтому необходимо использовать следующий путь:
        `__file__[:-7] +"model.pth"`, где "model.pth" - имя файла сохраненной модели `
        """
        torch.load('__file__[:-7]' + 'torch_model.pth')
    
    def save_model(self):
        """
        Используйте torch.save, чтобы сохранить обученную модель
        """
        torch.save(self, 'torch_model.pth')
        
def calculate_loss(X: torch.Tensor, y: torch.Tensor, model: TorchModel):
    """
    Cчитает cross-entropy.

    Parameters
    ----------
    X : torch.Tensor
        Данные для обучения.
    y : torch.Tensor
        Метки классов.
    model : Model
        Модель, которую будем обучать.

    """
    
    y_pred = model(X)
    return F.cross_entropy(y, y_pred)
    
    pass

Теперь обучим нашу модель. Для этого используем ранее созданные batch loader'ы.

In [None]:
def train(model, epochs=100):
    optimizer = torch.optim.Adam(model.parameters())
    train_losses = []
    test_losses = []
    for i in range(epochs):
        #Train
        loss_mean = 0
        elements = 0
        for X, y in iter(train_loader):
            X = X.to(device)
            y = y.to(device)
            loss = calculate_loss(X, y, model)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_mean += loss.item() * len(X)
            elements += len(X)
        train_losses.append(loss_mean / elements)
        #Test
        loss_mean = 0 
        elements = 0
        for X, y in iter(test_loader):
            X = X.to(device)
            y = y.to(device)
            loss = calculate_loss(X, y, model)
            loss_mean += loss.item() * len(X)
            elements += len(X)
        test_losses.append(loss_mean / elements)
        print("Epoch", i, "| Train loss", train_losses[-1], "| Test loss", test_losses[-1])
    return train_losses, test_losses

In [None]:
model = Model().to(device)
train_l, test_l = train(model)

Построим график функции потерь

In [None]:
plt.figure(figsize=(12, 6))
plt.plot(range(len(train_l)), train_l, label="train")
plt.plot(range(len(test_l)), test_l, label="test")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.tight_layout()
plt.show()

И, наконец, посчитаем метрики

In [None]:
true_positive = np.zeros(10)
true_negative = np.zeros(10)
false_positive = np.zeros(10)
false_negative = np.zeros(10)
accuracy = 0
ctn = 0
for X, y in iter(test_loader):
    X = X.to(device)
    y = y.to(device)
    with torch.no_grad():
        y_pred = model(X).max(dim=1)[1]
    for i in range(10):
        for pred, real in zip(y_pred, y):
            if real == i:
                if pred == real:
                    true_positive[i] += 1
                else:
                    false_negative[i] += 1
            else:
                if pred == i:
                    false_positive[i] += 1
                else:
                    true_negative[i] += 1
            
    accuracy += torch.sum(y_pred == y).item()
    ctn += len(y)
print("Overall accuracy", accuracy / ctn)
print("Precision", true_positive / (true_positive + false_positive))
print("Recall", true_positive / (true_positive + false_negative))
print("Mean Precision", np.mean(true_positive / (true_positive + false_positive)))
print("Mean Recall", np.mean(true_positive / (true_positive + false_negative)))