# Sprint 深層学習スクラッチ リカレントニューラルネットワーク
リカレントニューラルネットワーク（RNN） のクラスをスクラッチで作成していきます。NumPyなど最低限のライブラリのみを使いアルゴリズムを実装していきます。<br>
<br>
フォワードプロパゲーションの実装を必須課題とし、バックプロパゲーションの実装はアドバンス課題とします。<br>
<br>
クラスの名前はScratchSimpleRNNClassifierとしてください。クラスの構造などは以前のSprintで作成したScratchDeepNeuralNetrowkClassifierを参考にしてください。

## 【問題1】SimpleRNNのフォワードプロパゲーション実装
SimpleRNNのクラスSimpleRNNを作成してください。基本構造はFCクラスと同じになります。<br>
<br>
フォワードプロパゲーションの数式は以下のようになります。ndarrayのshapeがどうなるかを併記しています。<br>
<br>
バッチサイズをbatch_size、入力の特徴量数をn_features、RNNのノード数をn_nodesとして表記します。活性化関数はtanhとして進めますが、これまでのニューラルネットワーク同様にReLUなどに置き換えられます。

$$
a_t = x_{t}\cdot W_{x} + h_{t-1}\cdot W_{h} + B\\
h_t = tanh(a_t)
$$

$a_t$ : 時刻tの活性化関数を通す前の状態 (batch_size, n_nodes)

$h_t$ : 時刻tの状態・出力 (batch_size, n_nodes)

$x_{t}$ : 時刻tの入力 (batch_size, n_features)

$W_{x}$ : 入力に対する重み (n_features, n_nodes)

$h_{t-1}$ : 時刻t-1の状態（前の時刻から伝わる順伝播） (batch_size, n_nodes)

$W_{h}$ : 状態に対する重み。 (n_nodes, n_nodes)

$B$ : バイアス項 (n_nodes,)

初期状態$h_{0}$は全て0とすることが多いですが、任意の値を与えることも可能です。<br>
<br>
上記の処理を系列数n_sequences回繰り返すことになります。RNN全体への入力$x$は(batch_size, n_sequences, n_features)のような配列で渡されることになり、そこから各時刻の配列を取り出していきます。<br>
<br>
分類問題であれば、それぞれの時刻のhに対して全結合層とソフトマックス関数（またはシグモイド関数）を使用します。タスクによっては最後の時刻のhだけを使用することもあります。

In [32]:
import numpy as np
from copy import deepcopy

In [7]:
class SimpleRNN:
    """
    層の生成
    
    Parameters
    ----------
    activation : object instance
      活性化関数インスタンス
    optimizer : object instance
      最適化手法のインスタンス
      
    Attributes
    ----------
    w : 次の形のtuple, shape (Wx, Wh)
      重みパラメータ
    b : 次の形のndarray, shape (n_nodes, )
      バイアスパラメータ
    """
    def __init__(self, activation, optimizer=None, weights=None, intercept=None):
        self.activ = activation
        self.optimizer = optimizer
        
        self.w = weights
        self.b = intercept

    
    def forward(self, X, h):
        """
        順伝播
        
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_features)
            入力
        h : 次の形のndarray, shape (batch_size, n_nodes)
            前の層の入力（t-1の状態）
            
        Returns
        ----------
        H : 次の形のndarray, shape (batch_size, n_nodes)
            出力（tの状態）
        """      
        W_x = self.w[0]
        W_h = self.w[1]
        
        A = X@W_x + h@W_h + self.b
        H = self.activ.forward(A)

        self.X = X[None,:]
        self.h = h
        self.output = H
        self.A_ = A

        return H
    

class Tanh:
    """
    ハイパーボリックタンジェント関数
    
    Parameters
    ----------
    Z_ : 次の形のndarray, shape (batch_size, n_nodes_self)
      順伝播の出力
    dA_ : 次の形のndarray, shape (batch_size, n_nodes_self)
      逆伝播入力に対するdA勾配
    """
    def __init__(self):
        self.Z_ = None
        self.dA_ = None
        
    def forward(self, A):
        """
        順伝播
        
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes_self)
          ある層に順伝播された特徴量データ
        """
        self.Z_ = np.tanh(A)
        
        return self.Z_
    
    def backward(self, dZ):
        """
        逆伝播
        
        Parameters
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes_self)
          ある層に逆伝播されたZに関するLoss勾配
        """
        self.dA_ = dZ * (1 - self.Z_**2)
        
        return self.dA_

## 【問題2】小さな配列でのフォワードプロパゲーションの実験
小さな配列でフォワードプロパゲーションを考えてみます。

入力x、初期状態h、重みw_xとw_h、バイアスbを次のようにします。

ここで配列xの軸はバッチサイズ、系列数、特徴量数の順番です。

