In [1]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

## 【問題1】チャンネル数を1に限定した1次元畳み込み層クラスの作成

## 【問題2】1次元畳み込み後の出力サイズの計算

## 【問題3】小さな配列での1次元畳み込み層の実験

## 【問題4】チャンネル数を限定しない1次元畳み込み層クラスの作成

In [2]:
class SimpleConv():
    def __init__(self, initializer, optimizer, in_channel=1, out_channel=1, kernel=(1,1), stride=(1,1), padding=0, to_nn=False):
        self.stride = stride    # tuple
        self.padding = padding  # int
        self.kernel = kernel    # tuple
        self.W = initializer.W(in_channel, out_channel, kernel[0], kernel[1])
        self.b = initializer.b(out_channel)
        self.optimizer = optimizer
        self.to_nn = to_nn
        self.Z = None

    def forward(self, X):
        if self.to_nn:
            X = X.reshape(X.shape[0], 1, 1, X.shape[1])
        #Xの次元は(batchsize, channel, height, width)
        self.X = X
        
        #outputのshapeは 0:in_channel, 1:out_channel, 2:out_height, 3:out_width
        self.batchsize, self.in_channel, self.in_h, self.in_w = X.shape
        self.out_channel = len(self.b)
        self.out_h = (self.in_h+2*self.padding-self.kernel[0])/self.stride[0] + 1        
        self.out_w = (self.in_w+2*self.padding-self.kernel[1])/self.stride[1] + 1
        if not self.out_w.is_integer() or not self.out_h.is_integer():
            print('Check kernelsize, stride, padding')
            print(f'output shape: ({self.batchsize}, {self.out_channel}, {self.out_h}, {self.out_w})')
            return 
        else:
            self.out_h, self.out_w = int(self.out_h), int(self.out_w)
            #print(f'output shape: ({self.batchsize}, {self.out_channel}, {self.out_h}, {self.out_w})')
        
        if self.padding == 0:
            self.X_pad = self.X.copy()
        elif self.padding > 0:
            self.X_pad = np.zeros((self.batchsize, self.in_channel,
                                   self.in_h+2*self.padding, self.in_w+2*self.padding))
            self.X_pad[:,:,self.padding:-self.padding, self.padding:-self.padding] += self.X
        
        self.Z = np.zeros((self.batchsize, self.out_channel, self.out_h, self.out_w))
        sh, sw = self.stride
        kh, kw = self.kernel
        for b in range(self.batchsize):
            for o_ch in range(self.out_channel):
                for i_ch in range(self.in_channel):
                    for o_h in range(self.out_h):
                        for o_w in range(self.out_w):
                            self.Z[b,o_ch,o_h,o_w] += np.sum(self.X_pad[b,i_ch,sh*o_h:sh*o_h+kh,sw*o_w:sw*o_w+kw]*self.W[i_ch,o_ch])
                self.Z[b,o_ch] += self.b[o_ch]
        if self.to_nn:
            self.Z = self.Z.reshape(self.batchsize, -1)
            
        return self.Z

    def backward(self, dA):
        if self.to_nn:
            dA = dA.reshape(self.batchsize, 1, 1, -1)
    
        self, dA = self.optimizer.update(self, dA)
        return dA

In [3]:
class ConvInitializer:
    def __init__(self, sigma=1):
        self.sigma = sigma
        
    def W(self, inchannel, outchannel, kh, kw):
        return np.random.randn(inchannel, outchannel, kh, kw)
    
    def b(self, outchannel):
        return np.random.randn(outchannel)

In [4]:
class ConvSGD:
    def __init__(self, lr=0.001):
        self.lr = lr
        
    def update(self, layer, dA):
        db = np.zeros(layer.out_channel)
        for o_ch in range(layer.out_channel):
            db[o_ch] = np.sum(dA[:,o_ch,:,:])
        dW = np.zeros_like(layer.W)
    
        for b in range(layer.batchsize):
            for i_ch in range(layer.in_channel):
                for o_ch in range(layer.out_channel):
                    for h in range(layer.kernel[0]):
                        for w in range(layer.kernel[1]):
