In [1]:
from abc import ABC, abstractmethod
import copy

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch

from typing import Union
from IPython.display import HTML, clear_output

In [3]:
BOS, EOS = ' ', '\n'

data = pd.read_json("./arxivData.json")
lines = data.apply(lambda row: (row['title'] + ' ; ' + row['summary'])[:128], axis=1) \
            .apply(lambda line: BOS + line.replace(EOS, ' ') + EOS) \
            .tolist()

tokens = list(set(''.join(lines)))

num_tokens = len(tokens)
print('num_tokens = ', num_tokens)

token_to_id = {token: idx for idx, token in enumerate(tokens)}

def to_matrix(data, token_to_id, max_len=None, dtype='int32', batch_first=True):
    """Casts a list of names into rnn-digestable matrix"""
    
    max_len = max_len or max(map(len, data))
    data_ix = np.zeros([len(data), max_len], dtype) + token_to_id[' ']

    for i in range(len(data)):
        line_ix = [token_to_id[c] for c in data[i]]
        data_ix[i, :len(line_ix)] = line_ix
        
    if not batch_first: # convert [batch, time] into [time, batch]
        data_ix = np.transpose(data_ix)

    return data_ix

In [7]:
class BaseLayer(ABC):
    @abstractmethod
    def __init__(self) -> None:
        pass

    def __call__(self, x: np.array, grad: bool = True) -> np.array:
        return self.forward(x, grad)

    @abstractmethod
    def forward(self, x: np.array, grad: bool = True) -> np.array:
        pass

    @abstractmethod
    def backward(self, output_error: np.array) -> np.array:
        pass

# conv1d check

In [9]:
class Conv1dVanilla(BaseLayer):
    """
    Сверточный слой, со страйдом 1 и без паддингов, для 1 предложения, не батча
    """
    def __init__(self, in_channels, out_channels, kernel_size):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size

        scale = np.sqrt(1/(in_channels*kernel_size))
        self.kernel = np.random.uniform(-scale, scale, size=(out_channels, in_channels, kernel_size))
        self.bias = np.random.uniform(-scale, scale, size=(out_channels))

    def set_optimizer(self, optimizer):
        self.kernel_optimizer = copy.copy(optimizer)
        self.bias_optimizer = copy.copy(optimizer)

        self.kernel_optimizer.set_weight(self.kernel)
        self.bias_optimizer.set_weight(self.bias)

    def forward(self, x, grad=True):
        self.input = x

        self.output_len = x.shape[0] - self.kernel_size + 1
        output = np.zeros(shape=(self.output_len, self.out_channels))

        for kernel_i, ker in enumerate(self.kernel):
            for i in range(self.output_len):
                output[i:self.kernel_size+i, kernel_i] = self.bias[kernel_i] + np.sum(x[i:self.kernel_size+i, :] * ker.T)

        return output

    def backward(self, output_error):
        dy_dkernel = np.zeros(shape=self.kernel.shape)
        dy_dbias = np.zeros(shape=self.bias.shape)
        dy_dx = np.zeros(shape=self.input.shape)

        for kernel_i, ker in enumerate(self.kernel):
            helper_k = np.zeros(shape=ker.T.shape)

            for i in range(self.output_len):
                helper_k += self.input[i:self.kernel_size+i, :] * output_error[i, kernel_i]
                dy_dx[i:self.kernel_size+i, :] += ker.T * output_error[i, kernel_i]

            dy_dkernel[kernel_i] = helper_k.T
            dy_dbias[kernel_i] = np.sum(output_error[:, kernel_i])

        # self.kernel = self.kernel_optimizer.step(dy_dkernel)
        # self.bias = self.bias_optimizer.step(dy_dbias)

        # return dy_dx
        return dy_dkernel, dy_dbias, dy_dx

In [34]:
sample = to_matrix(np.random.choice(lines, size=5), token_to_id, max_len=130)

emb = Embedding(len(token_to_id), 16)
encoded_data = emb(sample[0])

