In [None]:
#!pip install lime
#!pip install shap
#!pip install anchor-exp
#!pip install hyperopt

import pandas as pd
import numpy as np

import xgboost as xgb
from xgboost import XGBClassifier

from hyperopt import hp
import pickle

import sklearn
from sklearn.metrics import f1_score


import os
import joblib

import warnings
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
plt.style.use('seaborn-deep')

import statistics
import scipy as scp
import math

import lime
import lime.lime_tabular

import shap

from anchor import anchor_tabular

import seaborn as sns

import random

In [None]:
def calculate_mape(p1, diffs):
    iters = len(diffs)
    total_diffs = sum([abs(diff/p1) for diff in diffs])
    mape = total_diffs/iters
    
    return mape

In [None]:
def generate_random_excluding(exc_min, exc_max, min_val, max_val):
    new_val = random.uniform(min_val, max_val)
    
    while new_val > exc_min and new_val < exc_max:
        new_val = random.uniform(min_val, max_val)
    return new_val

In [None]:
def rule_breakdown(exp_list, feat_locs, feat_names, train_data):
    
    distribs = []
    
    for i in range(len(feat_locs)):
        feat_name = feat_names[i]
                
        min_vals = []
        max_vals = []

        for rule in exp_list:
            if feat_name in rule:
                loc = feat_locs[i]
    
                if rule.count("<") == 1:
                    min_vals.append(min([inst[loc] for inst in train_data]))
                    max_vals.append(float(rule.split("<")[-1].strip(" ").strip("=")))
                    
                elif rule.count("<") == 0 and rule.count(">") == 1:
                    min_vals.append(float(rule.split(">")[-1].strip(" ").strip("=")))
                    max_vals.append(max([inst[loc] for inst in train_data]))
                    
                elif rule.count("<") > 1:
                    min_vals.append(float(rule.split("<")[0].strip(" ").strip("=")))
                    max_vals.append(float(rule.split("<")[-1].strip(" ").strip("=")))
        
        distribs.append((feat_locs[i], min(min_vals), max(max_vals)))
                
    return distribs

In [None]:
def find_lime_distribution(instance, explainer, cls, quartiles, train_data, classification):
    lime_exp = []
    for i in range(exp_iter):
        if classification==True:
            lime_exp.extend(explainer.explain_instance(instance, cls.predict_proba, 
                                                num_features=len(feat_list), labels=[0,1]).as_list())
        else:
            lime_exp.extend(explainer.explain_instance(instance, cls.predict, 
                                                    num_features=len(feat_list), labels=[0,1]).as_list())
            
    weights = [[] for each in feat_list]
    for exp in lime_exp:
        feat = exp[0].replace("= ",'')
        if '<' in feat:
            parts = feat.split('<')
        elif '>' in feat:
            parts = feat.split('>')
        
        for part in parts:
            if part.replace('.','').replace(' ','').isdigit()==False:
                feat_name = part.replace(' ','')
        n = feat_list.index(feat_name)
        weights[n].append(exp[1])
    
    weights = np.transpose(weights)
    avg_weight = np.average(np.array(weights), axis = 0)
    abs_weight = [abs(weight) for weight in avg_weight] 
        
    #For average explanation    
    bins = pd.cut(abs_weight, 10, retbins = True, duplicates = "drop")
    quartile_min = bins[1][quartiles]
    
    rel_features = [i for i in range(len(abs_weight)) if abs_weight[i] > quartile_min]    
    feat_names = [feat_list[i] for i in rel_features]
    
    rules = [exp[0] for exp in lime_exp]
    
    distribs = rule_breakdown(rules, rel_features, feat_names, train_data)
    
    return distribs


def find_shap_val_distribution(instance, explainer, quartiles, percentage, global_exp, train_data, classification):
    full_exp = [shap_explainer(instance, check_additivity = False).values for i in range(exp_iter)]
    
    if classification==True:
        shap_exp = []
        for each in full_exp:
            single_exp = [feat[0] for feat in each]
            shap_exp.append(single_exp)
    else:
        shap_exp = [exp.reshape(len(feat_list)) for exp in full_exp]

    avg_shap_exp = np.average(shap_exp, axis = 0)
    
    abs_exp = [abs(exp) for exp in avg_shap_exp]

    bins = pd.cut(abs_exp, 10, retbins = True, duplicates = "drop")
    quartile_min = bins[1][quartiles]

    rel_feats = [i for i in range(len(avg_shap_exp)) if abs_exp[i] > quartile_min]

    distribs = []
    

    for loc in rel_feats:
        orig_val = avg_shap_exp[loc]
        bin_size = abs(orig_val * percentage)
        min_val = orig_val-bin_size
        max_val = orig_val+bin_size

        feat_vals = []

        for i in range(len(global_exp)):
            if global_exp[i][loc] > min_val and global_exp[i][loc] < max_val:
                feat_vals.append(train_data[i][loc])

        max_feat = max(feat_vals)
        min_feat = min(feat_vals)

        distribs.append((loc, min_feat, max_feat))

    return distribs

