## Hyperparameter tuning via Optuna for Binary MLP model

### Because this is a binary model, this notebook is limited to predicting a single class: 1 or 0 (yes or no).
### Here I predict whether or not a cell received a treatment.

In [None]:
import sys
import torch
import optuna
import plotly

import pandas as pd
from pathlib import Path
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import roc_auc_score
from sklearn.metrics import f1_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import auc
from sklearn.metrics import roc_curve


sys.path.append("..")
from utils.utils import df_stats

In [None]:
# Constants
# Toggle for subsampling the main df; SUBSET_NUMBER is ignored when this is False
SUBSET_OPTION = False

# Number of rows kept when subsampling (each row is one cell)
SUBSET_NUMBER = 15000

# Rows (cells) delivered per data loader batch
BATCH_SIZE = 100

# Epochs run inside every optimization trial
OPTIM_EPOCHS = 100
# Trials run by the hyperparameter search
N_TRIALS = 500

# Epochs used to train the final optimized model
TRAIN_EPOCHS = 100


# Device shared by the optimization objective and the final training loop:
# the GPU when torch reports one, the CPU otherwise
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")
print(DEVICE)

In [None]:
# Load the single-cell feature table
# the location is wrapped in a pathlib Path so it resolves on any operating system
file_path = Path(
    "../../Extracted_Features_(CSV_files)/interstellar_wave3_sc_norm_fs_cellprofiler.csv.gz"
)
# low_memory=False reads the file in one pass instead of chunk-by-chunk
df = pd.read_csv(file_path, low_memory=False)

Combine treatment with dosage so that the same treatment given at different doses is treated as a distinct condition.

In [None]:
# Combine treatment and dose into a single condition label
df["Metadata_treatment"] = df["Metadata_treatment"] + "_" + df["Metadata_dose"]
print(df["Metadata_treatment"].unique())

# Keep only the two conditions this binary model distinguishes
df = df.query(
    "Metadata_treatment == 'LPS_10µg/ml'| Metadata_treatment == 'Media only_0'"
)
print(df["Metadata_treatment"].unique())

df_stats(df)
# Drop rows containing NaNs and renumber the remaining rows from 0.
# reset_index returns a new frame, so the result has to be assigned back
# (the previous bare df.reindex() call changed nothing).
df = df.dropna().reset_index(drop=True)
# Check for NaNs again
df_stats(df)
# Show the treatment/dose combinations that remain; printed explicitly because
# a bare expression in the middle of a cell is never displayed
print(df[["Metadata_treatment", "Metadata_dose"]].drop_duplicates())
if SUBSET_OPTION:
    df = df.sample(n=SUBSET_NUMBER)
# Code snippet for metadata extraction by Jenna Tomkinson
df_metadata = list(df.columns[df.columns.str.startswith("Metadata")])

# define which columns are data and which are descriptive
df_descriptive = df[df_metadata]
df_values = df.drop(columns=df_metadata)

### Setting up data for network training

In [None]:
# Encode the treatment strings as integer class labels
le = preprocessing.LabelEncoder()
df_values["Metadata_treatment"] = le.fit_transform(df_descriptive["Metadata_treatment"])
# split into X and Y where Y is the column to predict and X is the observed data
df_values_X = df_values.drop("Metadata_treatment", axis=1)
df_values_Y = df_values["Metadata_treatment"]

# Random seed set for reproducibility
seed = 1
# split data into train-test, keeping the class balance in both parts
X_train, X_test, Y_train, Y_test = train_test_split(
    df_values_X, df_values_Y, test_size=0.15, random_state=seed, stratify=df_values_Y
)
# split train data into train-validate, again stratified on the label
X_train, X_val, Y_train, Y_val = train_test_split(
    X_train, Y_train, test_size=0.2, random_state=seed, stratify=Y_train
)

# Renumber every split from 0 so each X stays index-aligned with its Y.
# Resetting X_train alone left it misaligned with Y_train.
X_train, X_val, X_test = (
    features.reset_index(drop=True) for features in (X_train, X_val, X_test)
)
Y_train, Y_val, Y_test = (
    labels.reset_index(drop=True) for labels in (Y_train, Y_val, Y_test)
)

