
**Neural Network Classifier for eICU Patient Data**


Mount Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/gdrive/')

In [None]:
pip install -r "/content/gdrive/MyDrive/MA_XAI/CTdata/Brain_Hemorage_python/requirements.txt"

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import pickle
import gc


from sklearn.impute import KNNImputer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
from sklearn import metrics
import seaborn as sns
import statsmodels.api as sm
import xgboost


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import sampler
from torchvision import transforms
import fastprogress
import time
import random
from datetime import date



# imports from captum library
#from captum.attr import (
#    IntegratedGradients,
#    DeepLift,
#    KernelShap,
#    LRP,
#    Lime,
#    visualization)
from captum._utils.models.linear_model import SkLearnLinearRegression, SkLearnLasso



import plotly.express as px

import quantus
import shap

In [None]:
import sys
sys.path.append('/content/gdrive/MyDrive/')

In [None]:
import utils_MA
import ExplanationWrapper_tabular
from utils_MA import get_device, CustomPatientData, MLP, accuracy, train2, validate2, run_training2, plot, test, EarlyStopper
from ExplanationWrapper_tabular import (explainer_wrapper,
                                       lime_explainer,
                                       kernelshap_explainer,
                                       lrp_explainer,
                                       deeplift_explainer,
                                       intgrad_explainer,
                                       random_explainer,
                                       custom_perturbation_func,
                                       radar_factory)

System checks

In [None]:
device = get_device()

# Get number of cpus to use for faster parallelized data loading
num_cpus = os.cpu_count()
print(num_cpus, 'CPUs available')

Import Dataset

In [None]:
orig_data = pd.read_csv("/content/gdrive/MyDrive/MA_XAI/PatientData_ready/final_eICU_data.csv")

# specify target variable Y and remove variables not used for training
Y_df = orig_data['intubated']
useddata = orig_data.drop(['intubated','patientunitstayid',
                           'oxygen_therapy_type', #'supp_oxygen',
                           'icu_los','unitdischargestatus',
                           'vent_start'],axis = 1)

# Label NA values in 'vassopressor' such that they are modeled as own category
useddata['vasopressor'] = useddata['vasopressor'].fillna(0)

# specify categorical variables as 'category'
cat_vals = ['specialty']


# check shares of missing values within each feature
print(useddata.isnull().mean().sort_values(ascending=False))


""" Imputation - KNN imputation for missing values for
    SpO2, gcs, glucose, PaO2, PaCO2, or HCO3
    Using the KNNImputer from sklearn """

imp = KNNImputer()

# impute missing data
imputed_X = imp.fit_transform(useddata)
X_df = pd.DataFrame(imputed_X, columns = useddata.columns)

X_df['gcs'] = round(X_df['gcs'],0)

In [None]:
# Group specialty variable and assign meaningful lables

# Specialty
specialty_conditions = [
    (X_df['specialty'] == 0),
    (X_df['specialty'] == 1),
    (X_df['specialty'] == 2)
]
specialty_values = ['Spec:Surgery','Spec:Other','Spec:Medical']
X_df['specialty_grouped'] = np.select(specialty_conditions, specialty_values)


# Data preparation
specialty = pd.get_dummies(X_df['specialty_grouped'],drop_first = True)
X_df.drop(['specialty','specialty_grouped'],axis = 1, inplace=True)
X_df = pd.concat([X_df,specialty],axis = 1)
colnames = X_df.columns



Normalize data and transform to tensor

In [None]:
# TODO calculate mean and standard deviation of train set
meanX = X_df.mean(axis = 0)
stdX = X_df.std(axis = 0)

X_norm = (X_df - meanX) / stdX

# convert to torch tensors

X = torch.tensor(X_norm.values.astype(np.float32))
Y = torch.tensor(Y_df.astype(np.int64).values)

Logistic Regression

In [None]:
# Estimation
model = LogisticRegression(fit_intercept = True, C = 1,max_iter = 10000)
mdl = model.fit(X_norm, Y_df)
pred = mdl.predict(X_norm)
cm = metrics.confusion_matrix(Y_df, pred)