```
x = np.array([[[1, 2], [2, 3], [3, 4]]])/100 # (batch_size, n_sequences, n_features)
w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100 # (n_features, n_nodes)
w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100 # (n_nodes, n_nodes)
batch_size = x.shape[0] # 1
n_sequences = x.shape[1] # 3
n_features = x.shape[2] # 2
n_nodes = w_x.shape[1] # 4
h = np.zeros((batch_size, n_nodes)) # (batch_size, n_nodes)
b = np.array([1, 1, 1, 1]) # (n_nodes,)
```

フォワードプロパゲーションの出力が次のようになることを作成したコードで確認してください。

```
h = np.array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]]) # (batch_size, n_nodes)
```

In [8]:
x = np.array([[[1, 2], [2, 3], [3, 4]]])/100 # (batch_size, n_sequences, n_features)
w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100 # (n_features, n_nodes)
w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100 # (n_nodes, n_nodes)
batch_size = x.shape[0] # 1
n_sequences = x.shape[1] # 3
n_features = x.shape[2] # 2
n_nodes = w_x.shape[1] # 4
h = np.zeros((batch_size, n_nodes)) # (batch_size, n_nodes)
b = np.array([1, 1, 1, 1]) # (n_nodes,)

In [9]:
# 手計算
H1 = np.tanh(x[0,0]@w_x + h@w_h + b)
H2 = np.tanh(x[0,1]@w_x + H1@w_h + b)
H3 = np.tanh(x[0,2]@w_x + H2@w_h + b)
H3

array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]])

In [11]:
W = (w_x, w_h)
B = b
rnn = SimpleRNN(Tanh(), weights=W, intercept=B)

for s in range(n_sequences):
    # 順伝播
    H = rnn.forward(x[0,s], h)
    h = H
print(H)

[[0.79494228 0.81839002 0.83939649 0.85584174]]


In [12]:
h_true = np.array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]]) # (batch_size, n_nodes)
H - h_true

array([[-9.57790736e-10,  3.93828459e-09, -1.37243750e-09,
        -1.88850646e-09]])

## 【問題3】（アドバンス課題）バックプロパゲーションの実装
バックプロパゲーションを実装してください。

RNNの内部は全結合層を組み合わせた形になっているので、更新式は全結合層などと同様です。

$$
W_x^{\prime} = W_x - \alpha \frac{\partial L}{\partial W_x} \\
W_h^{\prime} = W_h - \alpha \frac{\partial L}{\partial W_h} \\
B^{\prime} = B - \alpha \frac{\partial L}{\partial B}
$$

$\alpha$ : 学習率

$\frac{\partial L}{\partial W_x}$ : $W_x$に関する損失$L$の勾配

$\frac{\partial L}{\partial W_h}$ : $W_h$に関する損失$L$の勾配

$\frac{\partial L}{\partial B}$ : $B$に関する損失$L$の勾配<br>
<br>
勾配を求めるためのバックプロパゲーションの数式が以下です。<br>

$\frac{\partial h_t}{\partial a_t}$ = $\frac{\partial L}{\partial h_t} × (1-tanh^2(a_t))$

$\frac{\partial L}{\partial B}$ = $\frac{\partial h_t}{\partial a_t}$

$\frac{\partial L}{\partial W_x}$ = $x_{t}^{T}\cdot \frac{\partial h_t}{\partial a_t}$

$\frac{\partial L}{\partial W_h}$ = $h_{t-1}^{T}\cdot \frac{\partial h_t}{\partial a_t}$<br>
<br>
＊$\frac{\partial L}{\partial h_t}$は前の時刻からの状態の誤差と出力の誤差の合計です。hは順伝播時に出力と次の層に伝わる状態双方に使われているからです。

前の時刻や層に流す誤差の数式は以下です。

$\frac{\partial L}{\partial h_{t-1}}$ = $\frac{\partial h_t}{\partial a_t}\cdot W_{h}^{T}$

$\frac{\partial L}{\partial x_{t}}$ = $\frac{\partial h_t}{\partial a_t}\cdot W_{x}^{T}$

