# Setup

In [1]:
# library to check function types of imported modules
from typeguard import install_import_hook

# custom functions for plotting, etc.
with install_import_hook('custom_ml_plots'):
    import custom_ml_plots as cmp
with install_import_hook('custom_dataset_tools'):
    import custom_dataset_tools as cdt
with install_import_hook('basic_ml_operations'):
    import basic_ml_operations as bmo
with install_import_hook('ml_data_objects'):
    import ml_data_objects as mdo

# data import
import pyreadr

# data storage libraries
import pandas as pd
import numpy as np
from scipy.stats import pearsonr

from typing import Tuple

from matplotlib import pyplot as plt


# Scaling
from sklearn.preprocessing import StandardScaler

# k-fold cross-validation
from sklearn.model_selection import KFold

# global parameters
RANDOM_STATE = 42
TEST_SET_PORTION = 0.15
CV_SET_PORTION = 0.15
TOP_LINE_THRESH = 0.8

## Import Data

In [None]:
# import dataset
eyt1 = pyreadr.read_r('./data/eyt1.RData')

# extract training example labels
y = eyt1['Pheno_Disc_Env1']

y.head()

In [None]:
y = y[['GY']].set_index(y['GID'])

# sort by index
y = y.sort_index()

# check missing values
cdt.assert_no_bad_values(y)

# each seed was planted in 4 different environments, but we don't care about environmental differences
# so we take the average of every group of four rows to reduce the dataset to 1/4 its original size
y = cdt.avg_rows(y, 4)

y.head()

In [None]:
# extract feature matrix
X = eyt1['Geno_Env1']

X.head()

In [None]:
# scale feature matrix
X_scaler = StandardScaler()
X_sc = pd.DataFrame(X_scaler.fit_transform(X), index=X.index, columns=X.columns)

y_scaler = StandardScaler()
y_sc = pd.DataFrame(y_scaler.fit_transform(y), index=y.index, columns=y.columns)

y_sc.head()

In [6]:

def plot_shaded_scatter_grids(y_preds_grid: np.ndarray, y_test_grid: np.ndarray, axis1_params: mdo.AxisParams, axis2_params: mdo.AxisParams, pearson_grid: np.ndarray, plot_title: str, i: int):
    """
    Plot predictions vs actuals and colour by pearson coefficient and add best fit
    Created: 2024/11/30
    """
    # create plot of predictions vs actuals
    fig, axs = cmp.create_scatter_grid(y_preds_grid, y_test_grid, axis1_params, axis2_params, f"{plot_title} | Inner Fold {i}")

    # colour by pearson coefficient and add best fit and title
    cmp.color_spectrum(fig, axs, pearson_grid, label="Pearson Coefficient")
    cmp.add_best_fit(axs)
    plt.show(fig)
    plt.close(fig)

In [7]:

def plot_shaded_roc_grids(y_preds_grid: np.ndarray, y_test_grid: np.ndarray, axis1_params: mdo.AxisParams, axis2_params: mdo.AxisParams, f1_grid: np.ndarray, plot_title: str, i: int):
    """
    Plot predictions vs actuals and colour by pearson coefficient and add best fit
    Created: 2024/12/22
    """
    # create plot of predictions vs actuals
    fig, axs = cmp.create_roc_grid(y_preds_grid, y_test_grid, axis1_params, axis2_params, f"{plot_title} | Inner Fold {i}")

    # colour by pearson coefficient and add best fit and title
    cmp.color_spectrum(fig, axs, f1_grid, label="f1 Score")
    cmp.add_best_fit(axs)
    plt.show(fig)
    plt.close(fig)

# Model R