display(cm)
# norm coefficents and store them in data frame
coef = model.coef_[0]
coef_X_input = (coef*X_norm) # mulitply coeficients of logistic regression with inputs
normed_coef_X_input = coef_X_input / np.linalg.norm(coef_X_input, ord=1)

#normed_coef = coef / np.linalg.norm(coef, ord=1)
logit_coef = pd.DataFrame({'Features':X_norm.columns,'variable': 'Logistic Regression', 'value': coef})
abs_logit_coef = pd.DataFrame({'Features':X_norm.columns,'variable': 'Logistic Regression', 'value': abs(coef)})

**Weight initialization for MLP model**

In [None]:
def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_normal_(m.weight)
        m.bias.data.fill_(0.01)

Run training 'nr_iter' times and save loss, AUC, accuracy, XAI attributions, predictions and labels for every iteration

In [None]:
""" Set hyperparamters for training """
nr_iter = 100
batch_size = 16

# network architecture
num_hidden_units = 50
num_hidden_layers = 1
dropout = .05

# early stopper
patience_param = 5

# training
loss_function = torch.nn.MSELoss()
num_epochs = 30
lr = 0.0001

# evaluation
threshold = 0.3

# xai methods
xai_methods = ['Lime','KernelShap','LRP','DeepLIFT','IntegratedGradients']



In [None]:
def define_eval_metrics():

  eval_metrics = {
      "LocalLipSchitz": quantus.LocalLipschitzEstimate(
          nr_samples=20,
          norm_numerator=quantus.similarity_func.distance_euclidean,
          norm_denominator=quantus.similarity_func.distance_euclidean,    
          perturb_func=custom_perturbation_func,
          perturb_func_kwargs = {'sets':sets, 'len_sets':len_sets},
          similarity_func=quantus.similarity_func.lipschitz_constant,
          aggregate_func=np.mean,
          return_aggregate=True,
          return_nan_when_prediction_changes = True,
          disable_warnings = True
          
      ),
      "AvgSensitivity": quantus.AvgSensitivity(
          nr_samples=20,
          lower_bound=0.2,
          norm_numerator=quantus.norm_func.fro_norm,
          norm_denominator=quantus.norm_func.fro_norm,
          perturb_func=custom_perturbation_func,
          perturb_func_kwargs = {"sets":sets, "len_sets": len_sets},
          similarity_func=quantus.similarity_func.difference,
          abs=False,
          normalise=False,
          aggregate_func=np.mean,
          return_aggregate=True,
          disable_warnings = True
      ),
      "Faithfulness_Correlation": quantus.FaithfulnessCorrelation(
          nr_runs=30,
          subset_size = 5,
          perturb_func=quantus.perturb_func.baseline_replacement_by_indices,
          perturb_func_kwargs = {'sets':sets, 'len_sets':len_sets, 'use_baseline':True, 'baseline':baseline_median},
          similarity_func=quantus.similarity_func.correlation_pearson,
          abs=False,
          normalise=False,
          aggregate_func=np.mean,
          return_aggregate=True,
          disable_warnings = True
      ),
      "Faithfulness_Estimate": quantus.FaithfulnessEstimate(
          perturb_func=quantus.perturb_func.baseline_replacement_by_indices,
          perturb_func_kwargs = {'sets':sets, 'len_sets':len_sets, 'use_baseline':True, 'baseline':baseline_median},
          similarity_func=quantus.similarity_func.correlation_pearson,
          aggregate_func=np.mean,
          features_in_step=2,  
          return_aggregate=True,
          abs = False,
          disable_warnings = True
      ),
      "Complexity": quantus.Sparseness(
          abs=True,
          normalise=False,
          aggregate_func=np.mean,
          return_aggregate=True,
          disable_warnings = True
      ),
      "Randomisation": quantus.ModelParameterRandomisation(
          similarity_func=quantus.similarity_func.correlation_spearman,
          return_sample_correlation=True,
          return_aggregate=True,
          aggregate_func=np.mean,
          layer_order="independent",
          disable_warnings=True,
          normalise=True,
          abs=True,
      ),
  }

  return eval_metrics

In [None]:
# lists to save training metrics for each iteration
TRAIN_LOSSES, TRAIN_ACCS, VAL_LOSSES, VAL_ACCS, VAL_AUCS, EPOCHS_STOPPED = [], [], [], [], [], []
CONFUSION_MATRIX, YS, Y_PREDS = [], [], []