In [46]:
class SimpleRNN:
    """
    層の生成
    
    Parameters
    ----------
    activation : object instance
      活性化関数インスタンス
    optimizer : object instance
      最適化手法のインスタンス
      
    Attributes
    ----------
    w : 次の形のtuple, shape (Wx, Wh)
      重みパラメータ
    b : 次の形のndarray, shape (n_nodes, )
      バイアスパラメータ
    """
    def __init__(self, activation, optimizer=None, weights=None, intercept=None):
        self.activ = activation
        self.optimizer = optimizer
        
        self.w = weights
        self.b = intercept
        
        
    def forward(self, X, h):
        """
        順伝播
        
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_features)
            入力
        h : 次の形のndarray, shape (batch_size, n_nodes)
            前の層の入力（t-1の状態）
            
        Returns
        ----------
        H : 次の形のndarray, shape (batch_size, n_nodes)
            出力（tの状態）
        """      
        W_x = self.w[0]
        W_h = self.w[1]
        
        A = X@W_x + h@W_h + self.b
        H = self.activ.forward(A)

        self.X = X[None,:]
        self.h = h
        self.output = H
        self.A_ = A

        return H
    
 
    def backward(self, dH, dW, dB):
        """
        逆伝播（Back Propagation Through Time）
        
        Parameters
        ----------
        dH : 次の形のndarray, shape (batch_size, n_nodes_self)
          後ろから流れてきた勾配
        dW : 次の形のtuple, shape (dWx, dWh)
          一つ後ろの状態の重み勾配
        dB : 次の形のndarray, shape (n_nodes, )
          一つ後ろの状態の重み勾配
        lr : float
          学習率
        """
        w_x = self.w[0]
        w_h = self.w[1]
        
        dA = self.activ.backward(dH)
        dh = dA@w_h.T
        dWx = self.X.T @ dA
        dWh = self.h.T @ dA
    
        dW[0] += dWx
        dW[1] += dWh
        dB += dA
        
        self.dH = dH
        self.dW = dW
        self.dB = dB
        
        return dh, dW, dB

In [47]:
x = np.array([[[1, 2], [2, 3], [3, 4]]])/100 # (batch_size, n_sequences, n_features)
w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100 # (n_features, n_nodes)
w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100 # (n_nodes, n_nodes)
batch_size = x.shape[0] # 1
n_sequences = x.shape[1] # 3
n_features = x.shape[2] # 2
n_nodes = w_x.shape[1] # 4
h = np.zeros((batch_size, n_nodes)) # (batch_size, n_nodes)
b = np.array([1, 1, 1, 1]) # (n_nodes,)

In [48]:
W = (w_x, w_h)
B = b

network = []
output = []
for s in range(n_sequences):
    network.append(SimpleRNN(Tanh(), weights=W, intercept=B))
    # 順伝播
    h = network[s].forward(x[0,s], h)
    output.append(h)
    
print(output[-1])

[[0.79494228 0.81839002 0.83939649 0.85584174]]


In [50]:
dW = [np.zeros(w_x.shape), np.zeros(w_h.shape)]
dB = np.zeros(b.shape)[None,:]
lr = 0
for s in range(n_sequences):
    # 逆伝播
    dH, dW, dB = network[n_sequences-s-1].backward(output[-1], dW, dB)
print(dH.shape)
print(dW[0].shape)
print(dW[1].shape)
print(dB.shape)

(1, 4)
(2, 4)
(4, 4)
(1, 4)


In [171]:
# SimpleRNNClassifier

class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ

    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      訓練データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

class SimpleRNN:
    """
    層の生成
    
    Parameters
    ----------
    activation : object instance
      活性化関数インスタンス
    optimizer : object instance
      最適化手法のインスタンス
      
    Attributes
    ----------
    w : 次の形のtuple, shape (Wx, Wh)
      重みパラメータ
    b : 次の形のndarray, shape (n_nodes, )
      バイアスパラメータ
    """
    def __init__(self, activation, optimizer=None, weights=None, intercept=None):
        self.activ = activation
        self.optimizer = optimizer
        
        self.w = weights
        self.b = intercept
        
        
    def forward(self, X, h):
        """
        順伝播
        
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_features)
            入力
        h : 次の形のndarray, shape (batch_size, n_nodes)
            前の層の入力（t-1の状態）
            
        Returns
        ----------
        H : 次の形のndarray, shape (batch_size, n_nodes)
            出力（tの状態）
        """      
        W_x = self.w[0]
        W_h = self.w[1]
        
        A = X@W_x + h@W_h + self.b
        H = self.activ.forward(A)

        self.X = X[None,:]
        self.h = h
        self.output = H
        self.A_ = A

        return H
    
 
    def backward(self, dH, dW, dB):
        """
        逆伝播（Back Propagation Through Time）
        
        Parameters
        ----------
        dH : 次の形のndarray, shape (batch_size, n_nodes_self)
          後ろから流れてきた勾配
        dW : 次の形のtuple, shape (dWx, dWh)
          一つ後ろの状態の重み勾配
        dB : 次の形のndarray, shape (n_nodes, )
          一つ後ろの状態の重み勾配
        lr : float
          学習率
        """
        w_x = self.w[0]
        w_h = self.w[1]
        
        dA = self.activ.backward(dH)
        dh = dA@w_h.T
        dWx = self.X.T @ dA
        dWh = self.h.T @ dA
    
        dW[0] += dWx
        dW[1] += dWh
        dB += dA
        
        self.dH = dH
        self.dW = dW
        self.dB = dB
        
        return dh, dW, dB

