# Пишем свой фреймворк

In [1]:
import numpy as np

In [2]:
import warnings
warnings.filterwarnings('ignore')

Инициализируем абстрактный класс "модуль":

In [3]:
class Module:
    """Base class for every layer in this framework.

    Subclasses override ``forward`` and ``backward``; layers that own
    trainable state also override ``parameters`` and ``grad_parameters``.
    """

    def __init__(self):
        # A freshly built module starts in training mode.
        self._train = True

    def forward(self, input):
        """Compute the module output for ``input`` (must be overridden)."""
        raise NotImplementedError

    def backward(self, input, grad_output):
        """Compute the gradient w.r.t. ``input`` (must be overridden)."""
        raise NotImplementedError

    def parameters(self):
        """Return the list of this module's own parameters."""
        return list()

    def grad_parameters(self):
        """Return the list of gradient tensors for this module's parameters."""
        return list()

    def train(self):
        """Switch the module to training mode."""
        self._train = True

    def eval(self):
        """Switch the module to evaluation mode."""
        self._train = False

Sequential:

In [4]:
class Sequential(Module):
    """Container that chains layers, feeding each output into the next layer.

    An empty container acts as the identity in both directions.
    """

    def __init__(self, *layers):
        super().__init__()
        self.layers = layers

    def forward(self, input):
        """Run ``input`` through every layer in order and return the result."""
        for layer in self.layers:
            input = layer.forward(input)
        self.output = input
        return self.output

    def backward(self, input, grad_output):
        """Back-propagate ``grad_output`` through the layers in reverse order.

        ``input`` is the tensor that was given to ``forward``; every other
        layer receives the stored ``output`` of the layer before it, so
        ``forward`` must have been called first.

        Returns the gradient with respect to ``input``.
        """
        # Input seen by layer i: the container input for i == 0, otherwise
        # the cached output of layer i - 1.
        layer_inputs = [input] + [layer.output for layer in self.layers[:-1]]
        for layer, layer_input in zip(reversed(self.layers), reversed(layer_inputs)):
            grad_output = layer.backward(layer_input, grad_output)
        # With no layers the loop does not run and the gradient passes through.
        return grad_output

    def parameters(self):
        """Return the parameters of all layers as one flat list."""
        return [param for layer in self.layers for param in layer.parameters()]

    def grad_parameters(self):
        """Return the parameter gradients of all layers as one flat list."""
        return [grad for layer in self.layers for grad in layer.grad_parameters()]

    def train(self):
        """Switch the container and every contained layer to training mode."""
        super().train()  # keep the container's own flag in sync
        for layer in self.layers:
            layer.train()

    def eval(self):
        """Switch the container and every contained layer to evaluation mode."""
        super().eval()  # keep the container's own flag in sync
        for layer in self.layers:
            layer.eval()

Fully-connected layer:

In [5]:
class Linear(Module):
    """Fully-connected layer computing ``input @ W + b``.

    ``W`` has shape ``(dim_in, dim_out)`` and ``b`` has shape ``(dim_out,)``;
    both are drawn uniformly from ``[-1/sqrt(dim_in), 1/sqrt(dim_in)]``.
    """

    def __init__(self, dim_in, dim_out):
        super().__init__()
        bound = 1. / np.sqrt(dim_in)
        self.W = np.random.uniform(-bound, bound, size=(dim_in, dim_out))
        self.b = np.random.uniform(-bound, bound, size=dim_out)
        # Start with zero gradients so grad_parameters() is usable before
        # the first backward() call.
        self.grad_W = np.zeros_like(self.W)
        self.grad_b = np.zeros_like(self.b)

    def forward(self, input):
        """Return ``input @ W + b`` and cache it in ``self.output``."""
        self.output = np.dot(input, self.W) + self.b
        return self.output

    def backward(self, input, grad_output):
        """Store parameter gradients and return the gradient w.r.t. ``input``.

        Parameter gradients are averaged over the first axis of ``input``
        (divided by ``input.shape[0]``); the returned input gradient is not.
        """
        batch_size = input.shape[0]
        self.grad_b = np.mean(grad_output, axis=0)
        # Out-of-place division also works when the product has an integer dtype.
        self.grad_W = np.dot(input.T, grad_output) / batch_size
        return np.dot(grad_output, self.W.T)

    def parameters(self):
        """Return ``[W, b]``."""
        return [self.W, self.b]

    def grad_parameters(self):
        """Return ``[grad_W, grad_b]`` in the same order as ``parameters()``."""
        return [self.grad_W, self.grad_b]

