In [32]:
import warnings
warnings.filterwarnings('ignore')

In [1]:
from typing import *

import numpy as np
np.set_printoptions(suppress=True)
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import NearestNeighbors

# Explainable AI tools:
import shap
from lime.lime_tabular import LimeTabularExplainer
from alibi.explainers import AnchorTabular # why not used the original anchor package?

from scipy.stats import spearmanr, pearsonr

from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam  # Import the Adam optimizer

# MCDM:
import pymcdm

# Rank Aggregation:
from ranx import Run, fuse

2024-12-07 15:39:17.391087: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-07 15:39:17.668482: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [34]:
# Configure pandas output
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 40)

# Data loading and preprocessing

In [35]:
original_data = pd.read_csv('data/german_credit_data_updated.csv')

# Dataset overview - German Credit Risk (from Kaggle):
# 1. Age (numeric)
# 2. Sex (text: male, female)
# 3. Job (numeric: 0 - unskilled and non-resident, 1 - unskilled and resident, 2 - skilled, 3 - highly skilled)
# 4. Housing (text: own, rent, or free)
# 5. Saving accounts (text - little, moderate, quite rich, rich)
# 6. Checking account (numeric, in DM - Deutsch Mark)
# 7. Credit amount (numeric, in DM)
# 8. Duration (numeric, in month)
# 9. Purpose (text: car, furniture/equipment, radio/TV, domestic appliances, repairs, education, business, vacation/others)

display(original_data.head())
display(original_data.describe())
display(original_data.info())

# Display the unique values of thprecision=3, e categorical features:
print('Unique values of the categorical features:')
for col in original_data.select_dtypes(include='object'):
    print(f'\t- {col}: {original_data[col].unique()}')

Unnamed: 0.1,Unnamed: 0,Age,Sex,Job,Housing,Saving accounts,Checking account,Credit amount,Duration,Purpose,Credit Risk
0,0,67,male,2,own,,little,1169,6,radio/TV,1
1,1,22,female,2,own,little,moderate,5951,48,radio/TV,2
2,2,49,male,1,own,little,,2096,12,education,1
3,3,45,male,2,free,little,little,7882,42,furniture/equipment,1
4,4,53,male,2,free,little,little,4870,24,car,2


