# Sprint22 RNN

リカレントニューラルネットワーク（RNN） のクラスをスクラッチで作成していきます。NumPyなど最低限のライブラリのみを使いアルゴリズムを実装していきます。

フォワードプロパゲーションの実装を必須課題とし、バックプロパゲーションの実装はアドバンス課題とします。

クラスの名前はScratchSimpleRNNClassifierとしてください。クラスの構造などは以前のSprintで作成したScratchDeepNeuralNetrowkClassifierを参考にしてください。

## 【問題1】SimpleRNNのフォワードプロパゲーション実装


SimpleRNNのクラスSimpleRNNを作成してください。基本構造はFCクラスと同じになります。

ndarrayのshapeがどうなるかを併記しています。

バッチサイズをbatch_size、入力の特徴量数をn_features、RNNのノード数をn_nodesとして表記します。活性化関数はtanhとして進めますが、これまでのニューラルネットワーク同様にReLUなどに置き換えられます。

a
t
 : 時刻tの活性化関数を通す前の状態 (batch_size, n_nodes)

h
t
 : 時刻tの状態・出力 (batch_size, n_nodes)

x
t
 : 時刻tの入力 (batch_size, n_features)

W
x
 : 入力に対する重み (n_features, n_nodes)

h
t
−
1
 : 時刻t-1の状態（前の時刻から伝わる順伝播） (batch_size, n_nodes)

W
h
 : 状態に対する重み。 (n_nodes, n_nodes)

B
 : バイアス項 (n_nodes,)

初期状態 
h
0
 は全て0とすることが多いですが、任意の値を与えることも可能です。

上記の処理を系列数n_sequences回繰り返すことになります。RNN全体への入力 
x
 は(batch_size, n_sequences, n_features)のような配列で渡されることになり、そこから各時刻の配列を取り出していきます。

分類問題であれば、それぞれの時刻のhに対して全結合層とソフトマックス関数（またはシグモイド関数）を使用します。タスクによっては最後の時刻のhだけを使用することもあります。

In [27]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# from numpy import linalg as LA
import copy
sns.set()
%matplotlib inline
import time
import math
import copy
import random

from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ

    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      訓練用データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 1, 
                 seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)

    def __len__(self):
        return self._stop

    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        

    def __iter__(self):
        self._counter = 0
        return self

    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [28]:
class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self, weight_type = 'xav', sigma=0.01):
        self.weight_type = weight_type
        self.sigma = sigma
    def W_x(self, n_features, n_nodes):
        """
        重みの初期化
        Parameters
        ----------
        n_features : int
          X の N (n_features)
        n_nodes : int
          出力ノード数

        Returns
        ----------
        W_x : W_x # (n_features, n_nodes)
        """
        self.n_features = n_features
        self.n_nodes = n_nodes
        
        if self.weight_type == 'sig':
            self.W_std = self.sigma
        
        elif self.weight_type == 'xav':
            self.W_std = 1/np.sqrt(n_features)
            
        elif self.weight_type == 'he':
            self.W_std = np.sqrt(2/n_features)
            
        W_x = self.W_std * np.random.randn(self.n_features, self.n_nodes)
        return W_x
    
    def W_h(self):
        """
        重みの初期化
        Parameters
        ----------
        n_features : int
          X の N (n_features)
        n_nodes : int
          出力ノード数

        Returns
        ----------
        W_h : W_h # (n_nodes, n_nodes)
        """
        W_h = self.W_std * np.random.randn(self.n_nodes, self.n_nodes)
        return W_h # (n_nodes, n_nodes)
    
    def B(self):
        """
        バイアスの初期化
        Parameters
        ----------
        n_nodes2 : int
          後の層のノード数

        Returns
        ----------
        B :
        """
        B = self.sigma * np.random.randn(self.n_nodes)
        return B

In [29]:
class Softmax:
    def __init__(self):
        pass    

    def forward(self,A):
        if A.ndim == 2:
            x = A.T
            x = x - np.max(x, axis=0)
            y = np.exp(x) / np.sum(np.exp(x), axis=0)
            return y.T
        
