In [None]:
#imports
import copy
import os

import numpy as np
import pandas as pd
import plotly.io as pio
from imblearn.over_sampling import SMOTE
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import recall_score, precision_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm

from helpers.datasetHelper import get_samples, split_data
from helpers.metaheuristics import run_pso_with_progress, run_ga_with_progress
from helpers.ploting import display_confusion_matrix_pink_variants
# Update the import to match the actual function name in hiper_params_search.py
from hiper_params_search import get_best_xgboost
from models import MyXGboost

# Set the default renderer to 'browser' to ensure plots open in the browser
pio.renderers.default = 'browser'

In [None]:
# Folder that holds the GEO CSV exports read by this notebook.
directory_path_combined = './datasets/GEO'

# Each file is read exactly once through the project helper `get_samples`.
GSE32396_HEALTHY_BRCA = get_samples(os.path.join(directory_path_combined, 'GSE32396-HEALTHYxBRCA.csv'))
GSE41037_HEALTHY = get_samples(os.path.join(directory_path_combined, 'GSE41037_HEALTHY.csv'))
GSE57285_HEALTHY_BRCA = get_samples(os.path.join(directory_path_combined, 'GSE57285-HEALTHYxBRCA.csv'))
GSE58045_HEALTHY_UNK = get_samples(os.path.join(directory_path_combined, 'GSE58045-HEALTHY-UNK.csv'))
GSE58119_HEALTHY_PREBRCA = get_samples(os.path.join(directory_path_combined, 'GSE58119-HEALTHYxPRE-BRCA.csv'))

datasets_list = [
    GSE32396_HEALTHY_BRCA,
    GSE41037_HEALTHY,
    GSE57285_HEALTHY_BRCA,
    GSE58045_HEALTHY_UNK,
    GSE58119_HEALTHY_PREBRCA
]

tag_counts = {}      # tag -> number of rows carrying it, summed over all datasets
all_instances = []   # every data row of every dataset, first column removed

for idx, dataset in enumerate(datasets_list):
    # Row 0 is the header; each remaining row carries its tag in the last position.
    data_rows = dataset[1:]
    tags = [row[-1] for row in data_rows]
    unique_tags, counts = np.unique(tags, return_counts=True)
    print(f"Dataset {idx+1}:")
    for tag, count in zip(unique_tags, counts):
        print(f"  {tag}: {count}")
        tag_counts[tag] = tag_counts.get(tag, 0) + count
    print(f"  Total: {len(tags)}")
    # Keep every data row but drop its first column.
    all_instances.extend([row[1:] for row in data_rows])

print("\nConcatenated dataset:")
all_tags = [row[-1] for row in all_instances]
unique_tags_all, counts_all = np.unique(all_tags, return_counts=True)
for tag, count in zip(unique_tags_all, counts_all):
    print(f"  {tag}: {count}")
print(f"  Total: {len(all_tags)}")

# `split_data` (project helper) partitions the pooled rows into the six groups unpacked here.
healthy_mt_cases, healthy_wt_cases, healthy_unk_cases, prebrca_cases, brca_mt_cases, brca_wt_cases = split_data(all_instances)

In [None]:
from sklearn.model_selection import StratifiedKFold

