In [None]:
# !pip install shap

In [None]:
# Import libraries
import os
import pandas as pd
import pickle
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import random
import lime
from lime import lime_tabular
import shap

import json
from typing import Union
from datetime import datetime as dt
import numpy as np


Define the necessary paths to obtain data sources after running the main pipeline

In [None]:
# Directory that holds the artifact files named in the lists below; explanations_pipeline
# joins this path with each train/test/prediction/model file name.
# NOTE(review): absolute, machine-specific path - adjust when running elsewhere.
root = "/storage/scratch/e17-4yp-xai/Documents/artefact_backup/backup_1M_inputs_rand_frst/model_outputs/artifacts/random_forest"

In [None]:
# Artifact file names for every fold. All four lists are written out explicitly in the
# same order (fold 6 first, fold 1 last) because explanations_pipeline pairs the
# entries by position.
train_files = [
    'train_fold_6_2015-05-17.csv',
    'train_fold_5_2015-09-14.csv',
    'train_fold_4_2016-01-12.csv',
    'train_fold_3_2016-05-11.csv',
    'train_fold_2_2016-09-08.csv',
    'train_fold_1_2017-01-06.csv',
]

test_files = [
    'test_fold_6_2015-05-17.csv',
    'test_fold_5_2015-09-14.csv',
    'test_fold_4_2016-01-12.csv',
    'test_fold_3_2016-05-11.csv',
    'test_fold_2_2016-09-08.csv',
    'test_fold_1_2017-01-06.csv',
]

test_pred = [
    'test_prediction_fold_6_2015-05-17.csv',
    'test_prediction_fold_5_2015-09-14.csv',
    'test_prediction_fold_4_2016-01-12.csv',
    'test_prediction_fold_3_2016-05-11.csv',
    'test_prediction_fold_2_2016-09-08.csv',
    'test_prediction_fold_1_2017-01-06.csv',
]

models = [
    "random_forest_fold_6_2015-05-17.pkl",
    "random_forest_fold_5_2015-09-14.pkl",
    "random_forest_fold_4_2016-01-12.pkl",
    "random_forest_fold_3_2016-05-11.pkl",
    "random_forest_fold_2_2016-09-08.pkl",
    "random_forest_fold_1_2017-01-06.pkl",
]

In [None]:
#### SET PATH
# Destination directories for the generated explanation artifacts, all relative to ROOT.
# ROOT = "/storage/scratch/e17-4yp-xai/Documents/e17-4yp-using-machine-learning-in-high-stake-settings/code/"
ROOT = "./"
LIME_DEST = ROOT + "model_outputs/lime/"
SHAP_DEST = ROOT + "model_outputs/shap/"
TREESHAP_DEST = ROOT + "model_outputs/treeshap/"
# create_csv_for_exps and explanations_pipeline write their summary JSON/CSV files to
# OUTPUT_DEST, so it needs a default here; otherwise the notebook fails with a NameError
# as soon as the optional local-testing cell is commented out.
OUTPUT_DEST = ROOT + "model_outputs/"


In [None]:
# Define paths for testing locally - comment out this whole cell when not required.
# WARNING: when executed, this cell overrides `root`, the artifact file lists and the
# LIME/SHAP/TreeSHAP destination directories assigned in the earlier cells, so only the
# single fold listed here is processed.
# NOTE(review): the Windows path below is machine-specific.
root = 'C:/Users/User/random_forest/'
train_files = sorted(['train_fold_1_2017-01-06.csv'])
test_files = ['test_fold_1_2017-01-06.csv']
test_pred = ['test_prediction_fold_1_2017-01-06.csv']
models = sorted(["random_forest_fold_1_2017-01-06.pkl"])
LIME_DEST = root + "xai_test_results/lime/"
SHAP_DEST = root + "xai_test_results/kernelshap/"
TREESHAP_DEST = root + "xai_test_results/treeshap/"
# OUTPUT_DEST is read by create_csv_for_exps and explanations_pipeline when they write
# the summary CSV/JSON files.
OUTPUT_DEST = root + "xai_test_results/"

In [None]:
# Required helper functions

