In [2]:
import matplotlib.pyplot as plt
import random
from sklearn import datasets
import mpl_toolkits.mplot3d 
from sklearn.cluster import KMeans
from sklearn import metrics
import numpy as np
import pandas as pd
from cxplain.xkm import XkmExplainer
from cxplain.tree import  DecisionTreeExplainer, RandomForestExplainer, ExKMCExplainer
from cxplain.shap import  ShapExplainer
from cxplain.gradient import GradientExplainer  
from cxplain.metrics import EuclideanMetric, Metric, ManhattenMetric
from cxplain.neon import NeonKMeansExplainer
from cxplain.errors import NonExistingRelevanceError

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
class NormalCKDE:
    def __init__(self, data):
        self.epsilon = 0.00001
        self.stopping_threshold = 100
        self.data = data
        self.variance = None
        self.n_obs = self.data.shape[0]
        self.n_features = self.data.shape[1]
        
    def fit(self):
        counter = 0
        variance = 1
        variance_list =[variance]
        
        while not self._is_converged(variance_list):
            variance_old = variance_list[-1]
            variance_new = self._update_variance(variance_old)
            variance_list.append(variance_new)
            counter += 1
            if counter >= self.stopping_threshold:
                print(f"No convergence after {self.stopping_threshold} steps!")
                break
                
        self.variance = variance_list[-1]
        return self
        
    def _update_variance(self, variance_old):
        update_weight = 1 / (self.n_obs * self.n_features)
        observation_list = []
        
        for obs_index in range(self.n_obs):
            base_obs = self.data[obs_index, :]
            nominator = sum([np.exp(-np.linalg.norm(base_obs - self.data[i, :])**2 / (2 * variance_old))
                             * np.linalg.norm(base_obs - self.data[i, :])**2 
                             for i in range(self.n_obs)
                             if i != obs_index])
            denominator = sum([np.exp(-np.linalg.norm(base_obs - self.data[i, :])**2 / (2 * variance_old))
                               for i in range(self.n_obs)
                               if i != obs_index])
            observation_list.append(nominator / denominator)
            
        return update_weight * sum(observation_list)
    
    def _is_converged(self, variance_list):
        if len(variance_list) < 3:
            return False
        considered_elements = variance_list[-3:]
        differences = np.diff(considered_elements)
        return np.sum(differences >= self.epsilon) == 0
                                                         
    def predict(self, feature_observation):
        rng = np.random.default_rng()
        # convert feature observation to numpy array
        feature_obs_arr = np.array(feature_observation, copy=True)
        # calculate weights
        # I first have to extract the indizes of given and to be imputed features
        index_given = np.where(feature_obs_arr != 0)
        index_impute = np.where(feature_obs_arr == 0) # I assume every obs to be imputed is 0
        # now calculate weights with only given indizes
        feature_obs_given = feature_obs_arr[index_given]
        nominators = [np.exp(-np.linalg.norm(feature_obs_given - observation[index_given])**2 / (2 * self.variance))
                      for observation in self.data]
        denominators = []
        for obs_index in range(self.n_obs):
            denominator = [np.exp(-np.linalg.norm(feature_obs_given - self.data[i, :][index_given])**2 / (2 * self.variance))
                           for i in range(self.n_obs)
                           if i != obs_index]
            denominators.append(sum(denominator))
        weights = [nominator / denominator for nominator, denominator in zip(nominators, denominators)]
        # sample index i  from weights distribution
        distribution_index = random.choices(list(range(self.n_obs)), weights=weights, k=1)[0]
        # sample from normal distribution i
        # get observation of distribution_index
        dist_obs = self.data[distribution_index, :]
        # extract only features to be imputed
        dist_obs_impute = dist_obs[index_impute]
        # sample from MVN with mean = dist_obs_impute und variance self.variance * I
        covariance = self.variance * np.identity(len(dist_obs_impute))
        sample = rng.multivariate_normal(mean=dist_obs_impute,cov=covariance, size=1)
        # fill up observation with imputed features
        feature_obs_arr[index_impute] = sample
        return feature_obs_arr
        
        

In [4]:
dataset_index = {"iris": }
datasets = {"iris": }

SyntaxError: invalid syntax (3084908807.py, line 1)

First I only use the iris data set for evaluation

In [10]:
n_clusters = 4
iris = datasets.load_iris()
X = iris.data
y = iris.target
n_obs = X.shape[0]
n_features = X.shape[1]
only_global = True