In [None]:
def find_feat_val_distribution(instance, explainer, explanation_method = "lime", cls = None, deciles = 9, percentage = 0.3, global_exp = None, train_data = None, classification = True):
    
    if deciles == 1:
        bin_loc = -2
    elif deciles == 2:
        bin_loc = -3
    elif deciles == 3:
        bin_loc = -4
    elif deciles == 4:
        bin_loc = -5
    elif deciles == 5:
        bin_loc = -6
    elif deciles == 6:
        bin_loc = -7
    elif deciles == 7:
        bin_loc = -8
    elif deciles == 8:
        bin_loc = -9
    else:
        bin_loc = -10
    
    if explanation_method == "shap":
        distribs = find_shap_val_distribution(instance, explainer, bin_loc, percentage, global_exp, train_data, classification)
    
    elif explanation_method == "lime":
        distribs = find_lime_distribution(instance, explainer, cls, bin_loc, train_data, classification)
        
    return distribs

In [None]:
# path to project folder
# please change to your own
PATH = os.getcwd()

dataset = "income"
cls_method = "xgboost" 
classification = True

if classification == True:
    shap_deciles = 8
    lime_deciles = 7
    percentage = 0.5
else:
    shap_deciles = 9
    lime_deciles = 9
    percentage = 0.3

random_state = 39
exp_iter = 10
random.seed(random_state)

save_to = "%s/%s/" % (PATH, dataset)
dataset_folder = "%s/datasets/" % (save_to)
final_folder = "%s/%s/" % (save_to, cls_method)

#Get datasets
X_train = pd.read_csv(dataset_folder+dataset+"_Xtrain.csv", index_col=False, sep = ";")
test_x = pd.read_csv(final_folder+"test_sample.csv", index_col=False, sep = ";").values

results = pd.read_csv(os.path.join(final_folder,"results.csv"), index_col=False, sep = ";")

feat_list = [each.replace(' ','_') for each in X_train.columns]

X = np.vstack((X_train.values, test_x))

In [None]:
cls = joblib.load(save_to+cls_method+"/cls.joblib")

In [None]:
results

In [None]:
lime_supporting = []
lime_contrary = []

if classification==True:
    class_names=['Negative','Positive']# negative is 0, positive is 1, 0 is left, 1 is right
    lime_explainer = lime.lime_tabular.LimeTabularExplainer(X_train.values, feature_names = feat_list, 
                                                            class_names=class_names, discretize_continuous=True)
else:
    class_names = ['Final Value']
    lime_explainer = lime.lime_tabular.LimeTabularExplainer(X_train.values, feature_names = feat_list, 
                                                            class_names=class_names, discretize_continuous=True, mode = "regression")
#index = 0
for instance in test_x:
    #index+=1
    #print(index, "of", len(test_x))
    distribution = find_feat_val_distribution(instance, lime_explainer, explanation_method = "lime", cls = cls, 
                                              train_data = X, classification = classification, deciles = lime_deciles)    
    for i in range(len(distribution)):
        each = list(distribution[i])
        
        all_val = [instance[each[0]] for instance in X]
        feat_min = min(all_val)
        feat_max = max(all_val)
        interval = each[2]-each[1]
                
        if feat_min != each[1] or feat_max != each[2]:
            new_min = each[1] - interval if each[1] - interval > feat_min else feat_min
            new_max = each[2] + interval if each[2] + interval < feat_max else feat_max
            
            new_min = new_min if new_min != each[1] and new_min < each[1] else each[1] - interval
            new_max = new_max if new_max != each[2] and new_max > each[2] else each[2] + interval
                        
            new_min = new_min if each[2] != feat_min else feat_min-100
            new_max = new_max if each[1] != feat_max else feat_max+100
            
        else:
            new_min = each[1] - interval
            new_max = each[2] + interval
        
        each.append(new_min)
        each.append(new_max)
        
        distribution[i] = tuple(each)
        
    if classification:
        pred = cls.predict(instance.reshape(1, -1))
        p1 = float(cls.predict_proba(instance.reshape(1, -1))[0][pred])
    else:
        p1 = cls.predict(instance.reshape(1, -1))[0]
        
    supp_diffs = []
    for i in range(exp_iter):
        alt_input = np.copy(instance)
        for each in distribution:
            alt_input[each[0]] = random.uniform(each[1], each[2])
        if classification:
            p2 = float(cls.predict_proba(alt_input.reshape(1, -1))[0][pred])
        else:
            p2 = cls.predict(instance.reshape(1, -1))[0]
        diff = p1-p2
        supp_diffs.append(diff)

    supporting_mape = 1-calculate_mape(p1, supp_diffs)
    lime_supporting.append(supporting_mape)
    
    cont_diffs = []
    for i in range(exp_iter):
        alt_input = np.copy(instance)
        for each in distribution:
            alt_input[each[0]] = generate_random_excluding(each[1], each[2], each[3], each[4])
        if classification:
            p2 = float(cls.predict_proba(alt_input.reshape(1, -1))[0][pred])
        else:
            p2 = cls.predict(instance.reshape(1, -1))[0]
        diff = p1-p2
        cont_diffs.append(diff)
    contrary_mape = calculate_mape(p1, cont_diffs)
    lime_contrary.append(contrary_mape)
    

