In [65]:
import numpy as np

In [4]:
train = np.genfromtxt('MNIST_CSV/mnist_train.csv', delimiter=',',dtype='int')
test = np.genfromtxt('MNIST_CSV/mnist_test.csv', delimiter=',',dtype='int')

X_train = train[:,1:].reshape(-1,1,28,28)
y_train = train[:,0]

X_test = test[:,1:].reshape(-1,1,28,28)
y_test = test[:,0]

In [5]:
X_train_buf = np.pad(X_train,((0,0),(0,0),(2,2),(2,2)))
X_test_buf = np.pad(X_test,((0,0),(0,0),(2,2),(2,2)))

In [6]:
X_train = X_train_buf
X_test = X_test_buf

In [37]:
class Layer:

    def __init__(self):
        pass

class LayerInput(Layer):

    def __init__(self):
        self.activation = 'linear'

    def __repr__(self):
        return f'{type(self).__name__}'

class LayerConv2D(Layer):

    def __init__(self,kernel_size,n_kernels,stride=1,padding=0,W=None,b=None,activation='linear',use_bias=True):
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.n_kernels = n_kernels
        self.activation = activation
        self.use_bias = use_bias
        self._W_and_b = False
        if type(W) == np.ndarray:
            self.W = W
            self.n_channels = W.shape[1]
            self._W_and_b = True
        if type(b) == np.ndarray:
            self.b = b
    
    def __repr__(self):
        return f'{type(self).__name__}: kernel_size = {self.kernel_size}, stride = {self.stride}, padding = {self.padding}, n_kernels = {self.n_kernels}, activation = {self.activation}'

    def initialize_weights_and_biases(self):
        self.W = np.random.uniform(-0.5,0.5,(self.n_kernels,self.n_channels,self.kernel_size,self.kernel_size))
        self.b = np.random.uniform(-0.5,0.5,(1,self.n_kernels,1,1))

    def update_weights_and_biases(self,dW,db,lr=0.001):
        self.W -= lr*dW
        self.b -= lr*db

class LayerMaxPool2D(Layer):

    def __init__(self,kernel_size,stride=2,padding=0):
        
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.activation = 'linear'

    def __repr__(self):
        return f'{type(self).__name__}: kernel_size = {self.kernel_size}, stride = {self.stride}, padding = {self.padding}'
    
class LayerAveragePool2D(Layer):

    def __init__(self,kernel_size,stride=2,padding=0):
        
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.activation = 'linear'

    def __repr__(self):
        return f'{type(self).__name__}: kernel_size = {self.kernel_size}, stride = {self.stride}, padding = {self.padding}'
        
class LayerDense(Layer):

    def __init__(self,n_neurons,activation='linear'):
        self.n_neurons = n_neurons
        self.activation = activation

    def __repr__(self):
        return f'{type(self).__name__}: activation = {self.activation}'
    
    def initialize_weights_and_biases(self,n_neurons_prev):
        self.W = np.random.rand(self.n_neurons, n_neurons_prev) - 0.5
        self.b = np.random.rand(self.n_neurons, 1) - 0.5

    def update_weights_and_biases(self,dW,db,lr=0.01):
        self.W -= lr*dW
        self.b -= lr*db
        
class LayerFlatten(Layer):

    def __init__(self):
        self.activation = 'linear'

    def __repr__(self):
        return f'{type(self).__name__}'

