In [3]:
import numpy as np
import idx2numpy
import time

import torch
import torch.nn as nn

In [4]:
def get_train_test_data(folder_path: str):
    """Load the MNIST handwritten-digit data set from IDX files.

    Args:
        folder_path: Directory containing the four files published at
            http://yann.lecun.com/exdb/mnist/ (``train-images.idx3-ubyte``,
            ``t10k-images.idx3-ubyte``, ``train-labels.idx1-ubyte`` and
            ``t10k-labels.idx1-ubyte``). A trailing path separator is
            optional.

    Returns:
        Tuple ``(x_train, x_test, y_train, y_test)``. The image arrays are
        reshaped to ``(n_samples, 28 * 28)`` and divided by 255; the label
        arrays are returned exactly as loaded.
    """
    # Local import keeps this cell self-contained.
    import os

    def load(file_name):
        # os.path.join tolerates a folder given with or without a trailing
        # separator, unlike plain string concatenation.
        return idx2numpy.convert_from_file(os.path.join(folder_path, file_name))

    x_train = load('train-images.idx3-ubyte')
    x_test = load('t10k-images.idx3-ubyte')

    y_train = load('train-labels.idx1-ubyte')
    y_test = load('t10k-labels.idx1-ubyte')

    train_size, test_size = x_train.shape[0], x_test.shape[0]

    return (
        x_train.reshape(train_size, 28 * 28) / 255,
        x_test.reshape(test_size, 28 * 28) / 255,
        y_train,
        y_test,
    )

In [5]:
# Read the four MNIST splits from the local `mnist-data/` folder; the images
# come back flattened to 784 features and divided by 255.
x_train, x_test, y_train, y_test = get_train_test_data(folder_path='mnist-data/')

# Custom NN

