In [None]:
#imports
import dalex as dx
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pickle
import shap
import sys
import time
from sklearn import preprocessing
from sklearn.base import BaseEstimator
from sklearn.dummy import DummyClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score,balanced_accuracy_score,f1_score,matthews_corrcoef,roc_auc_score
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import SVC
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels

In [None]:
#locations of the model directory, the data sets and the result files
#directory that receives the pickled models
MODEL_DIR = "./SVMRBF/"
#ADNI training/validation data and ADNI test data
path_train_data = "../data/trainValid.csv"
path_test_data = "../data/test.csv"
#AIBL and OASIS test data
path_test_AIBL = "../data/AIBL.csv"
path_test_OASIS = "../data/OASIS.csv"
#file that logs the grid-search accuracy of every CV fold
filenameCSV = "./SVMRBF/SVMRBF_hyperparameter_tuning"
#file with the cross-validated predictions used to fit the Platt scaling
filename_predictions_for_platt_scaling = "./SVMRBF/SVMRBF_predictions_for_platt_scaling.csv"
#mapping of features between the ML and DL models
mapping_ML_DL = "../additional_data/Mapping_DKT_Regions_Deep_ML_new.csv"

In [None]:
#read the ADNI training and test data; the "PTID" column becomes the row index
trainValidMerged = pd.read_csv(path_train_data, index_col="PTID")
test = pd.read_csv(path_test_data, index_col="PTID")

In [None]:
#make sure the model directory exists; exist_ok=True avoids the race between
#checking for the directory and creating it
os.makedirs(MODEL_DIR, exist_ok=True)

In [None]:
#hyperparameter tuning (grid-search), 5-fold CV
#the splitter is re-created with the same random_state for every (C, gamma)
#combination; one model per combination and fold is trained, logged and pickled
for C in [1e-5,1e-4,1e-3,1e-2,1e-1,1e-0,1e1,1e2,1e3,1e4,1e5]:
    for gamma in ["scale", "auto"]:
        kf = StratifiedKFold(n_splits=5,shuffle=True,random_state=101)
        #cvIt is the 1-based number of the current fold
        for cvIt, (trainCross, validCross) in enumerate(kf.split(trainValidMerged,trainValidMerged.DX), start=1):
            #identify training and validation data
            training=trainValidMerged.iloc[trainCross]
            valid=trainValidMerged.iloc[validCross]
            #change format of training data (dummy-coded labels, feature matrix)
            Y_train=pd.get_dummies(training.DX,drop_first=True).to_numpy().squeeze()
            X_train=training.drop(["DX"],axis=1).to_numpy()
            #change format of validation data
            Y_valid=pd.get_dummies(valid.DX,drop_first=True).to_numpy().squeeze()
            X_valid=valid.drop(["DX"],axis=1).to_numpy()
            #fit scaler on the training part only
            scaler = preprocessing.StandardScaler().fit(X_train)
            #apply scaling for training and validation data
            X_train_pre=scaler.transform(X_train)
            X_valid_pre=scaler.transform(X_valid)
            #train model on training data
            #probability=True because predict_proba is called on the reloaded models later
            model = SVC(C=C,gamma=gamma,kernel="rbf", random_state=666,probability=True)
            model.fit(X_train_pre, Y_train)
            #predict results on validation data
            predictions_val=model.predict(X_valid_pre)
            #calculate accuracy score on validation dataset
            acc=accuracy_score(Y_valid, predictions_val)
            #save results for CV iteration to file
            d = {'C': [C], 'gamma': [gamma],'CV':[cvIt], "Epoch-Accuracy":[acc*100]}
            df = pd.DataFrame(data=d)
            #append to the log; the header is written only when the file is created
            df.to_csv(filenameCSV, mode='a', header=not os.path.isfile(filenameCSV))
            #save model; the context manager closes the file handle even if pickling fails
            filename=MODEL_DIR+"model_"+str(C)+"_"+str(gamma)+"_"+str(cvIt)+".sav"
            with open(filename, 'wb') as model_file:
                pickle.dump(model, model_file)