def evaluate_model(patition_name, selector, X_val, y_val, label_encoder):
    """Report classification metrics of an already fitted model.

    The model is never refitted here. ``StratifiedKFold`` is only used to cut
    ``X_val`` / ``y_val`` into 5 stratified partitions; every metric is computed
    on each partition and the mean over the partitions is reported.

    Args:
        patition_name: Label prefixed to every report line (e.g. "Validation").
        selector: Fitted estimator exposing ``predict`` and ``predict_proba``.
        X_val: pandas DataFrame of features (rows are selected with ``.iloc``).
        y_val: Array-like of encoded class labels aligned with ``X_val``.
        label_encoder: Fitted ``LabelEncoder``; its ``classes_`` are handed to
            the confusion-matrix helper.

    Side effects:
        Prints the report, appends it to ``evaluation_results.txt`` and calls
        ``display_confusion_matrix_pink_variants`` on the whole evaluation set.
        The file is opened in append mode so that successive calls (one per
        model / dataset / split) do not erase each other's results.
    """
    # Positional indexing below must work even when y_val is a pandas Series
    # whose index is not 0..n-1.
    y_true_all = np.asarray(y_val)
    class_labels = np.unique(y_true_all)

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    accuracy_vals, f1_vals, roc_auc_vals = [], [], []
    sensitivity_vals, specificity_vals, precision_vals, kappa_vals = [], [], [], []

    # Only the "test" side of each split is used; the "train" side is ignored.
    for _, test_idx in skf.split(X_val, y_true_all):
        X_fold, y_fold = X_val.iloc[test_idx], y_true_all[test_idx]
        y_pred_fold = selector.predict(X_fold)
        y_pred_proba_fold = selector.predict_proba(X_fold)

        accuracy_vals.append(accuracy_score(y_fold, y_pred_fold))
        f1_vals.append(f1_score(y_fold, y_pred_fold, average='weighted'))
        sensitivity_vals.append(recall_score(y_fold, y_pred_fold, average='macro'))

        # Macro specificity: one-vs-rest true-negative rate, averaged over the
        # labels actually present in y_val (not over an assumed 0..k-1 range).
        specificities_fold = []
        for class_label in class_labels:
            true_neg = np.sum((y_fold != class_label) & (y_pred_fold != class_label))
            total_neg = np.sum(y_fold != class_label)
            specificities_fold.append(true_neg / total_neg if total_neg > 0 else 0)
        specificity_vals.append(np.mean(specificities_fold))
        precision_vals.append(precision_score(y_fold, y_pred_fold, average='weighted'))

        # Binary case: score with the probability of the second column;
        # otherwise use one-vs-rest multiclass AUC.
        if y_pred_proba_fold.shape[1] == 2:
            roc_auc_vals.append(roc_auc_score(y_fold, y_pred_proba_fold[:, 1]))
        else:
            roc_auc_vals.append(roc_auc_score(y_fold, y_pred_proba_fold, multi_class='ovr'))

        kappa_vals.append(cohen_kappa_score(y_fold, y_pred_fold))

    accuracy_val = np.mean(accuracy_vals)
    f1_val = np.mean(f1_vals)
    roc_auc_val = np.mean(roc_auc_vals)
    sensitivity_val = np.mean(sensitivity_vals)
    specificity_val = np.mean(specificity_vals)
    precision_val = np.mean(precision_vals)
    kappa_val = np.mean(kappa_vals)

    # Build the report once so the console and the file always agree.
    report_lines = [
        f"{patition_name} Accuracy (cv = 5): {accuracy_val:.4f}",
        f"{patition_name} F1 Score (cv = 5): {f1_val:.4f}",
        f"{patition_name} ROC AUC (cv = 5): {roc_auc_val:.4f}",
        f"{patition_name} Sensitivity (cv = 5) : {sensitivity_val:.4f}",
        f"{patition_name} Specificity (cv = 5): {specificity_val:.4f}",
        f"{patition_name} Precision (cv = 5): {precision_val:.4f}",
        f"{patition_name} Kappa index (cv = 5): {kappa_val:.4f}",
    ]
    for line in report_lines:
        print(line)

    # Append instead of truncating: this function is called many times per run.
    with open("evaluation_results.txt", "a") as f:
        f.writelines(f"{line}\n" for line in report_lines)

    display_confusion_matrix_pink_variants(selector, X_val, y_val, label_encoder.classes_)

