<a href="https://colab.research.google.com/github/T-Sawao/diveintocode-ml/blob/master/term2_sprint11.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 2.1次元の畳み込みニューラルネットワークスクラッチ

畳み込みニューラルネットワーク（CNN） のクラスをスクラッチで作成していきます。NumPyなど最低限のライブラリのみを使いアルゴリズムを実装していきます。


このSprintでは1次元の 畳み込み層 を作成し、畳み込みの基礎を理解することを目指します。次のSprintでは2次元畳み込み層とプーリング層を作成することで、一般的に画像に対して利用されるCNNを完成させます。


クラスの名前はScratch1dCNNClassifierとしてください。クラスの構造などは前のSprintで作成したScratchDeepNeuralNetrowkClassifierを参考にしてください。


**1次元畳み込み層とは**  
CNNでは画像に対しての2次元畳み込み層が定番ですが、ここでは理解しやすくするためにまずは1次元畳み込み層を実装します。1次元畳み込みは実用上は自然言語や波形データなどの 系列データ で使われることが多いです。


畳み込みは任意の次元に対して考えることができ、立体データに対しての3次元畳み込みまではフレームワークで一般的に用意されています。


**データセットの用意** 
検証には引き続きMNISTデータセットを使用します。1次元畳み込みでは全結合のニューラルネットワークと同様に平滑化されたものを入力します。

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as setattr
from sklearn.metrics import accuracy_score

In [None]:
x = np.array([1,2,3,4])
y = np.array([45, 70])
w = np.array([3, 5, 7])
b = np.array([1])

F = w.shape[0]
P = 0
S = 1

### 1.1.1（解答）フォワードプロパゲーション

In [None]:
y_hat = np.zeros(F-1)
for i in range(F-1):
  count = 0
  for s in range(w.shape[0]):
    count += x[i+s] * w[s]
  y_hat[i] = count + b
y_hat


1.1.2（別解)

In [None]:
y_hat1 = np.zeros(F-1)
for m in range(F-1):
  y_hat1[m] = np.sum((x[m:m+F])*w)+b
print(y_hat1)

### 1.2.1（解答）更新式

In [None]:
loss = y - y_hat
loss

### 1.3.1（解答）バックプロパゲーション

In [None]:
db = np.sum(loss)
db

In [None]:
db = sum(loss)
print("db", db)

dw = np.zeros(w.shape[0])
for k in range(2):
  dw += loss[k] * x[k:k+F]
print("dw", dw)

In [None]:
dx = np.sum([np.r_[0, loss[1]*w], np.r_[loss[0]*w, 0]], axis=0)
dx

### 2.1.1（解答）

In [None]:
Nout = ((len(x) + 2*P - F)/S)+1
Nout

## 畳み込み層

In [None]:
class SimpleConv1d:
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        # self.W = initializer.W(self.n_nodes1, self.n_nodes2)
        # self.B = initializer.B(self.n_nodes2)
        self.w = np.array([3, 5, 7])
        self.b = np.array([1])

        self.P = 0
        self.S = 1
        self.F = len(self.w)

        self.dw = np.array([])
        self.dx = np.array([])

# 問題1--------------------------------------------------------------
    # フォワードプロパゲーション時の処理
    def forward(self, x):
      for m in range(self.F-1):
        y_hat1[m] = np.sum((x[m:m+self.F])*self.w)+self.b
      return y_hat1

    # バックプロパゲーション時の処理
    def backward(self, X, da):
      db = np.sum(da)
      dw = np.zeros(self.w.shape[0])
      for i in range(int(self._output_size(X))):
        dw += da[i] * x[i:i+self.F]

      dx = np.sum([np.r_[0, da[1]*self.w], np.r_[da[0]*self.w, 0]], axis=0)
      return db, dw, dx

# 問題2--------------------------------------------------------------
    def _output_size(self, X):
      self.Nout = int((len(X)) + (2*self.P) - self.F) / self.S + 1
      return self.Nout

In [None]:
test = SimpleConv1d(1,2,3,4)
test.forward(x)

In [None]:
test._output_size(x)

In [None]:
db, dw, dx = test.backward(x, loss)
print("db", db, "dw", dw, "dx",dx)

### 4.1.1（解答）

