# ROAD Development Notebook

In [1]:
import os
print(os.cpu_count())

10


In [2]:
# Author: Eddie Perez Claudio
# The goal of this script is to evaluate the performance of XAI methods on the simulated datasets using the ROAD method.

## Import Libraries ##

#XAI
import shap

#Imputation
import miceforest as mf

# Data wrangling
import itertools
import pandas as pd
import joblib
from sklearn.model_selection import train_test_split



## Libraries ##
#General
import numpy as np
# import pandas as pd
import copy



#Model Evalutation
from sklearn.metrics import accuracy_score, balanced_accuracy_score, f1_score, roc_auc_score, average_precision_score, recall_score
import shap
import shap.benchmark

# multiprocessing
from multiprocess.pool import Pool
import os

## SHAP Benchmark (Remove and Debias)

In [3]:
#### Remove and Retrain ####
# This is a version of ROAD (as described in Rong et al 2022) 
# which works with any scikit-learn model and tabular data (pandas dataframes)
#### Code ####

## Explanation Function ##
# Trains a model, builds an explanation with KernelShap, and then
# outputs the ranking of each feature in descending order.
# Arguments:
# clf : A trained ML model with a .predict method
# X: Training data as pandas dataframe
# x:  Test data as pandas dataframe
# explainer: any explainer that follows the shap api format 
def explain(clf, X, x, explainer = shap.explainers.Permutation):
    X = X.to_numpy()
    x = x.to_numpy()

    if explainer == shap.KernelExplainer:
        try:
                    # #Build Explanation
            explanation = explainer(clf.predict_proba, shap.kmeans(X, 4))
            shap_values = np.abs(explanation.shap_values(x)[1]).mean(0)
            return shap_values

        except:
        # #Build Explanation
            explanation = explainer(clf.predict, shap.kmeans(X, 4))
            shap_values = np.abs(explanation.shap_values(x)).mean(0)
            return shap_values
   
    elif explainer == shap.explainers.Tree:
        # #Build Explanation
        explanation = explainer(clf, X)
        shap_values = explanation(x).values
        if len(shap_values.shape) == 2:
            shap_values = np.abs(shap_values).mean(0)
        else:
            shap_values = np.abs(shap_values[...,1]).mean(0)
        return shap_values
    
    elif explainer == shap.explainers.Linear:
        # #Build Explanation
        explanation = explainer(clf, X)
        shap_values = np.abs(explanation(x).values).mean(0)
        return shap_values
    
    else:
        try:
                    # #Build Explanation
            explanation = explainer(clf.predict_proba, X)
            shap_values = np.abs(explanation(x).values[...,1]).mean(0)
            return shap_values
        
        except:
        # #Build Explanation
            explanation = explainer(clf.predict, X)
            shap_values = np.abs(explanation(x).values).mean(0)
            return shap_values




## Ranking Function ##
# Outputs the ranking of each feature in descending order.
# Arguments:
# shap_values: the output of any shap explainer
def ranker(shap_values):
    values = copy.deepcopy(shap_values)     
    #Get ranks
    ranks = np.argsort(values)
    return ranks


## Metrics function ##
# Utility function which outputs a series of metrics to evaluate
# Currently gets accuracy, balanced_accuracy, and f-score

#Arguments:
#clf: A trained ML model with a predict method
#x: a pd.DataFrame of test data
#y: Target of the test data
def metrics(clf, x, y):   
    #Predict
    # print(len(x))
    yhat = clf.predict(x.to_numpy())
    try:
        yscore = clf.predict_proba(x.to_numpy())[:, 1]
    except:
        yscore = clf.decision_function(x.to_numpy())
    #Get metrics
    accu = accuracy_score(y, yhat)
    accu_balanced = balanced_accuracy_score(y, yhat)
    f1 = f1_score(y, yhat)
    auroc = roc_auc_score(y, yscore)
    auprc = average_precision_score(y, yscore)
    recall = recall_score(y, yhat)
        
    return np.array([[accu], [accu_balanced], [f1], [auroc], [auprc], [recall]])



## Imputation Function assistant ##