conv = Conv1dVanilla(16, 4, 5)
torch_conv = torch.nn.Conv1d(16, 4, 5)
torch_conv.weight.data = torch.as_tensor(conv.kernel)
torch_conv.bias.data = torch.as_tensor(conv.bias)

torch_input = torch.tensor(encoded_data[np.newaxis, :], dtype=torch.float64, requires_grad=True)
torch_out = torch_conv(torch_input.permute(0, 2, 1))  # permute потому что torch на вход принимает формат [BATCH_SIZE, EMB_DIM, SENTENCE_LEN]

# проверка что forward работает также как у torch, 
print("Vanilla Forward такой же как и torch forward:", np.allclose(torch_out.permute(0, 2, 1).detach().numpy(), conv(encoded_data)))

print("Shape выхода свертки:", conv(encoded_data).shape)

# случайная ошибка, которая приходит "сверху" от вышестоящих слоев, по размеру она совпадает с выходом слоя
check_error = np.random.normal(loc=-3, scale=100, size=(conv(encoded_data).shape))
check_error_torch = torch.tensor(np.transpose(check_error[np.newaxis, :], (0, 2, 1)))

torch_out.backward(check_error_torch)  # считаем градиенты для всех тензоров, которые участвуют в forward проходе
kernel_grad, bias_grad, in_error = conv.backward(check_error)  # тоже самое, только в ручном слое

# проверка градиентов весов и входа
print("Градиент по входу совпадает:", np.allclose(torch_input.grad.detach().numpy(), in_error))
print("Градиент по смещениям совпадает:", np.allclose(torch_conv.bias.grad.detach().numpy(), bias_grad))
print("Градиент по ядру совпадает:", np.allclose(torch_conv.weight.grad.detach().numpy(), kernel_grad))

Vanilla Forward такой же как и torch forward: True
Shape выхода свертки: (126, 4)
Градиент по входу совпадает: True
Градиент по смещениям совпадает: True
Градиент по ядру совпадает: True


In [35]:
class Conv1d(BaseLayer):
    """
    Сверточный слой, со страйдом 1 и без паддингов, для батча
    """
    def __init__(self, in_channels, out_channels, kernel_size):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size

        scale = np.sqrt(1/(in_channels*kernel_size))
        self.kernel = np.random.uniform(-scale, scale, size=(out_channels, in_channels, kernel_size))
        self.bias = np.random.uniform(-scale, scale, size=(out_channels))

    def set_optimizer(self, optimizer):
        self.kernel_optimizer = copy.copy(optimizer)
        self.bias_optimizer = copy.copy(optimizer)

        self.kernel_optimizer.set_weight(self.kernel)
        self.bias_optimizer.set_weight(self.bias)

    def forward(self, x, grad=True):
        """
        Работает с битчами вида [BATCH_SIZE, SENTENCE_LEN, EMB_DIM]
        """
        self.input = x
        self.batch_size = x.shape[0]
        self.input_len = x.shape[1]
        self.output_len = self.input_len - self.kernel_size + 1

        result = []

        for sentence in x:
            result.append(self._forward_for_one(sentence))

        return np.array(result)

    def _forward_for_one(self, x):
        """
        Просто свертка для 1 предложения
        """
        output = np.zeros(shape=(self.output_len, self.out_channels))

        # для каждого выходного канала и ядра, отвечающего за этот канал
        for kernel_i, ker in enumerate(self.kernel):
            # по выходной длине
            for i in range(self.output_len):
                # умножаем срез по размеру ядра на ядро и суммируем
                output[i:self.kernel_size+i, kernel_i] = self.bias[kernel_i] + np.sum(x[i:self.kernel_size+i, :] * ker.T)

        return output

    def backward(self, output_error):
        """
        Градиенты по всем батчу
        """
        dy_dkernels = []
        dy_dbiass = []
        dy_dxs = []

        for i in range(self.batch_size):
            dy_dkernel, dy_dbias, dy_dx = self._calc_grad_for_one(output_error[i], self.input[i])
            dy_dkernels.append(dy_dkernel)
            dy_dbiass.append(dy_dbias)
            dy_dxs.append(dy_dx)

        dy_dkernels = np.sum(np.array(dy_dkernels), axis=0)  # суммируем градиенты по батчу
        dy_dbiass = np.sum(np.array(dy_dbiass), axis=0)
        dy_dxs = np.array(dy_dxs)

        # self.kernel = self.kernel_optimizer.step(dy_dkernels)  # делаем шаг спуска по сумме градиентов
        # self.bias = self.bias_optimizer.step(dy_dbiass)

        # return dy_dxs
        return dy_dkernels, dy_dbiass, dy_dxs

    def _calc_grad_for_one(self, output_error, x):
        dy_dkernel = np.zeros(shape=self.kernel.shape)
        dy_dbias = np.zeros(shape=self.bias.shape)
        dy_dx = np.zeros(shape=x.shape)

        for kernel_i, ker in enumerate(self.kernel):
            helper_k = np.zeros(shape=ker.T.shape)

            for i in range(self.output_len):
                helper_k += x[i:self.kernel_size+i, :] * output_error[i, kernel_i]
                dy_dx[i:self.kernel_size+i, :] += ker.T * output_error[i, kernel_i]

            dy_dkernel[kernel_i] = helper_k.T
            dy_dbias[kernel_i] = np.sum(output_error[:, kernel_i])

        return dy_dkernel, dy_dbias, dy_dx

