In [2]:
import os
import pickle
import time
import warnings

import lime
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
import sklearn
import sklearn.ensemble
import sklearn.metrics
import sklearn.model_selection
from lime import submodular_pick
from lime.lime_tabular import LimeTabularExplainer
from sklearn.metrics.pairwise import rbf_kernel

from MMD_critic import mmd

# Data Load
Data loading and train-test split.
The selected task is 1-year mortality prediction from hospital data.

In [None]:
# The project root is the sibling "ADHF" directory of the current working directory.
base_directory = os.path.abspath(os.path.join(os.curdir, os.pardir, "ADHF"))
data_dir = os.path.join(base_directory, "data")

features_csv = os.path.join(data_dir, "CHF_data_2015_normalized_z_score_bun_temp_k_na_hb.csv")
labels_csv = os.path.join(data_dir, "mesurements_for_pred_with_index_2015_z_score.csv")

# Patient feature table; the first CSV column becomes the row index.
df_patients = pd.read_csv(
    features_csv,
    index_col=0,
    header=0,
    thousands=',',
    low_memory=False,
)

# Label table, indexed the same way (its rows are looked up by df_patients' index later on).
y = pd.read_csv(labels_csv, index_col=0)

In [None]:
y_label = '1y_mort'

# Rows whose 'year' value is 10 or 11 are held out as the test set; every other row is training data.
in_test_years = df_patients['year'].isin([10, 11])

x_train = df_patients.loc[~in_test_years]
x_test = df_patients.loc[in_test_years]

# Labels are aligned to the features through the shared index.
y_train = y.loc[x_train.index, y_label]
y_test = y.loc[x_test.index, y_label]

In [None]:
def x_train_test(cols, test_years=(10, 11)):
    """
    Split the global ``df_patients`` frame into train and test feature frames.

    params:
    cols: column label(s) to keep, in any form accepted by ``DataFrame.loc``.
    test_years: values of the 'year' column that define the test set
                (default (10, 11), the split used throughout this notebook).

    Returns: (x_train, x_test) restricted to ``cols``.
    """
    # Compute the mask once so both halves are guaranteed to be complementary.
    in_test_years = df_patients['year'].isin(list(test_years))
    x_train = df_patients.loc[~in_test_years, cols]
    x_test = df_patients.loc[in_test_years, cols]
    return x_train, x_test

# Load Classifiers
Loading different pre-trained LR classifiers and their coefficients for evaluation.

In [None]:
full_colls_name = 'indices_disease_drug_admin_personal'
four_best_colls_name = 'indices_disease_admin_personal'
three_best_colls_name = 'indices_admin_personal'

# type_list = [full_colls_name, four_best_colls_name, three_best_colls_name]
type_list = [full_colls_name]
models_dir = os.path.join(base_directory, "models")
norm_type = 'z_score'
norm_dir = os.path.join(models_dir, norm_type)

# One entry per feature-set name in type_list, all in the same order.
fpr_tpr_list = []
predictors = []
prob_pos_list = []
y_pred_list = []
for type_ in type_list:
    # Column '0' of the coefficient file holds the feature names the model was trained on.
    df_colls = pd.read_csv(os.path.join(norm_dir, "coef_1_{}.csv".format(type_)), index_col=0)
    # NOTE: this rebinds the notebook-level x_train / x_test to the model's feature subset.
    x_train, x_test = x_train_test(df_colls['0'])

    # SECURITY: pickle.load can execute arbitrary code; only load model files you trust.
    with open(os.path.join(norm_dir, "1_{}.pkl".format(type_)), 'rb') as model_file:
        logit_reg = pickle.load(model_file)

    # for calibration
    prob_pos = logit_reg.predict_proba(x_test)[:, 1]
    y_pred = logit_reg.predict(x_test)
    prob_pos_list.append(prob_pos)
    y_pred_list.append(y_pred)

    # for auc (fully qualified: only the sklearn.metrics module itself is imported)
    fpr, tpr, _ = sklearn.metrics.roc_curve(y_test, prob_pos)
    roc_auc = sklearn.metrics.auc(fpr, tpr)
    # save auc
    predictors.append(logit_reg)
    fpr_tpr_list.append((fpr, tpr, roc_auc))

# Interpretability Evaluation

