## import libraries

In [None]:
import pandas as pd
import numpy as np
import itertools

from pydeseq2.dds import DeseqDataSet
from pydeseq2.ds import DeseqStats

from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import label_binarize, LabelEncoder

from sklearn.model_selection import (
    train_test_split, 
    RandomizedSearchCV,
    StratifiedKFold
)

from sklearn.metrics import ( 
    f1_score, 
    precision_score, 
    recall_score, 
    confusion_matrix, 
    precision_recall_curve, 
    average_precision_score, 
    classification_report, 
    log_loss,
    auc
)

from xgboost import XGBClassifier
import xgboost as xgb

import umap.umap_ as umap

import matplotlib.pyplot as plt

## Import data

In [None]:
# Define file paths
FOLDER_PATH = r'path'

FPKM_PATH = F'{FOLDER_PATH}\TCGA-BRCA.htseq_counts.tsv'
UQ_PATH = F'{FOLDER_PATH}\TCGA-BRCA.htseq_fpkm-uq.tsv'

PROBE_MAP_PATH = F'{FOLDER_PATH}\gencode.v22.annotation.gene.probeMap'
PHENOTYPE_PATH = F'{FOLDER_PATH}\TCGA-BRCA.GDC_phenotype.tsv'
METADATA_PATH = F'{FOLDER_PATH}\Lehmann_metadata.csv'

# Load data
expression_data = pd.read_csv(FPKM_PATH, sep='\t')
expression_data_UQ = pd.read_csv(UQ_PATH, sep='\t')

gene_annotation = pd.read_csv(PROBE_MAP_PATH, sep='\t')
phenotype_data = pd.read_csv(PHENOTYPE_PATH, sep='\t')
phenotype_data = phenotype_data.set_index(phenotype_data.columns[0])
lehmann_metadata = pd.read_csv(METADATA_PATH, sep=',', index_col=0)

## data preprocessing - htseq_counts

In [None]:
# Merging data
merged_data = expression_data.merge(gene_annotation, left_on='Ensembl_ID', right_on='id', how='left')
merged_data['Ensembl_ID'] = merged_data['gene'].fillna(merged_data['Ensembl_ID'])
merged_data.drop(columns=['id', 'gene'], inplace=True)
merged_data.set_index('Ensembl_ID', inplace=True)
merged_data = merged_data[~merged_data.index.str.startswith('_')]

# Handle genes with all zero
filtered_merged_data = merged_data[merged_data.iloc[:, :1217].sum(axis=1) != 0]

# Handle duplicate gene names
duplicated_indices = filtered_merged_data.index[filtered_merged_data.index.duplicated(keep=False)]

new_index = []

for idx, row in filtered_merged_data.iterrows():
    if idx in duplicated_indices:
        new_idx = f"{idx}_({row['chrom']}_{row['chromStart']}_{row['chromEnd']}_{row['strand']})"
        new_index.append(new_idx)
    else:
        new_index.append(idx)

filtered_merged_data.index = new_index

# Convert log2 values to raw counts
raw_expression = filtered_merged_data.iloc[:, :1217].applymap(lambda x: 2**x - 1).T

# Add phenotype info
expression_with_samples = raw_expression.merge(phenotype_data[['sample_type.samples']], left_index=True, right_index=True, how='left')
expression_with_samples.index = expression_with_samples.index.str.rstrip('A')

# Add PAM50 data and handle 'Solid Tissue Normal' cases
expression_with_pam50 = expression_with_samples.merge(lehmann_metadata[['PAM50']], left_index=True, right_index=True, how='left')
expression_with_pam50.loc[expression_with_pam50['sample_type.samples'] == 'Solid Tissue Normal', 'PAM50'] = 'Solid Tissue Normal'

# Data Cleaning
filtered_data = expression_with_pam50.dropna(subset=['PAM50']).copy()
zero_expression_genes = filtered_data.sum()[filtered_data.sum() == 0].index
filtered_data.drop(zero_expression_genes, axis=1, inplace=True)

# Extract and transform expression data
expression_values = filtered_data.iloc[:, :-2]
expression_values["PAM50"] = filtered_data["PAM50"]
expression_values.index.name = 'Geneid'
transformed_expression = expression_values.iloc[:, :-1].astype(int)

## DE analysis