def load_model(model_file_path):
    """Load and return a pickled object (here: a trained model) from disk.

    Args:
        model_file_path: Path of the pickle file to read.

    Returns:
        The object stored in the pickle file.

    Raises:
        OSError: If the file cannot be opened.
        pickle.UnpicklingError: If the content is not a valid pickle.
    """
    # SECURITY: unpickling executes arbitrary code embedded in the file, so only
    # load model files that come from a trusted source.
    # The context manager guarantees the file handle is closed even when
    # unpickling fails (the handle used to be left open).
    with open(model_file_path, 'rb') as model_file:
        return pickle.load(model_file)

def save_json(dict_obj: Union[dict, list], path: str):
    """Write ``dict_obj`` to ``path`` as JSON indented by four spaces.

    The object is serialised completely before the target file is opened, so a
    serialisation error does not touch an existing file.
    """
    serialised = json.dumps(dict_obj, indent=4)
    with open(path, mode='w') as handle:
        handle.write(serialised)

Functions for LIME, TreeSHAP and KernelSHAP

In [None]:

# Functions used to generate LIME explanation and to save the explanation

# Function to convert the features in a LIME explanation object to a list of tuples
def get_lime_feature_list(exp_object, feat_names):
    """Return the label-1 entries of a LIME explanation as (feature name, weight) tuples.

    Args:
        exp_object: Object whose ``as_map()`` returns a mapping from label to a
            list of (feature index, weight) pairs.
        feat_names: Sequence used to translate a feature index into its name.

    Returns:
        List of ``(feat_names[index], weight)`` tuples in the order given by the map.
    """
    index_weight_pairs = exp_object.as_map().get(1)
    exp_list_with_feature_names = []
    for pair in index_weight_pairs:
        exp_list_with_feature_names.append((feat_names[pair[0]], pair[1]))
    print(f'exp_list_with_feature_names = {exp_list_with_feature_names}')
    return exp_list_with_feature_names

def get_lime_feature_list_encoded(exp_object):
    """Return the raw label-1 entry of ``exp_object.as_map()`` (``None`` if absent).

    Unlike get_lime_feature_list, the feature indices are not translated to names.
    """
    label_map = exp_object.as_map()
    exp_list = label_map.get(1)
    print(f'exp_list = {exp_list}')
    return exp_list

def save_lime_explanation(exp, instance_loc, model_name, position, project_id,fold):
    """Save one LIME explanation as an HTML file and as a PNG figure.

    Both files are written to ``{LIME_DEST}{fold}/{position}/`` and are named
    ``lime_exp_{project_id}_{model_name}`` with the respective extension.

    Args:
        exp: LIME explanation object; ``save_to_file`` and ``as_pyplot_figure``
            are called on it.
        instance_loc: Position of the explained instance (not used here).
        model_name: Model identifier used in the file names.
        position: Sub-directory name; callers in this notebook pass "top" or "bottom".
        project_id: Project identifier used in the file names.
        fold: Fold label used as a directory name.
    """
    # exp.show_in_notebook(show_table=True)
    out_dir = f"{LIME_DEST}{fold}/{position}"
    try:
        os.makedirs(out_dir, exist_ok = True)
        print("Directory '%s' created successfully" % out_dir)
    except OSError as error:
        # The message used to be printed with an unformatted '%s' placeholder.
        print("Directory '%s' can not be created: %s" % (out_dir, error))

    file_stem = f'{out_dir}/lime_exp_{project_id}_{model_name}'

    # Save the interactive explanation as an HTML file
    exp.save_to_file(f'{file_stem}.html')

    # Save as pyplot figure
    fig = exp.as_pyplot_figure()
    fig.savefig(f'{file_stem}.png')
    print(f"Saving lime exp for {project_id}_{model_name}")
    plt.show()
    # Close the figure instead of only clearing its axes, so that figures do not
    # pile up in memory while many instances are explained in a loop.
    plt.close(fig)


