In [None]:
import numpy as np 
import matplotlib.pyplot as plt 
import pandas as pd 
import pickle
import warnings
warnings.filterwarnings("ignore")

plt.style.use("default")
plt.rc("text", usetex=True)
plt.rc("font", family="cm")
plt.rcParams["grid.color"] = (0.5, 0.5, 0.5, 0.2)

In [None]:
X_dataframe = pd.read_csv("/mnt/ferracci/features_dataframe_new.csv.gz")
X = np.load("/mnt/ferracci/features_new.npz", allow_pickle=True)['a']
y = np.array(pd.read_csv("/mnt/ferracci/targets_dataframe_new.csv.gz")["Qedep"])

In [None]:
# we are only interested in the selected features
with open('/home/ferracci/new_dataset/features_list.txt', 'r') as f:
    file_content = f.read()

features_list = file_content.split('\n')
selected_features_names = eval(features_list[0])

selected_X_dataframe = X_dataframe[selected_features_names]
X = X[:, [X_dataframe.columns.get_loc(name) for name in selected_features_names]]
selected_X_dataframe.head()

In [None]:
import torch 
import optuna
from torch import nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from torchmetrics import MeanSquaredError
from sklearn.model_selection import train_test_split

device = "cuda" if torch.cuda.is_available() else "cpu"
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.3, shuffle=True)
mean_squared_error = MeanSquaredError().to(device)

# normalize dataset 
X_train_mean = np.mean(X_train, axis=0)
X_train_std = np.std(X_train, axis=0)
X_train_norm = (X_train - X_train_mean) / X_train_std
X_valid_norm = (X_valid - X_train_mean) / X_train_std 

train_dataset = TensorDataset(torch.Tensor(X_train_norm), torch.Tensor(y_train))
valid_dataset = TensorDataset(torch.Tensor(X_valid_norm), torch.Tensor(y_valid))

In [None]:
BATCH_SIZE=1024
train_dataloader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_dataloader = DataLoader(dataset=valid_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
class FCDNN(nn.Module):
    def __init__(self, n_features, n_hidden_layers, n_hidden_units, activation_name):
        super().__init__()
        self.n_hidden_layers = n_hidden_layers
        self.activation_name = activation_name

        # first hidden layer 
        self.dense_first = nn.Linear(n_features, n_hidden_units)

        # prototype of intermediate hidden layer
        self.dense = nn.Linear(n_hidden_units, n_hidden_units)

        # last hidden layer 
        self.dense_last = nn.Linear(n_hidden_units, 1)

    def forward(self, x):
        x = get_activation_layer(self.activation_name, self.dense_first(x))

        for i in range(self.n_hidden_layers-1):
            x = get_activation_layer(self.activation_name, self.dense(x))

        x = self.dense_last(x)

        return x

In [None]:
def train_step(model: torch.nn.Module, dataloader: torch.utils.data.DataLoader, loss_fn, 
               optimizer: torch.optim.Optimizer, device=device):
    train_loss, train_mse = 0, 0
    model.train()
    
    for batch_idx, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        y_pred = model(X).squeeze()
        loss = loss_fn(y_pred, y)
        train_loss += loss
        train_mse += mean_squared_error(y_pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_loss /= len(dataloader)
    train_mse /= len(dataloader)
    return train_loss, train_mse

In [None]:
def valid_step(model: torch.nn.Module, dataloader: torch.utils.data.DataLoader, loss_fn, device=device):
    valid_loss, valid_mse = 0, 0
    model.eval()
    
    with torch.inference_mode():
        for (X, y) in dataloader:
            X, y = X.to(device), y.to(device)
            y_pred = model(X).squeeze()
            valid_loss += loss_fn(y_pred, y)
            valid_mse += mean_squared_error(y_pred, y)
        valid_loss /= len(dataloader)
        valid_mse /= len(dataloader)

    return valid_loss, valid_mse

In [None]:
def get_activation_layer(activation_name, x):
    if activation_name == "SELU":
        activation_layer = F.selu(x)
    elif activation_name == "ELU":
        activation_layer = F.elu(x)
    else:
        activation_layer = F.relu(x)
    
    return activation_layer

In [None]:
def get_optimizer(trial, model):
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "SGD", "RMSprop"])
    lr = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)
  
    if optimizer_name == "Adam": 
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    elif optimizer_name == "SGD":
        optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    else:
        optimizer = torch.optim.RMSprop(model.parameters())
  
    return optimizer

In [None]:
def get_scheduler(trial, optimizer):
    scheduler_name = trial.suggest_categorical("scheduler", ["None", "Exp", "Cos"])

    if scheduler_name == "Exp":
        gamma = trial.suggest_float("gamma", 0.1, 0.9)
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma)
    elif scheduler_name == "Cos":
        T_max = trial.suggest_int("T_max", 10, 1000)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max)
    else:
        scheduler = None

    return scheduler