In [None]:
# Data class for x and y data
class Dataset:
    """
    Minimal map-style container pairing features with labels for a data loader

    Attributes:
    ----------
    X : indexable collection
        the X dimension of data (features), one entry per sample
    Y : indexable collection
        the Y dimension of data (labels), looked up with the same index as X

    Methods
    -------
    __len__:
        returns the number of entries in X
    -------
    __getitem__:
        returns the (X, Y) pair stored at a given index
    """

    def __init__(self, X, Y):
        self.X, self.Y = X, Y

    def __len__(self):
        sample_count = len(self.X)
        return sample_count

    def __getitem__(self, idx):
        features = self.X[idx]
        label = self.Y[idx]
        return features, label

In [None]:
# Wrap the train, val and test splits as float tensors inside Dataset objects
train_data, val_data, test_data = (
    Dataset(torch.FloatTensor(features.values), torch.FloatTensor(labels.values))
    for features, labels in ((X_train, Y_train), (X_val, Y_val), (X_test, Y_test))
)

# Width of the model input: one unit per feature column
IN_FEATURES = X_train.shape[1]
print("Number of in features: ", IN_FEATURES)
# Number of distinct encoded treatment labels
out_features = len(df_values["Metadata_treatment"].unique())
print("Number of out features: ", out_features)

In [None]:
# convert data class into a dataloader to be compatible with pytorch
# Only the training loader shuffles: reshuffling each epoch keeps the optimizer
# from seeing the exact same batches on every pass. Validation and test keep a
# fixed order so predictions stay position-aligned with Y_val / Y_test.
train_loader = torch.utils.data.DataLoader(
    dataset=train_data, batch_size=BATCH_SIZE, shuffle=True
)
valid_loader = torch.utils.data.DataLoader(dataset=val_data, batch_size=BATCH_SIZE)
test_loader = torch.utils.data.DataLoader(dataset=test_data, batch_size=BATCH_SIZE)

In [None]:
# based on https://www.kaggle.com/code/ludovicocuoghi/pytorch-pytorch-lightning-w-optuna-opt
def build_model_custom(trial, in_features):
    """
    Assemble a feed-forward network whose architecture is sampled from a trial.

    The number of hidden layers, the width of each hidden layer and each
    dropout rate are requested from the trial, so every trial can describe a
    different model. Meant to be called from an optuna objective.

    Parameters:
        trial : optuna object
            the trial used to draw each hyperparameter from its search space
        in_features : int
            the number of input features, which sets the width of the first layer

    Return:
        nn.Sequential
            hidden blocks of Linear -> ReLU -> Dropout followed by a
            single-unit Linear output layer
    """
    # depth of the network: between 1 and 10 hidden layers
    depth = trial.suggest_int("n_layers", 1, 10)

    modules = []
    width_in = in_features
    for layer_idx in range(depth):
        # hidden layer width: between 2 and 50 units
        width_out = trial.suggest_int(f"n_units_l{layer_idx}", 2, 50)
        modules += [nn.Linear(width_in, width_out), nn.ReLU()]

        # dropout probability: 0.1 to 0.5 in steps of 0.05
        drop_rate = trial.suggest_float(f"dropout_{layer_idx}", 0.1, 0.5, step=0.05)
        modules.append(nn.Dropout(drop_rate))

        # the next layer consumes this layer's output
        width_in = width_out

    # output layer: a single logit
    modules.append(nn.Linear(width_in, 1))

    return nn.Sequential(*modules)

