# Sprint22 深層学習スクラッチ リカレントニューラルネットワーク

## 【問題１】SimpleRNNのフォワードプロパゲーション実装
SimpleRNNのクラスSimpleRNNを作成してください。基本構造はFCクラスと同じになります。


フォワードプロパゲーションの数式は以下のようになります。ndarrayのshapeがどうなるかを併記しています。


バッチサイズをbatch_size、入力の特徴量数をn_features、RNNのノード数をn_nodesとして表記します。活性化関数はtanhとして進めますが、これまでのニューラルネットワーク同様にReLUなどに置き換えられます。

$$
a_t = x_{t}\cdot W_{x} + h_{t-1}\cdot W_{h} + B\\
$$

$$h_t = tanh(a_t)$$

$a_t$ : 時刻tの活性化関数を通す前の状態 (batch_size, n_nodes)  
$h_t$ : 時刻tの状態・出力 (batch_size, n_nodes)  
$x_{t}$ : 時刻tの入力 (batch_size, n_features)  
$W_{x}$ : 入力に対する重み (n_features, n_nodes)  
$h_{t-1}$ : 時刻t-1の状態（前の時刻から伝わる順伝播） (batch_size, n_nodes)  
$W_{h}$ : 状態に対する重み。 (n_nodes, n_nodes)  
$B$ : バイアス項 (n_nodes,)  

初期状態 h_{0} は全て0とすることが多いですが、任意の値を与えることも可能です。


上記の処理を系列数n_sequences回繰り返すことになります。RNN全体への入力 x は(batch_size, n_sequences, n_features)のような配列で渡されることになり、そこから各時刻の配列を取り出していきます。


分類問題であれば、それぞれの時刻のhに対して全結合層とソフトマックス関数（またはシグモイド関数）を使用します。タスクによっては最後の時刻のhだけを使用することもあります。

## 【問題２】小さな配列でのフォワードプロパゲーションの実験
小さな配列でフォワードプロパゲーションを考えてみます。


入力x、初期状態h、重みw_xとw_h、バイアスbを次のようにします。


ここで配列xの軸はバッチサイズ、系列数、特徴量数の順番です。

In [1]:
import numpy as np

### 活性化関数クラス

In [2]:
class Tanh:
    """tanh関数（活性化関数）"""
    def forward(self, A):
        Z = np.tanh(A)
        return Z
    
    def backward(self, dZ, A):
        dA = dZ * (1 - np.tanh(A)**2)
        return dA

class Softmax:
    """softmax関数
    backwardは交差エントロピー誤差関数と合わせて計算する
    """
    def forward(self, A):
        Z = np.exp(A) / np.sum(np.exp(A), axis=1, keepdims=True)
        # backward用にZを保持
        self.Z = Z
        return Z
    
    def backward(self, y):
        dA = self.Z - y
        return dA


### Initializer


In [3]:
class HeInitializer:
    """Heの初期値"""
    def W(self, n_nodes1, n_nodes2):
        """重みの初期化
        :parameters
            n_nodes1 (int): 前の層のノード数
            n_nodes2 (int): 後の層のノード数
        :returns
            W (2d-array, (n_nodes1, n_nodes2)): ランダムに発生させた重み
        """
        return np.sqrt(2.0 / n_nodes1) * np.random.randn(n_nodes1, n_nodes2)
    
    def B(self, n_nodes1, n_nodes2):
        """バイアスの初期化
        :parameters
            n_nodes2 (int): 後の層のノード数
        :returns
            B (1d-array, (n_nodes2, )): ランダムに発生させたバイアス
        """
        return np.sqrt(2.0 / n_nodes1) * np.random.randn(1, n_nodes2)

### Optimizer