In [None]:
def MAPELoss(output, target, epsilon=1e-7):
    error = torch.abs((target - output) / (target + epsilon))
    loss = torch.mean(error)
    
    return loss

In [None]:
def plot_learning_curves(best_epoch, train_losses, valid_losses, train_mses, valid_mses):
    fig, ax_left = plt.subplots(nrows=1, ncols=1, figsize=(7,5), dpi=150)

    ax_left.plot(range(best_epoch+1), np.array(train_losses[:best_epoch+1])*100, color="blue", label="Training MAPE")
    ax_left.plot(range(best_epoch+1), np.array(valid_losses[:best_epoch+1])*100, color="red", label="Validation MAPE")
    ax_left.set_ylim((0.0, 4))
    ax_left.set_ylabel("MAPE, $\%$ curves", fontsize=15)
    ax_left.set_xlabel("Number of epochs", fontsize=15)
    ax_left.tick_params(axis="both", which="major", labelsize=12)
    ax_left.tick_params(axis="both", which='minor', labelsize=12)

    ax_right = ax_left.twinx()
    ax_right.plot(range(best_epoch+1), train_mses[:best_epoch+1], color="orange", label="Training MSE")
    ax_right.plot(range(best_epoch+1), valid_mses[:best_epoch+1], color="purple", label="Validation MSE")
    ax_right.set_ylim((0.0, 0.04))
    ax_right.set_ylabel("MSE curves", fontsize=15, rotation=270, labelpad=20)
    ax_right.tick_params(axis="both", which="major", labelsize=12)
    ax_right.tick_params(axis="both", which='minor', labelsize=12)

    lines1, labels1 = ax_left.get_legend_handles_labels()
    lines2, labels2 = ax_right.get_legend_handles_labels()
    ax_left.legend(lines1 + lines2, labels1 + labels2, loc=0, fontsize=12, fancybox=False, edgecolor="k")
    ax_left.grid()

    fig.savefig("/home/ferracci/new_dataset/images/FCDNN_learning_curve.png", dpi=300, bbox_inches="tight", pad_inches=0.2)

    plt.close()

In [None]:
def objective(trial):
    n_hidden_layers = trial.suggest_int("hidden_layers", 6, 24)
    n_hidden_units = trial.suggest_int("hidden_units", 1, 256) 
    activation_name = trial.suggest_categorical("activation", ["ReLU", "SELU", "ELU"]) 

    global model
    model = FCDNN(13, n_hidden_layers, n_hidden_units, activation_name).to(device)
    optimizer = get_optimizer(trial, model)
    scheduler = get_scheduler(trial, optimizer)

    best_loss = float("inf")
    global best_epoch
    best_epoch = 0
    patience = 50
    epochs_no_improvement = 0
    epochs = 200
    global train_losses, train_mses, valid_losses, valid_mses
    train_losses, train_mses, valid_losses, valid_mses = [], [], [], []

    for epoch in range(epochs):
        train_loss, train_mse = train_step(model=model, dataloader=train_dataloader, loss_fn=MAPELoss, 
                                           optimizer=optimizer, device=device)
        valid_loss, valid_mse = valid_step(model=model, dataloader=valid_dataloader, loss_fn=MAPELoss, device=device)

        # handle pruning based on the intermediate value
        if trial.should_prune():
            print(f"Trial #{trial.number}. PRUNED")
            raise optuna.exceptions.TrialPruned()
        
        # manually implement early stopping
        if valid_loss < best_loss:
            best_loss = valid_loss
            best_epoch = epoch
            epochs_no_improvement = 0
            torch.save(model.state_dict(), f"/mnt/ferracci/fcdnn_tuned_{trial.number}.pth")
        else:
            epochs_no_improvement += 1
        if epochs_no_improvement == patience:
            break

        trial.report(best_loss, epoch)
        train_losses.append(train_loss.item())
        train_mses.append(train_mse.item())
        valid_losses.append(valid_loss.item())
        valid_mses.append(valid_mse.item())

    print(f"Trial #{trial.number}. Best MAPE: {best_loss:.5f}.")
    
    return best_loss

In [None]:
optuna.logging.set_verbosity(optuna.logging.WARNING)

def callback(study, trial):
    if study.best_trial == trial:
        plot_learning_curves(best_epoch, train_losses, valid_losses, train_mses, valid_mses)

study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=200, callbacks=[callback])

trial = study.best_trial

print(f"\nBest MAPE: {trial.value:.5f}")

In [None]:
results_dataframe = study.trials_dataframe()
results_dataframe.head()

In [None]:
trials = results_dataframe[(results_dataframe["state"] == "COMPLETE") & (results_dataframe["value"] < 0.015)]
trials = trials.drop(["number", "datetime_start", "datetime_complete", "duration", "state", "params_T_max", "params_gamma"], axis=1)