In [None]:
def process_dataframes(X, Y):
    """Build train/test/validation partitions for three feature sets.

    Feature subsets are selected with PSO and GA (project helpers), the labels
    are encoded with ``LabelEncoder``, and each of the three feature sets
    (GA-selected, PSO-selected, all features) is split 70/15/15 into
    train/validation/test with a fixed seed. SMOTE is then applied to the
    training part only.

    Args:
        X: pandas DataFrame of features (columns are selected with ``.iloc``).
        Y: Labels aligned with the rows of ``X``.

    Returns:
        A list of three tuples, in the order ``'GA'``, ``'PSO'``, ``'ALL'``:
        ``(name, X_train, X_test, X_val, y_train, y_test, y_val)``.
    """
    n_features = X.shape[1]
    print(f"Loaded dataset with {n_features} features and {len(Y)} samples")

    # Estimator handed to both metaheuristics.
    estimator = MyXGboost.DecisionTreeMultiClass()

    # Run PSO; only the selected column positions are needed here.
    _, _, _, pso_columns = run_pso_with_progress(
        X, Y, estimator, n_features,
        swarmsize=30,
        maxiter=10,
        threshold=0.7
    )
    X_selected_pso = X.iloc[:, pso_columns]

    # Run GA; only the selected column positions are needed here.
    _, _, _, ga_columns = run_ga_with_progress(
        X, Y, estimator, X.shape[1],
        pop_size=25, n_generations=10, threshold=0.8
    )
    X_selected_ga = X.iloc[:, ga_columns]

    # Use LabelEncoder to encode the target classes
    label_encoder = LabelEncoder()
    Y_encoded = label_encoder.fit_transform(Y)
    print("Label indices and names:")
    for idx, name in enumerate(label_encoder.classes_):
        print(f"{idx}: {name}")

    print(f"Encoded target classes: {label_encoder.classes_}")

    def _split_and_balance(features):
        """Split 70/15/15 with a fixed seed, then SMOTE the training part only."""
        X_train, X_holdout, y_train, y_holdout = train_test_split(
            features, Y_encoded, test_size=0.3, random_state=42
        )
        # The 30% hold-out is halved into validation and test (15% each).
        X_val, X_test, y_val, y_test = train_test_split(
            X_holdout, y_holdout, test_size=0.5, random_state=42
        )
        smote = SMOTE(sampling_strategy='auto', random_state=None, k_neighbors=10)
        X_train, y_train = smote.fit_resample(X_train, y_train)
        return X_train, X_test, X_val, y_train, y_test, y_val

    # The original code resampled undefined names (X_train_nfs / y_train_nfs)
    # for the all-features case; every feature set now goes through the same helper.
    partitions_all = _split_and_balance(X)
    partitions_ga = _split_and_balance(X_selected_ga)
    partitions_pso = _split_and_balance(X_selected_pso)

    return [('GA', *partitions_ga),
            ('PSO', *partitions_pso),
            ('ALL', *partitions_all)]

In [None]:
# Wrap every group produced by split_data in a DataFrame.
(healthy_mt_cases, healthy_wt_cases, healthy_unk_cases,
 prebrca_cases, brca_mt_cases, brca_wt_cases) = map(
    pd.DataFrame,
    (healthy_mt_cases, healthy_wt_cases, healthy_unk_cases,
     prebrca_cases, brca_mt_cases, brca_wt_cases),
)

# Pooled groups get a single coarse tag.
healthy_consolidated_cases = pd.concat(
    [healthy_mt_cases, healthy_wt_cases, healthy_unk_cases], ignore_index=True
).assign(Tag='HEALTHY')
brca_consolidated_cases = pd.concat(
    [brca_mt_cases, brca_wt_cases], ignore_index=True
).assign(Tag='BRCA')

# Individual groups get their fine-grained tag (set in place, after pooling).
for frame, tag in (
    (healthy_mt_cases, 'HEALTHY-MT'),
    (healthy_wt_cases, 'HEALTHY-WT'),
    (healthy_unk_cases, 'HEALTHY-UNK'),
    (prebrca_cases, 'PRE-BRCA'),
    (brca_mt_cases, 'BRCA-MUT'),
    (brca_wt_cases, 'BRCA-WT'),
):
    frame['Tag'] = tag

# Shape summary, grouped by condition.
for label, frame in (
    ("Healthy MT", healthy_mt_cases),
    ("Healthy WT", healthy_wt_cases),
    ("Healthy consolidated", healthy_consolidated_cases),
):
    print(f"{label} cases shape: {frame.shape}")
print("--------------------------------------------------")
print(f"PRE-BRCA cases shape: {prebrca_cases.shape}")
print("--------------------------------------------------")
for label, frame in (
    ("BRCA MT", brca_mt_cases),
    ("BRCA WT", brca_wt_cases),
    ("BRCA consolidated", brca_consolidated_cases),
):
    print(f"{label} cases shape: {frame.shape}")

