In [1]:
import numpy as np
import torch
import torch.nn.functional as F
from math import floor
import math
import torch.nn as nn

## 单向 RNN

In [4]:
def rnn_forward(input, weight_x, weight_h, bias_x, bias_h, prev_h=None):
    """Forward pass of a single-layer, unidirectional tanh RNN (batch-first).

    Every time step computes
    ``h_t = tanh(x_t @ weight_x + h_{t-1} @ weight_h + bias_x + bias_h)``.

    param:
        input: [batch_size, seq_len, input_size]
        weight_x: [input_size, hidden_size]
        weight_h: [hidden_size, hidden_size]
        bias_x: [hidden_size], or None to use zeros
        bias_h: [hidden_size], or None to use zeros
        prev_h: [batch_size, hidden_size] initial hidden state, or None to
            start from zeros
    return:
        (outputs, last_h): outputs is [batch_size, seq_len, hidden_size] and
        last_h is the final hidden state reshaped to
        [1, batch_size, hidden_size].
    """
    n_batch, n_steps, _ = input.shape
    _, n_hidden = weight_x.shape

    # Substitute zeros for every optional argument that was not supplied.
    state = np.zeros((n_batch, n_hidden)) if prev_h is None else prev_h
    b_in = np.zeros((n_hidden)) if bias_x is None else bias_x
    b_rec = np.zeros((n_hidden)) if bias_h is None else bias_h

    hidden_seq = np.zeros((n_batch, n_steps, n_hidden))
    for step in range(n_steps):
        x_t = input[:, step, :]  # input at the current time step
        from_input = x_t @ weight_x
        from_state = state @ weight_h
        # Keep the same left-to-right summation order for identical rounding.
        state = np.tanh(from_input + from_state + b_in + b_rec)
        hidden_seq[:, step, :] = state

    return hidden_seq, state.reshape(1, n_batch, n_hidden)

In [3]:
# Check the hand-written rnn_forward against torch.nn.RNN on random data.
batch_size, seq_len, input_size = 8, 7, 3
hidden_size = 4

# Seed both libraries so the random input and the randomly initialised
# nn.RNN weights (and therefore the comparison) are reproducible.
np.random.seed(0)
torch.manual_seed(0)

x = np.random.randn(batch_size, seq_len, input_size,)
x_ = torch.from_numpy(x).to(torch.float32)
rnn = nn.RNN(input_size, hidden_size, batch_first=True)
torch_output, torch_hn = rnn(x_)


# Extract the parameters from the torch module, looking them up by name
# instead of by position so the mapping does not depend on iteration order.
params = list(rnn.named_parameters())
named_params = dict(params)
# The weights are transposed so they can be right-multiplied
# (x @ w_x, h @ w_h), which is how rnn_forward applies them.
w_x = named_params["weight_ih_l0"].detach().numpy().T
w_h = named_params["weight_hh_l0"].detach().numpy().T
b_x = named_params["bias_ih_l0"].detach().numpy()
b_h = named_params["bias_hh_l0"].detach().numpy()

my_output, my_hn = rnn_forward(x, w_x, w_h, b_x, b_h)

print("torch 输出", torch_output.shape, torch_hn.shape)
print("手写rnn 输出", my_output.shape, my_hn.shape)
# torch runs in float32 while rnn_forward runs in float64, so near-zero
# activations can differ by more than np.allclose's default atol of 1e-8.
# Use an absolute tolerance appropriate for float32 round-off instead.
print(
    np.allclose(my_output, torch_output.detach().numpy(), atol=1e-6),
    np.allclose(my_hn, torch_hn.detach().numpy(), atol=1e-6),
)

torch 输出 torch.Size([8, 7, 4]) torch.Size([1, 8, 4])
手写rnn 输出 (8, 7, 4) (1, 8, 4)
True True


In [None]:
def fnn(X):
    """Unfinished stub: draw one N(0, 1) sample from NumPy's global RNG.

    NOTE(review): `X` is never read, the sample is discarded, and the function
    returns None -- the implementation appears to be incomplete.
    """
    weight = np.random.normal(loc=0, scale=1)
    return None

In [None]:
import numpy as np