# swap columns so that mse is the last column
columns = list(trials.columns)
trials[columns[0]] = 100*trials[columns[0]]
columns = [columns[2], columns[3], columns[1], columns[5], columns[4], columns[6]] + [columns[0]]
trials = trials[columns]
labels = ["number of\nlayers", "number of\nunits", "activation function", "optimizer", "learning rate", "learning scheduler", "MAPE, \%"]
trials.head()

In [None]:
from helper_functions.parallel_coordinates_plot import * 

fig = plot_parallel_coordinates(trials, labels, linewidth=0.8, alpha=0.8)
fig.set_dpi(150)
fig.supylabel("Hyperparameter tuning", fontsize=15, x=0.05)
fig.savefig("/home/ferracci/new_dataset/images/FCDNN_hyperparameter_tuning.png", dpi=300, bbox_inches="tight", pad_inches=0.2);

In [None]:
params = study.best_params

# save dictionary to a file
with open("/home/ferracci/new_dataset/fcdnn_study.pkl", "wb") as file:
    pickle.dump(params, file)

In [None]:
# load dictionary from file
with open("/home/ferracci/new_dataset/fcdnn_study.pkl", "rb") as file:
    params = pickle.load(file)

params

### Model Evaluation

In [None]:
from pathlib import Path
from helper_functions.model_evaluation import plot_gaussian_fit
from helper_functions.model_evaluation import energy_res_fit
from helper_functions.model_evaluation import get_a_tilde

In [None]:
X_test_files = list(Path("/mnt/ferracci/").glob("features_test_*"))
y_test_files = list(Path("/mnt/ferracci/").glob("targets_dataframe_test_*"))
X_test, y_test = [], []

for X_test_file in X_test_files:
    X_test.append(np.load(X_test_file)["a"][:, [X_dataframe.columns.get_loc(name) for name in selected_features_names]])
for y_test_file in y_test_files:
    y_test.append(np.array(pd.read_csv(y_test_file)["Qedep"]))

energies = [0, 1, 10, 7, 6, 2, 0.1, 9, 5, 3, 8, 4, 0.3, 0.6]
X_test = [x for _, x in sorted(zip(energies, X_test))]
y_test = [x for _, x in sorted(zip(energies, y_test))]

In [None]:
# load dictionary from file
with open("/home/ferracci/new_dataset/fcdnn_study.pkl", "rb") as file:
    params = pickle.load(file)
    
# load the saved model from file
best_model = FCDNN(13, params["hidden_layers"], params["hidden_units"], params["activation"])
best_model.load_state_dict(torch.load(f"/mnt/ferracci/fcdnn_tuned_{trial.number}.pth"))

In [None]:
bias, res = [], []
err_bias, err_res = [], []

for i in range(len(X_test)):
    best_model.eval()
    with torch.inference_mode():
        X_test[i] = (X_test[i] - X_train_mean) / X_train_std
        X_test[i], y_test[i] = torch.Tensor(X_test[i]).float(), torch.Tensor(y_test[i])
        y_pred = best_model(X_test[i]).squeeze()
        err = (y_test[i] - y_pred).numpy()
        err = err[err - np.mean(err) < 5*np.std(err)]

    mean, std, err_mean, err_std = plot_gaussian_fit(data=err, n_bins=100, name="fcdnn", index=i)
    bias.append(100 * mean / np.mean(y_test[i].numpy()))
    res.append(100 * std / np.mean(y_test[i].numpy()))
    err_bias.append(100 * err_mean / np.mean(y_test[i].numpy()))
    err_res.append(100 * err_std / np.mean(y_test[i].numpy()))

# get fit parameters
a, b, c, pcov = energy_res_fit([np.mean(y_test[i].numpy()) for i in range(1, len(y_test)-1)], res[1:-1], err_res[1:-1])
err_a, err_b, err_c = np.sqrt(np.abs(np.diag(pcov)[0])), np.sqrt(np.abs(np.diag(pcov)[1])), np.sqrt(np.abs(np.diag(pcov)[2]))
cov_ab, cov_ac, cov_bc = pcov[0, 1], pcov[0, 2], pcov[1, 2]

print(f"a = {a:.3f} +/- {err_a:.3f}")
print(f"b = {b:.3f} +/- {err_b:.3f}")
print(f"c = {c:.3f} +/- {err_c:.3f}")

a_tilde, err_a_tilde = get_a_tilde(a, b, c, err_a, err_b, err_c, cov_ab, cov_ac, cov_bc)
print(f"\nã = {a_tilde:.3f} +/- {err_a_tilde:.3f}")

with open('/home/ferracci/new_dataset/fcdnn_results.txt', 'w') as f:
    f.write(str(bias))
    f.write('\n')
    f.write(str(res))
    f.write('\n')
    f.write(str(err_bias))
    f.write('\n')
    f.write(str(err_res))
    f.write('\n')
    f.write(str([a, b, c, err_a, err_b, err_c, a_tilde, err_a_tilde]))