In [50]:
class ConvNet:

    def __init__(self,file=None,layers=None):
        if file:
            self.load(file)
        elif layers:
            self.layers = layers
        else:
            self.layers = [LayerInput()]
            self.score = 0.0
            self.best_score = 0.0
            self.best = None

    def __repr__(self):
        return f'{type(self).__name__}'

    def add_layer(self,layer):
        self.layers.append(layer)
        
    def compile(self,X_shape):
        self.activations = []
        if len(X_shape) == 4:
            _, channels, height, width = X_shape
            input_flat = False
        elif len(X_shape) == 2:
            n_neurons, _ = X_shape
            input_flat = True
        for i,layer in enumerate(self.layers):
            self.activations.append(layer.activation)
            if type(layer) == LayerInput:
                layer.input_shape = (n_neurons, None) if input_flat else (None, channels, height, width)
                layer.output_shape = (n_neurons, None) if input_flat else (None, channels, height, width)
                layer.n_trainable_parameters = 0
            if type(layer) == LayerConv2D:
                if layer._W_and_b:
                    if layer.n_channels != channels:
                        raise ValueError(f'Layer {i} is incopatible with input')
                else:
                    layer.n_channels = channels
                    layer.initialize_weights_and_biases()
                    layer._W_and_b = True
                new_height = int((height - layer.kernel_size + 2 * layer.padding)/layer.stride) + 1
                new_width = int((width - layer.kernel_size + 2 * layer.padding)/layer.stride) + 1
                layer.input_shape = (None, channels, height, width)
                layer.output_shape = (None, layer.n_kernels, new_height, new_width)
                layer.n_trainable_parameters = (layer.kernel_size**2 * layer.n_channels + 1) * layer.n_kernels
                _, channels, height, width = layer.output_shape
            if type(layer) == LayerMaxPool2D:
                new_height = int((height - layer.kernel_size + 2 * layer.padding)/layer.stride) + 1
                new_width = int((width - layer.kernel_size + 2 * layer.padding)/layer.stride) + 1
                layer.input_shape = (None, channels, height, width)
                layer.output_shape = (None, channels, new_height, new_width)
                layer.n_trainable_parameters = 0
                _, channels, height, width = layer.output_shape
            if type(layer) == LayerAveragePool2D:
                new_height = int((height - layer.kernel_size + 2 * layer.padding)/layer.stride) + 1
                new_width = int((width - layer.kernel_size + 2 * layer.padding)/layer.stride) + 1
                layer.input_shape = (None, channels, height, width)
                layer.output_shape = (None, channels, new_height, new_width)
                layer.n_trainable_parameters = 0
                _, channels, height, width = layer.output_shape
            if type(layer) == LayerFlatten:
                n_neurons = channels*height*width
                layer.input_shape = (None, channels, height, width)
                layer.output_shape = (n_neurons, None)
                layer.n_trainable_parameters = 0
                n_neurons, _ = layer.output_shape
            if type(layer) == LayerDense:
                layer.initialize_weights_and_biases(n_neurons)
                layer.input_shape = (n_neurons, None)
                layer.output_shape = (layer.n_neurons, None)
                layer.n_trainable_parameters = layer.n_neurons * (n_neurons + 1)
                n_neurons, _ = layer.output_shape
        self.describe()
                
    def describe(self):
        print(f' --------------------------------------------------------------------------------------------------------------------------------------------------')
        print(f'| Layer |     Layer type      |      Input shape      | Kernel size | Stride | Padding |      Output shape     | Activation | Trainable Parameters |')
        print(f' --------------------------------------------------------------------------------------------------------------------------------------------------')
        for i,layer in enumerate(self.layers):
            layer_2D = False
            if type(layer) == LayerConv2D:
                layer_type = "{:^21}".format('LayerConv2D')
                layer_2D = True
            elif type(layer) == LayerMaxPool2D:
                layer_type = "{:^21}".format('LayerMaxPool2D')
                layer_2D = True
            elif type(layer) == LayerAveragePool2D:
                layer_type = "{:^21}".format('LayerAveragePool2D')
                layer_2D = True
            elif type(layer) == LayerDense:
                layer_type = "{:^21}".format('LayerDense')
            elif type(layer) == LayerFlatten:
                layer_type = "{:^21}".format('LayerFlatten')
            elif type(layer) == LayerInput:
                layer_type = "{:^21}".format('LayerInput')
            input_shape = "{:^23}".format(str(layer.input_shape))
            output_shape = "{:^23}".format(str(layer.output_shape))
            padding = "{:^9}".format(layer.padding) if layer_2D else "{:^9}".format('None')
            stride = "{:^8}".format(layer.stride) if layer_2D else "{:^8}".format('None')
            kshape = "{:^13}".format(f'({layer.kernel_size},{layer.kernel_size})') if layer_2D else "{:^13}".format('None')
            n_trainable_parameters = "{:^22}".format(layer.n_trainable_parameters)
            activation = "{:^12}".format(layer.activation)
            print(f'|{"{:^7}".format(i)}|{layer_type}|{input_shape}|{kshape}|{stride}|{padding}|{output_shape}|{activation}|{n_trainable_parameters}|')
        print(f' --------------------------------------------------------------------------------------------------------------------------------------------------') 

    def zero_pad(self,matrix, padding):
        return np.pad(matrix,((0,0),(0,0),(padding,padding),(padding,padding)))

    def one_hot(self,Y):
        Y_gt = np.zeros((Y.size, self.layers[-1].n_neurons))
        Y_gt[np.arange(Y.size), Y] = 1
        return Y_gt.T

    def ReLu(self,A):
        return np.maximum(A,0)
    
    def dReLu(self,Z):
        return Z > 0
    
    def SoftMax(self,A):
        return np.exp(A) / sum(np.exp(A))
    
    def activate(self,A,activation):
        if activation == 'relu':
            return self.ReLu(A)
        if activation == 'softmax':
            return self.SoftMax(A)
        if activation == 'linear':
            return A
        
    def dActivation(self,Z,activation):
        if activation == 'relu':
            return self.dReLu(Z)
        if activation == 'linear':
            return 1
    
    def forward_convolve(self,A,layer,is_training):
        A = self.zero_pad(A,layer.padding)
        window = np.lib.stride_tricks.sliding_window_view(A,layer.W.shape[2:4],axis=(2,3))[:,:,::layer.stride,::layer.stride]
        Z = np.einsum('bchwkt,fckt->bfhw', window, layer.W)
        if layer.use_bias:
            Z += layer.b
        if is_training:
            layer.cache = A, Z
        A = self.activate(Z, layer.activation)
        return A
    
    def backward_convolve(self,dA_prev,layer,batch_size,lr,activation):
        A_prev, Z_prev =  layer.cache
        dZ_prev = dA_prev * self.dActivation(Z_prev,activation)
        W_rot = np.rot90(layer.W,k=2,axes=[2,3])
        dZ_shape = dZ_prev.shape
        dZ_dilated = np.zeros((dZ_shape[0],dZ_shape[1],(dZ_shape[2])+(dZ_shape[2]-1)*(layer.stride-1),(dZ_shape[3])+(dZ_shape[3]-1)*(layer.stride-1)),dtype=dZ_prev.dtype)
        dZ_dilated[:,:,::layer.stride,::layer.stride] = dZ_prev
        dZ_paded = self.zero_pad(dZ_dilated,layer.W.shape[2]-1)
        window=np.lib.stride_tricks.sliding_window_view(dZ_paded,W_rot.shape[2:4],axis=(2,3))
        dA = np.einsum('bfhwkt,fckt->bchw', window, W_rot)
        shape_diff = A_prev.shape[2] - dA.shape[2]
        if shape_diff:
            dA = np.pad(dA,((0,0),(0,0),(0,shape_diff),(0,shape_diff)))
            A_prev = A_prev[:,:,:-shape_diff,:-shape_diff]
        if layer.padding:
            dA = dA[:,:,layer.padding:-layer.padding,layer.padding:-layer.padding]
        window=np.lib.stride_tricks.sliding_window_view(A_prev,dZ_dilated.shape[2:4],axis=(2,3))
        dW = 1/batch_size * np.einsum('bchwkt,bfkt->fchw', window, dZ_dilated)
        db = 1/batch_size * np.sum(dZ_prev,axis=(0,2,3),keepdims=True)
        layer.update_weights_and_biases(dW,db,lr)
        layer.cache = None
        return dA
    
    def forward_maxpool(self,A,layer,is_training):
        windows = np.lib.stride_tricks.sliding_window_view(A,(layer.kernel_size,layer.kernel_size),axis=(2,3))[:,:,::layer.stride,::layer.stride]
        A_next = np.max(windows,axis=(4,5))
        A_next_full_shape = np.max(windows,axis=(4,5),keepdims=True)
        mask = (A_next_full_shape == windows).astype(int)
        if is_training:
            layer.cache = (A, mask, A_next_full_shape.shape)
        return A_next

    def backward_maxpool(self,dA,layer):
        A_prev, mask, dA_full_shape = layer.cache
        dA_next = np.ones_like(A_prev)
        dA_window = dA.reshape(dA_full_shape) * mask
        windows = np.lib.stride_tricks.sliding_window_view(dA_next,(2,2),axis=(2,3),writeable=True)[:,:,::layer.stride,::layer.stride]
        windows *= dA_window
        layer.cache = None
        return dA_next

    # def forward_averagepool(self,A,layer,is_training):
    #     windows = np.lib.stride_tricks.sliding_window_view(A,(layer.kernel_size,layer.kernel_size),axis=(2,3))[:,:,::layer.stride,::layer.stride]
    #     A_next = np.mean(windows,axis=(4,5))
    #     A_next_full_shape = np.mean(windows,axis=(4,5),keepdims=True)
    #     if is_training:
    #         layer.cache = (A, A_next_full_shape.shape)
    #     return A_next
    
    # def backward_averagepool(self,dA,layer):
    #     A_prev, dA_full_shape = layer.cache
    #     dA_next = np.ones_like(A_prev)
    #     dA_window = dA.reshape(dA_full_shape) / layer.kernel_size**2
    #     windows = np.lib.stride_tricks.sliding_window_view(dA_next,(2,2),axis=(2,3),writeable=True)[:,:,::layer.stride,::layer.stride]
    #     windows *= dA_window
    #     layer.cache = None
    #     return dA_next
    
    def forward_averagepool(self,A,layer,is_training):
        windows = np.lib.stride_tricks.sliding_window_view(A,(layer.kernel_size,layer.kernel_size),axis=(2,3))[:,:,::layer.stride,::layer.stride]
        A_next = np.mean(windows,axis=(4,5))
        if is_training:
            layer.cache = A
        return A_next
    
    def backward_averagepool(self,dA_prev,layer):
        A_prev = layer.cache
        dA = dA_prev / layer.kernel_size**2
        shape_diff = A_prev.shape[2] - dA.shape[2]
        if shape_diff:
            dA = np.pad(dA,((0,0),(0,0),(0,shape_diff),(0,shape_diff)))
        if layer.padding:
            dA = dA[:,:,layer.padding:-layer.padding,layer.padding:-layer.padding]
        layer.cache = None
        return dA

    def forward_flatten(self,A):
        batch_size, _, _, _ = A.shape
        return A.reshape(batch_size,-1).T
    
    def backward_flatten(self,dA_prev,layer):
        _, channels, height, width = layer.input_shape
        dA = dA_prev.T.reshape(-1,channels, height, width)
        return dA
    
    def forward_dense(self,A,Z,layer,is_training=False):
        if is_training:
            layer.cache = (A,Z)
        Z = layer.W.dot(A) + layer.b
        A = self.activate(Z,layer.activation)
        return A, Z
    
    def backward_dense(self,dA_prev,layer,batch_size,lr,activation):
        A_prev, Z_prev = layer.cache
        dW = 1/batch_size * dA_prev.dot(A_prev.T)
        db = 1/batch_size * np.sum(dA_prev)
        dA = layer.W.T.dot(dA_prev) * self.dActivation(Z_prev,activation)
        layer.update_weights_and_biases(dW,db,lr)
        layer.cache = None
        return dA

    def forward(self,A,is_training=False):
        Z = A.copy()
        for layer in self.layers:
            if type(layer) == LayerConv2D:
                A = self.forward_convolve(A,layer,is_training)
            elif type(layer) == LayerMaxPool2D:
                A = self.forward_maxpool(A,layer,is_training)
            elif type(layer) == LayerAveragePool2D:
                A = self.forward_averagepool(A,layer,is_training)
            elif type(layer) == LayerFlatten:
                A = self.forward_flatten(A)
            elif type(layer) == LayerDense:
                A, Z = self.forward_dense(A,Z,layer,is_training)
        return A
    
    def backward(self,dA,batch_size,lr):
        for layer, activation in zip(self.layers[1:][::-1],self.activations[:-1][::-1]):
            if type(layer) == LayerDense:
                dA = self.backward_dense(dA,layer,batch_size,lr,activation)
            if type(layer) == LayerFlatten:
                dA = self.backward_flatten(dA,layer)
            if type(layer) == LayerMaxPool2D:
                dA = self.backward_maxpool(dA,layer)
            if type(layer) == LayerAveragePool2D:
                dA = self.backward_averagepool(dA,layer)
            if type(layer) == LayerConv2D:
                dA = self.backward_convolve(dA,layer,batch_size,lr,activation)
        
    def fit(self,X_train,y_train,X_test=None,y_test=None,batch_size=100,epochs=10,lr=0.01,verbose=True):
        self.print_progress(verbose,'start')
        for epoch in range(epochs):
            num_batches = X_train.shape[0] // batch_size
            for i in range(0,X_train.shape[0],batch_size):
                X_i = X_train[i:i+batch_size]
                y_i = y_train[i:i+batch_size]
                y_pred_oh = self.forward(X_i,is_training=True)
                y_true_oh = self.one_hot(y_i)
                dA = y_pred_oh - y_true_oh
                self.backward(dA,batch_size,lr)
                self.print_progress(verbose,'batch',epoch=epoch,i=i,num_batches=num_batches,batch_size=batch_size,y_i=y_i,y_pred_oh=y_pred_oh)
            train_acc = self.val(X_train,y_train)
            if (X_test is not None) and (y_test is not None):
                test_acc = self.val(X_test,y_test)
                self.print_progress(verbose,'epoch',epoch=epoch,train_acc=train_acc,test_acc=test_acc)
                self.best_score = max(self.best_score,test_acc)
                self.score = test_acc
            else:
                self.print_progress(verbose,'epoch',epoch=epoch,train_acc=train_acc,test_acc=None)
                self.best_score = max(self.best_score,train_acc)
                self.score = train_acc
        self.print_progress(verbose,'end')

    def print_progress(self,verbose,which,epoch=None,i=None,num_batches=None,batch_size=None,y_i=None,y_pred_oh=None,train_acc=None,test_acc=None):
        if verbose:
            if which == 'start':
                print(f'{"{:^7}".format("epoch")} | {"{:^12}".format("progress")} | {"{:^10}".format("accuracy")}')
                print(f'{"{:<53}".format("-"*53)}')
            elif which == 'batch':
                print(f'{"{:^7}".format(str(epoch))} | [{"{:<10}".format("#"*int(i*10/num_batches/batch_size))}] | {"{:^7}".format(str(round(self.evaluate_accuracy(y_i,np.argmax(y_pred_oh,axis=0)),4)))}',end='\r', flush=True)
            elif which == 'epoch':
                if test_acc is not None:
                    print(f'{"{:^7}".format(str(epoch))} | [{"{:<10}".format("#"*10)}] | train: {"{:^7}".format(str(round(train_acc,4)))} test: {"{:^7}".format(str(round(test_acc,4)))}')
                else:
                    print(f'{"{:^7}".format(str(epoch))} | [{"{:<10}".format("#"*10)}] | train: {"{:^7}".format(str(round(train_acc,4)))}')
            elif which == 'end':
                print(f'{"{:<53}".format("-"*53)}')

    def predict(self,X):
        y = self.forward(X)
        return np.argmax(y,0)

    def evaluate_accuracy(self,y_true,y_pred):
        return (y_true == y_pred).sum() / len(y_true)

    def val(self,X_test,y_true):
        y_pred = self.predict(X_test)
        acc = self.evaluate_accuracy(y_true,y_pred)
        return acc
                
    def save(self,file):
        if not file.endswith('.npz'):
            file = file +'.npz'
        np.savez(file=file,
                 layers=self.layers,
                 activations=self.activations,
                 score = self.score,
                 best_score=self.best_score)
        print(f'Saved as {file}')
        
    def load(self,file):
        if not file.endswith('.npz'):
            file = file + '.npz'
            
        model_archieve = np.load(file,allow_pickle=True)
        self.layers = model_archieve['layers'].tolist()
        self.activations = model_archieve['activations'].tolist()
        self.score = float(model_archieve['score'])
        self.best_score = float(model_archieve['best_score'])
        
        

