In [None]:
# TODO: move the random weight generators into a separate class.
# NOTE: with a large number of epochs the optimization is significantly worse than in the library implementations.

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import root_mean_squared_error

In [2]:
class USequential:
    """Minimal sequential container with hand-written backpropagation.

    Layers are stored in insertion order. Each layer is expected to expose
    ``forward(x)``, ``setErrors(errors)``, ``backward()`` and ``step(lr)``,
    because those are the only methods this container calls.
    """

    def __init__(self):
        self.layers_name = []  # names, parallel to ``layers``
        self.layers = []       # layer objects in forward order

    def __repr__(self):
        return ''.join([f'({layer_name}): {layer}\n'
                        for layer_name, layer in zip(self.layers_name, self.layers)])

    def __getitem__(self, index):
        """Return the layer stored at position ``index``."""
        return self.layers[index]

    def add_module(self, layer_name, layer):
        """Append ``layer`` under ``layer_name`` to the end of the pipeline."""
        self.layers_name.append(layer_name)
        self.layers.append(layer)

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, x, y, loss_function_type):
        """Propagate the loss gradient from the last layer to the first.

        Args:
            x: output of the last layer, as returned by ``forward``.
            y: target; a tensor for ``'MSE'``, a class index for ``'CEL'``.
            loss_function_type: key of ``_loss_function`` (``'MSE'`` or ``'CEL'``).

        Raises:
            KeyError: if ``loss_function_type`` is not a known loss name.
        """
        error = self._loss_function[loss_function_type](x, y)
        for layer_no, layer in enumerate(self.layers[::-1], 1):
            layer.setErrors(error)
            # The first layer has no predecessor, so its input gradient is
            # never needed and is not computed.
            if layer_no < len(self.layers):
                error = layer.backward()

    def step(self, lr):
        """Ask every layer to update its parameters with learning rate ``lr``."""
        for layer in self.layers:
            layer.step(lr)

    @staticmethod
    def _dirSquareError(x, y):
        """Gradient of the mean squared error with respect to ``x``."""
        vector = 2 * (x - y)
        return vector / vector.numel()

    @staticmethod
    def _dirCrossEntropyLoss(x, class_idx):
        """Gradient of softmax cross-entropy with respect to the logits ``x``.

        The result is ``softmax(x)`` with 1 subtracted at ``class_idx``. The
        logits are shifted by their maximum before exponentiation: the softmax
        value is unchanged, but ``exp`` can no longer overflow to ``inf`` and
        produce NaN for large logits.
        """
        exps = torch.exp(x - torch.max(x))
        output = exps / torch.sum(exps)
        output[class_idx] -= 1
        return output

    # ``staticmethod`` objects are only directly callable from Python 3.10 on;
    # storing the underlying functions keeps the lookup table usable on older
    # interpreters as well.
    _loss_function = {'MSE': _dirSquareError.__func__,
                      'CEL': _dirCrossEntropyLoss.__func__}


class ULinear:
    """Fully connected layer operating on a single 1-D input vector.

    ``weights_`` has shape ``(out_features, in_features)``, which is what
    ``forward`` (``weights_ @ X``), ``backward`` (``errors_ @ weights_``) and
    ``step`` (outer product of errors and input) all require.
    """

    def __init__(self,
                 in_features,
                 out_features,
                 bias=True,
                 start_weights_type='random'):
        """Create the layer.

        Args:
            in_features: length of the input vector.
            out_features: length of the output vector.
            bias: when false, no bias term is created (``bias_`` is ``None``).
            start_weights_type: key of ``_start_weights``
                (``'random'`` or ``'xavier'``).

        Raises:
            KeyError: if ``start_weights_type`` is not a known initializer.
        """
        self.in_features = in_features
        self.out_features = out_features
        init_fn = self._start_weights[start_weights_type]
        self.weights_ = init_fn((out_features, in_features), in_features, out_features)
        self.bias_ = init_fn((out_features,), in_features, out_features) if bias else None

    def __repr__(self):
        return f'Linear(in_features={self.in_features}, out_features={self.out_features})'

    def forward(self, X):
        """Return ``weights_ @ X`` (plus bias) and remember ``X`` for ``step``."""
        self.prev_output_ = X.detach().clone()
        output = self.weights_ @ X
        if self.bias_ is not None:
            output = output + self.bias_
        return output

    def backward(self):
        """Return the gradient with respect to the layer input."""
        return self.errors_ @ self.weights_

    def step(self, lr):
        """Gradient-descent update of weights and bias with learning rate ``lr``."""
        # Weight gradient is the outer product (output errors) x (stored input).
        self.weights_ -= lr * (self.errors_.reshape(-1, 1) @ self.prev_output_.reshape(1, -1))
        if self.bias_ is not None:
            self.bias_ -= lr * self.errors_

    def setErrors(self, errors):
        """Store the gradient of the loss with respect to this layer's output."""
        self.errors_ = errors

    @staticmethod
    def _random(shape, fan_in, fan_out):
        """Standard-normal initialization; ``fan_in``/``fan_out`` are unused."""
        return torch.randn(shape)

    @staticmethod
    def _XavierUniform(shape, fan_in, fan_out):
        """Uniform initialization on ``[-a, a]`` with ``a = sqrt(6 / (fan_in + fan_out))``."""
        a = (6 / (fan_in + fan_out)) ** 0.5
        output = torch.empty(shape)
        output.uniform_(-a, a)
        return output

    # Plain functions (not ``staticmethod`` wrappers) so the table entries are
    # callable on every supported Python version.
    _start_weights = {'random': _random.__func__,
                      'xavier': _XavierUniform.__func__}


