# RNNスクラッチ
## 【問題1】SimpleRNNのフォワードプロパゲーション実装

In [1]:
import numpy as np
from copy import deepcopy

class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
        self.Z = 0
        self.dA = 0

    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """ 
        self.Z = deepcopy(X)
        A = np.dot(X, self.W) + self.B
        return A
    
    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        self.dA = deepcopy(dA)
        dW = np.dot(self.Z.T, dA)
        dZ = np.dot(dA, self.W.T) 
        # 更新
        self = self.optimizer.update(self)
        
        return dZ

In [2]:
class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    
    def __init__(self, sigma=0.01):
        self.sigma = sigma
        
    def W(self, n_nodes1, n_nodes2):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        W : 次の形のndarray, shape(n_nodes1, n_nodes2)
        """
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W.astype("f")
    
    def B(self, n_nodes2):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B : 次の形のndarray, shape(1, nodes2)
        """
        B = self.sigma * np.random.randn(1, n_nodes2)
        return B.astype("f")
    
class XavierInitializer:
    """
    Xavierによる初期化
    Sigmoid」かTanhに向いている
    """
    
    def __init__(self):
        self.sigma = None
        
    def W(self, n_nodes1, n_nodes2):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        W : 次の形のndarray, shape(n_nodes1, n_nodes2)
        """
        self.sigma = 1 / np.sqrt(n_nodes1)
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W.astype("f")
    
    def B(self, n_nodes2):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B : 次の形のndarray, shape(1, nodes2)
        """
        B = self.sigma * np.random.randn(1, n_nodes2)
        return B.astype("f")
    
class HeInitializer:
    """
    Heによる初期化
    ReLUと相性がいい
    """
    
    def __init__(self):
        self.sigma = 0
        
    def W(self, n_nodes1, n_nodes2):
        """
        重みの初期化
        Parameters
        ----------
        n_nodes1 : int
          前の層のノード数
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        W : 次の形のndarray, shape(n_nodes1, n_nodes2)
        """
        self.sigma = np.sqrt(2 / n_nodes1)
        W = (self.sigma * np.random.randn(n_nodes1, n_nodes2))
        return W.astype("f")
    
    def B(self, n_nodes2):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B : 次の形のndarray, shape(1, nodes2)
        """
        B = self.sigma * np.random.randn(1, n_nodes2)
        return B.astype("f")

In [136]:
class SGDrnn:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
        
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス

        Returns
        ----------
        layer : 更新後の層のインスタンス
        """
        layer.WX[...] = layer.WX - self.lr * np.dot(layer.X.T, layer.dA) / len(layer.dA)
        layer.B[...] = layer.B - self.lr * np.mean(layer.dA)
        layer.Wh[...] = layer.Wh[...] - self.lr * np.dot(layer.ht.T, layer.dA) / len(layer.dA)
        return layer
    
class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
        
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス

        Returns
        ----------
        layer : 更新後の層のインスタンス
        """
        layer.W[...] = layer.W - self.lr * np.dot(layer.Z.T, layer.dA) / len(layer.dA)
        layer.B[...] = layer.B - self.lr * np.mean(layer.dA, axis=0)
        return layer


class AdaGrad:
    """
    学習率を変化を減少させていく勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
        self.HW = 0
        self.HB = 0
        
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス

        Returns
        ----------
        layer : 更新後の層のインスタンス
        """
        
        dW = np.dot(layer.Z.T, layer.dA) / len(layer.dA)
        dB = np.mean(layer.dA, axis=0)
        self.HW += dW**2
        self.HB +=  dB**2
        layer.W[...] = layer.W - self.lr / np.sqrt(self.HW +1e-7) * dW #0で割るとまずいので +le-7
        layer.B[...] = layer.B - self.lr / np.sqrt(self.HB + 1e-7)  * dB
        return layer
    