results["LIME Supporting Fidelity"] = lime_supporting
results["LIME Contrary Fidelity"] = lime_contrary


In [None]:
print(np.mean(lime_supporting))
print(np.mean(lime_contrary))

In [None]:
shap_supporting = []
shap_contrary = []

if cls_method == "xgboost":
    shap_explainer = shap.Explainer(cls)
else:
    shap_explainer = shap.Explainer(cls, X, check_additivity = False)

all_shap_val = []
for i in range(exp_iter):
    exp = shap_explainer(X, check_additivity = False).values
    
    if classification==True:
        all_exp = []
        for each in exp:
            if len(each.shape) != 1:
                single_exp = np.array([feat[0] for feat in each])
                all_exp.append(single_exp)
            else:
                single_exp = each
                all_exp.append(single_exp)
    else:
        all_exp = exp
        
    all_shap_val.append(all_exp)

all_shap_exp = np.average(all_shap_val, axis = 0)


for instance in test_x:
    if cls_method == "xgboost":
        instance = instance.reshape(1, -1)
    distribution = find_feat_val_distribution(instance, shap_explainer, explanation_method = "shap", 
                                              global_exp = all_shap_exp, train_data = X, classification = classification, deciles = shap_deciles, percentage = percentage)
    
    for i in range(len(distribution)):
        each = list(distribution[i])
        
        all_val = [instance[each[0]] for instance in X_train.values]
        feat_min = min(all_val)
        feat_max = max(all_val)
        interval = each[2]-each[1]
                
        if feat_min != each[1] or feat_max != each[2]:
            new_min = each[1] - interval if each[1] - interval > feat_min else feat_min
            new_max = each[2] + interval if each[2] + interval < feat_max else feat_max
            
            new_min = new_min if new_min != each[1] and new_min < each[1] else each[1] - interval
            new_max = new_max if new_max != each[2] and new_max > each[2] else each[2] + interval
            
        else:
            new_min = each[1] - interval
            new_max = each[2] + interval
        
        each.append(new_min)
        each.append(new_max)
        
        distribution[i] = tuple(each)
    
    if classification:
        pred = cls.predict(instance.reshape(1, -1))
        p1 = float(cls.predict_proba(instance.reshape(1, -1))[0][pred])
    else:
        p1 = cls.predict(instance.reshape(1, -1))[0]
    
    
    supp_diffs = []
    for i in range(exp_iter):
        alt_input = np.copy(instance.reshape(len(feat_list)))
        for each in distribution:
            alt_input[each[0]] = random.uniform(each[1], each[2])
        if classification:
            p2 = float(cls.predict_proba(alt_input.reshape(1, -1))[0][pred])
        else:
            p2 = cls.predict(instance.reshape(1, -1))[0]
        diff = p1-p2
        supp_diffs.append(diff)
    supporting_mape = 1-calculate_mape(p1, supp_diffs)
    shap_supporting.append(supporting_mape)
    
    cont_diffs = []
    for i in range(exp_iter):
        alt_input = np.copy(instance.reshape(len(feat_list)))
        for each in distribution:
            alt_input[each[0]] = generate_random_excluding(each[1], each[2], each[3], each[4])
        if classification:
            p2 = float(cls.predict_proba(alt_input.reshape(1, -1))[0][pred])
        else:
            p2 = cls.predict(instance.reshape(1, -1))[0]
        diff = p1-p2
        cont_diffs.append(diff)
    contrary_mape = calculate_mape(p1, cont_diffs)
    shap_contrary.append(contrary_mape)

results["SHAP Supporting Fidelity"] = shap_supporting
results["SHAP Contrary Fidelity"] = shap_contrary


In [None]:
print(np.mean(shap_supporting))
print(np.mean(shap_contrary))

In [None]:
shap_explainer.features = feat_list
new_exp = shap_explainer(X)
new_exp.feature_names = feat_list

shap.plots.beeswarm(new_exp, max_display = len(feat_list))

In [None]:
results.to_csv(os.path.join(final_folder, "results.csv"), index = False, sep = ";")

In [None]:
results