In [1]:
import torch
import random
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
import pandas as pd
import pickle
from sklearn.model_selection import train_test_split

In [2]:
class My_dataset(Dataset):
    """Dataset of ``.npy`` samples described by an annotation CSV.

    Column 0 of the CSV holds the sample file name (appended verbatim to
    ``data_dir``) and column 1 holds the integer class index, which is turned
    into a one-hot vector of length 10.
    """

    def __init__(self, data_dir, anno_csv) -> None:
        """Read the annotation table.

        Args:
            data_dir: Prefix prepended (plain string concatenation) to every
                file name found in the CSV.
            anno_csv: Path of the annotation CSV file.
        """
        self.anno_data = pd.read_csv(anno_csv)
        self.data_dir = data_dir

    def __len__(self):
        """Return the number of annotated samples."""
        return len(self.anno_data)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``(data, gt_y)`` where ``data`` is the loaded array cast to
            float32 and ``gt_y`` is the float32 one-hot label of length 10.
        """
        data_name = self.anno_data.iloc[idx, 0]
        data_location = self.data_dir + data_name
        data = np.asarray(np.load(data_location), dtype=np.float32)
        # One-hot encoding of the output label.
        gt_y = np.zeros(10, dtype=np.float32)
        gt_y[self.anno_data.iloc[idx, 1]] = 1
        return data, gt_y

    def get_batch(self, batch_size, shuffle=True):
        """Yield the whole dataset once as mini-batches.

        Args:
            batch_size: Maximum number of samples per batch; the last batch
                may be smaller.
            shuffle: When true (default) the sample order is re-drawn with the
                ``random`` module on every call; when false samples come in
                CSV order.

        Yields:
            ``(batch_data, batch_gt_y)`` as stacked numpy arrays.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        # A plain list keeps random.shuffle well-defined and yields Python ints.
        order = list(range(len(self)))
        if shuffle:
            random.shuffle(order)

        for start in range(0, len(order), batch_size):
            # Slice the (possibly shuffled) order so shuffling actually
            # determines which samples land in which batch.
            samples = [self[idx] for idx in order[start:start + batch_size]]
            batch_data, batch_gt_y = zip(*samples)
            yield np.array(batch_data), np.array(batch_gt_y)



