In [1]:
import numpy as np

import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

from tqdm import tqdm

import scipy as sp
import pandas as pd
from sklearn.utils import shuffle
from sklearn.preprocessing import scale

# Загрузим простой датасет

In [2]:
from sklearn.datasets import load_boston

#### Предобработка данных

In [3]:
dataset = load_boston()
observations = scale(dataset.data)
tg = scale(dataset.target)

In [4]:
X_train, X_test, y_train, y_test = train_test_split(observations, tg, test_size = 0.25)

In [5]:
print("Train data:")
print(X_train.shape, y_train.shape)
print("Test data:")
print(X_test.shape, y_test.shape)

Train data:
(379, 13) (379,)
Test data:
(127, 13) (127,)


# Сделаем все нужные нам блоки:

In [6]:
class ErrorFunctionSTD():
    def __init__(self):
        """
        Инициализация метода функции ошибки.
        В функции ошибки нету параметров.
        """
        self.params = []
        self.grad_params = [np.zeros_like(self.params)]
        pass
    
    def zero_grad(self):
        """
        Функция которая обнуляет градиенты
        """
        self.grad_params = [np.zeros_like(self.params)]
        pass
    
    def forward(self, predict, target):
        """
        predict --- ответы предсказаные, вектор размера [batch_size x size]
        target --- истиные ответы, вектор размера  [batch_size x size]
        Считает среднее отклонение от истинного
        """
        return (((predict - target).T@(predict - target))/predict.shape[0]).sum()
    
    def backward(self, predict, target):
        """
        predict --- ответы предсказаные, вектор размера [batch_size x size]
        target --- истиные ответы, вектор размера  [batch_size x size]
        Возврщает производную ошибки по предсказаным ответам
        """
        return 2*(predict - target)/predict.shape[0]

In [7]:
class Dense():
    def __init__(self, input_size, output_size):
        """
        input_size --- размер входа даного слоя
        output_size --- размер выхода даного слоя
        """
        self.weights = np.random.randn(input_size, output_size)*0.01
        self.biases = np.zeros(output_size)
        self.params = [self.weights, self.biases]
        self.grad_params = [np.zeros_like(self.weights), np.zeros_like(self.biases)]
        pass
    
    def zero_grad(self):
        """
        Функция которая обнуляет градиенты
        """
        self.grad_params = [np.zeros_like(self.weights), np.zeros_like(self.biases)]
        pass
    
    def forward(self, input):
        """
        На вход нам дается выход предыдущего слоя нейронной сети
        На выходе должны получить вектор ответов текущего слоя
        На вход подается вектор размера [batch_size x input_size]
        На выходе должен быть вектор размера [batch_size x output_size]
        """
        self.__input = np.array(input)
        self.__output = self.__input@self.weights
        self.__output = self.__output + np.reshape(self.biases, [1, -1])
        return self.__output
    
    def backward(self, grad_output):
        """
        На вход подаются градиента ВЫХОДУ текущего слоя
        На выход нужно дать градиенты по ВХОДУ текущего слоя и по параметрам текущего слоя
        На вход подается вектор размера [batch_size x output_size]
        На выходе должен быть вектор размера [batch_size x intput_size] --- градиент по ВХОДУ
        
        (Также после выполенния данного метода сохраняются градиенты по параметрам модели)
        """
        grad_weight = self.__input.T@grad_output
        grad_biases = grad_output.sum(axis = 0)
        
        grad_input = grad_output@self.weights.T
        
        # np.ravel() --- вытаскивает матриицу в вектор (аналог np.reshape(arr ,[-1]))
        # np.r_[arr1, arr2] --- конкатенирует два вектора
        self.grad_params = [grad_weight, grad_biases]
        return grad_input
        

In [8]:
class ReLu:
    def __init__(self):
        """
        Инициализация нелинейной функции ReLu.
        В функции ReLu нету параметров.
        """
        self.params = []
        self.grad_params = [np.zeros_like(self.params)]
        pass
    
    def zero_grad(self):
        """
        Функция которая обнуляет градиенты
        """
        self.grad_params = [np.zeros_like(self.params)]
        pass
    
    def forward(self, input):
        """
        На вход нам дается выход предыдущего слоя нейронной сети
        На выходе должны получить вектор ответов текущего слоя
        На вход подается вектор размера [batch_size x input_size]
        На выходе должен быть вектор размера [batch_size x input_size]
        """
        self.__input = np.array(input)
        self.__output = np.abs(input*(1.0 + np.sign(input))/2.0)
        return self.__output
    
    def backward(self, grad_output):
        """
        На вход подаются градиента ВЫХОДУ текущего слоя
        На выход нужно дать градиенты по ВХОДУ текущего слоя и по параметрам текущего слоя
        На вход подается вектор размера [batch_size x intput_size]
        На выходе должен быть вектор размера [batch_size x intput_size] --- градиент по ВХОДУ
        
        (Также после выполенния данного метода сохраняются градиенты по параметрам модели)
        """        
        grad_input = np.multiply(grad_output, np.sign(self.__output))
        return grad_input
        

# Здесь еще нужно добавить

### Блок dropout

In [10]:
class dropout():
    def __init__(self, p = 0.5):
        pass
    
    def zero_grad(self):
        pass
    
    def forward(self, input):
        pass
    
    def backward():
        pass

# Теперь нам нужен оптимизатор параметров:

Будем использовать стандартный градиентный спуск (для начала напишем свой).

Это просто функция которая делает один шаг градиентного спуска имея градиенты всех параметров.