In [None]:
class SimpleConv1_1d:
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        # self.W = initializer.W(self.n_nodes1, self.n_nodes2)
        # self.B = initializer.B(self.n_nodes2)

        # diver確認用サンプルデータ
        self.W = np.array([[[1,1,1],[1,1,1]],[[1,1,1],[2,1,1]],[[2,1,1],[1,1,2]]])
        self.b =np.array([3, 2, 1]) 

        self.P = 0           # パディング(今回未使用)
        self.S = 1           # ストライド値
        self.F = int(self.W.shape[2])  # フィルターサイズ

        self.dw = np.array([])
        self.dx = np.array([])

# 問題1--------------------------------------------------------------
    # フォワードプロパゲーション時の処理
    def forward(self, x):
      self.X = x
      self.Nout = self._output_size(self.X)

      # 出力値を記載する枠を作成
      a = np.zeros([self.W.shape[0], self.Nout])
      for j in range(self.F):
        for i in range(self.Nout):
          # 1.フィルターを移動させ、w重みを掛け合わす。
          # 2.上記作業をチャンネル回数繰り返す。
          a[j,i] = np.sum(self.X[:, i:i+self.F]*self.W[j])+self.b[j]
          #                       -----------
          #                       列の指定（フィルター)[i]から[i+F]までの値を返す。
      return a

      # 更新
      self = self.optimizer.updatbe(self)
      return self

    # バックプロパゲーション時の処理
    def backward(self, loss):
      db = np.sum(loss)
      dw = np.zeros(self.W.shape)
      dx = np.zeros(self.X.shape)
      for j in range(self.W.shape[0]):
        for i in range(self.W.shape[1]):

          # dwの式　W.shape(2,3)を１つとして、1つ目のfor分で行軸をスライドさせ、２つ目のfor分でチャネルを更新。
          dw[j] += loss[j,i] * self.X[:,i:i+3]

        # dxの式 W.shape(1,3)を１つとして、loss[0,0]*W[0]+loss[0,1]*W[0]と、
        # loss[0,0]*W[1]+loss[0,1]*W[1]でfor文でチャネルを更新して、各要素ごとに足して合計を返す。
        dx += np.r_[[np.r_[np.sum([loss[i,0]*self.W[i,0,:]], axis=0), 0] + np.r_[0,np.sum([loss[i,1]*self.W[i,0,:]], axis=0)]],
                     [np.r_[np.sum([loss[i,0]*self.W[i,1,:]], axis=0), 0] + np.r_[0,np.sum([loss[i,1]*self.W[i,1,:]], axis=0)]]]
      return db, dw, dx

# 問題2--------------------------------------------------------------
    def _output_size(self, X):
      Nout = int(((X.shape[1]) + (2*self.P) - self.F) / self.S + 1)
      return Nout

In [None]:
X = np.array([[1, 2, 3, 4], [2, 3, 4, 5]]) # shape(2, 4)で、（入力チャンネル数、特徴量数）である。
W = np.array([[[1,1,1],[1,1,1]],[[1,1,1],[2,1,1]],[[2,1,1],[1,1,2]]])
B = np.array([1, 2, 3]) # （出力チャンネル数）
loss = np.array([[52,56],[32,35],[9,11]])

In [None]:
test2 = SimpleConv1_1d(1,2,3,4)
test2.forward(X)

In [None]:
stride = stride
padding = padding

A = X
output_size, chanel_size, filter_size = self.W.shape
feature_size = self.A.shape[2]
sample_size = self.A.shape[0]

a = np.zeros([sample_size, output_size, feature_size-2])
for samples in range(sample_size):
    for output in range(output_size):
        for j in range(filter_size - 1):
            sig = 0
            for chanel in range(chanel_size):
                for i in range(filter_size):
                    sig += X[samples, chanel, i+j] * self.W[output, chanel, j]
            a[samples, output, j] = sig + b[output]

return a

In [None]:
output_size, chanel_size, filter_size = W.shape
output_size
chanel_size
filter_size

### 4.2.1 (解答）バックプロパゲーション

In [None]:
db, dw, dx = test2.backward(loss)
print("db",db)
print("dw",dw)
print("dx", dx)

In [None]:
dw = np.zeros(W.shape)
for j in range(3):
  for i in range(2):
    dw[j] += loss[j,i] * X[:,i:i+3]
dw