# Function to generate explanations for a list of instances
def get_lime_for_list(instance_loc_list, x_test, explainer_lime, model, model_name, list_type, fold, num_of_feat):
    """Explain every listed test instance with LIME and save each explanation.

    Args:
        instance_loc_list: Positional (iloc) row indices into ``x_test``.
        x_test: DataFrame containing the feature columns plus a "Project ID" column.
        explainer_lime: Explainer on which ``explain_instance`` is called.
        model: Model whose ``predict_proba`` is passed to the explainer.
        model_name: Model identifier forwarded to save_lime_explanation.
        list_type: Label of the list being processed, forwarded to save_lime_explanation.
        fold: Fold label forwarded to save_lime_explanation.
        num_of_feat: Number of features requested from the explainer.

    Returns:
        Tuple of two dicts keyed by Project ID: the explanation objects, and the
        explanations as lists of (feature name, weight) tuples.
    """
    explanations_by_project = {}
    feature_lists_by_project = {}

    for row_position in instance_loc_list:
        row = x_test.iloc[row_position]
        project_id = row["Project ID"]
        # "Project ID" identifies the row but is not a model feature
        features_only = row.drop(["Project ID"])

        explanation = explainer_lime.explain_instance(
            data_row=features_only,
            predict_fn=model.predict_proba,
            num_features=num_of_feat
        )
        explanations_by_project[project_id] = explanation

        # Translate feature indices into names and keep the tuple list
        feature_lists_by_project[project_id] = get_lime_feature_list(
            explanation, features_only.keys().to_list()
        )

        # Persist the explanation (HTML + figure)
        save_lime_explanation(explanation, row_position, model_name, list_type, project_id, fold)

    return explanations_by_project, feature_lists_by_project

def get_lime_explanation(x_train, x_test, top_instance_loc_list, bottom_instance_loc_list, class_names, mode, model, model_name, fold, num_of_feat):
    """Build a LIME tabular explainer on ``x_train`` and explain the top and bottom lists.

    Args:
        x_train: Training features used as the explainer's training data.
        x_test: Test DataFrame including a "Project ID" column.
        top_instance_loc_list: Positional row indices of the "top" instances.
        bottom_instance_loc_list: Positional row indices of the "bottom" instances.
        class_names: Class labels, e.g. ['0', '1'].
        mode: 'classification' or 'regression'.
        model: Model providing ``predict_proba``.
        model_name: Model identifier used in saved file names.
        fold: Fold label used in output paths.
        num_of_feat: Number of features per explanation.

    Returns:
        Tuple ``(top, bottom)`` of dicts mapping Project ID to a list of
        (feature name, weight) tuples.
    """
    # Columns with bool dtype are handed to LIME as categorical features.
    column_dtypes = x_train.dtypes
    bool_column_names = column_dtypes[column_dtypes == bool].index.to_list()
    categorical_positions = list(map(x_train.columns.get_loc, bool_column_names))

    explainer_lime = lime_tabular.LimeTabularExplainer(
        training_data=np.array(x_train),
        feature_names=x_train.columns,
        categorical_features=categorical_positions,
        class_names=class_names,
        mode=mode
    )

    # Only the tuple-list form of the explanations is returned to the caller.
    print("Top list")
    _, top_as_lists = get_lime_for_list(
        top_instance_loc_list, x_test, explainer_lime, model, model_name, "top", fold, num_of_feat
    )
    print("Bottom list")
    _, bottom_as_lists = get_lime_for_list(
        bottom_instance_loc_list, x_test, explainer_lime, model, model_name, "bottom", fold, num_of_feat
    )

    return top_as_lists, bottom_as_lists