In [8]:
def inner_CV_R(n_splits: int, X : pd.DataFrame, y : pd.DataFrame, axis1_params: mdo.AxisParams, axis2_params: mdo.AxisParams, train_model_callback, kfold_random_state: int, plot_title: str = "", **kwargs):
    kfold = KFold(n_splits=n_splits, shuffle=True, random_state=kfold_random_state)

    # arrays to store best parameters for each fold
    best_params = pd.DataFrame(columns=['param1', 'param2'], index=range(n_splits))

    # Iterate through each train-test split
    for i, (train_index, test_index) in enumerate(kfold.split(X)):
        """
        # For debugging
        print(f'Fold {i}')
        """
        
        # Split the data into train and test sets
        X_train, X_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]
        
        """
        # For debugging
        print('Training data:')
        print(f'X: {X_train}')
        print(f'y: {y_train}')
        """
        
        # train model grid
        model_grid = bmo.train_model_grid(X_train, y_train, axis1_params, axis2_params, train_model_callback, **kwargs)

        # use trained models to predict test set
        y_preds_grid = bmo.grid_predict(X_test, model_grid)

        # create grid of actuals to compare against predictions
        y_test_grid = cdt.np_array_of_dfs(y_test, y_preds_grid.shape)

        # evaluate predictions by comparing to actuals, calculating pearson coefficient
        pearson_grid = bmo.calculate_pearson_coefficients(y_preds_grid, y_test_grid)

        """
        # For debugging
        print(f'Model grid:')
        # print each model's tree count and depth
        for row in range(model_grid.shape[0]):
            for col in range(model_grid.shape[1]):
                model = model_grid[row, col]
                print(f'Model at row {row}, col {col}:')
                # print based on whether model is svm or xgb
                if hasattr(model, 'n_estimators'):
                    print(f'n_estimators: {model.get_params()["n_estimators"]}, max_depth: {model.get_params()["max_depth"]}')
                else:    
                    # SVM, print gamma and C
                    print(f'gamma: {model.get_params()["gamma"]}, C: {model.get_params()["C"]}')
        print('Actuals:')
        cdt.pretty_print_np_array_of_dfs(y_test_grid, rows_per_df=6)
        print('Predictions:')
        cdt.pretty_print_np_array_of_dfs(y_preds_grid, rows_per_df=6)
        """

        # find index of best pearson coefficient in the 2d array of pearson coefficients
        best_row, best_col = np.unravel_index(np.argmax(pearson_grid), pearson_grid.shape)
        
        # store best parameters for this fold
        best_params.loc[i] = [axis1_params.values[best_row], axis2_params.values[best_col]]

        plot_shaded_scatter_grids(y_preds_grid, y_test_grid, axis1_params, axis2_params, pearson_grid, plot_title, i)        

    # calculate average best parameters over all folds
    avg_best_param1 = best_params['param1'].mean()
    avg_best_param2 = best_params['param2'].mean()

    return avg_best_param1, avg_best_param2

        

In [9]:
def outer_CV_R(n_outer_splits: int, n_inner_splits: int, X : pd.DataFrame, y : pd.DataFrame, axis1_params: mdo.AxisParams, axis2_params: mdo.AxisParams, train_model_callback, kfold_random_state: int, **kwargs) -> pd.DataFrame:
    kfold = KFold(n_splits=n_outer_splits, shuffle=True, random_state=kfold_random_state)

    kfold_metrics = pd.DataFrame(columns=['Pearson', 'F1 Score', 'Sensitivity', 'Specificity', 'Kappa'])

    # Iterate through each train-test split
    for i, (train_index, test_index) in enumerate(kfold.split(X)):
        # Split the data into train and test sets
        X_train, X_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        # find average best values using inner fold CV
        best_param1, best_param2 = inner_CV_R(n_inner_splits, X_train, y_train, axis1_params, axis2_params, train_model_callback, kfold_random_state, plot_title=f"Outer Fold {i}", **kwargs)

        # train model with all training data of outer fold using average best parameters
        super_model = train_model_callback(X_train, np.ravel(y_train), **dict(zip([axis1_params.name, axis2_params.name], [best_param1, best_param2])), **kwargs)

        # use trained model to predict test set
        y_pred = pd.DataFrame(super_model.predict(X_test), index=y_test.index, columns=y_test.columns)

        # calculate pearson coefficient
        pearson, _ = pearsonr(np.ravel(y_pred), np.ravel(y_test))

        # classify predictions and actuals as top or not top
        y_pred_top = cdt.classify_top(y_pred, TOP_LINE_THRESH)
        y_test_top = cdt.classify_top(y_test, TOP_LINE_THRESH)

        # calculate classification metrics
        classification_metrics = cdt.classification_metrics(y_pred_top, y_test_top)

        # combine pearson and classification metrics into one dataframe side by side, then add them to kfold_metrics
        pearson_df = pd.DataFrame([pearson], columns=['Pearson'])
        metrics_row = pd.concat([pearson_df, classification_metrics], axis=1)
        kfold_metrics = pd.concat([kfold_metrics, metrics_row], axis=0)        
    
    kfold_metrics.index = range(n_outer_splits)
    return kfold_metrics

