In [1]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import math
data = pd.read_csv('/Users/tanhoangminhco/Documents/Coding/Python/Machine Learning/datasets/mnist_test.csv')

In [2]:
class CNN:
    def __init__(self):
        self.bpg_input = None
        self.convo1 = None
        self.convo2 = None
        self.pooled = None
        self.neurons = 15
        self.m = None
        self.W1, self.b1, self.W2, self.b2 = None, None, None, None
        self.X_train, self.Y_train, self.X_test, self.Y_test = None, None, None, None
        self.kernel = None
        self.best_kernel = None
        self.N_of_kernel = 5
        self.Len_of_kernel = 1
        self.iter_each_epoch = 3000
        self.epoch = 2
        self.alpha = 0.1
        self.highest_train_accuracy = 0

    def prepare_data(self, data, test_size=0.9):
        data = np.array(data)
        m, n = data.shape
        np.random.shuffle(data)
        test_size = int(m * test_size)

        data_test = data[:test_size].T #785 x test_size
        Y_test = data_test[0] #test_size x 1
        X_test = data_test[1:n] #784 x test_size
        X_test = X_test / 255.

        data_train = data[test_size:].T #785 x train_size
        Y_train = data_train[0] #train_size x 1
        X_train = data_train[1:n]
        X_train = X_train / 255.

        self.m = m - test_size

        return X_train, Y_train, X_test, Y_test
    
    def input_For_Back_prop(self, X_Train, stride=1):
        #This method should be used only once
        a, b = X_Train.shape
        len_kernel = self.Len_of_kernel
        max = int(math.sqrt(a))
        res = []
        for i in range(b):
            data = X_Train[:,i]
            data = data.reshape(max,max)
            r = 0
            while (r+len_kernel <= max):
                c = 0
                while (c+len_kernel <= max):
                    x = data[r:r+len_kernel, c:c+len_kernel]
                    res.append(x.reshape(1,-1))
                    c += stride
                r += stride
        return np.array(res).reshape(self.m, -1, len_kernel**2)
    
    def conv(self, BigData, kernels, stride=1):
        a, b = BigData.shape
        max = int(math.sqrt(a))
        res = []
        k = self.Len_of_kernel

        for i in range(b):
            data = BigData[:,i]
            data = data.reshape(max,max)

            for kernel in kernels:
                r = 0
                while (r+k <= max):
                    c = 0
                    while (c+k <= max):
                        x = data[r:r+k, c:c+k] * kernel
                        res.append(np.sum(x))
                        c += stride
                    r += stride
        size_of_each = int(len(res)/(b * self.N_of_kernel))
        self.convo = np.array(res).reshape(b, self.N_of_kernel, 1, size_of_each)
        return self.convo
    
    def pooling(self, BigData, k=4):
        a,b,_,d = BigData.shape
        max = int(math.sqrt(d))
        res = []
        for i in range(a):
            for y in range(b):
                r = 0
                data = BigData[i][y][0]
                data = data.reshape(max,max)
                while (r+k <= max):
                    c = 0
                    while (c+k <= max):
                        x = data[r:r+k, c:c+k]
                        res.append(np.max(x))
                        c += k
                    r += k
        size_of_each = int(len(res)/(a*b))
        self.pooled = np.array(res).reshape(a,b,_,size_of_each)
        return self.pooled
    
    def flatten(self, BigData):
        m,k,_,size = BigData.shape
        return BigData.reshape(m,size*k).T
    
    def draw(self, current_image):
        current_image = current_image[0]
        size = int(math.sqrt(len(current_image)))
        current_image = current_image.reshape((size,size)) * 255
        plt.gray()
        plt.imshow(current_image, interpolation='nearest')
        plt.show()
    
    def init_params(self, n):
        W1 = np.random.rand(self.neurons, n) - 0.5
        b1 = np.random.rand(self.neurons,1) - 0.5
        W2 = np.random.rand(10, self.neurons) - 0.5
        b2 = np.random.rand(10,1) - 0.5
        return W1, b1, W2, b2
    
    def one_hot(self, Y):
        one_hot_Y = np.zeros((Y.size, Y.max() + 1))
        one_hot_Y[np.arange(Y.size), Y] = 1
        one_hot_Y = one_hot_Y.T
        return one_hot_Y
    
    def ReLU(self, Z):
        return np.maximum(0,Z)
    
    def SoftMax(self, A):
        A = np.exp(A) / sum(np.exp(A))
        return A
    
    def ReLU_deriv(self, Z):
        return Z > 0
    
    def poolingAndConv_deriv(self, BigData, pooled, k=4):
        a,b,_,d = BigData.shape
        max = int(math.sqrt(d))
        pooled = pooled.reshape(a,b,-1, int(max/k))
        for i in range(a):
            for y in range(b):
                r = 0
                p_row = 0
                data = BigData[i][y][0]
                pool = pooled[i][y]
                data = data.reshape(max,max)
                while (r+k <= max):
                    c = 0
                    p_col = 0
                    while (c+k <= max):
                        x = data[r:r+k, c:c+k]
                        maximum = np.max(x)
                        data[r:r+k, c:c+k] = (data[r:r+k, c:c+k] == maximum) * pool[p_row][p_col]
                        c += k
                        p_col += 1
                    
                    r += k
                    p_row += 1
        return BigData
    
    def get_predictions(self, A2):
        return np.argmax(A2, 0)

    def get_accuracy(self, predictions, Y):
        return np.sum(predictions == Y) / Y.size

    def forward_prop(self, W1, b1, W2, b2, X):
        Z1 = W1.dot(X)
        A1 = self.ReLU(Z1 + b1)
        Z2 = W2.dot(A1)
        A2 = self.SoftMax(Z2 + b2)
        return Z1, A1, A2
    
    def convo_For_Back_prop(self, convo1):
        return convo1.reshape(self.m ,self.N_of_kernel,-1)
    
    def input_For_Back_prop(self, X_train, stride=1):
        #This method should be used only once
        a, b = X_train.shape
        max = int(math.sqrt(a))
        res = []
        for i in range(b):
            data = X_train[:,i]
            data = data.reshape(max,max)
            r = 0
            while (r+self.Len_of_kernel <= max):
                c = 0
                while (c+self.Len_of_kernel <= max):
                    x = data[r:r+self.Len_of_kernel, c:c+self.Len_of_kernel]
                    res.append(x.reshape(1,-1))
                    c += stride
                r += stride
        return np.array(res).reshape(self.m, -1, self.Len_of_kernel ** 2)

    def back_prop_convo(self,X,one_hot_Y,Z1,A1,A2,W1,W2, convo1, convo2, pooled, convo_back_prop):
        m = self.m
        dZ2 = A2 - one_hot_Y
        dW2 = 1/m * ( dZ2 @ A1.T )
        db2 = 1/m * np.sum(dZ2, axis=1).reshape(-1,1)
        dZ1 = W2.T @ dZ2 * self.ReLU_deriv(Z1)
        dW1 = 1/m  * ( dZ1 @ X.T )
        db1 = 1/m * np.sum(dZ1, axis=1).reshape(-1,1)
        ####
        dKernel = None
        if (convo_back_prop):
            dX = 1/m * W1.T @ dZ1
            dX = dX.reshape(self.m, self.N_of_kernel , 1, -1)#(m,3,)
            dConvo2 = self.poolingAndConv_deriv(convo2, pooled)
            dConvo1 = dConvo2 * self.ReLU_deriv(convo1) #(m,3,24,24)
            #####
            convo1_back = self.convo_For_Back_prop(dConvo1)
            dKernel = 1/m * (convo1_back @ self.bpg_input).sum(axis = 0)
            dKernel = dKernel.reshape(self.N_of_kernel, self.Len_of_kernel, self.Len_of_kernel)

        return dW1, db1, dW2, db2, dKernel
    
    def gradient_descent(self, X, Y, epochs, alpha):
        
        # n,_ = conv_X_train.shape
        n = 36 * self.N_of_kernel ###Fix later
        aW1, aW2, aW3, aW4 = self.init_params(n)
        one_hot_Y = self.one_hot(Y.reshape(1,-1))
        
        for epoch in range(epochs):
            convo_back_prop = True
            self.convo1 = self.conv(X, self.kernel)
            self.convo2 = self.ReLU(self.convo1)
            self.pooled = self.pooling(self.convo2)
            conv_X_train = self.flatten(self.pooled)

            W1, b1, W2, b2 = aW1, aW2, aW3, aW4
            for iter in range(self.iter_each_epoch):
                Z1, A1, A2 = self.forward_prop(W1, b1, W2, b2, conv_X_train)
                dW1, db1, dW2, db2, dKernel = self.back_prop_convo(conv_X_train, one_hot_Y,Z1,A1,A2,W1,W2, self.convo1, self.convo2, self.pooled, convo_back_prop)
                if (iter == 0):
                    self.kernel = self.kernel - alpha * dKernel
                    convo_back_prop = False
                ##Update
                W1 = W1 - alpha * dW1
                b1 = b1 - alpha * db1
                W2 = W2 - alpha * dW2
                b2 = b2 - alpha * db2
                ##Show accuracy
                if (iter % 100 == 0):
                    predictions = self.get_predictions(A2)
                    acc = self.get_accuracy(predictions, Y)
                    print(f'Epoch {epoch+1}, iter {iter}__Accuracy: {acc}')
                    if (acc > self.highest_train_accuracy):
                        self.highest_train_accuracy = acc
                        self.W1, self.b1, self.W2, self.b2 = W1, b1, W2, b2
                        self.best_kernel = self.kernel 

    def fit(self, data):
        self.X_train, self.Y_train, self.X_test, self.Y_test = self.prepare_data(data)
        self.bpg_input = self.input_For_Back_prop(self.X_train)
        self.kernel = np.random.randint(-3,3,(self.N_of_kernel,self.Len_of_kernel,self.Len_of_kernel))
        self.gradient_descent(self.X_train, self.Y_train, self.epoch, self.alpha)
    
    def predict_test(self):
        data = self.X_test[:,:]
        conv_test = self.ReLU(self.conv(data,self.best_kernel))
        conv_X_test = self.flatten(self.pooling(conv_test))
        _, _, A2 = self.forward_prop(self.W1, self.b1, self.W2, self.b2, conv_X_test)
        predictions = self.get_predictions(A2)
        acc = self.get_accuracy(predictions, self.Y_test)
        print(acc)


In [3]:
cnn = CNN()
cnn.fit(data)

ValueError: shapes (15,180) and (245,1000) not aligned: 180 (dim 1) != 245 (dim 0)

In [None]:
cnn.predict_test()

0.3980612244897959
