# RNN

## RNN forward スクラッチ実装

In [26]:
import torch
import math
from torch import nn

In [16]:
class myRNN:
    """Minimal single-layer tanh RNN (forward pass only) built from raw tensors.

    Each time step computes
    ``h_t = tanh(x_t @ W_in.T + b_in + h_{t-1} @ W_h.T + b_h)``.
    Weights and biases are sampled from
    ``U(-1/sqrt(hidden_size), 1/sqrt(hidden_size))``.
    """

    def __init__(self, input_size, hidden_size):
        """Allocate and randomly initialise the input and recurrent weights.

        Args:
            input_size: Number of features of every input vector.
            hidden_size: Number of units of the hidden state.
        """
        self.hidden_size = hidden_size
        bound = 1.0 / math.sqrt(hidden_size)

        def _uniform(*shape):
            # The trailing underscore of ``uniform_`` marks an in-place op:
            # the freshly allocated tensor is overwritten with the samples.
            return torch.empty(*shape).uniform_(-bound, bound)

        # Creation order (W_in, W_h, b_in, b_h) determines how the random
        # stream is consumed, so it is kept fixed.
        self.W_in = _uniform(hidden_size, input_size)
        self.W_h = _uniform(hidden_size, hidden_size)
        self.b_in = _uniform(hidden_size)
        self.b_h = _uniform(hidden_size)

    def forward(self, input, h_0 = None):
        """Run the recurrence over the whole sequence.

        Args:
            input: Embedded sequence, expected as
                [batch_size, seq_len, input_size].
            h_0: Optional initial hidden state [1, batch_size, hidden_size];
                zeros are used when omitted.

        Returns:
            ``(output_seq, h_n)`` where ``output_seq`` holds the hidden state
            of every step, [batch_size, seq_len, hidden_size], and ``h_n`` is
            the last hidden state, [1, batch_size, hidden_size].
        """
        n_batch, n_steps, _ = input.size()

        h = torch.zeros(1, n_batch, self.hidden_size) if h_0 is None else h_0

        step_states = []
        for t in range(n_steps):
            x_t = input[:, t]  # [batch_size, input_size]
            h = torch.tanh(
                x_t @ self.W_in.T + self.b_in + h.squeeze(0) @ self.W_h.T + self.b_h
            )  # [batch_size, hidden_size]
            step_states.append(h)

        # Stacking along dim 1 inserts the time axis between batch and hidden.
        output_seq = torch.stack(step_states, dim=1)
        return output_seq, h.unsqueeze(0)

In [18]:
# Two random example tensors: a 3-D one with a singleton middle axis and a
# plain 2-D one with the same outer sizes.
x = torch.randn((5, 1, 8))
x2 = torch.randn((5, 8))

In [19]:
x

tensor([[[-0.0218,  0.9922, -0.2399, -1.1270,  0.2945,  0.8130, -0.2159,
          -0.5460]],

        [[ 0.4150,  0.4166, -2.0738,  0.3873,  0.8671, -0.3171, -1.0317,
          -0.2121]],

        [[ 0.6203,  1.5436,  0.6842, -0.4510,  0.9774, -0.4703,  1.1409,
          -0.0494]],

        [[ 1.2131, -0.0858, -2.9257, -0.4792,  0.1307, -1.0303,  0.2659,
          -0.6366]],

        [[ 0.1213, -0.9121, -0.3683,  0.5056, -0.2147, -1.6706, -1.1620,
           1.8605]]])

In [20]:
x2

tensor([[ 1.9050,  1.1296, -1.2857,  0.4050, -0.2830,  0.4128,  0.5930,  0.9919],
        [-0.9388, -0.2263, -0.3448,  1.0541, -1.5921, -1.2072, -0.4160,  0.3614],
        [ 1.1878, -0.2560,  0.1291,  0.1409,  0.6517,  1.3268,  1.3561,  0.4439],
        [ 1.7305, -0.8819, -1.0847, -0.9024, -1.4085, -1.8254, -0.7930, -0.0320],
        [ 1.1730, -0.4159,  2.3209,  0.9216, -0.6664,  1.7733,  1.0118, -0.3913]])