#         x = A - np.max(A) 
        # オーバーフロー対策
#         Z_last = np.exp(A) / np.sum(np.exp(A))
        return np.exp(A) / np.sum(np.exp(A))
        
#         return Z_last
    
    def backward(self,Z_last,y):
        dA_last = Z_last - y #交差エントロピー誤差の計算も含む実装
        return dA_last

In [30]:
"""
合計が１になるか確認することが大切。
"""
soft_func = Softmax()
soft = np.sum(soft_func.forward(x),axis=1)
soft

array([[0.49750002, 0.50249998]])

In [31]:
class Hipo:
    def __init__(self):
        self.A = None    

    def forward(self,A):
        self.A = A
        Z = np.tanh(self.A)
        return Z
    
    def backward(self,dZ):
        dA = dZ * ((np.tanh(self.A))**2)
        return dA

In [109]:
class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_features, n_nodes, 
                 initializer = False, optimizer = False):
        self.optimizer = optimizer
        if initializer:
            # initializerのメソッドを使い、self.W_x, self.W_h, self.Bを初期化する
            self.W_x = initializer.W_x(n_features, n_nodes)
            self.W_h = initializer.W_h()
            self.B = initializer.B()
        else:
            self.W_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100 # (n_features, n_nodes)
            self.W_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100 # (n_nodes, n_nodes)
            self.B = np.array([1, 1, 1, 1]) # (n_nodes,)

        self.X = None
        
    def forward(self, X, h_s):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_features)
            入力
        Returns
        ----------
        h_s : 次の形のndarray, shape (batch_size, n_nodes)
            出力
        """        
        self.X = X
        self.h_s = h_s
        A = self.X @ self.W_x + self.h_s @ self.W_h + self.B 
        # W_x(n_features, n_nodes), W_h(n_nodes, n_nodes), B(n_nodes,)
        
        return A #出力 (batch_size, n_nodes)
    
    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        self.dB = np.sum(dA,axis=0)
        self.dW = self.X.T @ dA
        dZ = dA @ self.W.T
        
        # 更新
        self = self.optimizer.update(self) 
        return dZ

In [None]:
class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
    def update(self, layer): 
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス
        """
        '''
        layer (FC)から持ってこれるようになる。
        layer.Z = FC内のself.Z
        layer.W
        layer.B
        '''
        layer.W_x = layer.W_x - self.lr * layer.dW_x
        layer.W_h = layer.W_h - self.lr * layer.dW_h
        layer.B = layer.B - self.lr * layer.dB

In [119]:
'''
スクラッチ
'''    

class ScratchSimpleRNNClassifier:
    def __init__(self, 
                 W_x = 0,
                 W_h = 0,
                 max_iter = 1,
                 lr = 0.01, sigma = 0.01,
                 verbose = False):
        self.iter = max_iter
        self.lr = lr
        self.sigma = sigma
        self.verbose = verbose
    
#     def loss_func(self,y,y_pred_proba):
#         sigma_c = np.sum(y * np.log(y_pred_proba), axis = 1) 
#         #batch_size方向に平均
#         pass
    
#         return #-np.mean(sigma_c)
    
    def fit(self, X, y= False, sigma = 0.01,
            n_nodes = 4,
            X_val = False, y_val = False):
        """
        Parameters
        ----------
        X : 次の形のndarray, shape (n_sequence, n_features)
            訓練用データの特徴量
        y : 次の形のndarray, shape (batch_size, n_nodes)
            訓練用データの正解値
        """
        self.n_nodes = n_nodes
        self.n_features = X.shape[2]
        self.n_sequence = X.shape[1]
        self.batch_size = X.shape[0]
        
        self.h_s = np.zeros((self.batch_size, 
                             self.n_sequence,
                             self.n_nodes)) # shape(1,3,4)
        