ReLU:

In [6]:
class ReLU(Module):
    """Rectified linear unit: element-wise ``max(input, 0)``."""

    def forward(self, input):
        """Zero out negative entries of ``input`` and cache the result."""
        rectified = np.maximum(input, 0)
        self.output = rectified
        return rectified

    def backward(self, input, grad_output):
        """Let ``grad_output`` through only where ``input`` is strictly positive."""
        positive_mask = input > 0
        return np.multiply(grad_output, positive_mask)

In [7]:
import torch.nn as nn
import torch

Сравним нашу реализацию ReLU с реализацией в PyTorch:

In [8]:
# Same five values for both implementations; the NumPy copy is an integer
# array while the torch tensor is floating point and tracks gradients.
input_torch = torch.tensor([1., 2., -3., 4., -7.], requires_grad=True)
input_array = np.array([1, 2, -3, 4, -7])

layer_custom = ReLU()
# NOTE: despite its name, layer_torch holds the *output* of nn.ReLU applied
# to input_torch, not the layer object itself.
layer_torch = nn.ReLU()(input_torch)

# Forward pass comparison.
print('ReLU custom:\n', layer_custom.forward(input_array))
print('ReLU torch:\n', layer_torch)

# Backward pass comparison with an all-ones upstream gradient.
layer_torch.backward(torch.Tensor([1, 1, 1, 1, 1]))
print('ReLU backward torch:\n', input_torch.grad)
print('ReLU backward custom:\n', layer_custom.backward(input_array, np.ones(5)))


ReLU custom:
 [1 2 0 4 0]
ReLU torch:
 tensor([1., 2., 0., 4., 0.], grad_fn=<ReluBackward0>)
ReLU backward torch:
 tensor([1., 1., 0., 1., 0.])
ReLU backward custom:
 [1. 1. 0. 1. 0.]


LeakyReLU:

In [19]:
class LeakyReLU(Module):
    """Leaky ReLU: ``input`` where it is positive, ``slope * input`` elsewhere.

    ``slope`` is the multiplier applied to non-positive inputs (default 0.03).
    """

    def __init__(self, slope=0.03):
        super().__init__()
        self.slope = slope

    def forward(self, input):
        """Apply the activation element-wise and cache it in ``self.output``."""
        # Selecting by sign (rather than taking max(x, slope * x)) stays
        # correct for any slope, including slope > 1.
        self.output = np.where(input > 0, input, np.multiply(self.slope, input))
        return self.output

    def backward(self, input, grad_output):
        """Return ``grad_output`` scaled by the local derivative of the activation.

        The derivative is 1 for positive inputs and ``slope`` otherwise. An
        input of exactly 0 uses ``slope``, so its gradient is no longer
        silently dropped to zero.
        """
        local_grad = np.where(input > 0, 1.0, self.slope)
        return np.multiply(grad_output, local_grad)

Сравнение функций для LeakyReLU:

In [20]:
# Same five values for both implementations; the torch tensor tracks gradients.
input_torch = torch.tensor([1., 2., -3., 4., -7.], requires_grad=True)
input_array = np.array([1, 2, -3, 4, -7])

