[課題のURL](https://diver.diveintocode.jp/curriculums/1877)

# Sprint 深層学習スクラッチ 畳み込みニューラルネットワーク1

In [1]:
import numpy as np
import time

import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

* データセットをダウンロード

In [2]:
from keras.datasets import mnist

(X_train, y_train), (X_test, y_test) = mnist.load_data()

Using TensorFlow backend.


In [3]:
print(X_train.shape) # (60000, 28, 28)
print(X_test.shape) # (10000, 28, 28)
#print(X_train[0].dtype) # uint8
#print(X_train[0])

(60000, 28, 28)
(10000, 28, 28)


* 平滑化

In [4]:
# X_train = X_train.reshape(-1, 784)
# X_test = X_test.reshape(-1, 784)
# print(X_train.shape)
# print(y_train.shape)

* 前処理

画像は0から255のuint8型で表されますが、機械学習をする上では0から1のfloat型で扱うことになります。

In [4]:
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255
print(X_train.max()) # 1.0
print(X_train.min()) # 0.0
print(X_train.shape)
print(X_test.shape)

1.0
0.0
(60000, 28, 28)
(10000, 28, 28)


In [6]:
# X_train[0] # 0から1の値

正解ラベルは0から9の整数ですが、ニューラルネットワークで多クラス分類を行う際には one-hot表現 に変換します。  
scikit-learnのOneHotEncoderを使用したコードが以下です。  
このone-hot表現による値はそのラベルである確率を示していることになるため、float型で扱います。  

In [5]:
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])
print(y_train.shape) # (60000,)
print(y_train_one_hot.shape) # (60000, 10)
print(y_train_one_hot.dtype) # float64

(60000,)
(60000, 10)
float64


さらに、訓練データ6万枚の内2割を検証データとして分割してください。  
訓練データが48000枚、検証データが12000枚となります。  

In [6]:
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2)
print(X_train.shape) # (48000, 784)
print(X_val.shape) # (12000, 784)

(48000, 28, 28)
(12000, 28, 28)


In [7]:
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])

In [8]:
print(y_train_one_hot[0])
print(y_train_one_hot[1])

[1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]


# 1次元の畳み込みニューラルネットワークスクラッチ

今回はこれを任意の層数に拡張しやすいものに書き換えていきます。  
その上で、活性化関数や初期値、最適化手法について発展的なものを扱えるようにしていきます。  

* ミニバッチ処理

ニューラルネットワークではデータを分割して入力する 確率的勾配降下法 が一般的です。  
分割した際のひとかたまりを ミニバッチ 、そのサンプル数を バッチサイズ と呼びます。  

In [9]:
class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ

    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      訓練データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [12]:
get_mini_batch = GetMiniBatch(X_train, y_train, batch_size=20)
#print(len(get_mini_batch)) # 2400
#print(get_mini_batch[5]) # 5番目のミニバッチが取得できる
for mini_X_train, mini_y_train in get_mini_batch:
    # このfor文内でミニバッチが使える
    pass

In [13]:
X_train.shape

(48000, 28, 28)

In [25]:
def im2col(input_data, filter_h, filter_w, stride=1, pad=0):
    """
    Parameters
    ----------
    input_data : (データ数, チャンネル, 高さ, 幅)の4次元配列からなる入力データ
    filter_h : フィルターの高さ
    filter_w : フィルターの幅
    stride : ストライド
    pad : パディング
    Returns
    -------
    col : 2次元配列
    """
    N, C, H, W = input_data.shape
    out_h = (H + 2*pad - filter_h)//stride + 1
    out_w = (W + 2*pad - filter_w)//stride + 1

    img = np.pad(input_data, [(0,0), (0,0), (pad, pad), (pad, pad)], 'constant')
    col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))

    for y in range(filter_h):
        y_max = y + stride*out_h
        for x in range(filter_w):
            x_max = x + stride*out_w
            col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]

    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N*out_h*out_w, -1)
    return col

