In [2]:
import sys
import os
from datetime import datetime
from pathlib import Path

# Import necessary libraries
import logging
import traceback
import numpy as np
import pandas as pd

# Scikit-learn imports
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.model_selection import (
    GridSearchCV, 
    KFold, 
    RepeatedKFold, 
    LeaveOneOut, 
    StratifiedKFold
)
from sklearn.metrics import (
    mean_squared_error, 
    mean_absolute_error, 
    r2_score, 
    explained_variance_score
)

from sklearn.feature_selection import SelectKBest, f_classif


from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.decomposition import PCA
import xgboost as xgb


pd.set_option('display.max_columns', None)

sys.path.append(os.path.abspath("../src"))  

def bold_max(df, dataset="", precision=2):
    """
    Return a Styler that bolds the column-wise maxima.

    Works with both:
    - numeric values
    - strings in the format '0.84 ± 0.02'

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with either numeric or 'mean ± std' strings.
    dataset : str
        A caption or title to display above the table.
    precision : int, default 2
        Number of decimals to show if numeric.
    """
    def is_string_with_std(val):
        return isinstance(val, str) and '±' in val
    
    def is_string_with_bracket(val):
        return isinstance(val, str) and '[' in val

    if df.applymap(is_string_with_std).all().all():
        # All cells are strings with ±
        def highlight_max(col):
            means = col.str.extract(r"(\d+\.\d+) ±")[0].astype(float)
            max_val = means.max()
            return ['font-weight: bold' if v == max_val else '' for v in means]

        return df.style.set_caption(f"Dataset: {dataset}").apply(highlight_max, axis=0)

    else:
        # Assume numeric DataFrame
        return (
            df.style
              .set_caption(f"Dataset: {dataset}")
              .format(f"{{:.{precision}f}}")
              .apply(lambda col: ['font-weight: bold' if v == col.max() else '' for v in col], axis=0)
        )


In [3]:
processed_path = '../data/processed/'
# Read data
df_digital_tmt_with_target = pd.read_csv(processed_path + 'df_digital_tmt_with_target.csv') 
demographic_df = pd.read_csv(processed_path + 'demographic_df.csv') 
non_digital_df = pd.read_csv(processed_path + 'non_digital_df.csv') 
df_digital_hand_and_eye = pd.read_csv(processed_path + 'df_digital_hand_and_eye.csv') 
digital_test_less_subjects = pd.read_csv(processed_path + 'digital_test_less_subjects.csv') 
non_digital_test_less_subjects = pd.read_csv(processed_path + 'non_digital_test_less_subjects.csv') 


# Final checks
print(df_digital_tmt_with_target['group'].value_counts())
print(demographic_df['group'].value_counts())
print(non_digital_df['group'].value_counts())
print(df_digital_hand_and_eye['group'].value_counts())
print(digital_test_less_subjects['group'].value_counts())
print(non_digital_test_less_subjects['group'].value_counts())

group
1    41
0    33
Name: count, dtype: int64
group
1    41
0    33
Name: count, dtype: int64
group
1    41
0    33
Name: count, dtype: int64
group
1    29
0    23
Name: count, dtype: int64
group
1    29
0    23
Name: count, dtype: int64
group
1    29
0    23
Name: count, dtype: int64