In [None]:
#summarise the CV log per hyperparameter combination
#read the logged per-fold results
datasetMonai = pd.read_csv(filenameCSV, index_col=0)
#a log entry is identified by every column except its accuracy value
datasetMonaiTest = datasetMonai.drop(columns=["Epoch-Accuracy"])
#drop repeated entries (e.g. if the pipeline was run multiple times); the last entry is kept
is_repeated = datasetMonaiTest.duplicated(keep="last")
datasetMonai = datasetMonai[~is_repeated]
#one data frame per (C, gamma) combination
listCV = [group for _, group in datasetMonai.groupby(["C", "gamma"])]
#empty summary table
dfGes = pd.DataFrame(columns=["C", "gamma", "CV-Accuracy", "CV-Accuracy_sd"])
#add mean and sd of the validation accuracy for each combination
for group in listCV:
    accuracies = group["Epoch-Accuracy"]
    row_df = pd.DataFrame(
        {
            'C': group["C"].iloc[0],
            'gamma': group["gamma"].iloc[0],
            'CV-Accuracy': accuracies.mean(),
            'CV-Accuracy_sd': accuracies.std(),
        },
        index=[0],
    )
    dfGes = pd.concat([dfGes, row_df])
#sort by mean CV accuracy (ascending), then move the old index into a column
dfGes = dfGes.sort_values(by=['CV-Accuracy']).reset_index()

In [None]:
#the last row of the summary table is used as the best combination
best_row = dfGes.iloc[-1]
#best hyperparameters
C = best_row["C"]
gamma = best_row["gamma"]
#mean and sd of the CV accuracy of this combination
cv_acc_mean = best_row["CV-Accuracy"]
cv_acc_sd = best_row["CV-Accuracy_sd"]

In [None]:
# calculate predictions of model with best hyperparameters during CV for Platt scaling
#same splitter settings as in the grid search, so the stored fold models match the folds
kf = StratifiedKFold(n_splits=5,shuffle=True,random_state=101)
#validation predictions of all folds, written to file once after the loop
fold_predictions=[]
#cvIt is the 1-based number of the current fold
for cvIt, (trainCross, validCross) in enumerate(kf.split(trainValidMerged,trainValidMerged.DX), start=1):
    #identify training and validation datasets
    training=trainValidMerged.iloc[trainCross]
    valid=trainValidMerged.iloc[validCross]
    #change format of training data
    Y_train=pd.get_dummies(training.DX,drop_first=True).to_numpy().squeeze()
    X_train=training.drop(["DX"],axis=1).to_numpy()
    #change format of validation data
    Y_valid=pd.get_dummies(valid.DX,drop_first=True).to_numpy().squeeze()
    X_valid=valid.drop(["DX"],axis=1).to_numpy()
    #fit scaler for training dataset
    scaler = preprocessing.StandardScaler().fit(X_train)
    #apply scaling for training and validation data
    X_train_pre=scaler.transform(X_train)
    X_valid_pre=scaler.transform(X_valid)
    #load trained model
    #NOTE: pickle.load can execute arbitrary code; only load model files written by this notebook
    filename=MODEL_DIR+"model_"+str(C)+"_"+str(gamma)+"_"+str(cvIt)+".sav"
    with open(filename, 'rb') as model_file:
        model = pickle.load(model_file)
    #calculate predictions for validation dataset
    predictions_test = model.predict(X_valid_pre)
    predictions_test_prob=model.predict_proba(X_valid_pre)[:, 1]
    #collect predictions for validation dataset
    fold_predictions.append(pd.DataFrame({"labels":Y_valid,"predictions_bin":predictions_test,"predictions_prob":predictions_test_prob}))
#save predictions of all folds; the file is overwritten so that a re-run does
#not append duplicate rows to the calibration data
predictionsRes=pd.concat(fold_predictions)
predictionsRes.to_csv(filename_predictions_for_platt_scaling, mode='w', header=True)

In [None]:
#calculate results for ADNI test dataset
#split the test data into the feature matrix and the dummy-coded labels (first category dropped)
X_test = test.drop(columns=["DX"]).to_numpy()
Y_test = pd.get_dummies(test.DX, drop_first=True).to_numpy().squeeze()

In [None]:
#feature matrix and dummy-coded labels of the complete training dataset
X_train = trainValidMerged.drop(columns=["DX"]).to_numpy()
Y_train = pd.get_dummies(trainValidMerged.DX, drop_first=True).to_numpy().squeeze()
#the scaler is fitted on the training data only
scaler = preprocessing.StandardScaler()
scaler.fit(X_train)
#scale the training and the ADNI test data with the fitted scaler
X_train_pre = scaler.transform(X_train)
X_test_pre = scaler.transform(X_test)
#final model: RBF-SVM with the selected hyperparameters, trained on the entire training dataset
model = SVC(kernel="rbf", C=C, gamma=gamma, probability=True, random_state=666)
model.fit(X_train_pre, Y_train)

