In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# SEED


### read toy dataset

In [None]:

toy_test = open("data/toy/testNN.txt", "r")
toy_train = open("data/toy/trainNN.txt", "r")

# loop until end of file
test_input = []
train_input = []

for line in toy_test:
    # split line into words
    words = line.split()
    # convert words to float
    for i in range(len(words)):
        words[i] = float(words[i])
    # append words to list
    test_input.append(words)

for line in toy_train:
    # split line into words
    words = line.split()
    # convert words to float
    for i in range(len(words)):
        words[i] = float(words[i])
    # append words to list
    train_input.append(words)


# split X and Y
test_X = []
test_Y = []
train_X = []
train_Y = []

# take last column as Y
for i in range(len(test_input)):
    test_Y.append(int(test_input[i][-1]))
    test_X.append(test_input[i][:-1])

for i in range(len(train_input)):
    train_Y.append(int(train_input[i][-1]))
    train_X.append(train_input[i][:-1])


# normalize with mean and std
train_X = (train_X - np.mean(train_X, axis=0)) / np.std(train_X, axis=0)
test_X = (test_X - np.mean(test_X, axis=0)) / np.std(test_X, axis=0)

num_of_class = len(set(test_Y))
num_of_input_features = len(test_X[0])


def one_hot(label):
    y = np.zeros((num_of_class,1))
    y[label] = [1]
    return y



training_data = []

for i in range(len(train_X)):
    training_data.append( (np.array(train_X[i]).reshape(num_of_input_features,1), np.array(one_hot(train_Y[i]-1)) ) )


test_data = []

for i in range(len(train_X)):
    test_data.append( (np.transpose(test_X[i]).reshape(num_of_input_features,1), np.array(one_hot(test_Y[i]-1)) ) )



### read MNIST dataset

In [None]:
mnist_test = open("data/MNIST/mnist_test.csv", "r")
mnist_train = open("data/MNIST/mnist_train.csv", "r")

# pandas dataframe
mnist_test_df = pd.read_csv(mnist_test)
mnist_train_df = pd.read_csv(mnist_train)

## make dataset smaller

# take first 5000 rows of test and train
mnist_test_df = mnist_test_df.iloc[:1000,:]
mnist_train_df = mnist_train_df.iloc[:5000,:]


mnist_test_df

In [None]:
num_of_input_features = 28*28
num_of_class = 10

training_data = []



test_data = []




### Backprop algo

In [None]:
np.warnings.filterwarnings('ignore', 'overflow')
def sigmoid(z):
    return 1 / (1 + np.exp(-z))


def sigmoid_prime(z):
    return sigmoid(z)*(1-sigmoid(z))

def relu(z):
    return np.maximum(0,z)

def relu_prime(z):
    z[z<=0] = 0
    z[z>0] = 1
    return z
    
class Network(object):
    def __init__(self, sizes):
        self.num_layers = len(sizes)
        self.sizes = sizes
        np.random.seed(42)
        self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
        self.weights = [np.random.randn(y, x) for x, y in zip(sizes[:-1], sizes[1:])]
        
        
        
    def feedforward(self, a):
        for layer, w, b in zip(range(self.num_layers),self.weights, self.biases):
            if layer == 0:
                a = relu(np.matmul(w, a) + b)
            else:
                a = sigmoid(np.matmul(w, a) + b) 
        return a
    
    
    def train(self, training_data, epochs, mini_batch_size, eta, test_data):
        n_test = len(test_data)
        n = len(training_data)
        evaluates = []
        for j in range(epochs):
            np.random.shuffle(training_data)
            mini_batches = [training_data[k:k+mini_batch_size] for k in range(0, n, mini_batch_size)]
            for mini_batch in mini_batches:
                self.gradient_descent(mini_batch, eta)
           

            # evaluate the model
            evaluate = self.evaluate(test_data)
            evaluates.append(evaluate)
            print(f'Epoch {j}: {evaluate} / {n_test}')
            

    
    def gradient_descent(self, mini_batch, eta):
        delJdelB = [np.zeros(b.shape) for b in self.biases]
        delJdelW = [np.zeros(w.shape) for w in self.weights]
        
        for x, y in mini_batch:
            delta_delJdelB, delta_delJdelW = self.backpropagation(x, y)
            delJdelB = [nb+dnb for nb, dnb in zip(delJdelB, delta_delJdelB)]
            delJdelW = [nw+dnw for nw, dnw in zip(delJdelW, delta_delJdelW)]
            
        self.weights = [w-(eta/len(mini_batch))*nw for w, nw in zip(self.weights, delJdelW)]
        self.biases = [b-(eta/len(mini_batch))*nb for b, nb in zip(self.biases, delJdelB)]
        
    
    def backpropagation(self, x, y):
        # initialize 
        delJdelB = [np.zeros(b.shape) for b in self.biases]
        delJdelW = [np.zeros(w.shape) for w in self.weights]
        
        
        activation = x
        activations = [x]
        z_vector = []
        for w, b in zip(self.weights, self.biases):
            z = np.matmul(w, activation) + b
            z_vector.append(z)
            activation = sigmoid(z)
            activations.append(activation)
            
            
        delta = sigmoid_prime(z_vector[-1]) * self.cost_derivative(activations[-1], y) 
        delJdelB[-1] = delta
        delJdelW[-1] = np.matmul(delta, activations[-2].T)
        
        
        for l in range(2, self.num_layers):
            z = z_vector[-l]
            sp = relu_prime(z)
            delta = np.matmul(self.weights[-l+1].T, delta) * sp
            delJdelB[-l] = delta
            delJdelW[-l] = np.matmul(delta, activations[-l-1].T)
        return (delJdelB, delJdelW)
            
                                    
    def evaluate(self, test_data):
        test_results = [(np.argmax(self.feedforward(x)), np.argmax(y)) for x, y in test_data]
        return sum([int(x == y) for x, y in test_results])
    
    def cost_function(self, a, y):
        return np.sum(np.power((a - y), 2))
    
    
    def cost_derivative(self, activated_output, y):
        return 2 * (activated_output - y)

