<a   href="https://colab.research.google.com/github/N-Nieto/OHBM_SEA-SIG_Educational_Course/blob/master/03_pitfalls/03_04_imbalance_learning_data_strategies.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### If you are running in Google Colab, uncomment the cell below to load the data.
### If you are running locally, ignore the cell.

For questions on this notebook contact: n.nieto@fz-juelich.de

In [None]:
# from pathlib import Path
# from urllib.request import urlretrieve
# # Clean files
# import pandas as pd
# import numpy as np

# # Download necessary data files
# Path("data").mkdir(exist_ok=True)

# # 01_basic_ML.ipynb needs these files
# urlretrieve('https://zenodo.org/records/17056022/files/cleaned_VBM_GM_Schaefer100x17_mean_aggregation.csv?download=1', './data/cleaned_VBM_GM_Schaefer100x17_mean_aggregation.csv')
# urlretrieve('https://zenodo.org/records/17056022/files/cleaned_IXI_behavioural.csv?download=1', './data/cleaned_IXI_behavioural.csv')

# # 02_XAI.ipynb also needs these files
# urlretrieve('https://zenodo.org/records/17056022/files/cleaned_VBM_GM_TianxS1x3TxMNI6thgeneration_mean_aggregation.csv?download=1', './data/cleaned_VBM_GM_TianxS1x3TxMNI6thgeneration_mean_aggregation.csv')

# # Load data
# df_behav = pd.read_csv("data/cleaned_IXI_behavioural.csv", index_col=0)

# # Some height values are not sensible, we filter them out
# height = df_behav["HEIGHT"].values
# df_behav = df_behav[np.logical_and(height > 120, height < 200)]

# # Remove NaNs and duplicates
# df_behav.dropna(inplace=True)
# df_behav.drop_duplicates(keep='first', inplace=True)
# df_behav.to_csv('data/cleaned_IXI_behavioural.csv')

# # Remove NaNs
# df_cortical_100 = pd.read_csv("data/cleaned_VBM_GM_Schaefer100x17_mean_aggregation.csv", index_col=0)
# df_cortical_100.dropna(inplace=True)
# df_cortical_100.to_csv('data/cleaned_VBM_GM_Schaefer100x17_mean_aggregation.csv')

# # Remove NaNs
# df_subcortical = pd.read_csv("data/cleaned_VBM_GM_TianxS1x3TxMNI6thgeneration_mean_aggregation.csv", index_col=0)
# df_subcortical.dropna(inplace=True)
# df_subcortical.to_csv('data/cleaned_VBM_GM_TianxS1x3TxMNI6thgeneration_mean_aggregation.csv')

# data_path = Path("data/")


# Imbalance learning: Data strategies

### Imports

In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    confusion_matrix,
    balanced_accuracy_score,
    ConfusionMatrixDisplay,
)

from imblearn.metrics import specificity_score

import warnings

# Hide FutureWarning messages so they do not clutter the cell outputs
warnings.filterwarnings(action="ignore", category=FutureWarning)

# Fall back to the local data folder unless `data_path` has already been
# defined (e.g. by the optional Colab download cell).
if "data_path" not in globals():
    data_path = Path("../data/")


### Data loading and preparation

In [None]:
# Prepare the data
# Features: Cortical + Subcortical
features = ["cortical", "subcortical"]

# Target: Sex
target = ["SEX_ID (1=m, 2=f)"]
# Confounding variables: none for this example
confounding = []

# CSV file (inside `data_path`) that stores each supported feature set
feature_files = {
    "cortical": "cleaned_VBM_GM_Schaefer100x17_mean_aggregation.csv",
    "subcortical": "cleaned_VBM_GM_TianxS1x3TxMNI6thgeneration_mean_aggregation.csv",
}

df_data = pd.read_csv(data_path / "cleaned_IXI_behavioural.csv", index_col=0)
columns_features = []
for feature in features:
    # Fail loudly on an unknown name: carrying on would reuse a stale
    # `df_feature` from the previous iteration or hit an undefined one.
    if feature not in feature_files:
        raise ValueError(
            f"feature not recognized: {feature!r} "
            f"(expected one of {sorted(feature_files)})"
        )
    df_feature = pd.read_csv(data_path / feature_files[feature], index_col=0)

    # The inner join keeps only the rows whose index is present in both tables
    df_data = df_data.join(df_feature, how="inner")
    columns_features = columns_features + df_feature.columns.to_list()

