In [3]:
import numpy as np

In [4]:
# from Library import NN_lib

In [5]:
class NN_lib():
    
    def __init__(self):
        
#         import numpy as np
        
        self.epsilon = 1e-5
        
        self.act_func_map = {'relu':self.ReLU, 'sigmoid':self.sigmoid, 'tanh':self.tanh, 
                             'none':self.none, 'softmax':self.softmax}
        
        self.act_diff_map = {'relu':self.ReLU_diff, 'sigmoid':self.sigmoid_diff, 
                             'tanh':self.tanh_diff, 'softmax':self.softmax_diff}
        
        self.loss_error_map = {'mse':self.mse_error, 'binary_crossentropy':self.bincross_error, 
                               'categorical_crossentropy':self.catcross_error, 
                               'sparse_categorical_crossentropy':self.sparcat_error}
        
        self.metric_map = {'mse':self.calc_mse, 'binary_crossentropy':self.calc_bincross, 
                           'categorical_crossentropy':self.calc_catcross, 
                           'accuracy':self.calc_accuracy, 
                           'sparse_categorical_crossentropy':self.calc_sparcat}
            
    def tanh(self, X):
        return np.divide( (np.exp(X) - np.exp(-X)), (np.exp(X) + np.exp(-X) + self.epsilon) )
    
    def none(self, X):
        return X
    
    def sigmoid(self, X):
        f = np.vectorize(self.stable_sigmoid)
        return f(X)
        
    def stable_sigmoid(self, x):
        if x >= 0:
            z = np.exp(-x)
            return 1 / (1 + z)
        else:
            z = np.exp(x)
            return z / (1 + z)
    
    def ReLU(self, X):
        return np.maximum(X, 0)
    
    def softmax(self, X):
        X = X - np.max(X, axis=1, keepdims=True)
        numerator = np.exp(X)
        denominator = np.sum(numerator, axis=1, keepdims=True)
        return np.divide(numerator, denominator)
    
    def tanh_diff(self, X):
        return 1 - np.square(self.tanh(X))
    
    def sigmoid_diff(self, X):
        return np.multiply(self.sigmoid(X), (1-self.sigmoid(X)))
    
    def ReLU_diff(self, X):
        X[X<=0] = 0
        X[X>0] = 1
        return X
    
    def softmax_diff(self, X):
        return np.multiply(self.softmax(X), (1-self.softmax(X)))
    
        
    def compile_network(self, loss, *args):
        
        if len(args) == 1:
            self.metrics = args[0]
        else:
            self.metrics = []
        self.metrics.insert(0, loss)
        self.loss_func_error = self.loss_error_map[loss]
        self.loss = loss
        
    def predict(self, X, return_sequences=False):
        self.forward_pass(X)
        if self.loss == 'binary_crossentropy':
            if return_sequences==False:
                return (self.sequences[-1] > 0.5).astype(int)
            else:
                return (self.sequences > 0.5).astype(int)
        else:
            if return_sequences==False:
                return self.sequences[-1]
            else:
                return self.sequences
        
    def clip_gradients(self, grad):
        grad[grad>1] = 1
        grad[grad<-1] = -1
        return grad
        
    def mse_error(self, y, y_hat):      
        n_samples = y.shape[0]
        e = -(2/n_samples)*(y-y_hat)
        return e
            
    def bincross_error(self, y, y_hat):     
        n_samples = y.shape[0]
        e = (1/n_samples)*( -np.divide(y, y_hat+ self.epsilon) + np.divide((1-y), (1-y_hat+ self.epsilon)) )   
        return e
    
    def catcross_error(self, y, y_hat):     
        n_classes = y.shape[1]
        n_samples = y.shape[0]
        e = (1/(n_samples*n_classes))*( -np.divide(y, y_hat + self.epsilon) ) 
        self.kk = e  
        return e

    def sparcat_error(self, y, y_hat):
        n_samples = y_hat.shape[0]
        n_classes = y_hat.shape[1]
        sample_inds = np.arange(0, n_samples).reshape(-1, 1)
        e = np.zeros(y_hat.shape)
        e[sample_inds, y] = 1/(y_hat[sample_inds, y] + self.epsilon)
        e = -(1/n_samples*n_classes)*e
        self.kk = e
        return e
    
    def calc_mse(self, X, y):
        n_samples = y.shape[0]
        y_hat = self.predict(X)
        return np.sum(np.square(y - y_hat))/n_samples
    
    def calc_bincross(self, X, y):
        n_samples = y.shape[0]
        self.forward_pass(X)
        y_hat = self.activations[-1]
        return -np.sum(np.multiply(y, np.log(y_hat + self.epsilon)) + np.multiply((1-y), np.log(1-y_hat + self.epsilon)) ) / n_samples
    
    def calc_catcross(self, X, y):
        n_samples = y.shape[0]
        n_classes = y.shape[1]
        self.forward_pass(X)
        y_hat = self.activations[-1]
        return -np.sum(np.multiply(y, np.log(y_hat + self.epsilon)))/(n_samples*n_classes)
    
    def calc_sparcat(self, X, y):
        n_samples = y.shape[0]
        self.forward_pass(X)
        y_hat = self.activations[-1]
        n_classes = y_hat.shape[1]
        sample_inds = np.arange(0, n_samples).reshape(-1, 1)
        y_target_probs = y_hat[sample_inds, y]
        return -np.sum(np.log(y_target_probs + self.epsilon))/(n_samples*n_classes)

    def calc_rmse(self, X, y):
        return np.sqrt(self.calc_mse(X, y))    
        
    def calc_r2(self, X, y):
        n_samples = y.shape[0]
        y_hat = self.predict(X)
        r2 = 1 - (np.sum(np.square(y - y_hat))) / (np.sum(np.square(y - np.mean(y))))
        return r2
    
    def calc_accuracy(self, X, y):
        n_samples = y.shape[0]
        if self.loss == 'binary_crossentropy':
            y_hat = (self.predict(X) > 0.5).astype(int)
            return np.sum(y == y_hat) / n_samples
        elif self.loss == 'categorical_crossentropy':
            y_hat = np.argmax(self.predict(X), axis=1)
            y =  np.argmax(y, axis=1)
            return np.sum(y == y_hat) / n_samples
        elif self.loss == 'sparse_categorical_crossentropy':
            y = y.flatten()
            y_hat = np.argmax(self.predict(X), axis=1)
            return np.sum(y == y_hat) / n_samples

    def evaluate(self, X, y):
        
        if len(y.shape) == 1:
            y = np.expand_dims(y, 1)
        
        if self.loss == 'mse':
            print(f"Root mean square error: {self.calc_rmse(X, y)}")
            print(f"R2 score: {self.calc_r2(X, y)}")
            
        else:
            print(f"Accuracy: {self.calc_accuracy(X, y)}")



