# Import Libraries

In [1]:
import pandas as pd
import numpy as np
import pickle

from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

from matplotlib import pyplot as plt

import lime

import plotly.graph_objects as go

import shap

# API Functions

In [2]:
def read_csv(df, sep=","):
    """Read a delimited text file into a pandas DataFrame.

    df  = the data file path (anything pandas.read_csv accepts as its source)
    sep = separator for dividing each column in the csv file

    Returns the parsed DataFrame.
    """
    # The separator is passed by keyword: recent pandas releases accept only
    # the source positionally, so a positional `sep` raises TypeError there.
    return pd.read_csv(df, sep=sep)

In [3]:
def to_csv(df, filepath):
    """Write a dataframe to a csv file.

    df       = the dataframe to convert
    filepath = the filepath which will receive the csv output

    Returns None; the row index is not written.
    """
    write_options = {"index": False}
    df.to_csv(filepath, **write_options)

In [4]:
def return_columns(df):
    """Return the column labels of a dataframe.

    df = the dataframe whose columns are requested

    Returns df.columns unchanged (the pandas index object, not a plain list).
    """
    column_labels = df.columns
    return column_labels

In [5]:
def return_list_of_class(target_column):
    """Return the distinct classes found in the target values.

    target_column = the array of targets or the dataframe column holding classes

    Returns a list of the unique values in the sorted order np.unique produces.
    """
    distinct_classes = np.unique(target_column)
    return [label for label in distinct_classes]

In [6]:
def merge_datasets(df1, df2):
    """Stack two dataframes vertically (rows of df2 after rows of df1).

    df1, df2 = the dataframes to merge

    Returns a new DataFrame; the original row labels are kept, so index
    values may repeat. Neither input is modified.
    """
    # DataFrame.append no longer exists in pandas 2.x; concat is the
    # supported equivalent and behaves the same for this use.
    return pd.concat([df1, df2])

In [7]:
def create_decision_tree(criterion="gini", splitter="best"):
    """Create an untrained decision tree classifier.

    criterion = the split quality measure handed to DecisionTreeClassifier
    splitter  = the split strategy handed to DecisionTreeClassifier

    Returns the new classifier; random_state is fixed to 1.
    """
    tree_options = {
        "criterion": criterion,
        "splitter": splitter,
        "random_state": 1,
    }
    return DecisionTreeClassifier(**tree_options)

In [8]:
def fit_decision_tree(train_X, train_y, model):
    """Train (fit) a model in place.

    train_X = the dataframe of training features
    train_y = the array of training targets
    model   = the model to train

    Returns None; the fitted state lives on the model object that was passed in.
    """
    fit = model.fit
    fit(train_X, train_y)

In [9]:
def load_ML_model(file_to_open):
    """Load a pickled ML model from disk.

    file_to_open = the path of the file holding the pickled model

    Returns the unpickled object.
    """
    # NOTE: unpickling can execute arbitrary code, so only open files that
    # come from a trusted source.
    # The context manager closes the handle even if unpickling fails.
    with open(file_to_open, 'rb') as filehandler:
        return pickle.load(filehandler)

In [10]:
def save_ML_model(file_name, model):
    """Pickle an ML model to '<file_name>.obj'.

    file_name = the name of the file without any extension ('.obj' is appended)
    model     = the model to save

    Returns None.
    """
    # Closing the handle through the context manager guarantees the pickle
    # is flushed to disk before the function returns.
    with open(file_name + '.obj', 'wb') as filehandler:
        pickle.dump(model, filehandler)

In [11]:
def calculate_accuracy(testcases, prediction_result):
    """Calculate the accuracy of a model as a percentage.

    testcases         = the array of true targets used for evaluation
    prediction_result = the array of predictions made against testcases

    Returns the accuracy_score scaled to 0-100 and rounded to 2 decimals.
    """
    fraction_correct = accuracy_score(testcases, prediction_result)
    percentage = fraction_correct * 100.00
    return round(percentage, 2)