class UReLU:
    """Element-wise ReLU activation with a hand-written backward pass."""

    def __init__(self):
        """The layer has no parameters or configuration."""

    def __repr__(self):
        return 'ReLU()'

    def forward(self, X):
        """Zero out negative entries of ``X`` and cache the 0/1 derivative mask."""
        non_negative = X >= 0
        # Derivative is 1 where X >= 0 (zero included) and 0 elsewhere.
        self.z_ = torch.where(non_negative, 1, 0)
        return torch.where(non_negative, X, 0)

    def backward(self):
        """Return the stored output errors gated by the cached derivative mask."""
        return self.errors_ * self.z_

    def step(self, lr):
        """Nothing to update: the activation has no trainable parameters."""
        return None

    def setErrors(self, errors):
        """Store the gradient of the loss with respect to this layer's output."""
        self.errors_ = errors

class UTanh:
    """Element-wise tanh activation with a hand-written backward pass."""

    def __init__(self):
        """The layer has no parameters or configuration."""

    def __repr__(self):
        return 'Tanh()'

    def forward(self, X):
        """Return ``tanh(X)`` and cache its derivative ``1 - tanh(X)^2``."""
        activation = torch.tanh(X)
        self.z_ = 1 - torch.pow(activation, 2)
        return activation

    def backward(self):
        """Return the stored output errors scaled by the cached derivative."""
        return self.errors_ * self.z_

    def step(self, lr):
        """Nothing to update: the activation has no trainable parameters."""
        return None

    def setErrors(self, errors):
        """Store the gradient of the loss with respect to this layer's output."""
        self.errors_ = errors

