In [12]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from mlxtend.data import loadlocal_mnist


In [27]:
class NeuralNetwork:
    activations = ['sigmoid', 'tanh', 'relu',
                   'leaky_relu', 'linear', 'softmax']
    weight_init_funcs = ['zero', 'random', 'normal']

    def __init__(self, n_layers, layer_sizes, lr, activation, weight_init_func, epochs, batch_size) -> None:
        self.n_layers = n_layers
        self.layer_sizes = layer_sizes
        self.lr = lr
        self.activation = activation
        self.weight_init_func = weight_init_func
        self.epochs = epochs
        self.batch_size = batch_size
        self.w = {}
        self.b = {}
        self.init_weights()
        
    def activation_func(self, x, derivative=False):
        activation = self.activation
        if activation == 'sigmoid':
            if derivative:
                t = self.activation_func(x,activation)
                return t * (1 - t)
            return 1 / (1 + np.exp(-x))
        elif activation == 'tanh':
            if derivative:
                t = self.activation_func(x, activation)
                return 1 - t**2
            return np.tanh(x)
        elif activation == 'relu':
            if derivative:
                return np.where(x > 0, 1, 0)
            return np.where(x > 0, x, 0)
        elif activation == 'leaky_relu':
            if derivative:
                return np.where(x > 0, 1, 0.01)
            return np.where(x > 0, x, 0.01 * x)
        elif activation == 'linear':
            if derivative:
                return np.ones(x.shape)
            return x
        elif activation == 'softmax':
            if derivative:
                t = self.activation_func(x, activation)
                return t * (1 - t)
            return np.exp(x) / np.sum(np.exp(x), axis=0)
        
    def _weight(self, shape, weight_init_func):
        if weight_init_func == 'zero':
            return np.zeros(shape)
        elif weight_init_func == 'random':
            return np.random.rand(shape[0],shape[1])*0.01
        elif weight_init_func == 'normal':
            return np.random.randn(shape[0],shape[1])*0.01
        
    
    def init_weights(self):
        for i in range(1, self.n_layers):
            self.w[i] = self._weight((self.layer_sizes[i-1], self.layer_sizes[i]), self.weight_init_func)
            self.b[i] = self._weight((1, self.layer_sizes[i]), self.weight_init_func)
        return self.w, self.b
    
        
    
    def forward(self, X):
        for i in range(1, self.n_layers):
            
            X = self.activation_func(X, self.activation)
        caches = {}
        A = X

        # print(X.shape)

        L = self.n_layers

        for i in range(0, L-1):

            A_prev = A
            Z = np.dot(A_prev, self.w[i+1]) + self.b[i+1]

            A = self.activation_func(Z)

            caches[str(i+1)] = (Z, A)
            A_prev = A

        Z_l = np.dot(A_prev, self.w[L]) + self.b[L]
        A_l = self.activation_func(Z_l)

        caches[str(L)] = (Z_l, A_l)

        return A_l, caches
            
    
    def backward(self, X, y, caches):
        grads = {}

        L = self.n_layers - 1
        Lx = len(X)
        caches[str(0)] = (X, X)

        A = caches[str(L)][1]

        dZ = A-y

        dW = np.dot(caches[str(L-1)][1].T, dZ) / Lx
        db = np.sum(dZ, axis=0, keepdims=True) / Lx

        dA_prev = np.dot(dZ, self.parameters["W" + str(L)].T)

        grads["dW" + str(L)] = dW
        grads["db" + str(L)] = db

        for i in range(L - 1, 0, -1):
            d_act = self.activation_func(caches[str(i)][0], derivative=True)

            # print(dA_prev.shape)
            # print(d_act.shape)
            # dZ = np.matmul(dA_prev, d_act)
            dZ = dA_prev * d_act
            # print(dZ)
            dW = np.dot(caches[str(i-1)][1].T, dZ) / Lx
            db = np.sum(dZ, axis=0, keepdims=True) / Lx

            if i > 1:
                dA_prev = np.dot(dZ, self.parameters["W" + str(i)].T)

            grads["dW" + str(i)] = dW
            grads["db" + str(i)] = db

        self.update_weights(grads)

        return grads
    
    def update_weights(self, grads):
        N = self.n_layers
        
        for i in range(1, N):
            self.w[i] -= self.lr * grads['dW'+str(i)]
            self.b[i] -= self.lr * grads['db'+str(i)]
    
    def labels_to_class(self, y):
        m = len(y)
        c = int(np.max(y))
        y_classes = np.zeros((m, c+1))
        for i in range(m):
            l = int(y[i])
            y_classes[i, l] = 1

        return y_classes
    
    def reset_gradients(self):
        pass
    
        
    def fittt(self, X, y , x_test=None, y_test=None):
        y = self.labels_to_class(y)
        
        m , n_0 = X.shape
        n_l = y.shape[1]

        parameters = self.initializeWeights()
        self.parameters = parameters

        train_loss_history = []
        train_accuracy_history = []
        test_loss_history = []
        test_accuracy_history = []

        

        for epoch in tqdm(range(self.epochs), desc = "Progress Total : ", position = 0, leave = True):


            n_batches = m//self.batch_size
            X_batches = [X[self.batch_size*i:self.batch_size*(i+1),:] for i in range(0,n_batches)]
            y_batches = [y[self.batch_size*i:self.batch_size*(i+1),:] for i in range(0,n_batches)]

            train_batch_loss = []
            test_batch_loss = []
            train_batch_accuracy = []
            test_batch_accuracy = []

            for curr_x, curr_y in tqdm(zip(X_batches,y_batches), desc = "Progress Epoch: " + str(epoch+1) + "/" + str(self.num_epochs), position = 0, leave = True, total = len(X_batches)):
                A, caches = self.forward_propogation(curr_x,parameters)

                train_cost = self.cross_entropy_loss(A,curr_y)
                train_batch_loss.append(train_cost)