In [None]:
def filter_and_extract_top_genes(df):
    """
    Filter genes based on pvalue (padj) < 0.05 & abs(log2FoldChange) > 1.

    Parameters:
    - df (DataFrame): Input dataframe with genes data.

    Returns:
    - tuple: 
        - DataFrame: Filtered genes based on pvalue and log2FoldChange.
        - DataFrame: Number of differentially expressed genes.
    """
    # First, drop rows with NaN values in the 'padj' column
    df = df.dropna(subset=['padj'])
    
    # Filter only DE genes based on p-value (padj) and abs(log2FoldChange)
    filtered_df = df[(df['padj'] < 0.05) & (abs(df['log2FoldChange']) > 1)]
    
    number_of_DE_genes = len(filtered_df.index)
    de_genes_df = pd.DataFrame({"Number_of_DE_genes": [number_of_DE_genes]})
    
    return filtered_df, de_genes_df

In [None]:
def process_gene_counts(top_genes_df, res_df, condition_name):
    """
    Determine overexpressed and underexpressed genes, and calculate related statistics.

    Parameters:
    - top_genes_df (DataFrame): Input dataframe.
    - res_df (DataFrame): Results dataframe from DESeq.
    - condition_name (str): Name of the condition being processed.

    Returns:
    - DataFrame: Dataframe with statistics for specific condition.
    """
    # Filter overexpressed genes.
    overexpressed = top_genes_df[top_genes_df['log2FoldChange'] > 0]
    
    # Filter underexpressed genes.
    underexpressed = top_genes_df[top_genes_df['log2FoldChange'] < 0]
    
    # Count overexpressed genes.
    overexpressed_count = len(overexpressed)
    
    # Count underexpressed genes.
    underexpressed_count = len(underexpressed)
    
    # Calculate the percentage of differentially expressed genes.
    percentage = (de_genes_count_df.iloc[0, 0] / len(res_df)) * 100
    
    relative_overexpressed_percentage = overexpressed_count / de_genes_count_df.iloc[0, 0]
    
    relative_underexpressed_percentage = underexpressed_count / de_genes_count_df.iloc[0, 0]
    
    overexpressed_percentage = relative_overexpressed_percentage * percentage
    underexpressed_percentage = relative_underexpressed_percentage * percentage

    # Output do dict.
    data = {
        'Combination': condition_name,
        'Number_of_DE_genes': de_genes_count_df.iloc[0, 0],
        'Percentage_of_DE_genes': percentage,
        'Overexpressed_Count': overexpressed_count,
        'Underexpressed_Count': underexpressed_count,
        'Overexpressed_Percentage': overexpressed_percentage,
        'Underexpressed_Percentage': underexpressed_percentage
    }

    # Convert to df
    return pd.DataFrame(data, index=[0])

#### Solid Tissue Normal vs PAM50 subtypes

In [None]:
# Create metadata
metadata = pd.DataFrame({'Sample': expression_values.index.astype(str), 'Condition': expression_values["PAM50"]}).set_index("Sample")

In [None]:
# run analysis
dds = DeseqDataSet(counts = transformed_expression, metadata = metadata, design_factors = "Condition")
dds.deseq2()

In [None]:
conditions = ["LumA", "LumB", "Her2", "Basal", "Normal", "Solid Tissue Normal"]
results_list = []
res_dfs_list = []
de_genes_counts_df_list = []

for cond in conditions:
    if cond != "Solid Tissue Normal":
        stat_res = DeseqStats(dds, contrast=("Condition", "Solid Tissue Normal", cond))
        stat_res.summary()
        res_df = stat_res.results_df
        
        res_dfs_list.append(res_df)
        top_genes_df, de_genes_count_df = filter_and_extract_top_genes(res_df)
        results_list.append(top_genes_df)
        
        condition_name = f"Solid Tissue Normal vs {cond}"
        df = process_gene_counts(top_genes_df, res_df, condition_name)
        de_genes_counts_df_list.append(df)

de_genes_counts_df = pd.concat(de_genes_counts_df_list, ignore_index=True)

#### PAM50 subtypes combined

In [None]:
# Selecting data without Solid Tissue Normal samples
expression_values2 = expression_values[expression_values['PAM50'] != 'Solid Tissue Normal']
transformed_expression2 = expression_values2.iloc[:, :-1].T.dropna(inplace=False).T.astype(int)

In [None]:
# Create metadata
metadata2 = pd.DataFrame({'Sample': expression_values2.index.astype(str), 'Condition': expression_values2["PAM50"]}).set_index("Sample")

In [None]:
conditions = ["LumA", "LumB", "Her2", "Basal", "Normal"]
combined_results_list = []
combined_res_dfs_list = []
combined_de_genes_counts_df_list = []

