In [32]:
import numpy as np
from abc import ABC, abstractmethod
import numpy as np 
import sys
import pdb
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt 
from sklearn.preprocessing import OneHotEncoder

from sklearn.metrics import precision_recall_fscore_support
import multiprocessing
plt.style.use('ggplot')



In [33]:
def get_data(x_path, y_path):
    '''
    Args:
        x_path: path to x file
        y_path: path to y file
    Returns:
        x: np array of [NUM_OF_SAMPLES x n]
        y: np array of [NUM_OF_SAMPLES]
    '''
    x = np.load(x_path)
    y = np.load(y_path)

    y = y.astype('float')
    x = x.astype('float')

    #normalize x:
    x = 2*(0.5 - x/255)
    return x, y

def get_metric(y_true, y_pred):
    '''
    Args:
        y_true: np array of [NUM_SAMPLES x r] (one hot) 
                or np array of [NUM_SAMPLES]
        y_pred: np array of [NUM_SAMPLES x r] (one hot) 
                or np array of [NUM_SAMPLES]
                
    '''
    results = classification_report(y_pred, y_true)
    print(results)

In [34]:
class BaseLayer(ABC):
    def __init__(self, input_size, output_size):
        self.weights =  np.random.normal(0, 0.01, (input_size + 1, output_size)) # Includes bias
        self.input = None
        self.output = None
        # self.bias = np.random.rand(output_size, 1)
    
    @abstractmethod
    def activation(self, input):
        pass
    
    def forward(self, input):
        input_data_with_bias = np.hstack((np.ones((input.shape[0],1)), input))
        self.input = input_data_with_bias
        
        z = np.dot(input_data_with_bias, self.weights)
        
        self.output = self.activation(z)
        
        # activation = 1/(1+np.exp(-(np.dot(input_data_with_bias, self.weights))))
        return self.output

    @abstractmethod
    def backward(self, grad_output, learning_rate):
        pass

In [35]:
class SigmoidLayer(BaseLayer):
    def __init__(self, input_size, output_size):
        super().__init__(input_size, output_size)
    
    def activation(self, input_data):
        return 1/(1 + np.exp(-input_data))
    
    def backward(self, grad_output, learning_rate):
        grad_input = grad_output * self.output * (1 - self.output)
        grad_weights = np.dot(self.input.T, grad_input)
        
        self.weights -= learning_rate * grad_weights
        
        weight_with_bias = self.weights[1:, :]
        return np.dot(grad_input, weight_with_bias.T)
        

In [36]:
class ReluLayer(BaseLayer):
    def __init__(self, input_size, output_size):
        super().__init__(input_size, output_size)
    
    def activation(self, input_data):
        return np.maximum(0.0, input_data)
    
    def backward(self, grad_output, learning_rate):
        grad_relu = (self.output > 0).astype(float)
        grad_input = grad_output * grad_relu
        grad_w = np.dot(self.input.T, grad_input)

        # Update weights
        self.weights -= learning_rate * grad_w

        weight_with_bias = self.weights[1:, :]
        return np.dot(grad_input, weight_with_bias.T)
        

In [37]:
class SoftMaxLayer(BaseLayer):
    def __init__(self, input_size, output_size):
        super().__init__(input_size, output_size)
    
    def activation(self, input_data):
        exp_input = np.exp(input_data - np.max(input_data, axis=1, keepdims=True))  # Numerical stability
        return exp_input / np.sum(exp_input, axis=1, keepdims=True)
    
    def backward(self, grad_output, learning_rate=0.1):

        grad_input = grad_output * self.output * (1 - self.output)

        grad_w = np.dot(self.input.T, grad_input)

        self.weights -= learning_rate * grad_w

        weight_with_bias = self.weights[1:, :]
        
        return np.dot(grad_input, weight_with_bias.T)

In [45]:
class NN:
    def __init__(self, layers) -> None:
        self.layers = layers
    
    def forward(self, x):
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, grad , learning_rate=0.1):
        for layer in reversed(self.layers):
            grad = layer.backward(grad, learning_rate)
        return grad

    def cross_entropy_loss(self, output, target):
        # epsilon = 1e-15  # Avoid log(0)
        # output = np.clip(output, epsilon, 1 - epsilon)
        return -np.sum(target * np.log(output)) / len(output)

    def grad_cross_entropy_loss(self, output, target):
        # epsilon = 1e-15
        # output = np.clip(output, epsilon, 1 - epsilon)
        return (output - target) # Len output divide check

    def train(self, X_train, y_train, learning_rate=0.1, epochs=100, batch_size=32, min_delta=1e-6, patience=5):
        
        prev_loss = float('inf')
        consecutive_no_improvement = 0
           
        num_samples = X_train.shape[0]
        for epoch in range(epochs):
            indices = np.arange(num_samples)
            np.random.shuffle(indices)
            X_train_shuffled = X_train[indices]
            y_train_shuffled = y_train[indices]

            
            for i in range(0, num_samples, batch_size):
                batch_X = X_train_shuffled[i:i + batch_size]
                batch_y = y_train_shuffled[i:i + batch_size]
                
                output = self.forward(batch_X)
                grad = self.grad_cross_entropy_loss(output, batch_y)

                self.backward(grad, learning_rate)
                
            output = self.forward(X_train)
            total_loss = self.cross_entropy_loss(output, y_train)
            
            if total_loss > prev_loss - min_delta:
                consecutive_no_improvement += 1
                if consecutive_no_improvement >= patience:
                    print(f"Early stopping at epoch {epoch}")
                    break
            else:
                consecutive_no_improvement = 0
            

            print(f"Epoch {epoch}: Loss {total_loss}")
                
            

    def __call__(self, X):
        return self.forward(X)

    

