# Import libraries

In [None]:
import numpy as np 
import scipy.io as sio 
import scipy.optimize as opt
import matplotlib.pyplot as plt 
import pandas as pd
import time
from IPython.display import clear_output

# Load data 

In [None]:
training_data = sio.loadmat("ex4data1.mat")
X: np.array = np.array(training_data['X'], dtype=np.float128)
y: np.array = np.array(training_data['y'], dtype=np.float128)
# Map label 10 from matlab back to 0 (python has index 0)
y[y == 10] = 0

# Shuffle data
dataset = np.hstack((X, y))
np.random.shuffle(dataset)
# Split into training set, cross validation set and test set 
train_set = dataset[None:4000, :]
val_set = dataset[4000:4500, :]
test_set = dataset[4500:None, :]

X_train = train_set[:, None:-1]
y_train = train_set[:, -1:None]

X_val = val_set[:, None:-1]
y_val = val_set[:, -1:None]

X_test = test_set[:, None:-1]
y_test = test_set[:, -1:None]

print("Data loaded")
print("Size of X, y:", X.shape, y.shape)
print("Shape of X, y (train):", X_train.shape, y_train.shape)
print("Shape of X, y (vald):", X_val.shape, y_val.shape)
print("Shape of X, y (test):", X_test.shape, y_test.shape)

test_theta_data = sio.loadmat("ex4weights.mat")
TestTheta1: np.array = np.array(test_theta_data['Theta1'], dtype=np.double)
TestTheta2: np.array = np.array(test_theta_data['Theta2'], dtype=np.double)

print("Size of TestTheta1:", TestTheta1.shape)
print("Size of TestTheta2:", TestTheta2.shape)

# Display 100 random images

In [None]:
# Create 100 random indices
# randIdx = np.random.randint(0, X.shape[0], 100).reshape(10, 10)
# fig, ax = plt.subplots(10, 10)

# for i in range(randIdx.shape[0]):
#     for j in range(randIdx.shape[1]):
#         example = X[randIdx[i, j]]
#         example = example.reshape((20, 20)).T
#         ax[i, j].imshow(example, vmin=-1, vmax=1, cmap='gray')
#         ax[i, j].set_xticks([])
#         ax[i, j].set_yticks([])
# plt.show()

# Define utility functions

In [None]:
def sigmoid(z):
    return 1/(1 + np.exp(-z))
def gsigmoid(z):
    s = sigmoid(z)
    return np.multiply(s, 1 - s)

# Define hypothesis function (feed-forward algorithm)

- Init `a1=x`
- Add bias term to `a1`
- Calculate `z2 = Theta1 * a1`
- Calculate `a2 = sigmoid(z2)`
- Calculate `z3 = Theta2 * a2`
- Calculate `h = a3 = sigmoid(z3)`

In [None]:
def hypothesis(x, Theta1, Theta2):
    a1 = np.vstack((1, x.reshape(-1, 1)))
    z2 = Theta1 @ a1
    a2 = np.vstack((1, sigmoid(z2).reshape(-1, 1)))
    z3 = Theta2 @ a2 
    h = sigmoid(z3)
    return (
        a1.reshape(-1, 1), 
        a2.reshape(-1, 1), 
        h.reshape(-1, 1), 
        z2.reshape(-1, 1), 
        z3.reshape(-1, 1)
    )

# Define cost function and its gradient

