# Set up

In [None]:
import warnings


def warn(*args, **kwargs):
    pass


warnings.warn = warn

In [None]:
import logging
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pathlib
import SimpleITK as sitk

plt.rcParams["font.family"] = "DeJavu Serif"
plt.rcParams["font.serif"] = ["Times New Roman"]

In [None]:
root_dir = pathlib.Path("..").resolve()

In [None]:
def get_labels_and_features(rater: str = "900", label: int = 1, from_image: str = "t2w") -> tuple:
    """
    Reads a CSV file from the given label and returns the labels and features as separate dataframes.

    :param rater: The rater identifier. Default is "900".
    :type rater: str
    :param label: A number from 1 to 5 indicating the disc of interest. Default is 1.
    :type label: bool
    :return: A tuple containing the labels and features.
    :rtype: tuple
    """
    labels_df = pd.read_csv(root_dir.joinpath("data", f"midasdisclabels{rater}.csv"), sep=",")
    labels_df.dropna(inplace=True)
    labels_df.rename(columns={"subject_ID": "Subject_XNAT", "ID": "Session_XNAT"}, inplace=True)

    midas_img_relation = pd.read_csv(root_dir.joinpath("data", "filtered_midas900_t2w.csv"), sep=",")
    midas_img_relation["Subject_MIDS"] = midas_img_relation["Image"].map(lambda x: x.split("/")[8])
    midas_img_relation["Session_MIDS"] = midas_img_relation["Image"].map(lambda x: x.split("/")[9])
    midas_img_relation["Subject_XNAT"] = midas_img_relation["Subject_MIDS"].map(lambda x: f"ceibcs_S{int(x.split('sub-S')[1])}")
    midas_img_relation["Session_XNAT"] = midas_img_relation["Session_MIDS"].map(lambda x: f"ceibcs_E{int(x.split('ses-E')[1])}")

    id_labels = labels_df.merge(midas_img_relation, on=["Subject_XNAT", "Session_XNAT"])
    id_labels.rename(
        columns={
            "L5-S": "1",
            "L4-L5": "2",
            "L3-L4": "3",
            "L2-L3": "4",
            "L1-L2": "5",
        },
        inplace=True,
    )

    radiomic_features = pd.read_csv(root_dir.joinpath("data", f"filtered_midas900_{from_image}_radiomics.csv"), sep=",")
    radiomic_features.rename(columns={"Unnamed: 0": "ID"}, inplace=True)

    data = id_labels.merge(radiomic_features, on="ID")

    data = data.rename(columns={str(label): f"label{label}", "ID": f"label{label}ID"})
    columns_mask = data.columns.str.contains(f"label{label}") & ~data.columns.str.contains("Configuration")
    data = data.loc[:, columns_mask]
    data = data.rename(columns={f"label{label}": "label", f"label{label}ID": "ID"})

    label_data = data.dropna(axis=0, how="any")
    label_data = label_data.loc[label_data["label"] != 0]
    label_data["ID"] = label_data["ID"].map(lambda x: x + str(label))
    label_data = label_data.set_index("ID")
    labels = label_data["label"]
    features = label_data[label_data.select_dtypes(include="number").columns.tolist()].drop(columns="label")
    return labels, features


def get_labels_and_features_all_discs(rater: str = "900", verbose: bool = False, from_image: str = "t1w_t2w") -> tuple:
    """
    Get labels and features for all discs.

    :param rater: The rater identifier. Default is "900".
    :type rater: str
    :param verbose: Whether to print additional information and plot the label distribution. Default is False.
    :type verbose: bool
    :return: A tuple containing the labels and features.
    :rtype: tuple
    """
    features = []
    labels = []
    for label in range(1, 6):
        labels_i, features_i = get_labels_and_features(rater=rater, label=label, from_image=from_image)
        labels.append(labels_i)
        features_i = features_i.rename(columns={name: name.replace(f"label{label}_", "") for name in features_i.columns.to_list()})
        features.append(features_i)
    features = pd.concat(features, axis=0)
    labels = pd.concat(labels, axis=0)
    if verbose:
        print(f"Labels shape: {labels.shape}, Features shape: {features.shape}")
        labels.plot(kind="hist", xticks=[1, 2, 3, 4, 5], title="Label distribution")
        plt.show()
    return labels, features

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_selection import VarianceThreshold