In [None]:
# Configure logging
# Ensure logs folder exists
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# Create a fresh log file each run
log_filename = os.path.join(log_dir, f"error_log_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log")
logging.basicConfig(filename=log_filename,
                    level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s - %(message)s')


# ───────────────────────────────────────────────────────────────
# 0. SET-UP GENERAL 
# ───────────────────────────────────────────────────────────────
df_metadata_for_ml = pd.read_csv("metadata_for_ml.csv")
df_y = df_metadata_for_ml[['suj', 'mmse']]

n_splits = 2
n_repeats = 1

global_seed = 42
inner_cv_seed = 50  # Fixed for reproducibility in inner CV
perform_pca = False
type_of_csv = 'loo'
n_components = 4
tune_hyperparameters = False
feature_selection = True

# datasets = [
#             # N=79
#             'demographic', 
#             'digital_test', 
#             'demographic+digital',
#             'non_digital_tests', 
#             'non_digital_tests+demo',

#             # N=56
#             'demographic_less_subjects', 
#             'digital_test_less_subjects', 
#             'demographic+digital_less',
#             'hand_and_eye',
#             'hand_and_eye_demo',
#             'non_digital_test_less_subjects', 
#             'non_digital_test_less_subjects+demo'
#             ]

datasets = ['non_digital_test_less_subjects+demo']


def extract_X_y_features_regression(df, metadata):
    df = df.merge(metadata, how='inner', on='suj')
    df = df.drop(['suj', 'group'], axis=1)
    print("group in X", 'group' in df.iloc[:, :-1].columns)
    print("suj in X", 'suj' in df.iloc[:, :-1].columns)
    display(df.head(2))
    X = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values
    feature_names = df.columns[:-1]
    
    return X, y, feature_names 

def join_and_reorder_regression(df1, df2):
    df = df1.join(df2.drop(columns='group'))
    cols = [col for col in df.columns if col != 'group'] + ['group']
    df = df[cols]
    assert df.columns[-1] == 'group', "'group' is not the last column after reordering"
    return df

y_variable = 'mmse'
df_y = df_metadata_for_ml[['suj',y_variable]]

for value in [True, False]: 
    perform_pca = value
    for dataset in datasets:
        try:
            print(f"Starting {dataset}: \n\n")
            if perform_pca:
                print("Performing PCA")

            match dataset:
                case 'demographic':
                    X, y, feature_names = extract_X_y_features_regression(demographic_df, df_y)

                case 'demographic_less_subjects':  
                    df = demographic_df.loc[df_digital_hand_and_eye.index]
                    X, y, feature_names = extract_X_y_features_regression(df, df_y)

                case 'demographic+digital':
                    df = join_and_reorder_regression(df_digital_tmt_with_target, demographic_df)
                    X, y, feature_names = extract_X_y_features_regression(df, df_y)

                case 'demographic+digital_less':
                    df = join_and_reorder_regression(df_digital_tmt_with_target.loc[df_digital_hand_and_eye.index], demographic_df)
                    X, y, feature_names = extract_X_y_features_regression(df, df_y)

                case 'non_digital_tests':
                    non_digital_df = non_digital_df.drop(y_variable, axis=1)
                    X, y, feature_names = extract_X_y_features_regression(non_digital_df, df_y)

                case 'non_digital_tests+demo':
                    df = join_and_reorder_regression(non_digital_df.drop(columns=['suj', y_variable]), demographic_df)
                    X, y, feature_names = extract_X_y_features_regression(df, df_y)

                case 'non_digital_test_less_subjects':
                    X, y, feature_names = extract_X_y_features_regression(non_digital_test_less_subjects, df_y)

                case 'non_digital_test_less_subjects+demo':
                    non_digital_test_less_subjects = non_digital_test_less_subjects.drop(y_variable, axis=1)
                    df = join_and_reorder_regression(non_digital_test_less_subjects.drop(columns='suj'), demographic_df)
                    X, y, feature_names = extract_X_y_features_regression(df, df_y)

                case 'digital_test':
                    X, y, feature_names = extract_X_y_features_regression(df_digital_tmt_with_target, df_y)

                case 'digital_test_less_subjects':
                    X, y, feature_names = extract_X_y_features_regression(digital_test_less_subjects, df_y)

                case 'hand_and_eye':
                    X, y, feature_names = extract_X_y_features_regression(df_digital_hand_and_eye, df_y)

                case 'hand_and_eye_demo':
                    df = join_and_reorder_regression(df_digital_hand_and_eye, demographic_df)
                    X, y, feature_names = extract_X_y_features_regression(df, df_y)

                case _:
                    raise ValueError(f'Please select a valid dataset from: {datasets}')


            # ───────────────────────────────────────────────────────────────
            # 1. DEFINICIÓN DE PARÁMETROS Y MODELOS 
            # ───────────────────────────────────────────────────────────────

            unique, counts = np.unique(y, return_counts=True)
            print("Class distribution:", dict(zip(unique, counts)))

            # Define parameter grids
            param_grids = {
                "RandomForestClassifier": {
                    "classifier__n_estimators": [100, 500, 700, 1000],
                    "classifier__max_depth": [None, 10, 20, 30]
                },
                "SVC": {
                    "classifier__C": [0.1, 1, 10],
                    "classifier__kernel": ['linear', 'rbf']
                },
                "LogisticRegression": {
                    "classifier__C": [0.1, 1, 10],
                    "classifier__penalty": ['l2']
                },
                "XGBClassifier": {
                    "classifier__n_estimators": [100, 300],
                    "classifier__max_depth": [3, 5],
                    "classifier__learning_rate": [0.05, 0.1]
                }
            }

            # Define models to evaluate
            models = [
                # RandomForestClassifier(random_state=42, n_jobs=-1),
                # SVC(random_state=42, probability=True),
                LogisticRegression(max_iter=1000, random_state=42, solver='saga', n_jobs=-1),
                # xgb.XGBClassifier(random_state=42, tree_method="hist", eval_metric='logloss',n_jobs=-1)
            ]

            # ───────────────────────────────────────────────────────────────
            # 2. Cross validation
            # ───────────────────────────────────────────────────────────────

            match type_of_csv:
                case 'stratified':
                    print(f"RepeatedStratifiedKFold selected with n_splits = {n_splits} and n_repeats = {n_repeats}")
                    outer_cv = RepeatedStratifiedKFold(
                        n_splits=n_splits,
                        n_repeats=n_repeats,         
                        random_state=global_seed # Global seed
                    )
                case 'loo':
                    print("LeaveOneOut selected")
                    outer_cv = LeaveOneOut()
                case _:
                    print("select a valid CV type")

            mean_fpr = np.linspace(0, 1, 100)
            all_metrics_df = pd.DataFrame(columns=[
                'model', 'repeat', 'fold',   
                'accuracy', 'balanced_accuracy', 'precision', 
                'recall', 'f1', 'auc', 'specificity'
            ])

            # ───────────────────────────────────────────────────────────────
            # 3. External loop 
            # ───────────────────────────────────────────────────────────────
            for model in models:
                model_name = model.__class__.__name__
                print(f"\n🧪 CV for: {model_name}")

                tprs, aucs, best_params_list, fold_metrics = [], [], [], []
                feature_importance_counts = {n: 0 for n in feature_names}

                # fig, ax = plt.subplots(figsize=(6, 6))
                all_y_true, all_y_pred = [], []

                # Enumeramos 'repeat' y 'fold' para guardar en métricas
                for outer_idx, (train_idx, test_idx) in enumerate(outer_cv.split(X, y)):
                    n_features = 20
                    n_components = 4
                    fold = outer_idx  # index of the left-out observation
                    print('fold:', fold)

                    # ── Split
                    X_train, X_test = X[train_idx], X[test_idx]
                    y_train, y_test = y[train_idx], y[test_idx]

                    
                    # ── Inner CV: estratificado 3-fold con la MISMA semilla por repetición
                    inner_cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=inner_cv_seed)
                    
                    if perform_pca:
                        n_components = min(n_components, X_train.shape[1])
                        print("n_components:", n_components)
                        pca_step = ('pca', PCA(n_components=n_components))
                    else:
                        pca_step = ('noop', 'passthrough')

                    
                    if feature_selection:
                        n_features = min(n_features, X_train.shape[1])
                        print("n_features:", n_features)
                        feature_selection_step = ('select', SelectKBest(score_func=f_classif, k=n_features))
                    else:
                        feature_selection_step = ('noop', 'passthrough')

                    pipeline = Pipeline([
                        ('imputer', SimpleImputer(strategy='mean')),  # or 'median' depending on your data
                        feature_selection_step,
                        ('scaler', StandardScaler()),
                        pca_step,
                        ('classifier', model)
                    ])

                    # Hiperparámetros
                    param_grid = param_grids.get(model_name, {})


                    if tune_hyperparameters and param_grid:
                        grid = GridSearchCV(
                            pipeline,
                            param_grid=param_grid,
                            cv=inner_cv,               
                            scoring='roc_auc',
                            n_jobs=-1,
                            verbose=0
                        )
                        grid.fit(X_train, y_train)
                        best_model = grid.best_estimator_
                        best_params_list.append(grid.best_params_)
                    else:
                        pipeline.fit(X_train, y_train)
                        best_model = pipeline
                        best_params_list.append("no tuning")

                    # ── Predicción
                    y_pred_proba = best_model.predict_proba(X_test)[:, 1]
                    y_pred = best_model.predict(X_test)

                    all_y_true.extend(y_test)
                    all_y_pred.extend(y_pred)

                    fold_metrics.append({
                        'model': model_name,
                        'fold': fold,              
                        'y_test': y_test[0],
                        'y_pred': y_pred[0],
                        'y_pred_proba': y_pred_proba[0],
                        'feature_names': feature_names.values
                    })

                # ── Guardamos métricas
                all_metrics_df = pd.concat([all_metrics_df,
                                            pd.DataFrame(fold_metrics)],
                                        ignore_index=True)


            # save
            dir = f'./results/modelling/regression/{datetime.now().strftime("%Y-%m-%d")}'
            os.makedirs(dir, exist_ok=True)
            if perform_pca:
                all_metrics_df.to_csv(f'{dir}/{dataset}_feature_{feature_selection}_n={n_features}_tune={tune_hyperparameters}_LOOCV_PCA_n_components{n_components}_{datetime.now().strftime("%s")[-4:]}.csv',index=False)
            else:
                all_metrics_df.to_csv(f'{dir}/{dataset}_feature_{feature_selection}_n={n_features}_tune={tune_hyperparameters}_LOOCV_{datetime.now().strftime("%s")[-4:]}.csv',index=False)
        except Exception as e:
            print(e)
            error_msg = traceback.format_exc().strip().split("\n")[-1]  # only last line of error
            logging.error(f"[{dataset}] PCA={perform_pca} → {error_msg}")
            print(f"⚠️ An error occurred with dataset {dataset}. Check log file: {log_filename}")