In [11]:
# fit Kmeans
kmeans = KMeans(n_clusters=n_clusters, random_state=3).fit(X)
cluster_centers = kmeans.cluster_centers_
predictions = kmeans.predict(X)

In [12]:
# init and fit explainer
# list allexplainers
explainers = {"tree": DecisionTreeExplainer(data= X, cluster_predictions=predictions),
             "forest": RandomForestExplainer(data= X, cluster_predictions=predictions),
             "exkmc": ExKMCExplainer(X, kmeans, k=n_clusters, max_leaves=2*n_clusters),
             "gradient": GradientExplainer(X, cluster_centers, predictions, EuclideanMetric, enable_abs_calculation=False),
             "shap": ShapExplainer(data= X, cluster_predictions=predictions),
             "neon": NeonKMeansExplainer(cluster_centers=cluster_centers, data=X, predictions=predictions),
             "xkm_next_best": XkmExplainer(X,  kmeans.cluster_centers_, "next_best", "euclidean", predictions),
             "xkm_all": XkmExplainer(X,  kmeans.cluster_centers_, "all", "euclidean", predictions)}

# fit and explain all explainers

explanations = {explainer_name:explainer.fit_explain() for explainer_name, explainer  in explainers.items()}


In [13]:
# first calculate all ROC curves for individual observations
result_individual = {}
imputer = NormalCKDE(X).fit()
for explainer_name, explanation in explanations.items():
    # init curve_list
    curve_list = []
    for index_obs in range(n_obs):
        # init list curve_obs_i to all 1 (length = num_features)
        curve_obs = [1 for i in range(n_features)]
        # init array of feature observations, I use an array instead of a list, as it is easier  later on to calculate distances to cluster centers
        feature_obs = np.array([0.0 for i in range(n_features)])
        # get relevance scores for observation, for explainers with only global scores, these will be used for every observation
        if only_global:
            relevance_scores = list(explanations[explainer_name].global_relevance)
        else:
            try:
                relevance_scores = list(explanations[explainer_name].pointwise_relevance.iloc[index_obs, :])
            except NonExistingRelevanceError:
                relevance_scores = list(explanations[explainer_name].global_relevance)
        
        for feature_index in range(n_features):
            # get biggest score and column index (indicate which feature is meant) and pop from list
            index_biggest_score = relevance_scores.index(max(relevance_scores))                
            relevance_scores[index_biggest_score] = -100 # I set to large negative number as popping would ruin the index correspondence from relevance score to feature
            # get observation for this feature
            obs_biggest_score = X[index_obs, index_biggest_score]
            # get corresponding cluster index for this observation
            cluster_index = predictions[index_obs]
            # add observation for feature to feature observations list
            feature_obs.put(index_biggest_score, obs_biggest_score) # has to be at index of feature in training data, as otherwise distance calculation is wrong
            # impute other entries (length = num_features) --> TBD
            if feature_index < (n_features - 1):
                feature_obs_imputed = imputer.predict(feature_obs)
            else:
                feature_obs_imputed = feature_obs.copy()
            # calculate distance to cluster centers for feature observations list
            distances = [np.linalg.norm(feature_obs_imputed - center) for center in cluster_centers]
            # get nearest_cluster_index
            nearest_cluster_index = distances.index(min(distances))
            # check whether cluster_index == nearest_cluster_index
            # if yes: return curve_obs_i
            # if no: replace first entry of curve_obs_i ith 0 and repeat
            if cluster_index == nearest_cluster_index:
                break
            else:
                curve_obs[feature_index] = 0
            # if yes: return curve_obs_i
            # if no: replace first entry of curve_obs_i ith 0 and repeat
            
        curve_list.append(curve_obs)
        
    # add explainer entry to dict
    result_individual[explainer_name] = curve_list
      
# Now compute AUC
result_auc = {explainer_name: (1 /(n_obs*n_features)) * sum(map(sum, curves)) for explainer_name, curves in result_individual.items()}

In [14]:
result_auc

{'tree': 0.9433333333333334,
 'forest': 0.9533333333333334,
 'exkmc': 0.9483333333333334,
 'gradient': 0.8133333333333334,
 'shap': 0.9450000000000001,
 'neon': 0.7866666666666667,
 'xkm_next_best': 0.9450000000000001,
 'xkm_all': 0.9433333333333334}