# Sprint 深層学習スクラッチ 畳み込みニューラルネットワーク1

## 【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成
チャンネル数を1に限定した1次元畳み込み層のクラスSimpleConv1dを作成してください。基本構造は前のSprintで作成した全結合層のFCクラスと同じになります。なお、重みの初期化に関するクラスは必要に応じて作り変えてください。Xavierの初期値などを使う点は全結合層と同様です。


ここでは パディング は考えず、ストライド も1に固定します。また、複数のデータを同時に処理することも考えなくて良く、バッチサイズは1のみに対応してください。この部分の拡張はアドバンス課題とします。

In [1]:
import numpy as np

In [2]:
class CNN_FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, w, b , optimizer,stride, padding):
        self.optimizer = optimizer
        self.W = w
        self.B = b
        self.stride = stride
        self.padding = padding

    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        self.A = X
        a = np.zeros(len(X)-2)
        for i in range(len(X) - 2):
            sig = 0
            for j in range(len(self.W)):
                sig += X[i+j] * self.W[j]
            sig += self.B
            a[i] = sig
        return a

    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        n_out = N_OUT(self.stride, self.padding, self.A, self.W)
        self.LB = np.sum(dA)
        self.LW = np.zeros_like(self.W)
        
        for s in range(len(self.W)):
            sigma = 0
            for i in range(len(self.W)-1):
                sigma += dA[i] * self.A[i+s]
            self.LW[s] = sigma
        
        self.dZ = np.zeros_like(self.A)
        for j in range(len(self.A)):
            sigma = 0
            for s in range(len(self.W)):
                if j - s < 0 or j - s > n_out-1:
                    pass
                else:
                    sigma += dA[j-s] * self.W[s]
            self.dZ[j] = sigma

        self = self.optimizer.update(self)
        return self.dZ

## 【問題2】1次元畳み込み後の出力サイズの計算
畳み込みを行うと特徴量の数が変化します。どのように変化するかは以下の数式から求められます。パディングやストライドも含めています。この計算を行う関数を作成してください。

$
N_{out} =  \frac{N_{in}+2P-F}{S} + 1\\
$

In [3]:
def N_OUT(stride, padding, X,  W):
    if X.ndim == 1:
        return int((X.shape[0] + (2*padding) - len(W) / stride) + 1)
    elif X.ndim == 3:
        return int((X.shape[2] + (2*padding) - len(W) / stride) + 1 )

## 【問題3】小さな配列での1次元畳み込み層の実験
次に示す小さな配列でフォワードプロパゲーションとバックプロパゲーションが正しく行えているか確認してください。

In [4]:
x = np.array([1,2,3,4])
w = np.array([3, 5, 7])
b = np.array([1])
delta_a = np.array([10, 20])

In [5]:
class SGD:
    """
    確率的勾配降下法
    Parameters
    ----------
    lr : 学習率
    """
    def __init__(self, lr):
        self.lr = lr
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス
        """
        layer.W = layer.W -  self.lr * layer.LW
        layer.B = layer.B - self.lr*layer.LB
        
        return layer

In [6]:
cnn = CNN_FC(w, b, SGD(0.1), 1, 0)
print("forward:",cnn.forward(x))
print("backward:",cnn.backward(delta_a))

forward: [35. 50.]
backward: [ 30 110 170 140]


## 【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成
チャンネル数を1に限定しない1次元畳み込み層のクラスConv1dを作成してください。



In [7]:
x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]]) # shape(2, 4)で、（入力チャンネル数、特徴量数）である。
w = np.ones((3, 2, 3)) # 例の簡略化のため全て1とする。(出力チャンネル数、入力チャンネル数、フィルタサイズ)である。
b = np.array([1, 2, 3]) # （出力チャンネル数）

In [8]:
class CNN2dim_FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, w, b , optimizer,stride, padding):
        self.optimizer = optimizer
        self.W = w
        self.B = b
        self.stride = stride
        self.padding = padding
    
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        self.A = X
        output_size, chanel_size, filter_size = self.W.shape
        feature_size = self.A.shape[1]

        a = np.zeros([output_size, feature_size - 2])
        for output in range(output_size):
            for j in range(filter_size - 1):
                sig = 0
                for chanel in range(chanel_size):
                    for i in range(filter_size):
                        sig += X[chanel, i+j] * self.W[output, chanel, j]
                a[output, j] = sig + b[output]
        return a

    
    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        self.n_out = N_OUT(self.stride, self.padding, self.W, self.A)
        self.LB = np.sum(dA, axis=1)
        
        output_size, chanel_size, filter_size = self.W.shape
        feature_size = self.A.shape[1]
        
        self.LW = np.zeros_like(self.W)
        for output in range(output_size):
            for chanel in range(chanel_size):
                for i in range(filter_size):
                    for j in range(filter_size -1):
                        self.LW[output, chanel, i] += dA[output, j]*self.A[chanel, j+i]

        dZ = np.zeros_like(self.A)
        for output in range(output_size):
            for chanel in range(chanel_size):
                for j in range(feature_size):
                    sigma=0
                    for s in range(filter_size):
                        if j - s < 0 or j - s > self.n_out -1:
                            pass
                        else:
                            sigma += dA[output,  j-s] * self.W[output, chanel, s]
                    dZ[chanel, j] += sigma

        self = self.optimizer.update(self)
        return dZ

In [9]:
cnn_2 = CNN2dim_FC(w, b, SGD(0.1), 1, 0)
print('a:',cnn_2.forward(x))

a: [[16. 22.]
 [17. 23.]
 [18. 24.]]


## 【問題8】学習と推定
これまで使ってきたニューラルネットワークの全結合層の一部をConv1dに置き換えてMNISTを学習・推定し、Accuracyを計算してください。

In [10]:
from keras.datasets import mnist
from sklearn import metrics
from sklearn.preprocessing import OneHotEncoder
(X_train, t_train), (X_test, t_test) = mnist.load_data()

Using TensorFlow backend.


In [11]:
X_train  = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255
enc = OneHotEncoder(handle_unknown="ignore", sparse=False)
t_train_one_hot = enc.fit_transform(t_train[:, np.newaxis])
t_test_one_hot = enc.fit_transform(t_test[:,  np.newaxis])
X_train = X_train.reshape(60000, 1, 784)
X_test = X_test.reshape(-1, 1, 784)

In [12]:
w = np.ones((3, 1, 3))
b = np.array([1, 2, 3])

In [13]:
class CNN_mnist_FC:

    def __init__(self, w, b , optimizer,stride, padding):
        self.optimizer = optimizer
        self.W = w
        self.B = b
        self.stride = stride
        self.padding = padding

    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """
        self.A = X
        output_size, chanel_size, filter_size = self.W.shape
        feature_size = self.A.shape[2]
        sample_size = self.A.shape[0]

        a = np.zeros([sample_size, output_size, feature_size-2])
        for samples in range(sample_size):
            for output in range(output_size):
                for j in range(filter_size - 1):
                    sig = 0
                    for chanel in range(chanel_size):
                        for i in range(filter_size):
                            sig += X[samples, chanel, i+j] * self.W[output, chanel, j]
                    a[samples, output, j] = sig + b[output]
        
        return a

    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        self.n_out = N_OUT(self.stride, self.padding, self.W, self.A)
        
        output_size, chanel_size, filter_size = self.W.shape
        feature_size = self.A.shape[2]
        sample_size = self.A.shape[0]

        self.LB = dA.sum(axis=0)
        self.LB = self.LB.sum(axis=1)

        self.LW = np.zeros_like(self.W)
        for samples in range(sample_size):
            for output in range(output_size):
                for chanel in range(chanel_size):
                    for i in range(filter_size):
                        for j in range(filter_size -1):
                            self.LW[output, chanel, i] += dA[samples, output, j]*self.A[samples, chanel, j+i]
                        
        dZ = np.zeros_like(self.A)
        for samples in range(sample_size):
            for output in range(output_size):
                for chanel in range(chanel_size):
                    for j in range(feature_size):
                        sigma=0
                        for s in range(filter_size):
                            if j - s < 0 or j - s > self.n_out -1:
                                pass
                            else:
                                sigma += dA[samples, output,  j-s] * self.W[output, chanel, s]
                        dZ[samples, chanel, j] += sigma

        self = self.optimizer.update(self)
        return dZ