for cond in conditions:
    temp_metadata = metadata2.copy()
    temp_metadata['Condition'] = temp_metadata['Condition'].apply(lambda x: cond if x == cond else 'Not ' + cond)
    
    dds = DeseqDataSet(counts=transformed_expression2, metadata=temp_metadata, design_factors="Condition")
    dds.deseq2()
    
    stat_res = DeseqStats(dds, contrast=("Condition", cond, 'Not ' + cond))
    stat_res.summary()
    res_df = stat_res.results_df
    
    combined_res_dfs_list.append(res_df)
    top_genes_df, de_genes_count_df = filter_and_extract_top_genes(res_df)
    combined_results_list.append(top_genes_df)
    
    condition_name = f"{cond} vs Not {cond}"
    df = process_gene_counts(top_genes_df, res_df, condition_name)
    combined_de_genes_counts_df_list.append(df)

combined_de_genes_counts_df = pd.concat(combined_de_genes_counts_df_list, ignore_index=True)

## preprocessing HTSeq - FPKM-UQ

In [None]:
# Merging data
merged_data_UQ = expression_data_UQ.merge(gene_annotation, left_on='Ensembl_ID', right_on='id', how='left')
merged_data_UQ['Ensembl_ID'] = merged_data_UQ['gene'].fillna(merged_data_UQ['Ensembl_ID'])
merged_data_UQ.drop(columns=['id', 'gene'], inplace=True)
merged_data_UQ.set_index('Ensembl_ID', inplace=True)
merged_data_UQ = merged_data_UQ[~merged_data_UQ.index.str.startswith('_')]

# Handle genes with all zero
filtered_merged_data_UQ = merged_data_UQ[merged_data.iloc[:, :1217].sum(axis=1) != 0]

# Handle duplicate gene names
duplicated_indices_UQ = filtered_merged_data_UQ.index[filtered_merged_data_UQ.index.duplicated(keep=False)]

new_index = []

for idx, row in filtered_merged_data_UQ.iterrows():
    if idx in duplicated_indices_UQ:
        new_idx = f"{idx}_({row['chrom']}_{row['chromStart']}_{row['chromEnd']}_{row['strand']})"
        new_index.append(new_idx)
    else:
        new_index.append(idx)

filtered_merged_data_UQ.index = new_index

filtered_merged_data_UQ2 = filtered_merged_data_UQ.iloc[:, :1217].T

filtered_merged_data_UQ2.index = filtered_merged_data_UQ2.index.str.rstrip('A') # this to find 20% variable set 

filtered_merged_data_UQ3 = filtered_merged_data_UQ2.merge(lehmann_metadata[['PAM50']], left_index=True, right_index=True, how='left')

# Data Cleaning
filtered_data_UQ = filtered_merged_data_UQ3.dropna(subset=['PAM50']).copy()
zero_expression_genes_UQ = filtered_data_UQ.sum()[filtered_data_UQ.sum() == 0].index
filtered_data_UQ.drop(zero_expression_genes_UQ, axis=1, inplace=True)

## Selecting 20% most variable genes

In [None]:
def compute_gene_variance_processed(df, percentage=20):
    """
    Compute variance for each gene from pre-processed data and extract the top percentage of most variable genes.

    Parameters:
    - df (pandas.DataFrame): Sample-wise gene counts that are already processed and log transformed.
    - percentage (float): Percentage of top variable genes to be extracted.

    Returns:
    - DataFrame with the top percentage of most variable genes.
    """
    
    # Compute the variance for each gene using the original dataframe
    variance = df.var(axis=0)
    
    # Sort variances in descending order
    sorted_variance = variance.sort_values(ascending=False)
    
    # Extract top percentage of most variable genes
    top_genes_count = int(len(sorted_variance) * (percentage / 100))
    top_genes = sorted_variance.head(top_genes_count).index
    
    return df[top_genes]

In [None]:
# Compute variance
top_20_percent_genes_df = compute_gene_variance_processed(filtered_merged_data_UQ2, 20)

# add PAM50 classification
top_20_percent_genes_LM = top_20_percent_genes_df.merge(lehmann_metadata[['PAM50']], left_index=True, right_index=True, how='left')

# Data Cleaning
top_20_percent_genes_F = top_20_percent_genes_LM.dropna(subset=['PAM50']).copy()

## Selecting only DE genes and extract these genes from HTSeq - FPKM-UQ dataset

In [None]:
# Combine both lists of DE genes into a single list
all_results = results_list + combined_results_list

# Extract genes from each dataframe and combine
all_genes = []
for df in all_results:
    all_genes.extend(df.index.tolist())

# Get unique genes
unique_genes = set(all_genes)

# Convert the set of unique genes to a DataFrame
unique_genes_df = pd.DataFrame(list(unique_genes), columns=['gene'])

In [None]:
# Change row to columns
mean_expression_UQ_T = filtered_merged_data_UQ2.T

# selection based on colum 'gene'
only_DE_genes = mean_expression_UQ_T[mean_expression_UQ_T.index.isin(unique_genes_df['gene'])].T