# Both implementations use the same negative slope (0.05).
layer_custom = LeakyReLU(0.05)
# NOTE: layer_torch holds the *output* of nn.LeakyReLU applied to input_torch.
layer_torch = nn.LeakyReLU(0.05)(input_torch)

# Forward pass comparison.
print('LeakyReLU custom:\n', layer_custom.forward(input_array))
print('LeakyReLU torch:\n', layer_torch)

# Backward pass comparison with an all-ones upstream gradient.
layer_torch.backward(torch.Tensor([1, 1, 1, 1, 1]))
print('LeakyReLU backward torch:\n', input_torch.grad)
print('LeakyReLU backward custom:\n', layer_custom.backward(input_array, np.ones(5)))


LeakyReLU custom:
 [ 1.    2.   -0.15  4.   -0.35]
LeakyReLU torch:
 tensor([ 1.0000,  2.0000, -0.1500,  4.0000, -0.3500],
       grad_fn=<LeakyReluBackward0>)
LeakyReLU backward torch:
 tensor([1.0000, 1.0000, 0.0500, 1.0000, 0.0500])
LeakyReLU backward custom:
 [1.   1.   0.05 1.   0.05]


Sigmoid:

In [21]:
class Sigmoid(Module):
    """Logistic sigmoid activation: ``1 / (1 + exp(-input))`` element-wise."""

    def __init__(self):
        super().__init__()

    def forward(self, input):
        """Apply the sigmoid element-wise and cache it in ``self.output``."""
        self.output = 1 / (1 + np.exp(-input))
        return self.output

    def backward(self, input, grad_output):
        """Return the gradient w.r.t. ``input``.

        Chain rule: ``grad_output * s * (1 - s)`` with ``s = sigmoid(input)``.
        The upstream gradient must be multiplied in; returning only the local
        derivative is correct solely when ``grad_output`` is all ones.
        """
        # Evaluate the sigmoid once, without touching the cached forward output.
        activation = 1 / (1 + np.exp(-input))
        local_grad = np.multiply(activation, 1 - activation)
        return np.multiply(grad_output, local_grad)

Сравнение функций для Sigmoid:

In [22]:
# Identical 2x2 inputs for torch (tracking gradients) and NumPy.
input_torch = torch.tensor([[ 0.5570, -0.9258], [ 0.5570, -0.9258]], requires_grad=True)
input_array = np.array([[ 0.5570, -0.9258], [ 0.5570, -0.9258]])

layer_custom = Sigmoid()
# NOTE: layer_torch holds the *output* of nn.Sigmoid applied to input_torch.
layer_torch = nn.Sigmoid()(input_torch)

# Forward pass comparison.
print('custom:\n', layer_custom.forward(input_array))
print('torch:\n', layer_torch)

# Backward pass comparison; the upstream gradient is all ones, so this only
# exercises the local derivative of the sigmoid.
layer_torch.backward(torch.Tensor([[1, 1], [1, 1]]))
print('Sigmoid backward torch:\n', input_torch.grad)
print('Sigmoid backward custom:\n', layer_custom.backward(input_array, np.ones((2, 2))))

custom:
 [[0.63575811 0.28377758]
 [0.63575811 0.28377758]]
torch:
 tensor([[0.6358, 0.2838],
        [0.6358, 0.2838]], grad_fn=<SigmoidBackward>)
Sigmoid backward torch:
 tensor([[0.2316, 0.2032],
        [0.2316, 0.2032]])
Sigmoid backward custom:
 [[0.23156973 0.20324786]
 [0.23156973 0.20324786]]


Softmax:

Нам не надо писать backward для софтмакса, потому что он уже учтен в backward'e кросс-энтропии:

In [23]:
class SoftMax(Module):
    """Row-wise softmax over axis 1 of the input."""

    def __init__(self):
        super().__init__()

    def forward(self, input):
        """Return the softmax of each row and cache it in ``self.output``."""
        # Subtracting the per-row maximum leaves the result unchanged but
        # keeps the exponentials from overflowing.
        shifted = np.subtract(input, input.max(axis=1, keepdims=True))
        exponentials = np.exp(shifted)
        row_totals = np.sum(exponentials, axis=1, keepdims=True)
        self.output = exponentials / row_totals
        return self.output

    def backward(self, input, grad_output):
        """Pass ``grad_output`` through unchanged.

        No softmax Jacobian is applied here: CrossEntropy.backward in this
        file already returns ``softmax(input) - target``.
        """
        return grad_output

Сравнение функций для Softmax:

In [24]:
# Random 3x5 input and a random upstream gradient shared by both implementations.
input_array = np.random.uniform(0, 1, (3, 5))
grad = np.random.uniform(0, 1, (3, 5))
input_tensor = torch.tensor(input_array, requires_grad=True)
layer_custom = SoftMax()
# Pass dim explicitly: relying on the implicit dimension is deprecated in
# torch, and dim=1 matches the axis=1 normalisation of the custom SoftMax.
# NOTE: layer_torch holds the softmax *output* tensor, not a layer object.
layer_torch = nn.functional.softmax(input_tensor, dim=1)

# Forward pass comparison.
print('custom:\n', layer_custom.forward(input_array))
print('torch:\n', layer_torch)

# A softmax backward pass is not needed by the framework, but test it anyway:
def softmax_backward(softmax, grad_out):
    """Back-propagate ``grad_out`` through a row-wise softmax.

    Parameters
    ----------
    softmax : 2-D array whose rows are softmax outputs.
    grad_out : 2-D array of upstream gradients, one row per softmax row.

    Returns
    -------
    Array with the shape of ``grad_out``; row ``i`` equals
    ``(diag(s_i) - s_i s_i^T) @ grad_out[i]``.
    """
    rows = []
    for i in range(grad_out.shape[0]):
        probs = softmax[i]
        # Jacobian of softmax for one sample: diag(s) - s s^T.
        jacobian = np.diagflat(probs) - np.dot(probs.reshape(-1, 1), probs.reshape(1, -1))
        rows.append(np.dot(jacobian, grad_out[i].reshape(-1, 1)))
    # Reshape from the argument itself, not from a notebook-level global.
    return np.array(rows).reshape(grad_out.shape)

# Back-propagate the same random upstream gradient through torch's softmax
# output and through the explicit Jacobian-based softmax_backward helper.
layer_torch.backward(torch.Tensor(grad))
print()
print('Softmax backward torch:\n', input_tensor.grad)
print('Softmax backward custom:\n', softmax_backward(layer_custom.forward(input_array), grad))


custom:
 [[0.2157302  0.32267532 0.1661519  0.13603299 0.15940959]
 [0.1436334  0.16224488 0.22408223 0.25565338 0.21438612]
 [0.1846925  0.21401673 0.17103187 0.18323973 0.24701917]]
torch:
 tensor([[0.2157, 0.3227, 0.1662, 0.1360, 0.1594],
        [0.1436, 0.1622, 0.2241, 0.2557, 0.2144],
        [0.1847, 0.2140, 0.1710, 0.1832, 0.2470]], dtype=torch.float64,
       grad_fn=<SoftmaxBackward>)

Softmax backward torch:
 tensor([[-0.0685, -0.0214,  0.0471,  0.0611, -0.0183],
        [-0.0076, -0.0775,  0.0341,  0.1054, -0.0545],
        [ 0.0477,  0.0150,  0.0712, -0.0611, -0.0729]], dtype=torch.float64)
Softmax backward custom:
 [[-0.06848399 -0.02139871  0.04710827  0.06105249 -0.01827806]
 [-0.007614   -0.07746672  0.03412195  0.10541092 -0.05445214]
 [ 0.04769257  0.01500802  0.07120759 -0.06105651 -0.07285167]]


Dropout:

In [25]:
class Dropout(Module):
    """Dropout layer (non-inverted variant).

    In training mode each element is kept with probability ``1 - p`` and is
    not rescaled; in evaluation mode the input is multiplied by ``1 - p``.
    """

    def __init__(self, p=0.5):
        super().__init__()
        self.p = p          # probability of dropping an element
        self.mask = None    # 0/1 mask sampled by the last training forward

    def forward(self, input):
        """Apply dropout to ``input`` and cache the result in ``self.output``."""
        keep_prob = 1 - self.p
        if self._train:
            self.mask = np.random.binomial(1, keep_prob, size=input.shape)
            self.output = self.mask * input
        else:
            self.output = input * keep_prob
        return self.output

    def backward(self, input, grad_output):
        """Return the gradient w.r.t. ``input``.

        Training mode reuses the mask sampled by the last ``forward`` call.
        Evaluation mode scales by ``1 - p``, the derivative of the eval
        forward pass ``input * (1 - p)``.
        """
        if self._train:
            self.grad_input = self.mask * grad_output
        else:
            self.grad_input = grad_output * (1 - self.p)
        return self.grad_input

BatchNorm:

In [26]:
class BatchNorm(Module):
    """Batch normalisation over axis 0 with a learnable scale and shift.

    Training mode normalises with the statistics of the current batch and
    updates the running estimates as
    ``running = momentum * running + (1 - momentum) * batch``.
    Evaluation mode normalises with the running estimates.
    """

    def __init__(self, num_features):
        super().__init__()
        self.gamma = np.ones(num_features)
        self.beta = np.zeros(num_features)
        self.mu = np.zeros(shape=num_features)     # running mean
        self.sigma = np.ones(shape=num_features)   # running *variance* (biased)
        self.momentum = 0.9
        self.eps = 1e-5
        # Start with zero gradients so grad_parameters() is usable before
        # the first backward() call.
        self.grad_gamma = np.zeros(num_features)
        self.grad_beta = np.zeros(num_features)

    def _statistics(self, input):
        """Return the (mean, variance) used to normalise in the current mode."""
        if self._train:
            batch_mean = np.mean(input, axis=0)
            batch_var = np.mean((input - batch_mean) ** 2, axis=0)
            return batch_mean, batch_var
        return self.mu, self.sigma

    def forward(self, input):
        """Normalise ``input``, apply ``gamma``/``beta`` and cache the output."""
        mean, var = self._statistics(input)
        if self._train:
            keep = self.momentum
            self.mu = keep * self.mu + (1 - keep) * mean
            self.sigma = keep * self.sigma + (1 - keep) * var
        input_norm = (input - mean) / np.sqrt(var + self.eps)
        self.output = self.gamma * input_norm + self.beta
        return self.output

    def backward(self, input, grad_output):
        """Store ``grad_gamma``/``grad_beta`` and return the gradient w.r.t. ``input``.

        In training mode the batch statistics depend on ``input``, so the
        full batch-norm gradient is used. In evaluation mode the running
        statistics are constants and the layer is an element-wise affine map,
        so the gradient is ``gamma / sqrt(var + eps) * grad_output``.
        """
        mean, var = self._statistics(input)
        centered = input - mean
        inv_std = 1. / np.sqrt(var + self.eps)
        input_norm = centered / np.sqrt(var + self.eps)

        self.grad_gamma = np.sum(grad_output * input_norm, axis=0)
        self.grad_beta = np.sum(grad_output, axis=0)

        if not self._train:
            return self.gamma * inv_std * grad_output

        m = input.shape[0]
        return (self.gamma * inv_std / m) * (
            m * grad_output
            - inv_std ** 2 * centered * np.sum(grad_output * centered, axis=0)
            - np.sum(grad_output, axis=0)
        )

    def parameters(self):
        """Return ``[gamma, beta]``."""
        return [self.gamma, self.beta]

    def grad_parameters(self):
        """Return ``[grad_gamma, grad_beta]`` in the order of ``parameters()``."""
        return [self.grad_gamma, self.grad_beta]