In [4]:
class AdaGrad:
    """Adative Gradient法
    :parameters
        lr (float): 学習率
    """
    def __init__(self, lr=0.01):
        self.lr = lr
        self.H_Wx = 0
        self.H_Wh = 0
        self.H_B = 0
    
    def update(self, layer):
        """ある層の重みやバイアスを更新する
        :parameters
            layer (instance): 更新前の層（FC）のインスタンス
        """
        # 行列Hを求める
        self.H_Wx += layer.dWx * layer.dWx
        self.H_Wh += layer.dWh * layer.dWh
        self.H_B += layer.dB * layer.dB
        # 重み・バイアスの更新
        layer.Wx -= self.lr * layer.dWx / (np.sqrt(self.H_Wx) + 1e-8) / layer.N
        layer.Wh -= self.lr * layer.dWh / (np.sqrt(self.H_Wh) + 1e-8) / layer.N
        layer.B -= self.lr * layer.dB / (np.sqrt(self.H_B) + 1e-8) / layer.N
        return layer

### SimpleRNN

In [5]:
x = np.array([[[1, 2], [2, 3], [3, 4]]])/100 # (batch_size, n_sequences, n_features)
w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100 # (n_features, n_nodes)
w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100 # (n_nodes, n_nodes)
batch_size = x.shape[0] # 1
n_sequences = x.shape[1] # 3
n_features = x.shape[2] # 2
n_nodes = w_x.shape[1] # 4
h0 = np.zeros((batch_size, n_nodes)) # (batch_size, n_nodes)
b = np.array([1, 1, 1, 1]) # (n_nodes,)
h_true = np.array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]]) # (batch_size, n_nodes)

In [6]:
class SimpleRNN:
    """ノード数n_nodes1からn_nodes2への全結合層
    :parameters
        n_node1 (int): 前層のノード数
        n_node2 (int): 後層のノード数
        initializer (class instance): 初期化方法のインスタンス
        optimizer (class instance): 最適化手法のインスタンス
    """
    def __init__(self, n_features, n_nodes, initializer=None, optimizer=AdaGrad(), activation=Tanh()):
        self.optimizer = optimizer
        self.activation = activation
        self.n_features = n_features
        self.n_nodes = n_nodes
        # 重みとバイアスの初期化
        if initializer is None:
            self.Wx = w_x  # shape(n_features, n_nodes)
            self.Wh = w_h  # shape(n_nodes, n_nodes)
            self.B = b.astype(np.float)
        else:
            self.Wx = initializer.W(n_features, n_nodes)
            self.Wh = initializer.W(n_nodes, n_nodes)
            self.B = initializer.B(n_features, n_nodes)
        
    def forward(self, X, pre_H):
        """順伝播
        :parameters
            X (3d-array, (batch_size, n_sequences, n_features)): 入力
            pre_H (2d-array, (batch_size, n_nodes)): 前時点のH （活性化前の線形結合）
        :returns
            H (2d-array, (batch_size, n_features)): 出力
        """
        
        # backward用に保持する変数（最初に１度だけ）
        if not hasattr(self, "X"):
            self.X = X
            self.N = X.shape[0]  # Backward内のoptimizer.updateで、平均を計算するとき使用する
            self.n_sequences = X.shape[1]
            self.A = np.zeros((self.n_sequences, self.n_nodes))
            self.H = np.zeros((self.n_sequences, self.n_nodes))
        
        # 系列方向に再帰的にforwardしていく
        print("X.shape :", X.shape)
        if X.shape[1] > 0:
            # 線形結合
            A = np.matmul(X[:, 0, :], self.Wx) + np.matmul(pre_H, self.Wh) + self.B
            cur_H = self.activation.forward(A)
            
            # backward用に保存
            self.A[self.n_sequences - X.shape[1], :] = A
            self.H[self.n_sequences - X.shape[1], :] = cur_H
            
            # 再帰関数（Xからは今回の系列を除く）
            H = self.forward(X[:, 1:, :], cur_H)
        else:
            H = pre_H
        return H
    
    def backward(self, dH, s=1):
        """逆伝播
        :parameters
            dH (2d-array, (batch_size, n_features)): 後層から戻ってきた勾配
            s (int): 後ろから何層目かを表すインデックス
        :attributes
            cur_dH (2d-array, (batch_size, n_nodes1)): 勾配
        :returns
            dX (3d-array, (batch_size, n_sequences, n_features)) : 入力の勾配
        """
        # 勾配を足し合わせていくための格納用変数を用意
        if not hasattr(self, "dB"):
            self.dB = np.zeros(self.B.shape)
            self.dWx = np.zeros(self.Wx.shape)
            self.dWh = np.zeros(self.Wh.shape)
            self.dX = np.zeros(self.X.shape)
            
        print("s", s, self.X.shape[1])
        cur_dH = np.zeros_like(dH[0, :])
        while s <= self.X.shape[1]:
            # tanhのbackward
            dA = self.activation.backward(dH[-s, :] + cur_dH, self.A[-s, :])
            # 勾配を計算して、これまでの勾配と合計する
            self.dB += dA
            self.dWx += np.matmul(self.X[:, -s, :].T, dA[np.newaxis, :])
            self.dWh += np.matmul(self.H[-s, :][np.newaxis, :].T, dA[np.newaxis, :])
            self.dX[:, -s, :] = np.matmul(dA, self.Wx.T)
            print("dX:\n", self.dX)
            
            # 一つ前に渡すdHを求める
            cur_dH = np.matmul(dA, self.Wh.T)
            
            # 時間をさかのぼる。
            s += 1
            
        # 勾配をもとに重みとバイアスを更新
        self = self.optimizer.update(self)