The combinations of sample-selection and local-explanation approaches selected for evaluation are:
* SP-LIME + LIME
* SHAP + SP-LIME
* LIME + MMD-critic
* SHAP + MMD-critic
* LIME + $\alpha, \beta$- mistakes

Note that the local explanations in this case are compatible with regression classifiers.

Examined confidence intervals:
* 0-0.3
* 0.3-0.5
* 0.5- 0.8
* 0.8 - 1

In [None]:
# specific classifier
# `predictors` is filled in type_list order by the loading cell, so index 0 is the
# model for the first (currently the only) feature set in type_list.
logit_reg = predictors[0]

In [None]:
# NOTE(review): this reads from models_dir, whereas the classifier-loading cell reads its
# coefficient files from norm_dir - confirm which directory actually holds this file.
coef_path = os.path.join(models_dir, "coef_1_indices_disease_admin_personal.csv")
model_coef = pd.read_csv(coef_path, index_col=0)

# Column '0' of the coefficient table is used as the list of feature names.
four_best_cols = model_coef.loc[:, '0']

In [None]:
# x_train, x_test = x_train_test(full_colls_name)

In [1]:
def interpret_data(X, y_, func, indexs, feature_names=None):
    """
    Explanations with LIME.
    Note - class names are set manually ('alive', 'dead'); change them according to the y-labels.

    params:
    X: data used both to fit the LIME explainer and to pick the explained rows
       (pandas DataFrame or 2-D array-like).
    y_: unused; kept for backward compatibility with existing callers.
    func: classifier probability function (e.g. ``model.predict_proba``).
    indexs: iterable of positional row indices of X to explain.
    feature_names: names of the features. If None (or empty), the column names of X
                   are used when X has them; otherwise LIME's own defaults apply.

    Returns: (list of explanations, one per index in ``indexs``; the explainer)
    """
    # `not feature_names` raises on numpy arrays / pandas Index objects, so test explicitly.
    if feature_names is None or len(feature_names) == 0:
        feature_names = X.columns.tolist() if hasattr(X, "columns") else None
    else:
        feature_names = list(feature_names)

    # LIME documents `training_data` as a numpy 2-D array; convert once and reuse it for row lookup.
    data = np.asarray(X)
    explainer = LimeTabularExplainer(data, discretize_continuous=False, feature_names=feature_names,
                                     class_names=['alive', 'dead'], random_state=24)
    explanations = [explainer.explain_instance(data[i, :], func, num_features=10) for i in indexs]
    return explanations, explainer


In [None]:
# Positional indices (into x_test) of the samples whose predicted class differs from the label.
is_misclassified = logit_reg.predict(x_test) != y_test
wrong_predicted = np.where(is_misclassified)[0]


In [None]:
# specific patient

i = [490]
print("the patient is: {}".format("alive" if y_test.values[i[0]] == 0 else "dead"))

y_hat = logit_reg.predict_proba(x_test)

# interpret_data returns one explanation per requested index, so a single index
# yields a single-element list (the previous exp[2] raised IndexError).
exp, explainer = interpret_data(x_test, y_hat, logit_reg.predict_proba, i)
exp[0].show_in_notebook(show_table=True)



In [None]:
# sample selections with mmd-critic
# The RBF kernel is computed over x_train, so the indices returned below are
# positional indices into x_train.
m = 200  # number of prototypes requested
k = 40   # number of criticisms requested

kernal = rbf_kernel(x_train, gamma=0.5)
candidate_indices = np.arange(kernal.shape[0])

selected = mmd.greedy_select_protos(kernal, candidate_indices, m)
select_crit = mmd.select_criticism_regularized(kernal, selected, k, is_K_sparse=False)

In [None]:
# explain with LIME
# The previous call omitted the required `indexs` argument and unpacked the result as
# (times, scores), which interpret_data never returned. Here each explanation is timed
# explicitly and its local-fit score collected.
# NOTE(review): the MMD-critic indices refer to rows of x_train (the kernel was built on
# x_train), so x_train rows are explained here - confirm this is the intended sample set.
mmd_indices = np.hstack([selected, select_crit])

# Build the explainer once (empty index list), then time each explanation separately.
_, lime_explainer = interpret_data(x_train, None, logit_reg.predict_proba, [])

