In [1]:
import collections
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
import sympy
from sympy.abc import x
%matplotlib inline

In [2]:
class Linear:
    def __init__(self, input_dim, output_dim):
        self.type = "linear"
        self.input_dim = input_dim # 2-dim Matrix, each row defines each piece of data
        self.output_dim = output_dim # Int, output dimensionality
        
        # Weight Matrix: Right Multiplication, Glorot Initialization
        self.W = np.random.uniform(low=-1, high=1, size=(self.input_dim, self.output_dim)) / np.sqrt(6 / (self.input_dim + self.output_dim))
        self.b = np.random.uniform(low=-1, high=1, size=(self.output_dim, )) / np.sqrt(6 / self.output_dim)
        self.W_grad = None
        self.b_grad = None
        
    def forward(self, Input):
        self.W_grad = np.transpose(Input)
        return np.matmul(Input, self.W) + self.b
    
    def backward(self, output_grad):
        self.W_grad = np.matmul(self.W_grad, output_grad)
        self.b_grad = output_grad.sum(0)
        return np.matmul(output_grad, np.transpose(self.W))

class Sigmoid:
    def __init__(self):
        self.type = "act_fun"
        self.output = None
        
    def forward(self, Input):
        self.output = 1 / (1 + np.exp(-Input))
        return self.output
    
    def backward(self, output_grad):
        return self.output * (1 - self.output) * output_grad

class Softmax:
    def __init__(self):
        self.type = "act_fun"
        self.output = None
        
    def forward(self, Input):
        Exp = np.exp(Input)
        total = Exp.sum(1, keepdims=True)
        self.output = Exp / total
        return self.output
    
    def backward(self, output_grad):
        product = self.output * output_grad
        return self.output * (output_grad - product.sum(1, keepdims=True))

class MeanCategoricalCrossEntropy:
    def __init__(self, label_idx):
        self.label_idx = label_idx
        
    def forward(self, Input):
        self.input = Input
        return - np.sum(self.label_idx * np.log(Input)) / self.label_idx.shape[0]
    
    def backward(self):
        return - (self.label_idx / self.input) / self.label_idx.shape[0]

class Flow:
    def __init__(self, flow=[]):
        self.flow = flow
#         self.loss = loss
        
    def forward(self, data):
        if not self.flow:
            print("Error: Empty Neural Network")
            return
        
        x = data.copy()
        for layer in self.flow:
            x = layer.forward(x)
        return x
    
    def backward(self, loss_grad):
        if not self.flow:
            print("Error: Empty Neural Network")
            return
        
        y = loss_grad.copy()
        for layer in self.flow[::-1]:
            y = layer.backward(y)
        return y
    
class SGD:
    def __init__(self, model, lr):
        self.lr = lr
        self.model = model
    
    def step(self):
        for layer in self.model.flow:
            if layer.type == "linear":
                layer.W -= layer.W_grad * self.lr
                layer.b -= layer.b_grad * self.lr
                layer.W_grad = None
                layer.b_grad = None 

class Dataloader:
    def __init__(self, data, label):
        self.data = data
        self.label = label
        self.batch_size = len(self.data) // 10 + 1
        self.shuffle = True
        self.index = np.array([i for i in range(data.shape[0])])
        self.data_batches = []
        self.label_batches = []
        self.build()
        
    
    def build(self):
        if self.shuffle:
            np.random.shuffle(self.index)
        
        index_batches = np.split(self.index, np.arange(self.batch_size, self.index.shape[0], self.batch_size))
        self.data_batches.clear()
        self.label_batches.clear()
        for ind in index_batches:
            self.data_batches.append(self.data[self.index[ind], :])
            self.label_batches.append(self.label[self.index[ind], :])
#         curr_idx = 0
#         while True:
#             if curr_idx + self.batch_size <= self.index.shape[0]:
#                 self.data_batches.append(self.data[self.index[curr_idx:curr_idx + self.batch_size], :])
#                 self.label_batches.append(self.label[self.index[curr_idx:curr_idx + self.batch_size], :])
#                 curr_idx += self.batch_size
#             else:
#                 self.data_batches.append(self.data[self.index[curr_idx:], :])
#                 self.label_batches.append(self.label[self.index[curr_idx:], :])
#                 break
        
        
        
        
        
        
        
        
        

In [13]:
data1 = pd.read_csv("mnist_train_0_1.csv",header=None)
data = data1.iloc[:,1:].to_numpy() / 255
label1 = data1.iloc[:,0]
label = pd.get_dummies(label1).to_numpy()

dataloader = Dataloader(data, label)
dataloader.batch_size = 500

net = Flow()
net.flow = [Linear(784,64), Sigmoid(), Linear(64,2), Softmax()]
loss = MeanCategoricalCrossEntropy(label)
optimizer = SGD(net, 0.5)
for epoch in range(50):
    loss_record = []
    dataloader.build()
    for batch_data, batch_label in zip(dataloader.data_batches, dataloader.label_batches):
        loss.label_idx = batch_label
        y_hat = optimizer.model.forward(batch_data)
        loss_record.append(loss.forward(y_hat))
        optimizer.model.backward(loss.backward())
        optimizer.step()
    
    if epoch % 5 == 0:
        print("Epoch ", epoch, ": ", np.array(loss_record).mean())