In [12]:
def train_decision_tree_agreed_disagreed(all_cases, features):
    """Train a decision tree that predicts the 'Agreed' flag from the features.

    all_cases = the dataframe of all datapoints, with an 'Agreed' column
                marking where the two classifiers agreed or disagreed
    features  = the feature columns to train on

    Returns the fitted decision tree.
    """
    model = create_decision_tree()
    fit_decision_tree(all_cases[features], all_cases.Agreed, model)
    return model

In [13]:
def generate_predictions(model, testcases):
    """Run a model against the data and return its predictions.

    model     = the model to use for evaluation/testing
    testcases = the dataframe of testing data to predict on

    Returns whatever model.predict returns for testcases.
    """
    predictions = model.predict(testcases)
    return predictions

In [14]:
def calculate_similarity(prediction_result1, prediction_result2):
    """Calculate how often two models make the same prediction, as a percentage.

    prediction_result1 = the array of prediction outputs made by classifier 1
    prediction_result2 = the array of prediction outputs made by classifier 2

    Returns the accuracy_score of one prediction array against the other,
    scaled to 0-100 and rounded to 2 decimals.
    """
    matching_fraction = accuracy_score(prediction_result1, prediction_result2)
    percentage = matching_fraction * 100.00
    return round(percentage, 2)

In [15]:
def number_of_disagreed_cases(testcases, similarity):
    """Estimate the number of data points two classifiers disagreed on.

    testcases  = the array of testing data used for evaluating the models
    similarity = the similarity between the two classifiers, as a percentage

    Returns len(testcases) times the disagreeing proportion, rounded to an int.
    """
    disagreement_ratio = 1 - (similarity / 100)
    total = len(testcases)
    return round(total * disagreement_ratio)

In [16]:
def return_agreed_cases(testcases, prediction_result1, prediction_result2):
    """Return the data points on which both models made the same prediction.

    testcases          = the dataframe of testing data used for evaluating the models
    prediction_result1 = the array of prediction outputs made by classifier 1
    prediction_result2 = the array of prediction outputs made by classifier 2

    Returns a new dataframe holding the agreeing rows (original order and
    index labels) plus an 'Agreed' column set to 1. testcases is not modified.
    Raises ValueError when the three inputs differ in length.
    """
    # Compare by position: converting to arrays avoids label-based lookups
    # when the predictions arrive as pandas Series with a non-default index.
    first = np.asarray(prediction_result1)
    second = np.asarray(prediction_result2)
    if not (len(first) == len(second) == len(testcases)):
        raise ValueError("testcases and both prediction arrays must have the same length")

    # Work on an explicit copy so adding the flag never writes into a slice
    # of the caller's dataframe.
    agreed = testcases[first == second].copy()
    agreed['Agreed'] = 1
    return agreed

In [17]:
def return_disagreed_cases(testcases, prediction_result1, prediction_result2):
    """Return the data points on which the two models made different predictions.

    testcases          = the dataframe of testing data used for evaluating the models
    prediction_result1 = the array of prediction outputs made by classifier 1
    prediction_result2 = the array of prediction outputs made by classifier 2

    Returns a new dataframe holding the disagreeing rows (original order and
    index labels) plus an 'Agreed' column set to 0. testcases is not modified.
    Raises ValueError when the three inputs differ in length.
    """
    # Select rows with a positional mask rather than subtracting index
    # labels: label arithmetic drops or duplicates rows when the index has
    # repeated labels and also reorders the result.
    first = np.asarray(prediction_result1)
    second = np.asarray(prediction_result2)
    if not (len(first) == len(second) == len(testcases)):
        raise ValueError("testcases and both prediction arrays must have the same length")

    # Explicit copy so the flag is never written into a slice of the input.
    disagreed = testcases[first != second].copy()
    disagreed['Agreed'] = 0
    return disagreed