#                             print(layer.X_pad.shape)
#                             print(b,i_ch,h,h+layer.out_h,w,w+layer.out_w)
#                             print(dA[b,o_ch].shape)
                            dW[i_ch,o_ch,h,w] = np.sum(layer.X_pad[b,i_ch,h:h+layer.out_h,w:w+layer.out_w]*dA[b,o_ch])
                            
        dX = np.zeros_like(layer.X)
        for b in range(layer.batchsize):
            for i_ch in range(layer.in_channel):
                for o_ch in range(layer.out_channel):
                    for h in range(layer.out_h):
                        for w in range(layer.out_w):
                            dX[b,i_ch,h:h+layer.kernel[0],w:w+layer.kernel[1]] += layer.W[i_ch,o_ch] * dA[b,o_ch,h,w]
        
        layer.b -= self.lr * db
        layer.W -= self.lr * dW
        self.dX = dX
        self.dW = dW
        return layer, dX

In [5]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from keras.datasets import mnist
from tqdm import tqdm

Using TensorFlow backend.


In [6]:
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255

from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train_one_hot, test_size=0.99)

### 動作テスト

In [7]:
X_train = X_train.reshape(600, 1, 28, 28)
y_train = y_train.reshape(600, 1, 10)

In [8]:
initializer = ConvInitializer()
optimazer = ConvSGD()
for _ in range(1):
    conv = SimpleConv(initializer, optimazer, in_channel=1, out_channel=2, stride=(1,1), padding=0)
    conv.forward(X_train)
    #print(f'b: {conv1d.b}, W: {conv1d.W}')

In [9]:
dA = conv.Z.copy()
dA = conv.backward(dA)

### diver内の動作テスト

テスト1

In [10]:
x = np.array([[[[1,2,3,4]]]]).astype(float)
w = np.array([[[[3, 5, 7]]]]).astype(float)
b = np.array([1]).astype(float)

In [11]:
initializer = ConvInitializer()
optimazer = ConvSGD()
conv = SimpleConv(initializer, optimazer, in_channel=1, out_channel=1, kernel=(1,3), stride=(1,1), padding=0)
conv.W = w
conv.b = b

conv.forward(x)
conv.Z

array([[[[35., 50.]]]])

In [12]:
dA = np.array([[[[10, 20]]]]).astype(float)
conv.backward(dA)

array([[[[ 30., 110., 170., 140.]]]])

テスト2

In [13]:
initializer = ConvInitializer()
optimazer = ConvSGD()
conv = SimpleConv(initializer, optimazer, in_channel=2, out_channel=3, kernel=(1,3), stride=(1,1), padding=0)

x = np.array([[[[1, 2, 3, 4]], [[2, 3, 4, 5]]]]).astype(float)  #(1, 2, 1, 4)
w = np.ones((2, 3, 1, 3))
b = np.array([1, 2, 3]).astype(float)

conv.W = w
conv.b = b
conv.forward(x)
conv.Z

array([[[[16., 22.]],

        [[17., 23.]],

        [[18., 24.]]]])

In [14]:
initializer = ConvInitializer()
optimizer = ConvSGD()
conv = SimpleConv(initializer, optimizer, in_channel=2, out_channel=3, kernel=(1,3), stride=(1,1), padding=0)

x = np.array([[[[1,2,3,4]], [[2,3,4,5]]]]).astype(float)  #shape:(1, 2, 1, 4)
w = np.array([[[[1,1,2]], [[2,1,1]], [[1,1,1]]],
             [[[2,1,1]], [[1,1,1]], [[1,1,1]]]]).astype(float)     #shape: (2, 3, 1, 3)
b = np.array([1,2,3]).astype(float)
conv.W = w
conv.b = b

conv.forward(x)
conv.Z

array([[[[21., 29.]],

        [[18., 25.]],

        [[18., 24.]]]])

In [15]:
loss_ = np.array([[[[9,11]],
                   [[32,35]],
                   [[52,56]]]])
dX = conv.backward(loss_)
optimizer.dW, optimizer.dX

(array([[[[ 31.,  51.,  71.]],
 
         [[102., 169., 236.]],
 
         [[164., 272., 380.]]],
 
 
        [[[ 51.,  71.,  91.]],
 
         [[169., 236., 303.]],
 
         [[272., 380., 488.]]]]), array([[[[125., 230., 204., 113.]],
 
         [[102., 206., 195., 102.]]]]))

In [16]:
x_ = np.array([[1,2,3,4],
               [2,3,4,5]])
w_ = np.array([[[1,1,2],[2,1,1]],
              [[2,1,1],[1,1,1]],
              [[1,1,1],[1,1,1]]])
b_ = np.array([1,2,3])

# フォワードの出力
out_ = np.array([[21,29],
                [18,25],
                [18,24]])
loss_ = np.array([[9,11],
                [32,35],
                [52,56]])

# バックワードの勾配
x_delta = np.array([[125,230,204,113],
                    [102,206,195,102]])