#                 print(A)
                self.backward_propogation(curr_x,curr_y, caches)
#                 train_batch_accuracy.append(self.score(curr_x,np.argmax(curr_y,axis = 1)))
                if(x_test is not None):
                    proba = self.predict_proba(x_test)
#                     print(proba.shape)
                    test_loss = self.cross_entropy_loss(proba, self.labels_to_class(y_test))
                    test_batch_loss.append(test_loss)
#                     test_batch_accuracy.append(self.score(x_test, y_test))
                    
#             print("Training Accuracy : ", np.array(train_batch_accuracy).mean())
#             print("Validation Accuracy : ", np.array(test_batch_accuracy).mean())
            print("Testing loss : " ,np.array(test_batch_loss).mean())
            print("Training Loss : ", np.array(train_batch_loss).mean())
            


            train_loss_history.append( np.array(train_batch_loss).mean())
#             train_accuracy_history.append( np.array(train_batch_accuracy).mean())
            test_loss_history.append( np.array(test_batch_loss).mean())
#             test_accuracy_history.append(  np.array(test_batch_accuracy).mean())
                
                
        
        self.train_loss_history = train_loss_history
        self.train_accuracy_history = train_accuracy_history
        self.test_loss_history = test_loss_history
        self.test_accuracy_history = test_accuracy_history
        
        
        self.parameters = parameters


        return self  
        
        
    def fit (self, X_train, Y_train, X_val, Y_val):
        
        Y_train = self.labels_to_class(Y_train)
        
        m = X_train.shape[0]
        no_batches = m//self.batch_size
        datas=[]
        for subset in range(0,no_batches):
            mini_X = X_train[self.batch_size*subset : self.batch_size*(subset+1), :]
            mini_Y = Y_train[self.batch_size*subset : self.batch_size*(subset+1), :]
            datas.append((mini_X,mini_Y))
        
        
        print(X_train.shape)
        print(Y_train.shape)
        # print(X_test.shape)
        # print(Y_test.shape)
        
        self.init_weights()
        
        # self.parameters = params
        
        train_loss = []
        val_loss = []
      
        for epoch in range(1, self.epochs+1):
            print("Epoch: ", epoch)
            trainbatchloss = []
            valbatchloss = []

            for x_b, y_b in datas:
                A_l, caches = self.forward(x_b)
                train_cost = self.cross_entropy_loss(A_l, y_b)
                trainbatchloss.append(train_cost)
                self.backward(x_b,y_b, caches)
                proba = self.predict_proba(X_val)
                valloss = self.cross_entropy_loss(proba, self.labels_to_class(Y_val))
                valbatchloss.append(valloss)

            
            l1 = np.array(trainbatchloss).mean()
            l2 = np.array(valbatchloss).mean()


            train_loss.append(l1)
            val_loss.append(l2)
            
            print("Training loss : " ,l1)
            print("Validation Loss : ", l2)
              
              
      
        self.train_loss = train_loss
        self.val_loss = val_loss
  
    
     
    def cross_entropy_loss(self, A_l, y_test):
        
        temp=A_l[np.arange(len(y_test)), y_test.argmax(axis=1)]
        temp=np.where(temp>0.0000000000001,temp,0.000000000001)
        logp = - np.log(temp)
        celoss = np.sum(logp)/len(y_test)
        return celoss
        
        
        # m = len(y_test)
        
        # # logprods = np.dot(y_test, np.log(A_l).T) + np.dot((1-y_test), np.log(1-A_l).T)
        # # cost = -1/n*np.sum(logprods)
        
        # logp = - np.log(1e-7 + A_l[np.arange(m), y_test.argmax(axis=1)])
        # loss = np.sum(logp)/m
        # return loss 
            
    def predict_proba(self, X):
        
        prob, caches = self.forward_propogation(X, self.parameters)
        
        return prob
    
    def predict(self, X):
        
        prob = self.predict_proba(X)
        Y_prediction = np.argmax(prob, axis = 1)
        return Y_prediction
    
    def score(self, X, Y):
        Y_prediction = self.predict(X)
        count = 0
        for i in range(len(Y)):    
            if (Y_prediction[i] == Y[i]):
                count+=1
        return count/len(Y)
        
        
        
    # def fit(self,X, y):
    #     for e in self.epochs:
    #         print(f'Epoch {e}')
    #         loss = 0
            
    #         for i in range(0, X.shape[0], self.batch_size):
    #             X_batch = X[i:i+self.batch_size]
    #             y_batch = y[i:i+self.batch_size]
    #             loss += self.backward(X_batch, y_batch)
    #             self.update_weights()
    #             self.reset_gradients()
    
    # def predict(self,X):
    #     pass

    # def predict_proba(self,X):
    #     pass

    # def score(self,X, y):
    #     pass


In [14]:
train_images, train_labels = loadlocal_mnist(
    images_path='../data/mnist/train-images-idx3-ubyte', labels_path='../data/mnist/train-labels-idx1-ubyte')
test_images, test_labels = loadlocal_mnist(
    images_path='../data/mnist/t10k-images-idx3-ubyte', labels_path='../data/mnist/t10k-labels-idx1-ubyte')

train_images, val_images, train_labels, val_labels = train_test_split(
    train_images, train_labels, test_size=0.15)


X_train, y_train = train_images, train_labels
X_test, y_test = test_images, test_labels
X_val, y_val = val_images, val_labels



(51000, 784)


### relu

In [28]:
mlp = NeuralNetwork(6, [784, 256, 128, 64, 32, 10], 0.1, 'ReLU', 'normal', 30, 128)
mlp.fit(X_train, y_train, X_val, y_val)
print(f'Accuracy: {mlp.score(X_test, y_test)}')

(51000, 784)
(51000, 10)
Epoch:  1


TypeError: unsupported operand type(s) for *: 'NoneType' and 'float'