print(f"Final data shape: {df_data.shape}")

y = df_data[target].values.ravel()
if target == ["SEX_ID (1=m, 2=f)"]:
    # Recode the labels to 0/1: 2 (f) -> 0, every other value -> 1
    y = np.where(y == 2, 0, 1)

X = df_data.loc[:, columns_features].values  # only brain features

print("X shape")
print(X.shape)


### Forcing Imbalance

In [None]:
# Force imbalance in the dataset: every class-0 sample is kept, while class 1
# is cut down to a small subset and therefore becomes the minority class.
imbalance_ratio = 0.15  # Minority class will be 15% of majority class
X_majority = X[y == 0]
y_majority = y[y == 0]

# Number of class-1 samples to keep (the first ones, in their current order)
n_minority = int(imbalance_ratio * len(X_majority))
X_minority = X[y == 1][:n_minority]
y_minority = y[y == 1][:n_minority]

# Class-0 rows come first, followed by the retained class-1 rows
X = np.vstack((X_majority, X_minority))
y = np.hstack((y_majority, y_minority))

print("X shape after imbalance")
print(X.shape)
print("Target distribution")
print(y.sum(), len(y) - y.sum())  # class-1 count, class-0 count
# Minority-to-majority ratio, i.e. the same quantity `imbalance_ratio` defines
print(f"Imbalance ratio: {y.sum() / (len(y) - y.sum()):.2f}")


random_state = 42
test_size = 0.3
# Split data into train and test sets (shared by all the following cells)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=test_size, random_state=random_state
)

## Training a ML model and plot performance

In [None]:
# The train/test split (X_train, X_test, y_train, y_test) was already created
# in the previous cell from `test_size` and `random_state`; it is reused here
# instead of being repeated with hard-coded copies of those settings.

# Train logistic regression on the imbalanced training set
model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)  # hard 0/1 predictions
y_proba = model.predict_proba(X_test)[:, 1]  # predicted probability of class 1


# Calculate metrics on the test set
metrics = {
    "Accuracy": accuracy_score(y_test, y_pred),
    "Balanced Accuracy": balanced_accuracy_score(y_test, y_pred),
    "Precision": precision_score(y_test, y_pred),
    "Recall": recall_score(y_test, y_pred),
    "F1": f1_score(y_test, y_pred),
    "Specificity": specificity_score(y_test, y_pred),
    # ROC AUC is computed from the probabilities, not from the hard predictions
    "ROC AUC": roc_auc_score(y_test, y_proba),
}


# Plot metrics: one bar per metric, annotated with its value
bar_colors = [
    "skyblue",
    "lightgreen",
    "salmon",
    "orange",
    "purple",
    "lightcoral",
    "lightseagreen",
]
fig, ax = plt.subplots(figsize=(12, 7))
ax.grid(axis="y", linestyle="--", alpha=0.7)
ax.bar(list(metrics.keys()), list(metrics.values()), color=bar_colors)
ax.set_title("Model Performance Metrics on Imbalanced Data")
ax.set_ylim(0, 1.1)
ax.set_ylabel("Score")

# Write the score above each bar
for i, v in enumerate(metrics.values()):
    ax.text(i, v + 0.05, f"{v:.3f}", ha="center")
plt.show()


In [None]:
# Confusion matrix of the current predictions (`y_pred`) on the test set
cm = confusion_matrix(y_true=y_test, y_pred=y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm).plot(cmap="Blues")
disp.ax_.set_title("Confusion Matrix")
plt.show()


# Up to here, everything is the same as before. Now let's explore data strategies to mitigate the class imbalance

# Oversampling Minority

In [None]:
from imblearn.over_sampling import RandomOverSampler, SMOTE

# Metrics that are computed from the hard predictions, in display order
label_metrics = (
    ("Accuracy", accuracy_score),
    ("Balanced Accuracy", balanced_accuracy_score),
    ("Precision", precision_score),
    ("Recall", recall_score),
    ("F1", f1_score),
    ("Specificity", specificity_score),
)

# --- Random oversampling of the training set ---
# `shrinkage` turns the plain duplication into a smoothed bootstrap.
ros = RandomOverSampler(random_state=23, shrinkage=0.1)
X_resampled, y_resampled = ros.fit_resample(X_train, y_train)

# Fit on the resampled training data; the test set is left untouched
model_ros = LogisticRegression(max_iter=1000).fit(X_resampled, y_resampled)
y_pred = model_ros.predict(X_test)
y_proba = model_ros.predict_proba(X_test)[:, 1]