In [36]:
# тоже самое, только теперь ручная свертка умеет работать с батчами, np.newaxis теперь не нужен, размер входа будет [BATCH_SIZE, SENTENCE_LEN, EMB_DIM]

sample = to_matrix(np.random.choice(lines, size=5), token_to_id, max_len=130)

emb = Embedding(len(token_to_id), 16)
encoded_data = emb(sample) # батч из 5 предложений

conv = Conv1d(16, 4, 5)
torch_conv = torch.nn.Conv1d(16, 4, 5)
torch_conv.weight.data = torch.as_tensor(conv.kernel)
torch_conv.bias.data = torch.as_tensor(conv.bias)

torch_input = torch.tensor(encoded_data, dtype=torch.float64, requires_grad=True)
torch_out = torch_conv(torch_input.permute(0, 2, 1))  # permute потому что torch на вход принимает формат [BATCH_SIZE, EMB_DIM, SENTENCE_LEN]

# проверка что forward работает также как у torch, 
print("Vanilla Forward такой же как и torch forward:", np.allclose(torch_out.permute(0, 2, 1).detach().numpy(), conv(encoded_data)))

print("Shape выхода свертки:", conv(encoded_data).shape)

# случайная ошибка, которая приходит "сверху" от вышестоящих слоев, по размеру она совпадает с выходом слоя
check_error = np.random.normal(loc=-3, scale=100, size=(conv(encoded_data).shape))
check_error_torch = torch.tensor(np.transpose(check_error, (0, 2, 1)))

torch_out.backward(check_error_torch)  # считаем градиенты для всех тензоров, которые участвуют в forward проходе
kernel_grad, bias_grad, in_error = conv.backward(check_error)  # тоже самое, только в ручном слое

# проверка градиентов весов и входа
print("Градиент по входу совпадает:", np.allclose(torch_input.grad.detach().numpy(), in_error))
print("Градиент по смещениям совпадает:", np.allclose(torch_conv.bias.grad.detach().numpy(), bias_grad))
print("Градиент по ядру совпадает:", np.allclose(torch_conv.weight.grad.detach().numpy(), kernel_grad))

Vanilla Forward такой же как и torch forward: True
Shape выхода свертки: (5, 126, 4)
Градиент по входу совпадает: True
Градиент по смещениям совпадает: True
Градиент по ядру совпадает: True


# emb check