# lists to save attributions each iteration
RANDOM_ATTR, LIME_ATTR, KSHAP_ATTR, LRP_ATTR, DeepLIFT_ATTR, IG_ATTR  = [],[],[],[],[],[]
IG_ATTR_TP, IG_ATTR_FP, IG_ATTR_TN, IG_ATTR_FN = [],[],[],[]



""" median-mode baseline """
baseline_median = X.median(dim = 0)[0].unsqueeze(dim = 0).reshape(1,1,20)

for i in range(nr_iter):
  
  # Different random seed each iteration
  RANDOM_SEED = i
  RANDOM = np.random.RandomState(RANDOM_SEED)

  """ 
  Apply Monte Carlo Cross Validation:
  In each iteration a new training and validation set is created
  """
  X_train, X_valid, Y_train, Y_valid = train_test_split(X,Y, test_size = 0.1,shuffle = True, random_state = RANDOM)
  traindata, valdata = CustomPatientData(X_train, Y_train), CustomPatientData(X_valid, Y_valid)
  traindl, valdl = DataLoader(traindata, batch_size = batch_size, shuffle=False, drop_last = False), DataLoader(valdata, batch_size = batch_size, shuffle=False, drop_last = False)

  """ instantiate model """
  model = MLP(num_features = 20, dropout = dropout,  
            num_hidden_units = num_hidden_units, num_hidden_layers = num_hidden_layers)


  """ specify model and optimizer """
  model = model.to(device)
  optimizer = torch.optim.Adam(model.parameters(), lr=lr)
  

  """ instantiate early stopper """
  stopper = EarlyStopper(patience=patience_param)


  """ run training """
  train_losses, val_losses, train_accs, val_accs, val_AUC, epoch, confusion_matrix, y, y_pred = run_training2(model, optimizer, loss_function, device, num_epochs, 
                traindl, valdl, lr, early_stopper=stopper, verbose=False, threshold = threshold)


  """ Only keep metrics from best model """
  z = epoch - patience_param
  TRAIN_ACCS.append(train_accs[z])
  TRAIN_LOSSES.append(train_losses[z])
  EPOCHS_STOPPED.append(epoch)
  VAL_LOSSES.append(val_losses[z])
  VAL_ACCS.append(val_accs[z])
  VAL_AUCS.append(val_AUC[z])
  CONFUSION_MATRIX.append(confusion_matrix[z])

  """ Label TruePositives, FalsePositives etc. """
  tp = np.where( (y[z] == 1) & (y_pred[z] == 1), True, False)
  fp = np.where( (y[z] == 0) & (y_pred[z] == 1), True, False)
  tn = np.where( (y[z] == 0) & (y_pred[z] == 0), True, False)
  fn = np.where( (y[z] == 1) & (y_pred[z] == 0), True, False)
  

  """ compute XAI attributions """
  x_batch = X_valid.reshape(len(X_valid),1,20) #bring X_valid in right format for Quantus
  sets = [np.unique(x_batch[:,:,i].cpu().detach().numpy()) for i  in range(20)] # for perturbation function
  len_sets = np.asarray([len(sets[i]) for i in range(20)])

  
  
  """ Random explanation as control """
  gc.collect()
  torch.cuda.empty_cache()
  RANDOM_ATTR.append(random_explainer(model=model,
                                inputs=x_batch,
                                **{"device": device},))
  
  
  
  """ LIME """
  gc.collect()
  torch.cuda.empty_cache()
  st = time.time()
  LIME_ATTR.append(lime_explainer(model=model,
                                  inputs=x_batch,
                                  baseline = baseline_median,
                                  **{"device": device},))
  et = time.time()
  lime_time = et - st

  
  """ Kernel Shap """
  gc.collect()
  torch.cuda.empty_cache()
  st = time.time()
  KSHAP_ATTR.append(kernelshap_explainer(model=model,
                                         inputs=x_batch,
                                         baseline = baseline_median,
                                         **{"device": device},))
  et = time.time()
  kshap_time = et - st

  
  x_batch.requires_grad_()  # for the gradient based methods
  
  """ LRP """
  gc.collect()
  torch.cuda.empty_cache()
  st = time.time()
  LRP_ATTR.append(lrp_explainer(model=model,
                                inputs=x_batch,
                                **{"device": device},))
  et = time.time()
  lrp_time = et - st

  """ DeepLIFT """
  gc.collect()
  torch.cuda.empty_cache()
  st = time.time()
  DeepLIFT_ATTR.append(deeplift_explainer(model=model,
                                          inputs=x_batch,
                                          baseline = baseline_median,
                                          **{"device": device},))
  et = time.time()
  deeplift_time = et - st
  
  
  """ Integrated Gradient """
  # for addditonal analysis distinguish attributions for IG between true positives, false positives etc, ....

  # complete data
  gc.collect()
  torch.cuda.empty_cache()
  st = time.time()
  IG_ATTR.append(intgrad_explainer(model=model,
                                    inputs=x_batch,
                                    baseline = baseline_median,
                                    **{"device": device, "n_steps": 30},))
  et = time.time()
  ig_time = et - st
  
  
  # true positives
  gc.collect()
  torch.cuda.empty_cache()
  IG_ATTR_TP.append(intgrad_explainer(model=model,
                                     inputs=x_batch[tp],
                                     baseline = baseline_median,
                                     **{"device": device, "n_steps": 30},))
  
  # false positives
  gc.collect()
  torch.cuda.empty_cache()
  IG_ATTR_FP.append(intgrad_explainer(model=model,
                                     inputs=x_batch[fp],
                                     baseline = baseline_median,
                                     **{"device": device, "n_steps": 30},))
  
  # true negatives
  gc.collect()
  torch.cuda.empty_cache()
  IG_ATTR_TN.append(intgrad_explainer(model=model,
                                     inputs=x_batch[tn],
                                     baseline = baseline_median,
                                     **{"device": device, "n_steps": 30},))
  
  # false negatives
  gc.collect()
  torch.cuda.empty_cache()
  IG_ATTR_FN.append(intgrad_explainer(model=model,
                                     inputs=x_batch[fn],
                                     baseline = baseline_median,
                                     **{"device": device, "n_steps": 30},))
  

  durations = {
    "Lime": lime_time,
    "KernelShap": kshap_time,
    "LRP": lrp_time,
    "DeepLIFT": deeplift_time,
    "IntegratedGradients": ig_time
    }

  """ Evaluate attributions """
  eval_metrics = define_eval_metrics()

  results = {method : {} for method in xai_methods}

  for method in xai_methods:
      for metric, metric_func in eval_metrics.items():
          print(f"Evaluating {metric} of {method} method.")
          gc.collect()
          torch.cuda.empty_cache()

          # Get scores and append results.
          scores = metric_func(
              model=model,
              x_batch=x_batch.cpu().detach().numpy(),
              y_batch=0,
              a_batch=None,
              device=device,
              explain_func=explainer_wrapper,
              explain_func_kwargs={
                  "method": method,
                  "device": device,
                  "baseline":baseline_median,
                  "n_steps": 30,         
              },
          )
          results[method][metric] = scores

          # Empty cache.
          gc.collect()
          torch.cuda.empty_cache()
        
      results[method]['ComputationTime'] = durations.get(method) 
 

  # Postprocessing of scores: to get how the different explanation methods rank across criteria.
  results_agg = {}
  for method in xai_methods:
      results_agg[method] = {}
      for metric, metric_func in eval_metrics.items():
          results_agg[method][metric] = np.mean(results[method][metric])
      results_agg[method]['ComputationTime'] = results[method]['ComputationTime']

  if i == 0:
    df0 = pd.DataFrame.from_dict(results_agg)
    df0 = df0.T.abs()
    df0['Run'] = i+1
    df_all = df0.copy()
  else:
    dfi = pd.DataFrame.from_dict(results_agg)
    dfi = dfi.T.abs()
    dfi['Run'] = i+1
    df_all = df_all.append(dfi)