class VarianceFeatureReduction(BaseEstimator, TransformerMixin):
    """
    VarianceFeatureReduction is a transformer that reduces the feature space by removing features with low variance.

    Parameters:
    -----------
    threshold : float, optional (default=0.05)
        The threshold below which features will be removed.
    """

    def __init__(self, threshold=0.05):
        self.threshold = threshold
        self.selector = None

    def fit(self, X, y=None):
        """
        Fit the VarianceFeatureReduction transformer to the input data.

        Parameters:
        -----------
        X : array-like, shape (n_samples, n_features)
            The input data.
        y : array-like, shape (n_samples,), optional (default=None)
            The target values.

        Returns:
        --------
        self : object
            Returns self.
        """
        self.selector = VarianceThreshold(threshold=self.threshold)
        self.selector.fit(X)
        return self

    def transform(self, X, y=None):
        """
        Transform the input data by removing features with low variance.

        Parameters:
        -----------
        X : array-like, shape (n_samples, n_features)
            The input data.
        y : array-like, shape (n_samples,), optional (default=None)
            The target values.

        Returns:
        --------
        X_ : array-like, shape (n_samples, n_selected_features)
            The transformed data with low variance features removed.
        """
        X_ = X.copy()
        X_ = X_.loc[:, self.selector.get_support()]
        return X_


class CorrelationFeatureReduction(BaseEstimator, TransformerMixin):
    """
    A transformer class for reducing features based on correlation.

    Parameters:
    -----------
    threshold : float, optional (default=0.8)
        The threshold above which features will be removed.
    """

    def __init__(self, threshold=0.8):
        self.threshold = threshold
        self.corr_matrix_var = None
        self.to_keep = None

    def fit(self, X, y=None):
        """
        Fit the transformer to the input data.

        Parameters:
        -----------
        X : pandas DataFrame
            The input data.

        Returns:
        --------
        self : CorrelationFeatureReduction
            The fitted transformer object.

        """
        self.corr_matrix_var = X.corr(method="spearman").abs()  # absolute correlation matrix

        # Initialize the flag vector with True values
        self.to_keep = np.full((self.corr_matrix_var.shape[1]), True, dtype=bool)

        for i in range(self.corr_matrix_var.shape[1]):
            for j in range(i + 1, self.corr_matrix_var.shape[1]):
                if self.to_keep[i] and self.corr_matrix_var.iloc[i, j] >= self.threshold:
                    if self.to_keep[j]:
                        self.to_keep[j] = False
        return self

    def transform(self, X, y=None):
        """
        Transform the input data by removing highly correlated features.

        Parameters:
        -----------
        X : pandas DataFrame
            The input data.

        Returns:
        --------
        X_ : pandas DataFrame
            The transformed data with highly correlated features removed.

        """
        X_ = X.copy()
        X_ = X_.iloc[:, self.to_keep]
        return X_

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier, ExtraTreesClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import f1_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def test_multiple_models(features, labels):
    # Define classifiers to test
    classifiers = {
        "Random Forest": RandomForestClassifier(),
        "Gradient Boosting": GradientBoostingClassifier(),
        "SVM": SVC(),
        "Logistic Regression": LogisticRegression(),
        "Stochastic Gradient Descent": SGDClassifier(),
        "Naive Bayes": GaussianNB(),
        "K-Nearest Neighbors": KNeighborsClassifier(),
        "Multilayer Perceptron": MLPClassifier(),
        "AdaBoost": AdaBoostClassifier(),
        "ExtraTrees": ExtraTreesClassifier(),
    }

    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.25, random_state=0, stratify=labels)

    # Test each classifier
    f1_scores = {}
    for name, clf in classifiers.items():
        pipeline = Pipeline(
            [
                ("variancethreshold", VarianceFeatureReduction(threshold=0.05)),
                ("correlationreduction", CorrelationFeatureReduction()),
                ("scaler", StandardScaler()),
                ("classifier", clf),
            ]
        )
        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_test)
        f1_scores[name] = f1_score(y_test, y_pred, average="weighted")

    # Select the classifier with the highest F1 score
    best_classifier = max(f1_scores, key=f1_scores.get)  # type: ignore
    print("Best classifier:", best_classifier)
    print("F1 score:", f1_scores[best_classifier])

In [None]:
from sklearn.model_selection import cross_val_score, StratifiedKFold