In [None]:
def costFunction(X, y, lambd, nn_params, hidden_layer_size, output_layer_size):
    m = X.shape[0]
    input_layer_size = X.shape[1]
    # nn_params: unrolled Theta1, Theta2 
    # Extract Theta1, Theta2 from nn_params 
    Theta_vec = nn_params.reshape(-1, 1)
    breakpnt = (input_layer_size + 1) * hidden_layer_size
    Theta1 = Theta_vec[None:breakpnt, :].reshape((hidden_layer_size, input_layer_size + 1))
    Theta2 = Theta_vec[breakpnt:None, :].reshape((output_layer_size, hidden_layer_size + 1))
    
    J = 0 
    Theta1_grad = np.zeros(shape=Theta1.shape)
    Theta2_grad = np.zeros(shape=Theta2.shape)
    Delta1 = 0 
    Delta2 = 0
    for i in range(X.shape[0]):
        x = X[i, :].reshape((-1, 1))
        # Feed-forward
        a1, a2, h, z2, z3 = hypothesis(x, Theta1, Theta2)
        # print(np.sum(a1), np.sum(z2), np.sum(a2), np.sum(z3), np.sum(h))
        # break
        # Compute J 
        y_hop = np.zeros(output_layer_size).reshape(-1, 1)
        y_hop[int(y[i])] = 1
        j = -y_hop.T @ np.log(h) - (1 - y_hop).T @ np.log(1 - h)
        J += float(j)

        # Back propagation 
        delta3 = h - y_hop 
        delta2 = np.multiply(Theta2.T @ delta3, gsigmoid(np.vstack((0, z2))))
        delta2 = delta2[1:None]
        Delta1 += delta2 @ a1.T 
        Delta2 += delta3 @ a2.T 
    # Average 
    J *= 1/m 
    # Add regularized term for cost function 
    theta1_reg = Theta1
    theta1_reg[:,0] = 0 
    theta2_reg = Theta2
    theta2_reg[:,0] = 0 
    J += lambd/(2*m) * (np.sum(np.power(theta1_reg, 2)) + np.sum(np.power(theta2_reg, 2)))
    # Regularized term for theta_gradient 
    Theta1_grad = (1/m * Delta1) + (lambd/m) * theta1_reg
    Theta2_grad = (1/m * Delta2) + (lambd/m) * theta2_reg

    # Unroll gradient 
    grad = np.vstack((Theta1_grad.reshape(-1, 1), Theta2_grad.reshape(-1, 1))).reshape(-1, 1)

    return (float(J), grad)

# Define functions to calculate and check numerical gradients

In [None]:
def numericalGradient(J, theta):
    numgrad = np.zeros(shape=theta.shape)
    for i in range(len(theta)):
        e = np.power(10.0, -4, dtype=np.double)
        epsilon_vec = np.zeros(shape=theta.shape)
        epsilon_vec[i] = e 

        left_j, _ = J(theta - epsilon_vec)
        right_j, _ = J(theta + epsilon_vec)
        numgrad[i] = (right_j - left_j)/(2*e)
    return numgrad

def debugInitializeWeights(out_conn, in_conn):
    W = np.zeros((out_conn, 1 + in_conn))
    W = (np.sin(np.array(range(1, W.size+1)))/10).reshape(W.shape)
    return W

def initializeRandomWeights(L_in, L_out):
    W = np.zeros((L_out, 1 + L_in))
    e = 0.12 
    W = np.random.rand(L_out, 1 + L_in) * 2 * e - e
    return W


def checkNumGradient(lamd):
    input_layer_size = 3
    hidden_layer_size = 5 
    num_labels = 3 
    m = 5

    Theta1 = debugInitializeWeights(hidden_layer_size, input_layer_size)
    Theta2 = debugInitializeWeights(num_labels, hidden_layer_size)
    X = debugInitializeWeights(m, input_layer_size - 1)
    y = (np.array(range(1, m+1)) % num_labels).reshape(-1, 1) 

    cost_func_ptr = lambda theta: costFunction(X, y, lamd, theta, hidden_layer_size, num_labels)
    nn_params = np.concatenate((Theta1.reshape(-1, 1), Theta2.reshape(-1, 1)), axis=0)
    # Compute numerical gradient
    _, grad = cost_func_ptr(nn_params)
    
    num_grad = numericalGradient(cost_func_ptr, nn_params)

    print("Visual check")
    print(np.hstack((num_grad.reshape(-1, 1), grad.reshape(-1, 1))))

    diff = np.linalg.norm(num_grad - grad)/np.linalg.norm(num_grad + grad)
    print("Errors:", diff)