times, scores = [], []
for sample_idx in mmd_indices:
    start = time.time()
    explanation = lime_explainer.explain_instance(x_train.values[sample_idx, :], logit_reg.predict_proba,
                                                  num_features=10)
    times.append(time.time() - start)
    # Depending on the lime version, `score` is a float or a dict keyed by label;
    # label 1 is the default explained label.
    score = explanation.score
    scores.append(score[1] if isinstance(score, dict) else score)

print('%9.4fs %9.4fs %9.4fs' % (min(times), sum(times) / len(times), max(times)))
print('%9.4f %9.4f %9.4f' % (min(scores), sum(scores) / len(scores), max(scores)))

In [None]:
# Indices that are both MMD criticisms and misclassified samples.
# NOTE(review): select_crit appears to index x_train while wrong_predicted indexes x_test -
# confirm the two index spaces are meant to be compared.
criticism_set = set(select_crit)
misclassified_set = set(wrong_predicted)
intersect = criticism_set & misclassified_set
print(intersect)

summary = "selected:{}, wrong_predicted:{}, diff:{}".format(
    len(select_crit), len(wrong_predicted), len(intersect)
)
print(summary)

In [None]:
# explain with SHAP
# Call SHAP's notebook JavaScript initialiser before any SHAP plotting cell runs.
shap.initjs()


In [None]:
# Background data for KernelExplainer: the rows of x_train at the positions held in
# `selected` (expected to be the MMD prototypes chosen from x_train).
explainer = shap.KernelExplainer(logit_reg.predict_proba, x_train.iloc[selected], link='logit')

# Explain the first two test samples and plot them against those SAME rows; the previous
# `x_test.iloc[:0, :]` passed an empty feature frame that cannot match the SHAP values.
shap_rows = x_test.iloc[:2, :]
shap_values = explainer.shap_values(shap_rows)

# predict_proba column 0 is class 0 (alive) and column 1 is class 1 (dead),
# so the class names must be given in that order.
shap.summary_plot(shap_values, shap_rows, class_names=['Alive', 'Dead'], show=False)
plt.tight_layout()
plt.savefig("test.png", bbox_inches='tight')

In [None]:
# Show the predicted probability of class 1 next to the true label for the first two
# test samples. Only the last expression of a cell is displayed, so the two bare
# expressions used previously showed the labels alone.
pd.DataFrame(
    {
        "predicted_proba_class_1": logit_reg.predict_proba(x_test.values)[:2, 1],
        "actual": y_test.values[:2],
    },
    index=x_test.index[:2],
)

In [None]:
# SP-LIME needs a LIME explainer (an object with `explain_instance`). The notebook-level
# `explainer` is a SHAP KernelExplainer at this point, so a dedicated LIME explainer is built here.
sp_lime_explainer = LimeTabularExplainer(x_train.values, discretize_continuous=False,
                                         feature_names=x_train.columns.tolist(),
                                         class_names=['alive', 'dead'], random_state=24)
sp_obj = submodular_pick.SubmodularPick(sp_lime_explainer, x_train.values, logit_reg.predict_proba,
                                        sample_size=100, num_features=14, num_exps_desired=6)

In [None]:
# confidence intervals

# Probability of class 1 for every test sample, computed once instead of once per bound.
prob_class_1 = logit_reg.predict_proba(x_test)[:, 1]

# Each entry holds the positional indices (into x_test) that fall in one probability range.
ranges = [
    # 1 - 0 - 0.3
    np.where(prob_class_1 < 0.3)[0],
    # 2 - 0.3 - 0.5
    np.where((prob_class_1 < 0.5) & (prob_class_1 >= 0.3))[0],
    # 3 - 0.5 - 0.8
    np.where((prob_class_1 < 0.8) & (prob_class_1 >= 0.5))[0],
    # 4 - 0.8 - 1
    np.where((prob_class_1 <= 1) & (prob_class_1 >= 0.8))[0],
]

In [None]:
# MMD-critic selection inside every confidence range.
# The returned indices are positions WITHIN the range's subset, not positions in x_test.
m = 2  # prototypes per range
k = 2  # criticisms per range
selected_list = []
selected_crit = []
for range_ in ranges:
    kernal = rbf_kernel(x_test.values[range_], gamma=0.1)
    candidate_indices = np.arange(kernal.shape[0])
    selected = mmd.greedy_select_protos(kernal, candidate_indices, m)
    criticisms = mmd.select_criticism_regularized(kernal, selected, k, is_K_sparse=False)
    selected_list.append(selected)
    selected_crit.append(criticisms)
    