In [26]:
def col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0):
    """
    Parameters
    ----------
    col :
    input_shape : 入力データの形状（例：(10, 1, 28, 28)）
    filter_h :
    filter_w
    stride
    pad
    Returns
    -------
    """
    N, C, H, W = input_shape
    out_h = (H + 2*pad - filter_h)//stride + 1
    out_w = (W + 2*pad - filter_w)//stride + 1
    col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2)

    img = np.zeros((N, C, H + 2*pad + stride - 1, W + 2*pad + stride - 1))
    for y in range(filter_h):
        y_max = y + stride*out_h
        for x in range(filter_w):
            x_max = x + stride*out_w
            img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]

    return img[:, :, pad:H + pad, pad:W + pad]

In [27]:
def im2col_slow(input_data, filter_h, filter_w, stride=1, pad=0):
    N, C, H, W = input_data.shape
    out_h = (H + 2*pad - filter_h)//stride + 1
    out_w = (W + 2*pad - filter_w)//stride + 1

    img = np.pad(input_data, [(0,0), (0,0), (pad, pad), (pad, pad)], 'constant')
    col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))

    for move_y in range(out_h):
        for move_x in range(out_w):
            for y in range(filter_h):
                for x in range(filter_w):
                    col[:, :, y, x, move_y, move_x] = \
                        img[:, :, y + stride * move_y, x + stride * move_x]

    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N*out_h*out_w, -1)
    return col

# 【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成

チャンネル数を1に限定した1次元畳み込み層のクラスSimpleConv1dを作成してください

In [10]:
class SimpleConv1d:
    def __init__(self, W, b, stride=1, pad=0, initializer="test", optimizer="test"):
        self.optimizer = optimizer
        
        self.W = W
        self.b = b
        self.x = None
        self.dw = None
        self.db = None
        
        self.col = None
        
        self.stride = stride
        self.pad = pad
    
    def calc_N_out(self, N_in, P, F, S):
    
        N_out = (N_in + 2*P - F)/S + 1

        return int(N_out)
    
    def forward(self, x):
        N_in = x.shape[0] # 入力サイズ
        P = self.pad # ある方向へのパディング数
        F = self.W.shape[0] # フィルタのサイズ
        S = self.stride # ストライドのサイズ
        
        self.x = x
        
        N_out = self.calc_N_out(N_in, P, F, S)

        # xを計算しやすようにcolに変換
        col = np.zeros([w.shape[0], N_out])
        for i in range(w.shape[0]):
            i_max = i + N_out
            col[i, :] = x[i:i_max]

        self.col = col # backwardで使用
        out = np.dot(col.T, self.W) + self.b
            
        return out
    
    def backward(self, dout):
        N_out = dout.shape[0]
        n_w = self.W.shape[0]
        n_x = self.x.shape[0]
        
        dw = np.zeros(n_w)
        #for s in range(n_w): # Wのサイズ
            #for i in range(N_out):
                #dw[s] = dw[s] + self.x[i+s]*dout[i] 
                # print(s,i, self.x[i+s], dout[i], self.x[i+s]*dout[i])

        dw = np.dot(self.col, dout)    
        db = np.sum(dout, axis=0)

#        self = self.optimizer.update(self, dw, db)
        
        # dx(x)分
        dx = np.zeros(n_x)
        #for j in range(n_x): # データサイズ
            #for s in range(n_w): # Wのサイズ
                #if (j-s)>=0 and (j-s)<=(N_out-1):
                    #print(j, s, dout[j-s], self.W[s], dout[j-s]*self.W[s])
                    #dx[j] = dx[j] + dout[j-s]*self.W[s]

        # dxを計算しやすいようにdcolに変換
        dcol = np.zeros([w.shape[0], N_out])
        for i in range(w.shape[0]):
            dcol[i,:] = dout * self.W[i]
        
        dx = np.zeros(x.shape[0])
        for i in range(w.shape[0]):
            i_max = i + N_out
            dx[i:i_max] = dx[i:i_max] + dcol[i,:]
                    
        return db, dw, dx

