In [None]:
#import modules
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.tree import export_text
import pandas as pd

In [None]:
f=pd.read_csv('compas-scores-two-years.csv')
f1=f.loc[f['race'].isin(['African-American','Caucasian'])]
f1.loc[f1['race'] == 'Caucasian', 'race'] = 0
f1.loc[f1['race'] == 'African-American', 'race'] = 1
f1.loc[f1['sex'] == 'Male', 'sex'] = 1
f1.loc[f1['sex'] == 'Female', 'sex'] = 0
f1=f1[['sex','age','race','juv_fel_count','priors_count.1','two_year_recid','score_text']]
f1.loc[f1['score_text']=='Low', 'score_text'] = 0
f1.loc[f1['score_text'] != 0, 'score_text'] = 1
tr=f1[['sex','age','race','juv_fel_count','priors_count.1','two_year_recid']]
tar=f1['score_text'].values
tar=tar.astype('int')
names=list(tr.columns)
tr=tr.values.astype(int)

In [None]:
#train decision tree qnd export text
decision_tree = DecisionTreeClassifier(random_state=0, max_depth=60)
decision_tree = decision_tree.fit(tr, tar)
r = export_text(decision_tree, feature_names=names,max_depth=1000)

In [None]:
#turns exported text to list of rules
def make_positive_polynomial_and_dicts(text_rules, names=names):
    r=text_rules
    dt=[]
    rules = r.splitlines()
    currentrule=[]
    for rule in rules:
        depth = rule.count("|")
        rule=rule.split("--- ",1)[1]
        if (depth<len(currentrule)):

            dt.append(currentrule)
            currentrule=currentrule[:depth-1]
            currentrule.append(rule)
            continue
        currentrule.append(rule)
    dt.append(currentrule)
    
    #turns dranching rules to binary variables, introducing one variable per rule
    dict={}
    i=1
    for rule in dt:
        for feat in rule:
            if ((">" in feat) or ("class" in feat)):
                continue
            if (feat not in dict.values()):
                    dict["x"+str(i)]=feat
                    i=i+1
    dict1=dict
    dict={v: k for k, v in dict.items()}
    
    #rewrites decision tree internal rules in terms of the new binary variables
    boolrules=[]
    temp=[]
    for rule in dt:
        for feat in rule:
            if ">" in feat:
                terms=feat.split(">",1)
                val=[value for key, value in dict.items() if terms[0].strip() in key and terms[1].strip() == key.split(' ')[-1]][0]
                temp.append(str(val)+"=0")
            elif "<=" in feat:
                val=dict[feat]
                temp.append(str(val)+"=1")
            else:
                temp.append(feat)
        boolrules.append(temp)
        temp=[]
        
        #construct positive and negative polynomials (as rules)
    positiverules=[]
    negrules=[]
    for rule in boolrules:
        if (rule[-1]=="class: 1"):
            positiverules.append(rule[:-1])
        else:
            negrules.append(rule[:-1])
            
    #construct dictionary that maps the original variables to the features involving them, as well as the binary variables they correspond to
    featurenames=names
    d= dict.items()
    newdict={}
    for feat in featurenames:
        keys = [(float(key.split(" ")[-1]),val) for key,val in d if feat in key]
        newdict[feat]= keys
        
    return positiverules, dict, dict1, newdict

In [None]:
#import solver
from ortools.linear_solver import pywraplp

In [None]:
#create the actual binary variables
def make_variables(dict, solver):
    variables=[]
    for var in dict.values():
        variables.append(solver.IntVar(0.0, 1.0, var))
    return variables

In [None]:
#cat_names is a list of lists with the new categorical names, in the form X_1, X_2...
def add_categorical_constraints(variables, cat_names, dict1, solver):

    for cat_vars in cat_names:
        sum=0
        for var in cat_vars:
            bool_var=[key for key,val in dict1.items() if var in val][0]
            index = int(''.join(filter(str.isdigit, bool_var))) -1
            sum=sum + variables[index]
        solver.Add(sum==len(cat_vars)-1)

In [None]:
#construct the left hand side of the constraints, assuming we use the positive polynomial
def add_constraints(positiverules,variables, counter_outcome, solver):
    constrs=[]
    num=0
    for rule in positiverules:
        sum=0
        for feat in rule:
            if (feat[-1]=="0"):
                sum=sum + 1-variables[int(''.join(filter(str.isdigit, feat.split('=',1)[0])))-1] 
            else:                                     
                sum=sum + variables[int(''.join(filter(str.isdigit, feat.split('=',1)[0])))-1]  
            num=num+1                                 
        constrs.append((sum,num))
        num=0
        
    if (counter_outcome==0):
        #add constraints to enforce negative outcome, using the positive polynomial (so all positive rules must be zero)
        for const in constrs:
            solver.Add(const[0] <= const[1]-1)
    elif(counter_outcome==1):
        #alternatively, these constraints enforce positive outcome (having the positive polynomial equal 1)
        sum=0
        for const in constrs:
            delta=solver.IntVar(0.0, 1.0, 'delta')
            solver.Add(const[0] >= delta*const[1])
            sum=sum+delta
        solver.Add(sum==1)

