### Multiclass diagnosis of Alzheimer's disease stages based on multimodal data

#### Importing libraries

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.cross_decomposition import CCA
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, classification_report, make_scorer, accuracy_score, f1_score, precision_score, recall_score, auc, roc_auc_score, roc_curve
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV, RandomizedSearchCV
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import GridSearchCV
from sklearn.multiclass import OneVsOneClassifier, OneVsRestClassifier

from imblearn.combine import SMOTEENN, SMOTETomek
from imblearn.over_sampling import SMOTE

In [2]:
def print_df_info(df):
    print(f'df shape: {df.shape}')
    #print(f'df info: {df.info()}')
    print(f'Number os NANs: {df.isnull().sum().sum()}')
    print()

#### Importing data

In [3]:
df_patients = pd.read_csv("resource/patients.csv")
df_csf_source = pd.read_csv("resource/csf_data_roche_elecsys.csv")
df_mri_source = pd.read_csv("resource/ASHS volume data [ADNI2,3].csv") # ASHS volume data [ADNI2,3]
df_pet_source = pd.read_csv("resource/UCBERKELEY_AMY_6MM_07Aug2024.csv")

#### CSF Analysis

In [None]:
df_csf = df_csf_source[["RID", "VISCODE2", "ABETA42", "TAU", "PTAU"]]
print_df_info(df_csf)

print(round(df_csf.isnull().sum() / len(df_csf) * 100, 2))

df_csf = df_csf.dropna(axis=0) # remove entire column
print_df_info(df_csf)

#### MRI Image Analysis

In [None]:
df_mri = df_mri_source.iloc[:, 5:-1]
df_mri.insert(0, 'RID', df_mri_source['RID'])
df_mri.insert(1, 'VISCODE', df_mri_source['VISCODE'])

print_df_info(df_mri)

#### PET Image Analysis

In [None]:
df_pet = df_pet_source.iloc[:, 11:-1]
col_index = df_pet.columns.get_loc('AMYLOID_STATUS_COMPOSITE_REF') # get the index of AMYLOID_STATUS_COMPOSITE_REF

df_pet.insert(0, 'RID', df_pet_source['RID'])
df_pet.insert(1, 'VISCODE', df_pet_source['VISCODE'])
print_df_info(df_pet)

print(round(df_pet.isnull().sum()/len(df_pet)*100, 2))

# handle missing data for categorical input
cat_imputer = SimpleImputer(strategy='most_frequent')
df_pet['AMYLOID_STATUS'] = cat_imputer.fit_transform(df_pet[['AMYLOID_STATUS']])
df_pet['AMYLOID_STATUS_COMPOSITE_REF'] = cat_imputer.fit_transform(df_pet[['AMYLOID_STATUS']])

# handle missing data for numerical input
num_imputer = SimpleImputer(strategy='mean')
for col in df_pet.columns[col_index + 1:]:
    df_pet[col] = num_imputer.fit_transform(df_pet[[col]])

# print info
print_df_info(df_pet)

#### Merge Data

In [None]:
# merge csf + patients
df_merged = pd.merge(df_patients[['PHASE', 'RID', 'VISCODE', 'VISCODE2', 'DIAGNOSIS']], df_csf, on=['RID', 'VISCODE2'], how='inner')
df_merged.dropna(inplace=True, axis=0) # remove entire row
print_df_info(df_merged)

# merge mri in df_merged
df_merged = pd.merge(df_merged, df_mri, on=['RID', 'VISCODE'], how='inner')
print_df_info(df_merged)

# merge PET in df_merged
df_merged = pd.merge(df_merged, df_pet, on=['RID', 'VISCODE'], how='inner')
print_df_info(df_merged)

print(df_merged.value_counts('DIAGNOSIS'))
df_merged.head()

#### Graph Analisys

Count Plot

In [None]:
def print_hist(stat_type = 'count', palette = 'pastel'):
    # Step 1: Define the custom order and labels
    class_order = [1.0, 2.0, 3.0]
    class_labels = ['HS', 'MCI', 'AD']
    
    sep = ''
    if (stat_type == 'percent'):
        sep = '%'

    # Step 2: Plot the frequency graph with custom labels
    plt.figure(figsize=(8, 6))
    ax = sns.countplot(x='DIAGNOSIS', data=df_merged, hue='DIAGNOSIS', order=class_order, palette=palette, stat=stat_type, legend='full')

    # Step 3: Add the exact count values on top of the bars
    for p, label in zip(ax.patches, class_labels):
        ax.annotate(f'{int(p.get_height())}' + sep, 
                    (p.get_x() + p.get_width() / 2.0, p.get_height()), 
                    ha='center', va='baseline', 
                    fontsize=12, color='black', xytext=(0, 5), 
                    textcoords='offset points')

    # Update x-tick labels to use the custom labels
    ax.set_xticklabels(class_labels)
    
    plt.xlabel('Diagnosis Class')
    plt.ylabel('Frequency')
    plt.title('Frequency of Diagnosis Classes')
    plt.show()