In [21]:
# Sizes used for the shape check of the scratch RNN.
input_size = 10
hidden_size = 3
batch_size = 8
seq_len = 5
# Random input batch laid out as [batch_size, seq_len, input_size].
input_tensor = torch.randn(batch_size, seq_len, input_size)
rnn = myRNN(input_size, hidden_size)

In [23]:
# Forward pass: returns the hidden state of every step and the final hidden state.
output_seq, h_n = rnn.forward(input_tensor)

In [24]:
output_seq.shape

torch.Size([8, 5, 3])

In [25]:
h_n.shape

torch.Size([1, 8, 3])

## RNNモデルの実装

In [36]:
class myRNNModel(nn.Module):
    """Hand-written tanh RNN followed by a 2-way linear output layer.

    The recurrent weights are registered as ``nn.Parameter`` so that they show
    up in ``parameters()`` / ``state_dict()``, receive gradients, and follow
    the module when it is moved with ``.to(...)``.
    """

    def __init__(self, input_size, hidden_size):
        """Create the recurrent parameters and the output layer.

        Args:
            input_size: Number of features of every input vector.
            hidden_size: Number of units of the hidden state.
        """
        super().__init__()
        self.hidden_size = hidden_size
        init_range = 1.0 / math.sqrt(hidden_size)
        # ``uniform_`` (trailing underscore) fills the tensor in place.
        self.W_in = nn.Parameter(
            torch.empty(hidden_size, input_size).uniform_(-init_range, init_range)
        )
        self.W_h = nn.Parameter(
            torch.empty(hidden_size, hidden_size).uniform_(-init_range, init_range)
        )

        self.b_in = nn.Parameter(
            torch.empty(hidden_size).uniform_(-init_range, init_range)
        )
        self.b_h = nn.Parameter(
            torch.empty(hidden_size).uniform_(-init_range, init_range)
        )
        self.output_linear = nn.Linear(hidden_size, 2)

    def forward(self, input, h_0 = None):
        """Encode the sequence and map the final hidden state to two outputs.

        Args:
            input: Embedded sequence, expected as
                [batch_size, seq_len, input_size].
            h_0: Optional initial hidden state [1, batch_size, hidden_size];
                zeros matching the input's dtype and device when omitted.

        Returns:
            Tensor of shape [batch_size, 2].
        """
        batch_size, seq_len, _ = input.size()

        if h_0 is None:
            h_0 = torch.zeros(
                1, batch_size, self.hidden_size,
                dtype=input.dtype, device=input.device,
            )
        h = h_0
        # Only the final hidden state feeds the output layer, so the per-step
        # states are not collected.
        for i in range(seq_len):
            h = torch.tanh(
                input[:, i] @ self.W_in.T + self.b_in
                + h.squeeze(0) @ self.W_h.T + self.b_h
            )  # [batch_size, hidden_size]
        h_n = h.unsqueeze(0)  # [1, batch_size, hidden_size]
        output = self.output_linear(h_n.squeeze(0))

        return output

In [37]:
# Instantiate the model with the input/hidden sizes defined earlier in the notebook.
myrnnmodel = myRNNModel(input_size, hidden_size)

In [38]:
# Forward pass on the random input batch created earlier.
output = myrnnmodel.forward(input_tensor)

In [39]:
output.shape

torch.Size([8, 2])

