In [3]:
#Set Seed
import random
import cProfile
import pstats
import io
import math
import time
import random
from tqdm import tqdm
from numba import njit, prange
from numba.typed import List

# Import our typed Linear Algebra functions (assumed to be decorated appropriately)
from LinearAlgebra import dot, transpose, add_matrices, subtract_matrices, scalar_multiply, add_vectors, subtract_vectors, sum_axis0

random.seed(42)

#Load Data
from data_loader import load_data, to_typed_2d_list

# Load optimization functions
from numba_adam import AdamOptimizer

In [2]:
(X_train, y_train), (X_test, y_test) = load_data()

#Convert to lists
X_train = to_typed_2d_list(X_train)
y_train = to_typed_2d_list(y_train)
X_test = to_typed_2d_list(X_test)
y_test = to_typed_2d_list(y_test)

In [11]:
# ----------------------------
# Numerical Kernels (decorated with njit)
# ----------------------------
@njit
def relu_kernel(x):
    n = len(x)
    # Preallocate a new typed list (here Numba infers type from first assignment)
    result = List()
    for i in range(n):
        if x[i] > 0:
            result.append(x[i])
        else:
            result.append(0.0)
    return result

@njit
def _softmax_kernel(x):
    # Compute a numerically stable softmax for a 1D typed list.
    n = len(x)
    max_val = x[0]
    for i in range(1, n):
        if x[i] > max_val:
            max_val = x[i]
    exps = List()
    sum_exps = 0.0
    for i in range(n):
        val = math.exp(x[i] - max_val)
        exps.append(val)
        sum_exps += val
    result = List()
    for i in range(n):
        result.append(exps[i] / sum_exps)
    return result

@njit
def softmax_kernel(x):
    # If x is a batch (typed list of typed lists), apply _softmax_kernel to each row.
    n = len(x)
    result = List()
    for i in range(n):
        result.append(_softmax_kernel(x[i]))
    return result

@njit
def cross_entropy_loss_kernel(y_true, y_pred):
    epsilon = 1e-9
    total_loss = 0.0
    batch_size = len(y_true)
    for i in range(batch_size):
        loss = 0.0
        n = len(y_true[i])
        for j in range(n):
            loss -= y_true[i][j] * math.log(y_pred[i][j] + epsilon)
        total_loss += loss
    return total_loss / batch_size

@njit
def forward_kernel(X, W1, b1, W2, b2, W3, b3, hidden_size):
    # Layer 1: z1 = X dot transpose(W1) + b1
    z1 = dot(X, transpose(W1))
    z1 = add_matrices(z1, b1)
    a1 = List()
    n = len(z1)
    for i in range(n):
        a1.append(relu_kernel(z1[i]))
    # Layer 2: z2 = a1 dot transpose(W2) + b2
    z2 = dot(a1, transpose(W2))
    z2 = add_matrices(z2, b2)
    a2 = List()
    n2 = len(z2)
    for i in range(n2):
        a2.append(relu_kernel(z2[i]))
    # Output layer: z3 = a2 dot transpose(W3) + b3
    z3 = dot(a2, transpose(W3))
    z3 = add_matrices(z3, b3)
    a3 = softmax_kernel(z3)
    return a3

@njit
def compute_gradients_kernel(a3, y_true, z2, z1, a2, a1, X, W3, W2, hidden_size):
    batch_size = len(X)
    dL_dz3 = subtract_matrices(a3, y_true)
    dL_dW3 = dot(transpose(dL_dz3), a2)
    dL_dW3 = scalar_multiply(1.0 / batch_size, dL_dW3)
    dL_db3 = scalar_multiply(1.0 / batch_size, sum_axis0(dL_dz3)[0])
    dL_da2 = dot(dL_dz3, W3)
    dL_dz2 = List()
    for i in range(batch_size):
        row = List()
        for j in range(hidden_size):
            if z2[i][j] > 0:
                row.append(dL_da2[i][j])
            else:
                row.append(0.0)
        dL_dz2.append(row)
    dL_dW2 = dot(transpose(dL_dz2), a1)
    dL_dW2 = scalar_multiply(1.0 / batch_size, dL_dW2)
    dL_db2 = scalar_multiply(1.0 / batch_size, sum_axis0(dL_dz2)[0])
    dL_da1 = dot(dL_dz2, W2)
    dL_dz1 = List()
    for i in range(batch_size):
        row = List()
        for j in range(hidden_size):
            if z1[i][j] > 0:
                row.append(dL_da1[i][j])
            else:
                row.append(0.0)
        dL_dz1.append(row)
    dL_dW1 = dot(transpose(dL_dz1), X)
    dL_dW1 = scalar_multiply(1.0 / batch_size, dL_dW1)
    dL_db1 = scalar_multiply(1.0 / batch_size, sum_axis0(dL_dz1)[0])
    gradients = List()
    gradients.append(dL_dW1)
    gradients.append(dL_db1)
    gradients.append(dL_dW2)
    gradients.append(dL_db2)
    gradients.append(dL_dW3)
    gradients.append(dL_db3)
    return gradients