def cv(clf, features, labels):
    # Create a stratified 5-fold cross-validation object
    skf = StratifiedKFold(n_splits=5)

    # Perform cross-validation
    pipeline_clf = Pipeline(
        [
            ("variancethreshold", VarianceFeatureReduction(threshold=0.05)),
            ("correlationreduction", CorrelationFeatureReduction()),
            ("scaler", StandardScaler()),
            ("classifier", clf),
        ]
    )
    scores = cross_val_score(pipeline_clf, features, labels, cv=skf, scoring="f1_weighted")
    print(f"Cross Validation F1 Score: {scores.mean():0.4f} +/- {scores.std():0.2f}")

In [None]:
from imblearn.ensemble import BalancedBaggingClassifier
from imblearn.ensemble import BalancedRandomForestClassifier
from imblearn.ensemble import RUSBoostClassifier
from imblearn.ensemble import EasyEnsembleClassifier
from imblearn.under_sampling import RandomUnderSampler


def imbalanced_learning_suite(features, labels):
    # Define classifiers to test
    classifiers = {
        "Balanced Bagging Classifier": BalancedBaggingClassifier(sampler=RandomUnderSampler()),
        "Balanced RandomForest Classifier": BalancedRandomForestClassifier(),
        "RUS Boost Classifier": RUSBoostClassifier(),
        "Easy Ensemble Classifier": EasyEnsembleClassifier(),
    }

    # Create a stratified 5-fold cross-validation object
    skf = StratifiedKFold(n_splits=5)

    # Perform cross-validation
    for name, clf in classifiers.items():
        pipeline_clf = Pipeline(
            [
                ("variancethreshold", VarianceFeatureReduction(threshold=0.05)),
                ("correlationreduction", CorrelationFeatureReduction()),
                ("scaler", StandardScaler()),
                ("classifier", clf),
            ]
        )
        scores = cross_val_score(pipeline_clf, features, labels, cv=skf, scoring="f1_weighted")
        print(f"{name}: {scores.mean():0.2f} f1 with a standard deviation of {scores.std():0.2f}")

In [None]:
import yellowbrick.classifier as viz

from yellowbrick.style import set_palette

set_palette("flatui")


def visual_metrics(clf, features, labels, classes=["1", "2", "3", "4", "5"]):
    labels_ = labels.copy()
    if min(labels_) != 0:
        labels_ = labels_ - min(labels_)
    X_train, X_test, y_train, y_test = train_test_split(features, labels_, test_size=0.25, random_state=0, stratify=labels_)

    _, axes = plt.subplots(1, 3, figsize=(15, 5))
    labels.plot(kind="hist", title="Pfirmann grade distribution", ax=axes[0], xticks=[1, 2, 3, 4, 5], align="mid")

    pipeline_clf = Pipeline(
        [
            ("variancethreshold", VarianceFeatureReduction(threshold=0.05)),
            ("correlationreduction", CorrelationFeatureReduction()),
            ("scaler", StandardScaler()),
            ("classifier", clf),
        ]
    )
    pipeline_clf.fit(X_train, y_train)
    axes[1].set_title("Classification Report")
    axes[1].set_ylabel("Class")
    visualizer_class = viz.ClassificationReport(pipeline_clf, classes=classes[::-1], support=True, ax=axes[1], cmap="Blues")
    visualizer_class.score(X_test, y_test)

    axes[2].set_title("Classification Prediction Error")
    axes[2].set_xlabel("Class")
    axes[2].set_ylabel("Number of Predictions")
    visualizer_pred = viz.ClassPredictionError(pipeline_clf, classes=classes, ax=axes[2])
    visualizer_pred.score(X_test, y_test)

    plt.tight_layout()
    plt.show()

In [None]:
from sklearn.model_selection import RandomizedSearchCV


def random_search(clf, distribution, features, labels):
    pipeline_clf = Pipeline(
        [
            ("variancethreshold", VarianceFeatureReduction(threshold=0.05)),
            ("correlationreduction", CorrelationFeatureReduction()),
            ("scaler", StandardScaler()),
            ("classifier", clf),
        ]
    )

    skf = StratifiedKFold(n_splits=5)
    rs_clf = RandomizedSearchCV(pipeline_clf, distribution, cv=skf, scoring="f1_weighted", n_iter=10, random_state=0)
    search = rs_clf.fit(features, labels)

    print(f"Best parameter (CV score={search.best_score_:0.3f}): {search.best_params_}")
    return {key.replace("classifier__", ""): value for key, value in search.best_params_.items() if key.startswith("classifier__")}

# EDA

## Feature Selection

### Near-zero variance

In [None]:
import numpy as np