In [35]:
class UConv2d:
    """2-D convolution (stride 1, no padding) for one unbatched image.

    The input to ``forward`` is indexed as ``(channel, row, col)`` and
    ``weights_`` as ``(out_channel, in_channel, k_row, k_col)``. Gradients are
    computed by hand in ``backward``/``step`` from the errors given to
    ``setErrors``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 bias=True,
                 start_weights_type='random'):
        """Create the layer.

        Args:
            in_channels: number of input channels.
            out_channels: number of output channels (kernels stacks).
            kernel_size: side length of the square kernel.
            bias: when false, no bias term is created (``bias_`` is ``None``).
            start_weights_type: key of ``_start_weights``
                (``'random'`` or ``'xavier'``).

        Raises:
            KeyError: if ``start_weights_type`` is not a known initializer.
        """
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        init_fn = self._start_weights[start_weights_type]
        # NOTE(review): the channel counts are passed as fan_in/fan_out; the
        # kernel area is not included - confirm this is the intended scaling.
        self.weights_ = init_fn((out_channels, in_channels, kernel_size, kernel_size),
                                in_channels,
                                out_channels)
        self.bias_ = init_fn((out_channels,), in_channels, out_channels) if bias else None

    def __repr__(self):
        return (f'Conv2d(in_channels={self.in_channels}, '
                f'out_channels={self.out_channels}, '
                f'kernel_size=({self.kernel_size},{self.kernel_size}))')

    @staticmethod
    def _makeConvolution(X_channel, kernel):
        """Slide ``kernel`` over the 2-D ``X_channel`` (no flip, stride 1).

        Returns a tensor of shape ``(1, n, m)`` where ``n`` and ``m`` are the
        numbers of valid vertical and horizontal kernel positions.
        """
        k_n, k_m = kernel.shape
        n = X_channel.shape[0] - k_n + 1
        m = X_channel.shape[1] - k_m + 1
        # The result is a plain value tensor; no autograd graph is recorded.
        with torch.no_grad():
            # windows[r, c] is the k_n x k_m patch whose top-left corner is (r, c).
            windows = X_channel.unfold(0, k_n, 1).unfold(1, k_m, 1)
            return torch.sum(windows * kernel, dim=(-2, -1)).reshape(1, n, m)

    def _getInputErrors(self, output_errors, kernel):
        """Gradient contribution of one kernel to one input channel.

        The output errors are zero-padded by ``kernel size - 1`` on every side
        and then scanned with the kernel flipped along both axes.
        """
        reversed_kernel = torch.flip(kernel, (0, 1))
        pad_n = reversed_kernel.shape[0] - 1
        pad_m = reversed_kernel.shape[1] - 1
        n, m = output_errors.shape
        extended_errors = output_errors.new_zeros((n + 2 * pad_n, m + 2 * pad_m))
        # Explicit end indices: a ``pad:-pad`` slice is empty when pad == 0
        # (kernel_size == 1) and would make the assignment fail.
        extended_errors[pad_n:pad_n + n, pad_m:pad_m + m] = output_errors
        return self._makeConvolution(extended_errors, reversed_kernel)

    def forward(self, X):
        """Convolve ``X`` with every kernel stack and add the bias per output channel.

        Raises:
            ValueError: if the number of channels in ``X`` differs from the
                number of input channels in ``weights_``.
        """
        if len(X) != self.weights_.shape[1]:
            raise ValueError(f'expected {self.weights_.shape[1]} input channels, got {len(X)}')
        self.prev_img_ = X.detach().clone()
        feature_maps = []
        for out_kernels in self.weights_:
            per_input = torch.cat([self._makeConvolution(X_channel, kernel)
                                   for X_channel, kernel in zip(X, out_kernels)])
            # Sum the per-input-channel responses into one output map.
            feature_maps.append(torch.sum(per_input, dim=0, keepdim=True))
        output = torch.cat(feature_maps)
        if self.bias_ is not None:
            output = output + self.bias_.reshape(-1, 1, 1)
        return output

    def backward(self):
        """Return the gradient with respect to the layer input, one map per input channel."""
        per_output = [torch.cat([self._getInputErrors(channel_error, kernel)
                                 for kernel in out_kernels],
                                dim=0)
                      for out_kernels, channel_error in zip(self.weights_, self.errors_)]
        # Every output channel contributes to every input channel.
        return torch.sum(torch.stack(per_output), dim=0)

    def step(self, lr):
        """Gradient-descent update of kernels and bias with learning rate ``lr``."""
        # Kernel gradient: the stored input channel scanned with the output
        # errors acting as the kernel.
        weights_grad = torch.stack([torch.cat([self._makeConvolution(X_channel, channel_errors)
                                               for X_channel in self.prev_img_])
                                    for channel_errors in self.errors_])
        self.weights_ -= lr * weights_grad
        if self.bias_ is not None:
            self.bias_ -= lr * torch.sum(self.errors_, dim=(2, 1))

    def setErrors(self, errors):
        """Store the gradient of the loss with respect to this layer's output."""
        self.errors_ = errors

    @staticmethod
    def _random(shape, fan_in, fan_out):
        """Standard-normal initialization; ``fan_in``/``fan_out`` are unused."""
        return torch.randn(shape)

    @staticmethod
    def _XavierUniform(shape, fan_in, fan_out):
        """Uniform initialization on ``[-a, a]`` with ``a = sqrt(6 / (fan_in + fan_out))``."""
        a = (6 / (fan_in + fan_out)) ** 0.5
        output = torch.empty(shape)
        output.uniform_(-a, a)
        return output

    # Plain functions (not ``staticmethod`` wrappers) so the table entries are
    # callable on every supported Python version.
    _start_weights = {'random': _random.__func__,
                      'xavier': _XavierUniform.__func__}


class UFlatten:
    """Reshape the input into a 1-D vector and restore the shape on the way back."""

    def __init__(self):
        """The layer has no parameters or configuration."""

    def __repr__(self):
        return 'Flatten()'

    def forward(self, X):
        """Remember the shape of ``X`` and return it as a 1-D tensor."""
        self.initial_shape = X.shape
        return torch.reshape(X, (-1,))

    def backward(self):
        """Return the stored errors, already in the original input shape."""
        return self.errors_

    def step(self, lr):
        """Nothing to update: the layer has no trainable parameters."""
        return None

    def setErrors(self, errors):
        """Store ``errors`` reshaped to the shape seen by the last ``forward``."""
        restored = errors.reshape(self.initial_shape)
        self.errors_ = restored