def impute(i,k, rankings, x_test):
       #Add NA values
        x_test = mf.ampute_data(x_test, variables= x_test.columns[rankings[i:k]].to_list(), perc = 0.8, random_state=42)
        #impute
        variables = x_test.columns.to_list()
        index = rankings.copy()[i:k].tolist().pop()
        kds = mf.ImputationKernel(
        x_test,
        variable_schema={variables.pop(index) : variables},
        save_all_iterations=True,
        random_state=1991
        )
        # Run the MICE algorithm for 2 iterations
        kds.mice(2)
        # Return the completed dataset.
        x_test = kds.complete_data()

        return x_test


## Mask Features ## 
#These functions mask data using imputation and retrain the ML model
#They mask from the top %, bottom %, or random.

# Arguments:
# t: percentage or number of features to be masked in each iteration
# rankings: a list of rankings (descending rankings) to guide the removal
# X, x: dataframes of the data from which we will remove the features. Training set and testing set, respectively
# Y, y: Targets for the training and testing sets, respectively
#clf: Model to retrain
# base: metrics of the full model. Used to compare the performance of the masked model
# direction: direction of masking. Can be 'top', 'bottom', or 'random'
# seed: seed for random number generator

#Mask
def mask(t, rankings, X, Y, x, y, clf, base = np.empty((3,1)), direction = 'top'):
    #Make copies of our data to modify
    X_train = copy.deepcopy(X)
    x_test = copy.deepcopy(x)
    results = copy.deepcopy(base)

    ## Directional Masking ##
    #Top
    if direction == 'top':
        #Set masking schedule and iterator
        if type(t) != int:
            j = int(np.round(len(rankings)*t))
            i = len(rankings) - j
            k = len(rankings)
        else: 
            j = t
            i = len(rankings) - t
            k = len(rankings)

        #Impute and Predict
        while k >= j:
            #Impute
            x_test = impute(i,k, rankings, x_test)
            #Predict
            results =  np.hstack((results, metrics(clf, x_test, y)))
            
            #Move iterator forward
            i -= j
            k -= j

        return results 
        
    #Bottom
    elif direction == 'bottom':   
        #Set masking schedule
        if type(t) != int:
            j = int(np.round(len(rankings)*t))
            i = 0
            k = j
        else:
            j = t
            i = 0
            k = j

        #Impute and Predict
        while k <= len(rankings):
            #Impute
            x_test = impute(i,k, rankings, x_test)
            #Predict
            results =  np.hstack((results, metrics(clf, x_test, y)))
            
            #Move iterator forward
            i += j
            k += j
        return results
    
    #Random
    elif direction == 'random':
        np.random.seed(42)
        random_choices = np.random.permutation(rankings)
        #Set masking schedule
        if type(t) != int:
            j = int(np.round(len(rankings)*t))
            i = 0
            k = j
        else:
            j = t
            i = 0
            k = j

        #Impute and Predict
        while k <= len(rankings):
            #Impute
            x_test = impute(i,k, random_choices, x_test)
            #Predict
            results =  np.hstack((results, metrics(clf, x_test, y)))
            
            #Move iterator forward
            i += j
            k += j
        return results
   

    


    
## ROAD ##
# The main function of the library.
#Wraps all other functions in a nice pipeline which is easy to use.
#Accepts any scikit-learn model. It was built and tested using a binary target.

# Arguments:
# model : the model to be re-trained
# t: percentage of features to be removed in each iteration
# X: Training data as pandas dataframe
# Y: Target values for training (This was build using a binary target)
# x:  Test data as pandas dataframe
# y: Target values for testing
# explainer: any explainer which built with the shap api
# repeats: how many times to explain and do the whole retraining

#outputs accuracy, balanced_accuracy, f1_score, and ranks for each iteration. 
def road(X, Y, x, y, model, explainer = None, t = 0.10, shap_values = None, repeats = 10):
    #Initialize variables
    base = metrics(model, x, y)
    if explainer != None:
        values = explain(model, X, x, explainer)
        ranks = ranker(values)
       
    elif shap_values != None:
        values = shap_values
        ranks = ranker(values)
    else:
        print('Must supply either an explainer or shap_values')
        return
    
    top = mask(t, ranks, X, Y, x, y, model, base, direction='top')
   
    bottom = mask(t, ranks, X, Y, x, y, model, base, direction='bottom')
  
    random = mask(t, ranks, X, Y, x, y, model, base, direction='random')

    

    #Set progress bar
    # Repeat x times
    for i in range(repeats-1):
                #Initialize
        if explainer != None:
            iter_values = explain(model, X, x, explainer)
            ranks = ranker(iter_values)
            values = np.dstack((values, iter_values))
        elif shap_values != None:
            ranks = ranker(values)
        else:
            print('Must supply either an explainer or shap_values')
            return
        top = np.dstack((top, mask(t, ranks, X, Y, x, y, model, base, direction='top')))
        bottom = np.dstack((bottom, mask(t, ranks, X, Y, x, y, model, base, direction='bottom')))
        random = np.dstack((random, mask(t, ranks, X, Y, x, y, model, base, direction='random')))
    return [top, bottom, random, values]


