# Sprint 12

# SimpleConv2d

## Setup

In [1]:
import numpy as np

In [2]:
class SimpleInitializer:
    """
    Simple initialization with Gaussian distribution
    Parameters
    ----------
    sigma : float
      Standard deviation of Gaussian distribution
    """
    def __init__(self, sigma=0.01):
        self.sigma = sigma
        
    def W(self, *shape):
        """
        Weight initialization
        Parameters
        ----------
        n_nodes1 : int
          Number of nodes in the previous layer
        n_nodes2 : int
          Number of nodes in the later layer
        Returns
        ----------
        W :
        """
        W = self.sigma * np.random.randn(*shape)
        return W
    
    def B(self, *shape):
        """
        Bias initialization
        Parameters
        ----------
        n_nodes2 : int
          Number of nodes in the later layer
        Returns
        ----------
        B :
        """
        B = self.sigma * np.random.randn(*shape)
        return B

In [3]:
class HeInitializer:
    
    def __init__(self):
        pass
    
    def W(self, n_nodes1, n_nodes2):
        self.sigma = np.sqrt(2 / n_nodes1)
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    
    def B(self, n_nodes2):
        B = self.sigma * np.random.randn(1, n_nodes2)
        return B

In [4]:
class SGD:
    """
    Stochastic gradient descent
    Parameters
    ----------
    lr : Learning rate
    """
    
    def __init__(self, lr):
        self.lr = lr
        
    def update(self, layer):
        """
        Update weights and biases for a layer
        Parameters
        ----------
        layer : Instance of the layer before update
        """
        layer.W -= self.lr * layer.dW
        layer.B -= self.lr * layer.dB
        return layer

In [5]:
class AdaGrad:
    
    def __init__(self, lr):
        self.lr = lr
        
    def update(self, layer):
        layer.HW += layer.dW * layer.dW
        layer.HB = layer.dB * layer.dB
        delta = 1e-7
        layer.W -= self.lr * layer.dW / (np.sqrt(layer.HW) + delta)
        layer.B -= self.lr * layer.dB / (np.sqrt(layer.HB) + delta)
        return layer

In [6]:
class ReLU:
    
    def forward(self, A):
        self.A = A
        Z = np.maximum(0, A)
        return Z
    
    def backward(self, dZ):
        dA = dZ * np.where(self.A > 0, 1, 0)
        return dA

In [7]:
class Softmax:
    
    def forward(self, A):
        self.Z = np.exp(A - np.max(A)) / np.sum(np.exp(A - np.max(A)), axis=1, keepdims=True)
        return self.Z
    
    def backward(self, dZ):
        return dZ