#         optimizer = SGD(self.lr) #更新方法選択
        self.FC = FC(self.n_features,
                     self.n_nodes,
                     optimizer = optimizer)
        self.activation = Hipo()
    
        for i in range(self.n_sequence):
            for n in range(self.iter): #何回回そう？ MNISTなら50回ほどで試す
                A = self.FC.forward(X[:,i,:],self.h_s[:,i-1,:]) #(batch_size, n_nodes)
                h_s = self.activation.forward(A) #(batch_size, n_nodes)
                self.h_s[:,i,:] = h_s
                
                #問題3 back 
                
            print("{}回目時点の出力:{}".format(i, h_s))

## 【問題2】小さな配列でのフォワードプロパゲーションの実験
小さな配列でフォワードプロパゲーションを考えてみます。

入力x、初期状態h、重みw_xとw_h、バイアスbを次のようにします。

ここで配列xの軸はバッチサイズ、系列数、特徴量数の順番です。



In [120]:
x = np.array([[[1, 2], [2, 3], [3, 4]]])/100 # (batch_size, n_sequences, n_features)
w_x = np.array([[1, 3, 5, 7], [3, 5, 7, 8]])/100 # (n_features, n_nodes)
w_h = np.array([[1, 3, 5, 7], [2, 4, 6, 8], [3, 5, 7, 8], [4, 6, 8, 10]])/100 # (n_nodes, n_nodes)
batch_size = x.shape[0] # 1
n_sequences = x.shape[1] # 3
n_features = x.shape[2] # 2
n_nodes = w_x.shape[1] # 4
h = np.zeros((batch_size, n_nodes)) # (batch_size, n_nodes)
b = np.array([1, 1, 1, 1]) # (n_nodes,)

# print("x",x)
# print(x.shape)
# print("w_x",w_x)
# print("w_h",w_h)
# print("h.shape",h.shape)
# print("b.shape",b.shape)

フォワードプロパゲーションの出力が次のようになることを作成したコードで確認してください。



In [116]:
RNN = ScratchSimpleRNNClassifier(max_iter=1,
                                 lr=0.01,
                                 verbose = False)

In [117]:
t1 = time.time()
RNN.fit(x)

t2 = time.time()
print("processing time:",t2-t1)

# y_pred = NN.predict(X_test)
# print("accuracy",accuracy_score(np.argmax(y_test_one_hot,axis=1),y_pred))


X_i [[0.01 0.02]]
X_i (1, 2)
0回目時点の出力:[[0.76188798 0.76213958 0.76239095 0.76255841]]
X_i [[0.02 0.03]]
X_i (1, 2)
1回目時点の出力:[[0.792209   0.8141834  0.83404912 0.84977719]]
X_i [[0.03 0.04]]
X_i (1, 2)
2回目時点の出力:[[0.79494228 0.81839002 0.83939649 0.85584174]]
processing time: 0.002191781997680664


In [118]:
h = np.array([[0.79494228, 0.81839002, 0.83939649, 0.85584174]]) # (batch_size, n_nodes)

## 【問題3】（アドバンス課題）バックプロパゲーションの実装
バックプロパゲーションを実装してください。

RNNの内部は全結合層を組み合わせた形になっているので、更新式は全結合層などと同様です。

In [None]:
#仮のLOSSを各時間ごとに自作