In [None]:
range_selection = zip(selected_list, selected_crit)
y_hat = logit_reg.predict_proba(x_test)
types = {0: 'prototype',1: 'prototype', 2: 'critic', 3: 'critic'}
ranges_type = {0: '[0, 0.3)', 1: '[0.3, 0.5)', 2: '[0.5, 0.8)', 3: '[0.8, 1]'}

for idx, (protos, critics) in enumerate(range_selection):
    print("for the range of {}:".format(ranges_type[idx]))
    # Map positions inside the range's subset back to positions in x_test.
    selected = [ranges[idx][i] for i in np.hstack([protos, critics])]
    # Derive the role from the actual counts rather than assuming exactly 2 + 2 samples.
    roles = ['prototype'] * len(protos) + ['critic'] * len(critics)
    explenation, explainer = interpret_data(x_test, y_hat, logit_reg.predict_proba, selected)
    for role, patient, patient_explanation in zip(roles, selected, explenation):
        # Look up the label of THIS patient; the previous code indexed with the range
        # counter (selected[idx]) and so printed the label of a different patient.
        print("the {} #{} patient is: {}".format(role, patient
                                                 , "alive" if y_test.values[patient] == 0 else "dead"))
        patient_explanation.show_in_notebook(show_table=True)

In [None]:
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression

# Toy sanity check: a two-class logistic regression on the wine data set.
wine = load_wine()

# Keep only the samples whose target is not 2, leaving a binary problem.
data_locs = np.where(np.array(wine['target']) != 2)

# NOTE: `x` and `y` are rebound here; `y` previously held the patient label table.
x = wine['data'][data_locs]
y = wine['target'][data_locs]

wine_logit_reg = LogisticRegression()
wine_logit_reg.fit(x, y)

wine_logit_reg.predict(x)

# Training-set accuracy (the cell's displayed output).
wine_logit_reg.score(x, y)


In [None]:
def interpret_data2(X, y_, func, indexs, feature_names=None):
    """
    Explain a single row of X with LIME, using the class names 'Class 1' / 'Class 2'.

    params:
    X: data used both to fit the LIME explainer and to pick the explained row
       (2-D array-like or pandas DataFrame).
    y_: unused; kept for backward compatibility with existing callers.
    func: classifier probability function (e.g. ``model.predict_proba``).
    indexs: positional index of the single row to explain.
    feature_names: names of the features. If None (or empty), the column names of X
                   are used when X has them; otherwise LIME's own defaults apply.

    Returns: (one-element list holding the explanation; the explainer)
    """
    # `not feature_names` raises on numpy arrays, and `X.columns` does not exist on the
    # plain arrays this helper is called with, so both cases are handled explicitly.
    if feature_names is None or len(feature_names) == 0:
        feature_names = X.columns.tolist() if hasattr(X, "columns") else None
    else:
        feature_names = list(feature_names)

    # Positional row lookup must work for DataFrames too, so index the array form.
    data = np.asarray(X)
    explainer = LimeTabularExplainer(data, discretize_continuous=False, feature_names=feature_names,
                                     class_names=['Class 1', 'Class 2'], random_state=24)
    explanations = [explainer.explain_instance(data[indexs], func, num_features=10)]
    return explanations, explainer

In [None]:
# NOTE: this rebinds y_hat to the wine model's class probabilities.
y_hat = wine_logit_reg.predict_proba(x)

# Explain one wine sample and report its class (stored labels are 0/1, printed as 1/2).
wine_idx = 4
explenation, explainer = interpret_data2(
    x, y_hat, wine_logit_reg.predict_proba, wine_idx, wine['feature_names']
)
print("the #{} wine is in class {}".format(wine_idx, y[wine_idx] + 1))
explenation[0].show_in_notebook(show_table=True)

In [None]:
# Display the current y_hat (the full array is shown; consider y_hat[:5] for a shorter preview).
y_hat