In [None]:
#diversity constraints
#the 3 first are list of tuples: (feat, _constraint value)
#must have initialized the optimization problem and all dicts and remaining constraints. This is the last step before solving!
def diverse_counterfactual_constraints(eq_constrs, leq_constrs, gr_constrs,solver, newdict, variables):
    for constr in eq_constrs:
        feat, constr_val = constr
        feats = newdict[feat]
        for val, var in feats:
            index= int(''.join(filter(str.isdigit, var))) -1
            if (constr_val<= val):
                solver.Add(variables[index] ==1)
            else:
                solver.Add(variables[index] ==0)
                
    for constr in leq_constrs:
        feat, constr_val = constr
        feats = newdict[feat]
        for val, var in feats:
            index= int(''.join(filter(str.isdigit, var))) -1
            if (constr_val<= val):
                solver.Add(variables[index] ==1)
                
    for constr in gr_constrs:
        feat, constr_val = constr
        feats = newdict[feat]
        for val, var in feats:
            index= int(''.join(filter(str.isdigit, var))) -1
            if (constr_val > val):
                solver.Add(variables[index] ==0)

In [None]:
#this computes the transformed data point

def make_actual_datapoint(datapoint, dict, newdict, featurenames):
    new_datapoint=np.zeros(len(dict))
    new_datapoint_indices=np.zeros(len(dict))
    sum=0 
    i=0
    j=0
    for feat in featurenames:
        current_tree_features=newdict[feat]
        for rules in current_tree_features:
            index= int(''.join(filter(str.isdigit, rules[1]))) -1
            new_datapoint_indices[j]=index
            cond= rules[0]
            if (datapoint[i]<= cond):
                new_datapoint[j]=1
            else:
                new_datapoint[j]=0
            j=j+1
        i=i+1
    return new_datapoint, new_datapoint_indices

In [None]:
#this computes the standard deviation of binary features
def make_coefficients(tr, new_datapoint, featurenames, newdict):
    sum=0
    i=0
    binary=np.zeros(len(new_datapoint))
    j=0
    for feat in featurenames:
        current_tree_features=newdict[feat]
        for rules in current_tree_features:
            index= int(''.join(filter(str.isdigit, rules[1]))) -1
            cond= rules[0]
            masked_data= tr[:,i]   # dataset[:,i]
            n=len(masked_data)
            masked_data=masked_data[masked_data<=cond]
            p=len(masked_data)
            p=p/n
            variance=p*(1-p)
            binary[j]=np.sqrt(variance)
            j=j+1
        i=i+1
    return binary

In [None]:
#construct the objective function
def make_objective(actualdatapoint, coeff, variables, new_datapoint_indices, solver):
    obj=0
    index=0
    for feat in actualdatapoint:
        if feat==0:
            obj=obj+coeff[index]*variables[int(new_datapoint_indices[index])]
        else:
            obj=obj+coeff[index]*(1-variables[int(new_datapoint_indices[index])])
        index=index+1
    solver.Minimize(obj)

In [None]:
def written_counterfactual(variables, new_datapoint, new_datapoint_indices, dict1):
    solution=[]
    for var in variables:
            solution.append(var.solution_value())

    counter_changes=[]
    counter_indices=np.where(new_datapoint[np.argsort(np.array(list(map(int,new_datapoint_indices))))]!=solution)[0]

    for counter_index in counter_indices:
        if solution[counter_index]==1:
            counter_changes.append(dict1['x'+str(counter_index+1)])
        elif solution[counter_index]==0:
            counter_changes.append(dict1['x'+str(counter_index+1)].replace('<=','>'))

    ls= list(map(lambda x: x.split('<=')+['<='] if '<=' in x  else x.split('>')+['>'], counter_changes))
    from collections import defaultdict
    d = defaultdict(list)

    for k, *v in ls:
        d[k].append(v)

    bn=list(d.items())
    grouped=[]
    for p, r in bn:
        greater=[]
        lower=[]
        for k, v in r:
            if (v=='<='):
                lower.append(k)
            elif (v=='>'):
                greater.append(k)
        lower=['<=']+lower
        greater=['>']+greater
        grouped.append([p,lower,greater])
    written_counterfactuals=[]
    for feature,lower,greater in grouped:
        s=feature
        if(len(lower)>1):
            u_bound=min(map(float,lower[1:]))
            s=s+ '<= '+ str(u_bound)
        if(len(greater)>1):
            l_bound=max(map(float,greater[1:]))
            s= str(l_bound) + ' < ' +s
        written_counterfactuals.append(s)
    return written_counterfactuals

In [None]:
import random
random.seed(90)
K=5
indices = random.sample(range(len(tr)), K)
exp_data=[tr[i] for i in sorted(indices)]
exp_tar=[decision_tree.predict(exp_data[i].reshape(1, -1)) for i in range(len(exp_data))]

positiverules, feat_to_bool, bool_to_feat, var_to_feats_and_bools = make_positive_polynomial_and_dicts(r)


counterfactuals=[]

for i in range(K):
    datapoint = exp_data[i]
    counter_outcome = 0 if exp_tar[i] == 1 else 1
    
    #initialize solver
    solver = pywraplp.Solver.CreateSolver('SCIP')
    variables = make_variables(feat_to_bool, solver)
    
    add_constraints(positiverules,variables, counter_outcome, solver)
    new_datapoint, new_datapoint_indices = make_actual_datapoint(datapoint, feat_to_bool, var_to_feats_and_bools, names)
    coeff = make_coefficients(tr, new_datapoint, names, var_to_feats_and_bools)
    make_objective(new_datapoint, coeff, variables, new_datapoint_indices, solver)
    status = solver.Solve()
    cntr=written_counterfactual(variables, new_datapoint, new_datapoint_indices, bool_to_feat)
    counterfactuals.append(cntr)