In [7]:
class ScratchRNNClassifier():
    """リカレントニューラルネットワーク分類器
    :attributes
        layers (list of instances): 各層のインスタンスを並べたリスト
        n_epoch (int): 繰り返すエポック数
        batch_size (int): ミニバッチのデータ数
        plot_interval (int): 損失関数を記録する間隔
        epoch_interval (int): 何epochごとにprintするか
        loss (list): 損失関数の推移（訓練データ）
        lossval (list): 損失関数の推移（検証データ）
        verbose (bool): 学習過程を出力する場合はTrue
    """
    def __init__(self, layers, n_epoch=10, batch_size=20, plot_interval=1, verbose=True, epoch_interval=1):
        self.layers = layers
        self.n_epoch = n_epoch
        self.batch_size = batch_size
        self.plot_interval = plot_interval
        self.epoch_interval = epoch_interval
        self.verbose = verbose
        self.loss = []
        self.loss_val = []

    def predict_proba(self, X, H0):
        """ニューラルネットワーク分類器を使い推定する。推定結果は確率のOne-hot表現。
        :parameters
            X (3d-array, (n_samples, n_sequences, n_features)): 入力
            pre_H (2d-array, (n_samples, n_nodes)): 前時点のH （活性化前の線形結合）
        :returns
            y (2d-array, (n_samples, n_classes)): 推定確率
        """
        # forwardの先が recuurentなので複雑にしない。
        y_pred = self.layers.forward(X, H0)
        return y_pred
    
    def predict(self, X, H0):
        """ニューラルネットワーク分類器を使い推定する。
        :parameters
            X (2d-array, (n_samples, n_features)): サンプル
        :returns
            label (1d-array, (n_samples,)): 推定ラベル
        """
        probability = self.predict_proba(X, H0)
        label = np.argmax(probability, axis=1)
        return label

In [8]:
layer = SimpleRNN(n_features, n_nodes)
print("Pred", layer.forward(x, h0))
print("True", h_true)
print()
rnn_classifier = ScratchRNNClassifier(layer)
print("forward output:", rnn_classifier.predict_proba(x, h0))
print()

print("predicted label:", rnn_classifier.predict(x, h0))


X.shape : (1, 3, 2)
X.shape : (1, 2, 2)
X.shape : (1, 1, 2)
X.shape : (1, 0, 2)
Pred [[0.79494228 0.81839002 0.83939649 0.85584174]]
True [[0.79494228 0.81839002 0.83939649 0.85584174]]