# Check backpropagation implementation

In [None]:
# checkNumGradient(0)
# checkNumGradient(10)

# Define gradient descent function

In [None]:
scinotation = lambda f: '{:.2e}'.format(f)

def NesterovGradientDescent(cost_func, gamma, learning_rate, init_theta, max_iters):
    # Prepare data for gradient descent
    theta = init_theta.reshape(-1, 1)
    v = np.zeros_like(theta)
    change_rate = 0

    i = 0
    for i in range(max_iters):
        J, grad = cost_func(theta - gamma * v)

        grad_length = np.linalg.norm(grad)/len(grad)
        if grad_length < 5e-6:
            print("\nStopped early")
            break

        v = v * gamma + learning_rate * grad 
        theta = theta - v 
        
        print("\r", 
            "Iteration:", i, 
            "; Gradient length:", scinotation(grad_length),
            "; Cost value:", scinotation(J), 
            "; Gradient:", scinotation(np.mean(grad).flatten()[0]), end=""
        )
    
    return theta, i

# Benchmarking functions

In [None]:
def calculateAccuracy(X, y, Theta1, Theta2):
    correct_counter = 0
    for i in range(X.shape[0]): 
        x = X[i, :].reshape(-1, 1)
        _, _, h, _, _ = hypothesis(x, Theta1, Theta2)
        label = np.argmax(h)
        if (label == y[i]):
            correct_counter += 1
    return correct_counter/X.shape[0]

# Train neural network

In [None]:
input_layer_size = X.shape[1]
hidden_layer_size = 25
output_layer_size = 10

lambds = [1, 10, 20, 40, 80, 100]
Thetas = []
cross_errors = []
iterations = []
accuracies = []
for lambd in lambds:
    print("Initializing neural network")
    init_Theta1 = initializeRandomWeights(input_layer_size, hidden_layer_size).reshape(-1, 1)
    init_Theta2 = initializeRandomWeights(hidden_layer_size, output_layer_size).reshape(-1, 1)
    init_Theta = np.vstack((init_Theta1, init_Theta2)).reshape(-1, 1)

    cost_func_ptr = lambda theta: costFunction(X_train, y_train, lambd, theta, hidden_layer_size, output_layer_size)
    
    print("Begin training with lambda=%d" % lambd)
    trained_Theta, iters = NesterovGradientDescent(cost_func_ptr, 0.9, 0.4, init_Theta, 150) 
    # Extract back theta1, theta2
    breakpnt = (input_layer_size + 1) * hidden_layer_size
    Theta1 = trained_Theta[None:breakpnt, :].reshape((hidden_layer_size, input_layer_size + 1))
    Theta2 = trained_Theta[breakpnt:None, :].reshape((output_layer_size, hidden_layer_size + 1))
    Thetas.append((Theta1, Theta2))

    cross_error, _ =  costFunction(X_val, y_val, 0, trained_Theta, hidden_layer_size, output_layer_size)
    cross_accuracy = calculateAccuracy(X_val, y_val, Theta1, Theta2)
    print("Cross validation error:%f" % cross_error)
    print("Cross validation accuracy:%f\n" % cross_accuracy)
    print("Iterations: %d" % iters)
    cross_errors.append(cross_error)
    iterations.append(iters)
    accuracies.append(cross_accuracy)

# Show learning results

In [None]:
result = pd.DataFrame()
result['lambda'] = lambds
result['iters'] = iterations
result['validation'] = cross_errors
result['accuracy'] = accuracies
print(result)

# Demo

In [None]:
while True:
    try:
        # Choose a random number
        randIdx = np.random.randint(0, X.shape[0] - 1)
        x = X[randIdx]
        # Predict
        _, _, h, _, _ = hypothesis(x, Theta1, Theta2)
        # Show result
        label = np.argmax(h)
        img = x.reshape((20, 20)).T
        plt.imshow(img)
        plt.title(f"Label: {label}", fontdict={"fontsize": 30})
        plt.show()
        time.sleep(1)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("Stopped")
        break