In [6]:
class SimpleNeuralNetwork:
    """Two-layer fully connected classifier trained with mini-batch SGD.

    Architecture: input -> linear -> ReLU -> linear -> softmax, optimised
    against the cross-entropy loss. All gradients are derived by hand in
    `backward`.
    """

    def __init__(self, layers: list[int]):
        """Randomly initialise weights and biases.

        Args:
            layers: Neuron counts ``[n_inputs, n_hidden, n_classes]``. Only
                the first three entries are read.
        """
        self.weights = [np.random.randn(layers[0], layers[1]) * 0.01 + 0.01,
                        np.random.randn(layers[1], layers[2]) * 0.01 + 0.01]

        self.bias = [np.random.randn(1, layers[1]),
                     np.random.randn(1, layers[2])]

        self._print_pattern = 'Epoch {} \t Time {:.2f}s \t Loss {:.4f} \t Accuracy {:.4f}'
        self._report_pattern = '{0} time {1:.2f}s \t {0} loss {2:.4f} \t {0} accuracy {3:.4f}'

    def forward(self, x):
        """Run a forward pass and cache the intermediate activations.

        The pre-activation of the hidden layer, its ReLU output and the
        logits are stored on the instance because `backward` needs them.

        Args:
            x: Input batch, shape ``(batch_size, n_inputs)``.

        Returns:
            Class probabilities, shape ``(batch_size, n_classes)``.
        """
        self.hidden_layer_1 = x @ self.weights[0] + self.bias[0]
        self.relu_layer = self.relu(self.hidden_layer_1)
        self.hidden_layer_2 = self.relu_layer @ self.weights[1] + self.bias[1]
        return self.softmax(self.hidden_layer_2)

    def backward(self, x, y_true, y_pred):
        """Compute cross-entropy gradients for every parameter.

        Must be called after `forward` on the same batch, since it reads the
        cached activations. Results are stored in ``self.derivate_weights``
        and ``self.derivate_bias`` for the next `optimizer_step`.

        Args:
            x: Input batch, shape ``(batch_size, n_inputs)``.
            y_true: One-hot targets, shape ``(batch_size, n_classes)``.
            y_pred: Probabilities returned by `forward`, same shape as
                ``y_true``.
        """
        # Gradient of the mean softmax cross-entropy with respect to the logits.
        derivate_hidden_layer_2 = (y_pred - y_true) / y_true.shape[0]

        weight2 = self.relu_layer.T @ derivate_hidden_layer_2
        bias2 = np.sum(derivate_hidden_layer_2, axis=0)

        # Back-propagate through the second linear layer and the ReLU.
        derivate_hidden_layer_1 = (
            (derivate_hidden_layer_2 @ self.weights[1].T)
            * self.derivate_relu(self.hidden_layer_1)
        )

        weight1 = x.T @ derivate_hidden_layer_1
        bias1 = np.sum(derivate_hidden_layer_1, axis=0)

        self.derivate_weights = [weight1, weight2]
        self.derivate_bias = [bias1, bias2]

    def optimizer_step(self, lr):
        """Apply one gradient-descent update using the stored gradients.

        Args:
            lr: Learning rate.
        """
        self.weights = [w - lr * d for w, d in zip(self.weights, self.derivate_weights)]
        self.bias = [b - lr * d for b, d in zip(self.bias, self.derivate_bias)]

    def fit(self, x_train, y_train, *, epochs, lr, batch_size):
        """Train the network and print per-epoch and final metrics.

        One epoch runs forward -> backward -> optimizer_step over every
        mini-batch of the training set, in order.

        Args:
            x_train: Training inputs, shape ``(n_samples, n_inputs)``.
            y_train: Targets, either class indices (one-hot encoded
                automatically) or an already one-hot encoded matrix.
            epochs: Number of passes over the training set.
            lr: Learning rate.
            batch_size: Mini-batch size; must be positive.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        y_train = self._prepare_targets(y_train)

        # Pre-set so the final report still works when no epoch is run.
        loss = accuracy = float('nan')

        start_train = time.time()
        for epoch in range(epochs):
            start_epoch = time.time()
            loss, accuracy = self._run_epoch(x_train, y_train, batch_size, lr)
            epoch_time = time.time() - start_epoch

            print(self._print_pattern.format(epoch + 1, epoch_time, loss, accuracy))

        train_time = time.time() - start_train
        print('\n.............................FINISH.............................\n')
        print(self._report_pattern.format('Train', train_time, loss, accuracy))

    def get_test_score(self, x_test, y_test, *, batch_size):
        """Evaluate on held-out data and print time, loss and accuracy.

        Args:
            x_test: Test inputs, shape ``(n_samples, n_inputs)``.
            y_test: Targets, either class indices (one-hot encoded
                automatically) or an already one-hot encoded matrix.
            batch_size: Mini-batch size used for evaluation; must be positive.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        y_test = self._prepare_targets(y_test)

        test_time = time.time()
        loss, accuracy = self._run_epoch(x_test, y_test, batch_size)

        print(self._report_pattern.format('Test', time.time() - test_time, loss, accuracy))

    def _prepare_targets(self, y):
        """Return one-hot targets whose width matches the output layer."""
        y = np.asarray(y)
        if y.size == y.shape[0]:
            # Use the output-layer width rather than max(y) + 1, so a split
            # that lacks the highest class still matches the predictions.
            return self.one_hot_encoding(y.ravel(), self.weights[1].shape[1])
        return y

    @staticmethod
    def _iter_batches(x, y, batch_size):
        """Yield consecutive ``(x_batch, y_batch)`` slices of the data."""
        if batch_size <= 0:
            raise ValueError('batch_size must be a positive integer')
        for start in range(0, len(x), batch_size):
            stop = start + batch_size
            yield x[start:stop], y[start:stop]

    def _run_epoch(self, x, y, batch_size, lr=None):
        """Run one pass over the data; update parameters when ``lr`` is given.

        Returns:
            ``(loss, accuracy)`` averaged over samples, so a shorter final
            batch is not over-weighted. Both are NaN for empty data.
        """
        loss_sum, accuracy_sum, n_seen = 0.0, 0.0, 0
        for x_batch, y_batch in self._iter_batches(x, y, batch_size):
            y_pred = self.forward(x_batch)
            if lr is not None:
                self.backward(x_batch, y_batch, y_pred)
                self.optimizer_step(lr)

            n_batch = len(x_batch)
            loss_sum += self.cross_entropy_loss(y_pred, y_batch) * n_batch
            accuracy_sum += self.accuracy_score(y_pred, y_batch) * n_batch
            n_seen += n_batch

        if n_seen == 0:
            return float('nan'), float('nan')
        return loss_sum / n_seen, accuracy_sum / n_seen

    @staticmethod
    def derivate_relu(x):
        """Derivative of ReLU: 1 where ``x > 0``, otherwise 0."""
        return (x > 0).astype(np.int_)

    @staticmethod
    def relu(x):
        """Element-wise ``max(x, 0)``."""
        return np.maximum(x, 0)

    @staticmethod
    def softmax(x):
        """Row-wise softmax of a 2-D array of logits.

        The row maximum is subtracted first; this leaves the result unchanged
        mathematically but keeps ``np.exp`` from overflowing on large logits.
        """
        shifted = x - np.max(x, axis=1, keepdims=True)
        exp = np.exp(shifted)
        return exp / np.sum(exp, axis=1, keepdims=True)

    @staticmethod
    def cross_entropy_loss(y_pred, y_true):
        """Mean cross-entropy between probabilities and one-hot targets.

        Probabilities are clipped away from zero so that ``log`` never
        produces ``-inf`` (and ``0 * -inf = nan``).
        """
        log_prob = np.log(np.clip(y_pred, 1e-12, None))
        return np.mean(-np.sum(y_true * log_prob, axis=1))

    @staticmethod
    def one_hot_encoding(y, n_classes=None):
        """One-hot encode integer class labels.

        Args:
            y: 1-D sequence of integer class indices.
            n_classes: Width of the encoding. Defaults to ``max(y) + 1``.

        Returns:
            Array of shape ``(len(y), n_classes)``.
        """
        y = np.asarray(y)
        if n_classes is None:
            # int() avoids overflow of small unsigned label dtypes.
            n_classes = int(y.max()) + 1
        return np.eye(n_classes)[y]

    @staticmethod
    def accuracy_score(y_pred, y_true):
        """Fraction of rows whose arg-max prediction equals the arg-max target."""
        y_pred_classes = np.argmax(y_pred, axis=1)
        y_true_classes = np.argmax(y_true, axis=1)
        return np.mean(y_pred_classes == y_true_classes)