## Support Vector Machine

In [None]:
"""# Real values
x_params_SVM_R = mdo.AxisParams('gamma', bmo.power_list(2, -14, -6))
y_params_SVM_R = mdo.AxisParams('C', bmo.power_list(2, -2, 6))
metrics_SVM_R = outer_CV_R(10, 5, X_sc, y_sc, x_params_SVM_R, y_params_SVM_R, bmo.train_SVM_regressor, kfold_random_state=RANDOM_STATE, kernel='rbf')
"""
# Dummy values for quick training tests
x_params_SVM_R = mdo.AxisParams('gamma', bmo.power_list(2, -8, -7))
y_params_SVM_R = mdo.AxisParams('C', bmo.power_list(2, 0, 1))
metrics_SVM_R = outer_CV_R(2, 2, X_sc, y_sc, x_params_SVM_R, y_params_SVM_R, bmo.train_SVM_regressor, kfold_random_state=RANDOM_STATE, kernel='rbf')


In [None]:
# display metrics
display(metrics_SVM_R)

In [None]:
# Print average of each metric
display(metrics_SVM_R.mean())

## XGBoost

In [None]:
"""# proper values
x_params_XGB_R = mdo.AxisParams('n_estimators', [13, 25, 50, 100, 200])
y_params_XGB_R = mdo.AxisParams('max_depth', [1, 2, 3, 4, 6, 10, 16])
# Perform grid search with XGBoost models
metrics_XGB_R = outer_CV_R(10, 5, X_sc, y_sc, x_params_XGB_R, y_params_XGB_R, bmo.train_XGB_regressor, kfold_random_state=RANDOM_STATE, random_state=RANDOM_STATE, objective="reg:squarederror", eval_metric="rmse")
"""
# dummy values
x_params_XGB_R = mdo.AxisParams('n_estimators', [1, 2])
y_params_XGB_R = mdo.AxisParams('max_depth', [1, 2])
metrics_XGB_R = outer_CV_R(2, 2, X_sc, y_sc, x_params_XGB_R, y_params_XGB_R, bmo.train_XGB_regressor, kfold_random_state=RANDOM_STATE, random_state=RANDOM_STATE, objective="reg:squarederror", eval_metric="rmse")


In [None]:
# display metrics
display(metrics_XGB_R)

In [None]:
# Print average of each metric
display(metrics_XGB_R.mean())

# Model B