print_hist(stat_type = 'count', palette = 'flare')
print_hist(stat_type = 'percent', palette = 'flare')


#### Scaling Data

In [8]:
state = 1024

In [None]:
X = df_merged.iloc[:, 5:]
Y = df_merged[['DIAGNOSIS']]

print_df_info(X)
print_df_info(Y)

scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)

#### Reduce Data Dimension

In [93]:
X_reduced = X_scaled

Sequential Feature Selection

In [None]:
from sklearn.feature_selection import SequentialFeatureSelector

svm = SVC(C=1.0, decision_function_shape='ovr', kernel='rbf', random_state=state)
sfs = SequentialFeatureSelector(svm, n_features_to_select=10, scoring=make_scorer(f1_score , average='macro'), cv=10, n_jobs=-1)
X_reduced = sfs.fit_transform(X_scaled, Y.values.ravel())

print(X_reduced.shape)

In [None]:
# Get the indices of the selected features
selected_features_indices = sfs.get_support(indices=True)

# Get the names of the selected features
selected_features_names = X.columns[selected_features_indices]

# Print the selected feature names
print("Selected features:", selected_features_names)

Canonical Correlation Analysis

In [None]:
cca = CCA(n_components=1)
print(X_scaled.shape)

X_reduced = cca.fit(X_scaled, Y.values.ravel()).transform(X_scaled)
print(X_reduced.shape)

In [None]:
coef_df = pd.DataFrame(np.round(cca.coef_, 2), columns = [X.columns]).T
print(coef_df)
print(coef_df.columns[0])

print(coef_df.nlargest(10, columns=coef_df.columns[0]))

####  Oversampling

Imbalanced Data: If your data is imbalanced, meaning one class is much more prevalent than the others, the model might be overfitting to the dominant class. This could lead to high precision for that class but poor generalization.

Increase the number of samples in the minority class using techniques like SMOTE (Synthetic Minority Over-sampling Technique) or simple random oversampling.

In [13]:
X_resampled = X_reduced
Y_resampled = Y

SMOTETomek

In [None]:
# Over-sampling using SMOTE and cleaning using Tomek links.
# Combine over- and under-sampling using SMOTE and Tomek links.
from imblearn.under_sampling import *

oversample = SMOTETomek(n_jobs=-1, random_state=state)
X_resampled, Y_resampled = oversample.fit_resample(X_reduced, Y)

print(X_reduced.shape)
print(Y.shape)
print(Y.value_counts())
print()

print(X_resampled.shape)
print(Y_resampled.shape)
print(Y_resampled.value_counts())


#### Find Best Model

SVM

In [None]:
param_grid = { 
    'kernel': ['linear', 'rbf'],
    'C': [1e-3, 1e-2, 1e-1, 1.0, 1e1, 1e2],
    'decision_function_shape': ['ovo', 'ovr']
    #'class_weight': ['balanced']
}

# target metric
target_score = make_scorer(f1_score , average='macro')

# Set up the GridSearchCV
grid_search = GridSearchCV(estimator=SVC(), param_grid=param_grid, 
                           scoring=target_score, cv=10, verbose=1, n_jobs=-1)

# Fit the grid search to the data
grid_search.fit(X_resampled, Y_resampled.values.ravel())

print(round(grid_search.best_score_ * 100.0, 2))
print(grid_search.best_params_)

print(Y.value_counts())
print(Y_resampled.value_counts())

x_train, x_test, y_train, y_test = train_test_split(X_resampled, Y_resampled.values.ravel(), test_size=0.30, random_state=42)
svm = SVC(C=1.0, class_weight='balanced', decision_function_shape='ovr', kernel='rbf')
svm.fit(x_train, y_train)
y_pred = svm.predict(x_test)
print(classification_report(y_test, y_pred))

#### Training Data

In [12]:
Y_transformed = Y_resampled.values.ravel()