In [None]:
#define calibration model
class MyClassifier(BaseEstimator):
    """Chain two estimators: column 1 of the first one's probabilities feeds the second.

    estimator1 has to provide ``predict_proba``; estimator2 has to provide
    ``predict`` and ``predict_proba`` for a single-column input. Neither of
    the two estimators is trained by this class.
    """

    def __init__(self, estimator1=None, estimator2=None):
        self.estimator1 = estimator1
        self.estimator2 = estimator2

    def fit(self, X, y):
        """Validate X and y and store them; the wrapped estimators are not touched."""
        # shape/consistency validation of the inputs
        X, y = check_X_y(X, y)
        self.X_, self.y_ = X, y
        # labels seen during fit
        self.classes_ = unique_labels(y)
        return self

    def _first_stage_scores(self, X):
        """Return column 1 of estimator1.predict_proba(X) as an (n_samples, 1) array."""
        X = check_array(X)
        return self.estimator1.predict_proba(X)[:, 1][:, np.newaxis]

    def predict(self, X):
        """Return estimator2's predictions for the first-stage scores of X."""
        return self.estimator2.predict(self._first_stage_scores(X))

    def predict_proba(self, X):
        """Return estimator2's probabilities for the first-stage scores of X."""
        return self.estimator2.predict_proba(self._first_stage_scores(X))
#load CV predictions for Platt scaling
pred=pd.read_csv(filename_predictions_for_platt_scaling)
#single-column input of shape (n_samples, 1) holding the uncalibrated probabilities;
#stored under its own name so that the ADNI test data frame `test` is not overwritten
platt_inputs = np.expand_dims(pred.predictions_prob.to_numpy(), axis=1)
#train logistic regression model on the CV predictions for Platt scaling
clf = LogisticRegression(random_state=0).fit(platt_inputs, pred.labels)
#calibrated classifier: final SVM followed by the logistic regression
clfTest=MyClassifier(model,clf)

In [None]:
#calibrated predictions for the ADNI test set: labels and column-1 probabilities
predictions_test = clfTest.predict(X_test_pre)
predictions_test_prob = clfTest.predict_proba(X_test_pre)[:, 1]
#metrics based on the predicted labels
acc_adni_test = accuracy_score(y_true=Y_test, y_pred=predictions_test)
bacc_adni_test = balanced_accuracy_score(y_true=Y_test, y_pred=predictions_test)
f1_adni_test = f1_score(y_true=Y_test, y_pred=predictions_test, average='macro')
mcc_adni_test = matthews_corrcoef(y_true=Y_test, y_pred=predictions_test)
#AUROC is based on the predicted probabilities
auroc_adni_test = roc_auc_score(y_true=Y_test, y_score=predictions_test_prob)

In [None]:
#read the AIBL test dataset; the "PTID" column becomes the row index
AIBL = pd.read_csv(path_test_AIBL, index_col="PTID")
#feature matrix and dummy-coded labels of AIBL
X_AIBL = AIBL.drop(columns=["DX"]).to_numpy()
Y_AIBL = pd.get_dummies(AIBL.DX, drop_first=True).to_numpy().squeeze()
#scale AIBL with the scaler fitted on the training data
X_AIBL_pre = scaler.transform(X_AIBL)
#calibrated predictions for AIBL: labels and column-1 probabilities
predictions_test = clfTest.predict(X_AIBL_pre)
predictions_test_prob = clfTest.predict_proba(X_AIBL_pre)[:, 1]
#metrics based on the predicted labels
acc_aibl_test = accuracy_score(y_true=Y_AIBL, y_pred=predictions_test)
bacc_aibl_test = balanced_accuracy_score(y_true=Y_AIBL, y_pred=predictions_test)
f1_aibl_test = f1_score(y_true=Y_AIBL, y_pred=predictions_test, average='macro')
mcc_aibl_test = matthews_corrcoef(y_true=Y_AIBL, y_pred=predictions_test)
#AUROC is based on the predicted probabilities
auroc_aibl_test = roc_auc_score(y_true=Y_AIBL, y_score=predictions_test_prob)

In [None]:
#read the OASIS test dataset; the "PTID" column becomes the row index
OASIS = pd.read_csv(path_test_OASIS, index_col="PTID")
#feature matrix and dummy-coded labels of OASIS
X_OASIS = OASIS.drop(columns=["DX"]).to_numpy()
Y_OASIS = pd.get_dummies(OASIS.DX, drop_first=True).to_numpy().squeeze()
#scale OASIS with the scaler fitted on the training data
X_OASIS_pre = scaler.transform(X_OASIS)
#calibrated predictions for OASIS: labels and column-1 probabilities
predictions_test = clfTest.predict(X_OASIS_pre)
predictions_test_prob = clfTest.predict_proba(X_OASIS_pre)[:, 1]
#metrics based on the predicted labels
acc_oasis_test = accuracy_score(y_true=Y_OASIS, y_pred=predictions_test)
bacc_oasis_test = balanced_accuracy_score(y_true=Y_OASIS, y_pred=predictions_test)
f1_oasis_test = f1_score(y_true=Y_OASIS, y_pred=predictions_test, average='macro')
mcc_oasis_test = matthews_corrcoef(y_true=Y_OASIS, y_pred=predictions_test)
#AUROC is based on the predicted probabilities
auroc_oasis_test = roc_auc_score(y_true=Y_OASIS, y_score=predictions_test_prob)