In [None]:
def save_treeshap_explanation(explainer_tree, exp, instance, model_name, list_type, project_id, fold):
    """Render a SHAP force plot for one instance and save it as a PNG file.

    The file is written to
    ``{TREESHAP_DEST}{fold}/{list_type}/treeshap_exp_{project_id}_{model_name}.png``;
    the directory must already exist.

    Args:
        explainer_tree: Explainer whose ``expected_value[1]`` is the plot's base value.
        exp: SHAP values; ``exp[1]`` is plotted.
        instance: Feature values of the explained instance.
        model_name: Model identifier used in the file name.
        list_type: Sub-directory name ("top" or "bottom" in this notebook).
        project_id: Project identifier used in the file name.
        fold: Fold label used as a directory name.
    """
    # Visualize and save
    filepath = f'{TREESHAP_DEST}{fold}/{list_type}/treeshap_exp_{project_id}_{model_name}.png'
    print(f"Saving treeshap exp for {filepath}")
    fig = shap.force_plot(explainer_tree.expected_value[1],
                exp[1],
                instance,
                show=False,
                matplotlib=True,
                text_rotation=45)
    try:
        fig.savefig(filepath, format = "png", dpi = 150, bbox_inches = 'tight')
    finally:
        # Every call creates a new matplotlib figure; close it so that figures do
        # not accumulate in memory while a whole list of instances is processed.
        plt.close(fig)


def get_treeshap_feature_list(exp, feat_names, num_of_feat):
    """Return the ``num_of_feat`` features with the largest absolute SHAP value.

    Args:
        exp: SHAP values; the values in ``exp[1][0]`` are paired with ``feat_names``.
        feat_names: Feature names in the same order as the values.
        num_of_feat: Maximum number of (feature, value) tuples to return.

    Returns:
        List of (feature name, SHAP value) tuples ordered by descending absolute value.
    """
    # Going through a dict keeps one entry per feature name
    ranked = list(dict(zip(feat_names, exp[1][0])).items())
    ranked.sort(key=lambda pair: abs(pair[1]), reverse=True)
    return ranked[:num_of_feat]


# Function to generate explanations for a list of instances
def get_treeshap_for_list(instance_loc_list, x_test, explainer_tree, model_name, list_type, fold, num_of_feat):
    """Compute, collect and save TreeSHAP explanations for the listed test instances.

    Args:
        instance_loc_list: Positional (iloc) row indices into ``x_test``.
        x_test: DataFrame containing the feature columns plus a "Project ID" column.
        explainer_tree: Explainer on which ``shap_values`` is called.
        model_name: Model identifier forwarded to save_treeshap_explanation.
        list_type: Label of the list being processed ("top" or "bottom" in this notebook).
        fold: Fold label forwarded to save_treeshap_explanation.
        num_of_feat: Number of features kept in each tuple-list explanation.

    Returns:
        Tuple of two dicts keyed by Project ID: the raw SHAP values, and the
        explanations as lists of (feature name, SHAP value) tuples.
    """
    raw_by_project = {}
    ranked_by_project = {}

    for row_position in instance_loc_list:
        # Double brackets keep the selection as a one-row DataFrame
        single_row = x_test.iloc[[row_position]]
        project_id = single_row["Project ID"].values[0]
        # "Project ID" identifies the row but is not a model feature
        features_only = single_row.drop(["Project ID"], axis=1)

        shap_result = explainer_tree.shap_values(features_only)
        raw_by_project[project_id] = shap_result

        feat_names = features_only.columns.to_list()
        class_one_values = shap_result[1][0]
        print(f'len(exp[1][0]) = {len(class_one_values)}')
        print(f'len(feat_names) = {len(feat_names)}')
        print(f'exp[1][0] = {class_one_values}')
        print(f'feat_names top {num_of_feat} = {feat_names[:num_of_feat]}')

        # Keep the strongest features as a list of tuples
        ranked = get_treeshap_feature_list(shap_result, feat_names, num_of_feat)
        ranked_by_project[project_id] = ranked
        print(f'type(exp_as_list) = {type(ranked)}')

        # Save the explanation as a figure
        save_treeshap_explanation(explainer_tree, shap_result, features_only, model_name, list_type, project_id, fold)

    return raw_by_project, ranked_by_project