LIME_ATTR, KSHAP_ATTR = np.stack(LIME_ATTR), np.stack(KSHAP_ATTR)
LRP_ATTR, DeepLIFT_ATTR, IG_ATTR  =  np.stack(LRP_ATTR), np.stack(DeepLIFT_ATTR), np.stack(IG_ATTR)
IG_ATTR_TP, IG_ATTR_FP, IG_ATTR_TN, IG_ATTR_FN = np.concatenate(IG_ATTR_TP), np.concatenate(IG_ATTR_FP), np.concatenate(IG_ATTR_TN), np.concatenate(IG_ATTR_FN)


# Save explanations to file.
explanations = {
    "Lime": LIME_ATTR,
    "KernelShap": KSHAP_ATTR,
    "LRP": LRP_ATTR,
    "DeepLIFT": DeepLIFT_ATTR,
    "IntegratedGradients": IG_ATTR
}

explanations_by_group = {
    "True Positive": IG_ATTR_TP,
    "False Positive": IG_ATTR_FP,
    "True Negative": IG_ATTR_TN,
    "False Negative": IG_ATTR_FN
}

# Save train & test metrics in data frame
metrics_list = [TRAIN_LOSSES, TRAIN_ACCS, VAL_LOSSES, VAL_ACCS, VAL_AUCS, EPOCHS_STOPPED]
metrics_df = pd.DataFrame({'Train_Loss':metrics_list[0],'Train_ACC':metrics_list[1], 'Val_Loss':metrics_list[2], 'Val_ACC':metrics_list[3], 'Val_AUC':metrics_list[4],
                            'Epoch':metrics_list[5]})