In [None]:
x = np.zeros(X.shape)
for i in range(3):
  x += np.r_[[np.r_[np.sum([loss[i,0]*W[i,0,:]], axis=0), 0] + np.r_[0,np.sum([loss[i,1]*W[i,0,:]], axis=0)]],
             [np.r_[np.sum([loss[i,0]*W[i,1,:]], axis=0), 0] + np.r_[0,np.sum([loss[i,1]*W[i,1,:]], axis=0)]]]
x

# 3.検証

##【問題8】学習と推定  
これまで使ってきたニューラルネットワークの全結合層の一部をConv1dに置き換えてMNISTを学習・推定し、Accuracyを計算してください。


出力層だけは全結合層をそのまま使ってください。ただし、チャンネルが複数ある状態では全結合層への入力は行えません。その段階でのチャンネルは1になるようにするか、 平滑化 を行なってください。


画像に対しての1次元畳み込みは実用上は行わないことのため、精度は問いません。

In [None]:
# メインクラス
class ScratchDeepNeuralNetworkClassifier():
  def __init__(self, epoch_num=1, batch_size=20, verbose=True): 
        self.epoch = epoch_num
        self.batch_size = batch_size
        self.verbose = verbose

  def fit(self, initializing, act, optimization, X, y, X_val=None, y_val=None, sigma=0.01, lr=0.01, n_features=784, n_nodes1=400, n_nodes2=200, n_output=10):  
        self.initializing = initializing # 初期化クラスの設定
        self.activation= act # 活性化関数クラスの設定
        self.optimization= optimization # 活性化関数クラスの設定
        self.sigma = sigma
        self.lr = lr
        self.n_features = n_features
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        self.n_output = n_output
        self.loss = np.zeros(self.epoch)
        self.val_loss = np.zeros(self.epoch)
        self.h = None #最適化クラス adagraidで使用

        # 最適化クラスの設定
        optimizer1 = self.optimization(self.lr)
        optimizer2 = self.optimization(self.lr)
        optimizer3 = self.optimization(self.lr)

        # 全結合層にインスタンスを渡す
        self.FC1 = FC(self.n_features, self.n_nodes1, self.initializing(self.sigma), optimizer1)
        self.activation1 = self.activation() #　活性化関数の設定
        self.FC2 = FC(self.n_nodes1, self.n_nodes2, self.initializing(self.sigma), optimizer2)
        self.activation2 = self.activation() #　活性化関数の設定
        self.FC3 = FC(self.n_nodes2, self.n_output, self.initializing(self.sigma), optimizer3)
        self.activation3 = Softmax()

        # エポック数分の学習
        for i in range(self.epoch):
          # ミニバッチの作成
          get_mini_batch = GetMiniBatch(X, y, self.batch_size)
          # 1エポック（全バッチ）の学習
          for x_min, y_min in get_mini_batch:
            y_hat = self._forward_propagation(x_min)
            loss = self._back_propagation(X=y_hat, Y=y_min)
            # print("y_hat",y_hat[:1])

          self.loss[i] += loss
          if (type(X_val) != bool):
            self.val = 1
            y_hat_val = self._forward_propagation(X_val)
            loss_val = -np.mean(y_val_one_hot * np.log(y_hat_val + 1e-7))
            self.val_loss[i] += loss_val

          # self.acc_val[i] = accuracy_score(np.argmax(y_val, axis=1), np.argmax(y_hat_val, axis=1))
          # verboseをTrueにした際は学習過程を出力
          if self.verbose :
            print(f"--{i+1}回目~loss~-------\n{self.loss[i]}")
            print(f"--{i+1}回目~loss_val~---\n{self.val_loss[i]}")
            # print(f'epoch:{self.epoch:>3} loss:{self.loss:>8,.3f}')

  # フォワードプロパゲーションの実行
  def _forward_propagation(self, X):
      A1 = self.FC1.forward(X)
      Z1 = self.activation1.forward(A1)
      A2 = self.FC2.forward(Z1)
      Z2 = self.activation2.forward(A2)
      A3 = self.FC3.forward(Z2)
      Z3 = self.activation3.forward(A3)
      return Z3

  # バックプロパゲーションの実行
  def _back_propagation(self, X, Y):
      dA3, loss = self.activation3.backward(X, Y) # 交差エントロピー誤差とソフトマックスを合わせている
      dZ2 = self.FC3.backward(dA3)
      dA2 = self.activation2.backward(dZ2)
      dZ1 = self.FC2.backward(dA2)
      dA1 = self.activation1.backward(dZ1)
      dZ0 = self.FC1.backward(dA1) # dZ0は使用しない
      return loss

  def predict(self, X):
      y_hat = self._forward_propagation(X)
      return np.argmax(y_hat, axis=1)

  def plot_cost(self):
      plt.title("Num_of_Iteration vs Loss")
      plt.xlabel("Num_of_Iteration")
      plt.ylabel("Loss")
      a = range(self.epoch)
      plt.plot(range(1, self.epoch+1), self.loss, color="b", label="train_loss")
      if self.val ==1:
          plt.plot(range(1, self.epoch+1), self.val_loss, color="orange", label="val_loss")
      plt.grid()
      plt.legend()