def get_treeshap_explanation(x_train, x_test, top_instance_loc_list, bottom_instance_loc_list, model, model_name, fold, num_of_feat):
    """Build a TreeSHAP explainer and explain the top and bottom instance lists.

    Args:
        x_train: Background data handed to ``shap.TreeExplainer``.
        x_test: Test DataFrame including a "Project ID" column.
        top_instance_loc_list: Positional row indices of the "top" instances.
        bottom_instance_loc_list: Positional row indices of the "bottom" instances.
        model: Tree-based model to explain.
        model_name: Model identifier used in saved file names.
        fold: Fold label used in output paths.
        num_of_feat: Number of features kept per explanation.

    Returns:
        Tuple ``(top, bottom)`` of dicts mapping Project ID to a list of
        (feature name, SHAP value) tuples.
    """
    # Define the TreeSHAP explainer
    print(f"Treeshap explainer: Start training")
    explainer_tree = shap.TreeExplainer(model=model, feature_perturbation='interventional', data=x_train, model_output="raw")
    print(f"Treeshap explainer: Done training")

    # Make sure the output directories for both lists exist before plots are saved
    for list_type in ("top", "bottom"):
        filepath = f'{TREESHAP_DEST}{fold}/{list_type}/'
        try:
            os.makedirs(filepath, exist_ok = True)
            print("Directory '%s' created successfully" %filepath)
        except OSError as error:
            # The message used to be printed with an unformatted '%s' placeholder.
            print("Directory '%s' can not be created: %s" % (filepath, error))

    print("Top list")
    _, exp_as_list_objects_top = get_treeshap_for_list(top_instance_loc_list, x_test, explainer_tree, model_name, "top", fold, num_of_feat)

    print("Bottom list")
    _, exp_as_list_objects_bottom = get_treeshap_for_list(bottom_instance_loc_list, x_test, explainer_tree, model_name, "bottom", fold, num_of_feat)

    return exp_as_list_objects_top, exp_as_list_objects_bottom



In [None]:
def save_kernelshap_explanation(explainer_tree, exp, instance, model_name, list_type, project_id, fold):
    """Render a SHAP force plot for one KernelSHAP explanation and save it as a PNG file.

    The file is written to
    ``{SHAP_DEST}{fold}/{list_type}/kernelshap_exp_{project_id}_{model_name}.png``;
    the directory must already exist.

    Args:
        explainer_tree: Explainer whose ``expected_value[1]`` is the plot's base
            value (despite the parameter name, get_kernelshap_explanation passes a
            ``shap.KernelExplainer`` through to here).
        exp: SHAP values; ``exp[1]`` is plotted.
        instance: Feature values of the explained instance.
        model_name: Model identifier used in the file name.
        list_type: Sub-directory name ("top" or "bottom" in this notebook).
        project_id: Project identifier used in the file name.
        fold: Fold label used as a directory name.
    """
    # Visualize and save
    filepath = f'{SHAP_DEST}{fold}/{list_type}/kernelshap_exp_{project_id}_{model_name}.png'
    print(f"Saving kernelshap exp for {filepath}")
    fig = shap.force_plot(explainer_tree.expected_value[1],
                exp[1],
                instance,
                show=False,
                matplotlib=True,
                text_rotation=45)
    try:
        fig.savefig(filepath, format = "png", dpi = 150, bbox_inches = 'tight')
    finally:
        # Every call creates a new matplotlib figure; close it so that figures do
        # not accumulate in memory while a whole list of instances is processed.
        plt.close(fig)

def get_kernelshap_feature_list(exp, feat_names, num_of_feat):
    """Return the ``num_of_feat`` features with the largest absolute SHAP value.

    Args:
        exp: SHAP values; the values in ``exp[1]`` are paired with ``feat_names``.
        feat_names: Feature names in the same order as the values.
        num_of_feat: Maximum number of (feature, value) tuples to return.

    Returns:
        List of (feature name, SHAP value) tuples ordered by descending absolute value.
    """
    # Going through a dict keeps one entry per feature name
    ranked = list(dict(zip(feat_names, exp[1])).items())
    ranked.sort(key=lambda pair: abs(pair[1]), reverse=True)
    return ranked[:num_of_feat]