In [12]:

class NeuralNetwork:
    def __init__(self, input_size, hidden_size, output_size):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.init_params(input_size, hidden_size, output_size)

    # ----------------------------
    # Initialization (non-decorated)
    # ----------------------------
    def He_initialization(self, input_size, hidden_size):
        scale = math.sqrt(2 / input_size)
        matrix = List()
        for _ in range(hidden_size):
            row = List()
            for _ in range(input_size):
                row.append(random.gauss(0, 1) * scale)
            matrix.append(row)
        return matrix
    
    def init_params(self, input_size, hidden_size, output_size):
        self.W1 = self.He_initialization(input_size, hidden_size)
        self.b1 = List([0.0 for _ in range(hidden_size)])
        self.W2 = self.He_initialization(hidden_size, hidden_size)
        self.b2 = List([0.0 for _ in range(hidden_size)])
        self.W3 = self.He_initialization(hidden_size, output_size)
        self.b3 = List([0.0 for _ in range(output_size)])

    # ----------------------------
    # Instance method wrappers (not decorated)
    # ----------------------------
    def relu(self, x):
        return relu_kernel(x)

    def _softmax(self, x):
        return _softmax_kernel(x)

    def softmax(self, x):
        # If x is a batch (typed list of typed lists), use softmax_kernel.
        if isinstance(x[0], list):
            return softmax_kernel(x)
        else:
            return self._softmax(x)

    def cross_entropy_loss(self, y_true, y_pred):
        return cross_entropy_loss_kernel(y_true, y_pred)

    def forward(self, X):
        self.a3 = forward_kernel(X, self.W1, self.b1, self.W2, self.b2, self.W3, self.b3, self.hidden_size)
        # Also store intermediate activations for backpropagation.
        self.z1 = dot(X, transpose(self.W1))
        self.z1 = add_matrices(self.z1, self.b1)
        self.a1 = List()
        for z in self.z1:
            self.a1.append(self.relu(z))
        self.z2 = dot(self.a1, transpose(self.W2))
        self.z2 = add_matrices(self.z2, self.b2)
        self.a2 = List()
        for z in self.z2:
            self.a2.append(self.relu(z))
        self.z3 = dot(self.a2, transpose(self.W3))
        self.z3 = add_matrices(self.z3, self.b3)
        self.a3 = self.softmax(self.z3)
        return self.a3

    def compute_gradients(self, X, y_true):
        return compute_gradients_kernel(
            self.a3, y_true, self.z2, self.z1, self.a2, self.a1, X, self.W3, self.W2, self.hidden_size
        )

    def train(self, X_train, y_train, epochs, learning_rate):
        self.loss_values = List()
        start_time = time.time()
        params = List()
        params.append(self.W1)
        params.append(self.b1)
        params.append(self.W2)
        params.append(self.b2)
        params.append(self.W3)
        params.append(self.b3)
        optimizer = AdamOptimizer(params, learning_rate)
        with tqdm(range(epochs), desc='Training', unit='epoch') as pbar:
            for epoch in pbar:
                total_loss = 0.0
                y_pred = self.forward(X_train)
                total_loss += self.cross_entropy_loss(y_train, y_pred)
                self.loss_values.append(total_loss)
                gradients = self.compute_gradients(X_train, y_train)
                optimizer.update(gradients)
                self.W1 = optimizer.parameters[0]
                self.b1 = optimizer.parameters[1]
                self.W2 = optimizer.parameters[2]
                self.b2 = optimizer.parameters[3]
                self.W3 = optimizer.parameters[4]
                self.b3 = optimizer.parameters[5]
                pbar.set_postfix({'loss': self.loss_values[-1]})
        end_time = time.time()
        print(f'Training time: {end_time - start_time:.2f} seconds')

    def predict(self, X):
        predictions = self.forward(X)
        result = List()
        for p in predictions:
            max_val = -math.inf
            max_index = -1
            for idx in range(len(p)):
                if p[idx] > max_val:
                    max_val = p[idx]
                    max_index = idx
            result.append(max_index)
        return result

    def evaluate(self, X, y):
        predictions = self.predict(X)
        y_true = List()
        for t in y:
            idx = -1
            for i in range(len(t)):
                if t[i] == 1:
                    idx = i
                    break
            y_true.append(idx)
        correct = 0
        for p, t in zip(predictions, y_true):
            if p == t:
                correct += 1
        return correct / len(y) * 100.0

    def plot_loss(self):
        import matplotlib.pyplot as plt
        plt.plot(self.loss_values)
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training Loss')
        plt.show()


In [13]:
#Test Neural Network
nn = NeuralNetwork(784, 128, 10)

In [14]:
y_preds = nn.predict(X_train[:10])

print(y_preds)

TypeError: missing a required argument: 'matrix'

In [14]:
nn.train(X_train[:10], y_train[:10], learning_rate=0.01, epochs=10)

y_preds = nn.predict(X_train[:10])

print(y_preds)

Training:   0%|          | 0/10 [00:02<?, ?epoch/s]


TypeError: unsupported operand type(s) for -: 'list' and 'List'