In [18]:
def print_feature_importance(fittedmodel, features):
    """Print and plot the feature importance of a tree-based model.

    fittedmodel = the trained tree-based model (must expose feature_importances_)
    features    = the list of features of the fittedmodel

    Prints a one-column table ('Output') indexed by feature name and draws it
    as a horizontal bar chart. Returns None. The figure is created but
    plt.show() is not called here.
    """
    # Name the column at construction time; set_axis(..., inplace=True) is
    # not available in pandas 2.x.
    feature_importance_output = pd.DataFrame(
        fittedmodel.feature_importances_, index=features, columns=['Output']
    )
    print(feature_importance_output)
    feature_importance_output.plot.barh(figsize=(9, 5))

In [19]:
def print_feature_importance_regression(fittedmodel, features):
    """Print and plot the coefficients of a linear model as feature importance.

    fittedmodel = the trained linear model (must expose coef_)
    features    = the list of features of the fittedmodel

    Prints a one-column table ('Feature coefficient') indexed by feature name
    and shows it as a horizontal bar chart. Returns None.
    """
    # One row per coefficient so the values line up with the feature names.
    importance = fittedmodel.coef_.reshape(-1, 1)
    # Name the column at construction time; set_axis(..., inplace=True) is
    # not available in pandas 2.x.
    # NOTE(review): round() with no argument rounds to whole numbers, so
    # small coefficients are displayed as 0 - confirm this is intended.
    df = pd.DataFrame(
        importance, index=features, columns=['Feature coefficient']
    ).round()
    print(df)
    df.plot.barh(figsize=(9, 5))
    plt.show()

In [20]:
def feature_importance_comparison(model1, model2, features, name1, name2):
    """Plot the feature importance of two tree-based models side by side.

    model1, model2 = the tree-based ML models to compare (must expose feature_importances_)
    features       = the list of features of the models
    name1, name2   = the name of each model to denote on the bar plot

    Shows a horizontal bar plot with one bar per model for every feature.
    Returns None.
    """
    feature_importance1_output = pd.DataFrame(model1.feature_importances_, features)
    feature_importance2_output = pd.DataFrame(model2.feature_importances_, features)
    concat_df = pd.concat([feature_importance1_output, feature_importance2_output], axis=1)
    # Assign the labels directly; set_axis(..., inplace=True) is not
    # available in pandas 2.x.
    concat_df.columns = [name1, name2]
    concat_df.plot.barh(figsize=(9, 7))
    plt.show()

In [21]:
def feature_importance_comparison_regression(model1, model2, features, name1, name2):
    """Plot the coefficients of two linear models side by side.

    model1, model2 = the linear ML models to compare (must expose coef_)
    features       = the list of features of the models
    name1, name2   = the name of each model to denote on the bar plot

    Shows a horizontal bar plot with one bar per model for every feature.
    Returns None.
    """
    # One row per coefficient so the values line up with the feature names.
    feature_importance1 = (model1.coef_).reshape(-1, 1)
    feature_importance2 = (model2.coef_).reshape(-1, 1)
    feature_importance1_output = pd.DataFrame(feature_importance1, features)
    feature_importance2_output = pd.DataFrame(feature_importance2, features)
    concat_df = pd.concat([feature_importance1_output, feature_importance2_output], axis=1)
    # Assign the labels directly; set_axis(..., inplace=True) is not
    # available in pandas 2.x.
    concat_df.columns = [name1, name2]
    concat_df.plot.barh(figsize=(9, 7))
    plt.show()

In [22]:
def feature_importance_for_disagreement(features, agreed_cases, disagreed_cases):
    """Show which features separate agreed from disagreed data points.

    features        = the list of features of the models
    agreed_cases    = the dataframe of data points both classifiers agreed on
    disagreed_cases = the dataframe of data points the models disagreed on

    Both dataframes must carry the 'Agreed' column. A decision tree is trained
    to predict that column and its feature importance is printed and plotted.
    Returns None.
    """
    # DataFrame.append no longer exists in pandas 2.x; concat stacks the rows.
    all_cases = pd.concat([agreed_cases, disagreed_cases])
    # Reuse the shared trainer instead of repeating the create/fit steps.
    model = train_decision_tree_agreed_disagreed(all_cases, features)
    print_feature_importance(model, features)