In [None]:
# チャンネル数１の畳み込みネットワーク
class SimpleConv1_1d:
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        # self.W = initializer.W(self.n_nodes1, self.n_nodes2)
        # self.B = initializer.B(self.n_nodes2)

        # diver確認用サンプルデータ
        self.W = np.array([[[1,1,1],[1,1,1]],[[1,1,1],[2,1,1]],[[2,1,1],[1,1,2]]])
        self.b =np.array([3, 2, 1]) 

        self.P = 0           # パディング(今回未使用)
        self.S = 1           # ストライド値
        self.F = int(self.W.shape[2])  # フィルターサイズ

        self.dw = np.array([])
        self.dx = np.array([])

# 問題1--------------------------------------------------------------
    # フォワードプロパゲーション時の処理
    def forward(self, x):
      self.X = x
      self.Nout = self._output_size(self.X)

      # 出力値を記載する枠を作成
      a = np.zeros([self.W.shape[0], self.Nout])
      for j in range(self.F):
        for i in range(self.Nout):
          # 1.フィルターを移動させ、w重みを掛け合わす。
          # 2.上記作業をチャンネル回数繰り返す。
          a[j,i] = np.sum(self.X[:, i:i+self.F]*self.W[j])+self.b[j]
          #                       -----------
          #                       列の指定（フィルター)[i]から[i+F]までの値を返す。
      return a

      # 更新
      self = self.optimizer.updatbe(self)
      return self

    # バックプロパゲーション時の処理
    def backward(self, loss):
      db = np.sum(loss)
      dw = np.zeros(self.W.shape)
      dx = np.zeros(self.X.shape)
      for j in range(self.W.shape[0]):
        for i in range(self.W.shape[1]):

          # dwの式　W.shape(2,3)を１つとして、1つ目のfor分で行軸をスライドさせ、２つ目のfor分でチャネルを更新。
          dw[j] += loss[j,i] * self.X[:,i:i+3]

        # dxの式 W.shape(1,3)を１つとして、loss[0,0]*W[0]+loss[0,1]*W[0]と、
        # loss[0,0]*W[1]+loss[0,1]*W[1]でfor文でチャネルを更新して、各要素ごとに足して合計を返す。
        dx += np.r_[[np.r_[np.sum([loss[i,0]*self.W[i,0,:]], axis=0), 0] + np.r_[0,np.sum([loss[i,1]*self.W[i,0,:]], axis=0)]],
                     [np.r_[np.sum([loss[i,0]*self.W[i,1,:]], axis=0), 0] + np.r_[0,np.sum([loss[i,1]*self.W[i,1,:]], axis=0)]]]
      return db, dw, dx

# 問題2--------------------------------------------------------------
    def _output_size(self, X):
      Nout = int(((X.shape[1]) + (2*self.P) - self.F) / self.S + 1)
      return Nout

In [None]:
# 全結合層クラス
class FC:
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.optimizer = optimizer
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        self.W = initializer.W(self.n_nodes1, self.n_nodes2)
        self.B = initializer.B(self.n_nodes2)
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する

    # フォワードプロパゲーション時の処理
    def forward(self, X):
      self.X = X
      out = X@self.W+self.B
      return out
    
    # バックプロパゲーション時の処理
    def backward(self, dA):
      self.dZ = dA@(self.W.T)
      self.dW = (self.X.T)@dA
      self.dB = np.sum(dA, axis=0)
      
      # 更新
      self = self.optimizer.update(self)
      return self.dZ