In [None]:
# Explain one test sample with LIME and export the explanation as a standalone HTML file.
feature_labels = x_test.columns.tolist()
explainer = LimeTabularExplainer(
    x_test,
    discretize_continuous=False,
    feature_names=feature_labels,
    class_names=['alive', 'dead'],
    random_state=24,
)

# Row 1847 (positional index into x_test), explained with up to 20 features.
patient_row = x_test.values[1847, :]
explanation = explainer.explain_instance(patient_row, logit_reg.predict_proba, num_features=20)

with open('bla.html', 'w') as html_file:
    html_file.write(explanation.as_html(show_predicted_value=True))

In [None]:
# SP-LIME inside every confidence range.
explainer = LimeTabularExplainer(x_test.values, discretize_continuous=False,
                                 feature_names=x_test.columns.tolist(),
                                 class_names=['alive', 'dead'], random_state=24)

for idx, range_ in enumerate(ranges):
    print("for the range of {}:".format(ranges_type[idx]))
    if len(range_) == 0:
        print("no patients fall in this range")
        continue

    # `ranges` holds positions in x_test, so candidates must be drawn from x_test (the previous
    # code indexed x_train with them). Sampling is done here, and SubmodularPick is run with
    # method='full', so each picked explanation can be traced back to its patient index.
    candidates = np.random.choice(range_, size=min(20, len(range_)), replace=False)
    sp_obj = submodular_pick.SubmodularPick(explainer, x_test.values[candidates], logit_reg.predict_proba,
                                            method='full', num_features=5,
                                            num_exps_desired=min(4, len(candidates)))

    # sp_obj.V holds the positions (within `candidates`) of the picked explanations,
    # in the same order as sp_obj.sp_explanations.
    for pick, picked_explanation in zip(sp_obj.V, sp_obj.sp_explanations):
        patient = candidates[pick]
        print("the #{} patient is: {}".format(patient
                                                 , "alive" if y_test.values[patient] == 0 else "dead"))
        picked_explanation.show_in_notebook(show_table=True)






In [None]:
# Predicted probability of class 1 for the test sample at positional index 491.
logit_reg.predict_proba(x_test)[:, 1][491]


In [None]:
# selection according to typeI typeII mistakes and LIME

figs_dir = os.path.abspath(os.path.join(os.curdir, "figs"))
os.makedirs(figs_dir, exist_ok=True)  # save_to_file does not create missing directories

# Predict once and reuse. Labels are 0 (alive) / 1 (dead), so
#   prediction > label  -> predicted dead, actually alive (type-I)
#   prediction < label  -> predicted alive, actually dead (type-II)
y_pred = logit_reg.predict(x_test)
type_1_erros = np.where(y_pred > y_test.values)[0]
type_2_erros = np.where(y_pred < y_test.values)[0]
correctly_predicted = np.where(y_pred == y_test.values)[0]

kobe_numbers = 2408
np.random.seed(kobe_numbers)

num_of_samples = 5
y_hat = logit_reg.predict_proba(x_test)

types = {0: 'type-I', 1: 'type-II', 2: 'Correctelly predicted'}

# Never request more samples than a group holds (np.random.choice would raise otherwise).
selected_type_1 = np.random.choice(type_1_erros, min(num_of_samples, len(type_1_erros)), replace=False)
selected_type_2 = np.random.choice(type_2_erros, min(num_of_samples, len(type_2_erros)), replace=False)
selected_correct = np.random.choice(correctly_predicted, min(num_of_samples, len(correctly_predicted)),
                                    replace=False)
groups = [selected_type_1, selected_type_2, selected_correct]
selected = np.hstack(groups)

explenation, explainer = interpret_data(x_test, y_hat, logit_reg.predict_proba, selected)

# Walk the explanations group by group instead of relying on a hard-coded total of 15.
position = 0
for group_idx, group in enumerate(groups):
    print("for the type of of {}:".format(types[group_idx]))
    for patient in group:
        print("the #{} patient is: {}".format(patient
                                              , "alive" if y_test.values[patient] == 0 else "dead"))
        explenation[position].show_in_notebook(show_table=True)
        # save_to_file needs a file path; the previous call passed the bare directory.
        file_name = "lime_{}_{}.html".format(types[group_idx].replace(" ", "_"), patient)
        explenation[position].save_to_file(file_path=os.path.join(figs_dir, file_name))
        position += 1