In [23]:
def disagreed_LIME(train1, train2, all_disagreed_testcases, model1, model2, features, classes, index = None):
    """Show LIME explanations of one disagreed instance for two binary classifiers.

    train1, train2          = the dataframes used for training model1 and model2
    all_disagreed_testcases = the dataframe of all datapoints which classifiers disagreed on
    model1, model2          = the models to compare (must expose predict and predict_proba)
    features                = the list of features of the models
    classes                 = the classes of the models
    index                   = the position of the instance to examine; random when None

    Renders both explanations in the notebook and returns a tuple of
    (model1 explanation for label 1, model2 explanation for label 1,
     the examined instance as an array, model1's class, model2's class).
    """
    # `import lime` alone does not load the lime_tabular submodule, so bring
    # the explainer in explicitly rather than relying on another package
    # having imported it as a side effect.
    from lime.lime_tabular import LimeTabularExplainer

    train1_numpy = train1.to_numpy()
    train2_numpy = train2.to_numpy()
    all_disagreed_testcases_numpy = all_disagreed_testcases[features].to_numpy()

    explainer = LimeTabularExplainer(train1_numpy,
                                     feature_names=features,
                                     class_names=classes)

    # if sample not selected, randomly select one
    if index is None:
        index = np.random.randint(0, len(all_disagreed_testcases_numpy))
    disagreed_case = all_disagreed_testcases_numpy[index]

    exp1 = explainer.explain_instance(disagreed_case, model1.predict_proba, num_features=len(features))
    exp1_map = exp1.as_map()
    exp1.show_in_notebook()

    explainer = LimeTabularExplainer(train2_numpy,
                                     feature_names=features,
                                     class_names=classes,
                                     discretize_continuous=True)

    exp2 = explainer.explain_instance(disagreed_case, model2.predict_proba, num_features=len(features))
    exp2_map = exp2.as_map()
    exp2.show_in_notebook()

    # the class the model predicted
    index1 = model1.predict(disagreed_case.reshape(1, -1))[0]
    index2 = model2.predict(disagreed_case.reshape(1, -1))[0]

    # use the class index to select the final results
    # assumes the predicted labels are integer positions into `classes` - TODO confirm
    exp1_result = classes[index1]
    exp2_result = classes[index2]

    return exp1_map[1], exp2_map[1], disagreed_case, exp1_result, exp2_result

In [24]:
def disagreed_LIME_multi(train1, train2, all_disagreed_testcases, model1, model2, features, classes, index = None):
    """Show LIME explanations of one disagreed instance for two multiclass classifiers.

    train1, train2          = the dataframes used for training model1 and model2
    all_disagreed_testcases = the dataframe of all datapoints which classifiers disagreed on
    model1, model2          = the models to compare (must expose predict and predict_proba)
    features                = the list of features of the models
    classes                 = the classes of the models
    index                   = the position of the instance to examine; random when None

    Each model is explained for the class it predicted. Renders both
    explanations in the notebook and returns a tuple of
    (model1 explanation, model2 explanation, the examined instance as an
     array, model1's class, model2's class).
    """
    # `import lime` alone does not load the lime_tabular submodule, so bring
    # the explainer in explicitly rather than relying on another package
    # having imported it as a side effect.
    from lime.lime_tabular import LimeTabularExplainer

    train1_numpy = train1.to_numpy()
    train2_numpy = train2.to_numpy()
    all_disagreed_testcases_numpy = all_disagreed_testcases[features].to_numpy()

    explainer = LimeTabularExplainer(train1_numpy,
                                     feature_names=features,
                                     class_names=classes)

    # if sample not selected, randomly select one
    if index is None:
        index = np.random.randint(0, len(all_disagreed_testcases_numpy))
    disagreed_case = all_disagreed_testcases_numpy[index]

    # subtract min from each label as the index of class should start from 0
    # assumes the class labels are consecutive integers - TODO confirm
    lowest_class = min(classes)
    index1 = int(model1.predict(disagreed_case.reshape(1, -1))[0]) - lowest_class
    index2 = int(model2.predict(disagreed_case.reshape(1, -1))[0]) - lowest_class

    exp1 = explainer.explain_instance(disagreed_case, model1.predict_proba, num_features=len(features), labels=[index1])
    exp1_map = exp1.as_map()
    exp1.show_in_notebook()

    explainer = LimeTabularExplainer(train2_numpy,
                                     feature_names=features,
                                     class_names=classes,
                                     discretize_continuous=True)

    exp2 = explainer.explain_instance(disagreed_case, model2.predict_proba, num_features=len(features), labels=[index2])
    exp2_map = exp2.as_map()
    exp2.show_in_notebook()

    exp1_result = classes[index1]
    exp2_result = classes[index2]

    return exp1_map[index1], exp2_map[index2], disagreed_case, exp1_result, exp2_result