# add PAM50 classification
only_DE_genes_LM = only_DE_genes.merge(lehmann_metadata[['PAM50']], left_index=True, right_index=True, how='left')

# Data Cleaning
only_DE_genes_F = only_DE_genes_LM.dropna(subset=['PAM50']).copy()

## Dimension reduction & visualization

In [None]:
def plot_dimensionality_reduction(ax, X_transformed, y, title, explained_var=None):
    """
    Plots 2D dimensionality reduction.
    
    Parameters:
    - ax: The axis on which to plot.
    - X_transformed: 2D array-like data after dimensionality reduction.
    - y: Labels for the data points.
    - title: Title of the plot.
    - explained_var: Explained variance for PCA, default is None for other techniques.
    """

    # Create a color mapping based on classes
    classes = y.unique()
    colors = plt.cm.jet(np.linspace(0, 1, len(classes)))  # Using the 'jet' colormap
    color_map = dict(zip(classes, colors))

    # Map classes to colors
    y_colors = y.map(color_map)

    # Plot 
    ax.scatter(X_transformed[:, 0], X_transformed[:, 1], c=y_colors)
    ax.set_title(title)
    if explained_var is not None:
        ax.set_xlabel(f'PC1 ({explained_var[0]*100:.2f}%)')
        ax.set_ylabel(f'PC2 ({explained_var[1]*100:.2f}%)')
    else:
        ax.set_xlabel('Component 1')
        ax.set_ylabel('Component 2')

    # Create proxy artists for the legend
    proxies = [plt.Line2D([0], [0], linestyle='none', marker='o', markersize=10, markerfacecolor=color_map[class_]) for class_ in classes]

    legend_data = list(zip(proxies, classes))
    ax.legend(*zip(*legend_data), loc='center left', bbox_to_anchor=(1, 0.8))

In [None]:
def plot_PCA(X, y, n_components=2, figsize=(8, 5)):
    """
    Apply PCA on data X and plot the results.

    Parameters:
    - X: array-like or pd.DataFrame, shape (n_samples, n_features)
    - y: array-like, shape (n_samples,)
    - n_components: int, default=2
        Number of components for PCA.
    - figsize: tuple, default=(8, 5)
    
    Returns:
    - None. Displays a plot.
    """

    # Create PCA instance and transform data
    pca = PCA(n_components=n_components)
    X_pca = pca.fit_transform(X)

    # Create a new figure and axis
    fig, ax = plt.subplots(figsize=figsize)

    # function to plot on this axis
    plot_dimensionality_reduction(ax, X_pca, y, 'PCA', explained_var=pca.explained_variance_ratio_)

    # Display the plot
    plt.show()

In [None]:
def plot_PCA_variance(X, n_components=50, figsize=(8, 5)):
    """
    Apply PCA on data X and plot the explained variance and cumulative explained variance for each component (represented by Y axis).

    Parameters:
    - X: array-like or pd.DataFrame, shape (n_samples, n_features)
    - n_components: int, default=50
        Number of components for PCA.
    - figsize: tuple, default=(8, 5)
        Size of the figure.
    
    Returns:
    - None. Displays a plot.
    """
    
    # Fit PCA 
    pca_full = PCA(n_components=n_components)
    pca_full.fit_transform(X)

    # Get the explained variance for each component
    explained_variance = pca_full.explained_variance_ratio_

    # Calculate the cumulative explained variance
    cumulative_variance = np.cumsum(explained_variance)

    # Plot explained variance for each component
    plt.figure(figsize=figsize)
    plt.bar(range(1, len(explained_variance) + 1), explained_variance, alpha=0.7, align='center', label="Explained Variance")
    plt.plot(range(1, len(explained_variance) + 1), cumulative_variance, '-o', label="Cumulative Explained Variance")
    plt.xlabel('Number of components')
    plt.ylabel('Variance (%)')
    plt.title('PCA Explained Variance')
    plt.legend(loc='upper left')
    plt.tight_layout()
    plt.show()

In [None]:
def plot_TSNE(X, y, init='random', figsize=(20, 20)):
    """
    Apply t-SNE on data X with various perplexity and learning rate and plot the results.

    Parameters:
    - X: array-like or pd.DataFrame, shape (n_samples, n_features)
    - y: array-like, shape (n_samples,)
    - init: str, default='random'. Can be 'pca' 
    - figsize: tuple, default=(20, 20)

    Returns:
    - None. Displays a plot.
    """
    
    # List of perplexity values
    perplexity_values = [5, 30, 50, 100]
    
    # List of learning rates
    learning_rates = [10, 100, 500, 1000]

    # Setting up the figure for subplots
    fig, axes = plt.subplots(len(perplexity_values), len(learning_rates), figsize=figsize)

    # Iterating over different perplexity and learning rate and plot the results
    for perplexity_row, perplexity in enumerate(perplexity_values):
        for lr_col, lr in enumerate(learning_rates):
            X_tsne = TSNE(n_components=2, init=init, perplexity=perplexity, learning_rate=lr).fit_transform(X)
            plot_dimensionality_reduction(axes[perplexity_row, lr_col], X_tsne, y, f"t-SNE\nPerplexity: {perplexity}\nLR: {lr}")

    plt.tight_layout()
    plt.show()

