# Стройка нейронной сети

In [164]:
import numpy as np

class Operation():
    '''
    Базовый класс операций
    '''
    def __init__(self):
        pass

    def forward(self, input_: np.ndarray) -> np.ndarray:
        
        self.input_ = input_

        self.output_ = self._output()
        
        return self.output_

    def backward(self, output_diff: np.ndarray) -> np.ndarray:

        assert output_diff.shape[::-1] == self.output_.shape, f"Output_diff_shape = {output_diff.shape}, Output_shape = {self.output_.shape}"

        return self._input_diff(output_diff)

    def _output(self) -> np.ndarray:
        raise NotImplementedError()

    def _input_diff(self, output_diff_):
        raise NotImplementedError()

In [165]:
class ParamOperation(Operation):

    def __init__(self, params):
        self.params = params

    def backward(self, output_diff: np.ndarray) -> np.ndarray:

        assert output_diff.shape[::-1] == self.output_.shape, f"Output_diff_shape = {output_diff.shape}, Output_shape = {self.output_.shape}"
        self.params_grad = self._params_grad(output_diff)
        assert self.params_grad.shape == self.params.shape, f"Params_grad_shape = {self.params_grad.shape}, Params_shape = {self.params.shape}"
        return self._input_diff(output_diff)

    def _params_grad(self, output_diff_: np.ndarray) -> np.ndarray:
        raise NotImplementedError()

    def get_params(self):
        return self.params

    def get_params_grad(self):
        return self.params_grad

In [166]:
class WeightMultiply(ParamOperation):
    
    def _output(self):
        assert self.params.shape[0] == self.input_.shape[1], f"Params_shape = {self.params.shape}, Input_shape = {self.input_.shape}"
        return self.input_ @ self.params

    def _input_diff(self, output_diff_: np.ndarray):
        return self.params @ output_diff_

    def _params_grad(self, output_diff_: np.ndarray):
        return (output_diff_ @ self.input_).T

In [167]:
class BiasAdd(ParamOperation):

    def __init__(self, B : np.ndarray):
        super().__init__(B)
    
    def _output(self):
        return self.input_ + self.params

    def _input_diff(self, output_diff_: np.ndarray):
        return output_diff_

    def _params_grad(self, output_diff_: np.ndarray):
        return np.sum(output_diff_.T, axis=0, keepdims=True)

In [168]:
class Sigmoid(Operation):

    def _sigmoid(self, input_ : np.ndarray):
        return 1.0 / (1.0 + np.exp(np.clip(-input_, -50, 50)))

    def _output(self):
        return self._sigmoid(self.input_)

    def _input_diff(self, output_diff_):
        input_grad = (self.output_ * (1 - self.output_)).T * output_diff_
        return input_grad

In [169]:
from typing import List

class Layer:
    def __init__(self, neurons: int):
        self.neurons = neurons
        self.operations: List[Operation] = []
        self.First = True

    def setup_layer(self, shape):
        raise NotImplementedError()

    def forward(self, input_: np.ndarray):
        self.input_ = input_
        if self.First:
            self.setup_layer(self.input_.shape)
            self.First = False
        self._output = input_
        for operation in self.operations:
            self._output = operation.forward(self._output)
        
        return self._output

    def backward(self, output_diff):

        self.input_grad = output_diff

        for operation in self.operations[::-1]:
            self.input_diff = operation.backward(self.input_grad)


        return self.input_diff

    def get_params(self):
        for operation in self.operations:
            if isinstance(operation, ParamOperation):
                yield operation.get_params()

    def get_params_grad(self):
        for operation in self.operations:
            if isinstance(operation, ParamOperation):
                yield operation.get_params_grad()

In [170]:
class Dense(Layer):
    
    def __init__(self, neurons, activation = Sigmoid):
        super().__init__(neurons)
        self.activation = activation

    def setup_layer(self, shape):

        W = np.random.randn(shape[1], self.neurons)
        B = np.random.randn(1, self.neurons)
        if self.activation is not None:
            self.operations = [WeightMultiply(W),
                              BiasAdd(B),
                              self.activation()]
        else:
            self.operations = [WeightMultiply(W),
                              BiasAdd(B)]