# 【問題2】1次元畳み込み後の出力サイズの計算

~~~
def calc_N_out(self, N_in, P, F, S):
    
        N_out = (N_in + 2*P - F)/S + 1

        return int(N_out)
~~~

# 【問題3】小さな配列での1次元畳み込み層の実験

In [11]:
x = np.array([1,2,3,4])
w = np.array([3, 5, 7])
b = np.array([1])

class_test = SimpleConv1d(w, b)
out = class_test.forward(x)
print(out)

delta_a = np.array([10, 20])

db, dw, dx = class_test.backward(delta_a)
print(db, dw, dx)

[35. 50.]
30 [ 50.  80. 110.] [ 30. 110. 170. 140.]


In [12]:
# 期待結果
#delta_b = np.array([30])
#delta_w = np.array([50, 80, 110])
#delta_x = np.array([30, 110, 170, 140])

# 【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成

チャンネル数を1に限定しない1次元畳み込み層のクラスConv1dを作成してください。

In [123]:
class Conv1d:
        def __init__(self, w, b, stride=1, pad=0):
            self.W = w
            self.B = b
            self.stride = stride
            self.pad = pad

            # 中間データ（backward時に使用）
            self.x = None   
            self.col = None
            self.col_W = None

            # 重み・バイアスパラメータの勾配
            self.dW = None
            self.db = None

        def _N_out_h(self, H, FH):
            # H:入力データの高さ、FH:フィルタの高さ
            return (1 + int((H + 2*self.pad - FH) / self.stride))

        def _N_out_w(self, W, FW):
            # W:入力データの高さ、FW:フィルタの高さ
            return (1 + int((W + 2*self.pad - FW) / self.stride))

        def forward(self, x):
            # xのIFを4次元に変更する
            # x = x.reshape(x.shape[0], 1, x.shape[1], x.shape[2])

            N, C, H, W = x.shape # N:データ数, C:チャネル数、H:データの高さ、W:データの幅
            FN, C, FH, FW = self.W.shape # FN:フィルタ数、C:チャネル数、FH:フィルタの高さ、FW:フィルタの幅
            
            # 一つあたりの出力データのサイズ(畳み込み後のサイズ)を求める
            out_h = self._N_out_h(H, FH)
            out_w = self._N_out_w(W, FW)
            #print("出力データのサイズ")
            #print(out_h, out_w)

            # 演算を行うため2次元のデータに変換する
            col = im2col(x, FH, FW, self.stride, self.pad)
            #print(col)
            
            col_W = self.W.reshape(FN, -1).T
            #print(col_W)
            
            # 畳み込み処理
            out = np.dot(col, col_W) + self.B
            #print(out.shape)
            #print(out)
    
            # 畳み込み処理の結果を任意の次元に変換 C:チャンネル1の場合
            #out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)
            #out = out.transpose(1, 0)
            _out = np.zeros((FN, out_w))
            for ch in range(FN):
                _out[ch] = out[ch][:, 0]
            
            # backward処理のためにデータ保存
            self.x = x
            self.col = col
            self.col_W = col_W

            #print("Nanの確認", np.sum(x), np.sum(col), np.sum(col_W))
            
            return _out

        def backward(self, dout):
            FN, C, FH, FW = self.W.shape # FN:フィルタ数、C:チャネル数、FH:フィルタの高さ、FW:フィルタの幅

            # C:チャンネル1の場合
            dout = dout.transpose(0,2,3,1).reshape(-1, FN)
            #dout =  out.transpose(1, 0)

            self.db = np.sum(dout, axis=0)
            self.dW = np.dot(self.col.T, dout)
            self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)

            dcol = np.dot(dout, self.col_W.T)
            dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)

            # パラメータを更新
            # print("self.dW, self.dbの確認:更新前", np.sum(self.dW), np.sum(self.db))
            self = self.optimizer.update(self, self.dW, self.db)
            # print("self.dW, self.dbの確認:更新後", np.sum(self.dW), np.sum(self.db))
    
            return dx