In [None]:
def plot_UMAP(X, y, figsize=(20, 20)):
    """
    Apply UMAP on data X with various n_neighbors and min_dist and plot the results.

    Parameters:
    - X: array-like or pd.DataFrame, shape (n_samples, n_features)
    - y: array-like, shape (n_samples,)
    - figsize: tuple, default=(20, 20)
    
    Returns:
    - None. Displays a plot.
    """
    
    # List of n_neighbors values
    n_neighbors_values = [5, 10, 50, 100]
    
    # List of min_dist values
    min_dist_values = [0.01, 0.1, 0.5, 1.0]

    # Setting up the figure for subplots
    fig, axes = plt.subplots(len(n_neighbors_values), len(min_dist_values), figsize=figsize)

    # Iterating over different n_neighbors and min_dist values and plot the results
    for n_neighbors_row, n_neighbors in enumerate(n_neighbors_values):
        for min_dist_col, min_dist in enumerate(min_dist_values):
            reducer = umap.UMAP(n_neighbors=n_neighbors, min_dist=min_dist)
            X_umap = reducer.fit_transform(X)
            plot_dimensionality_reduction(axes[n_neighbors_row, min_dist_col], X_umap, y, f"UMAP\nNeighbors: {n_neighbors}\nMin Dist: {min_dist}")

    plt.tight_layout()
    plt.show()

dimension reduction & visualization for ALL data

In [None]:
# Separate the features and the classes
X = filtered_data_UQ.drop('PAM50', axis=1)
y = filtered_data_UQ['PAM50']

In [None]:
# PCA
plot_PCA(X, y)

# PCA explained variance
plot_PCA_variance(X)

# T-SNE
plot_TSNE(X, y, init='random')

# T-SNE + PCA
plot_TSNE(X, y, init='pca')

# UMAP
plot_UMAP(X, y)

dimension reduction & visualization for 20% subset

In [None]:
# Separate the features and the classes
X = top_20_percent_genes_F.drop('PAM50', axis=1)
y = top_20_percent_genes_F['PAM50']

In [None]:
# PCA
plot_PCA(X, y)

# PCA explained variance
plot_PCA_variance(X)

# T-SNE
plot_TSNE(X, y, init='random')

# T-SNE + PCA
plot_TSNE(X, y, init='pca')

# UMAP
plot_UMAP(X, y)

Dimension reduction & visualization for DE genes

In [None]:
# Separate the features and the classes
X = only_DE_genes_F.drop('PAM50', axis=1)
y = only_DE_genes_F['PAM50']

In [None]:
# PCA
plot_PCA(X, y)

# PCA explained variance
plot_PCA_variance(X)

# T-SNE
plot_TSNE(X, y, init='random')

# T-SNE + PCA
plot_TSNE(X, y, init='pca')

# UMAP
plot_UMAP(X, y)

## Classification models

In [None]:
def data_splitting(df, target_column):
    """
    Split dataframe into training (60%), validation (20%) and testing (blind) (20%) sets.

    This function first separates the features (X) and target (y) from the dataframe.
    It then encodes the target labels and splits the data into a 60-20-20 for
    training, validation and testing sets.

    Parameters:
    - df (DataFrame): Dataframe to split.
    - target_column (str): The name of the column to be used as the target.

    Returns:
    - tuple: Contains training, validation, and testing datasets for features and targets
             (X_train, X_val, X_test and y_train_encoded, y_val_encoded, y_test_encoded)
             as well as the original target values (y).
    """

    # Separate the dataframe into features (X) and target (y)
    X = df.drop(target_column, axis=1)
    y = df[target_column]

    # Encode the target column using LabelEncoder
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    # Split the data into 60% training and 40% temporary set.
    # Stratified sampling.
    X_train, X_temp, y_train_encoded, y_temp_encoded = train_test_split(
        X, y_encoded, test_size=0.4, random_state=42, stratify=y_encoded
    )

    # Split the temporary data into 50% validation and 50% test,
    # achieving a 20-20 split from the original dataset.
    X_val, X_test, y_val_encoded, y_test_encoded = train_test_split(
        X_temp, y_temp_encoded, test_size=0.5, random_state=42, stratify=y_temp_encoded
    )

    return X_train, X_val, X_test, y_train_encoded, y_val_encoded, y_test_encoded, y