In [25]:
def regression_LIME(train_set, test_set, model, features, class_name, index = None):
    """Show a LIME explanation of one test instance for a regression model.

    train_set  = the dataframe used for training the regression model
    test_set   = the dataframe for evaluating the performance of the regressor
    model      = the regression model to utilise (must expose predict)
    features   = the list of features of the model
    class_name = the name of the output column
    index      = the position of the instance to examine; random when None

    Renders the explanation in the notebook. Returns None.
    """
    # `import lime` alone does not load the lime_tabular submodule, so bring
    # the explainer in explicitly rather than relying on another package
    # having imported it as a side effect.
    from lime.lime_tabular import LimeTabularExplainer

    explainer = LimeTabularExplainer(train_set.to_numpy(),
                                     feature_names=features,
                                     class_names=class_name,
                                     verbose=True,
                                     mode='regression')
    test_set_numpy = test_set.to_numpy()
    # if sample not selected, randomly select one
    if index is None:
        index = np.random.randint(0, len(test_set_numpy))
    exp = explainer.explain_instance(test_set_numpy[index], model.predict, num_features=len(features))
    exp.show_in_notebook()

In [26]:
def LIME_compare_bar_plot(features, exp1_map, exp2_map, exp1_result, exp2_result):
    """Plot two LIME explanations of the same instance in one bar chart.

    features                 = the list of features of the models
    exp1_map, exp2_map       = the explanation entries of each model
                               (presumably lists of (feature index, weight)
                               pairs taken from LIME's as_map - verify against caller)
    exp1_result, exp2_result = the predicted classes, used as legend labels

    Both explanations must hold exactly one entry per feature.
    Shows a horizontal bar plot; returns None. The inputs are not modified.
    """
    # Order the entries by their first element so row i matches features[i].
    # sorted() is used instead of list.sort() so the caller's lists are left untouched.
    exp1_sorted = sorted(exp1_map)
    exp2_sorted = sorted(exp2_map)

    # convert the explanation entries to dataframes for plotting
    exp1_map_df = pd.DataFrame(exp1_sorted)
    exp2_map_df = pd.DataFrame(exp2_sorted)

    # replace the first column with the feature names
    exp1_map_df[0] = features
    exp2_map_df[0] = features

    # merge outputs into one to plot
    merged_df = pd.merge(exp1_map_df, exp2_map_df, on=0)

    merged_df.plot.barh(figsize=(9, 7))
    plt.yticks(np.arange(len(features)), features)
    plt.legend([exp1_result, exp2_result])
    plt.show()