def train_with_test_split(clf, test_size = 0.30, random_state = state):
    X_train, X_test, y_train, y_test = train_test_split(X_resampled, Y_transformed, test_size=test_size, random_state=random_state)
    scores = []

    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    scores.append((y_test, y_pred))
    
    return scores

def train_with_cross_validation(clf, kfolds = 10, suffle = True, random_state = state):
    skf = StratifiedKFold(n_splits=kfolds, shuffle=suffle, random_state=random_state)
    scores = []
    
    for train_index, test_index in skf.split(X_resampled, Y_transformed):
        x_train_fold, x_test_fold = X_resampled[train_index], X_resampled[test_index]
        y_train_fold, y_test_fold = Y_transformed[train_index], Y_transformed[test_index]
        clf.fit(x_train_fold, y_train_fold)
        y_pred_fold = clf.predict(x_test_fold)
        scores.append((y_test_fold, y_pred_fold))
        
    return scores

#### Data Report

In [13]:
def plot_confusion_matrix(y_test, y_pred, pallete='flare'):
    cm = confusion_matrix(y_test, y_pred)
    classes = ['HS', 'MCI', 'AD']

    cm_percentage = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100
    cm_df = pd.DataFrame(cm_percentage, index=classes, columns=classes)

    plt.figure(figsize=(5, 4))
    sns.heatmap(cm_df, annot=True, cmap=sns.color_palette(pallete, as_cmap=True), fmt='.2f')
    
    for t in plt.gca().texts:
        t.set_text(t.get_text() + "%")
    
    plt.title('Matriz de Confusão (Porcentagem)')
    plt.ylabel('Valores Reais')
    plt.xlabel('Valores Previstos')
    #plt.savefig('confusion-matrix.png', format='png', dpi=400, transparent=True)
    plt.show()

In [14]:
def safe_divide(a, b):
    return a / b if b != 0 else 0

def generate_metrics_report(y_test, y_pred, n_classes = 3, target_map = {1: 'HS', 2: 'MCI', 3: 'AD'}):
    cm = confusion_matrix(y_test, y_pred)
    report_data = []

    for c in range(n_classes):
        tp = cm[c, c]
        fp = sum(cm[:, c]) - cm[c, c]
        fn = sum(cm[c, :]) - cm[c, c]
        tn = sum(np.delete(sum(cm) - cm[c, :], c))

        recall = safe_divide(tp, tp + fn)
        precision = safe_divide(tp, tp + fp)
        specificity = safe_divide(tn, tn + fp)
        f1_score = safe_divide(2 * precision * recall, precision + recall)
        fpr = safe_divide(fp, fp + tn)
        fnr = safe_divide(fn, fn + tp)
        acc = safe_divide(tp + tn, tp + tn + fp + fn) 
            
        report_data.append([target_map[c + 1], round(acc, 4), round(precision, 4), round(f1_score, 4), round(recall, 4), round(specificity, 4), round(fpr, 4), round(fnr, 4)])

    df_report = pd.DataFrame(report_data, columns = ['class', 'accuracy', 'precision', 'f1-score', 'sensitivity', 'specificity', 'fpr', 'fnr'])
     
    avg_row = {
        'class': 'Avg',
        'accuracy': round(df_report['accuracy'].mean(), 4),
        'precision': round(df_report['precision'].mean(), 4),
        'f1-score': round(df_report['f1-score'].mean(), 4),
        'sensitivity': round(df_report['sensitivity'].mean(), 4),
        'specificity': round(df_report['specificity'].mean(), 4),
        'fpr': round(df_report['fpr'].mean(), 4),
        'fnr': round(df_report['fnr'].mean(), 4)
    }

    df_report.loc[-1] = [avg_row['class'], avg_row['accuracy'], avg_row['precision'], avg_row['f1-score'], avg_row['sensitivity'], avg_row['specificity'], avg_row['fpr'], avg_row['fnr']]
    df_report.index = df_report.index + 1  # shifting index
    
    return df_report

In [15]:
def calculate_metrics(y_test, y_pred, score='accuracy', n_classes = 3, target_map = {1: 'HS', 2: 'MCI', 3: 'AD'}):
    df_report = generate_metrics_report(y_test, y_pred, n_classes = n_classes, target_map = target_map)
    return round(df_report[score].mean(), 4)