Epoch  0 :  1.3479192261358648
Epoch  5 :  0.08704795738613064
Epoch  10 :  0.056251928151025095
Epoch  15 :  0.042164014444657186
Epoch  20 :  0.03743504417390316
Epoch  25 :  0.03101420848033281
Epoch  30 :  0.029982307395125564
Epoch  35 :  0.02574813999041033
Epoch  40 :  0.025002378864070144
Epoch  45 :  0.022719901677019504


In [14]:
preds = np.argmax(optimizer.model.forward(data), axis=1)
num_classes = 2
confmat = np.array([[0]*num_classes for _ in range(num_classes)])
for pred, label in zip(preds, np.array(label1)):
    confmat[label][pred] += 1
print("Bonus Training Confusion: \n", confmat)
print("Training Accuracy: ", confmat.diagonal().sum() / confmat.sum())  

test = pd.read_csv("mnist_test_0_1.csv",header=None)
t_data = test.iloc[:,1:].to_numpy() / 255
t_label = test.iloc[:,0].to_numpy()

preds = np.argmax(optimizer.model.forward(t_data), axis=1)
num_classes = 5
confmat = np.array([[0]*num_classes for _ in range(num_classes)])
for pred, label in zip(preds, np.array(t_label)):
    confmat[label][pred] += 1
print("\n Bonus Testing Confusion: \n", confmat)
print("Testing Accuracy: ", confmat.diagonal().sum() / confmat.sum())  

Bonus Training Confusion: 
 [[5888   35]
 [  36 6706]]
Training Accuracy:  0.9943939992104224

 Bonus Testing Confusion: 
 [[ 974    6    0    0    0]
 [   7 1128    0    0    0]
 [   0    0    0    0    0]
 [   0    0    0    0    0]
 [   0    0    0    0    0]]
Testing Accuracy:  0.9938534278959811


-------------------------------------------------------------------------------

----------------------------MNIST_0_4 Bonus------------------------------------

-------------------------------------------------------------------------------

In [12]:
data1 = pd.read_csv("mnist_train_0_4.csv",header=None)
data = data1.iloc[:,1:].to_numpy() / 255
label1 = data1.iloc[:,0]
label = pd.get_dummies(label1).to_numpy()

dataloader = Dataloader(data, label)
dataloader.batch_size = 500

net = Flow()
net.flow = [Linear(784,128), Sigmoid(), Linear(128,64), Sigmoid(), Linear(64,5), Softmax()]
loss = MeanCategoricalCrossEntropy(label)
optimizer = SGD(net, 5.0)
for epoch in range(50):
    loss_record = []
    dataloader.build()
    for batch_data, batch_label in zip(dataloader.data_batches, dataloader.label_batches):
        loss.label_idx = batch_label
        y_hat = optimizer.model.forward(batch_data)
        loss_record.append(loss.forward(y_hat))
        optimizer.model.backward(loss.backward())
        optimizer.step()
    
    if epoch % 5 == 0:
        print("Epoch ", epoch, ": ", np.array(loss_record).mean())
        

Epoch  0 :  5.888010814782005
Epoch  5 :  0.3036209249977863
Epoch  10 :  0.20509253794094862
Epoch  15 :  0.16969604610403233
Epoch  20 :  0.1446702458312133
Epoch  25 :  0.12791691445252112
Epoch  30 :  0.11286100122018082
Epoch  35 :  0.10231389669167983
Epoch  40 :  0.09197075003280927
Epoch  45 :  0.08630229396757967


In [5]:
preds = np.argmax(optimizer.model.forward(data), axis=1)
num_classes = 5
confmat = np.array([[0]*num_classes for _ in range(num_classes)])
for pred, label in zip(preds, np.array(label1)):
    confmat[label][pred] += 1
print("Bonus Training Confusion: \n", confmat)
print("Training Accuracy: ", confmat.diagonal().sum() / confmat.sum())  

test = pd.read_csv("mnist_test_0_4.csv",header=None)
t_data = test.iloc[:,1:].to_numpy() / 255
t_label = test.iloc[:,0].to_numpy()

preds = np.argmax(optimizer.model.forward(t_data), axis=1)
num_classes = 5
confmat = np.array([[0]*num_classes for _ in range(num_classes)])
for pred, label in zip(preds, np.array(t_label)):
    confmat[label][pred] += 1
print("\n Bonus Testing Confusion: \n", confmat)
print("Testing Accuracy: ", confmat.diagonal().sum() / confmat.sum())  

Bonus Training Confusion: 
 [[5822    0   56   32   13]
 [   0 6681   34   19    8]
 [  43   21 5782   85   27]
 [  31   10  182 5883   25]
 [  14   21   67   28 5712]]
Training Accuracy:  0.9765982481370113

 Bonus Testing Confusion: 
 [[ 939    3   23   11    4]
 [   0 1112   12    8    3]
 [  22   14  942   40   14]
 [  14   15   42  928   11]
 [   9   12   35   19  907]]
Testing Accuracy:  0.9394823895699552