In [None]:
def train_and_evaluate(X_train, y_train_encoded, classifiers, param_grid, X_val, y_val_encoded, classes):
    """
    Train multiple classifiers using predefined parameters, training and validation set. 

    Parameters:
    - X_train (DataFrame): Training data features.
    - y_train_encoded (Series): Encoded training data labels.
    - classifiers (dict): Dictionary of classifier names and their corresponding instances.
    - param_grid (dict): Dictionary containing parameter grid for hyperparameter tuning.
    - X_val (DataFrame): Validation data features.
    - y_val_encoded (Series): Encoded validation data labels.
    - classes (list): List of class labels.

    Returns:
    - dict: Best model (hyperparameters) for each classifier.
    - dict: Confusion matrices for each classifier.
    - dict: Classification reports for each classifier.
    - DataFrame: DataFrame with average metrics (precision, recall, f1-score) for each classifier.
    """

    best_models = {}
    confusion_matrices = {}
    class_reports = {}
    avg_metrics = []

    # Iterate over classifiers
    for name, classifier in classifiers.items():

        # Handle XGBoost
        if name == "XGBoost":
            # Create the XGBoost classifier
            model = xgb.XGBClassifier(eval_metric="mlogloss", # for multi-class classif.
                                      num_class=len(classes),
                                      n_estimators=1000)

            # Use RandomizedSearchCV 
            random_search = RandomizedSearchCV(model,
                                               param_distributions=param_grid[name],
                                               n_iter=25,
                                               scoring='f1_weighted',
                                               n_jobs=-1,
                                               cv=3,
                                               verbose=0,
                                               random_state=10)

            # Define the early stopping
            fit_params={"early_stopping_rounds":50,
                        "eval_set":[(X_val, y_val_encoded)],
                        "verbose": False}

            # Fit the RandomizedSearchCV with early stopping
            random_search.fit(X_train, y_train_encoded, **fit_params)

            best_models[name] = random_search.best_estimator_

        else:
            # Use RandomizedSearchCV for other models
            random_search = RandomizedSearchCV(classifier,
                                               param_distributions=param_grid[name],
                                               n_iter=25,
                                               scoring='f1_weighted',
                                               n_jobs=-1,
                                               cv=3,
                                               verbose=0,
                                               random_state=10)
            random_search.fit(X_train, y_train_encoded)
            best_models[name] = random_search.best_estimator_

        # Evaluate training on validation set
        if name == "XGBoost":
            y_pred_proba = best_models[name].predict_proba(X_val)
            y_pred = np.argmax(y_pred_proba, axis=1)
        else:
            y_pred = best_models[name].predict(X_val)

        # Confusion matrices
        confusion_matrices[name] = confusion_matrix(y_val_encoded, y_pred)
        class_reports[name] = classification_report(y_val_encoded, y_pred, target_names=classes, output_dict=True)
        
        # weighted average mmetrics
        avg_metrics.append({
            'model': name,
            'precision': class_reports[name]['weighted avg']['precision'],
            'recall': class_reports[name]['weighted avg']['recall'],
            'f1-score': class_reports[name]['weighted avg']['f1-score']
        })

    # Convert confusion_matrices into DataFrames
    for name, matrix in confusion_matrices.items():
        confusion_matrices[name] = pd.DataFrame(matrix,
                                                index=classes,
                                                columns=classes)

    # Convert class_reports into DataFrames
    for name, report in class_reports.items():
        class_reports[name] = pd.DataFrame(report).transpose()
        
        # Keep only precission, recall, f1-score
        class_reports[name] = class_reports[name][['precision', 'recall', 'f1-score']]
        class_reports[name] = class_reports[name].drop(['accuracy', 'macro avg', 'weighted avg'])

    avg_metrics_df = pd.DataFrame(avg_metrics).sort_values(by='f1-score', ascending=False)

    return best_models, confusion_matrices, class_reports, avg_metrics_df