class UMaxPool2d:
    """Max pooling over non-overlapping ``kernel_size`` x ``kernel_size`` windows.

    Rows and columns that do not fill a complete window are ignored.
    """

    def __init__(self, kernel_size):
        self.kernel_size = kernel_size

    def __repr__(self):
        return f'MaxPool2d(kernel_size={self.kernel_size})'

    def forward(self, X):
        """Pool every channel of ``X`` and remember where each maximum came from.

        ``max_indexes_`` ends up as a nested list (channel -> window row ->
        window column) of ``(row, col)`` coordinates into the input channel.
        """
        window = self.kernel_size
        self.input_shape_ = X.shape

        winners = []
        for X_channel in X:
            channel_winners = []
            for top in range(0, window * (X_channel.shape[0] // window), window):
                row_winners = []
                for left in range(0, window * (X_channel.shape[1] // window), window):
                    # argmax is a flat index into the window; turn it back
                    # into absolute (row, col) coordinates.
                    flat = torch.argmax(X_channel[top:top + window, left:left + window]).item()
                    row_winners.append((top + flat // window, left + flat % window))
                channel_winners.append(row_winners)
            winners.append(channel_winners)
        self.max_indexes_ = winners

        pooled = []
        for channel_no, channel_winners in enumerate(winners):
            values = [[X[channel_no][n][m] for n, m in row_winners]
                      for row_winners in channel_winners]
            pooled.append(torch.unsqueeze(torch.tensor(values), 0))
        return torch.cat(pooled)

    def backward(self):
        """Route each stored output error back to the position of its window maximum."""
        chosen_comps = torch.zeros(self.input_shape_)
        for channel_no, channel_winners in enumerate(self.max_indexes_):
            channel_errors = self.errors_[channel_no]
            for row_winners, row_errors in zip(channel_winners, channel_errors):
                for (n, m), value in zip(row_winners, row_errors):
                    chosen_comps[channel_no, n, m] = value
        return chosen_comps

    def step(self, lr):
        """Nothing to update: the layer has no trainable parameters."""
        return None

    def setErrors(self, errors):
        """Store the gradient of the loss with respect to this layer's output."""
        self.errors_ = errors

In [4]:
# Load the digits table. Later cells read a 'number_label' column and reshape
# the remaining columns of one row into an 8x8 image.
data_digits = pd.read_csv('DATA/digits.csv')

In [5]:
# Single training sample: row 3 of the table as a (1, 8, 8) float image and
# its label as an int64 scalar.
X_img = torch.tensor(
    data_digits.iloc[[3]].drop(columns='number_label').to_numpy().reshape(1, 8, 8),
    dtype=torch.float32,
)
y_res = torch.tensor(
    data_digits.iloc[[3]]['number_label'].values[0],
    dtype=torch.int64,
)

In [17]:
# Reference model built from torch.nn layers. The parametrised layers keep
# their own names so their weights can be copied into the hand-written model.
conv1 = torch.nn.Conv2d(1, 3, 2)
conv2 = torch.nn.Conv2d(3, 5, 2)
lin1 = torch.nn.Linear(20, 15)
classifier = torch.nn.Linear(15, 10)

seq = torch.nn.Sequential()
for _layer_name, _layer in (
    ('conv1', conv1),
    ('conv_relu1', torch.nn.ReLU()),
    ('pool1', torch.nn.MaxPool2d(2)),
    ('conv2', conv2),
    ('conv_relu2', torch.nn.ReLU()),
    ('flatten', torch.nn.Flatten(0)),
    ('lin1', lin1),
    ('lin_relu1', torch.nn.ReLU()),
    ('classifier', classifier),
):
    seq.add_module(_layer_name, _layer)
# Do not leave the loop variables behind in the notebook namespace.
del _layer_name, _layer

In [18]:
# Hand-written counterpart of the torch model. Every parametrised layer
# starts from a copy of the matching torch layer's weights and bias, so both
# models begin training from the same parameters.
uconv1 = UConv2d(1, 3, 2)
uconv2 = UConv2d(3, 5, 2)
ulin1 = ULinear(20, 15)
uclassifier = ULinear(15, 10)
for _custom, _reference in (
    (uconv1, conv1),
    (uconv2, conv2),
    (ulin1, lin1),
    (uclassifier, classifier),
):
    _custom.weights_ = _reference.weight.detach().clone()
    _custom.bias_ = _reference.bias.detach().clone()
del _custom, _reference

useq = USequential()
for _layer_name, _layer in (
    ('conv1', uconv1),
    ('conv_relu1', UReLU()),
    ('pool1', UMaxPool2d(2)),
    ('conv2', uconv2),
    ('conv_relu2', UReLU()),
    ('flatten', UFlatten()),
    ('lin1', ulin1),
    ('lin_relu1', UReLU()),
    ('classifier', uclassifier),
):
    useq.add_module(_layer_name, _layer)
# Do not leave the loop variables behind in the notebook namespace.
del _layer_name, _layer

In [19]:
# Reference run: train the torch.nn model on the single sample (X_img, y_res)
# with cross-entropy loss and plain SGD (lr=0.1).
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(seq.parameters(), lr=0.1)

# Print the first layer's weights and bias before training
# (the printed labels are in Russian).
print(f'Изначальные параметры\nВеса\n{seq[0].weight.data}\nСмещение\n{seq[0].bias.data}')

# 2000 SGD updates on the same sample.
for _ in range(2000):
    optimizer.zero_grad()
    output = seq(X_img)
    loss = loss_function(output, y_res)
    loss.backward()
    optimizer.step()

# Print the same parameters after training.
print(f'Обновленные параметры\nВеса\n{seq[0].weight.data}\nСмещение\n{seq[0].bias.data}')

Изначальные параметры
Веса
tensor([[[[ 0.1847, -0.2458],
          [-0.0636,  0.4786]]],


        [[[ 0.3010, -0.0439],
          [ 0.3827, -0.0795]]],


        [[[-0.0490, -0.1427],
          [ 0.0678,  0.0247]]]])
Смещение
tensor([0.2247, 0.2008, 0.0447])
Обновленные параметры
Веса
tensor([[[[ 0.1107, -0.2343],
          [-0.0878,  0.4663]]],


        [[[ 0.4894,  0.0032],
          [ 0.5653, -0.0479]]],


        [[[-0.0410, -0.1424],
          [ 0.1187,  0.0059]]]])
Смещение
tensor([0.2226, 0.2177, 0.0445])


In [20]:
# Same experiment with the hand-written model: print the first layer's
# parameters, run 2000 updates on the single sample, then print them again.
print(f'Изначальные параметры\nВеса\n{useq[0].weights_}\nСмещение\n{useq[0].bias_}')

# 'CEL' selects the cross-entropy gradient in USequential; 0.1 is the
# learning rate, the same value the torch SGD optimizer uses.
for _ in range(2000):
    uOutput = useq.forward(X_img)
    useq.backward(uOutput, y_res, 'CEL')
    useq.step(0.1)

print(f'Обновленные параметры\nВеса\n{useq[0].weights_.data}\nСмещение\n{useq[0].bias_}')

Изначальные параметры
Веса
tensor([[[[ 0.1847, -0.2458],
          [-0.0636,  0.4786]]],


        [[[ 0.3010, -0.0439],
          [ 0.3827, -0.0795]]],


        [[[-0.0490, -0.1427],
          [ 0.0678,  0.0247]]]])
Смещение
tensor([0.2247, 0.2008, 0.0447])
Обновленные параметры
Веса
tensor([[[[ 0.1107, -0.2343],
          [-0.0878,  0.4663]]],


        [[[ 0.4894,  0.0032],
          [ 0.5654, -0.0479]]],


        [[[-0.0410, -0.1424],
          [ 0.1187,  0.0059]]]])
Смещение
tensor([0.2226, 0.2177, 0.0445])


In [27]:
# Scratch: allocate an uninitialized 3x5 tensor to try in-place uniform filling.
w = torch.empty(3,5)

In [31]:
# Overwrite w in place with samples from the uniform distribution between -1 and 1.
w.uniform_(-1,1)

tensor([[-0.4186, -0.1342,  0.1167, -0.8624, -0.6144],
        [-0.3703,  0.2133, -0.4238, -0.8108,  0.1422],
        [-0.6933, -0.3509, -0.4158, -0.9608, -0.5929]])

In [30]:
# Display the current contents of w.
# NOTE(review): execution counts are out of order (30 comes after 31), so the
# shown values may predate the uniform_(-1,1) call above - re-run to confirm.
w

tensor([[0.9035, 0.2425, 0.5926, 0.8160, 0.7627],
        [0.0097, 0.2591, 0.9980, 0.2924, 0.4337],
        [0.9270, 0.5133, 0.6040, 0.3185, 0.6474]])

In [33]:
# Scratch: the bound sqrt(6 / (fan_in + fan_out)) used by _XavierUniform,
# evaluated for the example values 4 and 5.
a = np.sqrt(6/(4+5))

In [34]:
# Display the computed bound.
a

0.816496580927726