In [None]:
# selection according to typeI typeII mistakes and SHAP

figs_dir = os.path.abspath(os.path.join(os.curdir, "figs"))
os.makedirs(figs_dir, exist_ok=True)

# Predict once and reuse. Labels are 0 (alive) / 1 (dead), so
#   prediction > label  -> type-I,  prediction < label  -> type-II.
y_pred = logit_reg.predict(x_test)
type_1_erros = np.where(y_pred > y_test.values)[0]
type_2_erros = np.where(y_pred < y_test.values)[0]
correctly_predicted = np.where(y_pred == y_test.values)[0]

kobe_numbers = 2408
np.random.seed(kobe_numbers)

num_of_samples = 2
y_hat = logit_reg.predict_proba(x_test)

types = {0: 'type-I', 1: 'type-II', 2: 'Correctelly predicted'}

selected_type_1 = np.random.choice(type_1_erros, num_of_samples, replace=False)
selected_type_2 = np.random.choice(type_2_erros, num_of_samples, replace=False)
selected_correct = np.random.choice(correctly_predicted, num_of_samples, replace=False)

# Background data: a fixed random sample of the training set. The previous code indexed
# x_train with whatever `selected` was left over from an earlier cell (x_test positions).
shap_background = shap.sample(x_train, 100, random_state=kobe_numbers)
explainer_orig = shap.KernelExplainer(logit_reg.predict_proba, shap_background, link='logit')

for idx, group_indices in enumerate([selected_type_1, selected_type_2, selected_correct]):
    for s in group_indices:
        patient_row = x_test.iloc[[s]]
        # Use the SHAP explainer built above; the notebook-level `explainer` may be a LIME
        # explainer at this point, which has no `shap_values` method.
        shap_values = explainer_orig.shap_values(patient_row)
        # Plot against the explained row (not an empty frame); predict_proba's column order
        # is class 0 (alive) then class 1 (dead).
        shap.summary_plot(shap_values, patient_row, class_names=['Alive', 'Dead'], show=False)
        plt.title("Shap for {}_{}".format(types[idx], s))
        plt.tight_layout()
        plt.savefig(os.path.join(figs_dir, "Shap for {}_{}".format(types[idx], s)), bbox_inches='tight')
        plt.close()

In [None]:
# with confidence intervals

range_selection = zip(selected_list, selected_crit)
y_hat = logit_reg.predict_proba(x_test)
types = {0: 'prototype',1: 'prototype', 2: 'critic', 3: 'critic'}
ranges_type = {0: '[0, 0.3)', 1: '[0.3, 0.5)', 2: '[0.5, 0.8)', 3: '[0.8, 1]'}

figs_dir = os.path.abspath(os.path.join(os.curdir, "figs"))
os.makedirs(figs_dir, exist_ok=True)

# Build the SHAP explainer here: the notebook-level `explainer` may have been rebound to a
# LIME explainer by an earlier cell, and LIME explainers have no `shap_values` method.
shap_background = shap.sample(x_train, 100, random_state=0)
shap_explainer = shap.KernelExplainer(logit_reg.predict_proba, shap_background, link='logit')

for idx, selection in enumerate(range_selection):
    print("for the range of {}:".format(ranges_type[idx]))
    # Map positions inside the range's subset back to positions in x_test.
    selected = [ranges[idx][i] for i in np.hstack(selection)]
    for i, patient in enumerate(selected):
        patient_row = x_test.iloc[[patient]]
        shap_values = shap_explainer.shap_values(patient_row)
        # Plot against the explained row (not an empty frame); predict_proba's column order
        # is class 0 (alive) then class 1 (dead).
        shap.summary_plot(shap_values, patient_row, class_names=['Alive', 'Dead'], show=False)
        plt.title("Shap for the range of {}_{}".format(ranges_type[idx], patient))
        plt.tight_layout()
        # `types` is keyed by the sample's position within the selection (i), not by the
        # range counter; the file name carries both the range and the sample's role.
        role = types.get(i, 'sample')
        file_name = os.path.join(figs_dir, "Shap for the range of {}_{}_{}".format(ranges_type[idx], role, patient))
        plt.savefig(file_name, bbox_inches='tight')
        plt.close()