In [None]:
def plot_precision_recall(X_val, y_val_encoded, best_models, y):
    """
    Plot the Precision-Recall (PR) curve with AUC for each classifier for the best_models.

    Parameters:
    - X_val (DataFrame): Validation data features.
    - y_val_encoded (Series): Encoded validation data labels.
    - best_models (dict): Dictionary of classifier names and their best-trained models.
    - y (Series): Original series containing the labels for the entire dataset.

    Returns:
    - None. The function directly plots the Precision-Recall curves with AUC for each classifier.
    """

    # Extract unique class labels
    classes = y.unique().tolist()

    # Convert the encoded validation labels to binary format
    y_bin = label_binarize(y_val_encoded, classes=range(len(classes)))

    # Iterate over the classifiers in the dictionary of best models
    for name, best_clf in best_models.items():
        
        #  Probability scores based on classifier 
        if name == "XGBoost":
            val_scores = best_clf.predict_proba(X_val)  
        elif hasattr(best_clf, "decision_function"):
            val_scores = best_clf.decision_function(X_val)
        else:
            val_scores = best_clf.predict_proba(X_val)

        precision = dict()
        recall = dict()
        average_precision = dict()
        pr_auc = dict()

        # For each class, compute Precision, Recall, and AUC values
        for i, class_ in enumerate(classes):
            precision[i], recall[i], _ = precision_recall_curve(y_bin[:, i], val_scores[:, i])
            average_precision[i] = average_precision_score(y_bin[:, i], val_scores[:, i])
            pr_auc[i] = auc(recall[i], precision[i])

        # Plot PR curves for each class with AUC
        plt.figure(figsize=(10, 7))
        for i, class_ in enumerate(classes):
            plt.plot(recall[i], precision[i], lw=2, 
                     label=f"PR curve of class {class_} (AUC = {pr_auc[i]:.3f})")
        plt.xlabel("Recall")
        plt.ylabel("Precision")
        plt.title(f"Precision-Recall Curve with AUC - {name}")
        plt.grid(True)
        plt.legend(loc="upper left", bbox_to_anchor=(1,1))
        plt.show()

In [None]:
def test_models(X_test, y_test_encoded, best_models, classes):
    """Test the trained models on test (blind) data.
    
    Parameters:
    - X_test (DataFrame): Test data features.
    - y_test_encoded (array): Encoded labels for test data.
    - best_models (dict): Dictionary containing trained models.
    - classes (list): Names of the classes.
    
    Returns:
    - test_results_df (DataFrame): Testing results for each model.
    - confusion_matrices (dict): Confusion matrices for each model.
    - class_reports (dict): Detailed classification report for each model.
    """
    
    test_results = []         
    confusion_matrices = {}  
    class_reports = {}      

    # Iterate through each model in the best_models
    for name, best_clf in best_models.items():
        
        # If model is XGBoost, use predict_proba method for predictions
        if name == "XGBoost":
            y_pred_proba = best_clf.predict_proba(X_test)
            test_preds = np.argmax(y_pred_proba, axis=1)
        else:
            test_preds = best_clf.predict(X_test)

        # Calculate weighted precision, recall, and F1 score
        test_precision = precision_score(y_test_encoded, test_preds, average='weighted', zero_division=1)
        test_recall = recall_score(y_test_encoded, test_preds, average='weighted')
        test_f1 = f1_score(y_test_encoded, test_preds, average='weighted')
        
        # Produce classification report
        report = classification_report(y_test_encoded, test_preds, target_names=classes, zero_division=1, output_dict=True)
        report_df = pd.DataFrame(report).transpose()
        
        # Include only precision, recall, F1-score
        report_df = report_df[['precision', 'recall', 'f1-score']]
        report_df = report_df.drop(['accuracy', 'macro avg', 'weighted avg'], errors='ignore')
        
        # Produce confusion matrix 
        cm = confusion_matrix(y_test_encoded, test_preds)
        confusion_df = pd.DataFrame(cm, index=classes, columns=classes)
        
        # Store metrics for this model in the test_results list
        test_results.append({
            'Model': name,
            'Test Precision': test_precision,
            'Test Recall': test_recall,
            'Test F1 Score': test_f1
        })

        # Update confusion_matrices and class_reports dictionaries
        confusion_matrices[name] = confusion_df
        class_reports[name] = report_df

    # Convert test results into a DataFrame
    test_results_df = pd.DataFrame(test_results)
    
    return test_results_df, confusion_matrices, class_reports