In [16]:
def print_metrics(clf, kfolds, score='accuracy', test_size = 0.30, fn_action = None, suffle=True, random_state=state):
    if (kfolds == 1):
        print(f'Running Simple TrainTestSplit with test size: {test_size}: ')
    else:
        print(f'Running Cross Validation with {kfolds} folds:')
        
    print(f'- clf: {clf}')
    print(f'- score: {score}')
    print(f'- suffle: {suffle}')
    print(f'- random_state: {random_state}\n')
    
    scores = []
    
    # we use simple train_test_split
    if (kfolds == 1):
        scores = train_with_test_split(clf, test_size=test_size, random_state=random_state)
    else:
        scores = train_with_cross_validation(clf, kfolds=kfolds, suffle=suffle, random_state=random_state)
    
    best_params = []
        
    # Step 1: Calculate the scores for all folds
    scores_values = np.array([calculate_metrics(y_test, y_pred, n_classes=3, score=score) 
                             for y_test, y_pred in scores])

    # Step 2: Calculate the average score
    average_score = np.mean(scores_values)

    # Step 3: Find the index of the score closest to the average score
    closest_index = np.argmin(np.abs(scores_values - average_score))

    # Retrieve the parameters corresponding to the closest score
    best_params = scores[closest_index]
                
    df_metrics = generate_metrics_report(best_params[0], best_params[1], n_classes = 3)
            
    print('{} ± {} & {} ± {} & {} ± {} & {} ± {} & {} ± {} & {} ± {} & {} ± {}'
          .format(round(df_metrics['accuracy'].mean() * 100.0, 2), 
                  round(df_metrics['accuracy'].std() * 100.0, 2),
                  round(df_metrics['precision'].mean() * 100.0, 2), 
                  round(df_metrics['precision'].std() * 100.0, 2),
                  round(df_metrics['f1-score'].mean() * 100.0, 2), 
                  round(df_metrics['f1-score'].std() * 100.0, 2),
                  round(df_metrics['sensitivity'].mean() * 100.0, 2), 
                  round(df_metrics['sensitivity'].std() * 100.0, 2),
                  round(df_metrics['specificity'].mean() * 100.0, 2), 
                  round(df_metrics['specificity'].std() * 100.0, 2),
                  round(df_metrics['fpr'].mean() * 100.0, 2), 
                  round(df_metrics['fpr'].std() * 100.0, 2),
                  round(df_metrics['fnr'].mean() * 100.0, 2),
                  round(df_metrics['fnr'].std() * 100.0, 2)))
    
    print(f'[+] Best value (folds) ({score}): {round(scores_values[closest_index], 4) * 100.0}% (+- {round(scores_values.std() * 100.0, 2)}%)')
    print(f'[+] Accuracy Mean: {round(df_metrics['accuracy'].mean() * 100.0, 2)}% (+- {round(df_metrics['accuracy'].std() * 100.0, 2)}%)')
    print(f'[+] Precision Mean: {round(df_metrics['precision'].mean() * 100.0, 2)}% (+- {round(df_metrics['precision'].std() * 100.0, 2)}%)')
    print(f'[+] F1-Score Mean: {round(df_metrics['f1-score'].mean() * 100.0, 2)}% (+- {round(df_metrics['f1-score'].std() * 100.0, 2)}%)')
    print(f'[+] Sensitivity Mean: {round(df_metrics['sensitivity'].mean() * 100.0, 2)}% (+- {round(df_metrics['sensitivity'].std() * 100.0, 2)}%)')
    print(f'[+] Specificity Mean: {round(df_metrics['specificity'].mean() * 100.0, 2)}% (+- {round(df_metrics['specificity'].std() * 100.0, 2)}%)')
    print(f'[+] False Positive Rate Mean: {round(df_metrics['fpr'].mean() * 100.0, 2)}% (+- {round(df_metrics['fpr'].std() * 100.0, 2)}%)')
    print(f'[+] False Negative Rate Mean: {round(df_metrics['fnr'].mean() * 100.0, 2)}% (+- {round(df_metrics['fnr'].std() * 100.0, 2)}%)')

    print()
    
    print(df_metrics)
    print()
    
    if (fn_action != None):
        fn_action(best_params[0], best_params[1])

#### Run Models

In [None]:
clf = OneVsRestClassifier(SVC(C=1.0, kernel='rbf', random_state=state))
print_metrics(clf, 10, score='f1-score', suffle=True, random_state=state)

clf = OneVsOneClassifier(SVC(C=1.0, kernel='rbf', random_state=state))
print_metrics(clf, 10, score='f1-score', suffle=True, random_state=state, fn_action=plot_confusion_matrix)