""" save metrics, evaluation results and explanations to disk """

day = date.today().strftime("%m%d")

explanations_save_name = 'explanations_%s.pkl' % day
pickle_file = F"/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/{explanations_save_name}"
with open(pickle_file, 'wb') as f0:
    pickle.dump(explanations, f0)

explanations_save_name2 = 'explanations_by_group_%s.pkl' % day
pickle_file = F"/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/{explanations_save_name2}"
with open(pickle_file, 'wb') as f1:
    pickle.dump(explanations_by_group, f1)

df_all_save_name = 'df_all_%s.csv' % day
df_all.to_csv("/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/%s" % df_all_save_name)

metrics_df_save_name = 'metrics_results_%s.csv' % day
metrics_df.to_csv("/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/%s" % metrics_df_save_name)

In [None]:
# Load saved results

day = '0207'

df_metrics = pd.read_csv(F"/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/metrics_results_{day}.csv",index_col = 0)
df_scores = pd.read_csv(F"/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/df_all_{day}.csv", index_col = 0)

avgs = df_scores.groupby(level=0,sort=False)
df_avg = avgs.agg({'Faithfulness_Correlation':'mean', 'Faithfulness_Estimate':'mean','LocalLipSchitz':'mean', 'AvgSensitivity':'mean', 'Complexity':'mean', 'Randomisation':'mean', 'ComputationTime':'mean'})
df_avg

# Take inverse ranking for Robustness, since lower is better.
df_normalised = df_avg.loc[:, ((df_avg.columns != 'LocalLipSchitz') & (df_avg.columns != 'AvgSensitivity') & (df_avg.columns != 'ComputationTime')) ].apply(lambda x: x / x.max())
df_normalised["LocalLipSchitz"] = df_avg["LocalLipSchitz"].min()/df_avg["LocalLipSchitz"].values
df_normalised["AvgSensitivity"] = df_avg["AvgSensitivity"].min()/df_avg["AvgSensitivity"].values
df_normalised["ComputationTime"] = df_avg["ComputationTime"].min()/df_avg["ComputationTime"].values
df_normalised_rank = df_normalised.rank()
df_normalised_rank

# Load attributions
explanations_save_name = 'explanations_%s.pkl' % day
pickle_file = F"/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/{explanations_save_name}"
with open(pickle_file, 'rb') as a0:
    attributions = pickle.load(a0)

explanations_save_name2 = 'explanations_by_group_%s.pkl' % day
pickle_file = F"/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/{explanations_save_name2}"
with open(pickle_file, 'rb') as a1:
    attributions_by_group = pickle.load(a1)



In [None]:
sns.set()

df_normalised_rank = df_normalised_rank.iloc[:5,:]
df_for_plots = df_normalised_rank[['Faithfulness_Correlation', 'Complexity', 'Randomisation', 'LocalLipSchitz', 'ComputationTime']]