In [None]:
#interpret ML model
#load mapping of features between ML and DL
mapping=pd.read_csv(mapping_ML_DL)
#feature names in the column order of X_train_pre; they are read from the full
#training data instead of the `training` fold left over from the CV loop
feature_columns=trainValidMerged.drop(["DX"],axis=1).columns
#flag the features which are available for both model types
in_both=feature_columns.isin(mapping.feature_ML)
X_train_pre_red=X_train_pre[:,in_both]
#identify aspects for features which are available for both model types
dummy_clf = DummyClassifier(strategy="most_frequent")
dataNamed=pd.DataFrame(X_train_pre_red,columns=feature_columns[in_both])
exp = dx.Explainer(dummy_clf, dataNamed, Y_train)
asp = dx.Aspect(exp)
aspects=asp.get_aspects(h=0.5)
#add features only available in ML models (eTIV), each as an aspect of its own
notInDeep=feature_columns[~in_both].tolist()
for value in notInDeep:
    aspects[value]=[value]

In [None]:
# calculate permutation importance for aspects
#column names are read from the full training data (the source of X_train_pre)
#instead of the `training` fold left over from the CV loop
dataNamed=pd.DataFrame(X_train_pre,columns=trainValidMerged.drop(["DX"],axis=1).columns)
exp = dx.Explainer(clfTest, dataNamed, Y_train)
asp = dx.Aspect(exp)
mai_asp = asp.model_parts(variable_groups=aspects)

In [None]:
#calculate SHAP importances for aspects
#column names are read from the full training data (the source of X_train_pre)
#instead of the `training` fold left over from the CV loop
dataNamed=pd.DataFrame(X_train_pre,columns=trainValidMerged.drop(["DX"],axis=1).columns)
exp = dx.Explainer(clfTest, dataNamed, Y_train)
asp = dx.Aspect(exp)
#one single-row frame per observation, concatenated once after the loop;
#the leading empty frame puts the columns in the order of the aspects
shap_rows=[pd.DataFrame(columns=aspects.keys())]
for i in range(X_train_pre.shape[0]):
    print(i)
    mai_asp_shap = asp.predict_parts(X_train_pre[i],variable_groups=aspects, label='for aspects created by user', type='shap', random_state=42,N=1000)
    res=mai_asp_shap.result
    #row is labelled with the index value of the observation in the training data
    shap_rows.append(pd.DataFrame([res.importance.tolist()],columns=res.aspect_name,index=[trainValidMerged.index[i]]))
df=pd.concat(shap_rows)

In [None]:
#visualize SHAP feature importance plot
#sum of the absolute importances per aspect
df_sum = df.abs().sum(axis=0)
#horizontal bars for the ten largest sums, largest on top
top_aspects = pd.Series(df_sum, dtype=float).nlargest(10)
ax = top_aspects.plot(kind='barh')
ax.invert_yaxis()
ax.set_xlabel("SHAP importance")
ax.set_ylabel("Aspects")

In [None]:
#visualize SHAP summary plot
#every feature column is replaced by the importance values of its aspect
training_cal = trainValidMerged.copy()
for a in aspects:
    for b in aspects[a]:
        training_cal[b] = df[a]
training_cal_shap = training_cal.drop(columns=["DX"])
#original feature values
training_test = trainValidMerged.drop(columns=["DX"])
#display names in the column order of the training data; members of an aspect
#whose name starts with "aspect" get the aspect name appended
columnNamesNew = [
    column+" ("+aspect+")" if aspect.startswith("aspect") else column
    for column in trainValidMerged.columns
    for aspect in aspects
    for dfcol in aspects[aspect]
    if dfcol == column
]

shap.summary_plot(
    training_cal_shap.to_numpy().astype(float),
    training_test.to_numpy().astype(float),
    columnNamesNew,
    show=False,
)
#annotate the two directions of the importance axis
plt.text(-0.4, 20, 'protective')
plt.text(0.6, 20, 'progressive')