In [37]:
class Embedding(BaseLayer):
    def __init__(self, n_input, emb_dim, pad_idx=None):
        self.n_input = n_input
        self.emb_dim = emb_dim
        self.pad_idx = pad_idx
        
        self.weights = np.random.normal(scale=np.sqrt(2/(n_input+emb_dim)), size=(n_input, emb_dim))

    def set_optimizer(self, optimizer):
        self.weights_optimizer = copy.copy(optimizer)

        self.weights_optimizer.set_weight(self.weights)

    def forward(self, x, grad=True):
        self.input = x
        return self.weights[x]

    def backward(self, output_error):
        weights_grad = np.zeros_like(self.weights)
        input_shape_len = len(self.input.shape)

        if input_shape_len == 2:
            for batch_n, s in enumerate(self.input):
                for i, emb_i in enumerate(s):
                    weights_grad[emb_i] += output_error[batch_n][i]

        elif input_shape_len == 1:
            for i, emb_i in enumerate(self.input):
                weights_grad[emb_i] += output_error[i]

        if self.pad_idx is not None:
            weights_grad[self.pad_idx] = 0

        # self.weights = self.weights_optimizer.step(weights_grad)
        return weights_grad

In [40]:
# проверка на то, что градиент эмбеддингов считается правильно

emb = Embedding(len(token_to_id), 16)
print("Embeddings shape:", emb.weights.shape)

torch_emb = torch.nn.Embedding(len(token_to_id), 16)
torch_emb.weight.data = torch.as_tensor(emb.weights)

torch_out = torch_emb(torch.as_tensor(sample))

print("Forward совпадает:", np.allclose(torch_out.detach().numpy(), emb(sample)))

check_error = np.random.normal(0, 100, torch_out.shape)
check_error_torch = torch.tensor(check_error)

torch_out.backward(check_error_torch)

print("Градиенты совпадают", np.allclose(torch_emb.weight.grad.detach().numpy(), emb.backward(check_error)))

Embeddings shape: (136, 16)
Forward совпадает: True
Градиенты совпадают True


# linear check

In [44]:
class Linear(BaseLayer):
    """
    Linear class permorms ordinary FC layer in neural networks
    Parameters:
    n_input - size of input neurons
    n_output - size of output neurons
    Methods:
    set_optimizer(optimizer) - is used for setting an optimizer for gradient descent
    forward(x) - performs forward pass of the layer
    backward(output_error, learning_rate) - performs backward pass of the layer
    """

    def __init__(self, n_input: int, n_output: int) -> None:
        super().__init__()
        self.input = None
        self.n_input = n_input
        self.n_output = n_output
        self.w = np.random.normal(scale=np.sqrt(2 / (n_input + n_output)), size=(n_input, n_output))
        self.b = np.random.normal(scale=np.sqrt(2 / (n_input + n_output)), size=(1, n_output))

        self.w_optimizer = None
        self.b_optimizer = None

    def set_optimizer(self, optimizer) -> None:
        self.w_optimizer = copy.copy(optimizer)
        self.b_optimizer = copy.copy(optimizer)

        self.w_optimizer.set_weight(self.w)
        self.b_optimizer.set_weight(self.b)

    def forward(self, x: np.array, grad: bool = True) -> np.array:
        self.input = x
        return x.dot(self.w) + self.b

    def backward(self, output_error: np.array) -> np.array:
        # assert self.w_optimizer is not None and self.b_optimizer is not None, 'You should set an optimizer'
        w_grad = self.input.T.dot(output_error)
        b_grad = np.ones((1, len(output_error))).dot(output_error)
        input_error = output_error.dot(self.w.T)

        # self.w = self.w_optimizer.step(w_grad)
        # self.b = self.b_optimizer.step(b_grad)
        return w_grad, b_grad, input_error

In [50]:
# проверка того, что линейный слой работает правильно, транспонирование весов происходит потому, что домножение на веса в моем слое справа

sample = np.random.normal(loc=0, scale=100, size=(100, 16))

linear = Linear(16, 25)
torch_linear = torch.nn.Linear(16, 25)
torch_linear.weight.data = torch.as_tensor(linear.w.T)
torch_linear.bias.data = torch.as_tensor(linear.b)

torch_input = torch.tensor(sample, dtype=torch.float64, requires_grad=True)
torch_out = torch_linear(torch_input)