# Plotting configs.
sns.set(font_scale=1)
plt.style.use("seaborn-v0_8-whitegrid")
plt.rcParams['ytick.labelleft'] = True
plt.rcParams['xtick.labelbottom'] = True

include_titles = True
include_legend = True


# Plotting configs.
colours_order = ["#008080", "#FFA500", "#124E78", "#d62728",'purple']
methods_order = ['Faithfulness_Correlation', 'Complexity', 'Randomisation', 'LocalLipSchitz', 'ComputationTime']

plt.rcParams['ytick.left'] = True
plt.rcParams['ytick.labelleft'] = True
plt.rcParams['xtick.bottom'] = True
plt.rcParams['xtick.labelbottom'] = True
include_titles = True

In [None]:
# Make spyder graph!
data = [df_for_plots.columns.values, (df_for_plots.to_numpy())]
theta = radar_factory(len(data[0]), frame='polygon')
spoke_labels = data.pop(0)

fig, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(projection='radar'))
fig.subplots_adjust(top=0.85, bottom=0.05)
for i, (d, method) in enumerate(zip(data[0], xai_methods)):
    line = ax.plot(theta, d, label=method, color=colours_order[i], linewidth=5.0)
    ax.fill(theta, d, alpha=0.15)

# Set lables.
if include_titles:
  ax.set_varlabels(labels=['\nFaithfulness', 'Complexity\n', '\nRandomisation', '\nStability', 'Speed\n'])
else:
  ax.set_varlabels(labels=[]) 

ax.set_rgrids(np.arange(0, df_for_plots.values.max() + 0.5), labels=[]) 

# Set a title.
ax.set_title("Summary of Explanation Quantification",  position=(0.5, 1.1), ha='center', fontsize=15)

# Put a legend to the right of the current axis.
if include_legend:
    ax.legend(loc='upper left', bbox_to_anchor=(1, 0.5),fontsize = 15)

plt.tight_layout()
#plt.savefig('/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/Spiderplot.png',bbox_inches='tight')

In [None]:
LIME_ATTR = attributions.get('Lime').reshape(attributions.get('Lime').shape[0]* attributions.get('Lime').shape[1],20)
KS_ATTR = attributions.get('KernelShap').reshape(attributions.get('KernelShap').shape[0]* attributions.get('KernelShap').shape[1],20)
LRP_ATTR = attributions.get('LRP').reshape(attributions.get('LRP').shape[0]* attributions.get('LRP').shape[1],20)
DeepLIFT_ATTR = attributions.get('DeepLIFT').reshape(attributions.get('DeepLIFT').shape[0]* attributions.get('DeepLIFT').shape[1],20)
IG_ATTR = attributions.get('IntegratedGradients').reshape(attributions.get('IntegratedGradients').shape[0]* attributions.get('IntegratedGradients').shape[1],20)

attributions_long = {'Lime':LIME_ATTR,'KernelShap':KS_ATTR,'LRP':LRP_ATTR,'DeepLIFT':DeepLIFT_ATTR,'IntegratedGradients':'IG_ATTR'}

In [None]:
# Use SHAP plot framework
X_train_df = pd.DataFrame(X_train.cpu().detach().numpy(),columns = colnames)
X_valid_df = pd.concat([pd.DataFrame(X_valid.cpu().detach().numpy(),columns = colnames)]*nr_iter)

# compute fake SHAP values that will overwritten with the computed attributions
Xtest,ytest = shap.datasets.adult()
modeltest = xgboost.XGBClassifier().fit(Xtest, ytest)
explainer = shap.Explainer(modeltest, X_train_df)
shap_values = explainer(X_valid_df)

attr = LRP_ATTR

shap_values.values = attr
shap_values.data = np.round((shap_values.data * stdX[np.newaxis,:]) + meanX[np.newaxis,:],4)

fig = plt.figure()
fig.set_size_inches(10, 6)
shap.plots.bar(shap_values, max_display=18, show_data=False, show = False)
plt.xlabel('mean|LRP Attributions|')
plt.xlim([0,0.6])
#plt.savefig('/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/Barplot_LRP.png',bbox_inches='tight')