In [None]:
class ScratchDeepNeuralNetrowkClassifier():
    def __init__(self, max_iter=5,
                 lr=0.01, sigma = 0.01,
                 verbose = True):
        """
        self.sigma : ガウス分布の標準偏差
        self.lr : 学習率
        self.iter : 学習回数
        """
        self.iter = max_iter
        self.lr = lr
        self.sigma = sigma
        self.verbose = verbose
    
    def loss_func(self,y,y_pred_proba):
        sigma_c = np.sum(y * np.log(y_pred_proba), axis = 1) 
        #batch_size方向に平均
        return -np.mean(sigma_c)
    
    def fit(self, X, y,
            sigma = 0.01,n_nodes1 = 400,n_nodes2 = 200,
            n_output = 10, X_val=None, y_val=None):
        """
        ニューラルネットワーク分類器を学習する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            訓練用データの特徴量
        y : 次の形のndarray, shape (n_samples, )
            訓練用データの正解値
        X_val : 次の形のndarray, shape (n_samples, n_features)
            検証用データの特徴量
        y_val : 次の形のndarray, shape (n_samples, )
            検証用データの正解値
        
        # self.n_nodes1 : 1層目のノード数
        # self.n_nodes2 : 2層目のノード数
        # self.n_output : 出力層のノード数

        """
        self.n_features = X.shape[1]
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        self.n_output = n_output
        
        optimizer = SGD(self.lr) #更新方法選択
        
        self.FC1 = FC(self.n_features, self.n_nodes1,
                      SimpleInitializer(weight_type='xav',sigma = self.sigma),optimizer)
        self.activation1 = Hipo()
        self.FC2 = FC(self.n_nodes1, self.n_nodes2, 
                      SimpleInitializer(weight_type='xav',sigma = self.sigma),optimizer)
        self.activation2 = Hipo()
        self.FC3 = FC(self.n_nodes2, self.n_output, 
                      SimpleInitializer(weight_type='xav',sigma = self.sigma),optimizer)
        self.activation3 = Hipo()
        self.loss = None
        
        for n in range(self.iter): #何回回そう？ MNISTなら50回ほどで試す
            get_mini_batch = GetMiniBatch(X_train,
                                          y_train_one_hot,
                                          batch_size=20)
            for mini_X_train, mini_y_train in get_mini_batch:
            # このfor文内でミニバッチが使える
                #W,B = FC内 FCX.W, FCX.B (Xは1,2,...)
                A1 = self.FC1.forward(mini_X_train) #(batch_size, n_nodes1)
                Z1 = self.activation1.forward(A1) #(batch_size, n_nodes1)
                A2 = self.FC2.forward(Z1) #(batch_size, n_nodes2)
                Z2 = self.activation2.forward(A2) #(batch_size, n_nodes2)
                A3 = self.FC3.forward(Z2) #(batch_size, n_output)
                Z3 = self.activation3.forward(A3) #(batch_size, n_output)                
                
                dA3 = self.activation3.backward(Z3, mini_y_train) #(batch_size, n_output)
                # 交差エントロピー誤差とソフトマックスを合わせている
                dZ2 = self.FC3.backward(dA3) #(batch_size, n_nodes2)
                dA2 = self.activation2.backward(dZ2) #(batch_size, n_nodes2)
                dZ1 = self.FC2.backward(dA2) #(batch_size, n_nodes1)
                dA1 = self.activation1.backward(dZ1) #(batch_size, n_nodes1)
                dZ0 = self.FC1.backward(dA1) # dZ0は使用しない
            
            A1 = self.FC1.forward(X) #(sample_size, n_nodes1)
            Z1 = self.activation1.forward(A1) #(sample_size, n_nodes1)
            A2 = self.FC2.forward(Z1) #(sample_size, n_nodes2)
            Z2 = self.activation2.forward(A2) #(sample_size, n_nodes2)
            A3 = self.FC3.forward(Z2) #(sample_size, n_output)
            Z3 = self.activation3.forward(A3) #(sample_size, n_output)
            y_pred_proba = Z3
            loss = self.loss_func(y,y_pred_proba)
            self.loss = np.append(self.loss,loss)
            
        self.loss = np.delete(self.loss, 0)
        if self.verbose:
            #verboseをTrueにした際は学習過程などを出力する
            print("self.loss.shape",self.loss.shape)
       
    def predict(self, X):
        """
        ニューラルネットワーク分類器を使い推定する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            サンプル

        Returns
        -------
            次の形のndarray, shape (n_samples, 1)
            推定結果
        """
        A1 = self.FC1.forward(X) #(n_samples, n_nodes1)
        Z1 = self.activation1.forward(A1) #(n_samples, n_nodes1)
        A2 = self.FC2.forward(Z1) #(n_samples, n_nodes2)
        Z2 = self.activation2.forward(A2) #(n_samples, n_nodes2)
        A3 = self.FC3.forward(Z2) #(n_samples, n_output)
        Z3 = self.activation3.forward(A3) #(n_samples, n_output)

        y_pred_proba = np.argmax(Z3,axis=1)
        return y_pred_proba