In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import pickle

# ----------------------------
# User-defined Configuration
# ----------------------------

# Name of the CSV file containing training data
DATA_FILE = "ExampleFilename"    #Do not include ".csv"

# Choose the ratio of data which is to be used for training and validation 
# Recommended: 80% training and 20% validating
TRAIN_RATIO = 0.8
VALID_RATIO = 0.2

# Insert the desired name of your trained model to be stored as
model_name = "ExampleModelName"    #Do not include ".pkl"

# (Optional) random seed for reproducibility
# For truly random shuffling input: None
random_state = 42



# ----------------------------
# Data Loading and Preprocessing
# ----------------------------

# Checking that ratios sum to 1.
assert abs(TRAIN_RATIO + VALID_RATIO - 1.0) < 1e-6, "Training and validation ratios must sum to 1."

#Reads the datafile
data = pd.read_csv(f"{DATA_FILE}.csv")
data = data.ffill()

# Shuffles the dataset.
data = data.sample(frac=1, random_state=random_state).reset_index(drop=True)

# Defining predictors and target for our ANN.
PREDICTORS = [f"xy{i}" for i in range(1, 151)]
TARGET = "preLoad"

# Scaling the predictors.
scaler = StandardScaler()
data[PREDICTORS] = scaler.fit_transform(data[PREDICTORS])

# Split data into training and validation sets.
train_end = int(TRAIN_RATIO * len(data))
train_df = data.iloc[:train_end]
valid_df = data.iloc[train_end:]

train_x, train_y = train_df[PREDICTORS].to_numpy(), train_df[[TARGET]].to_numpy()
valid_x, valid_y = valid_df[PREDICTORS].to_numpy(), valid_df[[TARGET]].to_numpy()

# ----------------------------
# Functions for ANN and Grid Search
# ----------------------------

# ANN structure
layer_config = [150, 10, 10, 1]

def init_layers(layer_config):
    """Initialize ANN layers with random weights and biases."""
    layers = []
    for i in range(1, len(layer_config)):
        weights = np.random.rand(layer_config[i-1], layer_config[i]) / 5 - 0.1
        biases = np.ones((1, layer_config[i]))
        layers.append([weights, biases])
    return layers

def forward(batch, layers):
    """Forward propagation through the network."""
    hiddens = [batch.copy()]
    for i in range(len(layers)):
        batch = np.matmul(batch, layers[i][0]) + layers[i][1]
        # Use ReLU activation in hidden layers.
        if i < len(layers) - 1:
            batch = np.maximum(batch, 0)
        hiddens.append(batch.copy())
    return batch, hiddens

def backward(layers, hiddens, grad, lr):
    """Backward propagation through the network."""
    for i in range(len(layers) - 1, -1, -1):
        if i != len(layers) - 1:
            grad = np.multiply(grad, np.heaviside(hiddens[i+1], 0))  # ReLU derivative.
        w_grad = hiddens[i].T @ grad
        b_grad = np.mean(grad, axis=0)
        layers[i][0] -= lr * w_grad
        layers[i][1] -= lr * b_grad
        grad = grad @ layers[i][0].T
    return layers

def mse(actual, predicted):
    """Mean Squared Error (MSE)."""
    return (actual - predicted) ** 2

def mse_grad(actual, predicted):
    """Gradient of the MSE."""
    return (predicted - actual)

def compute_r2(y_true, y_pred):
    """Compute the R-squared metric."""
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
    return 1 - ss_res / ss_tot if ss_tot != 0 else 1.0

def regression_accuracy(actual, predicted):
    """Compute 'regression accuracy' as a percentage."""
    accs = []
    for a, p in zip(actual, predicted):
        if a == 0 and p == 0:
            acc = 100
        elif a == 0:
            acc = 0
        else:
            acc = (min(a, p) / max(a, p)) * 100
        accs.append(acc)
    return np.mean(accs)


