In [None]:
### IMPORT GENERAL LIBRARIES
import pandas as pd 
import numpy as np 
import seaborn as sns 
import matplotlib.pyplot as plt 
from scipy.stats import ttest_1samp, ttest_ind, zscore, skew, kurtosis
from scipy import stats as st
from matplotlib import ticker
from matplotlib.font_manager import FontProperties
import random
from matplotlib.ticker import ScalarFormatter, FormatStrFormatter

### IMPORT LIBRARIES FOR MODELING
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report, make_scorer, f1_score, accuracy_score, precision_score, recall_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_curve, auc
import xgboost as xgb

In [None]:

def gaussian_kernel(x, y, sigma):
    min_length = min(len(x), len(y))
    x = x[:min_length]
    y = y[:min_length]
    return np.exp(-np.sum((x - y)**2) / (2 * sigma**2))

def compare_individual_imfs(df1, df2, sigma=.5, threshold=1e-10):
    results = []
    significant_indices = []
    for i in range(16):
        imf1 = df1.iloc[:, i].values
        imf2 = df2.iloc[:, i].values
        similarities = [gaussian_kernel(x, y, sigma) for x, y in zip(imf1, imf2)]
        avg_similarity = np.mean(similarities)
        results.append(avg_similarity)
        if avg_similarity <= threshold:
            significant_indices.append(i+1)
    return results, significant_indices

df1 = female_dep_imfs # see utils.ipynb field in src folder
df2 = female_health_imfs

similarities, significant_imfs = compare_individual_imfs(df1, df2)

np.random.seed(42)

def extract_statistics(imfs_array, window_size=10000):
    n_signals, n_imfs, n_samples = imfs_array.shape
    n_windows = n_samples // window_size
    if n_windows == 0 or n_imfs == 0:
        raise ValueError("There are not enough samples or IMFs to form a single window.")
    all_stats = np.zeros((n_signals, n_imfs * 5))
    for i in range(n_signals):
        signal_stats = []
        for j in range(n_imfs):
            signal = imfs_array[i, j]
            stats = []
            for w in range(n_windows):
                window = signal[w*window_size:(w+1)*window_size]
                stats.append([
                    skew(window),
                    kurtosis(window),
                    np.median(window),
                    np.std(window),
                    np.mean(window)
                ])
            stats = np.array(stats)
            signal_stats.extend(stats.mean(axis=0))
        all_stats[i, :] = signal_stats
    return all_stats

# Función para aplicar el balanceo de datos
def balance_data(X, y):
    class_0 = np.where(y == 0)[0]
    class_1 = np.where(y == 1)[0]

    # Determinamos el número mínimo de filas entre las dos clases
    min_size = min(len(class_0), len(class_1))

    # Seleccionamos aleatoriamente las filas para balancear las clases
    balanced_indices_0 = np.random.choice(class_0, min_size, replace=False)
    balanced_indices_1 = np.random.choice(class_1, min_size, replace=False)

    balanced_indices = np.concatenate([balanced_indices_0, balanced_indices_1])

    # Balanceamos X e y
    X_balanced = X[balanced_indices]
    y_balanced = y[balanced_indices]

    return X_balanced, y_balanced

imfs_array = np.stack([np.stack([df1.iloc[i, idx] for idx in significant_imfs]) for i in range(df1.shape[0])])
print(imfs_array.shape)  # Debe ser (n_signals, n_imfs, n_samples)

imfs_array2 = np.stack([np.stack([df2.iloc[i, idx] for idx in significant_imfs]) for i in range(df2.shape[0])])
print(imfs_array2.shape)

df1_stats = extract_statistics(imfs_array)
df2_stats = extract_statistics(imfs_array2)

X_stats = np.vstack([df1_stats, df2_stats])
y_labels = np.concatenate([np.ones(df1_stats.shape[0]), np.zeros(df2_stats.shape[0])])  

X_balanced, y_balanced = balance_data(X_stats, y_labels)

X_train, X_test, y_train, y_test = train_test_split(X_balanced, y_balanced, test_size=0.2, random_state=42)

models = {
    'Gradient Boosting': (GradientBoostingClassifier(random_state=42), {
        'n_estimators': list(range(1, 35, 5)),
        'learning_rate': [0.1, 0.2, 0.3],
        'max_depth': [1, 2, 3, 5]
    })
}

def plot_confusion_matrix(cm, class_names, model_name):
    plt.figure(figsize=(4, 4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.title(f'Confusion Matrix - {model_name}')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.show()
    
results = []

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

for model_name, (model, param_grid) in models.items():
    print(f"Evaluando {model_name}...")

    param_grid_prefixed = {f'model__{key}': value for key, value in param_grid.items()}

    cv = KFold(n_splits=5, shuffle=True, random_state=42)
    pipeline = Pipeline([('scaler', StandardScaler()), ('model', model)])
    grid_search = GridSearchCV(pipeline, param_grid_prefixed, cv=5, scoring='f1', n_jobs=-1, verbose=0)
    
    grid_search.fit(X_train_scaled, y_train)

    best_params = grid_search.best_params_
    best_scores = grid_search.cv_results_

    best_model = grid_search.best_estimator_
    y_pred = best_model.predict(X_test_scaled)
    
    cm = confusion_matrix(y_test, y_pred)
    TN, FP, FN, TP = cm.ravel() if cm.size == 4 else (cm[0,0], cm[0,1], cm[1,0], cm[1,1])
    specificity = TN / (TN + FP) if (TN + FP) > 0 else 0

    results.append({
        'Model': model_name,
        'Best Model': best_model,
        'Parameters': grid_search.best_params_,
        'Accuracy': accuracy_score(y_test, y_pred),
        'F1 Score': f1_score(y_test, y_pred),
        'Precision': precision_score(y_test, y_pred),
        'Recall': recall_score(y_test, y_pred),
        'Specificity': specificity
    })

    plot_confusion_matrix(cm, class_names=['Healthy', 'Depressed'], model_name=model_name)

def plot_roc_curves(results, X_test_scaled, y_test):
    plt.figure(figsize=(6, 6))
    
    for result in results:
        model_name = result['Model']
        model = result['Best Model']

        if hasattr(model, 'predict_proba'):
            y_pred_proba = model.predict_proba(X_test_scaled)[:, 1]
        elif hasattr(model, 'decision_function'):
            y_pred_proba = model.decision_function(X_test_scaled)
        else:
            raise AttributeError("Model has neither predict_proba nor decision_function")
              
        fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
        roc_auc = auc(fpr, tpr)
        
        plt.plot(fpr, tpr, label=f'{model_name} (AUC = {roc_auc:.2f})', linewidth=2.5)
    
    plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Random (AUC = 0.50)')
    
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate', fontsize=12)
    plt.ylabel('True Positive Rate', fontsize=12)
    plt.title('ROC Curve for Female Data Across Selected IMFs in balanced data', fontsize=12)
    plt.legend(loc="lower right", fontsize=12)
    plt.grid(True)
    plt.show()

results_df = pd.DataFrame(results)
print("\nProcesando resultados para los IMFs seleccionados...")
print("Resultados de GridSearchCV:")
params = results_df['Parameters'].apply(lambda x: {k.replace('model__', ''): v for k, v in x.items()})
params_df = pd.DataFrame(params.tolist())
print(params_df)
print()

plot_roc_curves(results, X_test_scaled, y_test)
results_df