In [16]:
def inner_CV_B(n_splits: int, X : pd.DataFrame, y : pd.DataFrame, axis1_params: mdo.AxisParams, axis2_params: mdo.AxisParams, train_model_callback, kfold_random_state: int, classification_col : int, plot_title: str = "", **kwargs):
    kfold = KFold(n_splits=n_splits, shuffle=True, random_state=kfold_random_state)

    # arrays to store best parameters for each fold
    best_params = pd.DataFrame(columns=['param1', 'param2'], index=range(n_splits))
    best_thresholds = pd.DataFrame(columns=['threshold'], index=range(n_splits))

    # Iterate through each train-test split
    for i, (train_index, test_index) in enumerate(kfold.split(X)):
        # Split the data into train and test sets
        X_train, X_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]
        
        # train model grid
        model_grid = bmo.train_model_grid(X_train, y_train, axis1_params, axis2_params, train_model_callback, **kwargs)

        # use trained models to predict test set classification
        y_binary_preds_grid = bmo.grid_predict(X_test, model_grid)
        
        # also predict probabilities
        y_proba_preds_grid = bmo.grid_predict_proba(X_test, model_grid, classification_col)

        # create grid of actuals to compare against predictions
        y_test_grid = cdt.np_array_of_dfs(y_test, y_proba_preds_grid.shape)

        # evaluate predictions by comparing to actuals, calculating f1 scores
        f1_grid = bmo.calculate_f1_scores(y_binary_preds_grid, y_test_grid)

        # find index of best f1 score in the 2d array of f1 scores
        best_row, best_col = np.unravel_index(np.argmax(f1_grid), f1_grid.shape)
        
        # store best parameters for this fold
        best_params.loc[i] = [axis1_params.values[best_row], axis2_params.values[best_col]]

        # find classification threshold that yields lowest squared difference between sensitivity and specificity using this optimal model
        best_model_y_preds = y_proba_preds_grid[best_row, best_col]
        best_thresholds.iloc[i, 0] = bmo.find_optimal_threshold(y_test, best_model_y_preds)

        plot_shaded_roc_grids(y_proba_preds_grid, y_test_grid, axis1_params, axis2_params, f1_grid, plot_title, i)        

    # calculate average best parameters over all folds
    avg_best_param1 = best_params['param1'].mean()
    avg_best_param2 = best_params['param2'].mean()

    # calculate average best threshold over all folds
    best_threshold = best_thresholds['threshold'].mean()

    return avg_best_param1, avg_best_param2, best_threshold

        

In [17]:
def outer_CV_B(n_outer_splits: int, n_inner_splits: int, X : pd.DataFrame, y : pd.DataFrame, axis1_params: mdo.AxisParams, axis2_params: mdo.AxisParams, train_model_callback, kfold_random_state: int, classification_col : int, **kwargs) -> pd.DataFrame:
    
    kfold = KFold(n_splits=n_outer_splits, shuffle=True, random_state=kfold_random_state)

    kfold_metrics = pd.DataFrame(columns=['Pearson', 'F1 Score', 'Sensitivity', 'Specificity', 'Kappa'])

    # Iterate through each train-test split
    for i, (train_index, test_index) in enumerate(kfold.split(X)):
        # Split the data into train and test sets
        X_train, X_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        # find average best values using inner fold CV
        best_param1, best_param2, best_threshold = inner_CV_B(n_inner_splits, X_train, y_train, axis1_params, axis2_params, train_model_callback, kfold_random_state, classification_col, plot_title=f"Outer Fold {i}", **kwargs)

        # train model with all training data of outer fold using average best parameters
        super_model = train_model_callback(X_train, np.ravel(y_train), **dict(zip([axis1_params.name, axis2_params.name], [best_param1, best_param2])), **kwargs)

        # use trained model to predict test set
        y_pred = pd.DataFrame(super_model.predict_proba(X_test)[:, classification_col], index=y_test.index, columns=y_test.columns)

        # calculate pearson coefficient
        pearson, _ = pearsonr(np.ravel(y_pred), np.ravel(y_test))
        pearson = float(pearson)  # Convert numpy.float32 to Python float

        # classify predictions as top or not top
        y_pred_top = cdt.classify_top(y_pred, best_threshold)

        # calculate classification metrics
        classification_metrics = cdt.classification_metrics(y_pred_top, y_test)

        # combine pearson and classification metrics into one dataframe side by side, then add them to kfold_metrics
        pearson_df = pd.DataFrame([pearson], columns=['Pearson'])
        metrics_row = pd.concat([pearson_df, classification_metrics], axis=1)
        kfold_metrics = pd.concat([kfold_metrics, metrics_row], axis=0)        
    
    kfold_metrics.index = range(n_outer_splits)
    return kfold_metrics

In [None]:
# Replace range of labels with binary variable representing whether the gene line is top or not
y_sc_binary = cdt.classify_top(y_sc, TOP_LINE_THRESH) # .5, .6. .7
display(y_sc_binary)

## SVM