In [40]:
class myRNNModel:
    """Sequence classifier: scratch ``myRNN`` encoder plus a linear head.

    Only the final hidden state of the RNN is passed to the linear layer.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Build the recurrent encoder and the output layer.

        Args:
            input_size: Number of features of every input vector.
            hidden_size: Number of units of the hidden state.
            output_size: Number of output values per sample.
        """
        self.rnn = myRNN(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the linear head applied to the last hidden state of ``x``."""
        # The per-step outputs [batch_size, seq_len, hidden_size] are not
        # needed here; h_n is [1, batch_size, hidden_size].
        _, h_n = self.rnn.forward(x)
        last_hidden = h_n.squeeze(0)  # [batch_size, hidden_size]
        return self.fc(last_hidden)

In [41]:
output_size = 2 # two-class problem
model = myRNNModel(input_size, hidden_size, output_size)
out = model.forward(input_tensor)
out.size()

torch.Size([8, 2])

## nn.RNN

In [44]:
class myRNNModel(nn.Module):
    """Sequence classifier built on ``nn.RNN`` with a linear head.

    The RNN is created with ``batch_first=True``; the head is applied to the
    final hidden state only.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Build the recurrent layer and the output layer.

        Args:
            input_size: Number of features of every input vector.
            hidden_size: Number of units of the hidden state.
            output_size: Number of output values per sample.
        """
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first = True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the linear head applied to the last hidden state of ``x``."""
        # The first return value (all step outputs,
        # [batch_size, seq_len, hidden_size]) is unused;
        # h_n is [1, batch_size, hidden_size].
        _, h_n = self.rnn(x)
        final_state = h_n.squeeze(0)
        # nn.Linear acts on the last dimension of whatever it is given.
        return self.fc(final_state)

In [45]:
output_size = 2 # two-class problem
model = myRNNModel(input_size, hidden_size, output_size)
out = model(input_tensor)
out.size()

torch.Size([8, 2])

In [48]:
# List every registered parameter of the model together with its shape.
for name, param in model.named_parameters():
    print(f'{name}: {param.shape}')

rnn.weight_ih_l0: torch.Size([3, 10])
rnn.weight_hh_l0: torch.Size([3, 3])
rnn.bias_ih_l0: torch.Size([3])
rnn.bias_hh_l0: torch.Size([3])
fc.weight: torch.Size([2, 3])
fc.bias: torch.Size([2])


## RNN Backpropagation

In [56]:
class myRNN:
    """Single-layer tanh RNN with a hand-written backward pass (BPTT).

    Forward recurrence:
    ``h_t = tanh(x_t @ W_in.T + b_in + h_{t-1} @ W_h.T + b_h)``.

    The weights are leaf tensors with ``requires_grad=True`` so that the
    manual gradients produced by :meth:`backward` can be compared against
    autograd.
    """

    def __init__(self, input_size, hidden_size):
        """Allocate the weights, sampled from U(-1/sqrt(hidden), 1/sqrt(hidden)).

        Args:
            input_size: Number of features of every input vector.
            hidden_size: Number of units of the hidden state.
        """
        self.hidden_size = hidden_size

        init_range = 1.0/math.sqrt(hidden_size)
        self.W_in = torch.empty(hidden_size, input_size).uniform_(-init_range, init_range).requires_grad_(True)
        self.W_h = torch.empty(hidden_size, hidden_size).uniform_(-init_range, init_range).requires_grad_(True)

        self.b_in = torch.empty(hidden_size).uniform_(-init_range, init_range).requires_grad_(True)
        self.b_h = torch.empty(hidden_size).uniform_(-init_range, init_range).requires_grad_(True)

    def forward(self, input, h_0=None):
        """Run the recurrence and cache what :meth:`backward` needs.

        Args:
            input: Tensor of shape [batch_size, seq_len, input_size].
            h_0: Optional initial hidden state [1, batch_size, hidden_size];
                zeros are used when omitted.

        Returns:
            ``(output_seq, h_n)``: all hidden states
            [batch_size, seq_len, hidden_size] and the final hidden state
            [1, batch_size, hidden_size].
        """
        # Cached for the backward pass.
        self.input = input
        self.h_0 = h_0
        batch_size, self.seq_len, _ = input.size()

        if h_0 is None:
            self.h_0 = torch.zeros(1, batch_size, self.hidden_size)

        h = self.h_0  # [1, batch_size, hidden_size]
        outputs = []
        for i in range(self.seq_len):
            # [batch_size, hidden_size]
            h = torch.tanh(input[:, i]@self.W_in.T + self.b_in + h.squeeze(0)@self.W_h.T + self.b_h)
            outputs.append(h.unsqueeze(1))  # [batch_size, hidden_size] -> [batch_size, 1, hidden_size]
        self.output_seq = torch.cat(outputs, dim=1)
        h_n = h.unsqueeze(0)  # [batch_size, hidden_size] -> [1, batch_size, hidden_size]

        return self.output_seq, h_n

    def backward(self, out_grad):
        """Back-propagate through time from the final hidden state.

        Must be called after :meth:`forward`. Only a gradient arriving at the
        last time step is supported; gradients flowing into intermediate
        outputs are not taken into account.

        The results are stored on the instance rather than returned. Each
        list gets one entry per time step, processed from the last step to
        the first:

        * ``grad_W_in_list``, ``grad_W_h_list``, ``grad_b_in_list``,
          ``grad_b_h_list``: running totals of the parameter gradients, so
          the last entry (``[-1]``) is the full gradient.
        * ``grad_h_list``: gradient w.r.t. the previous hidden state.
        * ``grad_h_tanh_list``: gradient w.r.t. the tanh pre-activation.

        Args:
            out_grad: Gradient of the loss w.r.t. the final hidden state
                (``h_n`` as returned by ``forward``).
        """
        self.grad_W_in_list = []
        self.grad_W_h_list = []
        self.grad_b_in_list = []
        self.grad_b_h_list = []

        self.grad_h_list = []
        self.grad_h_tanh_list = []

        # The weights and cached activations require grad; without no_grad()
        # every manual gradient would itself be tracked by autograd (carrying
        # a grad_fn and building an unnecessary graph).
        with torch.no_grad():
            # Gradient accumulators.
            grad_W_in = torch.zeros_like(self.W_in)
            grad_W_h = torch.zeros_like(self.W_h)
            grad_b_in = torch.zeros_like(self.b_in)
            grad_b_h = torch.zeros_like(self.b_h)

            # Gradient w.r.t. the hidden state of every step; only the last
            # step receives an external gradient.
            grad_output_seq = torch.zeros_like(self.output_seq)  # [batch_size, seq_len, hidden_size]
            grad_output_seq[:, -1, :] = out_grad

            # Initial hidden state as [batch_size, hidden_size].
            h_init = self.h_0.reshape(-1, self.hidden_size)

            for i in reversed(range(self.seq_len)):
                # Derivative of tanh: dL/dh_t * (1 - h_t^2).
                grad_h_tanh = grad_output_seq[:, i] * (1 - self.output_seq[:, i].pow(2))

                # Hidden state that entered step i (h_0 for the first step).
                h_prev = self.output_seq[:, i-1] if i != 0 else h_init

                # Batch-summed outer products, written as matmuls.
                grad_W_in += grad_h_tanh.T @ self.input[:, i]
                grad_W_h += grad_h_tanh.T @ h_prev
                # Both biases are added to the same pre-activation, so they
                # share the same gradient.
                grad_b = grad_h_tanh.sum(dim=0)
                grad_b_in += grad_b
                grad_b_h += grad_b

                # Gradient flowing back into the previous hidden state.
                grad_h = grad_h_tanh @ self.W_h
                if i != 0:
                    grad_output_seq[:, i-1] = grad_h

                # Keep a snapshot of every quantity for inspection.
                self.grad_W_in_list.append(grad_W_in.clone())
                self.grad_W_h_list.append(grad_W_h.clone())
                self.grad_b_in_list.append(grad_b_in.clone())
                self.grad_b_h_list.append(grad_b_h.clone())
                self.grad_h_list.append(grad_h.clone())
                self.grad_h_tanh_list.append(grad_h_tanh.clone())

class myRNNModel:
    """Scratch ``myRNN`` encoder plus a linear head, keeping ``h_n`` around.

    The final hidden state is stored on the instance as ``self.h_n`` so that
    callers can take gradients with respect to it after the forward pass.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Build the recurrent encoder and the output layer.

        Args:
            input_size: Number of features of every input vector.
            hidden_size: Number of units of the hidden state.
            output_size: Number of output values per sample.
        """
        self.rnn = myRNN(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the linear head applied to the last hidden state of ``x``."""
        # rnn.forward yields (all step outputs, final hidden state):
        #   step outputs: [batch_size, seq_len, hidden_size] (unused here)
        #   h_n:          [1, batch_size, hidden_size]
        _, self.h_n = self.rnn.forward(x)
        last_hidden = self.h_n.squeeze(0)
        return self.fc(last_hidden)

In [54]:
# Backward check: the hand-written BPTT gradients should match autograd.
input_size, hidden_size, batch_size, seq_len = 3, 2, 3, 5

# Random input batch and ground-truth labels (every sample is class 0).
input_tensor = torch.randn(batch_size, seq_len, input_size)
target = torch.zeros(batch_size, dtype=torch.long)

# Build the model (output_size is taken from an earlier cell) and run forward.
model = myRNNModel(input_size, hidden_size, output_size)
output = model.forward(input_tensor)

# Cross-entropy loss on the model output.
criterion = nn.CrossEntropyLoss()
loss = criterion(output, target)

# Gradient of the loss w.r.t. the final hidden state. The graph is retained
# because loss.backward() still has to run below.
(out_grad,) = torch.autograd.grad(loss, model.h_n, retain_graph=True)
# Hand-written backward pass.
model.rnn.backward(out_grad)
# Reference gradients from autograd.
loss.backward()

# One row per parameter:
# (autograd label, autograd gradient, manual label, manual gradient).
comparisons = [
    ("Autograd Gradient - W_in:", model.rnn.W_in.grad,
     "Manual Gradient - W_in:", model.rnn.grad_W_in_list[-1]),
    ("Autograd Gradient - W_h:", model.rnn.W_h.grad,
     "Manual Gradient - W_h:", model.rnn.grad_W_h_list[-1]),
    ("Autograd Gradient - b_in:", model.rnn.b_in.grad,
     "Manual Gradient - b_in:", model.rnn.grad_b_in_list[-1]),
    ("Autograd Gradient - b_h:", model.rnn.b_h.grad,
     "Manual Gradient - b_h:", model.rnn.grad_b_h_list[-1]),
]
for position, (autograd_label, autograd_grad, manual_label, manual_grad) in enumerate(comparisons):
    # A separator goes between parameter groups, not before the first one.
    if position:
        print("======================")
    print(autograd_label, autograd_grad)
    print(manual_label, manual_grad)

Autograd Gradient - W_in: tensor([[ 0.1289,  0.0223, -0.0584],
        [ 0.2189,  0.0159, -0.0840]])
Manual Gradient - W_in: tensor([[ 0.1289,  0.0223, -0.0584],
        [ 0.2189,  0.0159, -0.0840]], grad_fn=<CloneBackward0>)
Autograd Gradient - W_h: tensor([[ 0.0051, -0.0151],
        [ 0.0381,  0.0359]])
Manual Gradient - W_h: tensor([[ 0.0051, -0.0151],
        [ 0.0381,  0.0359]], grad_fn=<CloneBackward0>)
Autograd Gradient - b_in: tensor([-0.0425, -0.2699])
Manual Gradient - b_in: tensor([-0.0425, -0.2699], grad_fn=<CloneBackward0>)
Autograd Gradient - b_h: tensor([-0.0425, -0.2699])
Manual Gradient - b_h: tensor([-0.0425, -0.2699], grad_fn=<CloneBackward0>)


In [55]:
model.h_n

tensor([[[-0.5562, -0.2143],
         [-0.5922,  0.3486],
         [-0.4653,  0.4792]]], grad_fn=<UnsqueezeBackward0>)