In [None]:
#calculate LIME importances for aspects
#column names are read from the full training data (the source of X_train_pre)
#instead of the `training` fold left over from the CV loop
dataNamed=pd.DataFrame(X_train_pre,columns=trainValidMerged.drop(["DX"],axis=1).columns)
exp = dx.Explainer(clfTest, dataNamed, Y_train)
asp = dx.Aspect(exp)
#one single-row frame per observation, concatenated once after the loop;
#the leading empty frame puts the columns in the order of the aspects
lime_rows=[pd.DataFrame(columns=aspects.keys())]
for i in range(X_train_pre.shape[0]):
    print(i)
    #type='default' instead of 'shap' is the only difference to the SHAP computation
    mai_asp_lime = asp.predict_parts(X_train_pre[i],variable_groups=aspects, label='for aspects created by user', type='default', random_state=42,N=1000)
    res=mai_asp_lime.result
    #row is labelled with the index value of the observation in the training data
    lime_rows.append(pd.DataFrame([res.importance.tolist()],columns=res.aspect_name,index=[trainValidMerged.index[i]]))
df=pd.concat(lime_rows)

In [None]:
#visualize LIME feature importance plot
#sum of the absolute importances per aspect
df_sum = df.abs().sum(axis=0)
#horizontal bars for the ten largest sums, largest on top
top_aspects = pd.Series(df_sum, dtype=float).nlargest(10)
ax = top_aspects.plot(kind='barh')
ax.invert_yaxis()
ax.set_xlabel("LIME importance")
ax.set_ylabel("Aspects")

In [None]:
#visualize LIME summary plot
#every feature column is replaced by the importance values of its aspect
training_cal = trainValidMerged.copy()
for a in aspects:
    for b in aspects[a]:
        training_cal[b] = df[a]
training_cal_shap = training_cal.drop(columns=["DX"])
#original feature values
training_test = trainValidMerged.drop(columns=["DX"])
#display names in the column order of the training data; members of an aspect
#whose name starts with "aspect" get the aspect name appended
columnNamesNew = [
    column+" ("+aspect+")" if aspect.startswith("aspect") else column
    for column in trainValidMerged.columns
    for aspect in aspects
    for dfcol in aspects[aspect]
    if dfcol == column
]

shap.summary_plot(
    training_cal_shap.to_numpy().astype(float),
    training_test.to_numpy().astype(float),
    columnNamesNew,
    show=False,
)
#annotate the two directions of the importance axis
plt.text(-0.4, 20, 'protective')
plt.text(0.6, 20, 'progressive')

In [None]:
#report the selected hyperparameters and their cross-validation accuracy
print(
    f"C: {C},\n"
    f" gamma: {gamma},\n"
    f" Mean CV-Accuracy: {round(cv_acc_mean,2)},\n"
    f" SD CV-Accuracy: {round(cv_acc_sd,2)}"
)

In [None]:
#report the metrics for the ADNI test set (MCC unscaled, all others in percent)
print(
    f"Accuracy (ADNI test set): {round(acc_adni_test*100,2)},\n"
    f" balanced-accuracy (ADNI test set): {round(bacc_adni_test*100,2)},\n"
    f" Macro-averaging F1-score (ADNI test set): {round(f1_adni_test*100,2)},\n"
    f" MCC (ADNI test set): {round(mcc_adni_test,3)},\n"
    f" AUROC (ADNI test set): {round(auroc_adni_test*100,2)}"
)

In [None]:
#report the metrics for the AIBL test set (MCC unscaled, all others in percent)
print(
    f"Accuracy (AIBL test set): {round(acc_aibl_test*100,2)},\n"
    f" balanced-accuracy (AIBL test set): {round(bacc_aibl_test*100,2)},\n"
    f" Macro-averaging F1-score (AIBL test set): {round(f1_aibl_test*100,2)},\n"
    f" MCC (AIBL test set): {round(mcc_aibl_test,3)},\n"
    f" AUROC (AIBL test set): {round(auroc_aibl_test*100,2)}"
)

In [None]:
#report the metrics for the OASIS test set (MCC unscaled, all others in percent)
print(
    f"Accuracy (OASIS test set): {round(acc_oasis_test*100,2)},\n"
    f" balanced-accuracy (OASIS test set): {round(bacc_oasis_test*100,2)},\n"
    f" Macro-averaging F1-score (OASIS test set): {round(f1_oasis_test*100,2)},\n"
    f" MCC (OASIS test set): {round(mcc_oasis_test,3)},\n"
    f" AUROC (OASIS test set): {round(auroc_oasis_test*100,2)}"
)