Unnamed: 0.1,Unnamed: 0,Age,Job,Credit amount,Duration,Credit Risk
count,954.0,954.0,954.0,954.0,954.0,954.0
mean,476.5,35.501048,1.909853,3279.112159,20.780922,1.302935
std,275.540378,11.379668,0.649681,2853.315158,12.046483,0.459768
min,0.0,19.0,0.0,250.0,4.0,1.0
25%,238.25,27.0,2.0,1360.25,12.0,1.0
50%,476.5,33.0,2.0,2302.5,18.0,1.0
75%,714.75,42.0,2.0,3975.25,24.0,2.0
max,953.0,75.0,3.0,18424.0,72.0,2.0


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 954 entries, 0 to 953
Data columns (total 11 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   Unnamed: 0        954 non-null    int64 
 1   Age               954 non-null    int64 
 2   Sex               954 non-null    object
 3   Job               954 non-null    int64 
 4   Housing           954 non-null    object
 5   Saving accounts   779 non-null    object
 6   Checking account  576 non-null    object
 7   Credit amount     954 non-null    int64 
 8   Duration          954 non-null    int64 
 9   Purpose           954 non-null    object
 10  Credit Risk       954 non-null    int64 
dtypes: int64(6), object(5)
memory usage: 82.1+ KB


None

Unique values of the categorical features:
	- Sex: ['male' 'female']
	- Housing: ['own' 'free' 'rent']
	- Saving accounts: [nan 'little' 'quite rich' 'rich' 'moderate']
	- Checking account: ['little' 'moderate' nan 'rich']
	- Purpose: ['radio/TV' 'education' 'furniture/equipment' 'car' 'business'
 'domestic appliances' 'repairs' 'vacation/others']


In [36]:
preprocessed_data = original_data.copy()

# For savings and checking accounts, we will replace the missing values with 'none':
preprocessed_data['Saving accounts'].fillna('none', inplace=True)
preprocessed_data['Checking account'].fillna('none', inplace=True)

# Dropping index column:
preprocessed_data.drop(columns=['Unnamed: 0'], inplace=True)

# Using pd.dummies to one-hot-encode the categorical features
preprocessed_data["Job"] = preprocessed_data["Job"].map({0: 'unskilled_nonresident', 1: 'unskilled_resident',
                                                         2: 'skilled', 3: 'highlyskilled'})

categorical_features = preprocessed_data.select_dtypes(include='object').columns
numerical_features = preprocessed_data.select_dtypes(include='number').columns.drop('Credit Risk')
print(f'Categorical features: {categorical_features}')
print(f'Numerical features: {numerical_features}')

preprocessed_data = pd.get_dummies(preprocessed_data, columns=categorical_features, dtype='int64')

# Remapping the target variable to 0 and 1:
preprocessed_data['Credit Risk'] = preprocessed_data['Credit Risk'].map({1: 0, 2: 1})

# Make sure all column names are valid python identifiers (important for pd.query() calls):
preprocessed_data.columns = preprocessed_data.columns.str.replace(' ', '_')
preprocessed_data.columns = preprocessed_data.columns.str.replace('/', '_')

# Normalizing the data
scaler = StandardScaler()
scaled_preprocessed_data = scaler.fit_transform(preprocessed_data)

display(preprocessed_data.head())
display(preprocessed_data.info())

display(scaled_preprocessed_data)


Categorical features: Index(['Sex', 'Job', 'Housing', 'Saving accounts', 'Checking account',
       'Purpose'],
      dtype='object')
Numerical features: Index(['Age', 'Credit amount', 'Duration'], dtype='object')


Unnamed: 0,Age,Credit_amount,Duration,Credit_Risk,Sex_female,Sex_male,Job_highlyskilled,Job_skilled,Job_unskilled_nonresident,Job_unskilled_resident,Housing_free,Housing_own,Housing_rent,Saving_accounts_little,Saving_accounts_moderate,Saving_accounts_none,Saving_accounts_quite_rich,Saving_accounts_rich,Checking_account_little,Checking_account_moderate,Checking_account_none,Checking_account_rich,Purpose_business,Purpose_car,Purpose_domestic_appliances,Purpose_education,Purpose_furniture_equipment,Purpose_radio_TV,Purpose_repairs,Purpose_vacation_others
0,67,1169,6,0,0,1,0,1,0,0,0,1,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,1,0,0
1,22,5951,48,1,1,0,0,1,0,0,0,1,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0
2,49,2096,12,0,0,1,0,0,0,1,0,1,0,1,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0
3,45,7882,42,0,0,1,0,1,0,0,1,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0
4,53,4870,24,1,0,1,0,1,0,0,1,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 954 entries, 0 to 953
Data columns (total 30 columns):
 #   Column                       Non-Null Count  Dtype
---  ------                       --------------  -----
 0   Age                          954 non-null    int64
 1   Credit_amount                954 non-null    int64
 2   Duration                     954 non-null    int64
 3   Credit_Risk                  954 non-null    int64
 4   Sex_female                   954 non-null    int64
 5   Sex_male                     954 non-null    int64
 6   Job_highlyskilled            954 non-null    int64
 7   Job_skilled                  954 non-null    int64
 8   Job_unskilled_nonresident    954 non-null    int64
 9   Job_unskilled_resident       954 non-null    int64
 10  Housing_free                 954 non-null    int64
 11  Housing_own                  954 non-null    int64
 12  Housing_rent                 954 non-null    int64
 13  Saving_accounts_little       954 non-null    int64

None

array([[ 2.7694545 , -0.7399179 , -1.22763429, ...,  1.62518349,
        -0.14633276, -0.11286653],
       [-1.18704073,  0.93690642,  2.26068929, ...,  1.62518349,
        -0.14633276, -0.11286653],
       [ 1.18685641, -0.41486224, -0.72930235, ..., -0.61531514,
        -0.14633276, -0.11286653],
       ...,
       [-1.0111965 , -0.39768023,  1.26402541, ..., -0.61531514,
        -0.14633276, -0.11286653],
       [-0.65950803,  0.29240557,  0.26736153, ..., -0.61531514,
        -0.14633276, -0.11286653],
       [-0.83535227,  2.69823821,  1.26402541, ..., -0.61531514,
        -0.14633276, -0.11286653]])

# Fitting classifier

In [37]:
y = preprocessed_data['Credit_Risk']
X = preprocessed_data.drop(columns='Credit_Risk')

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [38]:
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)