In [7]:
import torch.nn.init as init
class NN:
    """Fully connected 784 -> 100 -> 100 -> 10 classifier with manual backprop.

    Hidden layers use ReLU, the output layer uses softmax, and the loss is
    cross-entropy against a one-hot target. All vectors are handled as
    column vectors of shape ``(n, 1)``; weights are stored as
    ``(fan_in, fan_out)`` and applied as ``W.T @ x``.
    """

    # Lower bound applied to probabilities before log / division so that an
    # underflowed softmax output cannot produce NaN or inf.
    EPS = 1e-12

    def __init__(self):
        """Initialise weights (Xavier uniform) and biases (uniform in [0, 1))."""
        self.w1 = torch.empty(784, 100)
        init.xavier_uniform_(self.w1)
        self.b1 = torch.rand(100, 1)
        self.w2 = torch.empty(100, 100)
        init.xavier_uniform_(self.w2)
        self.b2 = torch.rand(100, 1)
        self.w3 = torch.empty(100, 10)
        init.xavier_uniform_(self.w3)
        self.b3 = torch.rand(10, 1)
        # Not needed by backward_propogation any more; kept so the set of
        # instance attributes (and therefore pickled objects) is unchanged.
        self.eye_10 = torch.eye(10).unsqueeze(2)
        self.eye_100 = torch.eye(100).unsqueeze(2)

    def ReLU(self, Z):
        """Elementwise ``max(Z, 0)``."""
        return torch.clamp(Z, min=0)

    def d_reLU(self, Z):
        """Jacobian of ReLU at ``Z``: a diagonal matrix with 1 where ``Z > 0``.

        Args:
            Z: Pre-activation vector, shape ``(n,)`` or ``(n, 1)``.

        Returns:
            Tensor of shape ``(n, n)``.
        """
        return torch.diag((Z.reshape(-1) > 0).to(Z.dtype))

    def forward_propogation(self, X, Y):
        """Run one sample through the network.

        Args:
            X: Input vector of shape ``(784,)``.
            Y: One-hot target of shape ``(10,)``.

        Returns:
            ``(Z1, H1, Z2, H2, Z3, output, loss)`` - the pre-activations and
            activations of each layer, the softmax output ``(10, 1)`` and the
            cross-entropy loss.
        """
        X = X.unsqueeze(1)
        Z1 = torch.matmul(self.w1.T, X) + self.b1
        H1 = self.ReLU(Z1)
        Z2 = torch.matmul(self.w2.T, H1) + self.b2
        H2 = self.ReLU(Z2)
        Z3 = torch.matmul(self.w3.T, H2) + self.b3
        output = self.stable_softmax(Z3)
        loss = self.loss(Y, output)
        return Z1, H1, Z2, H2, Z3, output, loss

    def loss(self, Y, Y_pred):
        """Cross-entropy ``-sum_k Y[k] * log(Y_pred[k])``.

        Probabilities are floored at ``EPS`` so a zero probability on a
        non-target class contributes 0 instead of ``0 * -inf = NaN``.

        Returns:
            Tensor of shape ``(1,)``.
        """
        log_pred = torch.log(torch.clamp(Y_pred.reshape(-1, 1), min=self.EPS))
        return -(Y.reshape(-1, 1).to(log_pred.dtype) * log_pred).sum(dim=0)

    def gradient_loss(self, Y, Y_pred):
        """Gradient of the cross-entropy w.r.t. the predicted probabilities.

        Returns:
            Tensor of shape ``(k, 1)`` holding ``-Y[k] / Y_pred[k]`` with the
            denominator floored at ``EPS`` to avoid division by zero.
        """
        probs = torch.clamp(Y_pred.reshape(-1, 1), min=self.EPS)
        return -Y.reshape(-1, 1).to(probs.dtype) / probs

    def stable_softmax(self, x):
        """Softmax along dim 0, shifted by the maximum for numerical stability."""
        shifted_exp = torch.exp(x - torch.max(x))
        return shifted_exp / torch.sum(shifted_exp, dim=0)

    def deriv_softmax(self, y_pred):
        """Softmax Jacobian ``diag(p) - p p^T`` for output probabilities ``p``.

        Returns:
            Tensor of shape ``(k, k)``.
        """
        probs = y_pred.reshape(-1)
        return torch.diag(probs) - torch.outer(probs, probs)

    def backward_propogation(self, X, Z1, H1, Z2, H2, Z3, Y_pred, Y):
        """Compute the gradients of the loss for one sample.

        Args:
            X: Input vector, shape ``(784,)``.
            Z1, H1, Z2, H2, Z3: Values returned by ``forward_propogation``.
            Y_pred: Softmax output, shape ``(10, 1)``.
            Y: One-hot target, shape ``(10,)``.

        Returns:
            ``(delta_w3, delta_w3_0, delta_w2, delta_w2_0, delta_w1,
            delta_w1_0)`` with shapes ``(100, 10)``, ``(10, 1)``,
            ``(100, 100)``, ``(100, 1)``, ``(100, 784)`` and ``(100, 1)``.
            ``delta_w1`` is transposed relative to ``w1``; ``update`` undoes
            that.
        """
        x_col = X.reshape(-1, 1)
        probs = Y_pred.reshape(-1, 1)
        target = Y.reshape(-1, 1).to(probs.dtype)

        # (diag(p) - p p^T) @ (-Y / p) simplifies to p * sum(Y) - Y. Using the
        # closed form avoids the division, so a zero probability cannot turn
        # the gradients into NaN.
        delta_w3_0 = probs * target.sum() - target
        # dL/dW for a layer z = W.T @ h + b is the outer product h (dL/dz)^T.
        delta_w3 = torch.matmul(H2, delta_w3_0.T)
        delta_h2 = torch.matmul(self.w3, delta_w3_0)

        # Multiplying by the diagonal ReLU Jacobian is an elementwise mask.
        delta_w2_0 = delta_h2 * (Z2 > 0)
        delta_w2 = torch.matmul(H1, delta_w2_0.T)
        delta_h1 = torch.matmul(self.w2, delta_w2_0)

        delta_w1_0 = delta_h1 * (Z1 > 0)
        delta_w1 = torch.matmul(delta_w1_0, x_col.T)

        return delta_w3, delta_w3_0, delta_w2, delta_w2_0, delta_w1, delta_w1_0

    def update(self, lr, delta_w3, delta_w3_0, delta_w2, delta_w2_0, delta_w1, delta_w1_0):
        """Apply one gradient-descent step in place.

        Args:
            lr: Learning rate.
            delta_w3 ... delta_w1_0: Gradients in the layout returned by
                ``backward_propogation`` (``delta_w1`` is ``(100, 784)`` and is
                transposed here to match ``w1``).
        """
        self.w3 -= lr * delta_w3
        self.b3 -= lr * delta_w3_0
        self.w2 -= lr * delta_w2.squeeze()
        self.b2 -= lr * delta_w2_0
        self.w1 -= lr * delta_w1.squeeze().T
        self.b1 -= lr * delta_w1_0








        



