In [1]:
## Python Package Imports
import numpy as np
import pandas as pd
import pickle

## Custom Module Imports
from activation_functions.SoftMax import SoftMax
from activation_functions.ReLU import ReLU
from loss_functons.mean_square_error import mean_square_error
from activation_functions.LeakyReLU import LeakyReLU
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [2]:
class DenseLayer:
    """
    Represents a dense layer of (num_neurons) neurons, as a weight matrix (W) of shape(out_n, in_n) and bias (b) of shape(out_n)
    """
    def __init__(self, input_size, num_neurons, activ_func):
        self.num_in_n = input_size
        self.num_out_n = num_neurons
        # self.weight_matrix = np.array([np.random.rand(input_size) for _ in range(num_neurons)])
        # Above line has been upgraded to line below
        self.W = np.random.randn(self.num_out_n, self.num_in_n)/2
        self.b = np.random.randn(self.num_out_n)/2
        self.activation_func = activ_func

    def batch_input(self, X):
        """
        Returns the matrix product [input_matrix] * [weight_matrix]^T of dimensions
        (batch_size, num_in_neurons) * (num_in_neurons, num_out_neurons) = (batch_size, num_out_neurons)

        
        XW^T + b is (batch_size, num_out_neurons) + (num_out_neurons), where the bias is brodcast for each row
        """    
        self.X = X
        self.batch_size = X.shape[0] 
        self.raw_output = np.dot(self.X, self.W.T) + self.b
        self.activation_output = self.activation_func.forward(self.raw_output)
        return self.activation_output
    
    
    def backward(self, error_matrix, learning_rate):
        """
        Given the error vector dC/da^(l), returns the new error vector for the next layer, dC/da^(l-1)

        C = cost func
        a^(l) = activation function at layer l
        z = XW^T + b
        """
        eta = learning_rate

        dC_da_1 = error_matrix # (batch_size, out_n)
        da_dz = self.activation_func.derivative(self.raw_output) # (batch_size, out_n)
        dC_dz = dC_da_1 * da_dz # (batch_size, num_out_n)

        # Error Gradient
        dC_dX = np.tensordot(dC_dz, self.W, axes=(1,0)) # (batch_size, in_n)
        # Gradient of W (average weight at w)
        dC_dw = np.sum(np.matmul(dC_dz.T , self.X), axis=0) / self.batch_size # (out_n)

        # Gradient of b
        dC_db = np.sum(dC_dz, axis=0) / self.batch_size # (out_n)

        self.W = self.W - (eta * dC_dw)
        self.b = self.b - (eta * dC_db)
        return dC_dX
    

class OutLayer:
    """
    Represents the output layer of a neural network as a weight vector (w) with shape(in_n) and scalar bias (b)
    """
    def __init__(self, input_size, loss_func):
        self.num_in_n = input_size
        self.loss_func = loss_func
        self.num_out_n = 1
        self.W = np.random.randn(self.num_in_n)/2
        self.b = np.random.randn(self.num_out_n)/2

    def batch_input(self, X):
        self.X = X
        self.batch_size = X.shape[0]
        """
        Returns the matrix product [input_matrix] * [weight_matrix]^T of dimensions
        (batch_size, num_in_neurons) * (num_in_neurons, num_out_neurons) = (batch_size, num_out_neurons)


        XW^T + bias is (batch_size, num_out_neurons) + (num_out_neurons), where the bias is brodcast for each row
        """     
        self.raw_output = np.dot(self.X, self.W.T) + self.b
        return self.raw_output
    
    
    def backward(self, y_true, learning_rate):
        """
        Given the error vector dC/da^(l), returns the new error vector for the next layer, dC/da^(l-1)

        C = cost func
        a^(l) = activation function at layer l
        z = XW^T + b
        """
        eta = learning_rate

        # Error Gradient
        dC_dX = self.loss_func.dC_dX(X=self.X, w=self.W, b=self.b, y=y_true)
        # Gradient of W
        dC_dw = self.loss_func.dC_dw(X=self.X, w=self.W, b=self.b, y=y_true)
        # Gradient of b
        dC_db = self.loss_func.dC_db(X=self.X, w=self.W, b=self.b, y=y_true)

        self.W = self.W - (eta * dC_dw)
        self.b = self.b - (eta * dC_db)
        return dC_dX
    