print(f'Accuracy: {accuracy_score(y_test, y_pred)}')
print(f'ROC AUC: {roc_auc_score(y_test, y_pred)}')

Accuracy: 0.7696335078534031
ROC AUC: 0.6830357142857143


# Defining explanation model wrapper (for standard behaviour)

In [39]:
class ExplainerWrapper:

    def __init__(self, clf, X_train: pd.DataFrame | np.ndarray, categorical_feature_names: list[str], predict_proba: callable = None):
        self.clf = clf

        if hasattr(clf, 'predict_proba') and predict_proba is None:
            self.predict_proba = clf.predict_proba
        elif predict_proba is not None:
            self.predict_proba = predict_proba
        else:
            raise ValueError('The classifier does not have a predict_proba method and no predict_proba_function was provided.')

        self.X_train = X_train
        self.categorical_feature_names = categorical_feature_names

    
    def explain_instance(self, instance_data_row: pd.Series | np.ndarray) -> pd.DataFrame:
        pass

class LimeWrapper(ExplainerWrapper):

    def __init__(self, clf, X_train: pd.DataFrame | np.ndarray, categorical_feature_names: list[str], predict_proba: callable = None):
        super().__init__(clf, X_train, categorical_feature_names, predict_proba)
        
        self.explainer = LimeTabularExplainer(self.X_train.values, feature_names=self.X_train.columns, discretize_continuous=False)
    
    def explain_instance(self, instance_data_row: pd.Series | np.ndarray) -> pd.DataFrame:
        lime_exp = self.explainer.explain_instance(instance_data_row, self.predict_proba, num_features=len(self.X_train.columns))
        
        ranking = pd.DataFrame(lime_exp.as_list(), columns=['feature', 'score'])
        return ranking

class ShapTabularTreeWrapper(ExplainerWrapper):
    
        def __init__(self, clf, X_train: pd.DataFrame | np.ndarray, categorical_feature_names: list[str], predict_proba: callable = None, **additional_explainer_args):
            super().__init__(clf, X_train, categorical_feature_names, predict_proba)
            
            self.explainer = shap.TreeExplainer(clf, self.X_train, **additional_explainer_args)
        
        def explain_instance(self, instance_data_row: pd.Series | np.ndarray) -> pd.DataFrame:
            shap_values = self.explainer.shap_values(instance_data_row)
    
            ranking = pd.DataFrame(list(zip(self.X_train.columns, shap_values[:, 0])), columns=['feature', 'score'])
            ranking = ranking.sort_values(by='score', ascending=False, key=lambda x: abs(x)).reset_index(drop=True)
            
            return ranking

class AnchorWrapper(ExplainerWrapper):

    def __init__(self, clf, X_train: pd.DataFrame | np.ndarray, categorical_feature_names: list[str], predict_proba: callable = None):
        super().__init__(clf, X_train, categorical_feature_names, predict_proba)
        
        self.explainer = AnchorTabular(predictor=self.predict_proba, feature_names=self.X_train.columns) # TODO: fix parameters
        self.explainer.fit(self.X_train.values)
    
    def explain_instance(self, instance_data_row: pd.Series | np.ndarray) -> pd.DataFrame:
        if isinstance(instance_data_row, pd.Series):
            instance_data_row = instance_data_row.to_numpy()

        feature_importances = {feature: 0 for feature in self.X_train.columns}
        explanation = self.explainer.explain(instance_data_row)
        
        for rule in explanation.anchor:
            # Extract the feature name from the rule string
            # This method won't work for column names that have spaces in them or that don't contain any letters
            for expression_element in rule.split():
                if any(c.isalpha() for c in expression_element):
                    referenced_feature = expression_element
                    break

            rule_coverage = self.X_train.query(rule).shape[0] / self.X_train.shape[0]
            feature_importances[referenced_feature] = 1 - rule_coverage
        
        return pd.DataFrame(list(feature_importances.items()), columns=['feature', 'score']).sort_values(by='score', ascending=False).reset_index(drop=True)

# Defining autoencoder noisy data generator
(to be used on calculating sensitivity metrics)