In [7]:
model = SimpleNeuralNetwork([784, 300, 10])
model.fit(x_train, y_train, epochs=20, lr=0.1, batch_size=64)
model.get_test_score(x_test, y_test,  batch_size=64)

Epoch 1 	 Time 3.14s 	 Loss 0.5599 	 Accuracy 0.8461
Epoch 2 	 Time 3.00s 	 Loss 0.2541 	 Accuracy 0.9266
Epoch 3 	 Time 3.08s 	 Loss 0.1924 	 Accuracy 0.9448
Epoch 4 	 Time 3.17s 	 Loss 0.1522 	 Accuracy 0.9565
Epoch 5 	 Time 3.14s 	 Loss 0.1250 	 Accuracy 0.9648
Epoch 6 	 Time 3.19s 	 Loss 0.1056 	 Accuracy 0.9702
Epoch 7 	 Time 3.05s 	 Loss 0.0911 	 Accuracy 0.9748
Epoch 8 	 Time 3.17s 	 Loss 0.0798 	 Accuracy 0.9778
Epoch 9 	 Time 3.03s 	 Loss 0.0707 	 Accuracy 0.9805
Epoch 10 	 Time 3.13s 	 Loss 0.0630 	 Accuracy 0.9825
Epoch 11 	 Time 3.07s 	 Loss 0.0566 	 Accuracy 0.9847
Epoch 12 	 Time 3.07s 	 Loss 0.0510 	 Accuracy 0.9867
Epoch 13 	 Time 3.05s 	 Loss 0.0461 	 Accuracy 0.9882
Epoch 14 	 Time 3.11s 	 Loss 0.0419 	 Accuracy 0.9895
Epoch 15 	 Time 3.07s 	 Loss 0.0382 	 Accuracy 0.9907
Epoch 16 	 Time 3.08s 	 Loss 0.0348 	 Accuracy 0.9918
Epoch 17 	 Time 3.06s 	 Loss 0.0319 	 Accuracy 0.9926
Epoch 18 	 Time 3.04s 	 Loss 0.0292 	 Accuracy 0.9935
Epoch 19 	 Time 3.01s 	 Loss 0.0268 	

# PyTorch