class RNNCell:
    """Single-step RNN cell: ``h' = act(prev_h @ W_hh + x @ W_hx [+ b])``."""

    def __init__(self, input_size, hidden_size, bias=True, nonlinearity='tanh'):
        """
        parameters:
            input_size: The number of expected features in the input x
            hidden_size: The number of features in the hidden state h
            bias: If False, then the layer does not use bias weights b. Default: True
            nonlinearity: The non-linearity to use. Can be either 'tanh' or 'relu'. Default: 'tanh'
        raises:
            ValueError: if nonlinearity is neither 'tanh' nor 'relu'.
        """
        if nonlinearity not in ("tanh", "relu"):
            raise ValueError(
                "nonlinearity must be 'tanh' or 'relu', got {!r}".format(nonlinearity)
            )
        self.input_size = input_size
        self.hidden_size = hidden_size

        # init_param: weights are stored so that inputs are right-multiplied
        # (x @ W_hx, h @ W_hh).  `size=` is required: without it
        # np.random.normal returns a single scalar and the matmuls in
        # forward() fail.
        self.W_hh = np.random.normal(
            loc=0,
            scale=np.sqrt(1/hidden_size),
            size=(hidden_size, hidden_size),
        )
        self.W_hx = np.random.normal(
            loc=0,
            scale=np.sqrt(2/(input_size+hidden_size)),
            size=(input_size, hidden_size),
        )
        self.bias = bias
        # Always define `b` so the attribute exists even when bias is disabled.
        self.b = np.zeros((hidden_size)) if self.bias else None
        self.nonlinearity = nonlinearity


    def forward(self, x, prev_h=None):
        """Compute the next hidden state.

        parameters:
            x: [batch_size, input_size]
            prev_h: [batch_size, hidden_size]; zeros are used when None
        return:
            next hidden state, [batch_size, hidden_size]
        """
        if prev_h is None:
            prev_h = np.zeros((x.shape[0], self.hidden_size))
        a = prev_h @ self.W_hh + x @ self.W_hx
        if self.bias:
            a += self.b
        if self.nonlinearity == "tanh":
            a = np.tanh(a)
        elif self.nonlinearity == "relu":
            a = self.relu(a)
        return a


    def __call__(self, *arg, **kwargs):
        """Alias for forward() so the cell can be called like a function."""
        return self.forward(*arg, **kwargs)

    def backward(self, pre_grad):
        """Backward pass placeholder; not implemented yet (returns None)."""
        pass

    def relu(self, x):
        """Return max(x, 0) element-wise without modifying the caller's array."""
        out = x.copy()
        out[out < 0] = 0
        return out
        

In [None]:
class RNN:
    """Batch-first, unidirectional RNN that unrolls one RNNCell over time."""

    def __init__(self, input_size, hidden_size, bias=True, nonlinearity='tanh'):
        """
        parameters:
            input_size: number of features in each input step
            hidden_size: number of features in the hidden state
            bias: forwarded to RNNCell. Default: True
            nonlinearity: forwarded to RNNCell. Default: 'tanh'
        """
        self.input_size = input_size
        self.hidden_size = hidden_size

        self.cell = RNNCell(self.input_size, self.hidden_size, bias, nonlinearity)

    def forward(self, input_, prev_h=None):
        """
        parameters:
            input_ :[batch_size, seq_len, input_size]
            prev_h: [batch_size, hidden_size] initial hidden state;
                zeros are used when None
        return :
            outputs: [batch_size, seq_len, hidden_size]
            cur_h: [1, batch_size, hidden_size]
        """
        batch_size, seq_len, input_size = input_.shape

        if prev_h is None:
            prev_h = np.zeros((batch_size, self.hidden_size))

        outputs = np.zeros((batch_size, seq_len, self.hidden_size))

        # Start from the initial state so an empty sequence (seq_len == 0)
        # returns it unchanged instead of hitting an unbound variable.
        cur_h = prev_h
        for i in range(0, seq_len):
            cur_x = input_[:, i, :]
            cur_h = self.cell(cur_x, cur_h)
            outputs[:, i, :] = cur_h

        return outputs, cur_h.reshape(1, batch_size, self.hidden_size)

    def __call__(self, *arg, **kwargs):
        """Alias for forward() so the layer can be called like a function."""
        return self.forward(*arg, **kwargs)

    def backward(self, pre_grad):
        """Backward pass placeholder; not implemented yet (returns None)."""
        pass