# Test-set scores after random oversampling
metrics_ros = {
    metric_name: metric_fn(y_test, y_pred) for metric_name, metric_fn in label_metrics
}
metrics_ros["ROC AUC"] = roc_auc_score(y_test, y_proba)  # uses probabilities

# --- SMOTE oversampling of the training set ---
smote = SMOTE(random_state=23)
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)

# Fit on the resampled training data; the test set is left untouched
model_smote = LogisticRegression(max_iter=1000).fit(X_resampled, y_resampled)
y_pred = model_smote.predict(X_test)
y_proba = model_smote.predict_proba(X_test)[:, 1]

# Test-set scores after SMOTE
metrics_smote = {
    metric_name: metric_fn(y_test, y_pred) for metric_name, metric_fn in label_metrics
}
metrics_smote["ROC AUC"] = roc_auc_score(y_test, y_proba)  # uses probabilities


## Let's plot and compare several metrics

In [None]:
# Gather the three result dictionaries in one table
# (one row per metric, one column per method)
comparison_df = pd.DataFrame(
    {"Original": metrics, "Random Oversampling": metrics_ros, "SMOTE": metrics_smote}
)

metrics_list = list(metrics.keys())
n_metrics = len(metrics_list)

# Grouped bar chart: one group per metric, one bar per method
fig, ax = plt.subplots(figsize=(14, 8))

x = np.arange(len(metrics_list))
width = 0.25

# (legend label, scores, bar colour) for each method, left to right in a group
series = [
    ("Original", metrics, "skyblue"),
    ("Random Oversampling", metrics_ros, "lightgreen"),
    ("SMOTE", metrics_smote, "salmon"),
]
bar_groups = []
for position, (method, scores, color) in zip((x - width, x, x + width), series):
    bar_groups.append(
        ax.bar(
            position,
            [scores[m] for m in metrics_list],
            width,
            label=method,
            color=color,
        )
    )
bars1, bars2, bars3 = bar_groups

ax.set_xlabel("Metrics")
ax.set_ylabel("Score")
ax.set_title(
    "Performance Metrics Comparison Across Different Oversampling Methods",
    fontsize=14,
    fontweight="bold",
)
ax.set_xticks(x)
ax.set_xticklabels(metrics_list, rotation=45, ha="right")
ax.legend()
ax.set_ylim(0, 1.1)
ax.grid(axis="y", linestyle="--", alpha=0.7)


def add_labels(bars):
    """Write the height of every bar in `bars` just above it.

    The text is drawn on the notebook-level `ax` (the axes created in this cell).
    """
    for bar in bars:
        height = bar.get_height()
        x_center = bar.get_x() + bar.get_width() / 2.0
        ax.text(
            x_center,
            height + 0.01,
            f"{height:.3f}",
            ha="center",
            va="bottom",
            fontsize=8,
        )


for group in bar_groups:
    add_labels(group)

plt.tight_layout()
plt.show()

## Confusion matrix for oversampling

In [None]:
# Confusion matrices of the three models, side by side on the same test set
fig, ax = plt.subplots(1, 3, figsize=(14, 5))
panels = [
    (model, "Original"),
    (model_ros, "After Oversampling RandomOverSampler"),
    (model_smote, "After Oversampling SMOTE"),
]
for panel, (estimator, title) in zip(ax, panels):
    ConfusionMatrixDisplay.from_estimator(
        estimator, X_test, y_test, ax=panel, cmap="Blues"
    )
    panel.set_title(title)
fig.suptitle("Confusion Matrix Comparison", fontsize=16, fontweight="bold")

# Drop the colour bar that each confusion matrix plot adds
for panel in ax:
    for im in panel.images:
        if im.colorbar:
            im.colorbar.remove()

plt.show()


# Undersample Majority

In [None]:
from imblearn.under_sampling import RandomUnderSampler, NearMiss

# Metrics that are computed from the hard predictions, in display order
label_metrics = (
    ("Accuracy", accuracy_score),
    ("Balanced Accuracy", balanced_accuracy_score),
    ("Precision", precision_score),
    ("Recall", recall_score),
    ("F1", f1_score),
    ("Specificity", specificity_score),
)

# --- Random undersampling of the training set ---
rus = RandomUnderSampler(random_state=42)
X_undersampled, y_undersampled = rus.fit_resample(X_train, y_train)