Сравнение для BatchNorm:

In [27]:
# Random batch of 64 samples with 5 features.
input_tensor = torch.randn(64, 5)
# NOTE(review): grad_tensor is created but not used anywhere in this cell.
grad_tensor = torch.randn(64, 5)

layer_custom = BatchNorm(5)
layer_torch = nn.BatchNorm1d(5)

# Put both layers in training mode so each normalises with batch statistics.
layer_torch.train()
layer_custom._train = True

# Compare the first five rows of the forward outputs.
print('custom:\n', layer_custom.forward(input_tensor.detach().numpy())[:5])
print('torch:\n', layer_torch(input_tensor)[:5])

print('--------------')

custom:
 [[-0.31578416  0.2206727   0.55805683  0.53968656  0.26611346]
 [ 1.61705005  0.22198747  0.06159324  1.19194663  0.13319016]
 [ 2.05434728  0.18678124  1.21635592 -0.17682622  1.08032107]
 [-0.33395895 -0.65781593 -0.55126923  0.87074119  0.73552126]
 [ 0.85521311  0.96148813 -1.66470075  0.24545448  2.28932834]]
torch:
 tensor([[-0.3158,  0.2207,  0.5581,  0.5397,  0.2661],
        [ 1.6171,  0.2220,  0.0616,  1.1919,  0.1332],
        [ 2.0543,  0.1868,  1.2164, -0.1768,  1.0803],
        [-0.3340, -0.6578, -0.5513,  0.8707,  0.7355],
        [ 0.8552,  0.9615, -1.6647,  0.2455,  2.2893]],
       grad_fn=<SliceBackward>)
--------------


Абстрактный класс для Criterion:

In [28]:
class Criterion:
    """Abstract base class for loss functions."""

    def forward(self, input, target):
        """Compute the loss value for ``input`` against ``target`` (must be overridden)."""
        raise NotImplementedError

    def backward(self, input, target):
        """Compute the gradient of the loss w.r.t. ``input`` (must be overridden)."""
        raise NotImplementedError

MSE:

In [29]:
class MSE(Criterion):
    """Squared-error loss summed over all elements and divided by ``input.shape[0]``."""

    def forward(self, input, target):
        """Return ``sum((input - target) ** 2) / input.shape[0]`` and cache it."""
        residual = input - target
        batch_size = input.shape[0]
        self.output = np.sum(np.power(residual, 2)) / batch_size
        return self.output

    def backward(self, input, target):
        """Return ``2 * (input - target) / input.shape[0]`` and cache it."""
        residual = input - target
        batch_size = input.shape[0]
        self.grad_output = 2 * residual / batch_size
        return self.grad_output

Сравнение функций для MSE:

In [30]:
# Random predictions and targets with shape (5, 1).
predict = np.random.randn(5, 1)
labels = np.random.randn(5, 1)

# Torch copies of the same data; only the predictions track gradients.
pred_tensor = torch.tensor(predict, requires_grad=True)
labels_tensor = torch.tensor(labels)

MSE_torch = nn.functional.mse_loss(pred_tensor, labels_tensor)

# Loss value comparison.
print('torch: ', MSE_torch)
print('custom: ', MSE().forward(predict, labels))

# Gradient comparison.
MSE_torch.backward()
print('Gradient torch:\n', pred_tensor.grad)
print('Gradient custom:\n', MSE().backward(predict, labels))

torch:  tensor(1.7903, dtype=torch.float64, grad_fn=<MseLossBackward>)
custom:  1.7903148627563954
Gradient torch:
 tensor([[ 1.1027],
        [ 0.0807],
        [ 0.3511],
        [-0.1863],
        [-0.2276]], dtype=torch.float64)
Gradient custom:
 [[ 1.10272365]
 [ 0.08073183]
 [ 0.35105221]
 [-0.18627612]
 [-0.22759252]]