In [None]:
# Baseline on all features: rows tagged HEALTHY-UNK vs rows tagged BRCA.
df_cancer = pd.concat([healthy_unk_cases, brca_consolidated_cases], ignore_index=True) #blood samples
# Every column but the last is a feature; non-numeric cells become NaN.
X = df_cancer.iloc[:, :-1].apply(pd.to_numeric, errors='coerce')
Y = df_cancer.iloc[:, -1]

# Encode the string tags as integers.
label_encoder = LabelEncoder()
label_encoder.fit(Y)
Y_encoded = label_encoder.transform(Y)

print("Label indices and names:")
for idx, name in enumerate(label_encoder.classes_):
    print(f"{idx}: {name}")

# 60% train; the remaining 40% is split below.
X_train, X_test_all, y_train_all, y_test_all = train_test_split(
    X, Y_encoded, test_size=0.40, random_state=42
)

# Halve the hold-out into validation and test sets (20% of the data each).
X_val_nfs, X_test, y_val_nfs, y_test_nfs = train_test_split(
    X_test_all, y_test_all, test_size=0.5, random_state=42
)

# Impute missing values with TRAINING column means only, so that no statistic
# computed on validation/test rows leaks into the evaluation.
train_means = X_train.mean()
X_train = X_train.fillna(train_means)
X_val_nfs = X_val_nfs.fillna(train_means)
X_test = X_test.fillna(train_means)

print("Class distribution in full dataset:")
print(pd.Series(Y_encoded).value_counts().sort_index(), label_encoder.classes_)

print("Class distribution in training set (nfs):")
print(pd.Series(y_train_all).value_counts().sort_index(), label_encoder.classes_)

print("Class distribution in test set:")
print(pd.Series(y_test_nfs).value_counts().sort_index(), label_encoder.classes_)

print("Class distribution in validation set:")
print(pd.Series(y_val_nfs).value_counts().sort_index(), label_encoder.classes_)

# Balance the training set only; validation and test keep their natural distribution.
smote = SMOTE(sampling_strategy='auto', random_state=None, k_neighbors=5)
X_train, y_train_all = smote.fit_resample(X_train, y_train_all)

# XGBoostBinary() is indexed with [0] to obtain the estimator itself.
selector = MyXGboost.XGBoostBinary()[0].fit(X_train, y_train_all)
# Evaluate the fitted model on the validation set, then on the test set.
evaluate_model("Validation", selector, X_val_nfs, y_val_nfs, label_encoder)
evaluate_model("Test", selector, X_test, y_test_nfs, label_encoder)

In [None]:
# Model registries: each entry pairs a display name with an estimator built by
# the project's MyXGboost factory. The estimators are instantiated once, here.
models_multiclass = [
    {'Name': label, 'Model': estimator}
    for label, estimator in (
        ('Random Forest', MyXGboost.RandomForest300()),
        ('XGBoost', MyXGboost.XGBoostMultiClass()),
        ('Light GBM', MyXGboost.LightGBMMulticlass()),
        ('Gradient Boosting', MyXGboost.GradientBoosting()),
        ('Ada Boost', MyXGboost.AdaBoostMultiClass()),
    )
]

# Same line-up for two-class problems, using the binary factories where they exist.
models_binary = [
    {'Name': label, 'Model': estimator}
    for label, estimator in (
        ('Random Forest', MyXGboost.RandomForest300()),
        ('XGBoost', MyXGboost.XGBoostBinary()),
        ('Light GBM', MyXGboost.LightGBMBinary()),
        ('Gradient Boosting', MyXGboost.GradientBoosting()),
        ('Ada Boost', MyXGboost.AdaBoostBinary()),
    )
]