class Momentum:
    
    """
    momentumSGD
    Parameters
    ----------
    lr : 学習率
    momentum : 学習係数
    """
    def __init__(self, lr=0.01, momentum=0.9):
        self.lr = lr
        self.momentum = momentum
        self.vW = 0
        self.vB = 0
        
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス

        Returns
        ----------
        layer : 更新後の層のインスタンス
        """

        dW = np.dot(layer.Z.T, layer.dA) / len(layer.dA)
        dB = np.mean(layer.dA, axis=0)
        
        self.vW = self.momentum * self.vW - self.lr * dW
        self.vB =  self.momentum * self.vB - self.lr * dB
        
        layer.W[...] = layer.W + self.vW
        layer.B[...] = layer.B + self.vB
        
        return layer
    
class Adam:

    """
    Adam
    RMSprop に Momentum 法を組み合わせたような形
    Parameters
    ----------
    lr : 学習率
    momentum : 学習係数
    beta1
    beta2
    """

    def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.iter = 0
        self.mW = 0
        self.vW = 0
        self.mB = 0
        self.vB = 0
        
    def update(self, layer):
        
        self.iter += 1
        dW = np.dot(layer.Z.T, layer.dA) / len(layer.dA)
        dB = np.mean(layer.dA, axis=0)
        
        lr_t  = self.lr * np.sqrt(1.0 - self.beta2**self.iter) / (1.0 - self.beta1**self.iter) 
        
        self.mW += (1 - self.beta1) * (dW - self.mW)
        self.vW += (1 - self.beta2) * (dW**2 - self.vW)
        self.mB += (1 - self.beta1) * (dB - self.mB)
        self.vB += (1 - self.beta2) * (dB**2 - self.vB)
        
        layer.W -= lr_t * self.mW / (np.sqrt(self.vW) + 1e-7)
        layer.B -= lr_t * self.mB / (np.sqrt(self.vB) + 1e-7)

In [4]:
class sigmoid:
    """
    シグモイド関数
    """
    
    def __init__(self):
        self.Z = 0
    
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes)
            入力
        Returns
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes)
            出力
        """ 
        Z = 1 / (1 + np.exp(-A))
        self.Z = Z
        return Z
    
    def backward(self, dZ):
        """
        バックワード
        Parameters
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes)
            後ろから流れてきた勾配
        Returns
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes)
            前に流す勾配
        """
        dA = dZ  *  (1 - self.Z) * self.Z 
        return dA
    
class Tanh:
    """
    ハイパボリックタンジェント関数
    """
    
    def __init__(self):
        self.Z = 0
    
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes)
            入力
        Returns
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes)
            出力
        """ 
        Z = np.tanh(A)
        self.Z = Z
        return Z
    
    def backward(self, dZ):
        """
        バックワード
        Parameters
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes)
            後ろから流れてきた勾配
        Returns
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes)
            前に流す勾配
        """
        dA = dZ  *  (1 - self.Z**2)
        return dA

class Softmax:
    """
    ソフトマックス関数
    """
    
    def __init__(self):
        self.Z = 0
    
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes)
            入力
        Returns
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes)
            出力
        """ 
        
        c = np.max(A)
        A = A - c
        ex = np.exp(A)
        Z = ex / (np.sum(ex, axis=1))[:, np.newaxis]
        self.Z = Z
        return Z
    
    def backward(self, y):
        """
        バックワード
        Parameters
        ----------
        y : 次の形のndarray, shape (batch_size, n_class)
            正解ラベル
        Returns
        ----------
        dA : 次の形のndarray, shape (batch_size, n_class)
            前に流す勾配
        """
        
        dA = self.Z - y
        
        return dA
    