class SimpleNeuralNetwork:
    """
    Represents a neural network as an array of {"DenseLayer", "OutLayer"} objects.
    The last element in the array must be of type "OutLayer"
    """
    def __init__(self, input_size, loss_func = mean_square_error()):
        self.nn_array = []
        self.input_size = input_size
        self.loss_func = loss_func


    def add_layer(self, num_neurons, activ_func=ReLU(), type="dense"):
        """
        type = {'dense', 'output'}

        New layer must have input size corresponding to previous layer's output size
        num_neurons - is the number of neurons in the current layer
        activ_func - is the activation function that should be applied to the outputs of this layer
        """
        num_in_n = 0
        if(len(self.nn_array) == 0):
            num_in_n = self.input_size
        else:
            num_in_n = self.nn_array[-1].W.shape[0]
        
        if(type == "output"):
            self.nn_array.append(OutLayer(
                input_size = num_in_n, 
                loss_func=self.loss_func))
        elif(type == "dense"):
            self.nn_array.append(DenseLayer(
                input_size=num_in_n,
                num_neurons=num_neurons,
                activ_func=activ_func
            ))
        else:
            raise(ValueError(f"Invalid Argument {type}, expected 'dense' or 'output'"))
        
        
    def describe_network(self):
        # weight matrix shape is (num_neurons, input_size)
        for layer in self.nn_array:
            print(layer)

    def forward_pass(self, input_matrix):
        for i in range(len(self.nn_array)):
            layer = self.nn_array[i]
            input_matrix = layer.batch_input(input_matrix)    
        return input_matrix
    
    def backward_pass(self, y_true,learning_rate):
        layer = self.nn_array[-1]
        dC_da = layer.backward(y_true, learning_rate)
        for i in range(len(self.nn_array), 0, -1):
            layer = self.nn_array[i-1]
            error_vector = layer.backward(dC_da, learning_rate)

In [3]:
# from sklearn.preprocessing import StandardScaler
# diabetes = load_diabetes()
# scaler = StandardScaler()
# scaler.fit(diabetes.data)
# X_transformed = scaler.transform(diabetes.data)
# scaler.fit(diabetes.target.reshape(-1,1))
# y_transformed = scaler.transform(diabetes.target.reshape(-1,1))
# X_train, X_test, y_train, y_test = train_test_split(X_transformed, y_transformed, test_size=0.2, random_state=42, shuffle=True)
# nn = SimpleNeuralNetwork(input_size=10, loss_func=mean_square_error())
# nn.add_layer(num_neurons=8, activ_func=LeakyReLU(0.01), type="dense")
# nn.add_layer(num_neurons=32, activ_func=LeakyReLU(0.01), type="dense")
# nn.add_layer(num_neurons=64, activ_func=LeakyReLU(0.01), type="dense")
# nn.add_layer(num_neurons=32, activ_func=LeakyReLU(0.01), type="dense")
# nn.add_layer(num_neurons=8, activ_func=LeakyReLU(0.01), type="dense")
# nn.add_layer(num_neurons=1, activ_func=LeakyReLU(0.01), type="output")

# # batches
# mse_func = mean_square_error()
# X_train_batches = [X_train[i: i+10] for i in range(0, len(X_train), 10)]
# y_train_batches = [y_train[i:i+10] for i in range(0, len(y_train), 10)]

# num_epochs = 100
# for i in range(num_epochs):
#     # Epoch
#     # for i in range(len(X_train_batches)):
#     for i in range(2):
#         X_train_batch = X_train_batches[i]
#         y_train_batch = y_train_batches[i]
#         y_pred_batch = nn.forward_pass(X_train_batch)
#         nn.backward_pass(mse_func.derivative(y_train_batch, y_pred_batch))
#     y_true = y_train
#     y_pred = nn.forward_pass(X_train)
#     print('MSE Loss:', mse_func.compute(y_true=y_true, y_pred=y_pred))

