In [1]:
import torch
from torch import nn
import torch.nn.functional as F

import numpy as np

In [2]:
class Softmax(object):
    @staticmethod
    def forward(x_in):
        exps = np.exp(x_in-np.max(x_in, axis=-1, keepdims=True))
        return exps / np.sum(exps, axis=-1, keepdims=True)


class Tanh(object):

    @staticmethod
    def forward(x_in):
        return np.tanh(x_in)

    @staticmethod
    def backward(x_in):
        # dEdX = dEdY * dYdX = dEdY * 1 - (tanh(X))^2
        return 1 - (x_in) ** 2


class ReLu(object):

    @staticmethod
    def forward(x_in):
        return np.maximum(x_in, 0)

    @staticmethod
    def backward(x_in):
        return x_in > 0
    
    
class CrossEntropyLoss(object):
    def __init__(self):
        self.y_pred = None

    def forward(self, y, o):
        self.y_pred = Softmax.forward(o)
        return np.sum(-y * np.log(self.y_pred + 1e-15))/y.shape[0]

    def backward(self, y):
        return (self.y_pred - y) / y.shape[0]

In [3]:
class RnnLayer(object):

    def __init__(self, input_dim, hidden_dim, seq_len, batch_size, use_bias=True, activation=Tanh):
        sq = np.sqrt(1. / hidden_dim)
        self.use_bias = use_bias
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.hidden_dim = hidden_dim
        self.input_dim = input_dim
        self.activation = activation()
        self.input_weights = np.random.uniform(-sq, sq, (hidden_dim, input_dim))
        self.hidden_weights = np.random.uniform(-sq, sq, (hidden_dim, hidden_dim))

        if self.use_bias:
            self.bias = np.random.uniform(-sq, sq, hidden_dim)
        else:
            self.bias = np.zeros(hidden_dim)

    def forward(self, x_in):
        # treba li dodati provjeru je li X_in stvarno ima sekvencu jednaku seq_len?
        # treba li dodati provjeru je li X_in prva koordinata jednaka batch_size

        # u ovom slucaju sam pretpostavio da je za sve inpute, pocetno stanje 0 u 0. vremenskom trenutku
        H = np.zeros((self.batch_size, self.seq_len + 1, self.hidden_dim))

        for i in range(self.seq_len):
            input_part = np.einsum('ij,jk->ik', x_in[:, i, :], self.input_weights.T)
            hidden_part = np.einsum('ij,jk->ik', H[:, i, :], self.hidden_weights.T)

            H[:, i + 1, :] = self.activation.forward(input_part + hidden_part + self.bias)

        return H, H[:, self.seq_len, :]

    def book_forward(self, x_in):

        H = np.zeros((self.batch_size, self.seq_len + 1, self.hidden_dim))

        for i in range(self.seq_len):
            # ovdje dobivam transponirano iz mog forwarda, ali sam u einsum zamijenio vrijednosti, tako da zapravo dobijem isto
            input_part = np.einsum('ij,jk->ki', self.input_weights, x_in[:, i, :].T)
            hidden_part = np.einsum('ij,jk->ik', self.hidden_weights, H[:, i, :].T)

            H[:, i + 1, :] = self.activation.forward(input_part + hidden_part + self.bias)

        return H, H[:, self.seq_len, :]

    def backward(self, x, h, dEdY):
        dEdW_in = np.zeros_like(self.input_weights)
        dEdW_hh = np.zeros_like(self.hidden_weights)

        dEdB_in = np.zeros_like(self.bias)
        
        H_grad = np.zeros((self.batch_size, self.seq_len + 1, self.hidden_dim))
        H_grad[:, self.seq_len, :] = dEdY[:,self.seq_len - 1, :]
        

        for i in range(self.seq_len, 0, -1):
            
            activation_backward = self.activation.backward(h[:, i, :])
            back_reshaped = activation_backward.reshape(self.batch_size, self.hidden_dim, 1)
            
            dEdW_in += np.sum(back_reshaped * (np.einsum('bh,bi->bhi', H_grad[:, i, :], x[:, i - 1, :])), axis=0)
            dEdW_hh += np.sum(back_reshaped * (np.einsum('bh,bk->bhk', H_grad[:, i, :], h[:, i - 1, :])), axis=0)

            if self.use_bias:
                dEdB_in += np.sum(self.activation.backward(h[:, i, :]) * H_grad[:, i, :], axis=0)
            else:
                pass

            if i > 1:
                H_grad[:, i - 1, :] = np.einsum('bh,hk->bk', H_grad[:, i, :],
                                                self.hidden_weights) * activation_backward + dEdY[:, i - 2, :]
            else:
                H_grad[:, i - 1, :] = np.einsum('bh,hk->bk', H_grad[:, i, :],
                                                self.hidden_weights) * activation_backward

    
        return dEdW_in, dEdW_hh, dEdB_in, H_grad

    def backward_checker(self, X, H, dEdY):
        dEdW_in = np.zeros_like(self.input_weights)
        dEdW_hh = np.zeros_like(self.hidden_weights)

        dEdB_in = np.zeros_like(self.bias)

        H_grad = np.zeros((self.batch_size, self.seq_len + 1, self.hidden_dim))
        H_grad[:, self.seq_len, :] = dEdY[:, self.seq_len - 1, :]
        
        for i in range(self.seq_len, 0, -1):

            for k in range(self.batch_size):
                act_grad = np.diag(self.activation.backward(H[k, i, :]))
                h_grad = H_grad[k, i, :].reshape(self.hidden_dim, 1)

                dEdW_in += np.dot(act_grad, np.dot(h_grad, X[k, i - 1, :].reshape(1, self.input_dim)))
                dEdW_hh += np.dot(act_grad, np.dot(h_grad, H[k, i - 1, :].reshape(1, self.hidden_dim)))

            if self.use_bias:
                dEdB_in += np.sum(self.activation.backward(H[:, i, :]) * H_grad[:, i, :], axis=(0))
            else:
                pass

            if i > 1:
                H_grad[:, i - 1, :] = np.dot(H_grad[:, i, :],self.hidden_weights) * self.activation.backward(H[:, i, :]) + dEdY[:,i - 2, :]
            else:
                H_grad[:, i - 1, :] = np.dot(H_grad[:, i, :],self.hidden_weights) * self.activation.backward(H[:, i, :])

        return dEdW_in, dEdW_hh, dEdB_in
    
    