# Function to generate explanations for a list of instances
def get_kernelshap_for_list(instance_loc_list, x_test, explainer_tree, model_name, list_type, fold, num_of_feat, nsamples):
    """Compute, collect and save KernelSHAP explanations for the listed test instances.

    Args:
        instance_loc_list: Positional (iloc) row indices into ``x_test``.
        x_test: DataFrame containing the feature columns plus a "Project ID" column.
        explainer_tree: Explainer on which ``shap_values`` is called (despite the
            name, get_kernelshap_explanation passes a ``shap.KernelExplainer``).
        model_name: Model identifier forwarded to save_kernelshap_explanation.
        list_type: Label of the list being processed ("top" or "bottom" in this notebook).
        fold: Fold label forwarded to save_kernelshap_explanation.
        num_of_feat: Number of features kept in each tuple-list explanation.
        nsamples: Forwarded to ``shap_values``; either 'auto' or an int.

    Returns:
        Tuple of two dicts keyed by Project ID: the raw SHAP values, and the
        explanations as lists of (feature name, SHAP value) tuples.
    """
    raw_by_project = {}
    ranked_by_project = {}

    for row_position in instance_loc_list:
        row = x_test.iloc[row_position]
        project_id = row["Project ID"]
        # "Project ID" identifies the row but is not a model feature
        features_only = row.drop(["Project ID"])

        shap_result = explainer_tree.shap_values(features_only, nsamples=nsamples)
        class_one_values = shap_result[1]
        print(f'len(exp[1]) = {len(class_one_values)}')
        print(f'exp[1] = {class_one_values}')

        feat_names = features_only.keys().to_list()
        print(f'len(feat_names) = {len(feat_names)}')
        print(f'feat_names top entries = {feat_names[:num_of_feat]}')

        raw_by_project[project_id] = shap_result
        # Keep the strongest features as a list of tuples
        ranked = get_kernelshap_feature_list(shap_result, feat_names, num_of_feat)
        ranked_by_project[project_id] = ranked

        print(f'type(exp_as_list) = {type(ranked)}')
        # Save the explanation as a figure
        save_kernelshap_explanation(explainer_tree, shap_result, features_only, model_name, list_type, project_id, fold)

    return raw_by_project, ranked_by_project



def get_kernelshap_explanation(x_train, x_test, top_instance_loc_list, bottom_instance_loc_list, model, model_name, fold, num_of_feat, nsamples):
    """Build a KernelSHAP explainer and explain the top and bottom instance lists.

    Args:
        x_train: Background data handed to ``shap.KernelExplainer``.
        x_test: Test DataFrame including a "Project ID" column.
        top_instance_loc_list: Positional row indices of the "top" instances.
        bottom_instance_loc_list: Positional row indices of the "bottom" instances.
        model: Model whose ``predict_proba`` is explained.
        model_name: Model identifier used in saved file names.
        fold: Fold label used in output paths.
        num_of_feat: Number of features kept per explanation.
        nsamples: Forwarded to ``shap_values``; either 'auto' or an int.

    Returns:
        Tuple ``(top, bottom)`` of dicts mapping Project ID to a list of
        (feature name, SHAP value) tuples.
    """
    # Define the KernelSHAP explainer
    print("Kernel Explainer Loading ..... ")
    explainer_shap = shap.KernelExplainer(model=model.predict_proba, data=x_train)
    print("Kernel Explainer : Done training")

    # Make sure the output directories for both lists exist before plots are saved
    for list_type in ("top", "bottom"):
        filepath = f'{SHAP_DEST}{fold}/{list_type}/'
        try:
            os.makedirs(filepath, exist_ok = True)
            print("Directory '%s' created successfully" %filepath)
        except OSError as error:
            # The message used to be printed with an unformatted '%s' placeholder.
            print("Directory '%s' can not be created: %s" % (filepath, error))

    print("Top list")
    _, exp_as_list_objects_top = get_kernelshap_for_list(top_instance_loc_list, x_test, explainer_shap, model_name, "top", fold, num_of_feat, nsamples)
    print("Bottom list")
    _, exp_as_list_objects_bottom = get_kernelshap_for_list(bottom_instance_loc_list, x_test, explainer_shap, model_name, "bottom", fold, num_of_feat, nsamples)

    return exp_as_list_objects_top, exp_as_list_objects_bottom