w_delta = np.array([[[31,51,71],[51,71,91]],
                    [[102,169,236],[169,236,303]],
                    [[164,272,380],[272,380,488]]])

## 【問題8】学習と推定

In [17]:
class FC():
    def __init__(self, n_nodes1, n_nodes2, activator, initializer, optimizer):
        self.optimizer = optimizer
        self.activator = activator
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)

    def forward(self, X):  
        self.X = X
        self.n_batch = len(self.X)
        self.Z = np.dot(X, self.W) + self.B
        self.A = self.activator.forward(self.Z)
        return self.A    

    def backward(self, dA):
        dZ = self.activator.backward(dA)
        self, dA = self.optimizer.update(self, dZ)
        return dA
    
class SimpleInitializer:
    def __init__(self, sigma):
        self.sigma = sigma
        
    def W(self, n_nodes1, n_nodes2):
        return np.random.randn(n_nodes1, n_nodes2)
    
    def B(self, n_nodes2):
        return np.random.randn(n_nodes2)
    
class SGD:
    def __init__(self, lr):
        self.lr = lr
        
    def update(self, layer, dZ):
        dB = np.sum(dZ, axis=0)
        dW = np.dot(layer.X.T, dZ)
        dA = np.dot(dZ, layer.W.T)
        layer.B -= self.lr * dB / layer.n_batch
        layer.W -= self.lr * dW / layer.n_batch
        return layer, dA

class Relu():
    def __init__(self):
        pass
    
    def forward(self, Z):
        self.A = np.maximum(0, Z)
        return self.A
    
    def backward(self, dA):
        return  dA * np.where(self.A>0, 1, 0)
    
class Softmax():
    def __init__(self):
        pass
    
    def forward(self, Z):
        if Z.ndim == 2:
            Z = Z.T
            self.A = (np.exp(Z) / np.sum(np.exp(Z), axis=0)).T
            return self.A
        self.A = np.exp(Z) / np.sum(np.exp(Z))
        return self.A
    
    def backward(self, y):
        return self.A - y

class ScratchDeepNeuralNetworkClassifier():
    def __init__(self, *layers, epoch=3):
        self.epoch = epoch
        self.n_layers = len(layers)
        self.layers = layers
        self.loss_train = []
        self.loss_valid = []

    def train(self, X, y, X_val=None, y_val=None):
        get_mini_batch = GetMiniBatch(X_train, y_train, batch_size=20)
        for _ in tqdm(range(self.epoch)):
            for mini_X_train, mini_y_train in get_mini_batch:
                fout = mini_X_train.copy()
                for layer in self.layers:
                    fout = layer.forward(fout)

                bout = mini_y_train.copy()
                for layer in self.layers[::-1]:
                    bout = layer.backward(bout)
                    
            self.loss_train.append(self.crossentropy(mini_y_train, fout))
            if X_val is not None:
                y_val_pred = X_val.copy()
                for layer in self.layers:
                    y_val_pred = layer.forward(y_val_pred)
                self.loss_valid.append(self.crossentropy(y_val, y_val_pred))
        
        
    def crossentropy(self, y, y_pred):
        loss = -np.mean(np.sum(y*np.log(y_pred), axis=1))
        return loss
    
            
    def predict(self, X_test):
        out = X_test
        for layer in self.layers:
            out = layer.forward(out)
        return out      

In [18]:
class GetMiniBatch:
    def __init__(self, X, y, batch_size=20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
    def __len__(self):
        return self._stop
    def __getitem__(self, item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [19]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tqdm import tqdm

In [20]:
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(-1, 784)[:500]
y_train = y_train[:500]
X_train = X_train.astype(np.float)
X_train /= 255

from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train_one_hot, test_size=0.1)

In [21]:
softmax = Softmax()
initializer1 = ConvInitializer()
optimizer1 = ConvSGD(lr=0.001)
initializer2 = SimpleInitializer(sigma=1)
optimizer2 = SGD(lr=0.001)

conv = SimpleConv(initializer1, optimizer1, in_channel=1, out_channel=1, to_nn=True)
fc = FC(784, 10, softmax, initializer2, optimizer2)

nn = ScratchDeepNeuralNetworkClassifier(conv, fc, epoch=3)
nn.train(X_train, y_train)

y_pred = nn.predict(X_val)

100%|██████████| 3/3 [00:17<00:00,  5.78s/it]


In [22]:
accuracy_score(np.argmax(y_pred, axis=1), np.argmax(y_val, axis=1))

0.1