In [14]:
class Relu:
    def forward(self, X):
        self.A = X
        return np.maximum(0, X)
    
    def backward(self, Z):
        return Z * np.maximum(np.sign(self.A), 0)

In [15]:
class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        init = initializer
        self.n_nodes1 = n_nodes1
        self.W = init.W(n_nodes1, n_nodes2)
        self.B = init.B(n_nodes2)
    
    def forward(self, X):
        self.z = X
        self.a = X@self.W + self.B
        return self.a
    
    def backward(self, dA):
        dZ = dA @ self.W.T
        self.LW = self.z.T @ dA
        self.LB = np.sum(dA, axis=0)
        
        self = self.optimizer.update(self)
        return dZ

In [16]:
class SimpleInitializer:
    """
    ガウス分布によるシンプルな初期化
    Parameters
    ----------
    sigma : float
      ガウス分布の標準偏差
    """
    def __init__(self, sigma):
        self.sigma = sigma
        
    def W(self, n_nodes1, n_nodes2):
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    
    def B(self, n_nodes2):
        B  = self.sigma * np.random.randn(n_nodes2)
        return B

In [17]:
class Softmax:
    def forward(self, A):
        exp_a = np.exp(A)
        softmax_result = np.empty((A.shape[0], A.shape[1]))
        exp_sum = np.sum(exp_a, axis=1)
        for i in range(A.shape[0]):
            softmax_result[i] = exp_a[i] / exp_sum[i]
        return softmax_result
    
    def backward(self, Z, Y):
        L_A = Z - Y
        self.cross_entropy = -np.average(np.sum(Y*np.log(Z), axis=1))
        return L_A

In [18]:
cnn_mnist = CNN_mnist_FC(w, b, SGD(0.1), 1, 0)

In [19]:
A = cnn_mnist.forward(X_train)
relu = Relu()
A_relu = relu.forward(A)
A_flat = A_relu.reshape(A_relu.shape[0], -1)
FC_1 = FC(2346, 10, SimpleInitializer(0.1), SGD(0.1))
A_FC_1 = FC_1.forward(A_flat)
softmax = Softmax()
A_soft = softmax.forward(A_FC_1)
A_delta = softmax.backward(A_soft, t_train_one_hot)
delta_Z = FC_1.backward(A_delta)
delta_Z_reshape = delta_Z.reshape(A_relu.shape)
delta_Z_relu = relu.backward(delta_Z_reshape)
dZ = cnn_mnist.backward(delta_Z_relu)

In [20]:
X_test = X_test.reshape(-1, 1, 784)
t_A = cnn_mnist.forward(X_test)
t_A = relu.forward(t_A)
t_A  = t_A.reshape(t_A.shape[0], -1)
t_A = FC_1.forward(t_A)
C = np.max(t_A, axis=1)
for i in range(t_A.shape[0]):
    t_A[i] = np.exp(t_A[i] - C[i])
t_A = softmax.forward(t_A)
y = np.argmax(t_A, axis=1)

In [21]:
print("Accuracy:{}".format(metrics.accuracy_score(t_test, y)))

Accuracy:0.0982