In [11]:
class GD:
    def __init__(self, parameters = None, alpha = 0.01):
        """
        parameters это вектор размера [size],
        alpha это шаг градиентного спуска
        """

        self.__parameters = parameters
        self.__alpha = alpha
        pass
    
    def set_parameters(self, parameters):
        """
        Функция которая устанавливает вектор параметров
        parameters --- вектор размера [size]
        """
        self.__parameters = parameters
        pass
    
    def step(self, gradients):
        """
        gradients это вектор градиентов по параметрам модели размеры [size]
        
        Функция обновляет вектор параметров self.__parameters
        """
        if self.__parameters is None:
            self.__parameters = np.zeros_like(gradients)
        self.__parameters = self.__parameters - self.__alpha*gradients
        pass
    
    def parameters(self):
        """
        Функция возвращает вектор параметров
        """
        return self.__parameters

# Теперь строем всю полносвязную сеть:

In [12]:
class Network():
    def __init__(self, input_size, error_function = ErrorFunctionSTD(), optimizer = GD()):
        """
        Иницилизирует полносвязную нейронную сеть
        input_size это число которое указывает на количество входов нейросети
        error_function это функция ошибки, которую мы оптимизируем
        optimizer это метод оптимизации нейросети
        """
        self.__error_function = error_function
        self.__optimizer = optimizer
        
# здесь задается сама структуреа нейросети
        self.__list_of_layer = []
        self.__list_of_layer.append(Dense(input_size, 9))
        self.__list_of_layer.append(ReLu())
        self.__list_of_layer.append(Dense(9, 5))
        self.__list_of_layer.append(ReLu())
        self.__list_of_layer.append(Dense(5, 1))
        
# здесь мы инициализируем нейросеть
        self.__init_weight()

    def __get_weights(self):
        """
        Функция которая возврвщает вектор параметров нейросети в виде вектора
        """
        weights = []
        for layer in self.__list_of_layer:
            for param in layer.params:
                weights += param.ravel().tolist()
        return np.array(weights)

    def __set_weights(self, weights):
        """
        Функция которая по заданому вектору задает все параметры нейросети
        """
        i = 0
        for layer in self.__list_of_layer:
            for param in layer.params:
                l = param.size
                param[:] = weights[i:i+l].reshape(param.shape)
                i += l

    def __init_weight(self):
        """
        Функция которая инициализирует веса нейросети
        """
        i = 0
        for layer in self.__list_of_layer:
            for param in layer.params:
                l = param.size
                param[:] = np.random.randn(l).reshape(param.shape)*0.01
                i += l
                
    def __get_gradients(self):
        """
        Функция которая возврвщает вектор градиентов параметров нейросети в виде вектора
        """
        gradients = []
        for layer in self.__list_of_layer:
            for grad_param in layer.grad_params:
                gradients += grad_param.ravel().tolist()
        return np.array(gradients)
    
    def __forward(self, input):
        """
        Функция которая делает проход вперед по нейросети
        """
        output = input
        for layer in self.__list_of_layer:
            output = layer.forward(output)
        return output
    
    def __backward(self, grad_output):
        """
        Функция которая делает проход назад по нейросети
        """
        grad = grad_output
        for layer in list(reversed(self.__list_of_layer)):
            grad = layer.backward(grad)
        pass
    
    def zero_grad(self):
        """
        Функция которая обнуляет градиенты всех параметров в нейросети
        """
        for layer in self.__list_of_layer:
            layer.zero_grad()
        pass
    
    def predict(self, input):
        """
        Функция которая делает предсказание по данным
        """
        output = input
        for layer in self.__list_of_layer:
            output = layer.forward(output)
        return output
    
    def fit(self, X = None, y = None, epochs = 100):
        """
        Функция которая обучает нейросеть
        X это матрица входов размера [batch_size x input_size]
        y это вектор входов размера [batch_size x output_size]
        epochs это число указывающее количество эпох в обучении
        """
        if X is None:
            return 
        if y is None:
            return

        weights = self.__get_weights()
        self.__optimizer.set_parameters(weights)
        
        for epoch in tqdm(range(epochs)):
# Обнуляем градиенты, чтобы старые нам не помешали
            self.zero_grad()
    
# Считаем градиент ошибки по выходу нейросети
            grad_output = self.__error_function.backward(self.__forward(X), y)
        
# Считаем градиент ошибки по всем параметрам нейросети
            self.__backward(grad_output)
            
# Вытаскиваем эти градиенты
            gradients = self.__get_gradients()
        
# Делаем шаг оптимизации параметров
            self.__optimizer.step(gradients)
            
# Обновляем веса нейросети
            self.__set_weights(self.__optimizer.parameters())
    
        return
        

In [13]:
error_function = ErrorFunctionSTD()

In [14]:
network = Network(input_size = 13, error_function=error_function)

In [15]:
print("Ошибка на train:", error_function.forward(network.predict(X_train), y_train.reshape([-1,1])))

Ошибка на train: 0.8847349214233711


In [16]:
network.fit(X_train, y_train.reshape([-1,1]), epochs=40000)

100%|██████████| 40000/40000 [00:10<00:00, 3649.48it/s]


In [17]:
print("Ошибка на train:", error_function.forward(network.predict(X_train), y_train.reshape([-1,1])))

Ошибка на train: 0.04387810843026418


In [18]:
print("Ошибка на test:", error_function.forward(network.predict(X_test), y_test.reshape([-1,1])))

Ошибка на test: 0.10640393088032178