## Faux Model

In [5]:
## Build a faux model for the data
## This will only work for ShapSampling and ShapKernel, or ShapPermutation

class faux_model:
  def __init__(self):
      self.name = 'FauxModel'

  @staticmethod
  def predict(x):
    if str(type(x)).__contains__('pandas'):
      x = x.to_numpy()
    
    return x[:,0:3].prod(axis = 1)
  
  def predict_proba(self, x):
    yhat = self.predict(x)
    return np.array([np.where(yhat == 0, 1, 0),yhat]).T


# y = data['3_vars_corr_2HC_n10000.csv']['y_test']
# x = data['3_vars_corr_2HC_n10000.csv']['X_test']

## Model Eval

### Imports

In [6]:
### Import Data ###
# Simulated Datasets

# Path to Simulated Datasets

dir_path = '/Users/eddie/Library/CloudStorage/OneDrive-UniversityofPittsburgh/Research/Projects/Explainability Method Comparison/Data-ML-XAI-Eval/Synthetic Data/Data Files'

# Lists to store files name
res_ = []
res = []
for (dir_path, dir_names, file_names) in os.walk(dir_path):
    res_.extend(file_names)
for file in res_:
    if file not in ['GroundTruth.csv', '.Rhistory', '.DS_Store']:
        res.append(file)

print(res)

data = {}
for file in res:
    data_path = f'{dir_path}/{file}'
    df = pd.read_csv(data_path)
    train, test = train_test_split(df, test_size=0.25, random_state=42)
    data[file] = {
        'X_train': train.drop('Target', axis=1),
        'y_train': train.Target,
        'X_test': test.drop('Target', axis=1),
        'y_test': test.Target
    }


### Import Models ###
# Import Models
names = ['PassiveAgressive', 'SGDClassifier', 'RandomForest', 'Perceptron', 'RidgeClassifier', 'LogisticRegression', 'DecisionTree', 'XGBoost', 'GaussianNB', 'FauxModel']
file_models = {}
for file in res:
    models = {}
    for model in names:
        if model == 'FauxModel':
            models[model] = faux_model()
        else:
            models[model] = joblib.load(
                f'/Users/eddie/Library/CloudStorage/OneDrive-UniversityofPittsburgh/Research/Projects/Explainability Method Comparison/Data-ML-XAI-Eval/Model Training/Models/{file}_{model}.joblib')
    file_models[file] = models

### Model types ###
# Linear Models
linear = ['PassiveAgressive', 'SGDClassifier', 'Perceptron', 'RidgeClassifier', 'LogisticRegression']
# Tree Models
tree = ['RandomForest', 'DecisionTree', 'XGBoost']
# Other
other = ['MultinomialNB', 'GaussianNB', 'FauxModel']

### Set Explainers ###
# Explainers of Interest
xai = {
    'ShapLinear': shap.explainers.Linear,
    'ShapTree': shap.explainers.Tree,
    'ShapPermutation': shap.explainers.Permutation,
    'ShapSampling': shap.explainers.Sampling,
    'ShapExact' : shap.explainers.Exact,
    'ShapKernel': shap.explainers._kernel.Kernel
}