# проверка что forward работает также как у torch, 
print("Vanilla Forward такой же как и torch forward:", np.allclose(torch_out.detach().numpy(), linear(sample)))

print("Shape выхода линейного слоя:", linear(sample).shape)

# случайная ошибка, которая приходит "сверху" от вышестоящих слоев, по размеру она совпадает с выходом слоя
check_error = np.random.normal(loc=-3, scale=100, size=(linear(sample).shape))
check_error_torch = torch.tensor(check_error)

torch_out.backward(check_error_torch)  # считаем градиенты для всех тензоров, которые участвуют в forward проходе
kernel_grad, bias_grad, in_error = linear.backward(check_error)  # тоже самое, только в ручном слое

# проверка градиентов весов и входа
print("Градиент по входу совпадает:", np.allclose(torch_input.grad.detach().numpy(), in_error))
print("Градиент по смещениям совпадает:", np.allclose(torch_linear.bias.grad.detach().numpy(), bias_grad))
print("Градиент по ядру совпадает:", np.allclose(torch_linear.weight.grad.detach().numpy(), kernel_grad.T))

Vanilla Forward такой же как и torch forward: True
Shape выхода линейного слоя: (100, 25)
Градиент по входу совпадает: True
Градиент по смещениям совпадает: True
Градиент по ядру совпадает: True


In [60]:
class Linear3d(BaseLayer):
    """
    Linear class permorms ordinary FC layer in neural networks
    Parameters:
    n_input - size of input neurons
    n_output - size of output neurons
    Methods:
    set_optimizer(optimizer) - is used for setting an optimizer for gradient descent
    forward(x) - performs forward pass of the layer
    backward(output_error, learning_rate) - performs backward pass of the layer
    """

    def __init__(self, n_input: int, n_output: int) -> None:
        super().__init__()
        self.input = None
        self.n_input = n_input
        self.n_output = n_output
        self.w = np.random.normal(scale=np.sqrt(2 / (n_input + n_output)), size=(n_input, n_output))
        self.b = np.random.normal(scale=np.sqrt(2 / (n_input + n_output)), size=(1, n_output))

        self.w_optimizer = None
        self.b_optimizer = None

    def set_optimizer(self, optimizer) -> None:
        self.w_optimizer = copy.copy(optimizer)
        self.b_optimizer = copy.copy(optimizer)

        self.w_optimizer.set_weight(self.w)
        self.b_optimizer.set_weight(self.b)

    def forward(self, x: np.array, grad: bool = True) -> np.array:
        self.input = x
        return np.matmul(x, self.w) + self.b  # the same as @

    def backward(self, output_error: np.array) -> np.array:
        # assert self.w_optimizer is not None and self.b_optimizer is not None, 'You should set an optimizer'
        # перемножаем последние 2 измерения друг с другом с помощью matmul и суммируем
        w_grad = np.sum(np.transpose(self.input, (0, 2, 1)) @ output_error, axis=0)
        b_grad = np.sum(output_error, axis=(0, 1))
        input_error = output_error @ self.w.T

        # self.w = self.w_optimizer.step(w_grad)
        # self.b = self.b_optimizer.step(b_grad)
        return w_grad, b_grad, input_error

In [61]:
# проверка того, что линейный слой для 3-х измерений работает правильно, транспонирование весов происходит потому, что домножение на веса в моем слое справа

sample = np.random.normal(loc=0, scale=100, size=(100, 32, 16))

linear = Linear3d(16, 25)
torch_linear = torch.nn.Linear(16, 25)
torch_linear.weight.data = torch.as_tensor(linear.w.T)
torch_linear.bias.data = torch.as_tensor(linear.b)

torch_input = torch.tensor(sample, dtype=torch.float64, requires_grad=True)
torch_out = torch_linear(torch_input)

# проверка что forward работает также как у torch, 
print("Vanilla Forward такой же как и torch forward:", np.allclose(torch_out.detach().numpy(), linear(sample)))