In [126]:
x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]]) # shape(2, 4)で、（入力チャンネル数、特徴量数）である。
w = np.ones((3, 2, 3)) # 例の簡略化のため全て1とする。(出力チャンネル数、入力チャンネル数、フィルタサイズ)である。
b = np.array([1, 2, 3]) # （出力チャンネル数）

x_4d = x.reshape(1, 2, 1, 4) # (データ数、チャンネル、h, w)
w_4d = w.reshape(3, 2, 1, 3) # (フィルタ数、チャンネル、h, w)
b_4d = b.reshape(3, 1, 1) # (フィルタ数、1, 1)

class_test = Conv1d(w_4d, b_4d)
out = class_test.forward(x_4d)
print(out.shape)
print(out)

(3, 2)
[[16. 22.]
 [17. 23.]
 [18. 24.]]


In [125]:
# 期待結果
a = np.array([[16, 22], [17, 23], [18, 24]]) # shape(3, 2)で、（出力チャンネル数、特徴量数）である。

# 【問題8】学習と推定

これまで使ってきたニューラルネットワークの全結合層の一部をConv1dに置き換えてMNISTを学習・推定し、Accuracyを計算してください。

出力層だけは全結合層をそのまま使ってください。ただし、チャンネルが複数ある状態では全結合層への入力は行えません。その段階でのチャンネルは1になるようにするか、 平滑化 を行なってください。

画像に対しての1次元畳み込みは実用上は行わないことのため、精度は問いません。