In [8]:
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing feature rows with integer class labels."""

    def __init__(self, x, y):
        """Store the features and labels.

        Args:
            x: Indexable collection of feature rows (e.g. a 2-D NumPy array).
            y: Indexable collection of integer class labels, aligned with ``x``.
        """
        self.x = x
        self.y = y

    def __len__(self):
        """Return the number of samples."""
        return len(self.x)

    def __getitem__(self, i):
        """Return sample ``i`` as ``(float32 feature tensor, int label)``."""
        # torch.tensor replaces the legacy torch.FloatTensor constructor.
        features = torch.tensor(self.x[i], dtype=torch.float32)
        # A Python int makes the default collate build int64 label batches,
        # rather than Byte tensors from uint8 labels.
        return features, int(self.y[i])

In [9]:
# Wrap both splits in loaders that yield mini-batches of 64 samples in order.
train_loader = torch.utils.data.DataLoader(
    dataset=Dataset(x_train, y_train),
    batch_size=64,
)
test_loader = torch.utils.data.DataLoader(
    dataset=Dataset(x_test, y_test),
    batch_size=64,
)

In [10]:
# Same 784 -> 300 -> 10 architecture as the NumPy network; the final layer
# emits raw logits (no softmax module here).
model = nn.Sequential(nn.Linear(784, 300), nn.ReLU(), nn.Linear(300, 10))

In [11]:
def get_accuracy(data_loader, model):
    """Compute classification accuracy of ``model`` over ``data_loader``.

    Args:
        data_loader: Iterable yielding ``(inputs, labels)`` batches, where
            ``labels`` is a tensor of class indices.
        model: Callable mapping an input batch to per-class scores of shape
            ``(batch_size, n_classes)``.

    Returns:
        Fraction of samples whose highest-scoring class equals the label.

    Raises:
        ZeroDivisionError: If ``data_loader`` yields no samples.
    """
    # Evaluate in eval mode so layers such as dropout or batch norm behave
    # deterministically, then restore whatever mode the caller had set.
    is_module = isinstance(model, torch.nn.Module)
    was_training = model.training if is_module else False
    if is_module:
        model.eval()

    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for x, y in data_loader:
                predicted = model(x).argmax(dim=1)
                total += y.size(0)
                correct += int((predicted == y).sum())
    finally:
        if is_module:
            model.train(was_training)
    return correct / total

In [12]:
# Plain SGD on the cross-entropy loss; nn.CrossEntropyLoss takes the raw
# logits produced by the model.
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
loss_func = nn.CrossEntropyLoss()

# The previous `range(1, 20)` ran only 19 epochs; train for the same 20 epochs
# as the NumPy network so the two runs are comparable.
n_epochs = 20

# NOTE: the elapsed time reported below also includes the per-epoch accuracy
# evaluation on both loaders.
start = time.time()
for epoch in range(1, n_epochs + 1):
    for inputs, labels in train_loader:
        # Gradients accumulate by default, so reset them for every batch.
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = loss_func(outputs, labels)

        loss.backward()
        optimizer.step()

    print('\nEpoch {}, train_accuracy {}'.format(epoch, get_accuracy(train_loader, model)))
    print('Epoch {}, test_accuracy {}\n'.format(epoch, get_accuracy(test_loader, model)))

print('Finish')
print('time: {} seconds'.format(time.time() - start))


Epoch 1, train_accuracy 0.855
Epoch 1, test_accuracy 0.8636


Epoch 2, train_accuracy 0.8861333333333333
Epoch 2, test_accuracy 0.8926


Epoch 3, train_accuracy 0.89755
Epoch 3, test_accuracy 0.9027


Epoch 4, train_accuracy 0.9053333333333333
Epoch 4, test_accuracy 0.9098


Epoch 5, train_accuracy 0.9106166666666666
Epoch 5, test_accuracy 0.9147


Epoch 6, train_accuracy 0.9154333333333333
Epoch 6, test_accuracy 0.9196


Epoch 7, train_accuracy 0.91915
Epoch 7, test_accuracy 0.9235


Epoch 8, train_accuracy 0.92265
Epoch 8, test_accuracy 0.9255


Epoch 9, train_accuracy 0.9259
Epoch 9, test_accuracy 0.9281


Epoch 10, train_accuracy 0.9291
Epoch 10, test_accuracy 0.9298


Epoch 11, train_accuracy 0.9318666666666666
Epoch 11, test_accuracy 0.9322


Epoch 12, train_accuracy 0.9346666666666666
Epoch 12, test_accuracy 0.9341


Epoch 13, train_accuracy 0.9371833333333334
Epoch 13, test_accuracy 0.9362


Epoch 14, train_accuracy 0.9393166666666667
Epoch 14, test_accuracy 0.9382


Epoch 15,