In [8]:
import matplotlib.pyplot as plt
def _plot_per_class_accuracy(history, split):
    """Show one accuracy-vs-epoch figure per class for the given split name."""
    if not history:
        return
    for i in range(len(history[0])):
        plt.figure()
        plt.title(f'{split} Epoch vs {i+1} Accuracy')
        plt.xlabel("Epoch")
        plt.ylabel("Accuracy")
        plt.plot([per_class[i] for per_class in history])
        plt.show()


def PA2_train(max_epoch=2, batch_size=50, lr=0.1):
    """Train ``NN`` with mini-batch gradient descent and plot the results.

    After every epoch the model is pickled to ``multiclass_parameters.txt``
    and evaluated with ``PA2_test``. When training ends, per-class train and
    test accuracy and the mean training loss are plotted against the epoch.

    Args:
        max_epoch: Number of passes over the training set.
        batch_size: Number of samples whose gradients are averaged per update.
        lr: Learning rate passed to ``NN.update``.
    """
    # Specifying the training directory and label files
    train_dir = './'
    train_anno_file = './data_prog2Spring24/labels/train_anno.csv'

    # Read the data and labels from the training data
    MNIST_training_dataset = My_dataset(data_dir=train_dir, anno_csv=train_anno_file)

    model = NN()
    train_class_acc_history = []
    test_class_acc_history = []
    avg_loss_list = []

    for epoch in range(max_epoch):
        correct_per_class = [0] * 10
        seen_per_class = [0] * 10
        total_loss = 0.0

        batches = MNIST_training_dataset.get_batch(batch_size)
        for b, (batch_x, batch_y) in enumerate(batches, start=1):
            print(f"Batch --> {b}")
            grad_sums = None
            for x_np, y_np in zip(batch_x, batch_y):
                X = torch.from_numpy(x_np)
                Y = torch.from_numpy(y_np)

                Z1, H1, Z2, H2, Z3, Y_pred, loss = model.forward_propogation(X, Y)

                label = int(torch.argmax(Y))
                seen_per_class[label] += 1
                if int(torch.argmax(Y_pred)) == label:
                    correct_per_class[label] += 1
                # Store a plain float so the loss history is directly plottable.
                total_loss += float(loss)

                grads = model.backward_propogation(X, Z1, H1, Z2, H2, Z3, Y_pred, Y)
                if grad_sums is None:
                    grad_sums = [g.clone() for g in grads]
                else:
                    for acc, g in zip(grad_sums, grads):
                        acc += g

            # Average over the samples actually in this batch; the final
            # batch can be shorter than batch_size.
            n_in_batch = len(batch_x)
            model.update(lr, *(g / n_in_batch for g in grad_sums))

        # A class that never appeared gets NaN rather than raising
        # ZeroDivisionError.
        train_class_acc_history.append([
            hit / seen if seen else float('nan')
            for hit, seen in zip(correct_per_class, seen_per_class)
        ])
        n_samples = len(MNIST_training_dataset)
        avg_loss_list.append(total_loss / n_samples if n_samples else float('nan'))

        # Save the parameters after each epoch; PA2_test reads this file.
        with open("multiclass_parameters.txt", "wb") as filehandler:
            pickle.dump(model, filehandler)

        # PA2_test returns (overall accuracy, per-class accuracies); only the
        # per-class list belongs in the per-class history.
        _, test_class_acc = PA2_test()
        test_class_acc_history.append(test_class_acc)

    _plot_per_class_accuracy(test_class_acc_history, 'Test')
    _plot_per_class_accuracy(train_class_acc_history, 'Train')

    plt.figure()
    plt.title("Train Loss over Epoch")
    plt.xlabel("Epoch")
    plt.ylabel("Mean loss")
    plt.plot(avg_loss_list)

    plt.show()


            

        #Take the mean of all the mini-batch losses and denote it as your loss of the current epoch
        #Collect loss for each epoch and save the parameter Theta after each epoch


    # Plot the training loss vs accuracy
    # Visualize the final weight matrix
    # Save the final weight matrix