In [None]:
def feature_importance_evaluation(X_val, y_val_encoded, best_models, X_train, y_train_encoded):
    """Directly evaluate feature importances for each model and each class.

    Parameters:
    - X_val (DataFrame): Validation data features.
    - y_val_encoded (array): Encoded labels for validation data.
    - best_models (dict): Dictionary containing trained models.
    - X_train (DataFrame): Training data features.
    - y_train_encoded (array): Encoded labels for training data.

    Returns:
    - feature_importances (dict): Feature importances (first 50 featuers) for each model and each class.
    """
    
     # Feature list in the training data
    genes = X_train.columns 
    
     # Feature importance dict
    feature_importances = {}

    #  Unique class labels
    unique_classes = np.unique(y_val_encoded)

    # Iterate through each model in the best_models
    for name, best_clf in best_models.items():
        if name == "XGBoost":
            for class_index in unique_classes:
                binary_y_train = (y_train_encoded == class_index).astype(int)
                best_params = best_clf.get_params() 
                model = XGBClassifier(**best_params)
                model.fit(X_train, binary_y_train)
                importances = model.feature_importances_
                sorted_idx = importances.argsort()[::-1]
                top_50_idx = sorted_idx[:50]

                importance_df = pd.DataFrame({
                    "feature": genes[top_50_idx],
                    "importance": importances[top_50_idx]
                })
                feature_importances[name + f"_Class_{class_index}"] = importance_df

        elif name == "RandomForest":
            for class_index in unique_classes:
                binary_y_train = (y_train_encoded == class_index).astype(int)
                best_params = best_clf.get_params()
                model = RandomForestClassifier(**best_params)
                model.fit(X_train, binary_y_train)
                importances = model.feature_importances_
                sorted_idx = importances.argsort()[::-1]
                top_50_idx = sorted_idx[:50]

                importance_df = pd.DataFrame({
                    "feature": genes[top_50_idx],
                    "importance": importances[top_50_idx]
                })
                feature_importances[name + f"_Class_{class_index}"] = importance_df

        elif name == "LogisticRegression":
            for class_index in unique_classes:
                importances = np.abs(best_clf.coef_[class_index])
                sorted_idx = importances.argsort()[::-1]
                top_50_idx = sorted_idx[:50]

                importance_df = pd.DataFrame({
                    "feature": genes[top_50_idx],
                    "importance": importances[top_50_idx]
                })
                feature_importances[name + f"_Class_{class_index}"] = importance_df
                
    return feature_importances

In [None]:
# Models
classifiers = {
    "RandomForest": RandomForestClassifier(class_weight='balanced'),
    "XGBoost": xgb.XGBClassifier(objective='multi:softprob'),
    "LogisticRegression": LogisticRegression(class_weight='balanced', max_iter=1000, multi_class='ovr', solver='saga')
}

# Hyperparameters
param_dist = {
    "RandomForest": {
        'n_estimators': [10, 50, 100, 200, 500],
        'max_features': ['auto', 'sqrt', 'log2'],
        'max_depth': [None, 10, 20, 30, 40, 50],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    "XGBoost": {
        'learning_rate': [0.01, 0.05, 0.1],
        'max_depth': [3, 4, 5, 6],
        'min_child_weight': [1, 5, 10],
        'gamma': [0.5, 1, 1.5, 2],
        'subsample': [0.6, 0.8, 1.0],
        'colsample_bytree': [0.6, 0.8, 1.0],
    },
    "LogisticRegression": {
        'C': [0.001, 0.01, 0.1, 1, 10, 100],
        'penalty': ['l1', 'l2'], 
        'solver': ['saga']  
    }
}

Models - all data

In [None]:
# data split
X_train, X_val, X_test, y_train_encoded, y_val_encoded, y_test_encoded, y = data_splitting(filtered_data_UQ, 'PAM50')

In [None]:
# train models + metrics
best_models, confusion_matrices, class_reports, avg_metrics_df = train_and_evaluate(X_train, y_train_encoded, classifiers, param_dist, X_val, y_val_encoded, y.unique().tolist())

In [None]:
# plot PR curve
plot_precision_recall(X_val, y_val_encoded, best_models, y)

In [None]:
# Test performances on blind data
test_results_df, test_confusion_matrices, test_class_reports = test_models(X_test, y_test_encoded, best_models, y.unique().tolist())

In [None]:
# Feature importances (50 genes printed)
feature_importances = feature_importance_evaluation(X_val, y_val_encoded, best_models, X_train, y_train_encoded)

Models - DE set

In [None]:
# data split
X_train, X_val, X_test, y_train_encoded, y_val_encoded, y_test_encoded, y = data_splitting(only_DE_genes_F, 'PAM50')

In [None]:
# train models + metrics
best_models, confusion_matrices, class_reports, avg_metrics_df = train_and_evaluate(X_train, y_train_encoded, classifiers, param_dist, X_val, y_val_encoded, y.unique().tolist())

In [None]:
# plot PR curve
plot_precision_recall(X_val, y_val_encoded, best_models, y)

In [None]:
# Test performances on blind data
test_results_df, test_confusion_matrices, test_class_reports = test_models(X_test, y_test_encoded, best_models, y.unique().tolist())

In [None]:
# Feature importances (50 genes printed)
feature_importances = feature_importance_evaluation(X_val, y_val_encoded, best_models, X_train, y_train_encoded)