In [51]:
model = ConvNet()

model.add_layer(LayerConv2D(kernel_size=5,stride=1,padding=1,n_kernels=6,activation='relu'))
model.add_layer(LayerAveragePool2D(kernel_size=2,stride=2,padding=0))
model.add_layer(LayerConv2D(kernel_size=5,stride=1,padding=0,n_kernels=16,activation='relu'))
model.add_layer(LayerAveragePool2D(kernel_size=2,stride=2,padding=0))
model.add_layer(LayerConv2D(kernel_size=5,stride=1,padding=0,n_kernels=120,activation='relu'))
model.add_layer(LayerFlatten())
model.add_layer(LayerDense(84,activation='relu'))
model.add_layer(LayerDense(10,activation='softmax'))

model.compile(X_test.shape)

 --------------------------------------------------------------------------------------------------------------------------------------------------
| Layer |     Layer type      |      Input shape      | Kernel size | Stride | Padding |      Output shape     | Activation | Trainable Parameters |
 --------------------------------------------------------------------------------------------------------------------------------------------------
|   0   |     LayerInput      |   (None, 1, 32, 32)   |    None     |  None  |  None   |   (None, 1, 32, 32)   |   linear   |          0           |
|   1   |     LayerConv2D     |   (None, 1, 32, 32)   |    (5,5)    |   1    |    1    |   (None, 6, 30, 30)   |    relu    |         156          |
|   2   | LayerAveragePool2D  |   (None, 6, 30, 30)   |    (2,2)    |   2    |    0    |   (None, 6, 15, 15)   |   linear   |          0           |
|   3   |     LayerConv2D     |   (None, 6, 15, 15)   |    (5,5)    |   1    |    0    |  (None, 16, 11, 11)

In [52]:
model.fit(X_train/256,y_train,X_test/256,y_test,batch_size=1000,epochs=1,lr=0.01)

 epoch  |   progress   |  accuracy 
-----------------------------------------------------
   0    | [##########] | train: 0.7037  test: 0.7091 
-----------------------------------------------------


In [61]:
model.fit(X_train/256,y_train,X_test/256,y_test,batch_size=1000,epochs=1,lr=0.01)

 epoch  |   progress   |  accuracy 
-----------------------------------------------------
   0    | [##########] | train: 0.7856  test: 0.7923 
-----------------------------------------------------


In [62]:
model.save('LeNet_from_scratch')

Saved as LeNet_from_scratch.npz


In [56]:
model = ConvNet('LeNet_from_scratch.npz')

In [60]:
model.val(X_test/256,y_test)

0.7091