In [None]:
# Box plots for different attributions
def boxplot_avg_attributions (attr,title):
  a0 = np.transpose(attr)
  a0 = a0 / np.linalg.norm(a0, ord=1)
  dic = dict(zip(colnames, a0))

  fig, ax = plt.subplots()
  fig.set_size_inches(8,6, forward = True)
  boxplot = ax.boxplot(dic.values())
  ax.set_xticklabels(dic.keys(), rotation = 90, ha = 'left')
  ax.set_ylabel('Attribution')
  #ax.set_title(title)
  ax.set_ylim([-0.01,0.01])
  ax.grid(False)
  

boxplot_avg_attributions(LRP_ATTR,'KernelSHAP')
#plt.savefig('/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/Boxplot_LRP_newScale.png',bbox_inches='tight')


In [None]:
attr_tp = attributions_by_group.get('True Positive')
attr_tn = attributions_by_group.get('True Negative')

df = pd.DataFrame(np.concatenate([attr_tp,attr_tn]), columns=colnames )

tp = ['True Positive']*len(attr_tp)
tn = ['True Negative']*len(attr_tn)
cat = tp + tn

df['Model Outcomes'] = pd.Series(cat)

# create boxplot
sns.set_style("whitegrid")


fig, axes = plt.subplots(1, 3, figsize = (15,5), sharey = True)
fig.suptitle('Attributions per Model Outcomes')

sns.boxplot(ax = axes[0], x = 'Model Outcomes', y = 'gcs', data = df,palette="Greys")
axes[0].set_title('GCS')
axes[0].set_ylabel('Attribution')
sns.boxplot(ax = axes[1], x = 'Model Outcomes', y = 'sofatotal', data = df,palette="Greys")
axes[1].set_title('SOFA score')
axes[1].set_ylabel('Attribution')
sns.boxplot(ax = axes[2], x = 'Model Outcomes', y = 'pao2', data = df,palette="Greys")
axes[2].set_title('PaO2')
axes[2].set_ylabel('Attribution')


#plt.savefig('/content/gdrive/MyDrive/MA_XAI/PatientData_ready/Results/Boxplot_tptn.png',bbox_inches='tight')



In [None]:
""" Helper function """
def attributions_sum(attr_test):
    attr_test = attr_test.reshape(attr_test.shape[0]*attr_test.shape[1],20)
    attr_test_sum = abs(attr_test).sum(0)
    #attr_test_norm_sum = attr_test_sum / np.linalg.norm(attr_test_sum, ord=1)
    
    return attr_test_sum

In [None]:
attr_iter = pd.DataFrame()
attr_iter['Features'] = colnames

for name, attr_test in attributions.items():
    attr_iter[name] = attributions_sum(attr_test)

max = attr_iter[['Lime','KernelShap','LRP','DeepLIFT','IntegratedGradients']].max(axis=0).values
min = attr_iter[['Lime','KernelShap','LRP','DeepLIFT','IntegratedGradients']].min(axis=0).values
attr_iter[['Lime','KernelShap','LRP','DeepLIFT','IntegratedGradients']] = (attr_iter[['Lime','KernelShap','LRP','DeepLIFT','IntegratedGradients']] -min) / (max - min)


attr_iter = pd.melt(attr_iter, id_vars='Features')
    
# save new attributions in final data frame
#attr_iter.rename(columns = {'value': f'value{i}'},inplace = True)
final_attr_df = pd.concat((attr_iter,attr_iter.loc[:,'value']), axis = 1)

# Normalize logistic regression coefficients also between 0 and 1
max_l = abs_logit_coef.max(axis=0).value
min_l = abs_logit_coef.min(axis=0).value

abs_logit_coef['value'] = (abs_logit_coef['value'] - min_l) / (max_l - min_l)

# Attach coefficients from logistic regression
attr_iter_f = pd.concat([attr_iter,abs_logit_coef])

# Renaming for graph
attr_iter_f.columns = ['Feature', 'Method', 'value']
  

In [None]:
# visualize
fig  = px.bar(
    data_frame=attr_iter_f,
    x='value',
    y='Method',
    facet_col='Feature',
    facet_col_wrap=5,
    color='Method',
    orientation='h',
    height=1000,
    width=1200,
    template='ggplot2'
)

fig.show()