In [27]:
def draw_sankey_diagram(title, classes, pred1, pred2, ground_truth):
    """Draw a Sankey diagram relating two classifiers' predictions to the ground truth.

    title        = the title of the sankey diagram
    classes      = the list of classes which classifiers can predict
    pred1, pred2 = the prediction results made by classifier 1 and classifier 2, respectively
    ground_truth = the actual values

    The diagram has three groups of nodes, one node per class in each group:
    nodes 0..n-1 feed nodes n..2n-1 with the counts of classifier 1's
    confusion matrix, and nodes n..2n-1 feed nodes 2n..3n-1 with the counts
    of classifier 2's confusion matrix. Node colours are random.
    Shows the figure; returns None.
    """
    number_of_classes = len(classes)

    # One label per class, in reverse order of `classes`. Building the labels
    # from `classes` itself (instead of range(max, min)) keeps one label per
    # node when the classes are non-contiguous or not integers.
    classes_labels = ["Class " + str(label) for label in list(classes)[::-1]]

    labels = classes_labels * 3

    # generate confusion matrix for each model
    confusion_matrix1 = confusion_matrix(ground_truth, pred1, labels=classes)
    confusion_matrix2 = confusion_matrix(ground_truth, pred2, labels=classes)

    # middle-group nodes (2n-1 down to n), each repeated once per class
    start = [node
             for node in range(number_of_classes * 2 - 1, number_of_classes - 1, -1)
             for _ in range(number_of_classes)]

    # first half: first group -> middle group; second half: middle group -> last group
    sources = [node
               for _ in range(number_of_classes)
               for node in range(number_of_classes - 1, -1, -1)] + start
    targets = start + [node
                       for _ in range(number_of_classes)
                       for node in range(3 * number_of_classes - 1, 2 * number_of_classes - 1, -1)]

    # generate values (puts the values from each confusion matrix together);
    # ravel() walks each matrix row by row, matching the link order above
    values = list(confusion_matrix1.ravel()) + list(confusion_matrix2.ravel())

    # one random colour per class; the alpha component is appended below
    my_colors = [('rgba('+str(np.random.randint(0, high = 256))+','+
                str(np.random.randint(0, high = 256))+','+
                str(np.random.randint(0, high = 256))) for i in range(len(classes_labels))]
    my_colors_node = []
    my_colors_opac = []

    for rgba in my_colors:
        my_colors_node.append(rgba + ',0.8)')
        my_colors_opac.append(rgba + ',0.4)')
    my_colors_opac = my_colors_opac * number_of_classes

    fig = go.Figure(data=[go.Sankey(
    node = dict(
        pad = 15,
        thickness = 20,
        line = dict(color = "black", width = 0.5),
        label = labels,
        color = my_colors_node * 3
    ),
    link = dict(
        source = sources,
        target = targets,
        value = values,
        color = my_colors_opac[::-1] + my_colors_opac[::-1]
    ))])

    fig.update_layout(title_text=title, font_size=12)
    fig.show()

In [28]:
def create_LinearExplainer(model, train_data, sample = None):
    """Create a linear SHAP explainer.

    model      = the model to explain
    train_data = the training data used for training the model
    sample     = the number of samples to select; all of train_data when None

    Returns the shap.LinearExplainer built from the model and the
    (optionally sampled) training data.
    """
    # identity test is the idiomatic None check
    if sample is not None:
        train_data = shap.sample(train_data, sample)
    return shap.LinearExplainer(model, train_data)

In [29]:
def create_Classification_KernelExplainer(model, train_data, sample = None):
    """Create a Kernel SHAP explainer for classification.

    model      = the model to explain (must expose predict_proba)
    train_data = the training data used for training the model
    sample     = the number of samples to select; all of train_data when None

    Returns the shap.KernelExplainer built on model.predict_proba and the
    (optionally sampled) training data.
    """
    # identity test is the idiomatic None check
    if sample is not None:
        train_data = shap.sample(train_data, sample)
    return shap.KernelExplainer(model.predict_proba, train_data)