In [40]:
class AutoencoderNoisyDataGenerator():
    def __init__(self, X: pd.DataFrame, ohe_categorical_features_names: list[str], encoding_dim: int = 5, epochs=500):
        self.X = X
        self.categorical_features_names = ohe_categorical_features_names
        self.encoding_dim = encoding_dim
        self.epochs = epochs

        scaler = StandardScaler()
        self.X_scaled = scaler.fit_transform(self.X)
        
        input_dim = self.X_scaled.shape[1]

        input_layer = Input(shape=(input_dim,))
        encoded = Dense(self.encoding_dim, activation='relu')(input_layer)
        decoded = Dense(input_dim, activation='sigmoid')(encoded)

        self.autoencoder = Model(inputs=input_layer, outputs=decoded)
        self.encoder = Model(inputs=input_layer, outputs=encoded)
        self.was_fit = False
        
    
    def fit(self):
        self.autoencoder.compile(optimizer=Adam(), loss='mean_squared_error')
        self.autoencoder.fit(self.X_scaled, self.X_scaled, epochs=self.epochs, batch_size=32, shuffle=True, validation_split=0.2)
        # Extract hidden layer representation:
        self.hidden_representation = self.encoder.predict(self.X_scaled)
        self.was_fit = True


    def generate_noisy_data(self, num_features_to_replace: int = 2) -> pd.DataFrame:
        """
        Returns a DataFrame containing a noisy variation of the data.

        The noise is generated by swapping the values of a small number of features between a sample and a random close neighbor.
        To determine the neighbors, we use an autoencoder to reduce the dimensionality of the data and then calculate the use the NearestNeightbors algorithm in the reduced space.
        """

        if not self.was_fit:
            raise ValueError('The autoencoder has not been fitted yet. Call the fit() method before generating noisy data.')

        # Compute Nearest Neighbors using hidden_representation
        nbrs = NearestNeighbors(n_neighbors=5, algorithm='auto').fit(self.hidden_representation)
        distances, indices = nbrs.kneighbors(self.hidden_representation)

        X_noisy = self.X.copy()

        # Get id's of columns that belong to the same categorical feature (after being one-hot-encodeded);
        # Columns that belong to the same categorical feature start with the same name, and will be treated as a single feature when adding noise.
        categorical_features_indices = [
            [self.X.columns.get_loc(col_name) for col_name in self.X.columns if col_name.startswith(feature)]
            for feature in self.categorical_features_names
        ]

        # Replace features with random neighbor's features
        for i in range(self.X.shape[0]):  # Iterate over each sample
            available_features_to_replace = list(range(self.X.shape[1]))
            for j in range(num_features_to_replace):
                # Select features to replace; if the feture selected belong to one of the lists in categorical_features_indices, we will replace all the features in that list
                features_to_replace = np.random.choice(available_features_to_replace, 1)
                for feature_indices in categorical_features_indices:
                    if features_to_replace in feature_indices:
                        features_to_replace = feature_indices
                        break
                
                # Remove the selected features from the list of available features to replace
                available_features_to_replace = [f for f in available_features_to_replace if f not in features_to_replace]

                # Choose a random neighbor from the nearest neighbors
                neighbor_idx = np.random.choice(indices[i][1:])

                # Replace the selected features with the neighbor's features
                X_noisy.iloc[i, features_to_replace] = self.X.iloc[neighbor_idx, features_to_replace]

        return X_noisy

### Usage Example:

In [41]:
# Usage Example:
autoencoder_noisy_data_generator = AutoencoderNoisyDataGenerator(X_train, categorical_features, encoding_dim=5, epochs=10)
autoencoder_noisy_data_generator.fit()
noisy_data = autoencoder_noisy_data_generator.generate_noisy_data(num_features_to_replace=2)
display("Mean Absolute Difference: ", np.mean(np.abs(X_train - noisy_data)))