# Fit on the undersampled training data; the test set is left untouched
model_rus = LogisticRegression(max_iter=1000).fit(X_undersampled, y_undersampled)
y_pred = model_rus.predict(X_test)
y_proba = model_rus.predict_proba(X_test)[:, 1]

# Test-set scores after random undersampling
metrics_rus = {
    metric_name: metric_fn(y_test, y_pred) for metric_name, metric_fn in label_metrics
}
metrics_rus["ROC AUC"] = roc_auc_score(y_test, y_proba)  # uses probabilities


# --- NearMiss undersampling of the training set (default settings) ---
NM = NearMiss()
X_undersampled, y_undersampled = NM.fit_resample(X_train, y_train)

# Fit on the undersampled training data; the test set is left untouched
model_nm = LogisticRegression(max_iter=1000).fit(X_undersampled, y_undersampled)
y_pred = model_nm.predict(X_test)
y_proba = model_nm.predict_proba(X_test)[:, 1]

# Test-set scores after NearMiss
metrics_nm = {
    metric_name: metric_fn(y_test, y_pred) for metric_name, metric_fn in label_metrics
}
metrics_nm["ROC AUC"] = roc_auc_score(y_test, y_proba)  # uses probabilities


In [None]:
# Gather the three result dictionaries in one table
# (one row per metric, one column per method)
comparison_df = pd.DataFrame(
    {"Original": metrics, "Random Undersampling": metrics_rus, "NearMiss": metrics_nm}
)

metrics_list = list(metrics.keys())
n_metrics = len(metrics_list)

# Grouped bar chart: one group per metric, one bar per method
fig, ax = plt.subplots(figsize=(14, 8))

x = np.arange(len(metrics_list))
width = 0.25

# (legend label, scores, bar colour) for each method, left to right in a group
series = [
    ("Original", metrics, "skyblue"),
    ("Random Undersampling", metrics_rus, "lightgreen"),
    ("NearMiss", metrics_nm, "salmon"),
]
bar_groups = []
for position, (method, scores, color) in zip((x - width, x, x + width), series):
    bar_groups.append(
        ax.bar(
            position,
            [scores[m] for m in metrics_list],
            width,
            label=method,
            color=color,
        )
    )
bars1, bars2, bars3 = bar_groups

ax.set_xlabel("Metrics")
ax.set_ylabel("Score")
ax.set_title(
    "Performance Metrics Comparison Across Different Undersampling Methods",
    fontsize=14,
    fontweight="bold",
)
ax.set_xticks(x)
ax.set_xticklabels(metrics_list, rotation=45, ha="right")
ax.legend()
ax.set_ylim(0, 1.1)
ax.grid(axis="y", linestyle="--", alpha=0.7)


def add_labels(bars):
    """Write the height of every bar in `bars` just above it.

    The text is drawn on the notebook-level `ax` (the axes created in this cell).
    """
    for bar in bars:
        height = bar.get_height()
        x_center = bar.get_x() + bar.get_width() / 2.0
        ax.text(
            x_center,
            height + 0.01,
            f"{height:.3f}",
            ha="center",
            va="bottom",
            fontsize=8,
        )


for group in bar_groups:
    add_labels(group)

plt.tight_layout()
plt.show()

In [None]:
# Confusion matrices of the three models, side by side on the same test set
fig, ax = plt.subplots(1, 3, figsize=(14, 5))
panels = [
    (model, "Original"),
    (model_rus, "Random Undersampling"),
    (model_nm, "NearMiss"),
]
for panel, (estimator, title) in zip(ax, panels):
    ConfusionMatrixDisplay.from_estimator(
        estimator, X_test, y_test, ax=panel, cmap="Blues"
    )
    panel.set_title(title)
plt.tight_layout()

# Drop the colour bar that each confusion matrix plot adds
for panel in ax:
    for im in panel.images:
        if im.colorbar:
            im.colorbar.remove()

plt.show()

# We can combine the upsampling and downsampling techniques

In [None]:
from imblearn.combine import SMOTEENN, SMOTETomek

# Metrics that are computed from the hard predictions, in display order
label_metrics = (
    ("Accuracy", accuracy_score),
    ("Balanced Accuracy", balanced_accuracy_score),
    ("Precision", precision_score),
    ("Recall", recall_score),
    ("F1", f1_score),
    ("Specificity", specificity_score),
)