In [8]:
class FC:
    """
    Number of nodes Fully connected layer from n_nodes1 to n_nodes2
    Parameters
    ----------
    n_nodes1 : int
      Number of nodes in the previous layer
    n_nodes2 : int
      Number of nodes in the later layer
    initializer: instance of initialization method
    optimizer: instance of optimization method
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer, activation):
        self.optimizer = optimizer
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        self.W = initializer.W(self.n_nodes1, self.n_nodes2)
        self.B = initializer.B(self.n_nodes2)
        self.optimizer = optimizer
        self.activation = activation
        pass
    
    def forward(self, X):
        """
        forward
        Parameters
        ----------
        X : The following forms of ndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : The following forms of ndarray, shape (batch_size, n_nodes2)
            output
        """
        self.X = X
        self.A = np.dot(self.X, self.W) + self.B
        return self.activation.forward(self.A)
    
    def backward(self, dZ):
        """
        Backward
        Parameters
        ----------
        dA : The following forms of ndarray, shape (batch_size, n_nodes2)
            Gradient flowing from behind
        Returns
        ----------
        dZ : The following forms of ndarray, shape (batch_size, n_nodes1)
            Gradient to flow forward
        """
        dA = self.activation.backward(dZ)
        self.dB = np.sum(dA, axis=0)
        self.dW = np.dot(self.X.T, dA) / len(self.X)
        self.dZ = np.dot(dA, self.W.T)
        self = self.optimizer.update(self)
        return self.dZ

In [9]:
class GetMiniBatch:
    """
    Iterator to get a mini-batch
    Parameters
    ----------
    X : The following forms of ndarray, shape (n_samples, n_features)
      Training data
    y : The following form of ndarray, shape (n_samples, 1)
      Correct answer value
    batch_size : int
      Batch size
    seed : int
      NumPy random number seed
    """
    
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
        
    def __len__(self):
        return self._stop
    
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        
    
    def __iter__(self):
        self._counter = 0
        return self
    
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [10]:
class SimpleInitializerConv2d:
    
    def __init__(self, sigma=0.01):
        self.sigma = sigma
        
    def W(self, F, C, FH, FW):
        return self.sigma * np.random.randn(F, C, FH, FW)
    
    def B(self, F):
        return np.zeros(F)

In [11]:
from tensorflow.keras.datasets import mnist
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import classification_report

(X_train, y_train), (X_test, y_test) = mnist.load_data()

X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255

X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2)

print(X_train.shape)
print(y_train.shape)
print(X_val.shape)
print(y_val.shape)

(48000, 28, 28)
(48000,)
(12000, 28, 28)
(12000,)


In [12]:
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_val_one_hot = enc.transform(y_val[:, np.newaxis])

## 【Problem 1】Creating a 2-D convolutional layer

In [13]:
class SimpleConv2d():
    """
    2D convolutional layer
    Parameters
    -------
    F: Number of output channels
    C: Number of input channels
    FH: Filter's height.
    FC: Filter's width.
    P: Padding.
    S: Stride.
    initializer: Instances of initialization methods
    optimizer: Instances of optimization methods
    """
    def __init__(self, F, C, FH, FW, P=0, S=1, initializer=None, optimizer=None, activation=None):
        self.P = P
        self.S = S
        self.initializer = initializer
        self.optimizer = optimizer
        self.activation = activation
        
        # Initializer W and B with initializer
        self.W = self.initializer.W(F, C, FH, FW)
        self.B = self.initializer.B(F)
        
    def forward(self, X):
        """
        Forward method.
        Parameters
        -------
        X: ndarray
            Input data
        Returns
        -------
        A: ndarray
            Output data
        """
        # (n_samples, n_channels, height, width)
        N, C, H, W = X.shape
        F, C, FH, FW = self.W.shape
        
        OH, OW = self.output_shape_2d(H, W, self.P, self.P, FH, FW, self.S, self.S)

        self.params = N, C, H, W, F, FH, FW, OH, OW
        
        A = np.zeros([N, F, OH, OW])
        
        self.X_pad = np.pad(X, ((0,0), (0,0), (self.P, self.P), (self.P, self.P)))
    
        # For each sample
        for n in range(N):
            for ch in range(F):
                # Vertical slide
                for row in range(0, OH, self.S):
                    # Horizontal slide
                    for col in range(0, OW, self.S):
                        A[n, ch, row, col] = np.sum(self.X_pad[n, :, row:row+FH, col:col+FW] * self.W[ch, :, :, :]) + self.B[ch]
        
        if self.activation == None:
            return A
        else:
            return self.activation.forward(A)
    
    def backward(self, dZ):
        """
        Backward method.
        Parameters
        -------
        dZ: 
            The gradient flown in from behind.
        Returns
        -------
        dZ: ndarray
            Forward slope
        """
        dA = dZ if self.activation == None else self.activation.backward(dZ)
        N, C, H, W, F, FH, FW, OH, OW = self.params
        
        dZ = np.zeros(self.X_pad.shape)
        self.dW = np.zeros(self.W.shape)
        self.dB = np.zeros(self.B.shape)
        
        # Calculate dZ
        for n in range(N):
            for ch in range(F):
                for row in range(0, OH, self.S):
                    for col in range(0, OW, self.S):
                        dZ[n, :, row:row+FH, col:col+FW] += dA[n, ch, row, col] * self.W[ch, :, :, :]
        
        dl_rows = range(self.P), range(H + self.P, H + 2*self.P, 1)
        dl_cols = range(self.P), range(W + self.P, W + 2*self.P, 1)
        
        dZ = np.delete(dZ, dl_rows, axis=2)
        dZ = np.delete(dZ, dl_cols, axis=3)
        
        # Calculate dW
        for n in range(N):
            for ch in range(F):
                for row in range(OH):
                    for col in range(OW):
                        self.dW[ch, :, :, :] += dA[n, ch, row, col] * self.X_pad[n, :, row:row+FH, col:col+FW]
        
        # Calculate dB
        for ch in range(F):
            self.dB[ch] = np.sum(dA[:, ch, :, :])
            
        # Update
        if self.optimizer is not None:    
            self = self.optimizer.update(self)
        
        return dZ
    
    def output_shape_2d(self, H, W, PH, PW, FH, FW, SH, SW):
        OH = (H + 2*PH - FH) / SH + 1
        OW = (W + 2*PW - FW) / SW + 1
        return int(OH), int(OW)

In [14]:
class ConstantInitializer():
    
    def __init__(self, w, b):
        self.w = w
        self.b = b
        
    def W(self, *args):
        return self.w
    
    def B(self, *args):
        return self.b

In [15]:
# (1,1,4,4)
x = np.array([[[[ 1,  2,  3,  4],
                [ 5,  6,  7,  8],
                [ 9, 10, 11, 12],
                [13, 14, 15, 16]]]])
# (2,1,3,3)
w = np.array([[[[ 0.,  0.,  0.],
               [ 0.,  1.,  0.],
               [ 0., -1.,  0.]]],
              [[[ 0.,  0.,  0.],
               [ 0., -1.,  1.],
               [ 0.,  0.,  0.]]]])

b = np.array([0, 0])

test = SimpleConv2d(1, 1, 3, 3, initializer=ConstantInitializer(w, b))

In [16]:
result = test.forward(x)
result

array([[[[-4., -4.],
         [-4., -4.]],

        [[ 1.,  1.],
         [ 1.,  1.]]]])

In [17]:
delta = np.array([[[[ -4,  -4],
                   [ 10,  11]],
                  [[  1,  -7],
                   [  1, -11]]]])

back_result = test.backward(delta)
back_result

array([[[[  0.,   0.,   0.,   0.],
         [  0.,  -5.,   4.,  -7.],
         [  0.,  13.,  27., -11.],
         [  0., -10., -11.,   0.]]]])

## [Problem 4] Creating a maximum pooling layer

In [18]:
class MaxPool2D():
    
    def __init__(self, P):
        self.P = P
        self.PA = None
        self.Pidx = None
        
    def forward(self, A):
        N, F, OH, OW = A.shape
        PS = self.P
        PH, PW = int(OH/PS), int(OW/PS)
        
        self.params = N, F, OH, OW, PS, PH, PW
        
        # Pooling filter
        self.PA = np.zeros((N, F, PH, PW))
        self.Pidx = np.zeros((N, F, PH, PW))
        
        for n in range(N):
            for ch in range(F):
                for row in range(PH):
                    for col in range(PW):
                        self.PA[n, ch, row, col] = np.max(A[n, ch, row*PS:row*PS+PS, col*PS:col*PS+PS])
                        self.Pidx[n, ch, row, col] = np.argmax(A[n, ch, row*PS:row*PS+PS, col*PS:col*PS+PS])
        return self.PA
    
    def backward(self, dA):
        N, F, OH, OW, PS, PH, PW = self.params
        dP = np.zeros((N, F, OH, OW))
        
        for n in range(N):
            for ch in range(F):
                for row in range(PH):
                    for col in range(PW):
                        idx = self.Pidx[n, ch, row, col]
                        tmp = np.zeros((PS*PS))
                        for i in range(PS*PS):
                            if i == idx:
                                tmp[i] = dA[n, ch, row, col]
                            else:
                                tmp[i] = 0
                        dP[n, ch, row*PS:row*PS+PS, col*PS:col*PS+PS] = tmp.reshape(PS, PS)
        return dP

In [19]:
X = np.array([[[
    [3, 6, 0, 3, 1, 0],
    [6, 8, 3, 4, 1, 0],
    [7, 8, 6, 6, 8, 2],
    [7, 6, 4, 4, 0, 2],
    [0, 6, 6, 1, 5, 7],
    [0, 5, 5, 4, 7, 7]
]]])
X.shape

(1, 1, 6, 6)

In [20]:
pooling = MaxPool2D(P=2)
A = pooling.forward(X)

A

array([[[[8., 4., 1.],
         [8., 6., 8.],
         [6., 6., 7.]]]])

In [21]:
pooling.Pidx

array([[[[3., 3., 0.],
         [1., 0., 0.],
         [1., 0., 1.]]]])

In [22]:
dA = A

In [23]:
dZ = pooling.backward(dA)
dZ

array([[[[0., 0., 0., 0., 1., 0.],
         [0., 8., 0., 4., 0., 0.],
         [0., 8., 6., 0., 8., 0.],
         [0., 0., 0., 0., 0., 0.],
         [0., 6., 6., 0., 0., 7.],
         [0., 0., 0., 0., 0., 0.]]]])

## [Problem 5] Create Average Pooling

In [24]:
class AveragePool2D():
    
    def __init__(self, P):
        self.P = P
        self.PA = None
        self.Pidx = None
        
    def forward(self, A):
        N, F, OH, OW = A.shape
        PS = self.P
        PH, PW = int(OH/PS), int(OW/PS)
        
        self.params = N, F, OH, OW, PS, PH, PW
        
        # Pooling filter
        self.PA = np.zeros((N, F, PH, PW))
        self.Pidx = np.zeros((N, F, PH, PW))
        
        for n in range(N):
            for ch in range(F):
                for row in range(PH):
                    for col in range(PW):
                        self.PA[n, ch, row, col] = np.mean(A[n, ch, row*PS:row*PS+PS, col*PS:col*PS+PS])
                        
        return self.PA
    
    def backward(self, dA):
        N, F, OH, OW, PS, PH, PW = self.params
        dP = np.zeros((N, F, OH, OW))
        
        for n in range(N):
            for ch in range(F):
                for row in range(PH):
                    for col in range(PW):
                        tmp = np.zeros((PS*PS))
                        for i in range(PS*PS):
                            tmp[i] = dA[n, ch, row, col] / (PS*PS)
                        dP[n, ch, row*PS:row*PS+PS, col*PS:col*PS+PS] = tmp.reshape(PS, PS)
        return dP

In [25]:
X = np.array([[[
    [2, 1, 2, 3, 0, 2],
    [4, 4, 8, 8, 8, 0],
    [5, 8, 0, 8, 7, 0],
    [8, 3, 0, 0, 0, 5],
    [0, 8, 3, 0, 5, 7],
    [4, 8, 3, 6, 7, 6]
]]])

In [26]:
pooling = AveragePool2D(P=2)
A = pooling.forward(X)
A

array([[[[2.75, 5.25, 2.5 ],
         [6.  , 2.  , 3.  ],
         [5.  , 3.  , 6.25]]]])

In [27]:
dA = np.array([[[
    [2, 6, 6],
    [6, 5, 3],
    [3, 6, 5]
]]])

In [28]:
dZ = pooling.backward(dA)
dZ

array([[[[0.5 , 0.5 , 1.5 , 1.5 , 1.5 , 1.5 ],
         [0.5 , 0.5 , 1.5 , 1.5 , 1.5 , 1.5 ],
         [1.5 , 1.5 , 1.25, 1.25, 0.75, 0.75],
         [1.5 , 1.5 , 1.25, 1.25, 0.75, 0.75],
         [0.75, 0.75, 1.5 , 1.5 , 1.25, 1.25],
         [0.75, 0.75, 1.5 , 1.5 , 1.25, 1.25]]]])

## [Problem 6] Flatten

In [29]:
class Flatten():
    
    def forward(self, X):
        self.shape = X.shape
        return X.reshape(len(X), -1)
    
    def backward(self, X):
        return X.reshape(self.shape)

In [30]:
X = np.zeros((10, 20, 30, 50))
flat = Flatten()
flat_forward = flat.forward(X)
flat_forward.shape

(10, 30000)

In [31]:
flat.backward(flat_forward).shape

(10, 20, 30, 50)

## [Problem 7] Learning and estimation

In [32]:
from sklearn.metrics import accuracy_score

class Scratch2dCNNClassifier():
    
    def __init__(self, NN, CNN, n_epoch=5, n_batch=1, verbose=False):
        self.NN = NN
        self.CNN = CNN
        self.n_epoch = n_epoch
        self.n_batch = n_batch
        self.verbose = verbose
        self.losses = []
        self.val_losses = []
        self.accs = []
        self.val_accs = []
        
    def loss_function(self, y, yt):
        delta = 1e-7
        return -np.mean(yt*np.log(y+delta))
    
    def accuracy(self, Z, Y):
        return accuracy_score(Y, Z)
    
    def fb(self, X, y, batch=1):
        """
        Do forward and backward, packaging in a function.
        """
        forward_data = X[:, np.newaxis, :, :]

        # Do forward propagation
        for layer in self.CNN:
            forward_data = layer.forward(forward_data)

        fit = Flatten()
        forward_data = fit.forward(forward_data)
        for layer in self.NN:
            forward_data = layer.forward(forward_data)
            
        # Predicted value
        Z = forward_data
        # Do back propagation
        backward_data = (Z - y) / batch
        for layer in reversed(self.NN):
            backward_data = layer.backward(backward_data)

        backward_data = fit.backward(backward_data)

        for layer in reversed(self.CNN):
            backward_data = layer.backward(backward_data)
            
        loss = self.loss_function(Z, y)
        
        return loss
        
    
    def fit(self, X, y, X_val=None, y_val=None):
        """
        Train a neural network classifier.
        
        Parameters:
        -------
        X: ndarray
            Training data.
        y: ndarray
            Target values of training data.
        X_val: ndarray
            Validation data.
        y_val: ndarray
            Target values of validation data.
        """
        for epoch in range(self.n_epoch):
            print(f"Epoch {epoch+1}")
            self.loss = 0
            self.val_loss = 0
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.n_batch)
            for mini_X_train, mini_y_train in get_mini_batch:
                
                # Forward
                forward_data = mini_X_train[:, np.newaxis, :, :]
                
                for layer in range(len(self.CNN)):
                    forward_data = self.CNN[layer].forward(forward_data)
                
                # Flatten
                fit = Flatten()
                forward_data = fit.forward(forward_data)
                
                # NN
                for layer in range(len(self.NN)):
                    forward_data = self.NN[layer].forward(forward_data)
                
                Z = forward_data
                
                # Backward
                backward_data = (Z - mini_y_train)/self.n_batch
                for layer in range(len(self.NN)-1, -1, -1):
                    backward_data = self.NN[layer].backward(backward_data)
                    
                backward_data = fit.backward(backward_data)
                
                for layer in range(len(self.CNN)-1, -1, -1):
                    backward_data = self.CNN[layer].backward(backward_data)
                
                self.loss += self.loss_function(Z, mini_y_train)
            
            loss = self.loss/len(get_mini_batch)
            acc = self.accuracy(self.predict(X), np.argmax(y, axis=1))
            print(f"loss: {loss}, acc: {acc}")
            self.losses.append(loss)
#             self.val_losses.append(self.val_loss/len(get_mini_batch))
            self.accs.append(acc)
#             if X_val is not None and y_val is not None:
#                 self.val_accs.append(self.accuracy(self.predict(X_val), np.argmax(y_val, axis=1)))
            
    def predict(self, X):
        pred_data = X[:, np.newaxis, :, :]
        
        for layer in self.CNN:
            pred_data = layer.forward(pred_data)
            
        fit = Flatten()
        pred_data = fit.forward(pred_data)
        
        for layer in self.NN:
            pred_data = layer.forward(pred_data)
        
        return np.argmax(pred_data, axis=1)

In [33]:
NN = [
    FC(7840, 400, HeInitializer(), AdaGrad(0.01), ReLU()),
    FC(400, 200, HeInitializer(), AdaGrad(0.01), ReLU()),
    FC(200, 10, SimpleInitializer(0.01), AdaGrad(0.01), Softmax()),
]

In [34]:
CNN = [
    SimpleConv2d(F=10, C=1, FH=3, FW=3, P=1, S=1,
                 initializer=SimpleInitializerConv2d(),
                 optimizer=SGD(0.01),
                 activation=ReLU())
]

In [35]:
cnn1 = Scratch2dCNNClassifier(NN, CNN, n_epoch=10, n_batch=200, verbose=False)

cnn1.fit(X_train[:1000], y_train_one_hot[:1000], X_val[:50], y_val_one_hot[:50])

Epoch 1
loss: 0.22712575548772357, acc: 0.423
Epoch 2
loss: 0.23600258337523855, acc: 0.104
Epoch 3


KeyboardInterrupt: 

In [None]:
y_pred = cnn1.predict(X_train[:1000])
print(classification_report(y_train[:1000], y_pred))

In [None]:
y_pred_val = cnn1.predict(X_val)
print(classification_report(y_val, y_pred_val))

In [None]:
import matplotlib.pyplot as plt

plt.plot(np.arange(1, cnn1.n_epoch+1), cnn1.losses, label='train_loss')
plt.plot(np.arange(1, cnn1.n_epoch+1), cnn1.val_losses, label='val_loss')
plt.xlabel('Epochs')
plt.xticks(np.arange(1, cnn1.n_epoch+1))
plt.legend()
plt.show()

In [None]:
y_pred_train = cnn1.predict(X_train[:1000])
y_pred_train

In [None]:
print(classification_report(y_train[:1000], y_pred_train))