# Feature reduction based on variance thresholding (remove features with variance smaller than 0.05)
from sklearn.feature_selection import VarianceThreshold

# Initialize selector based on VarianceThreshold
selector = VarianceThreshold(threshold=0.05)

#  Estimate variances and reduce features
labels, radiomic_features = get_labels_and_features_all_discs(rater="JDCarlos")
selector.fit_transform(radiomic_features)

# Get the selected feature labels and reduce the Radiomic_Feature dataframe
radiomic_features_var = radiomic_features.loc[:, selector.get_support()]

# Display the number of features removed
print(f"{np.count_nonzero(~selector.get_support())}/{radiomic_features.shape[1]} features were removed due to near-zero variance.")

del selector

### Correlation

In [None]:
corr_matrix_var = radiomic_features_var.corr(method="spearman").abs()  # absolute correlation matrix

# Initialize the flag vector with True values
to_keep = np.full((corr_matrix_var.shape[1]), True, dtype=bool)

for i in range(corr_matrix_var.shape[1]):
    for j in range(i + 1, corr_matrix_var.shape[1]):
        if to_keep[i] and corr_matrix_var.iloc[i, j] >= 0.8:
            if to_keep[j]:
                to_keep[j] = False

# Retain features that are not higly correlated
radiomic_features_corr = radiomic_features_var.iloc[:, to_keep]

print(
    f"{np.count_nonzero(~to_keep)}/{radiomic_features_var.shape[1]} features were removed due to high correlation. {radiomic_features_corr.shape[1]} features remaining."
)

del to_keep, i, j

In [None]:
import seaborn as sns

# Calculate the correlation matrix of the original feature set
corr_matrix = radiomic_features.corr(method="spearman")
# Display the correlation matrix
plt.figure(figsize=(8, 6.5))
sns.heatmap(corr_matrix, cmap="coolwarm", vmin=-1, vmax=1)
plt.title("Correlation of Radiomic features")
plt.show()

# Calculate the correlation matrix of the reduced feature set
corr_matrix_red = radiomic_features_corr.corr(method="spearman")
# Display the correlation matrix
plt.figure(figsize=(8, 6.5))
sns.heatmap(corr_matrix_red, cmap="coolwarm", vmin=-1, vmax=1)
plt.title("Correlation of reduced radiomic features")
plt.show()

# Clustering

In [None]:
import copy
from scipy.stats import zscore
from scipy.spatial import distance
from scipy.cluster import hierarchy
from scipy.cluster.hierarchy import linkage, fcluster

radiomic_features_clus = copy.deepcopy(radiomic_features_corr)
# Normalize the data
radiomic_features_clus = zscore(radiomic_features_clus, axis=0)

# Calculate and plot the clustergram
row_linkage = hierarchy.linkage(distance.pdist(radiomic_features_clus.to_numpy()), method="ward")
col_linkage = hierarchy.linkage(distance.pdist(radiomic_features_clus.T.to_numpy()), method="ward")
g = sns.clustermap(
    radiomic_features_clus, row_linkage=row_linkage, col_linkage=col_linkage, method="ward", vmin=-3, vmax=3, figsize=(8, 10), cmap="viridis"
)
g.ax_cbar.set_position((0.90, 0.2, 0.03, 0.3))

# Extract 5 disc degeneration clusters and append the "Clusters" variable to the DataFrame
n_clusters = 5
radiomic_features_clus["Clusters"] = fcluster(row_linkage, n_clusters, criterion="maxclust")

# Print the cluster assignments
print("Cluster Assignments:", radiomic_features_clus["Clusters"])

del g, n_clusters

In [None]:
from scipy.stats import chi2_contingency

# Concatenate the target clinical variable to the radiomic DataFrame
radiomic_features_clus = pd.concat([radiomic_features_clus, labels], axis=1)

# Barplot clusters/grades
plt.figure(figsize=(8, 8))
sns.countplot(x="Clusters", hue="label", data=radiomic_features_clus, palette="coolwarm")
plt.title("Distribution of Pfirmann grade in each Cluster")
plt.xlabel("Cluster")
plt.ylabel("Count")
plt.legend(title="Pfirmann grade", loc="upper right")
plt.show()

# Perform chi-squared test
chi2, p, _, _ = chi2_contingency(pd.crosstab(radiomic_features_clus["label"], radiomic_features_clus["Clusters"]))

# Print the results
print(f"Chi-squared statistic: {chi2}")
print(f"P-value: {p}")

# Classification

## Per disc