In [None]:
# net = Network([num_of_input_features, 16, 16, num_of_class])
# net.train(training_data, 10, 10, 2, test_data)

'''
Conv 6 5 1 2 ReLU
Pool 2 2 Conv 12 5 1 0 ReLU
Pool 2 2
Conv 100 5 1 0 ReLU
FC 10
Softmax
'''


### CNN

In [123]:
class ConvNet(object):

    '''
    input : height, width, depth, stride, activation_function
    '''
    

    def __init__(self,out_depth, filter_dim, stride, padding) :
        self.out_depth = out_depth
        self.filter_dim = filter_dim
        self.stride = stride
        self.padding = padding
        np.random.seed(42)
        self.filtes = np.random.randn(out_depth, filter_dim, filter_dim) / np.sqrt(filter_dim**2)    
        

    def forward(self,input) :
        
        # padding input
        input_padded = np.pad(input, (self.padding,self.padding), 'constant',constant_values=0)

        self.input = input_padded
        self.input_shape = input.shape

        output_height = int((self.input_shape[0] - self.filter_dim ) / self.stride) + 1
        output_width = int((self.input_shape[1] - self.filter_dim ) / self.stride) + 1

        output = np.zeros((output_height, output_width, self.out_depth))

        
        for i in range(output_height):
            for j in range(output_width):
                for k in range(self.out_depth):
                    im = self.input[i*self.stride:i*self.stride+self.filter_dim, j*self.stride:j*self.stride+self.filter_dim]
                    output[i,j,k] = np.sum(im * self.filtes[k])
           
        return output
    



class MaxPool:

    def __init__(self,height,width):
        self.height = height
        self.width = width
    
    def forward(self,input):
        self.input = input

        output_height = int(self.input.shape[0] / self.height)
        output_width = int(self.input.shape[1] / self.width)

        output = np.zeros((output_height, output_width, self.input.shape[2]))


        # convolution
        for i in range(output_height):
            for j in range(output_width):
                for k in range(self.input.shape[2]):
                    output[i,j,k] = np.max(self.input[i*self.height:i*self.height+self.height, j*self.width:j*self.width+self.width, k])

        return output



class FC:
    
    # fully connected layer

    def __init__(self,out_dim) :
        self.out_dim = out_dim
    
    def flatten(self,input) :
        input_shape = input.shape
        # convert a series of convolutional filters into a single column vector
        return input.reshape(input_shape[0] * input_shape[1] * input_shape[2])

    def forward(self,input) :
        self.input = input
        flattened_input = self.flatten(input)
        np.random.seed(42)
        self.weights = np.random.randn(self.out_dim, flattened_input.shape[0]) / np.sqrt(flattened_input.shape[0])
        self.biases = np.random.randn(self.out_dim) / np.sqrt(flattened_input.shape[0])

        self.z = np.matmul(self.weights, flattened_input) + self.biases

        return self.z

        

class ReLU:
    
    def forward(self,input) :
        self.input = input
        return np.maximum(0,input)
    
    def backward(self,input) :
        return 1 * (input > 0)
    

def Softmax(input) :
    exp = np.exp(input)
    return exp / np.sum(exp, axis=0)



class CNN :

    def __init__(self,file_name) :
        # open a text file in read mode
        file = open(file_name, "r")
        self.lines = file.readlines()
          
    

    def forward(self,input) :

        self.output_layer = []
        self.output_layer.append(input)

        for line in self.lines :
            words = line.split()
            print(words)

            if words[0] == 'Conv' :
                conv = ConvNet(int(words[1]), int(words[2]), int(words[3]), int(words[4]))
                out = conv.forward(self.output_layer[-1])
                self.output_layer.append(out)
            
            elif words[0] == 'Pool' :
                pool = MaxPool(int(words[1]), int(words[2]))
                out = pool.forward(self.output_layer[-1])
                self.output_layer.append(out)

            elif words[0] == 'FC' :
                fc = FC(int(words[1]))
                out = fc.forward(self.output_layer[-1])
                self.output_layer.append(out)
            
            elif words[0] == 'ReLU' :
                relu = ReLU()
                out = relu.forward(self.output_layer[-1])
                self.output_layer.append(out)
            
            elif words[0] == 'Softmax' :
                out = Softmax(self.output_layer[-1])
                self.output_layer.append(out)
            
            else :
                print('Error')
        
        return self.output_layer[-1]
    

In [124]:
myNet = CNN('data/archi.txt')


image = np.random.randn(28,28)

out = myNet.forward(image)

print(out.shape)
print(out)



['Conv', '6', '5', '1', '2']
['ReLU']
['Pool', '2', '2']
['Conv', '12', '5', '1', '0']


ValueError: operands could not be broadcast together with shapes (5,5,6) (5,5) 

In [122]:
# a sample 28x28 3 channel image



out1 = ConvNet(6,5,1,2).forward(image)
out2 = MaxPool(2,2).forward(out1)
out3 = FC(10).forward(out2)

print(out1.shape)
print(out2.shape)
print(out3)

(24, 24, 6)
(12, 12, 6)
[ 0.32416408  1.80284466  0.04520829 -0.94732839 -1.30551966 -0.09391854
 -2.29825827 -0.43230547  0.6015653   0.98847814]