class DenseLayer(object):

    def __init__(self, input_dim, output_dim, use_bias=True):
        sq = np.sqrt(1. / input_dim)
        self.use_bias = use_bias
        self.weights = np.random.uniform(-sq, sq, (output_dim, input_dim))
        if use_bias:
            self.bias = np.random.uniform(-sq, sq, output_dim)
        else:
            self.bias = np.zeros(output_dim)

    def forward(self, x_in):
        return np.tensordot(x_in, self.weights.T, axes=((-1), 0)) + self.bias

    def backward(self, de_dy, x_in):
        # de_dw = de_dy * dYdW = de_dy * X
        # dEdb = de_dy * dYdb = de_dy
        # dEdX = de_dy * dYdX = de_dy * W
        axis = tuple(range(len(x_in.shape) - 1))
        de_dw = np.tensordot(de_dy, x_in, axes=(axis, axis))
        de_db = np.sum(de_dy, axis=axis)
        de_dx = np.tensordot(de_dy, self.weights, axes=(-1, 0))

        return de_dx, de_dw, de_db

    def refresh(self, de_dw, de_db, learning_rate):
        self.weights = self.weights - learning_rate * de_dw
        if self.use_bias:
            self.bias = self.bias - learning_rate * de_db

## Softmax, Cross Entropy Loss

In [4]:
N = 5
num_classes = 4

x = torch.randn(N, num_classes) # logits
y = torch.randint(num_classes, (N,)) # labels

x_ = x.numpy()
y_ = np.eye(num_classes)[y.numpy()] # one-hot encoding

In [5]:
softmax_out = F.softmax(x, dim=-1)
softmax_out

softmax = Softmax()
softmax_out_ = softmax.forward(x_)

print('Softmax check:', np.isclose(softmax_out, softmax_out_).all())

Softmax check: True


In [6]:
cel = nn.CrossEntropyLoss()
loss = cel(x, y).item()

cel_ = CrossEntropyLoss()
loss_ = cel_.forward(y_, x_)
loss_

print('Cross entropy loss check:', np.isclose(loss, loss_))

Cross entropy loss check: True


## Linear Layer

In [7]:
N = 5
num_classes = 4
seq_len = 3

x = torch.randn(N, seq_len, requires_grad=True)
y = torch.randint(num_classes, (N,))

x_ = x.detach().numpy()
y_ = np.eye(num_classes)[y.numpy()]

linear = nn.Linear(seq_len, num_classes)
linear_ = DenseLayer(seq_len, num_classes)

linear_.weights = linear.weight.detach().numpy()
linear_.bias = linear.bias.detach().numpy()

lin_out = linear(x)
lin_out_ = linear_.forward(x_)

print('Linear layer forward check: ', np.isclose(lin_out.detach().numpy(), lin_out_).all())