In [4]:
# Testing single instance
from sklearn.preprocessing import StandardScaler
diabetes = load_diabetes()
scaler = StandardScaler()
scaler.fit(diabetes.data)
X_transformed = scaler.transform(diabetes.data)
scaler.fit(diabetes.target.reshape(-1,1))
y_transformed = scaler.transform(diabetes.target.reshape(-1,1))
X_train, X_test, y_train, y_test = train_test_split(X_transformed, y_transformed, test_size=0.2, random_state=42, shuffle=True)
nn = SimpleNeuralNetwork(input_size=10, loss_func=mean_square_error())
nn.add_layer(num_neurons=8, activ_func=LeakyReLU(0.01), type="dense")
nn.add_layer(num_neurons=32, activ_func=LeakyReLU(0.01), type="dense")
nn.add_layer(num_neurons=64, activ_func=LeakyReLU(0.01), type="dense")
nn.add_layer(num_neurons=32, activ_func=LeakyReLU(0.01), type="dense")
nn.add_layer(num_neurons=8, activ_func=LeakyReLU(0.01), type="dense")
nn.add_layer(num_neurons=1, activ_func=LeakyReLU(0.01), type="output")


# batches
mse_func = mean_square_error()
X_train_batches = [X_train[i: i+10] for i in range(0, len(X_train), 10)]
y_train_batches = [y_train[i:i+10] for i in range(0, len(y_train), 10)]

X_train_batch = X_train_batches[0]
y_train_batch = y_train_batches[0]

In [5]:
stored_mse = []

In [6]:
y_pred_batch = nn.forward_pass(X_train_batch)
loss = mse_func.compute(y_train_batch, y_pred_batch)
stored_mse.append(loss)
print('Max Prediction:', np.max(np.abs(y_pred_batch)))
print('MSE Loss:', mse_func.compute(y_true=y_train_batch, y_pred=y_pred_batch))
nn.backward_pass(y_true=y_train_batch, learning_rate=1)

Max Prediction: 17.560213541624897
MSE Loss: 859.2832393204329


ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 10 is different from 8)

In [None]:
import matplotlib.pyplot as plt
indices = list(range(len(y_pred_batch)))

# Create a line plot of the loss values
plt.plot(indices, y_pred_batch, marker='o', linestyle='-')
plt.plot(indices, y_train_batch, marker='o', linestyle='-')
# Add labels and a title
plt.xlabel('Index')
plt.ylabel('y_value')
plt.title('Predictions vs Ground Truth')

# Display the plot
plt.show()

In [None]:
mse_func.derivative(y_train_batch, y_pred_batch)

In [None]:
import matplotlib.pyplot as plt
indices = list(range(len(stored_mse)))

# Create a line plot of the loss values
plt.plot(indices, stored_mse, marker='o', linestyle='-')

# Add labels and a title
plt.xlabel('Index')
plt.ylabel('Loss')
plt.title('Loss Over Each Index')

# Display the plot
plt.show()

In [None]:
import sympy as sym
sym_str = ", ".join([f'x{i}' for i in range(10)])
print(sym_str)
sym_ary = sym.symbols(sym_str)
print(sym_ary)

In [None]:
X_temp = np.array([[1,1,1], [2,2,2]])
w_temp = np.array([1,10,100])
y_temp = np.array([100,200])

print(np.matmul(X_temp, w_temp))
print(np.matmul(X_temp, w_temp) - y_temp)
print(np.outer(np.matmul(X_temp, w_temp) - y_temp, w_temp))
print(np.sum(np.outer(np.matmul(X_temp, w_temp) - y_temp, w_temp), axis=0)/ X_temp.shape[0])

In [None]:
X_temp.shape[0]