### Define Parallel Function ###
def parallel_road(file, model_name, exp):

    #Check if exists
    if os.path.isfile(f'/content/drive/MyDrive/XAI method performacne when Explainaing the PORT Dataset/Results/Evaluations/ROAD/SimTrack/Sampling Redo/{file}_{model_name}_{exp}.joblib'):
      return

    df = data[file]
    models = file_models[file]
    model = models[model_name]
    save_path = '/Users/eddie/Library/CloudStorage/OneDrive-UniversityofPittsburgh/Research/Projects/Explainability Method Comparison/Data-ML-XAI-Eval/Model Explanation/ROAD Output'
    # res_ = {}
    # evaluate linear exps
    if (model_name in linear) and (exp in ['ShapSampling', 'ShapLinear', 'ShapPermutation']):
            res_ = road(df['X_train'], df['y_train'], df['X_test'],
                                  df['y_test'], model=model, t=1,  explainer=xai[exp])
            joblib.dump(res_, f'{save_path}/{file}_{model_name}_{exp}.joblib', compress=3)

    # eval tree exps
    elif (model_name in tree) and (exp in ['ShapSampling', 'ShapTree', 'ShapPermutation']):
            res_ = road(df['X_train'], df['y_train'], df['X_test'],
                                  df['y_test'], model=model, t=1,  explainer=xai[exp])
            joblib.dump(res_, f'{save_path}/{file}_{model_name}_{exp}.joblib', compress=3)

    # ev al agnostic exps
    elif (exp in ['ShapSampling', 'ShapPermutation', 'ShapExact', 'ShapKernel']):
            res_ = road(df['X_train'], df['y_train'], df['X_test'],
                                  df['y_test'], model=model, t=1,  explainer=xai[exp])
            joblib.dump(res_, f'{save_path}/{file}_{model_name}_{exp}.joblib', compress=3)
    
    print(f'{file}_{model_name}_{exp} done')
    #joblib.dump(res_, f'/content/drive/MyDrive/XAI method performacne when Explainaing the PORT Dataset/Results/Evaluations/ROAD/SimOptim/{file}_{model_name}_optimized.joblib', compress=3)
    return

print('built')

['0_vars_corr_0HC_n25000.csv', '0_vars_corr_0HC_n10000_skew_0.5194.csv', '0_vars_corr_0HC_n100.csv', '0_vars_corr_0HC_n10000_skew_0.7354.csv', '3_vars_corr_1HC_n10000.csv', '0_vars_corr_0HC_n10000_skew_0.8612.csv', '0_vars_corr_0HC_n15000.csv', '2_vars_corr_1HC_n10000.csv', '0_vars_corr_0HC_n10000_skew_0.9694.csv', '0_vars_corr_0HC_n10000_skew_0.9699.csv', '0_vars_corr_0HC_n10000_skew_0.7298.csv', '0_vars_corr_0HC_n10000_skew_0.6145.csv', '2_vars_corr_2HC_n10000.csv', '0_vars_corr_0HC_n10000.csv', '0_vars_corr_0HC_n10000_skew_0.8608.csv', '0_vars_corr_0HC_n5000.csv', '0_vars_corr_0HC_n10000_skew_0.508.csv', '0_vars_corr_0HC_n10000_skew_0.6222.csv', '3_vars_corr_2HC_n10000B.csv', '0_vars_corr_0HC_n1000.csv', '0_vars_corr_0HC_n30000.csv', '3_vars_corr_2HC_n10000.csv', '0_vars_corr_0HC_n20000.csv']


pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.


built


In [7]:
ml = ['PassiveAgressive', 'SGDClassifier', 'RandomForest', 'Perceptron', 'RidgeClassifier', 'LogisticRegression', 'DecisionTree', 'XGBoost', 'GaussianNB', 'FauxModel']
exp = list(xai.keys())

print(len(exp)*len(res_)*len(ml))

1440


### Run

In [8]:
## Paralelization ###
if __name__ == '__main__':
  pool = Pool(5)
  results = pool.starmap_async(parallel_road, itertools.product(res_, ml, exp), chunksize=1)
  results.get()

0_vars_corr_0HC_n25000.csv_PassiveAgressive_ShapTree done


Using 18750 background data samples could cause slower run times. Consider using shap.sample(data, K) or shap.kmeans(data, K) to summarize the background as K samples.


0_vars_corr_0HC_n25000.csv_SGDClassifier_ShapLinear done
0_vars_corr_0HC_n25000.csv_SGDClassifier_ShapTree done
0_vars_corr_0HC_n25000.csv_PassiveAgressive_ShapLinear done


  0%|          | 0/6250 [00:00<?, ?it/s]

0_vars_corr_0HC_n25000.csv_PassiveAgressive_ShapExact done
0_vars_corr_0HC_n25000.csv_PassiveAgressive_ShapPermutation done


Using 18750 background data samples could cause slower run times. Consider using shap.sample(data, K) or shap.kmeans(data, K) to summarize the background as K samples.


0_vars_corr_0HC_n25000.csv_RandomForest_ShapLinear done




KeyboardInterrupt: 



: 