In [None]:
def disc_results(clf, disc: int = 1):
    labels, features = get_labels_and_features(rater="JDCarlos", label=disc)
    cv(clf, features, labels)
    visual_metrics(clf, features, labels)

In [None]:
for label in range(1, 6):
    print(f"Disc: {label}")
    labels, features = get_labels_and_features(rater="JDCarlos", label=label)
    test_multiple_models(features, labels)

 ### Disc 1

In [None]:
clf = GradientBoostingClassifier()
disc_results(clf)

### Disc 2

In [None]:
clf = MLPClassifier()
disc_results(clf, disc=2)

### Disc 3

In [None]:
clf = MLPClassifier()
disc_results(clf, disc=3)

### Disc 4

In [None]:
clf = GradientBoostingClassifier()
disc_results(clf, disc=4)

### Disc 5

In [None]:
clf = SVC()
disc_results(clf, disc=5)

## All discs

In [None]:
def all_discs_results(clf):
    labels, features = get_labels_and_features_all_discs(rater="JDCarlos")
    cv(clf, features, labels)
    visual_metrics(clf, features, labels)

In [None]:
labels, features = get_labels_and_features_all_discs(rater="JDCarlos")
test_multiple_models(features, labels)

In [None]:
distribution = {
    "classifier__loss": ["log_loss", "exponential"],
    "classifier__learning_rate": [0.01, 0.1, 0.3, 0.5],
    "classifier__n_estimators": [10, 50, 100],
    "classifier__max_depth": [1, 5, 10],
    "classifier__max_features": [None, "sqrt", "log2"],
}
labels, features = get_labels_and_features_all_discs(rater="JDCarlos")
best_params = random_search(GradientBoostingClassifier(), distribution, features, labels)

In [None]:
clf = SVC()
all_discs_results(clf)

In [None]:
labels, features = get_labels_and_features_all_discs(rater="JDCarlos")
imbalanced_learning_suite(features, labels)

## Combining classes 1 and 2

### Per disc

In [None]:
def disc_results_combining_1_and_2(clf, disc: int = 1):
    labels, features = get_labels_and_features(rater="JDCarlos", label=disc)
    labels.loc[labels == 1] = 2
    cv(clf, features, labels)
    visual_metrics(clf, features, labels, classes=["1 and 2", "3", "4", "5"])

In [None]:
for label in range(1, 6):
    print(f"Disc: {label}")
    labels, features = get_labels_and_features(rater="JDCarlos", label=label)
    labels.loc[labels == 1] = 2
    test_multiple_models(features, labels)

#### Disc 1

In [None]:
clf = MLPClassifier()
disc_results_combining_1_and_2(clf)

#### Disc 2

In [None]:
clf = ExtraTreesClassifier()
disc_results_combining_1_and_2(clf, disc=2)

#### Disc 3

In [None]:
clf = ExtraTreesClassifier()
disc_results_combining_1_and_2(clf, disc=3)

#### Disc 4

In [None]:
clf = RandomForestClassifier()
disc_results_combining_1_and_2(clf, disc=4)

#### Disc 5

In [None]:
clf = GradientBoostingClassifier()
disc_results_combining_1_and_2(clf, disc=5)

### All discs

In [None]:
def all_discs_results_combining_1_and_2(clf):
    labels, features = get_labels_and_features_all_discs(rater="JDCarlos")
    labels.loc[labels == 1] = 2
    cv(clf, features, labels)
    visual_metrics(clf, features, labels, classes=["1 and 2", "3", "4", "5"])

In [None]:
labels, features = get_labels_and_features_all_discs(rater="JDCarlos")
labels.loc[labels == 1] = 2
test_multiple_models(features, labels)

In [None]:
distribution = {
    "classifier__loss": ["log_loss", "exponential"],
    "classifier__learning_rate": [0.01, 0.1, 0.3, 0.5],
    "classifier__n_estimators": [10, 50, 100],
    "classifier__max_depth": [1, 5, 10],
    "classifier__max_features": [None, "sqrt", "log2"],
}
labels, features = get_labels_and_features_all_discs(rater="JDCarlos")
labels.loc[labels == 1] = 2
best_params = random_search(RandomForestClassifier(), distribution, features, labels)

In [None]:
clf = RandomForestClassifier()
all_discs_results_combining_1_and_2(clf)

In [None]:
labels, features = get_labels_and_features_all_discs(rater="JDCarlos")
labels.loc[labels == 1] = 2
imbalanced_learning_suite(features, labels)