In [127]:
class ScratchDeepNeuralNetrowkClassifier():
    """
    N層ニューラルネットワーク分類器

    Parameters
    ----------

    Attributes
    ----------
    """
    def __init__(self, epoch = 5, alpha = 0.001, activation='sigmoid', optimizer="SGD", init="simple" ,verbose = True):
        self.verbose = verbose
        
        self.epoch = epoch
        self.batch_size = 20 # バッチサイズ
        
        self.n_features = 784 # 特徴量の数
        self.n_nodes1 = 400 # 1層目のノード数
        self.n_nodes2 = 100
        self.n_output = 10 # 出力のクラス数（3層目のノード数）
        
        self.init_type = init # 初期化方法
        self.activation_type = activation # 活性化関数
        self.optimizer_type = optimizer # 最適化方法
        
        self.sigma = 0.01 # ガウス分布の標準偏差
        self.lr = alpha

    def _encode_10(self, data):
        t = np.zeros((data.size, 10))
        for i in range(data.size):
            t[i, data[i]] = 1
        return t

    def _calc_loss(self, y, z):
        # エントロピー誤差を求める
        # 0割回避のため(z=0を防ぐため)
        delta = 1e-7
        n = y.shape[0]

        l = -1/n*np.sum(y*np.log(z+delta))

        return l
    
    class sigmoid:
        def __init__(self):
            self.out = None
        
        def _sigmoid(self, x):
            sigmoid_range = 34.538776394910684
            x = np.clip(x, -sigmoid_range, sigmoid_range)
            return 1.0/(1.0+np.exp(-x))
        
        def forward(self, x):
            y = self._sigmoid(x)
            self.out = y
            
            return y

        def backward(self, dz):
            dA = dz*(1 - self.out)*(self.out)
            
            return dA
    
    class ReLU:
        def __init__(self):
            self.mask = None

        def forward(self, x):
            self.mask = (x <= 0)
            x_out = x.copy()
            x_out[self.mask] = 0

            return x_out

        def backward(self, x):
            x[self.mask] = 0
            dx = x

            return dx
    
    class activation_softmax:
        def __init__(self):
            self.y = None
            self.t = None # one-hot
            
        def _softmax(self, x):
            n = x.shape[0]

            sigmoid_range = 34.538776394910684
            x = np.clip(x, -sigmoid_range, sigmoid_range)
            
            for i in range(n):
                exp_x = np.exp(x[i])
                sum_exp_x = np.sum(exp_x)
                x[i] = exp_x/sum_exp_x

            return x
        
        def forward(self, x):
            self.y = self._softmax(x)
            
            return self.y

        def backward(self, z, t):
            d = z - t
            
            return d
    
    class SimpleInitializer:
        def __init__(self, sigma):
            self.sigma = sigma
        def W(self, n_nodes1, n_nodes2):
            np.random.seed(1)
            W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
            return W
        
        def B(self, n_nodes1, n_nodes2):
            # n_nodes2は使用しないが、他の初期化関数とIFを合わせる
            np.random.seed(1)
            b = self.sigma * np.random.randn(n_nodes2)
            return b

    class XavierInitializer:
        def __init_(self):
        # 何もしない
            pass
    
        def W(self, n_nodes1, n_nodes2):
            np.random.seed(1)
            W = (1/np.sqrt(n_nodes1)) * np.random.randn(n_nodes1, n_nodes2)
            return W
        
        def B(self, n_nodes1, n_nodes2):
            np.random.seed(1)
            b = (1/np.sqrt(n_nodes1)) * np.random.randn(n_nodes2)
            return b
    
    class HeInitializer:
        def __init_(self):
            # 何もしない
            pass

        def W(self, n_nodes1, n_nodes2):
            np.random.seed(1)
            W = np.sqrt(2/n_nodes1) * np.random.randn(n_nodes1, n_nodes2)
            return W

        def B(self, n_nodes1, n_nodes2):
            np.random.seed(1)
            b = np.sqrt(2/n_nodes1) * np.random.randn(n_nodes2)
            return b
    
    class SGD:
        def __init__(self, lr=0.001):
            self.lr = lr
        def update(self, layer, dw, db):
            layer.W = layer.W - self.lr*dw
            layer.B = layer.B - self.lr*db
            
            return layer
    
    class AdaGrad:
        def __init__(self, lr=0.001):
            self.lr = lr
            self.hw = None
            self.hb = None

        def update(self, layer, dw, db):
            if (self.hw is None) or (self.hb is None):
                self.hw = np.zeros_like(dw)
                self.hb = np.zeros_like(db)

            self.hw = self.hw + (dw*dw)
            layer.W = layer.W - self.lr*dw/(np.sqrt(self.hw) + 1e-7)

            self.hb = self.hb + (db*db)
            layer.B = layer.B - self.lr*db/(np.sqrt(self.hb) + 1e-7)
         
            return layer
    
    class FC:
        def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
            self.optimizer = optimizer
            
            self.W = initializer.W(n_nodes1, n_nodes2)
            self.B = initializer.B(n_nodes1, n_nodes2)
            self.x = None
            self.dw = None
            self.db = None
            
        def forward(self, x):
            self.x = x
            return (np.dot(x, self.W) + self.B )

        def backward(self, dout):
            dw = np.dot((self.x).T, dout)
            db = np.sum(dout, axis=0)
            
            # パラメータを更新
            self = self.optimizer.update(self, dw, db)
            
            # 前の層に渡す
            dZ = np.dot(dout, (self.W).T)
            
            return dZ
        
    class Convolution:
        def __init__(self, w, b, optimizer, stride=1, pad=0):
            self.optimizer = optimizer
            
            self.W = w
            self.B = b
            self.stride = stride
            self.pad = pad

            # 中間データ（backward時に使用）
            self.x = None   
            self.col = None
            self.col_W = None

            # 重み・バイアスパラメータの勾配
            self.dW = None
            self.db = None

        def _N_out_h(self, H, FH):
            # H:入力データの高さ、FH:フィルタの高さ
            return (1 + int((H + 2*self.pad - FH) / self.stride))

        def _N_out_w(self, W, FW):
            # W:入力データの高さ、FW:フィルタの高さ
            return (1 + int((W + 2*self.pad - FW) / self.stride))

        def forward(self, x):
            # xのIFを4次元に変更する
            # x = x.reshape(x.shape[0], 1, x.shape[1], x.shape[2])

            N, C, H, W = x.shape # N:データ数, C:チャネル数、H:データの高さ、W:データの幅
            FN, C, FH, FW = self.W.shape # FN:フィルタ数、C:チャネル数、FH:フィルタの高さ、FW:フィルタの幅
            
            # 一つあたりの出力データのサイズ(畳み込み後のサイズ)を求める
            out_h = self._N_out_h(H, FH)
            out_w = self._N_out_w(W, FW)
            #print("出力データのサイズ")
            #print(out_h, out_w)

            # 演算を行うため2次元のデータに変換する
            col = im2col(x, FH, FW, self.stride, self.pad)
            col_W = self.W.reshape(FN, -1).T

            # 畳み込み処理
            out = np.dot(col, col_W) + self.B

            # 畳み込み処理の結果を任意の次元に変換 C:チャンネル1の場合
            out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)
            # out = out.transpose(1, 0) 

            # backward処理のためにデータ保存
            self.x = x
            self.col = col
            self.col_W = col_W

            #print("Nanの確認", np.sum(x), np.sum(col), np.sum(col_W))
            
            return out

        def backward(self, dout):
            FN, C, FH, FW = self.W.shape # FN:フィルタ数、C:チャネル数、FH:フィルタの高さ、FW:フィルタの幅

            # C:チャンネル1の場合
            dout = dout.transpose(0,2,3,1).reshape(-1, FN)
            #dout =  out.transpose(1, 0)

            self.db = np.sum(dout, axis=0)
            self.dW = np.dot(self.col.T, dout)
            self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)

            dcol = np.dot(dout, self.col_W.T)
            dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)

            # パラメータを更新
            # print("self.dW, self.dbの確認:更新前", np.sum(self.dW), np.sum(self.db))
            self = self.optimizer.update(self, self.dW, self.db)
            # print("self.dW, self.dbの確認:更新後", np.sum(self.dW), np.sum(self.db))
    
            return dx
    class Pooling:
        def __init__(self, pool_h, pool_w, stride=1, pad=0):
            self.pool_h = pool_h
            self.pool_w = pool_w
            self.stride = stride
            self.pad = pad

            self.x = None
            self.arg_max = None

        def forward(self, x):
            # xのIFを4次元に変更する
            # x = x.reshape(x.shape[0], 1, x.shape[1], x.shape[2])
            
            N, C, H, W = x.shape
            out_h = int(1 + (H - self.pool_h) / self.stride)
            out_w = int(1 + (W - self.pool_w) / self.stride)

            col = im2col(x, self.pool_h, self.pool_w, self.stride, self.pad)
            col = col.reshape(-1, self.pool_h*self.pool_w)

            arg_max = np.argmax(col, axis=1)
            out = np.max(col, axis=1)
            out = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)

            self.x = x
            self.arg_max = arg_max
            
            return out

        def backward(self, dout):
            dout = dout.transpose(0, 2, 3, 1)

            pool_size = self.pool_h * self.pool_w
            dmax = np.zeros((dout.size, pool_size))
            dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
            dmax = dmax.reshape(dout.shape + (pool_size,)) 

            dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
            dx = col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)

            return dx
        
    def fit(self, X, y, X_val=None, y_val=None):
        # 検証データ(X_val, y_val)の有無を確認
        flag_val = True
        if (X_val is None) and (y_val is None):
            # 検証データが入力されなかった場合、検証データの保存と学習過程を表示しない
            flag_val = False

        # self.sigma : ガウス分布の標準偏差
        # self.lr : 学習率
        # self.n_nodes1 : 1層目のノード数
        # self.n_nodes2 : 2層目のノード数
        # self.n_output : 出力層のノード数
        sigma = self.sigma
        alpha = self.lr
        epoch = self.epoch
        batch_size = self.batch_size
        
        # 初期化
        score = np.zeros(epoch)
        accuracy = np.zeros(epoch)
        score_val = np.zeros(epoch)
        accuracy_val = np.zeros(epoch)
        
        # ノード設定
        # [Conv - ReLU - Pool]:4320 - 400:[afine - ReLU]:100 - 100:[afine - ReLU]:10 
        filter_num = 30
        filter_size = 5
        filter_pad = 0
        filter_stride = 1
        hidden_size = 100
        input_size = 28
        output_size = 10
        weight_init_std = sigma
        
        conv_output_size = (input_size - filter_size + 2*filter_pad) / filter_stride + 1
        pool_output_size = int(filter_num * (conv_output_size/2) * (conv_output_size/2))
        
        # 畳み込み層　出力チャンネル数30、フィルタサイズ5×5、ストライド1
        np.random.seed(1)
        w_C1 = weight_init_std*np.random.randn(filter_num, 1, filter_size, filter_size)
        b_C1 = np.zeros(filter_num)
        
        self.C1 = self.Convolution(w_C1, b_C1, self.AdaGrad(alpha), stride=1, pad=0)
        self.activation1 = self.ReLU()
        self.P1 = self.Pooling(pool_h=2, pool_w=2, stride=2, pad=0)
        
        """
        # ノード設定
        #  # 784:[afine - ReLU]:400 - 400:[afine - ReLU]:100 - 100:[afine - ReLU]:10
        self.FC1 = self.FC(self.n_features, self.n_nodes1, self.SimpleInitializer(sigma), self.SGD(alpha))
        self.activation1 = self.ReLU()
        """
        self.FC2 = self.FC(pool_output_size, self.n_nodes2, self.HeInitializer(), self.AdaGrad(alpha))
        self.activation2 = self.ReLU()
        
        self.FC3 = self.FC(self.n_nodes2, self.n_output, self.HeInitializer(), self.AdaGrad(alpha))
        self.activation3 = self.activation_softmax()
        
        for count_epoch in range(epoch):
            get_mini_batch = GetMiniBatch(X, y, batch_size, seed=count_epoch)
            
            # @@@debug
            start = time.time()
            count = 0
            
            # 1 epoch
            for mini_X_train, mini_y_train in get_mini_batch:
                count = count + 1
                
                mini_y_train_one_hot = self._encode_10(mini_y_train)
                
                # IFを合わせるため、chを追加
                mini_X_train = mini_X_train.reshape(mini_X_train.shape[0], 1, mini_X_train.shape[1], mini_X_train.shape[2])
                
                # forward
                C1 = self.C1.forward(mini_X_train) # 畳み込み層　出力チャンネル数6、フィルタサイズ5×5、ストライド1
                C1_activation = self.activation1.forward(C1) # ReLu
                P1 = self.P1.forward(C1_activation) # 最大プーリング
                
                # 4dを次の層に渡すため1次元に変換
                P1_flatten = P1.reshape(-1, P1.shape[1]*P1.shape[2]*P1.shape[3])
                
                """
                A1 = self.FC1.forward(P1_flatten) 
                Z1 = self.activation1.forward(A1)
                """

                A2 = self.FC2.forward(P1_flatten)                
                Z2 = self.activation2.forward(A2)
                A3 = self.FC3.forward(Z2)
                Z3 = self.activation3.forward(A3)
                
                # back
                dA3 = self.activation3.backward(Z3, mini_y_train_one_hot)
                dZ2 = self.FC3.backward(dA3)
                dA2 = self.activation2.backward(dZ2)
                dZ1 = self.FC2.backward(dA2)

                # 平滑化したデータを4dに変換
                dZ1_4d = dZ1.reshape(batch_size, P1.shape[1], P1.shape[2], P1.shape[3])
                
                dP1 = self.P1.backward(dZ1_4d)
                dA1 = self.activation1.backward(dP1)
                dZ0 = self.C1.backward(dA1)
                
                """
                dA1 = self.activation1.backward(dZ1)
                dZ0 = self.FC1.backward(dA1) # dZ0は使用しないがw,bの更新で利用
                """
                
                # 1バッチあたりの損失
                portion = Z3
                label = np.argmax(portion, axis = 1)
            
                _d_score = self._calc_loss(self._encode_10(mini_y_train), portion)
                _d_accuracy = accuracy_score(label, mini_y_train)
            
                if count % 1000 == 0:
                    #print("epoch:", count_epoch+1, "count:", count, _d_score, _d_accuracy)
                    #break
                    pass

            # エポックごとの損失
            y_pred, y_pred_portion = self.predict(X[0:10000])
            
            # 交差エントロピー誤差(1 epoch)
            score[count_epoch] = self._calc_loss(self._encode_10(y[0:10000]), y_pred_portion)
            
            # 正解率
            accuracy[count_epoch] = accuracy_score(y_pred, y[0:10000])
            
            if flag_val == True:
                y_pred_val, y_pred_val_portion = self.predict(X_val[0:10000])
                score_val[count_epoch] = self._calc_loss(self._encode_10(y_val[0:10000]), y_pred_val_portion)
                accuracy_val[count_epoch] = accuracy_score(y_pred_val, y_val[0:10000])

            if self.verbose:
                #verboseをTrueにした際は学習過程などを出力する
                # @@@debug
                elapsed_time = time.time() - start
                print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")
                
                if flag_val == True:
                    print("epoch:",(count_epoch+1),"accuracy:",accuracy[count_epoch], "loss:", score[count_epoch])
                    print("epoch:",(count_epoch+1),"accuracy_val:",accuracy_val[count_epoch], "loss_val:", score_val[count_epoch])
                else:
                    print("epoch:",(count_epoch+1),"accuracy:",accuracy[count_epoch], "loss:", score[count_epoch])
        
            # 学習データの保存
            self._score = score
            self._accuracy = accuracy
            self._score_val = score_val
            self._accuracy_val = accuracy_val
        
        return

    def predict(self, X):
        
        X = X.reshape(X.shape[0], 1, X.shape[1], X.shape[2])
        
        C1 = self.C1.forward(X) # 畳み込み層　出力チャンネル数6、フィルタサイズ5×5、ストライド1
        C1_activation = self.activation1.forward(C1) # ReLu
        P1 = self.P1.forward(C1_activation) # 最大プーリング
        
        # 4dを次の層に渡すため1次元に変換
        P1_flatten = P1.reshape(-1, P1.shape[1]*P1.shape[2]*P1.shape[3])        
        
        """
        A1 = self.FC1.forward(X) 
        Z1 = self.activation1.forward(A1)
        """
        
        A2 = self.FC2.forward(P1_flatten)                
        Z2 = self.activation2.forward(A2)
        A3 = self.FC3.forward(Z2)
        Z3 = self.activation3.forward(A3)
        
        portion = Z3
        label = np.argmax(portion, axis = 1)
        
        return label, portion

In [128]:
%%time
clf_test = ScratchDeepNeuralNetrowkClassifier(epoch = 1, alpha = 0.005, activation='ReLU', optimizer="AdaGrad", init="HeInitializer", verbose = True)
clf_test.fit(X_train, y_train, X_val, y_val)

elapsed_time:630.9281468391418[sec]
epoch: 1 accuracy: 0.9161 loss: 0.2875303765609803
epoch: 1 accuracy_val: 0.9116 loss_val: 0.2897560947954917
CPU times: user 4min 13s, sys: 2min 50s, total: 7min 3s
Wall time: 10min 32s


In [129]:
y_pred, y_pred_portion = clf_test.predict(X_test)
print("Accuracy（正解率）", accuracy_score(y_test, y_pred))

Accuracy（正解率） 0.9193