CrossEntropy:

In [35]:
class CrossEntropy(Criterion):
    """Softmax cross-entropy for one-hot (or soft) targets.

    ``input`` holds unnormalised scores of shape ``(batch, classes)``; the
    softmax over axis 1 is applied inside this criterion. ``target`` has the
    same shape as ``input``.
    """

    def __init__(self):
        super().__init__()

    @staticmethod
    def _log_softmax(input):
        """Return the row-wise log-softmax of ``input``, computed stably."""
        # Subtracting the row maximum prevents overflow in exp, and working
        # in log space avoids log(0). The scores are never clipped: clamping
        # them to (0, 1) would corrupt any logits outside that range.
        shifted = input - np.max(input, axis=1, keepdims=True)
        return shifted - np.log(np.sum(np.exp(shifted), axis=1, keepdims=True))

    def forward(self, input, target):
        """Return the cross-entropy averaged over the batch and cache it."""
        input = np.asarray(input)
        log_probs = self._log_softmax(input)
        self.output = -1 * np.sum(np.multiply(log_probs, target)) / input.shape[0]
        return self.output

    def backward(self, input, target):
        """Return ``softmax(input) - target``.

        NOTE: the result is deliberately *not* divided by the batch size, so
        it is ``batch`` times larger than torch's mean-reduced gradient.
        Linear.backward in this file divides its parameter gradients by the
        batch size. Divide by ``input.shape[0]`` to reproduce torch's values.
        """
        input = np.asarray(input)
        probabilities = np.exp(self._log_softmax(input))
        return probabilities - target

Сравнение функций для CrossEntropy:

In [36]:
# Random scores for 3 samples and 5 classes, with one-hot labels for the
# custom criterion.
predict = np.random.uniform(0, 1, (3, 5))
labels = np.array([[0, 0, 1, 0, 0], [1, 0, 0, 0, 0], [0, 0, 0, 0, 1]])
print()
print('predict:\n', predict)
print('labels:\n', labels)

# Torch takes class indices instead of one-hot rows; [2, 0, 4] are the
# positions of the ones in `labels`.
pred_tensor = torch.tensor(predict, requires_grad=True)
labels_tensor = torch.tensor([2, 0, 4], dtype=torch.long)

CE_torch = nn.functional.cross_entropy(pred_tensor, labels_tensor)

# Loss value comparison.
print('torch: ', CE_torch)
print('custom: ', CrossEntropy().forward(predict, labels))

# Gradient comparison. CrossEntropy.backward does not divide by the batch
# size, so the custom values are 3x (the batch size) the torch values.
CE_torch.backward()
print('Gradient torch:\n', pred_tensor.grad)
print('Gradient custom:\n', CrossEntropy().backward(predict, labels))


predict:
 [[0.69067852 0.43224745 0.57056269 0.72160854 0.13139046]
 [0.90132604 0.56409229 0.94320448 0.01381289 0.75059249]
 [0.59322367 0.16130528 0.33159224 0.68269917 0.46577442]]
labels:
 [[0 0 1 0 0]
 [1 0 0 0 0]
 [0 0 0 0 1]]
torch:  tensor(1.5236, dtype=torch.float64, grad_fn=<NllLossBackward>)
custom:  1.5235587728685556
Gradient torch:
 tensor([[ 0.0782,  0.0604, -0.2640,  0.0807,  0.0447],
        [-0.2506,  0.0591,  0.0863,  0.0341,  0.0712],
        [ 0.0759,  0.0493,  0.0584,  0.0830, -0.2665]], dtype=torch.float64)
Gradient custom:
 [[ 0.23462562  0.18119251 -0.79192985  0.24199599  0.13411573]
 [-0.75175415  0.17718348  0.25886275  0.10219742  0.2135105 ]
 [ 0.2276351   0.1477949   0.17523229  0.24894187 -0.79960415]]