In [None]:
# Function used to create csv using explanations
def create_csv_for_exps(feat_names, lime_dict, treeshap_dict, kernelshap_dict, list_type, fold):
    """Collect the explanations of one fold/list in a table and save it as CSV.

    The table has one row per feature and, for every project found in
    ``lime_dict``, three columns named ``lime_...``, ``treeshap_...`` and
    ``kernelshap_...`` followed by ``{list_type}_{project}``. Cells of features
    that do not occur in an explanation stay as empty strings.

    Args:
        feat_names: Feature names; become the 'features' column.
        lime_dict: Project ID -> list of (feature, value) tuples from LIME.
        treeshap_dict: Same structure for TreeSHAP; must contain every project of ``lime_dict``.
        kernelshap_dict: Same structure for KernelSHAP; must contain every project of ``lime_dict``.
        list_type: Label used in column and file names ("top" or "bottom" in this notebook).
        fold: Fold label used in the file name.

    Returns:
        The assembled DataFrame; it is also written to
        ``{OUTPUT_DEST}all_explanations_{fold}_{list_type}.csv``.
    """
    data = pd.DataFrame()
    data['features'] = feat_names

    projects_list = list(lime_dict.keys())
    print(f'Projects list = {projects_list}')

    method_sources = (
        ('lime', lime_dict),
        ('treeshap', treeshap_dict),
        ('kernelshap', kernelshap_dict),
    )

    for project in projects_list:
        # Start every column of this project with empty cells
        for method, _ in method_sources:
            data[f'{method}_{list_type}_{project}'] = ""

        # Look up all three tuple lists before any cell is filled
        tuples_per_method = [(method, source[project]) for method, source in method_sources]

        # Write each value into the row of its feature
        for method, feature_value_pairs in tuples_per_method:
            column = f'{method}_{list_type}_{project}'
            for pair in feature_value_pairs:
                data.loc[data['features'] == pair[0], column] = pair[1]

    print(data.columns)
    # Save as csv
    data.to_csv(OUTPUT_DEST+f'all_explanations_{fold}_{list_type}.csv') #### MAKE SURE OUTPUT_DEST IS DEFINED

    return data