In [171]:
class Loss():

    def __init__(self):
        pass

    def forward(self, target, prediction):
        self.target = target
        self.prediction = prediction
        assert self.target.shape == self.prediction.shape, f"Target_shape = {self.target.shape}, Prediction_shape = {self.prediction.shape}"
        self.loss_value = self._output()
        return self.loss_value

    def backward(self):
        self.prediction_diff_ = self.prediction_diff()
        assert self.prediction_diff_.shape[::-1] == self.prediction.shape
        return self.prediction_diff_

    def _output(self):
        raise NotImplementedError()

    def prediction_diff(self):
        raise NotImplementedError()

In [172]:
class MSE(Loss):
    
    def _output(self):
        return np.mean((self.target.reshape(self.prediction.shape) - self.prediction) ** 2)

    def prediction_diff(self):
        if self.prediction.ndim == 1:
            return 2 / (self.prediction.shape[0] * self.prediction.shape[1]) * (self.prediction - self.target.reshape(self.prediction.shape)).reshape(-1, 1)
        else:
            return 2 / (self.prediction.shape[0] * self.prediction.shape[1]) * (self.prediction - self.target.reshape(self.prediction.shape)).T

In [173]:
from typing import List

class NeuralNetwork():
    
    def __init__(self, layers: List[Layer | Dense]):
        self.layers = layers
        
    def forward(self, x_batch):

        self.x_output = x_batch
        for layer in self.layers:
            self.x_output = layer.forward(self.x_output)
        return self.x_output

    def backward(self, loss_grad):

        self.loss_grad = loss_grad

        for layer in self.layers[::-1]:

            self.loss_grad = layer.backward(self.loss_grad)

        return None

    def get_params(self):
        return map(lambda layer: layer.get_params(), self.layers)

    def get_params_grad(self):
        return map(lambda layer: layer.get_params_grad(), self.layers)

## Реализация ранних моделей с помощью нового класса

In [174]:
class Optimizer():

    def __init__(self, learning_rate = 0.01):

        self.lr = learning_rate

    def step(self):


        raise NotImplementedError()

In [175]:
class SGD(Optimizer):

    def step(self):
        for layer_params, layer_params_grad in zip(self.net.get_params(), self.net.get_params_grad()): # type: ignore
            for params, params_grad in zip(layer_params, layer_params_grad):
                params -= self.lr * params_grad

In [176]:
class Trainer:
    def __init__(self, loss: Loss, Net: NeuralNetwork, optimizer, lr: float = 0.01):
        self.loss = loss
        self.optimizer = optimizer(lr)
        self.Net = Net
        setattr(self.optimizer, 'net', self.Net)

    def get_batch(self, x_train, y_train, batch_size):
        N = x_train.shape[0]
        for i in range(0, N, batch_size):
            yield x_train[i:i + batch_size], y_train[i:i + batch_size]
    
    def fit(self, x_train, y_train, epochs = 100, verbose=True, batch_size = None):

        np.random.seed(10)

        indices = np.random.permutation(x_train.shape[0])
        x_train = x_train[indices]
        y_train = y_train[indices]

        if batch_size is None:
            batch_size = x_train.shape[0] // 5
        
        

        for e in range(epochs):
            all_loss = 0
            for x_batch, y_batch in self.get_batch(x_train, y_train, batch_size):
                pred = self.Net.forward(x_batch)
                loss = self.loss.forward(y_batch, pred)
                all_loss += loss
                loss_grad = self.loss.backward()
                self.Net.backward(loss_grad)
                self.optimizer.step()

            if verbose:
                print(f'Epoch {e + 1}: loss = {all_loss * batch_size / x_train.shape[0]}')

## Тестирование классов

In [177]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

california = fetch_california_housing()
x_train, x_test, y_train, y_test = train_test_split(california.data, california.target, random_state=10) # type: ignore

In [178]:
from sklearn.preprocessing import StandardScaler

sc = StandardScaler()