In [None]:
# function for training and tracking model
def objective(trial, return_info=False):
    """
    Train a trial-defined model and score it on the validation data.

    The model architecture comes from build_model_custom; the optimizer and
    learning rate are sampled from the trial here. After every epoch the
    running mean validation accuracy is reported to the trial so the pruner
    judges intermediate results on the same quantity (and direction) as the
    value this function returns.

    Parameters:
        trial : optuna object
            the trial used to sample hyperparameters and to report progress
        return_info : bool
            If False, return a single metric for the study to optimize.
            If True, print all four summary metrics instead and return None.
            The optimizer needs exactly one return value, so the extra
            metrics can only be shown through printing.

    Return:
        metric : float or None
            if return_info is truthy:
                None (mean validation/training accuracy and loss are printed)
            otherwise:
                the mean validation accuracy over all epochs

    Raises:
        optuna.exceptions.TrialPruned
            when the trial reports that it should be pruned
    """

    # calling model function
    model = build_model_custom(trial, IN_FEATURES)

    # Learning rates matter on an order-of-magnitude scale, so sample in log
    # space; a linear range would put most draws between 1e-2 and 1e-1
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "SGD"])

    # send model and loss to device before handing parameters to the optimizer
    model = model.to(DEVICE)
    # for binary model; use a different loss for multi-class
    criterion = nn.BCEWithLogitsLoss().to(DEVICE)

    # look the optimizer class up by name on torch.optim
    optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=learning_rate)

    # per-epoch history for the train and validation sets
    train_acc = []
    train_loss = []
    valid_acc = []
    valid_loss = []

    # number of batches per pass, used to average the summed batch losses
    total_step = len(train_loader)
    total_step_val = len(valid_loader)

    for epoch in range(OPTIM_EPOCHS):

        # TRAINING
        running_loss = 0
        correct = 0
        total = 0
        model.train()

        for X_train_batch, y_train_batch in train_loader:
            X_train_batch = X_train_batch.to(DEVICE)
            # targets need a trailing dimension to match the single-logit output
            y_train_batch = y_train_batch.to(DEVICE).unsqueeze(1)

            optimizer.zero_grad()
            output = model(X_train_batch)
            loss = criterion(output, y_train_batch)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()  # sum all batch losses

            # ACCURACY: threshold the sigmoid probability at 0.5
            y_pred = torch.round(torch.sigmoid(output.detach()))
            correct += torch.sum(y_pred == y_train_batch).item()
            total += y_train_batch.size(0)
        train_acc.append(100 * correct / total)
        # average loss over all batches of this epoch
        train_loss.append(running_loss / total_step)

        # VALIDATION
        correct_v = 0
        total_v = 0
        batch_loss = 0
        model.eval()
        with torch.no_grad():
            for X_valid_batch, y_valid_batch in valid_loader:
                X_valid_batch = X_valid_batch.to(DEVICE)
                y_valid_batch = y_valid_batch.to(DEVICE).unsqueeze(1)
                # PREDICTION
                output = model(X_valid_batch)
                y_pred = torch.round(torch.sigmoid(output))
                # LOSS
                batch_loss += criterion(output, y_valid_batch).item()
                # ACCURACY
                correct_v += torch.sum(y_pred == y_valid_batch).item()
                total_v += y_valid_batch.size(0)
        valid_acc.append(100 * correct_v / total_v)
        valid_loss.append(batch_loss / total_step_val)

        # Report the quantity this function returns. Reporting the loss would
        # make a pruner that follows the study's direction treat a high loss
        # as a good intermediate result.
        trial.report(np.mean(valid_acc), epoch)

        # Handle pruning based on the intermediate value
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    # The study needs exactly one returned metric, so the detailed summary is
    # only printed, never returned
    if return_info:
        print(f"Validation Accuracy: {np.mean(valid_acc)}")
        print(f"Validation Loss: {np.mean(valid_loss)}")
        print(f"Training Accuracy: {np.mean(train_acc)}")
        print(f"Training Loss: {np.mean(train_loss)}")
        return None
    return np.mean(valid_acc)

In [None]:
# The study object drives the model optimization and keeps every trial's result
study = optuna.create_study(direction="maximize")
# Run the objective once per trial; each run draws its hyperparameters
# from the search space defined inside the objective
study.optimize(func=objective, n_trials=N_TRIALS)
# Call the objective again on the best trial with return_info=True
# so that its summary metrics are printed
objective(study.best_trial, return_info=True)

In [None]:
# Visualize the optimization history recorded in the study
optuna.visualization.plot_optimization_history(study)

In [None]:
# Visualize the intermediate values that the trials reported during training
optuna.visualization.plot_intermediate_values(study)

In [None]:
def extract_best_trial_params(best_params):
    """
    Regroup the flat parameters of the best trial into per-layer lists.

    The result is the input needed to rebuild the best model.

    Parameters:
        best_params : dict
            study.best_params
            maps each hyperparameter name to the value of the best trial
    Return:
        param_dict : dict
            "units" and "dropout" hold one entry per hidden layer;
            "n_layers", "optimizer" and "learning_rate" are copied through
    """
    depth = best_params["n_layers"]
    optimizer_name = best_params["optimizer"]
    learning_rate = best_params["learning_rate"]

    # read each layer's width and dropout rate together, layer by layer
    per_layer = [
        (best_params[f"n_units_l{layer}"], best_params[f"dropout_{layer}"])
        for layer in range(depth)
    ]

    return {
        "units": [width for width, _ in per_layer],
        "dropout": [rate for _, rate in per_layer],
        "n_layers": depth,
        "optimizer": optimizer_name,
        "learning_rate": learning_rate,
    }

In [None]:
# Regroup the best trial's hyperparameters into the dict used to rebuild the model
param_dict = extract_best_trial_params(study.best_params)