Linear layer forward check:  True


In [8]:
loss = cel(lin_out, y)
lin_out.retain_grad()
loss.retain_grad()
loss.backward()

print(f'[TORCH] dE/dy:\n{lin_out.grad}\n')
print(f'[TORCH] dE/dW:\n{linear.weight.grad}\n')
print(f'[TORCH] dE/dB:\n{linear.bias.grad}\n')
print(f'[TORCH] dE/dX:\n{x.grad}\n')

[TORCH] dE/dy:
tensor([[-0.1788,  0.0606,  0.0739,  0.0443],
        [ 0.0529,  0.0595,  0.0175, -0.1299],
        [ 0.0990,  0.0464, -0.1736,  0.0282],
        [-0.1731,  0.0731,  0.0470,  0.0531],
        [ 0.0177,  0.0843, -0.1569,  0.0549]])

[TORCH] dE/dW:
tensor([[ 0.6432,  0.0048,  0.0285],
        [-0.1244,  0.0718, -0.0022],
        [-0.1545, -0.1517, -0.0556],
        [-0.3643,  0.0751,  0.0292]])

[TORCH] dE/dB:
tensor([-0.1823,  0.3239, -0.1921,  0.0505])

[TORCH] dE/dX:
tensor([[-0.0961, -0.0593, -0.1335],
        [ 0.0058,  0.0971,  0.0600],
        [ 0.1024,  0.0162,  0.0155],
        [-0.0838, -0.0608, -0.1402],
        [ 0.0650, -0.0128, -0.0527]])



In [9]:
loss_ = cel_.forward(y_, lin_out_)
loss_

de_dy = cel_.backward(y_)
de_dx, de_dw, de_db = linear_.backward(de_dy, x_)

In [10]:
print(f'[CUSTOM] dE/dy:\n{de_dy}\n')
print(f'[CUSTOM] dE/dW:\n{de_dw}\n')
print(f'[CUSTOM] dE/dB:\n{de_db}\n')
print(f'[CUSTOM] dE/dX:\n{de_dx}\n')

print('Check dE/dy:', np.isclose(lin_out.grad, de_dy).all())
print('Check dE/dX:', np.isclose(x.grad, de_dx).all())
print('Check dE/dW:', np.isclose(linear.weight.grad, de_dw).all())
print('Check dE/dB:', np.isclose(linear.bias.grad, de_db).all())

[CUSTOM] dE/dy:
[[-0.17883323  0.0606169   0.07393674  0.0442796 ]
 [ 0.05292938  0.05950016  0.0174878  -0.12991735]
 [ 0.09900548  0.04641851 -0.17358001  0.02815602]
 [-0.17313426  0.0730741   0.04697702  0.05308315]
 [ 0.01770099  0.08429919 -0.15692319  0.05492302]]

[CUSTOM] dE/dW:
[[ 0.64319172  0.00482929  0.02852225]
 [-0.12438113  0.07183421 -0.0021866 ]
 [-0.15453876 -0.15172726 -0.05558313]
 [-0.36427187  0.07506376  0.02924749]]

[CUSTOM] dE/dB:
[-0.18233163  0.32390886 -0.19210164  0.05052444]

[CUSTOM] dE/dX:
[[-0.09606687 -0.05928671 -0.13354808]
 [ 0.00578424  0.09710226  0.06000554]
 [ 0.10241285  0.01616698  0.01546191]
 [-0.08383915 -0.06081502 -0.14015159]
 [ 0.06502276 -0.0128443  -0.05272639]]

Check dE/dy: True
Check dE/dX: True
Check dE/dW: True
Check dE/dB: True


## RNN

In [14]:
#N = 5
#emb_dim = 6
#seq_len = 3
#hidden_dim = 8

N = 5
emb_dim = 6
seq_len = 3
hidden_dim = 5


x = torch.randn(N, seq_len, emb_dim, requires_grad=True)

x_ = x.detach().numpy()

rnn = nn.RNN(emb_dim, hidden_dim, bias=False, batch_first=True)
rnn_ = RnnLayer(emb_dim, hidden_dim, seq_len, N, use_bias=False)
rnn_.input_weights = rnn.weight_ih_l0.detach().numpy()
rnn_.hidden_weights = rnn.weight_hh_l0.detach().numpy()

rnn_out, h_n = rnn(x)
rnn_out_, h_n_ = rnn_.forward(x_)
rnn_out__ = rnn_out_[:, 1:, :] # remove zeros prepended to every hidden output

print('RNN layer forward check: ', np.isclose(rnn_out.detach().numpy(), rnn_out__).all())
print('RNN layer forward check last hidden: ', np.isclose(h_n.detach().numpy(), h_n_).all())