In [9]:
def PA2_test():
    """Evaluate the pickled model on the test set.

    Loads the model from ``multiclass_parameters.txt``, runs every test
    sample through ``forward_propogation``, prints the per-class accuracy and
    shows a confusion matrix (labels are reported as class index + 1).

    Returns:
        ``(accuracy, acc_list)`` - overall accuracy and the list of ten
        per-class accuracies (NaN for a class with no test samples).
    """
    # Imported here so the function does not rely on another notebook cell
    # having been executed first.
    from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix

    # Specifying the testing directory and label files
    test_dir = './'
    test_anno_file = './data_prog2Spring24/labels/test_anno.csv'

    # Load the model that has been saved after training.
    # SECURITY: pickle.load can execute arbitrary code; only load parameter
    # files produced by this notebook.
    with open("multiclass_parameters.txt", "rb") as filehandler:
        model = pickle.load(filehandler)

    # Read the data and labels from the testing data
    MNIST_testing_dataset = My_dataset(data_dir=test_dir, anno_csv=test_anno_file)
    correct_per_class = [0] * 10
    seen_per_class = [0] * 10
    accuracy = 0
    y_pred_list = []
    y_list = []

    for batch_x, batch_y in MNIST_testing_dataset.get_batch(50):
        for x_np, y_np in zip(batch_x, batch_y):
            X = torch.from_numpy(x_np)
            Y = torch.from_numpy(y_np)
            Y_pred = model.forward_propogation(X, Y)[5]

            label = int(torch.argmax(Y))
            predicted = int(torch.argmax(Y_pred))
            y_pred_list.append(predicted + 1)
            y_list.append(label + 1)
            seen_per_class[label] += 1
            if predicted == label:
                correct_per_class[label] += 1
                accuracy += 1

    # A class with no samples yields NaN instead of ZeroDivisionError.
    acc_list = [
        hit / seen if seen else float('nan')
        for hit, seen in zip(correct_per_class, seen_per_class)
    ]
    print(acc_list)

    matrix_confusion = confusion_matrix(y_list, y_pred_list)
    cm_display = ConfusionMatrixDisplay(confusion_matrix=matrix_confusion)
    cm_display.plot()
    plt.show()

    n_samples = len(MNIST_testing_dataset)
    return (accuracy / n_samples if n_samples else float('nan')), acc_list


    # Predict Y using X and updated W.

    # Calculate accuracy,


In [None]:
from torch.profiler import profile, record_function, ProfilerActivity
from sklearn.metrics import confusion_matrix
from sklearn import metrics
# NOTE(review): anomaly detection instruments autograd's backward pass, but no
# `.backward()` call is visible in this notebook (NN.backward_propogation
# computes gradients by hand), so this presumably has no effect here - confirm
# before removing.
torch.autograd.set_detect_anomaly(True)
# Entry point: trains the network; PA2_train pickles the model and calls
# PA2_test after each epoch, then shows the plots.
PA2_train()