In [None]:
# New pipeline for running the explainations - call this from main
def explanations_pipeline(root, model_paths, train_paths, test_paths, pred_paths, model_name):
    """Generate LIME, KernelSHAP and TreeSHAP explanations for every fold.

    For each fold the test rows are ranked by the predicted score in column "1"
    of the prediction file, a few instances are sampled at random from the
    highest- and lowest-ranked rows, and each sampled instance is explained with
    all three methods. The explanations of all folds are written to
    ``{OUTPUT_DEST}all_exp.json`` and summarised in one CSV per fold and list.

    Args:
        root (str): Root directory of the model artifacts.
        model_paths (list): File names of the pickled models.
        train_paths (list): File names of the train CSVs.
        test_paths (list): File names of the test CSVs.
        pred_paths (list): File names of the prediction CSVs.
        model_name (str): Model name used for identification, e.g. "random forest".

    All artifact lists must have the same length and the same fold order.

    Raises:
        AssertionError: If the artifact lists differ in length.
    """
    assert len(train_paths) == len(test_paths), "There should be same number of train paths and test paths"
    assert len(test_paths) == len(pred_paths), "There should be same number of predictions paths and test paths"
    assert len(model_paths) == len(test_paths), "There should be same number of model_paths paths and test paths"

    if len(train_paths) == 0:
        # Without any fold there is no test frame to read the feature list from;
        # this case used to end in a NameError after the loop.
        print("No folds supplied - nothing to explain")
        return

    num_of_explanations = 1 # Should be 50 #### CHANGE THE VALUE TO 50
    num_of_records = 20 # Should be 1000 #### CHANGE THE VALUE TO 1000
    rows = 1000 # Should be None #### CHANGE THE VALUE TO NONE

    # Dictionary to store all explanations
    all_exp_dict = {}

    # Loop to generate explanations for samples from each fold
    for i in range(len(train_paths)):
        fold_label = f"Fold{i}"
        print(f"Fold {i} .............................")
        x_train = pd.read_csv(os.path.join(root, train_paths[i]), nrows=rows)
        x_train_cleaned = x_train.drop(["Unnamed: 0", "Project ID", "Label"], axis=1)

        # Attach the predicted score (column "1") to the test rows and rank by it
        test_rows = pd.read_csv(os.path.join(root, test_paths[i]), nrows=rows)
        fold_pred = pd.read_csv(os.path.join(root, pred_paths[i]), nrows=rows)
        scored = pd.concat([test_rows, fold_pred["1"]], axis=1).drop(["Unnamed: 0"], axis=1)
        ranked = scored.sort_values(["1"], ascending=False)
        x_test_with_id = ranked.drop(["Label", "1"], axis=1)

        # Select n samples each from top and bottom k records.
        # The window and the sample size are clamped to the number of available
        # rows; otherwise a short test set produces out-of-range or negative
        # positions (or makes random.sample fail).
        n_test_rows = x_test_with_id.shape[0]
        window = min(num_of_records, n_test_rows)
        n_samples = min(num_of_explanations, window)
        print(f"Sampling the top {n_samples} and bottom {n_samples}")
        top_instance_loc_list = random.sample(range(window), n_samples)
        bottom_instance_loc_list = random.sample(range(n_test_rows - window, n_test_rows), n_samples)
        print(f'top_instance_loc_list = {top_instance_loc_list}')
        print(f'bottom_instance_loc_list = {bottom_instance_loc_list}')

        # Define the number of features required to display after generating the explanations
        #num_of_feat = x_test_with_id.shape[1] - 1
        num_of_feat = 10 #### COMMENT THIS UNCOMMENT THE PREVIOUS LINE
        x_train_n_rows = 100 #### CHANGE IF NECESSARY

        # The same float64 background sample is used by all three explainers, so build it once
        background = x_train_cleaned[:x_train_n_rows].astype("float64")

        # Report the model of the current fold only (the whole list used to be printed)
        print(f"Model {model_paths[i]} is loading")

        # Load the saved model
        model = load_model(os.path.join(root, model_paths[i]))

        # Get explanations
        print(f"Explanation for Lime")
        lime_list_objects_top, lime_list_objects_bottom = get_lime_explanation(background, x_test_with_id, top_instance_loc_list, bottom_instance_loc_list, ["0", "1"], "classification", model, model_name, fold_label, num_of_feat)
        print(f"Explanation for Kernel Shap" )
        kernelshap_list_objects_top, kernelshap_list_objects_bottom = get_kernelshap_explanation(background, x_test_with_id, top_instance_loc_list, bottom_instance_loc_list, model, model_name, fold_label, num_of_feat, 100)
        print(f"Explanation for Tree Shap")
        treeshap_list_objects_top, treeshap_list_objects_bottom = get_treeshap_explanation(background, x_test_with_id, top_instance_loc_list, bottom_instance_loc_list, model, model_name, fold_label, num_of_feat)

        # Create dictionaries to store the explanations as json
        all_exp_dict[f"fold{i}"] = {
            'lime': {
                'top': lime_list_objects_top,
                'bottom': lime_list_objects_bottom
            },
            'treeshap': {
                'top': treeshap_list_objects_top,
                'bottom': treeshap_list_objects_bottom
            },
            'kernelshap': {
                'top': kernelshap_list_objects_top,
                'bottom': kernelshap_list_objects_bottom
            }
        }

    # Get a list of all the features (taken from the test frame of the last fold)
    list_of_features = x_test_with_id.drop(["Project ID"],axis=1).columns.to_list()
    # Storing explanations as json
    save_json(all_exp_dict, OUTPUT_DEST+'all_exp.json') #### MAKE SURE OUTPUT_DEST IS DEFINED

    # Creating top and bottom csvs for each fold to summarize explanations
    for i in range(len(train_paths)):
        fold_exps = all_exp_dict[f"fold{i}"]
        for list_type in ("top", "bottom"):
            create_csv_for_exps(list_of_features, fold_exps["lime"][list_type], fold_exps["treeshap"][list_type], fold_exps["kernelshap"][list_type], list_type, f"fold{i}")

    return

In [None]:
# Run the explanation pipeline with the paths and file lists configured in the cells above.
explanations_pipeline(root, models, train_files, test_files, test_pred, "random_forest")