In [None]:
def process_pso(df_cancer, swarmsize=30, maxiter=10, threshold=0.8):
    """Select features with PSO and return the reduced feature frame.

    Args:
        df_cancer: DataFrame whose last column holds the labels and whose other
            columns hold the features.
        swarmsize: Forwarded to ``run_pso_with_progress``.
        maxiter: Forwarded to ``run_pso_with_progress``.
        threshold: Forwarded to ``run_pso_with_progress``.

    Returns:
        DataFrame containing only the columns selected by PSO, with missing
        values already filled.
    """
    X = df_cancer.iloc[:, :-1].apply(pd.to_numeric, errors='coerce')
    Y = df_cancer.iloc[:, -1]

    # Fill missing values with the lowest value of the same column.
    X = X.apply(lambda col: col.fillna(col.min()), axis=0)

    n_features = X.shape[1]
    print(f"Loaded dataset with {n_features} features and {len(Y)} samples")

    # Estimator handed to the PSO search.
    estimator = MyXGboost.DecisionTreeMultiClass()

    # Run PSO; the weights and progress history are not used here.
    _, best_fitness, _, selected_columns = run_pso_with_progress(
        X, Y, estimator, n_features,
        swarmsize=swarmsize,
        maxiter=maxiter,
        threshold=threshold
    )

    # selected_columns is used as a positional column selector.
    X_selected_pso = X.iloc[:, selected_columns]

    # The arrow was stored as mis-decoded UTF-8 ("â†’"); restored here.
    print(f"Done PSO → best fitness = {best_fitness:.4f}")

    return X_selected_pso

def process_ga(df_cancer, pop_size=30, n_generations=5, threshold=0.8):
    """Select features with a genetic algorithm and return the reduced frame.

    Args:
        df_cancer: DataFrame whose last column holds the labels and whose other
            columns hold the features.
        pop_size: Forwarded to ``run_ga_with_progress``.
        n_generations: Forwarded to ``run_ga_with_progress``.
        threshold: Forwarded to ``run_ga_with_progress``.

    Returns:
        DataFrame containing only the columns selected by the GA, with missing
        values already filled.
    """
    X = df_cancer.iloc[:, :-1].apply(pd.to_numeric, errors='coerce')
    Y = df_cancer.iloc[:, -1]

    # Fill missing values with the lowest value of the same column.
    X = X.apply(lambda col: col.fillna(col.min()), axis=0)

    n_features = X.shape[1]
    print(f"Loaded dataset with {n_features} features and {len(Y)} samples")

    # Estimator handed to the GA search.
    estimator = MyXGboost.DecisionTreeMultiClass()

    # Run the GA; weights, fitness and progress history are not used here.
    _, _, _, selected_columns = run_ga_with_progress(
        X, Y, estimator, n_features,
        pop_size=pop_size, n_generations=n_generations, threshold=threshold
    )

    # selected_columns is used as a positional column selector.
    X_selected_ga = X.iloc[:, selected_columns]

    return X_selected_ga

# Classification tasks: task name -> list of tagged frames to concatenate.
datasets = dict([
    ('Healthy vs BRCA', [healthy_unk_cases, brca_consolidated_cases]),
    ('Healthy vs PRE-BRCA VS BRCA', [healthy_unk_cases, prebrca_cases, brca_consolidated_cases]),
    ('Healthy vs PRE-BRCA', [healthy_unk_cases, prebrca_cases]),
    ('PRE-BRCA vs BRCA', [prebrca_cases, brca_consolidated_cases]),
])

In [None]:
# Fitted models trained on PSO-selected features, one entry per (model, task).
selectors_pso = []