class ScratchSimpleRNNClassifier:
    """
    可変層畳み込みニューラルネットワーク分類器

    Parameters
    ----------
    layers : list
      ネットワークに組み込まれる層のリスト
    epoch : int
      エポック数
    sigma : float
      初期パラメータ用（SimpleInitializerのみ適用）
    batch_size : int
      ミニバッチのサンプル数
    verbose : bool
      学習経過の出力

    Attributes
    ----------
    loss_train : list
      訓練データに対するLoss
    loss_val : list
      検証データに対するLoss
    """
    def __init__(self, layers, epoch=100, h_init=0, lr=0.01, batch_size=100, verbose=False, **kwargs):
        self.layers = layers
        self.epoch = epoch
        self.lr = lr
        self.h_init = h_init
        self.verbose = verbose
        self.batch_size = batch_size
        self.loss_train = []
        self.loss_val = []
        self.wx = None
        self.wh = None
        self.b = None

    def fit(self, X, y, X_val=None, y_val=None):
        """
        ニューラルネットワーク分類器を学習する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_sequences, n_features)
            訓練データの特徴量
        y : 次の形のndarray, shape (n_samples, n_classes)
            訓練データの正解値
        X_val : 次の形のndarray, shape (n_samples, n_sequences, n_features)
            検証データの特徴量
        y_val : 次の形のndarray, shape (n_samples, n_classes)
            検証データの正解値
        """  
        # 回帰結合ネットワーク生成
        n_sequences = len(X[1])
        self.network = []
        for i in range(n_sequences):
            self.network.append(self.layers.deepcopy())
        
        # 勾配の初期化
        dW = [np.zeros(self.layers.w[0].shape), np.zeros(self.layers.w[1].shape)]
        dB = np.zeros(self.layers.b.shape)[None,:]
        
        for i in range(self.epoch):
            
            get_mini_batch_t = GetMiniBatch(X, y, batch_size=self.batch_size, seed=i)
            
            times = []
            start = time.time()
            
            # 各mini batchの損失をリスト化
            loss_batch_t = []
            
            for X_mini, y_mini in get_mini_batch_t:

                # 初期Hを設定
                h = self.h_init
                for s in range(n_sequences):
                    # 順伝播
                    H = self.network[s].forward(x_mini[0,s], h)
                    h = H
                    # 逆伝播
                    dH, dW, dB = network[n_sequences-s-1].backward(y_mini, dW, dB)

                loss_batch_t.append(self.cross_entropy(output, y_mini))
            
                # パラメータ更新
                self.wx -= lr * self.optimizer.update_dw(self, dW[0])
                self.wh -= lr * self.optimizer.update_dw(self, dW[1])
                self.b -= lr * self.optimizer.update_db(self, dB)
            
            
            # 各epochの平均損失をselfに格納
            loss_train = np.mean(loss_batch_t)
            self.loss_train.append(loss_train)
            
            
            # 検証データの推定
            if hasattr(X_val, '__array__') and hasattr(y_val, '__array__'):
                
                batch_size_v = int(self.batch_size * len(X_val)/len(X))
                get_mini_batch_v = GetMiniBatch(X_val, y_val, batch_size=batch_size_v)
                loss_batch_v = []

                for X_mini, y_mini in get_mini_batch_v:
                    # 初期Hを設定
                    h = self.h_init
                    for s in range(n_sequences):
                        # 順伝播
                        H = self.network[s].forward(x_mini[0,s], h)
                        h = H
                        
                    loss_batch_v.append(self.cross_entropy(H, y_mini))
            
                # 各epochの平均損失をselfに格納
                loss_val = np.mean(loss_batch_v)
                self.loss_val.append(loss_val)

            end = time.time()
            times.append(end-start)

            # 学習経過の出力
            if self.verbose and (i+1) % 10 == 0:
                print("Epoch {}; Loss {:.4f}".format(i+1, loss_train),
                      "  --Avg Epoch Time {:.4f}sec".format(np.mean(times)))            
                   
    def predict(self, X):
        """
        ニューラルネットワーク分類器を使い推定する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
          検証用データ

        Returns
        -------
          次の形のndarray, shape (n_samples, )
          推定結果
        """
        n_sequences = len(X[1])
        # 初期Hを設定
        h = self.h_init
        output = []
        for s in range(n_sequences):
            H = self.network[s].forward(x[0,s], h)
            h = H        
            output.append(H)
                
        return H
        
    def cross_entropy(self, X, y):
        """
        クロスエントロピー誤差を計算

        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_features)
          入力データ
        y : 次の形のndarray, shape (batch_size, n_classes)
          入力データの正解ラベル

        Returns
        -------
          float
          クロスエントロピー誤差
        """
        return (-1/len(X)) * np.sum((y*np.log(X)))