In [37]:
class CNN(NN_lib):
    
    def __init__(self):
        super().__init__() 
        self.layer_map = {'conv':self.conv_pass, 'dense':self.dense_pass, 'flatten':self.flatten_pass, 'pool':self.pool_pass, 'bn':self.bn_pass}
        self.pool_map = {'max':np.max, 'avg':np.mean}
        
    def initialise(self):
        self.params = []
        self.fixed_params = []
        self.layer_type = []
        self.activations = []
        self.act_shapes = []
        
    def add(self, layer):
        layer
    
    def Conv(self, n_filters, filter_shape, strides=(1,1), padding=(0, 0), act_func='none', input_shape=None):
        if input_shape == None:
            input_shape = self.act_shapes[-1]
        else:
            self.act_shapes.append(input_shape)
        self.params.append([(np.random.randn(input_shape[-1], *filter_shape)*2/np.sqrt(sum(list(filter_shape))), 0) for _ in range(n_filters)])
        self.fixed_params.append([n_filters, filter_shape, strides, padding, act_func])
        self.layer_type.append('conv')
        
        output_rows = np.floor((input_shape[0] - filter_shape[0] + 2*padding[0])/strides[0]) + 1
        output_cols = np.floor((input_shape[1] - filter_shape[1] + 2*padding[1])/strides[1]) + 1
        self.act_shapes.append((int(output_rows), int(output_cols), n_filters))
        
    def Pooling(self, type='max', pool_by=(2,2), strides=None):
        if strides == None:
            strides = pool_by
        self.params.append([])
        self.fixed_params.append([type, pool_by, strides])
        self.layer_type.append('pool')
        
        input_shape = self.act_shapes[-1]
        output_rows = np.floor((input_shape[0] - pool_by[0])/strides[0]) + 1
        output_cols = np.floor((input_shape[1] - pool_by[1])/strides[1]) + 1
        self.act_shapes.append((int(output_rows), int(output_cols), self.act_shapes[-1][-1]))
        
    def BatchNormalisation(self):
        self.params.append([])
        self.fixed_params.append([])
        self.layer_type.append('bn')
        
        self.act_shapes.append(self.act_shapes[-1])
        
    def Flatten(self):
        self.params.append([])
        self.fixed_params.append([])
        self.layer_type.append('flatten')
        
        act_len = 1
        for dim_len in self.act_shapes[-1]:
            act_len = np.int(act_len*dim_len)
            
        self.act_shapes.append((act_len,))
        
    def Dense(self, n_neurons, act_func='none'):
        inp_dim = self.act_shapes[-1][0]
        self.params.append([np.random.randn(inp_dim, n_neurons)*2/np.sqrt(n_neurons), np.zeros(n_neurons)])
        self.fixed_params.append([act_func])
        self.layer_type.append('dense')
        
        self.act_shapes.append((n_neurons,))
        
    def forward_pass(self, X):
        self.activations = [[] for _ in range(len(self.layer_type))]
        self.activations.insert(0, X)
        for i in range(len(self.params)):
            layer_type = self.layer_type[i]
            self.activations[i+1] = self.layer_map[layer_type](self.activations[i], i)
            
    def pad_img(self, img, i):
        padding = self.fixed_params[i][3]
        
        if padding == 'same':
            dim_row, dim_col = self.activations[i][1:]
            flt_row, flt_col = self.fixed_params[i][1]
            str_row, str_col = self.fixed_params[i][2]
            pad_row = np.ceil(((dim_row - 1)*str_row)+flt_row-dim_row)
            pad_col = np.ceil(((dim_col - 1)*str_col)+flt_col-dim_col)
        else:
            pad_row, pad_col = padding
                
        if pad_row % 2 == 0:
            pad_top = pad_bottom = int(pad_row/2)   
        else:
            pad_top = int(pad_row//2)
            pad_bottom = int(pad_row - pad_top)
            
        if pad_col % 2 == 0:
            pad_left = pad_right = int(pad_col/2)       
        else:
            pad_left = int(pad_col//2)
            pad_right = int(pad_col - pad_left)
            
        result = []
        for i in range(img.shape[0]):
            self.foo = img[i]
            result.append(np.pad(img[i], ((pad_top, pad_bottom), (pad_left, pad_right), (0, 0)), 'constant'))
        return np.array(result)
            
    def conv_pass(self, X, i):
        
        X_padded = self.pad_img(X, i)
        activations = []
        act_fun = self.act_func_map[self.fixed_params[i][4]]
        fil_ver_len, fil_hor_len = self.fixed_params[i][1][0], self.fixed_params[i][1][1]
        max_hor, max_ver = X_padded.shape[1]-1, X_padded.shape[2]-1
        
        for kernel in self.params[i]:
            
            act_layer = []
            act_single_row = []
            hor_pointer, ver_pointer = 0, 0
            
            while(ver_pointer + fil_ver_len -1 <= max_ver):
                
                img_portion = X_padded[:, hor_pointer:hor_pointer+fil_hor_len, ver_pointer:ver_pointer+fil_ver_len, :]
                act_single_row.append(act_fun(np.sum(np.multiply(img_portion, kernel[0]) + kernel[1])))
            
                if (hor_pointer + fil_hor_len -1 > max_hor):
                    act_layer.append(act_single_row)
                    act_single_row = []
                    hor_pointer = 0
                    ver_pointer = ver_pointer + self.fixed_params[i][2][1]
                
                else:
                    hor_pointer = hor_pointer + self.fixed_params[i][2][0]
           
            activations.append(act_layer)    
        return np.array(activations)
        
    def pool_pass(self, X, i):
        activations = []
        pool_ver_len, pool_hor_len = self.fixed_params[i][1][0], self.fixed_params[i][1][1]
        max_hor, max_ver = X.shape[1], X.shape[2]
        
        for activation in X:
            act_layer = []
            act_single_row = []
            hor_pointer, ver_pointer = 0, 0
            
            while(ver_pointer + pool_ver_len -1 <= max_ver):
                
                act_portion = activation[hor_pointer:hor_pointer+pool_hor_len, ver_pointer:ver_pointer+pool_ver_len]
                act_single_row.append(self.pool_map[self.fixed_params[i][0]](act_portion))
                if (hor_pointer + pool_hor_len -1 > max_hor):
                    act_layer.append(act_single_row)
                    act_single_row = []
                    hor_pointer = 0
                    ver_pointer = ver_pointer + pool_ver_len - 1 + self.fixed_params[i][2][1]
                
                else:
                    hor_pointer = hor_pointer + pool_hor_len -1 + self.fixed_params[i][2][0]
           
            activations.append(act_layer)
        return np.array(activations)
        
    def bn_pass(self, X, i):
        mean = np.dstack([np.mean(X, axis=-1)]*X.shape[-1])
        std = np.dstack([np.std(X, axis=-1)]*X.shape[-1])
        return np.divide(np.subtract(X, mean), std + self.epsilon)
    
    def flatten_pass(self, X, i):
        return X.reshape(X.shape[0], -1)
    
    def dense_pass(self, X, i):
        act_func = self.act_func_map[self.fixed_params[i][0]]
        return act_func(np.dot(X, self.params[i][0]) + self.params[i][1])  
    
    def summary(self, input_shape):
        table_top = '-'*86
        print(table_top)
        contents = []
        col_len = [16, 44, 20]
        contents.append(['Layer', 'Parameters', 'Activations'])
        for i in range(len(self.layer_type)):          
            if self.layer_type[i] == 'conv':
                params = f'{self.fixed_params[i][0]} filters of shape {self.fixed_params[i][1]}'       
            elif self.layer_type[i] == 'dense':
                params = f'weights: {self.params[i][0].shape}, biases: {self.params[i][1].shape}'        
            else:
                params = '[]'             
            contents.append([self.layer_type[i], params, f'{self.act_shapes[1:][i]}'])
            
        for item in contents:
            show = '|'
            for i in range(len(item)):
                space_gap =  ' '*int((col_len[i]-len(item[i]))/2)
                show = show + space_gap + item[i] + space_gap + '|'
                              
            print(show)
            print(table_top)
                             

In [38]:
model = CNN()

In [8]:
from tensorflow.keras.datasets import mnist

In [9]:
(X_train, y_train), (X_test, y_test) = mnist.load_data()

In [10]:
X_train.shape, y_train.shape

((60000, 28, 28), (60000,))

In [39]:
model.initialise()

model.add(model.Conv(16, (3,3), act_func='relu', input_shape=(28, 28, 1)))
model.add(model.Pooling('avg', (2, 2)))
model.add(model.BatchNormalisation())
          
model.add(model.Conv(16, (3,3), strides = (2, 2), act_func='relu'))
model.add(model.Pooling('max', (2, 2)))
model.add(model.BatchNormalisation())

model.add(model.Flatten())
model.add(model.Dense(10, act_func='relu'))
model.add(model.Dense(1, act_func='relu'))

model.summary((28, 28, 1))

--------------------------------------------------------------------------------------
|     Layer     |                 Parameters                 |    Activations    |
--------------------------------------------------------------------------------------
|      conv      |         16 filters of shape (3, 3)         |    (26, 26, 16)    |
--------------------------------------------------------------------------------------
|      pool      |                     []                     |    (13, 13, 16)    |
--------------------------------------------------------------------------------------
|       bn       |                     []                     |    (13, 13, 16)    |
--------------------------------------------------------------------------------------
|      conv      |         16 filters of shape (3, 3)         |     (6, 6, 16)     |
--------------------------------------------------------------------------------------
|      pool      |                     []              

In [12]:
x_ = np.expand_dims(X_train[:1000], -1)

In [40]:
model.forward_pass(x_)

ValueError: operands could not be broadcast together with remapped shapes [original->remapped]: (3,2) and requested shape (2,2)

In [42]:
model.foo.shape

(9, 10)

In [58]:
model.activations[2].shape

(16, 9, 10)