In [1]:
import numpy as np
import pandas as pd
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
import scipy as sp
import random

In [45]:
class Geco():
    def __init__(self, pred_fn, score_fn, data, continuous):
        self.data = data
        self.pred_fn = pred_fn
        self.continuous = continuous
        self.score_fn = score_fn
    
    
    def get_data_params(self, prec=1, tol=1e-4):
        # precision, num_features, feature_range, etc.
        self.num_features = self.data.shape[1]
        self.feature_names = self.data.columns
        self.mad = sp.stats.median_abs_deviation(self.data)
        self.feature_range = {}
        self.precision = {}
        for feature in self.data.columns:
            if feature in self.continuous:
                self.feature_range[feature] = [np.min(self.data[feature]), np.max(self.data[feature])]
                if self.data[feature].dtype == 'int':
                    self.precision[feature] = 0
                else:
                    self.precision[feature] = prec
                    
            else:
                self.feature_range[feature] = np.unique(self.data[feature])
        
        
    def do_random_init(self, num_inits, features_to_vary, query, desired_class):
        
        init = np.zeros((num_inits, self.num_features))
        kx = 0
        
        while kx < num_inits:
            one_init = np.zeros(self.num_features)
            for jx, feature in enumerate(self.feature_names):
                if feature not in features_to_vary:
                    one_int[jx] = query_instance[feature]
                    continue
                if feature in self.continuous:
                    one_init[jx] = np.round(np.random.uniform(
                            self.feature_range[feature][0], self.feature_range[feature][1]), self.precision[feature])
                else:
                    one_init[jx] = np.random.choice(self.feature_range[feature])
            test = pd.DataFrame(one_init.reshape(1,-1), columns=self.feature_names)
            pred = self.pred_fn(test)
            if pred == desired_class:
                init[kx] = one_init
                kx += 1
        return (pd.DataFrame(init, columns=self.feature_names))
    
    def compute_proximity(self, cfs, query):
        diff = np.abs(cfs - np.array(query).reshape(1,-1)) / self.mad
        return np.sum(diff, axis=1) / np.sum(self.mad)
    
    def compute_sparsity(self, cfs, query):
        sparsity = np.count_nonzero(cfs - np.array(query).reshape(1,-1), axis=1)
        return sparsity / len(self.feature_names)
    
    def compute_yloss(self, cfs, desired_class):
        preds = self.score_fn(cfs)[:,desired_class]
        loss = -preds
        loss[np.where(preds < 0.5)] += 2
        loss[np.where(preds >= 0.5)] = 0
        return loss.flatten()
    
    def compute_loss(self, cfs, query, desired_class):
        prox = self.compute_proximity(cfs, query)
        sparse = self.compute_sparsity(cfs, query)
        yloss = self.compute_yloss(cfs, desired_class)
        loss = np.array(prox + sparse + yloss).reshape(-1,1)
        index = np.arange(len(cfs)).reshape(-1,1)
        return np.concatenate([index, loss], axis=1)
    
    def mate(self, parent1, parent2, features_to_vary, query):
        one_init = np.zeros(self.num_features)
        for i in range(self.num_features):
            feat = self.feature_names[i]
            if feat not in features_to_vary:
                one_init[i] = query[feat]
                continue
                
            prob = random.random()
            
            if prob < 0.40:
                one_init[i] = parent1[feat]
                
            elif prob < 0.80:
                one_init[i] = parent2[feat]
            
            else:
                if feat in self.continuous:
                    one_init[i] = np.round(
                        np.random.uniform(
                            self.feature_range[feat][0],
                            self.feature_range[feat][1]),
                            decimals = self.precision[feat]
                    )
                else:
                    one_init[i] = np.random.choice(
                        self.feature_range[self.feat_name[i]]
                    )
        return one_init
    
    def get_counterfactuals(self, query, features_to_vary, n_cfs, init_multiplier = 10, maxiter = 500, thres=1e-2):
        self.get_data_params()
        num_inits = init_multiplier * n_cfs
        desired_class = (self.pred_fn(query) + 1) % 2
        
        population = self.do_random_init(num_inits, features_to_vary, query, desired_class)
        iterations = 0
        prev_best = -np.inf
        cur_best = np.inf
        stop_count = 0
        cfs_pred = [np.inf] * n_cfs
        
        while iterations < maxiter:
            if (abs(prev_best - cur_best) <= thres) and (i == desired_pred for i in cfs_pred):
                stop_count += 1
            else:
                stop_count = 0
            if stop_count >= 5:
                break
            
            prev_best = cur_best
            
            fitness = self.compute_loss(population, query, desired_class)
            fitness = fitness[fitness[:,1].argsort()]
            cur_best = fitness[0][1]
            
            new_generation_1 = population.iloc[fitness[:n_cfs,0], :]
            cfs_pred = self.pred_fn(new_generation_1)
            
            remaining_pop = num_inits - n_cfs
            new_generation_2 = np.zeros((remaining_pop, self.num_features))
            top_half = fitness[:int(fitness.shape[0]/2),0].astype(np.int32)
            for idx in range(remaining_pop):
                parent1 = population.iloc[random.choice(top_half),:]
                parent2 = population.iloc[random.choice(top_half),:]
                child = self.mate(parent1, parent2, features_to_vary, query)
                new_generation_2[idx] = child
            new_generation_2 = pd.DataFrame(new_generation_2, columns=self.feature_names)
            population = pd.concat([new_generation_1, new_generation_2])
            
            iterations += 1
        
        return population, fitness, iterations
        
        
        

In [6]:
load = datasets.load_breast_cancer(as_frame=True)
data = load['data']
target = load['target']
model = LogisticRegression(solver='liblinear')
model = model.fit(data, target)
continuous = data.select_dtypes(include = 'number').columns
query = data.iloc[:1,:]

In [46]:
continuous = data.select_dtypes(include = 'number').columns
query = data.iloc[:1,:]
g = Geco(model.predict, model.predict_proba, data, continuous = continuous)
test = g.get_counterfactuals(query, features_to_vary=continuous, n_cfs=10)