x_train = sc.fit_transform(x_train)

In [179]:
TestNetwork = NeuralNetwork(layers = [Dense(4), Dense(1, None)]) # type: ignore

In [180]:
# TestTrainer = Trainer(MSE(), Linear_Regression, SGD)

In [181]:
x_train.shape

(15480, 8)

In [182]:
TestNetwork = NeuralNetwork(layers = [Dense(4), Dense(1, None)]) # type: ignore
TestTrainer = Trainer(MSE(), TestNetwork, SGD)
TestTrainer.fit(x_train, y_train.reshape(-1, 1))

Epoch 1: loss = 19.130741711848884
Epoch 2: loss = 11.611255125032299
Epoch 3: loss = 7.459053815558494
Epoch 4: loss = 5.026573589031945
Epoch 5: loss = 3.5536491192882433
Epoch 6: loss = 2.6456083531499566
Epoch 7: loss = 2.07962766325318
Epoch 8: loss = 1.7233481303127938
Epoch 9: loss = 1.4961224322529134
Epoch 10: loss = 1.3483433355388446
Epoch 11: loss = 1.249466568121682
Epoch 12: loss = 1.1807434606490237
Epoch 13: loss = 1.130720515106633
Epoch 14: loss = 1.0924341490684712
Epoch 15: loss = 1.0616648044845711
Epoch 16: loss = 1.03585766176239
Epoch 17: loss = 1.013461506153383
Epoch 18: loss = 0.9935274319587077
Epoch 19: loss = 0.9754671438109455
Epoch 20: loss = 0.9589082140846347
Epoch 21: loss = 0.9436077134734586
Epoch 22: loss = 0.9294007546194287
Epoch 23: loss = 0.916169830302018
Epoch 24: loss = 0.9038265136248852
Epoch 25: loss = 0.8923005086869517
Epoch 26: loss = 0.8815330808504399
Epoch 27: loss = 0.8714731055497758
Epoch 28: loss = 0.8620746892391811
Epoch 29: l

In [183]:
Linear_Regression = NeuralNetwork(layers = [Dense(neurons=1, activation=None)]) # type: ignore
TestTrainer = Trainer(MSE(), Linear_Regression, SGD)
TestTrainer.fit(x_train, y_train.reshape(-1, 1))

Epoch 1: loss = 11.505606923326427
Epoch 2: loss = 9.56118457035504
Epoch 3: loss = 8.00089829788599
Epoch 4: loss = 6.740473251327629
Epoch 5: loss = 5.716263902100021
Epoch 6: loss = 4.879666690671874
Epoch 7: loss = 4.193172851988301
Epoch 8: loss = 3.627552027905941
Epoch 9: loss = 3.159821271214466
Epoch 10: loss = 2.771764087724444
Epoch 11: loss = 2.448838226777396
Epoch 12: loss = 2.1793609606305395
Epoch 13: loss = 1.9538945329177277
Epoch 14: loss = 1.764777595499683
Epoch 15: loss = 1.6057643195938047
Epoch 16: loss = 1.4717438187094753
Epoch 17: loss = 1.3585201365682409
Epoch 18: loss = 1.2626383931669767
Epoch 19: loss = 1.1812464607193267
Epoch 20: loss = 1.1119842413499479
Epoch 21: loss = 1.0528945681060364
Epoch 22: loss = 1.0023511739669675
Epoch 23: loss = 0.9590002237827404
Epoch 24: loss = 0.9217126876926587
Epoch 25: loss = 0.8895454255484136
Epoch 26: loss = 0.8617093021407569
Epoch 27: loss = 0.8375429994202586
Epoch 28: loss = 0.8164914607550221
Epoch 29: loss

In [184]:
from sklearn.metrics import mean_absolute_error, mean_squared_error

x_test = sc.transform(x_test)
y_pred = TestNetwork.forward(x_test).flatten()
y_test.shape, y_pred.shape

((5160,), (5160,))

In [185]:
print('MSE:', mean_squared_error(y_test, y_pred))

MSE: 0.7309265727536652