# --- Combined method: SMOTE + ENN (Edited Nearest Neighbours) ---
smote_eenn = SMOTEENN(random_state=42)
X_combined, y_combined = smote_eenn.fit_resample(X_train, y_train)

# Fit on the resampled training data; the test set is left untouched
model_smote_eenn = LogisticRegression(max_iter=1000)
model_smote_eenn.fit(X_combined, y_combined)
y_pred = model_smote_eenn.predict(X_test)
y_proba = model_smote_eenn.predict_proba(X_test)[:, 1]

# Test-set scores after SMOTE + ENN
metrics_smote_eenn = {
    metric_name: metric_fn(y_test, y_pred) for metric_name, metric_fn in label_metrics
}
metrics_smote_eenn["ROC AUC"] = roc_auc_score(y_test, y_proba)  # uses probabilities

# --- Combined method: SMOTE + Tomek links ---
smote_tomek = SMOTETomek(random_state=42)
X_combined, y_combined = smote_tomek.fit_resample(X_train, y_train)

# Fit on the resampled training data; the test set is left untouched
model_smote_tomek = LogisticRegression(max_iter=1000)
model_smote_tomek.fit(X_combined, y_combined)
y_pred = model_smote_tomek.predict(X_test)
y_proba = model_smote_tomek.predict_proba(X_test)[:, 1]

# Test-set scores after SMOTE + Tomek links
metrics_smote_tomek = {
    metric_name: metric_fn(y_test, y_pred) for metric_name, metric_fn in label_metrics
}
metrics_smote_tomek["ROC AUC"] = roc_auc_score(y_test, y_proba)  # uses probabilities


In [None]:
# Gather the three result dictionaries in one table
# (one row per metric, one column per method)
comparison_df = pd.DataFrame(
    {"Original": metrics, "SMOTEENN": metrics_smote_eenn, "Smote Tomek": metrics_smote_tomek}
)

metrics_list = list(metrics.keys())
n_metrics = len(metrics_list)

# Grouped bar chart: one group per metric, one bar per method
fig, ax = plt.subplots(figsize=(14, 8))

x = np.arange(len(metrics_list))
width = 0.25

# (legend label, scores, bar colour) for each method, left to right in a group
series = [
    ("Original", metrics, "skyblue"),
    ("SMOTEENN", metrics_smote_eenn, "lightgreen"),
    ("Smote Tomek", metrics_smote_tomek, "salmon"),
]
bar_groups = []
for position, (method, scores, color) in zip((x - width, x, x + width), series):
    bar_groups.append(
        ax.bar(
            position,
            [scores[m] for m in metrics_list],
            width,
            label=method,
            color=color,
        )
    )
bars1, bars2, bars3 = bar_groups

ax.set_xlabel("Metrics")
ax.set_ylabel("Score")
# These are combined over-/under-sampling methods, so the title says so
# (it previously read "Undersampling Methods").
ax.set_title(
    "Performance Metrics Comparison Across Different Combined Sampling Methods",
    fontsize=14,
    fontweight="bold",
)
ax.set_xticks(x)
ax.set_xticklabels(metrics_list, rotation=45, ha="right")
ax.legend()
ax.set_ylim(0, 1.1)
ax.grid(axis="y", linestyle="--", alpha=0.7)


def add_labels(bars):
    """Write the height of every bar in `bars` just above it.

    The text is drawn on the notebook-level `ax` (the axes created in this cell).
    """
    for bar in bars:
        height = bar.get_height()
        x_center = bar.get_x() + bar.get_width() / 2.0
        ax.text(
            x_center,
            height + 0.01,
            f"{height:.3f}",
            ha="center",
            va="bottom",
            fontsize=8,
        )


for group in bar_groups:
    add_labels(group)

plt.tight_layout()
plt.show()

In [None]:
# Confusion matrices of the three models, side by side on the same test set
fig, ax = plt.subplots(1, 3, figsize=(14, 5))
panels = [
    (model, "Original"),
    (model_smote_eenn, "SmoteENN"),
    (model_smote_tomek, "Smote Tomek"),
]
for panel, (estimator, title) in zip(ax, panels):
    ConfusionMatrixDisplay.from_estimator(
        estimator, X_test, y_test, ax=panel, cmap="Blues"
    )
    panel.set_title(title)
plt.tight_layout()

# Drop the colour bar that each confusion matrix plot adds
for panel in ax:
    for im in panel.images:
        if im.colorbar:
            im.colorbar.remove()

plt.show()

# To do!

### Combine two other sampling strategies and plot the results.