In [39]:
x_train_path = 'part_b/x_train.npy'
y_train_path = 'part_b/y_train.npy'

X_train, y_train = get_data(x_train_path, y_train_path)

x_test_path = 'part_b/x_test.npy'
y_test_path = 'part_b/y_test.npy'

X_test, y_test = get_data(x_test_path, y_test_path)

#you might need one hot encoded y in part a,b,c,d,e
label_encoder = OneHotEncoder(sparse_output = False)
label_encoder.fit(np.expand_dims(y_train, axis = -1))

y_train_onehot = label_encoder.transform(np.expand_dims(y_train, axis = -1))
y_test_onehot = label_encoder.transform(np.expand_dims(y_test, axis = -1))

In [40]:
hidden_layer = 10

nn = NN([
    SigmoidLayer(1024, hidden_layer),
    SoftMaxLayer(hidden_layer, 5)
])


In [47]:

nn.train(X_train, y_train_onehot, 0.01, 100, 32)

predictions = nn(X_test)

predicted_classes = np.argmax(predictions, axis=1)
actual_classes = np.argmax(y_test_onehot, axis=1)

# Compute precision, recall, and F1 scores for each class
precision, recall, f1, _ = precision_recall_fscore_support(actual_classes, predicted_classes, average='macro')

print(precision, recall, f1)

Epoch 0: Loss 0.8164999782867046
Epoch 1: Loss 0.8199688514671065
Epoch 2: Loss 0.8185295632853021
Epoch 3: Loss 0.8141829784499667
Epoch 4: Loss 0.8113387291782533
Epoch 5: Loss 0.8150320483114504
Epoch 6: Loss 0.8124899869146512
Epoch 7: Loss 0.8099637572266917
Epoch 8: Loss 0.8115363586482408
Epoch 9: Loss 0.8108345760516494
Epoch 10: Loss 0.8108690470553093
Epoch 11: Loss 0.8128037160346134
Epoch 12: Loss 0.8109155842409423
Epoch 13: Loss 0.8076035896488382
Epoch 14: Loss 0.810917335003658
Epoch 15: Loss 0.8079765954011714
Epoch 16: Loss 0.8106300455123141
Epoch 17: Loss 0.808188084704589
Epoch 18: Loss 0.8094029982724131
Epoch 19: Loss 0.8084754068356308
Epoch 20: Loss 0.807226514320198
Epoch 21: Loss 0.8048640518250472
Epoch 22: Loss 0.8078937017699027
Epoch 23: Loss 0.8078858207090476
Epoch 24: Loss 0.8089904154360367
Epoch 25: Loss 0.8082901327295424
Epoch 26: Loss 0.8044068727231104
Epoch 27: Loss 0.8045665749511398
Epoch 28: Loss 0.8068687347087483
Epoch 29: Loss 0.8072810664

In [None]:
def train_and_evaluate(hidden_layer_size, X_train, y_train_onehot, X_test, y_test_onehot):
    nn = NN([
        SigmoidLayer(1024, hidden_layer_size),
        SoftMaxLayer(hidden_layer_size, 5)
    ])
    
    nn.train(X_train, y_train_onehot, 0.01, 100, 32)

    predictions = nn(X_test)

    predicted_classes = np.argmax(predictions, axis=1)
    actual_classes = np.argmax(y_test_onehot, axis=1)

    # Compute precision, recall, and F1 scores for each class
    precision, recall, f1, _ = precision_recall_fscore_support(actual_classes, predicted_classes, average='macro')

    return precision, recall, f1

In [None]:
hidden_layer_sizes = [1, 5, 10, 50, 100]

results = []

with multiprocessing.Pool(processes=len(hidden_layer_sizes)) as pool:
    results = pool.starmap(train_and_evaluate, [(size, X_train, y_train_onehot, X_test, y_test_onehot) for size in hidden_layer_sizes])

precision, recall, f1 = zip(*results)

plt.figure(figsize=(10, 6))

plt.plot(hidden_layer_sizes, precision, label='Precision')
plt.plot(hidden_layer_sizes, recall, label='Recall')
plt.plot(hidden_layer_sizes, f1, label='F1 Score')

plt.xlabel('Hidden Layer Size')
plt.ylabel('Score')
plt.legend()
plt.title('Precision, Recall, and F1 Score vs. Hidden Layer Size')
plt.grid()

plt.show()