In [None]:
# 初期化クラス
# SimpleInitializer ------------------------------------------------------------
class SimpleInitializer:
    def __init__(self, sigma):
        self.sigma = sigma

    def W(self, n_nodes1, n_nodes2):
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W

    def B(self, n_nodes2):
        return np.zeros(n_nodes2)

# XavierInitializer ------------------------------------------------------------
class XavierInitializer:
    def __init__(self, sigma):
      self.sigma = sigma

    def W(self, n_nodes1, n_nodes2):
        W = np.random.randn(n_nodes1, n_nodes2)/np.sqrt(n_nodes1)
        return W

    def B(self, n_nodes2):
        return np.zeros(n_nodes2)

# He-----------------------------------------------------------------------------
class HeInitializer:
    def __init__(self, sigma):
      self.sigma = sigma

    def W(self, n_nodes1, n_nodes2):
        W = np.random.randn(n_nodes1, n_nodes2) * np.sqrt(2/n_nodes1) 
        return W

    def B(self, n_nodes2):
        return np.zeros(n_nodes2)

In [None]:
# 最適化クラス
# SGD---------------------------------------------
class SGD:
    def __init__(self, lr):
        self.lr = lr

    def update(self, layer):
      layer.W -= self.lr * layer.dW
      layer.B -= self.lr * layer.dB
      return layer

# AdaGrad----------------------------------------
class AdaGrad:
    def __init__(self, lr):
      self.lr = lr
      self.hw = 0
      self.hb = 0
  
    def update(self, layer):
        self.hw += layer.dW * layer.dW
        self.hb = layer.dB * layer.dB
        layer.W -= self.lr * layer.dW / (np.sqrt(self.hw) +1e-7)
        layer.B -= self.lr * layer.dB / (np.sqrt(self.hb) +1e-7)
        return layer

In [None]:
# 活性化関数クラス
# ソフトマックス関数のクラス----------------------------------------------
class Softmax:
  def __init__(self):
    pass

  # forward時の処理
  def forward(self, X):
    y_hat = np.exp(X) / np.sum(np.exp(X), axis = 1).reshape(-1,1)
    return y_hat

  # backward時の処理
  def backward(self, X, Y):
    loss = -np.mean(Y * np.log(X + 1e-7))
    dA3 = X - Y
    return dA3, loss

# ReLU関数のクラス----------------------------------------------
class ReLU:
  def __init__(self):
        pass
  # forward時の処理
  def forward(self, X):
    self.X =X
    return np.maximum(0, X)
    
  # backward時の処理
  def backward(self, dout):
    return np.where(self.X > 0, dout, 0)

# tanh関数のクラス----------------------------------------------
class Tanh:
  def __init__(self):
    pass

  # forward時の処理
  def forward(self, X):
    self.out = np.tanh(X)
    return self.out

  # backward時の処理
  def backward(self, X):
    return X*(1-self.out**2)

# sigmoid関数のクラス----------------------------------------------
class Sigmoid:
  def __init__(self):
    pass

  # forward時の処理
  def forward(self, z):
    self.out = 1 / (1+np.exp(-z))
    return self.out

  # backward時の処理
  def backward(self, z):
    return z*(1-self.out)*self.out

### 前処理

In [None]:
class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ
    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      訓練データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [None]:
from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# 平滑化（flatten）
X_train_fltten = X_train.reshape(-1, 784)
X_test_fltten = X_test.reshape(-1, 784)

# float化と0or1処理
X_train_flt = X_train_fltten.astype(np.float)
X_test_flt = X_test_fltten.astype(np.float)
X_train_flt /= 255
X_test_flt /= 255

# one hot処理
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])

In [None]:
# train, testの分割
from sklearn.model_selection import train_test_split
x_train, x_val, y_train_one_hot, y_val_one_hot = train_test_split(np.array(X_train_flt), np.array(y_train_one_hot), test_size=0.2)
print("x_train",x_train.shape, "x_val", x_val.shape, "y_train_one_hot.shape", y_train_one_hot.shape, "y_val_one_hot", y_val_one_hot.shape) # (48000, 784)

In [None]:
sdnn = ScratchDeepNeuralNetworkClassifier(epoch_num=7)
sdnn.fit(HeInitializer, ReLU, AdaGrad, x_train, y_train_one_hot, x_val, y_val_one_hot, sigma=0.01, lr=0.01)

In [None]:
a = np.zeros([sample_size, output_size, feature_size-2])