X.shape : (1, 3, 2)
X.shape : (1, 2, 2)
X.shape : (1, 1, 2)
X.shape : (1, 0, 2)
forward output: [[0.79494228 0.81839002 0.83939649 0.85584174]]

X.shape : (1, 3, 2)
X.shape : (1, 2, 2)
X.shape : (1, 1, 2)
X.shape : (1, 0, 2)
predicted label: [3]


## 【問題３】バックプロパゲーションの実装
バックプロパゲーションを実装してください。


RNNの内部は全結合層を組み合わせた形になっているので、更新式は全結合層などと同様です。

$$
W_x^{\prime} = W_x - \alpha \frac{\partial L}{\partial W_x} \\
W_h^{\prime} = W_h - \alpha \frac{\partial L}{\partial W_h} \\
B^{\prime} = B - \alpha \frac{\partial L}{\partial B}
$$
$\alpha$ : 学習率


$\frac{\partial L}{\partial W_x}$ : $W_x$ に関する損失 $L$ の勾配


$\frac{\partial L}{\partial W_h}$ : $W_h$ に関する損失 $L$ の勾配


$\frac{\partial L}{\partial B}$ : $B$ に関する損失 $L$ の勾配


勾配を求めるためのバックプロパゲーションの数式が以下です。

$$
\frac{\partial h_t}{\partial a_t} = \frac{\partial L}{\partial h_t} ×(1-tanh^2(a_t))
$$
$$
\frac{\partial L}{\partial B} = \frac{\partial h_t}{\partial a_t}
$$
$$
\frac{\partial L}{\partial W_x} = x_{t}^{T}\cdot \frac{\partial h_t}{\partial a_t}
$$
$$
\frac{\partial L}{\partial W_h} = h_{t-1}^{T}\cdot \frac{\partial h_t}{\partial a_t}
$$

*$\frac{\partial L}{\partial h_t}$ は前の時刻からの状態の誤差と出力の誤差の合計です。hは順伝播時に出力と次の層に伝わる状態双方に使われているからです。


前の時刻や層に流す誤差の数式は以下です。

$$
\frac{\partial L}{\partial h_{t-1}} = \frac{\partial h_t}{\partial a_t}\cdot W_{h}^{T}
$$
$$
\frac{\partial L}{\partial x_{t}} = \frac{\partial h_t}{\partial a_t}\cdot W_{x}^{T}
$$

In [9]:
dh = np.ones((3, 4))
print("dH", dh)
layer.backward(dh)
print("===== Result =====")
print("dX:\n", layer.dX)
print("\ndWx:\n", layer.dWx)
print("\ndWh:\n", layer.dWh)
print("\ndB:\n", layer.dB)

dH [[1. 1. 1. 1.]
 [1. 1. 1. 1.]
 [1. 1. 1. 1.]]
s 1 3
dX:
 [[[0.         0.        ]
  [0.         0.        ]
  [0.04708592 0.06963563]]]
dX:
 [[[0.         0.        ]
  [0.05199753 0.07646683]
  [0.04708592 0.06963563]]]
dX:
 [[[0.07237975 0.10373134]
  [0.05199753 0.07646683]
  [0.04708592 0.06963563]]]
===== Result =====
dX:
 [[[0.07237975 0.10373134]
  [0.05199753 0.07646683]
  [0.04708592 0.06963563]]]

dWx:
 [[0.02325421 0.02151871 0.01988135 0.01863078]
 [0.03524769 0.03286036 0.03059886 0.0288961 ]]

dWh:
 [[0.9377582  0.88585433 0.83619035 0.80008613]
 [0.95506827 0.90155997 0.85038445 0.81309911]
 [0.97065743 0.91570602 0.86317077 0.82482314]
 [0.98291729 0.9268302  0.87322478 0.83404108]]

dB:
 [1.19934775 1.13416539 1.07175132 1.02653129]