In [None]:
# function for new optimized model
def optimized_model(in_features, paramater_dict):
    """
    Build a model from the extracted hyperparameters of the best trial.

    Parameters:
        in_features : int
            the number of input features of the model
        paramater_dict : dict
            the dictionary returned by extract_best_trial_params; the keys
            "n_layers", "units" and "dropout" are read here

    Return:
        nn.Sequential
            one Linear -> ReLU -> Dropout block per hidden layer followed by a
            single-unit Linear output layer
    """
    n_layers = paramater_dict["n_layers"]

    layers = []

    for i in range(n_layers):

        out_features = paramater_dict["units"][i]

        layers.append(nn.Linear(in_features, out_features))
        layers.append(nn.ReLU())
        layers.append(nn.Dropout(paramater_dict["dropout"][i]))
        # the next layer consumes this layer's output
        in_features = out_features

    # in_features now holds the width of the last hidden layer, or the raw
    # input width when there are no hidden layers at all
    layers.append(nn.Linear(in_features, 1))

    return nn.Sequential(*layers)

In [None]:
# Build the model from the best hyperparameters and move it to DEVICE so the
# notebook also runs on machines without a GPU
model = optimized_model(IN_FEATURES, param_dict).to(DEVICE)
criterion = nn.BCEWithLogitsLoss()
optim_method = param_dict["optimizer"].strip("'")
# Look the optimizer class up by name on torch.optim instead of eval()-ing a
# formatted string
optimizer = getattr(optim, optim_method)(
    model.parameters(), lr=param_dict["learning_rate"]
)

In [None]:
# Model Training
def train_optimized_model(EPOCHS):
    """
    Train the optimized model, checkpointing the best epoch and stopping early.

    Works on the module-level model, optimizer, criterion, train_loader,
    valid_loader and DEVICE. After every epoch the validation loss of that
    epoch is compared with the best one seen so far: an improvement saves the
    weights to ./state_dict.pt, anything else counts towards early stopping.

    Parameters:
        EPOCHS : int
            the maximum number of epochs to train the model for
    Return:
        None
            progress is printed and the best weights are written to disk
    """
    early_stopping_patience = 15
    early_stopping_counter = 0

    # number of batches per pass, used to average the summed batch losses
    total_step = len(train_loader)
    total_step_val = len(valid_loader)

    valid_loss_min = np.inf

    for epoch in range(EPOCHS):

        # TRAINING
        running_loss = 0
        correct = 0
        total = 0
        model.train()

        for X_train_batch, y_train_batch in train_loader:
            X_train_batch = X_train_batch.to(DEVICE)
            # targets need a trailing dimension to match the single-logit output
            y_train_batch = y_train_batch.to(DEVICE).unsqueeze(1)

            optimizer.zero_grad()
            output = model(X_train_batch)
            loss = criterion(output, y_train_batch)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()  # sum loss for every batch

            # ACCURACY: threshold the sigmoid probability at 0.5
            y_pred = torch.round(torch.sigmoid(output.detach()))
            correct += torch.sum(y_pred == y_train_batch).item()
            total += y_train_batch.size(0)
        epoch_train_acc = 100 * correct / total
        epoch_train_loss = running_loss / total_step

        # VALIDATION
        correct_v = 0
        total_v = 0
        batch_loss = 0
        model.eval()
        with torch.no_grad():
            for X_valid_batch, y_valid_batch in valid_loader:
                X_valid_batch = X_valid_batch.to(DEVICE)
                y_valid_batch = y_valid_batch.to(DEVICE).unsqueeze(1)
                # PREDICTION
                output = model(X_valid_batch)
                y_pred = torch.round(torch.sigmoid(output))
                # LOSS
                batch_loss += criterion(output, y_valid_batch).item()
                # ACCURACY
                correct_v += torch.sum(y_pred == y_valid_batch).item()
                total_v += y_valid_batch.size(0)
        epoch_valid_acc = 100 * correct_v / total_v
        epoch_valid_loss = batch_loss / total_step_val

        # Compare this epoch's own validation loss with the best so far. A mean
        # over all epochs keeps falling while the current loss is merely below
        # that mean, so a worse model could be checkpointed as the "best" one.
        if epoch_valid_loss <= valid_loss_min:
            torch.save(model.state_dict(), "./state_dict.pt")
            print(
                f"Epoch {epoch + 0:01}: Validation loss decreased ({valid_loss_min:.6f} --> {epoch_valid_loss:.6f}).  Saving model ..."
            )
            valid_loss_min = epoch_valid_loss
            early_stopping_counter = 0  # reset counter if validation loss decreases
        else:
            print(f"Epoch {epoch + 0:01}: Validation loss did not decrease")
            early_stopping_counter += 1

        # stop once more than `early_stopping_patience` epochs in a row failed to improve
        if early_stopping_counter > early_stopping_patience:
            print("Early stopped at epoch :", epoch)
            break

        print(
            f"\t Train_Loss: {epoch_train_loss:.4f} Train_Acc: {epoch_train_acc:.3f} Val_Loss: {epoch_valid_loss:.4f}  BEST VAL Loss: {valid_loss_min:.4f}  Val_Acc: {epoch_valid_acc:.3f}\n"
        )