class ReLU:
    """
    ReLU関数
    """
    
    def __init__(self):
        self.Z = None
    
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes)
            入力
        Returns
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes)
            出力
        """ 
        Z = np.maximum(0, A)
        self.Z = deepcopy(Z)
        return Z
    
    def backward(self, dZ):
        """
        バックワード
        Parameters
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes)
            後ろから流れてきた勾配
        Returns
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes)
            前に流す勾配
        """
        
        dA = dZ  *  np.where(self.Z != 0, 1, self.Z)
        
        return dA


In [194]:
class SimpleRNN:
    """
    RNN 各シーケンスで出力するパターン
    Parameters
    ----------
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_features, n_nodes, initializer, optimizer, activation):
        self.optimizer = optimizer
        self.activation = activation
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        self.WX = initializer.W(n_features, n_nodes)
        self.Wh = initializer.W(n_nodes, n_nodes)
        self.B = initializer.B(1)
        self.n_nodes = n_nodes
        self.A = None
        self.ht = None
        self.Z = None
        self.dA = None
        self.Xar = None
        self.X = None
        self.n_features = n_features

    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_sequences, n_features)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_sequencesm n_nodes)
            出力
        """ 
        #self.Z = deepcopy(X)
        self.Xar = X
        m, s, n = X.shape
        ht = np.zeros((m, self.n_nodes))
        A = np.empty((0, m, self.n_nodes))
        for i in range(s):
            ht = np.dot(X[:, i, :], self.WX) + np.dot(ht, self.Wh) + self.B
            ht = self.activation.forward(ht)
            A = np.vstack((A, ht[np.newaxis,:])) #shape (シーケンス,　バッチ、n_node)
            
        A = A.transpose(1, 0, 2)
        self.A = A
        return A #shape (,バッチ、シーケンス、n_node)
     
    def backward(self, dA ):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_sequences, n_nodes)
            後ろから流れてきた勾配
        Returns
        ----------
        dX : 次の形のndarray, shape (batch_size, n_sequences, n_features)
            前に流す勾配
        """
        #self.dA = deepcopy(dA)
        m, s, n_nodes = self.A.shape
        htd = 0
        dX = np.zeros((s, m, self.n_features))
        for i in reversed(range(s)):
            da = dA[:, i,:]
            da = da + htd
            da = da  *  (1 - self.A[:, i, :]**2)#shape (m,n_nodes)
            self.dA = da
            self.X = self.Xar[:, i, :]
            self.ht = self.A[:, i, :]
            self = self.optimizer.update(self)
            htd = np.dot(da, self.Wh.T) #shape(batch, n_nodes)
            dX[i, :, :] = np.dot(da, self.WX.T) #dot後のshape (batch, n_features)
            
        dX = dX.transpose(1,0,2)
        return dX

In [195]:
class SimpleRNN2:
    """
    RNN
    出力が最終層だけ
    Parameters
    ----------
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_features, n_nodes, initializer, optimizer, activation):
        self.optimizer = optimizer
        self.activation = activation
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        self.WX = initializer.W(n_features, n_nodes)
        self.Wh = initializer.W(n_nodes, n_nodes)
        self.B = initializer.B(1)
        self.n_nodes = n_nodes
        self.A = None
        self.ht = None
        self.Z = None
        self.dA = None
        self.X = None
        self.X_ar = None
        self.n_features = n_features

    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_sequences, n_features)
            入力
        Returns
        ----------
        ht : 次の形のndarray, shape (batch_size,n_nodes)
            出力
        """ 
        self.X_ar = X
        m, s, n = X.shape
        ht = np.zeros((m, self.n_nodes))
        A = np.empty((0, m, self.n_nodes))
        for i in range(s):
            ht = np.dot(X[:, i, :].reshape(m, n), self.WX) + np.dot(ht, self.Wh) + self.B
            ht = self.activation.forward(ht)
            A = np.vstack((A, ht[np.newaxis,:])) #shape (シーケンス,　バッチ、n_node)
            
        A = A.transpose(1, 0, 2)
        self.A = A
        return ht
     
    def backward(self, dA ):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size,n_nodes)
            後ろから流れてきた勾配
        Returns
        ----------
        dX : 次の形のndarray, shape (batch_size, n_sequences, n_features)
            前に流す勾配
        """
        #self.dA = deepcopy(dA)
        m, s, n_nodes = self.A.shape
        htd = 0
        dX = np.zeros((s, m, self.n_features))
        for i in reversed(range(s)):
            #da = da + htd
            dA = dA * (1 - self.A[:, i, :]**2)#shape (m,n_nodes)
            self.dA = dA
            self.X = self.X_ar[:, i, :]
            self.ht = self.A[:, i, :]
            self = self.optimizer.update(self)
            dA = np.dot(dA, self.Wh.T) #shape(batch, n_nodes)
            dX[i, :, :] = np.dot(dA, self.WX.T) #dot後のshape (batch, n_features)
            
        dX = dX.transpose(1,0,2)
        return dX

## 【問題2】小さな配列でのフォワードプロパゲーションの実験

In [17]:
x = np.array([[[1, 2], [2, 3], [3, 4]]])/100
w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100
w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100
batch_size = x.shape[0] # 1
n_sequences = x.shape[1] # 3
n_features = x.shape[2] # 2
n_nodes = w_x.shape[1] # 4
h = np.zeros((batch_size, n_nodes))
b = np.array([1])

In [18]:
x.shape

(1, 3, 2)

In [191]:
rnn = SimpleRNN2(2, 4, SimpleInitializer(), SGDrnn(lr=0.01), Tanh())

In [192]:
h = rnn.forward(x)

In [193]:
h

array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]])