def train_ann(train_x, train_y, valid_x, valid_y, lr, epochs, batch_size, layer_config, best_valid_mse=float('inf'), previous_best_layers=None):
    # Variable defined to separate Grid Search from final training
    initial_best_valid_mse = best_valid_mse
    
    layers = init_layers(layer_config)
    best_layers = None
    best_epoch = None

    for epoch in range(epochs):
        # Shuffle training data each epoch.
        perm = np.random.permutation(len(train_x))
        train_x_shuffled = train_x[perm]
        train_y_shuffled = train_y[perm]
        
        #Training the model
        for i in range(0, len(train_x_shuffled), batch_size):
            x_batch = train_x_shuffled[i:i+batch_size]
            y_batch = train_y_shuffled[i:i+batch_size]
            pred, hiddens = forward(x_batch, layers)
            grad = mse_grad(y_batch, pred) / (2 * x_batch.shape[0])
            layers = backward(layers, hiddens, grad, lr)

        # Validation: storing the best weights for our model
        valid_preds, _ = forward(valid_x, layers)
        valid_mse = np.mean(mse(valid_y, valid_preds))
        if valid_mse < best_valid_mse:
            best_valid_mse = valid_mse
            best_layers = [[np.copy(w), np.copy(b)] for w, b in layers]
            best_epoch = epoch + 1  # Add 1 so it starts at 

            
            
    # Info print when function is used in final training
    if initial_best_valid_mse < float('inf'):
        if best_layers is None:
            best_layers = previous_best_layers
            print("No improvement was found during final training; kept previous model parameters from Grid Search.")
        else:
            print(f"Improved validation MSE found at epoch {best_epoch} with a value of {best_valid_mse:.2f}")
    
    return best_layers, best_valid_mse


# ----------------------------
# Tuning Alogrithm for Hyperparameters using Grid Search
# ----------------------------

#Defined grid with hyperparameters to be tuned
lr_list = [1e-7, 1e-6, 5e-6, 1e-5]
batch_size_list = [8, 12, 16, 20]
epochs_list = [1000, 3000, 5000, 10000]


#Defining variables which store the best case (lowest Valid MSE)
best_overall_mse = float('inf')
best_params = {}
best_model_layers = None

print("Starting Grid Search for Hyperparameter Tuning...\n")
for lr in lr_list:
    for batch_size in batch_size_list:
        for epochs in epochs_list:
            print(f"Testing: lr={lr}, batch_size={batch_size}, epochs={epochs}")
            model_layers, valid_mse = train_ann(train_x, train_y, valid_x, valid_y, lr, epochs, batch_size, layer_config)
            print(f"  --> Validation MSE: {valid_mse:.2f}\n")
            if valid_mse < best_overall_mse:
                best_overall_mse = valid_mse
                best_params = {'lr': lr, 'batch_size': batch_size, 'epochs': epochs}
                best_model_layers = model_layers

print("Grid Search Completed.")
print("Best Hyperparameters Found:", best_params)
print(f"Best Validation MSE: {best_overall_mse:.2f}\n")


# ----------------------------
# Final Training with the Best Hyperparameters
# ----------------------------

# Can adjust final training epochs as desired. (Only adjusts the model if it finds an epoch with lower Valid MSE)
final_epochs = 20000
final_layers, _ = train_ann(train_x, train_y, valid_x, valid_y, best_params['lr'], final_epochs, best_params['batch_size'], layer_config, best_overall_mse, best_model_layers)

# Evaluating final model on validation data.
final_valid_preds, _ = forward(valid_x, final_layers)
final_valid_mse = np.mean(mse(valid_y, final_valid_preds))
final_r2 = compute_r2(valid_y.flatten(), final_valid_preds.flatten())
final_reg_acc = regression_accuracy(valid_y.flatten(), final_valid_preds.flatten())

# Display final validation metrics in a table.
print("\nFinal Model Evaluation on Validation Data:")
print(f"Final Valid MSE: {final_valid_mse:.2f}")
print(f"Final R²: {final_r2:.2f}")
print(f"Final Regression Accuracy: {final_reg_acc:.2f}%")

# ----------------------------
# Save the Best Model
# ----------------------------
model_data = {
    "layers": final_layers,
    "scaler": scaler,
    "predictors": PREDICTORS,
    "target": TARGET
}

with open(f"{model_name}.pkl", "wb") as f:
    pickle.dump(model_data, f)

print(f"\nBest model saved to '{model_name}.pkl'.")