In [None]:
# Train the optimized model for at most TRAIN_EPOCHS epochs
train_optimized_model(TRAIN_EPOCHS)

In [None]:
def test_optimized_model():
    """
    Run the trained optimized model over the test dataset

    Reads the module-level model, test_loader and DEVICE.

    Parameters:
        None
    Return:
        y_pred_list : list
            one entry per test batch holding the rounded (0/1) predictions
        y_pred_prob_list : list
            one entry per test batch holding the sigmoid probabilities
    """
    probability_batches = []
    label_batches = []

    # NOTE(review): the saved checkpoint is not reloaded here, so the weights
    # currently held in memory are the ones evaluated
    # model.load_state_dict(torch.load('./state_dict.pt'))

    with torch.no_grad():
        model.eval()
        for features, _ in test_loader:
            # PREDICTION
            logits = model(features.to(DEVICE))
            probabilities = torch.sigmoid(logits)
            probability_batches.append(probabilities.cpu().numpy())
            # a probability rounds to the predicted class label
            label_batches.append(torch.round(probabilities).cpu().numpy())

    # squeeze removes the trailing single-logit dimension of every batch
    y_pred_prob_list = [batch.squeeze().tolist() for batch in probability_batches]
    y_pred_list = [batch.squeeze().tolist() for batch in label_batches]

    return y_pred_list, y_pred_prob_list

In [None]:
# Run the trained model on the test set; both returned lists hold one entry per test batch
y_pred_list, y_pred_prob_list = test_optimized_model()

# If output list is nested
def un_nest(lst):
    """
    returns an un-nested list from a nested list

    Only one level of nesting is removed. Items that cannot be iterated, such
    as the bare number left over when a batch holds a single prediction, are
    kept as they are instead of raising a TypeError.

    Parameters:
        lst : list
            a list of lists, possibly mixed with plain values
    Return:
        new_lst : list
            the items of every inner list, in order, in one flat list
    """
    new_lst = []
    for item in lst:
        try:
            new_lst.extend(item)
        except TypeError:
            # not iterable: a plain value sitting directly in the outer list
            new_lst.append(item)
    return new_lst


# The model returns one entry per batch; flatten both lists whenever the number
# of entries does not already match the number of test samples
if len(Y_test) != len(y_pred_list):
    y_pred_list, y_pred_prob_list = un_nest(y_pred_list), un_nest(y_pred_prob_list)

In [None]:
def results_output(prediction_list, prediction_probability_list, test_data):
    """
    Print and plot the results of testing the model

    Parameters:
        prediction_list : list
            predicted class labels from the model
        prediction_probability_list : list
            predicted probabilities from the model
        test_data : array-like
            the true labels the predictions are compared against

    Return:
        None
            prints the classification report and the confusion matrix,
            then shows the ROC curve labelled with its AUC
    """
    # Classification report
    print(classification_report(test_data, prediction_list))

    # Confusion matrix: printed explicitly, since a bare call inside a function
    # computes the matrix and then throws it away
    print(confusion_matrix(test_data, prediction_list))

    # ROC curve of true positive rate against false positive rate
    plt.figure(figsize=(5.5, 4))
    fpr, tpr, _ = roc_curve(test_data, prediction_probability_list)
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, "b", label=f"AUC = {roc_auc:0.2f}")
    # diagonal reference line from (0, 0) to (1, 1)
    plt.plot([0, 1], [0, 1], "r--")
    plt.title("ROC curve", fontsize=25)
    plt.ylabel("True Positive Rate", fontsize=18)
    plt.xlabel("False Positive Rate", fontsize=18)
    plt.legend(
        loc="lower right",
        fontsize=24,
        fancybox=True,
        shadow=True,
        frameon=True,
        handlelength=0,
    )
    plt.show()

In [None]:
# Call the visualization function on the test predictions and the true test labels
results_output(y_pred_list, y_pred_prob_list, Y_test)