for key in datasets:
    # Build the combined frame once; it feeds both feature selection and the labels.
    combined_cases = pd.concat(datasets[key], ignore_index=True)
    # NOTE(review): feature selection sees the full task data before the
    # train/validation/test split below, so held-out rows influence which
    # features are kept. Consider selecting on the training part only.
    X_selected_pso = process_pso(combined_cases)
    X = X_selected_pso

    Y = combined_cases.iloc[:, -1]
    label_encoder = LabelEncoder()
    Y_encoded = label_encoder.fit_transform(Y)

    models = models_binary if len(np.unique(Y)) == 2 else models_multiclass
    # Count columns: len() of a DataFrame would report the number of samples.
    total_features_selcted = X_selected_pso.shape[1]
    print(f"Total features selected by PSO: {total_features_selcted}")

    # The split is seeded, hence identical for every model: do it once per task.
    # 60% train; the remaining 40% is halved into validation and test (20% each).
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        X, Y_encoded, test_size=0.40, random_state=42
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=0.5, random_state=42
    )

    # Impute any remaining gaps with TRAINING column means only.
    train_means = X_train.mean()
    X_train = X_train.fillna(train_means)
    X_val = X_val.fillna(train_means)
    X_test = X_test.fillna(train_means)

    for model_info in models:
        print(f"Training model: {model_info['Name']}")

        # Balance the training part only (unseeded, as before).
        smote = SMOTE(sampling_strategy='auto', random_state=None, k_neighbors=5)
        X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)

        # Some factories return a sequence whose first item is the estimator.
        base_model = model_info['Model'][0] if isinstance(model_info['Model'], (list, tuple)) else model_info['Model']
        # Fit a private copy: the registry instances are shared across tasks, so
        # fitting them directly would overwrite the models stored for earlier tasks.
        model = copy.deepcopy(base_model)
        model.fit(X_train_balanced, y_train_balanced)

        # Validation metrics are computed on the validation split (the original
        # code passed the test split here).
        info = f"{model_info['Name']} - {key} - VALIDATION"
        evaluate_model(info, model, X_val, y_val, label_encoder)

        print("-" * 50)

        info = f"{model_info['Name']} - {key} - TEST"
        evaluate_model(info, model, X_test, y_test, label_encoder)

        # String keys: the bare names `name` / `selector` referred to unrelated
        # variables leaked from earlier cells.
        selectors_pso.append({
            'name': info,
            'selector': model
            })

In [None]:
# Fitted models trained on GA-selected features, one entry per (model, task).
selectors_ga = []

for key in datasets:
    # Build the combined frame once; it feeds both feature selection and the labels.
    combined_cases = pd.concat(datasets[key], ignore_index=True)
    # NOTE(review): feature selection sees the full task data before the
    # train/validation/test split below, so held-out rows influence which
    # features are kept. Consider selecting on the training part only.
    X_selected_ga = process_ga(combined_cases)
    X = X_selected_ga

    Y = combined_cases.iloc[:, -1]
    label_encoder = LabelEncoder()
    Y_encoded = label_encoder.fit_transform(Y)

    models = models_binary if len(np.unique(Y)) == 2 else models_multiclass
    # Count columns: len() of a DataFrame would report the number of samples.
    total_features_selcted = X_selected_ga.shape[1]
    print(f"Total features selected by GA: {total_features_selcted}")

    # The split is seeded, hence identical for every model: do it once per task.
    # 60% train; the remaining 40% is halved into validation and test (20% each).
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        X, Y_encoded, test_size=0.40, random_state=42
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=0.5, random_state=42
    )

    # Impute any remaining gaps with TRAINING column means only.
    train_means = X_train.mean()
    X_train = X_train.fillna(train_means)
    X_val = X_val.fillna(train_means)
    X_test = X_test.fillna(train_means)

    for model_info in models:
        print(f"Training model: {model_info['Name']}")

        # Balance the training part only (unseeded, as before).
        smote = SMOTE(sampling_strategy='auto', random_state=None, k_neighbors=5)
        X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)

        # Some factories return a sequence whose first item is the estimator.
        base_model = model_info['Model'][0] if isinstance(model_info['Model'], (list, tuple)) else model_info['Model']
        # Fit a private copy: the registry instances are shared across tasks, so
        # fitting them directly would overwrite the models stored for earlier tasks.
        model = copy.deepcopy(base_model)
        model.fit(X_train_balanced, y_train_balanced)

        # Validation metrics are computed on the validation split (the original
        # code passed the test split here).
        info = f"{model_info['Name']} - {key} - VALIDATION"
        evaluate_model(info, model, X_val, y_val, label_encoder)

        print("-" * 50)

        info = f"{model_info['Name']} - {key} - TEST"
        evaluate_model(info, model, X_test, y_test, label_encoder)

        # String keys: the bare names `name` / `selector` referred to unrelated
        # variables leaked from earlier cells.
        selectors_ga.append({
            'name': info,
            'selector': model
            })
        