In [None]:
# Dummy values for tests
x_params_SVM_B = mdo.AxisParams('gamma', bmo.power_list(2, -10, -9))
y_params_SVM_B = mdo.AxisParams('C', bmo.power_list(2, 0, 1))
metrics_SVM_B = outer_CV_B(2, 2, X_sc, y_sc_binary, x_params_SVM_B, y_params_SVM_B, bmo.train_SVM_classifier, kfold_random_state=RANDOM_STATE, kernel='rbf', probability=True, classification_col=1)

"""
# Real values
x_params_SVM_B = mdo.AxisParams('gamma', bmo.power_list(2, -14, -6))
y_params_SVM_B = mdo.AxisParams('C', bmo.power_list(2, -2, 6))
metrics_SVM_B = outer_CV_B(10, 5, X_sc, y_sc_binary, x_params_SVM_B, y_params_SVM_B, bmo.train_SVM_classifier, kfold_random_state=RANDOM_STATE, kernel='rbf', probability=True, classification_col=1)
"""


In [None]:
# display metrics
display(metrics_SVM_B)

In [None]:
# Print average of each metric
display(metrics_SVM_B.mean())

## XGBoost

In [None]:
"""
x_params_XGB_B = mdo.AxisParams('n_estimators', [13, 25, 50, 100, 200])
y_params_XGB_B = mdo.AxisParams('max_depth', [1, 2, 3, 4, 6, 10, 16])
metrics_XGB_B = outer_CV_B(10, 5, X_sc, y_sc_binary, x_params_XGB_B, y_params_XGB_B, bmo.train_XGB_classifier, kfold_random_state=RANDOM_STATE, random_state=RANDOM_STATE, classification_col=1, objective="binary:logistic", eval_metric="logloss")

"""

# dummy values for quick tests
x_params_XGB_B = mdo.AxisParams('n_estimators', [1, 2])
y_params_XGB_B = mdo.AxisParams('max_depth', [1, 2])

metrics_XGB_B = outer_CV_B(2, 2, X_sc, y_sc_binary, x_params_XGB_B, y_params_XGB_B, bmo.train_XGB_classifier, kfold_random_state=RANDOM_STATE, random_state=RANDOM_STATE, classification_col=1, objective="binary:logistic", eval_metric="logloss")


In [None]:
# display metrics
display(metrics_XGB_B)

In [None]:
# Print average of each metric
display(metrics_XGB_B.mean())

# Model RO

In [25]:
def inner_CV_RO(n_splits: int, X : pd.DataFrame, y : pd.DataFrame, axis1_params: mdo.AxisParams, axis2_params: mdo.AxisParams, train_model_callback, kfold_random_state: int, plot_title: str = "", **kwargs):
    kfold = KFold(n_splits=n_splits, shuffle=True, random_state=kfold_random_state)

    # arrays to store best parameters for each fold
    best_params = pd.DataFrame(columns=['param1', 'param2'], index=range(n_splits))
    best_thresholds = pd.DataFrame(columns=['threshold'], index=range(n_splits))

    # Iterate through each train-test split
    for i, (train_index, test_index) in enumerate(kfold.split(X)):
        # Split the data into train and test sets
        X_train, X_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]
        
        # train model grid
        model_grid = bmo.train_model_grid(X_train, y_train, axis1_params, axis2_params, train_model_callback, **kwargs)

        # use trained models to predict test set
        y_preds_grid = bmo.grid_predict(X_test, model_grid)

        # create grid of actuals to compare against predictions
        y_test_grid = cdt.np_array_of_dfs(y_test, y_preds_grid.shape)

        # evaluate predictions by comparing to actuals, calculating pearson coefficient
        pearson_grid = bmo.calculate_pearson_coefficients(y_preds_grid, y_test_grid)

        # find index of best pearson coefficient in the 2d array of pearson coefficients
        best_row, best_col = np.unravel_index(np.argmax(pearson_grid), pearson_grid.shape)
        
        # store best parameters for this fold
        best_params.loc[i] = [axis1_params.values[best_row], axis2_params.values[best_col]]

        # find classification threshold that yields lowest squared difference between sensitivity and specificity using this optimal model
        best_model_y_preds = y_preds_grid[best_row, best_col]
        best_thresholds.iloc[i, 0] = bmo.find_optimal_threshold(y_test, best_model_y_preds)

        plot_shaded_scatter_grids(y_preds_grid, y_test_grid, axis1_params, axis2_params, pearson_grid, plot_title, i)        

    # calculate average best parameters over all folds
    avg_best_param1 = best_params['param1'].mean()
    avg_best_param2 = best_params['param2'].mean()

    # calculate average best threshold over all folds
    best_threshold = best_thresholds['threshold'].mean()

    return avg_best_param1, avg_best_param2, best_threshold

        