In [30]:
def create_Regression_KernelExplainer(model, train_data, sample = None):
    """Create a Kernel SHAP explainer for regression.

    model      = the model to explain (must expose predict)
    train_data = the training data used for training the model
    sample     = the number of samples to select; all of train_data when None

    Returns the shap.KernelExplainer built on model.predict and the
    (optionally sampled) training data.
    """
    # identity test is the idiomatic None check
    if sample is not None:
        train_data = shap.sample(train_data, sample)
    return shap.KernelExplainer(model.predict, train_data)

In [31]:
def create_TreeExplainer(model, train_data, sample = None):
    """Create a tree SHAP explainer.

    model      = the model to explain
    train_data = the training data used for training the model
    sample     = the number of samples to select; all of train_data when None

    Returns the shap.TreeExplainer built from the model and the
    (optionally sampled) training data.
    """
    # identity test is the idiomatic None check
    if sample is not None:
        train_data = shap.sample(train_data, sample)
    return shap.TreeExplainer(model, train_data)

In [32]:
def SHAP_sample_data(data, features, sample = None):
    """Sample rows of the feature columns for use with SHAP.

    data     = the input dataset
    features = the list of names of the features in the dataset
    sample   = the number of samples to select; a fifth of the rows
               (at least one) when None

    Returns the result of shap.sample on data[features].
    """
    if sample is None:
        # A sample count has to be a whole number, so use integer division;
        # the floor of 1 keeps small datasets from asking for zero rows.
        sample = max(1, len(data) // 5)
    return shap.sample(data[features], sample)

In [33]:
def print_SHAP_info(shap_explainer, model, test_data, index, sample = None):
    """Print SHAP values and the model prediction for one instance, without plots.

    shap_explainer = the SHAP explainer for explaining instances
    model          = the model to explain
    test_data      = the dataframe of all testing data
    index          = the position of the data point to examine
    sample         = the number of samples to select; all of test_data when None

    When sample is given, index refers to a row of the sampled data.
    Prints the base value, the SHAP values, the prediction and the input
    instance. Returns None.
    """
    # identity test is the idiomatic None check
    if sample is not None:
        test_data = shap.sample(test_data, sample)

    shap_values = shap_explainer.shap_values(test_data)

    # checks if the list of SHAP values is wrapped in an extra list
    if len(shap_values) == 1 and len(test_data) != 1:
        shap_values = shap_values[0]

    instance = test_data.iloc[index]

    print("Base Value : ", shap_explainer.expected_value)
    print()
    print("Shap Values for Sample %d: " %(index), shap_values[index])
    print("\n")
    print("Prediction From Model                            : ", model.predict(instance.to_numpy().reshape(1,-1))[0])
    print("Input instance                                  : ", instance)
    

In [34]:
def shap_feature_importance(shap_explainer, train_data, features, classes, sample = None):
    """Show the feature importance of a model as a SHAP summary bar plot.

    shap_explainer = the SHAP explainer for explaining instances
    train_data     = the dataframe used for training the model
    features       = the list of names of the features in the data
    classes        = the list of names of the classes in the data
    sample         = the number of samples to select; all of train_data when None

    Returns whatever shap.summary_plot returns.
    """
    shap.initjs()
    # identity test is the idiomatic None check
    if sample is not None:
        train_data = shap.sample(train_data, sample)
    shap_values = shap_explainer.shap_values(train_data)
    return shap.summary_plot(shap_values, train_data, feature_names=features, plot_type="bar", class_names=classes)

In [35]:
def global_classification_force_plot(shap_explainer, test_data, class_index, sample=None):
    """Show a global SHAP force plot for one class of a classification model.

    shap_explainer = the SHAP explainer for explaining instances
    test_data      = the dataframe for evaluating the performance of the model
    class_index    = the index of the class to look into
    sample         = the number of samples to select; all of test_data when None

    Returns the force plot built from the base value and SHAP values of
    the chosen class.
    """
    shap.initjs()

    # identity test is the idiomatic None check
    if sample is not None:
        test_data = shap.sample(test_data, sample)

    base_value = shap_explainer.expected_value
    shap_values = shap_explainer.shap_values(test_data)

    return shap.force_plot(base_value[class_index], shap_values[class_index], test_data)

In [36]:
def global_regression_force_plot(shap_explainer, test_data, sample = None):
    """Show a global SHAP force plot for a regression model.

    shap_explainer = the SHAP explainer for explaining instances
    test_data      = the dataframe for evaluating the performance of the model
    sample         = the number of samples to select; all of test_data when None

    Returns the force plot for the (optionally sampled) test data.
    """
    shap.initjs()

    base_value = shap_explainer.expected_value

    # identity test is the idiomatic None check
    if sample is not None:
        test_data = shap.sample(test_data, sample)

    shap_values = shap_explainer.shap_values(test_data)

    # One entry per row means the SHAP values are not wrapped in an extra
    # list; otherwise the first element holds the per-row values.
    if len(shap_values) == len(test_data):
        return shap.force_plot(base_value, shap_values, test_data)
    return shap.force_plot(base_value, shap_values[0], test_data)

In [37]:
def classification_waterfall_plot(shap_explainer, data, class_index, sample_index = None, sample = None):
    """Show a local SHAP waterfall plot for one instance of a classification problem.

    shap_explainer = the SHAP explainer for explaining instances
    data           = the dataframe for evaluating the performance of the model
    class_index    = the index of the class to look into
    sample_index   = the position of the instance to examine; random when None
    sample         = the number of samples to select; all of data when None

    When sample is given, sample_index refers to a row of the sampled data.
    Returns whatever shap.waterfall_plot returns.
    """
    shap.initjs()

    # identity test is the idiomatic None check
    if sample is not None:
        data = shap.sample(data, sample)

    shap_values = shap_explainer.shap_values(data)

    if sample_index is None:
        sample_index = np.random.randint(0, len(data))

    explanation = shap.Explanation(values=shap_values[class_index][sample_index],
                                   base_values=shap_explainer.expected_value[class_index],
                                   data=data.iloc[sample_index])
    return shap.waterfall_plot(explanation)

In [38]:
def regression_waterfall_plot(shap_explainer, test_data, sample_index = None, sample = None):
    """Show a local SHAP waterfall plot for one instance of a regression problem.

    shap_explainer = the SHAP explainer for explaining instances (called directly on the data)
    test_data      = the dataframe for evaluating the performance of the model
    sample_index   = the position of the instance to examine; random when None
    sample         = the number of samples to select; all of test_data when None

    When sample is given, sample_index refers to a row of the sampled data.
    Returns whatever shap.plots.waterfall returns.
    """
    shap.initjs()

    # identity test is the idiomatic None check
    if sample is not None:
        test_data = shap.sample(test_data, sample)

    shap_values = shap_explainer(test_data)

    if sample_index is None:
        sample_index = np.random.randint(0, len(test_data))

    return shap.plots.waterfall(shap_values[sample_index])

In [39]:
def kernel_waterfall_plot(shap_explainer, test_data, sample_index = None, sample = None):
    """Show a local SHAP waterfall plot for a model explained with KernelExplainer.

    shap_explainer = the SHAP explainer for explaining instances
    test_data      = the dataframe for evaluating the performance of the model
    sample_index   = the position of the instance to examine; random when None
    sample         = the number of samples to select; all of test_data when None

    When sample is given, sample_index refers to a row of the sampled data.
    Returns whatever the legacy waterfall plot returns.
    """
    shap.initjs()

    # identity test is the idiomatic None check
    if sample is not None:
        test_data = shap.sample(test_data, sample)

    shap_values = shap_explainer.shap_values(test_data)

    if sample_index is None:
        sample_index = np.random.randint(0, len(test_data))

    # NOTE(review): the [0] lookups assume expected_value and shap_values are
    # indexable per output (the first output is plotted) - confirm this
    # holds for single-output regression explainers.
    return shap.plots._waterfall.waterfall_legacy(shap_explainer.expected_value[0],
                                                  shap_values[0][sample_index],
                                                  feature_names = test_data.columns)