print("Shape выхода линейного слоя:", linear(sample).shape)

# случайная ошибка, которая приходит "сверху" от вышестоящих слоев, по размеру она совпадает с выходом слоя
check_error = np.random.normal(loc=-3, scale=100, size=(linear(sample).shape))
check_error_torch = torch.tensor(check_error)

torch_out.backward(check_error_torch)  # считаем градиенты для всех тензоров, которые участвуют в forward проходе
kernel_grad, bias_grad, in_error = linear.backward(check_error)  # тоже самое, только в ручном слое

# проверка градиентов весов и входа
print("Градиент по входу совпадает:", np.allclose(torch_input.grad.detach().numpy(), in_error))
print("Градиент по смещениям совпадает:", np.allclose(torch_linear.bias.grad.detach().numpy(), bias_grad))
print("Градиент по ядру совпадает:", np.allclose(torch_linear.weight.grad.detach().numpy(), kernel_grad.T))

Vanilla Forward такой же как и torch forward: True
Shape выхода линейного слоя: (100, 32, 25)
Градиент по входу совпадает: True
Градиент по смещениям совпадает: True
Градиент по ядру совпадает: True


In [74]:
np.transpose(sample, (0, 2, 1))[1].dot(check_error[1])

array([[-5.33729935e+04,  2.99805194e+04,  3.96743561e+04,
         6.88546288e+03, -8.85127219e+04,  4.26513984e+04,
        -8.80717716e+04,  4.70198178e+04,  1.41635668e+05,
         1.10022164e+05,  4.20638563e+03,  4.08319039e+04,
         1.85867000e+04,  3.82476984e+04,  2.42839361e+04,
        -8.20261782e+04, -5.99060540e+04,  3.25365059e+01,
        -2.60974035e+04, -3.08492366e+04,  3.74566343e+03,
        -6.36716036e+04, -5.19861661e+04, -1.21212382e+03,
        -1.83173832e+04],
       [ 1.43420748e+05, -2.11700321e+03, -3.77749467e+03,
         1.13032592e+05, -1.24576652e+04,  5.12167133e+04,
         3.35379181e+04, -2.53694445e+04, -1.40089525e+04,
        -2.35459870e+04,  2.49881961e+04, -3.72129313e+03,
        -2.50694547e+04, -7.98593786e+03, -2.14602851e+04,
         5.52001197e+04,  3.12700945e+04,  6.46610080e+04,
        -6.89430058e+04,  3.99354935e+04,  1.43211015e+05,
         7.32862847e+04,  1.88587996e+04,  2.32676257e+04,
         2.83708205e+04],
    

In [75]:
(np.transpose(sample, (0, 2, 1))@check_error)[1]

array([[-5.33729935e+04,  2.99805194e+04,  3.96743561e+04,
         6.88546288e+03, -8.85127219e+04,  4.26513984e+04,
        -8.80717716e+04,  4.70198178e+04,  1.41635668e+05,
         1.10022164e+05,  4.20638563e+03,  4.08319039e+04,
         1.85867000e+04,  3.82476984e+04,  2.42839361e+04,
        -8.20261782e+04, -5.99060540e+04,  3.25365059e+01,
        -2.60974035e+04, -3.08492366e+04,  3.74566343e+03,
        -6.36716036e+04, -5.19861661e+04, -1.21212382e+03,
        -1.83173832e+04],
       [ 1.43420748e+05, -2.11700321e+03, -3.77749467e+03,
         1.13032592e+05, -1.24576652e+04,  5.12167133e+04,
         3.35379181e+04, -2.53694445e+04, -1.40089525e+04,
        -2.35459870e+04,  2.49881961e+04, -3.72129313e+03,
        -2.50694547e+04, -7.98593786e+03, -2.14602851e+04,
         5.52001197e+04,  3.12700945e+04,  6.46610080e+04,
        -6.89430058e+04,  3.99354935e+04,  1.43211015e+05,
         7.32862847e+04,  1.88587996e+04,  2.32676257e+04,
         2.83708205e+04],
    