In [26]:
def outer_CV_RO(n_outer_splits: int, n_inner_splits: int, X : pd.DataFrame, y : pd.DataFrame, axis1_params: mdo.AxisParams, axis2_params: mdo.AxisParams, train_model_callback, kfold_random_state: int, **kwargs) -> pd.DataFrame:
    
    kfold = KFold(n_splits=n_outer_splits, shuffle=True, random_state=kfold_random_state)

    kfold_metrics = pd.DataFrame(columns=['Pearson', 'F1 Score', 'Sensitivity', 'Specificity', 'Kappa'])

    # Iterate through each train-test split
    for i, (train_index, test_index) in enumerate(kfold.split(X)):
        # Split the data into train and test sets
        X_train, X_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        # find average best values using inner fold CV
        best_param1, best_param2, best_threshold = inner_CV_RO(n_inner_splits, X_train, y_train, axis1_params, axis2_params, train_model_callback, kfold_random_state, plot_title=f"Outer Fold {i}", **kwargs)

        # train model with all training data of outer fold using average best parameters
        super_model = train_model_callback(X_train, np.ravel(y_train), **dict(zip([axis1_params.name, axis2_params.name], [best_param1, best_param2])), **kwargs)

        # use trained model to predict test set
        y_pred = pd.DataFrame(super_model.predict(X_test), index=y_test.index, columns=y_test.columns)

        # calculate pearson coefficient
        pearson, _ = pearsonr(np.ravel(y_pred), np.ravel(y_test))

        # classify predictions and actuals as top or not top
        y_pred_top = cdt.classify_top(y_pred, best_threshold)
        y_test_top = cdt.classify_top(y_test, best_threshold)

        # calculate classification metrics
        classification_metrics = cdt.classification_metrics(y_pred_top, y_test_top)

        # combine pearson and classification metrics into one dataframe side by side, then add them to kfold_metrics
        pearson_df = pd.DataFrame([pearson], columns=['Pearson'])
        metrics_row = pd.concat([pearson_df, classification_metrics], axis=1)
        kfold_metrics = pd.concat([kfold_metrics, metrics_row], axis=0)        
    
    kfold_metrics.index = range(n_outer_splits)
    return kfold_metrics

## SVM

In [None]:
x_params_SVM_RO = mdo.AxisParams('gamma', bmo.power_list(2, -14, -6))
y_params_SVM_RO = mdo.AxisParams('C', bmo.power_list(2, -2, 6))

metrics_SVM_RO = outer_CV_RO(10, 5, X_sc, y_sc, x_params_SVM_RO, y_params_SVM_RO, bmo.train_SVM_regressor, kfold_random_state=RANDOM_STATE, kernel='rbf')

In [None]:
# display metrics
display(metrics_SVM_RO)

In [None]:
# Print average of each metric
display(metrics_SVM_RO.mean())

## XGBoost

In [None]:
x_params_XGB_RO = mdo.AxisParams('n_estimators', [13, 25, 50, 100, 200])
y_params_XGB_RO = mdo.AxisParams('max_depth', [1, 2, 3, 4, 6, 10, 16])
metrics_XGB_RO = outer_CV_RO(10, 5, X_sc, y_sc, x_params_XGB_RO, y_params_XGB_RO, bmo.train_XGB_regressor, kfold_random_state=RANDOM_STATE, random_state=RANDOM_STATE, objective="reg:squarederror", eval_metric="rmse")

In [None]:
# display metrics
display(metrics_XGB_RO)

In [None]:
# Print average of each metric
display(metrics_XGB_RO.mean())