RNN layer forward check:  True
RNN layer forward check last hidden:  True


In [15]:
de_dy = torch.randn(N, seq_len, hidden_dim)
de_dy_ = de_dy.numpy()

rnn_out.retain_grad()
rnn_out.backward(de_dy)

print(f'[TORCH] dE/dWih:\n{rnn.weight_ih_l0.grad}\n')
print(f'[TORCH] dE/dWhh:\n{rnn.weight_hh_l0.grad}\n')
print(f'[TORCH] dE/dy:\n{de_dy}\n')
print(f'[TORCH] dE/dH:\n{rnn_out.grad}\n')

[TORCH] dE/dWih:
tensor([[ 1.5632,  1.0950, -0.4356, -0.9051, -0.5419,  1.1497],
        [ 3.6365, -3.2831, -3.2712, -3.2545,  0.7571, -1.6923],
        [-1.8961, -0.0933,  1.3080,  1.0940,  1.2342, -0.6066],
        [ 0.0826, -0.9934, -0.1278,  4.1134, -1.2327,  0.5618],
        [-3.4931, -2.2032,  3.2736,  0.8773, -2.4627,  1.8469]])

[TORCH] dE/dWhh:
tensor([[-1.6476,  0.5321, -0.9045, -0.2720,  1.3781],
        [ 2.0258, -1.3684,  0.7980,  1.7109, -1.7800],
        [-0.9835,  0.2355,  0.1630, -0.1145,  0.3965],
        [ 1.4600, -0.2265, -0.3678, -1.5220,  0.7827],
        [ 0.0067, -1.2813,  0.6293,  1.6404, -0.5236]])

[TORCH] dE/dy:
tensor([[[ 0.3413, -0.5822, -0.7120,  0.6790,  0.0666],
         [ 1.4542,  1.0778, -1.0873, -0.4267, -1.2978],
         [-0.7192, -1.3991, -0.6228,  0.9242,  1.2328]],

        [[-0.8214,  1.8177, -0.7652,  1.4600,  0.6283],
         [ 0.0898, -2.4753, -0.2170,  0.2810, -1.3683],
         [-0.1085, -0.2659, -0.8700,  0.3504, -1.1689]],

        [[ 1

In [16]:
dEdW_in, dEdW_hh, _, H_grad = rnn_.backward(x_, rnn_out_, de_dy_)

print(f'[CUSTOM] dE/dWih:\n{dEdW_in}\n')
print(f'[CUSTOM] dE/dWhh:\n{dEdW_hh}\n')
print(f'[CUSTOM] dE/dH:\n{H_grad[:,1:,:]}\n')

print('RNN layer gradient check dEdW_in: ', np.isclose(rnn.weight_ih_l0.grad.numpy(), dEdW_in).all())
print('RNN layer gradient check dEdW_hh: ', np.isclose(rnn.weight_hh_l0.grad.numpy(), dEdW_hh).all())
print('RNN layer gradient check dEdH: ', np.isclose(rnn_out.grad.numpy(), H_grad[:,1:,:]).all())

[CUSTOM] dE/dWih:
[[ 2.1704273   0.80655396 -0.09895559 -2.1960852  -0.11273439  1.4121459 ]
 [ 4.2564716  -3.3818326  -2.3150346  -4.151105    0.9410493  -2.1321788 ]
 [-1.9509997  -1.5498486   1.4614779  -0.16966856  0.7720219  -0.7037502 ]
 [ 0.5028647  -1.0100005   0.544974    3.7390335  -1.0582447   0.28043902]
 [-3.0213704  -2.2303386   2.9346206   0.82597715 -2.3038645   0.96782213]]

[CUSTOM] dE/dWhh:
[[-1.8016164   0.69000906 -1.0969379  -0.02718778  1.4229884 ]
 [ 1.7044684  -0.9310839   0.7106786   2.1803424  -1.7890654 ]
 [-0.34787923 -0.4033661   0.47439235 -0.1280304   0.04756025]
 [ 1.0414937   0.22979382 -0.34883544 -1.0666538   0.7155496 ]
 [ 0.22342995 -1.3912995   0.52715844  1.5331726  -0.42903402]]

[CUSTOM] dE/dH:
[[[ 0.62494165 -0.55459172 -0.28033203  1.00214668 -0.27574827]
  [ 1.19031345  0.33171065 -0.99559586 -0.96420243 -0.70657765]
  [-0.7191968  -1.39909518 -0.62279123  0.9242416   1.23278213]]

 [[-0.18648617  1.57574717 -0.05813043  1.71692212  1.577673