Epoch 1/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 1.2269 - val_loss: 1.2240
Epoch 2/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.2481 - val_loss: 1.2068
Epoch 3/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.2139 - val_loss: 1.1903
Epoch 4/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1858 - val_loss: 1.1742
Epoch 5/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1953 - val_loss: 1.1585
Epoch 6/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1587 - val_loss: 1.1429
Epoch 7/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1240 - val_loss: 1.1272
Epoch 8/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1496 - val_loss: 1.1121
Epoch 9/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[

'Mean Absolute Difference: '

5.676097075970533

# Defining evaluator class to hold evaluation metric methods

In [42]:
class XaiEvaluator:

    def __init__(self, clf, X_train: pd.DataFrame | np.ndarray, ohe_categorical_feature_names: list[str], predict_proba: callable = None, noise_gen_args: dict = {}):
        self.clf = clf
        if hasattr(clf, 'predict_proba') and predict_proba is None:
            self.predict_proba = clf.predict_proba
        elif predict_proba is not None:
            self.predict_proba = predict_proba
        else:
            raise ValueError('The classifier does not have a predict_proba method and no predict_proba_function was provided.')

        self.X_train = X_train
        self.ohe_categorical_feature_names = ohe_categorical_feature_names

        self.categorical_features_indices = [
            [self.X_train.columns.get_loc(col_name) for col_name in self.X_train.columns if col_name.startswith(feature)]
            for feature in self.ohe_categorical_feature_names
        ]

        self.noisy_data_generator = AutoencoderNoisyDataGenerator(X_train, ohe_categorical_feature_names, **noise_gen_args)

        self.was_initialized = False
    
    # Initialization opeations that take a long time to run
    def init(self):
        self.noisy_data_generator.fit()
        self.was_initialized = True
            
    def faithfullness_correlation(self, explainer: ExplainerWrapper | Type[ExplainerWrapper], instance_data_row: pd.Series, len_subset: int = None,
                                  iterations: int = 100, baseline_strategy: Literal["zeros", "mean"] = "zeros") -> float:
        if not isinstance(explainer, ExplainerWrapper):
            explainer = explainer(self.clf, self.X_train, self.ohe_categorical_feature_names, predict_proba=self.predict_proba)
        
        dimension = len(instance_data_row)  

        importance_sums = []
        delta_fs = []

        f_x = self.predict_proba(instance_data_row.to_numpy().reshape(1, -1))[0][1]
        g_x = explainer.explain_instance(instance_data_row)

        for _ in range(iterations):
            # Select a subset of features to perturb
            subset = np.random.choice(instance_data_row.index.values, len_subset if len_subset else dimension/4, replace=False)

            perturbed_instance = instance_data_row.copy()

            if baseline_strategy == "zeros":
                baseline = np.zeros(dimension)  # either mean on all zeros
            elif baseline_strategy == "mean":
                baseline = np.mean(self.X_train, axis=0)
                for feature_index in self.categorical_features_indices:
                    baseline[feature_index] = 0
                
            perturbed_instance[subset] = baseline[instance_data_row.index.get_indexer(subset)]

            importance_sum = 0
            for feature in subset:
                importance_sum += g_x[g_x['feature'] == feature]['score'].values[0] # should I take the abs value here?
            importance_sums.append(importance_sum)

            f_x_perturbed = self.predict_proba(perturbed_instance.to_numpy().reshape(1, -1))[0][1]
            delta_f = np.abs(f_x - f_x_perturbed)
            delta_fs.append(delta_f)
        
        return abs(pearsonr(importance_sums, delta_fs).statistic)
    
    def sensitivity(self, ExplainerType: Type[ExplainerWrapper], instance_data_row: pd.Series, iterations: int = 10, method: Literal['mean_squared', 'spearman', 'pearson'] = 'spearman',
                    custom_method: Callable[[pd.DataFrame, pd.DataFrame], float]=None) -> float:
        if not self.was_initialized:
            raise ValueError('The XaiEvaluator has not been initialized yet. Call the init() method before evaluating sensitivity.')
        
        original_explainer = ExplainerType(self.clf, self.X_train, self.ohe_categorical_feature_names, predict_proba=self.predict_proba)

        results: list[float] = []
        for _ in range(iterations):
            # Obtain the original explanation:
            original_explanation = original_explainer.explain_instance(instance_data_row)

            # Obtain the noisy explanation:
            noisy_data = self.noisy_data_generator.generate_noisy_data(num_features_to_replace=2)
            noisy_explainer = ExplainerType(self.clf, noisy_data, self.ohe_categorical_feature_names, predict_proba=self.predict_proba)
            noisy_explanation = noisy_explainer.explain_instance(instance_data_row)

            if custom_method is not None:
                results.append(custom_method(original_explanation, noisy_explanation))
            elif method == 'mean_squared':
                mean_squared_difference = ((original_explanation['score'] - noisy_explanation['score']) ** 2).mean()
                results.append(mean_squared_difference)
            elif method == 'spearman':
                spearman_correlation = spearmanr(original_explanation['score'], noisy_explanation['score']).correlation
                results.append(abs(spearman_correlation))
            elif method == 'pearson':
                pearson_correlation = pearsonr(original_explanation['score'], noisy_explanation['score']).correlation
                results.append(abs(pearson_correlation))
        
        return np.mean(results)

    def complexity(self, explainer: ExplainerWrapper | Type[ExplainerWrapper], instance_data_row: pd.Series, **kwargs) -> float:
        if not kwargs.get("bypass_check", False) and not isinstance(explainer, ExplainerWrapper):
            explainer = explainer(self.clf, self.X_train, self.ohe_categorical_feature_names, predict_proba=self.predict_proba)

        explanation = explainer.explain_instance(instance_data_row)

        def frac_contribution(explanation: pd.DataFrame, i: int) -> float:
            abs_score_sum = explanation['score'].abs().sum()
            return explanation['score'].abs()[i] / abs_score_sum

        sum = 0
        for i in range(explanation.shape[0]):
            fc = frac_contribution(explanation, i)
            sum += fc * np.log(fc) if fc > 0 else 0
            
        return -sum

### Usage example:

In [43]:
xai_eval = XaiEvaluator(clf, X_train, categorical_features, noise_gen_args={'encoding_dim': 5, 'epochs': 10})
xai_eval.init()

Epoch 1/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 1.2429 - val_loss: 1.2328
Epoch 2/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.2228 - val_loss: 1.2142
Epoch 3/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 1.2202 - val_loss: 1.1969
Epoch 4/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1996 - val_loss: 1.1802
Epoch 5/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1479 - val_loss: 1.1634
Epoch 6/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1609 - val_loss: 1.1471
Epoch 7/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1449 - val_loss: 1.1313
Epoch 8/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1408 - val_loss: 1.1156
Epoch 9/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[

In [44]:
sample_idx = 0

faithfullness =  xai_eval.faithfullness_correlation(AnchorWrapper,
                                                    X_test.iloc[sample_idx], len_subset=10, iterations=100, baseline_strategy="mean")
print("Faithfulness: ", faithfullness)
sensitivity = xai_eval.sensitivity(AnchorWrapper, X_test.iloc[sample_idx], iterations=10)
print("Sensitivity: ", sensitivity)
complexity = xai_eval.complexity(ShapTabularTreeWrapper, X_test.iloc[sample_idx])
print("Complexity: ", complexity)

Faithfulness:  0.11133162798773424
Sensitivity:  0.9327924952426636
Complexity:  2.516308873975537


In [45]:
# Testing edge complexity cases:
class TestWrapper(ExplainerWrapper):
    def __init__(self, test_explanation):
        self.test_explanation = test_explanation

    def explain_instance(self, instance_data_row: pd.Series | np.ndarray) -> pd.DataFrame:
        return self.test_explanation

test_explanation = pd.DataFrame({'feature': ['a', 'b', 'c'], 'score': [1, 0.05, 0]})
test_explainer = TestWrapper(test_explanation)

complexity = xai_eval.complexity(test_explainer, X_test.iloc[sample_idx], bypass_check=True)
print(complexity)

0.19144408195771734


### Xai Evaluation Metrics Trends for Selected Explanation Models
(Takes a long time to run)

In [46]:
# TODO: take a look at paralelization:

shap_sensitivities = []
shap_faithfullnesses = []
shap_complexities = []

lime_sensitivities = []
lime_faithfullnesses = []
lime_complexities = []

anchor_sensitivities = []
anchor_faithfullnesses = []
anchor_complexities = []

shap_exp = ShapTabularTreeWrapper(clf, X_train, categorical_features)
lime_exp = LimeWrapper(clf, X_train, categorical_features)
anchor_exp = AnchorWrapper(clf, X_train, categorical_features)

INSTANCES_TO_CHECK = 20

for i in range(INSTANCES_TO_CHECK):
    sample_idx = i
    shap_sensitivities.append(xai_eval.sensitivity(ShapTabularTreeWrapper, X_test.iloc[sample_idx], iterations=10))
    lime_sensitivities.append(xai_eval.sensitivity(LimeWrapper, X_test.iloc[sample_idx], iterations=10))
    anchor_sensitivities.append(xai_eval.sensitivity(AnchorWrapper, X_test.iloc[sample_idx], iterations=10))

    shap_faithfullnesses.append(xai_eval.faithfullness_correlation(shap_exp, X_test.iloc[sample_idx], len_subset=10, iterations=10, baseline_strategy="mean"))
    lime_faithfullnesses.append(xai_eval.faithfullness_correlation(lime_exp, X_test.iloc[sample_idx], len_subset=10, iterations=10, baseline_strategy="mean"))
    anchor_faithfullnesses.append(xai_eval.faithfullness_correlation(anchor_exp, X_test.iloc[sample_idx], len_subset=10, iterations=10, baseline_strategy="mean"))

    shap_complexities.append(xai_eval.complexity(shap_exp, X_test.iloc[sample_idx]))
    lime_complexities.append(xai_eval.complexity(lime_exp, X_test.iloc[sample_idx]))
    anchor_complexities.append(xai_eval.complexity(anchor_exp, X_test.iloc[sample_idx]))

shap_metrics = pd.DataFrame({
    'faithfullness': shap_faithfullnesses,
    'complexity': shap_complexities
})

lime_metrics = pd.DataFrame({
    'faithfullness': lime_faithfullnesses,
    'complexity': lime_complexities
})

anchor_metrics = pd.DataFrame({
    'faithfullness': anchor_faithfullnesses,
    'complexity': anchor_complexities
})

KeyboardInterrupt: 

In [None]:
print("Shap trends:")
display(shap_metrics.describe())

print("Lime trends:")
display(lime_metrics.describe())

print("Anchor trends:")
display(anchor_metrics.describe())

Shap trends:


Unnamed: 0,faithfullness,complexity
count,20.0,20.0
mean,-0.100756,2.454617
std,0.579722,0.13845
min,-0.958061,2.071901
25%,-0.624039,2.379469
50%,-0.08947,2.464648
75%,0.356174,2.528257
max,0.921765,2.731101


Lime trends:


Unnamed: 0,faithfullness,complexity
count,20.0,20.0
mean,0.099843,2.580166
std,0.313569,0.046391
min,-0.438614,2.489802
25%,-0.159231,2.556702
50%,0.158247,2.58807
75%,0.306877,2.607834
max,0.600445,2.664484


Anchor trends:


Unnamed: 0,faithfullness,complexity
count,20.0,20.0
mean,0.299502,1.536169
std,0.447762,0.614974
min,-0.691829,0.679615
25%,-0.000327,1.214978
50%,0.4518,1.418186
75%,0.706762,1.902132
max,0.835922,2.688599


# Aggregation Step

In [None]:
xai_eval = XaiEvaluator(clf, X_train, categorical_features, noise_gen_args={'encoding_dim': 5, 'epochs': 10})
xai_eval.init()

Epoch 1/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - loss: 1.2939 - val_loss: 1.2636
Epoch 2/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.2286 - val_loss: 1.2457
Epoch 3/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.2582 - val_loss: 1.2292
Epoch 4/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.2290 - val_loss: 1.2137
Epoch 5/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1965 - val_loss: 1.1983
Epoch 6/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.2144 - val_loss: 1.1831
Epoch 7/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1856 - val_loss: 1.1678
Epoch 8/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step - loss: 1.1716 - val_loss: 1.1529
Epoch 9/10
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[

### Calculating weights with MCDM algorithm: TOPSIS

In [None]:
sample_idx = 10

shap_metrics = []
lime_metrics = []
anchor_metrics = []

shap_exp = ShapTabularTreeWrapper(clf, X_train, categorical_features)
lime_exp = LimeWrapper(clf, X_train, categorical_features)
anchor_exp = AnchorWrapper(clf, X_train, categorical_features)

shap_metrics.append(xai_eval.sensitivity(ShapTabularTreeWrapper, X_test.iloc[sample_idx], iterations=1, method="pearson"))
shap_metrics.append(xai_eval.faithfullness_correlation(shap_exp, X_test.iloc[sample_idx], len_subset=10, iterations=10, baseline_strategy="mean"))
shap_metrics.append(xai_eval.complexity(shap_exp, X_test.iloc[sample_idx]))

lime_metrics.append(xai_eval.sensitivity(LimeWrapper, X_test.iloc[sample_idx], iterations=1, method="pearson"))
lime_metrics.append(xai_eval.faithfullness_correlation(lime_exp, X_test.iloc[sample_idx], len_subset=10, iterations=10, baseline_strategy="mean"))
lime_metrics.append(xai_eval.complexity(lime_exp, X_test.iloc[sample_idx]))

anchor_metrics.append(xai_eval.sensitivity(AnchorWrapper, X_test.iloc[sample_idx], iterations=1, method="pearson"))
anchor_metrics.append(xai_eval.faithfullness_correlation(anchor_exp, X_test.iloc[sample_idx], len_subset=10, iterations=10, baseline_strategy="mean"))
anchor_metrics.append(xai_eval.complexity(lime_exp, X_test.iloc[sample_idx]))

In [None]:
evaluation_matrix = np.array([
    shap_metrics,
    lime_metrics,
    anchor_metrics
])

display(evaluation_matrix)


# Maybe look into entropy weighting
robustness_metrics_weights = [
    1/3, # sensitivity
    1/3, # faithfullness
    1/3, # complexity
]
# if higher value is preferred - True
# if lower value is preferred - False
criterias = np.array([
    True,  # For sensitivity(method="pearson"), higher is better
    True,   # For faithfulness, higher is better
    False,   # For complexity, lower is better
])

t = Topsis(evaluation_matrix, robustness_metrics_weights, criterias, debug=True)
display(t.criteria)
t.calc()

shap_weight, lime_weight, anchor_weight = t.worst_similarity # I know this is confusing; it was suposed to be by best_similarity, but there seems to be a mistake in the implementation, or the naming is wrong

print(f"Shap weight: {shap_weight}")
print(f"Lime weight: {lime_weight}")
print(f"Anchor weight: {anchor_weight}")

array([[0.99894299, 0.66011371, 2.46439113],
       [0.94922866, 0.52194693, 2.64367566],
       [0.99715431, 0.58993356, 2.66161282]])

array([1., 1., 0.])

Step 1 - Evaluation Matrix:
 [[0.99894299 0.66011371 2.46439113]
 [0.94922866 0.52194693 2.64367566]
 [0.99715431 0.58993356 2.66161282]]

Step 2 - Normalized Evaluation Matrix:
 [[0.58728453 0.64231136 0.54904838]
 [0.55805717 0.50787075 0.58899167]
 [0.58623295 0.57402388 0.59298793]]

Step 3 - Weighted Normalized Evaluation Matrix
 [[0.19576151 0.21410379 0.18301613]
 [0.18601906 0.16929025 0.19633056]
 [0.19541098 0.19134129 0.19766264]]

Step 4 - worst_alternatives | best_alternatives 
 [0.18601906 0.16929025 0.19766264]  |  [0.19576151 0.21410379 0.18301613]

Step 5 - Distances to Worst Alternative | Distances to Best Alternative
 [0.04814238 0.00133209 0.02396783] [0.         0.04775398 0.02706981]

Step 6 - Similarites to Worst Alternative | Similarities to Best Alternative
 [1.         0.02713782 0.46961084] [0.         0.97286218 0.53038916]

Shap weight: 1.0
Lime weight: 0.0271378212664916
Anchor weight: 0.469610843122715


### Agregating rankings with ranx

In [None]:
# Gettings ranks:
shap_explanation = shap_exp.explain_instance(X_test.iloc[sample_idx])
lime_explanation = lime_exp.explain_instance(X_test.iloc[sample_idx])
anchor_explanation = anchor_exp.explain_instance(X_test.iloc[sample_idx])

shap_explanation["query"] = "1"
lime_explanation["query"] = "1"
anchor_explanation["query"] = "1"

shap_run = Run.from_df(shap_explanation, q_id_col="query", doc_id_col="feature", score_col="score")
lime_run = Run.from_df(lime_explanation, q_id_col="query", doc_id_col="feature", score_col="score")
anchor_run = Run.from_df(anchor_explanation, q_id_col="query", doc_id_col="feature", score_col="score")

In [None]:
aggregated_run = fuse([shap_run, lime_run, anchor_run], method="wsum", params={"weights": [shap_weight, lime_weight, anchor_weight]})

In [None]:
aggregated_run.to_dataframe()

Unnamed: 0,q_id,doc_id,score
0,1,Checking_account_none,1.392905
1,1,Credit_amount,0.596176
2,1,Purpose_radio_TV,0.559627
3,1,Checking_account_little,0.484291
4,1,Checking_account_moderate,0.353738
5,1,Age,0.332842
6,1,Sex_male,0.283322
7,1,Housing_own,0.185941
8,1,Duration,0.16973
9,1,Saving_accounts_little,0.16343
