In [None]:
import joblib
from sklearn.tree import DecisionTreeClassifier
import sklearn
import sys
import numpy as np
import pandas as pd
import math
import random

In [None]:
# print(joblib.__version__)
# print(sklearn.__version__)
# print(sys.version)

In [None]:
# Load the trained model and the data it is applied to from local files.
# NOTE(review): joblib.load unpickles arbitrary Python objects and can execute
# code while doing so -- only load artifacts that come from a trusted source.
clf = joblib.load('decision_tree_model.pkl')
# X is used below through .columns / .iloc / .shape, i.e. as a pandas DataFrame.
X = joblib.load('X_dummies.joblib')
# y is loaded here but not referenced in the cells visible in this notebook.
y = joblib.load('y.joblib')

# Access the tree structure
tree = clf.tree_

# Define the class label for red (class 0) and blue (class 1) leaves
red_leaf_class = 0
blue_leaf_class = 1

In [None]:
leaf_stop = 15 # Number of blue leaves to generate counterfactuals for (get_cfs slices blue_leaf_nodes[:leaf_stop])
num_orig_pts = 200 # Limit on how far into X the generation loop walks when producing counterfactuals

In [None]:
def get_path(x):
    """Return the decision path of a single data point as a flat 0/1 list.

    ``x`` is wrapped in a one-element batch, passed to ``clf.decision_path``
    and the first (only) row of the resulting indicator matrix is returned as
    a plain Python list with one entry per tree node.
    """
    indicator = clf.decision_path([x])
    first_row = indicator.toarray()[0]
    return first_row.tolist()

def get_leaf(path):
    """Return the highest index in ``path`` whose entry equals 1.

    The path is scanned from its end towards its start; ``None`` is returned
    when no entry equals 1 (including an empty path).
    """
    last = len(path) - 1
    for offset, flag in enumerate(reversed(path)):
        if flag == 1:
            return last - offset
    return None

def get_node_class(tree, node):
    """Return the class index with the largest entry in ``tree.value[node]``.

    The argmax is taken over the flattened value array of the node.
    """
    return np.argmax(tree.value[node])

def get_parent(tree, node):
    """Return the index of the node that has ``node`` as a child.

    Node 0 is treated as the root and yields ``None``. For any other node the
    left-child array is searched first and the right-child array second; the
    first matching index is returned. An ``IndexError`` is raised if ``node``
    appears in neither array.
    """
    if node == 0:  # Root has no parent
        return None

    # Positions where `node` is recorded as a left child ...
    matches = np.flatnonzero(tree.children_left == node)
    if not matches.size:
        # ... otherwise it has to be somebody's right child.
        matches = np.flatnonzero(tree.children_right == node)

    return matches[0]

def get_sibling(tree, node):
    """Return the other child of ``node``'s parent.

    Parameters
    ----------
    tree : object exposing ``children_left`` / ``children_right`` arrays
    node : int
        Index of the node whose sibling is wanted.

    Returns
    -------
    The sibling's node index, or ``None`` when ``node`` is the root (the root
    has no parent and therefore no sibling).
    """
    parent = get_parent(tree, node)
    if parent is None:
        # Indexing the child arrays with None would add an axis instead of
        # selecting a node, so bail out explicitly for the root.
        return None
    left_child = tree.children_left[parent]
    right_child = tree.children_right[parent]
    return left_child if node == right_child else right_child

def is_leaf(tree, node):
    """Tell whether ``node`` is a leaf.

    A node counts as a leaf when its left-child and right-child entries are
    equal.
    """
    left_child = tree.children_left[node]
    right_child = tree.children_right[node]
    return left_child == right_child

def construct_path(tree, leaf):
    """Build the 0/1 node-indicator list for the path ending at ``leaf``.

    The result has ``tree.node_count`` entries; ``leaf``, node 0 and every
    node reached by repeatedly following ``get_parent`` from ``leaf`` are set
    to 1, everything else stays 0.
    """
    indicator = [0] * tree.node_count
    indicator[leaf] = 1
    indicator[0] = 1

    # Climb towards the root, marking each ancestor on the way.
    node = leaf
    while node > 0:
        node = get_parent(tree, node)
        indicator[node] = 1

    return indicator

def is_blue_leaf_point(tree, x):
    """Check that ``x`` is routed to a leaf whose class is 1 (blue).

    Used to verify that a generated counterfactual really lands in a blue
    leaf.
    """
    reached_leaf = get_leaf(get_path(x))
    return get_node_class(tree, reached_leaf) == 1

In [None]:
# Collect every node that has no children and whose class is 1 (blue).
blue_leaf_nodes = [
    node
    for node in range(tree.node_count)
    if tree.children_left[node] == -1
    and tree.children_right[node] == -1
    and get_node_class(tree, node) == 1
]
# Shuffle with a dedicated fixed-seed generator so that the leaves later picked
# through blue_leaf_nodes[:leaf_stop] are identical on every Restart & Run All
# (an unseeded random.shuffle selects different leaves on each run).
random.Random(0).shuffle(blue_leaf_nodes)

print(blue_leaf_nodes)
num_blue_leaves = len(blue_leaf_nodes)
print(num_blue_leaves)

In [None]:
def get_divergence_index(path1, path2):
    """Return the last index holding a 1 in both paths before they differ.

    The two indicator lists are compared position by position up to the
    length of the shorter one. Scanning stops at the first position where the
    entries differ; ``-1`` is returned when no shared 1 was seen before that.
    """
    last_shared_one = -1

    for idx, (flag1, flag2) in enumerate(zip(path1, path2)):
        if flag1 != flag2:
            # The paths part ways here; nothing after this point is shared.
            break
        if flag1 == 1:
            last_shared_one = idx

    return last_shared_one

# Lookup from a column's position in X to its name; get_conditions uses it to
# translate the feature index stored in a tree node into a feature name.
ft_names = dict(enumerate(X.columns))

def get_conditions(tree, path):
    """List the split conditions taken along ``path``.

    For every traversed non-leaf node (in ascending node index) a tuple
    ``(feature_name, threshold, went_left)`` is emitted, where ``went_left``
    is ``True`` when the node's left child is also on the path and ``False``
    otherwise.
    """
    conditions = []

    for node, traversed in enumerate(path):
        # Only internal nodes that lie on the path carry a condition.
        if traversed != 1 or is_leaf(tree, node):
            continue

        feature_name = ft_names[tree.feature[node]]
        threshold = tree.threshold[node]
        left_child = tree.children_left[node]

        # True for going left, False for going right
        went_left = True if path[left_child] == 1 else False
        conditions.append((feature_name, threshold, went_left))

    return conditions

In [None]:
# Ordered one-hot column names for each categorical group, keyed by the group
# prefix that mutate_cat derives from a column name. mutate_cat uses the order
# to switch on the *previous* entry when a column has to be switched off.
cat_rank = {'employment_type': ['employment_type_self', 'employment_type_private', 'employment_type_govt'],
            
            'education_type': ['education_type_less_than_high_school', 'education_type_high_school', 
            'education_type_associate', 'education_type_bachelors', 'education_type_advanced']
            }

def mutate_cat(cat, threshold, direction, x0, x):
    """Apply one split condition on a one-hot categorical column to ``x``.

    Parameters
    ----------
    cat : str
        One-hot column the condition refers to. The group prefix is taken as
        everything up to and including the first occurrence of ``'type'``.
    threshold : float
        Split threshold of the condition.
    direction : bool
        ``True`` when the condition requires ``value <= threshold`` (going
        left), ``False`` when it requires ``value > threshold``.
    x0 : pandas.Series
        Original point; only read, to decide whether a change is needed.
    x : pandas.Series
        Point being mutated; modified in place and also returned.

    Behaviour
    ---------
    * Going left while ``x0[cat]`` is above the threshold: ``cat`` is switched
      off and the entry ranked just before it in ``cat_rank`` is switched on.
      NOTE(review): when ``cat`` is ranked first nothing is switched on, which
      leaves the whole group False -- confirm this is intended.
    * Going right while ``x0[cat]`` is at or below the threshold: ``cat`` is
      switched on and every other column containing the group prefix is
      switched off.
    """
    group = cat[:cat.find('type') + 4]
    group_columns = [name for name in x.index.tolist() if group in name]
    other_columns = [name for name in group_columns if name != cat]

    if direction:
        if x0[cat] > threshold:
            x[cat] = False
            ranking = cat_rank[group]
            position = ranking.index(cat)
            if position > 0:
                x[ranking[position - 1]] = True
    elif x0[cat] <= threshold:
        x[cat] = True
        for name in other_columns:
            x[name] = False

    return x

In [None]:
# Produce nth proximal point
def mutate_prox(tree, x0, blue_leaf, cf_index):
    """Build two counterfactual candidates that move ``x0`` towards ``blue_leaf``.

    The split conditions on the path to ``blue_leaf`` that are not already
    satisfied along ``x0``'s own decision path are applied to two copies of
    ``x0``:

    * *prox*: a continuous feature is pushed just across the threshold (next
      integer on the required side for ``credit_score``, ``threshold -/+ 0.01``
      for the other continuous features).
    * *round_prox*: a continuous feature is set to the closest multiple of its
      rounding step that lies on the required side of the threshold.

    Every non-continuous feature is delegated to ``mutate_cat``. A candidate
    that does not end up in a class-1 leaf is overwritten with NaN.

    Parameters
    ----------
    tree : fitted tree structure (``clf.tree_``)
    x0 : pandas.Series
        Original point; must currently be classified as class 0.
    blue_leaf : int
        Index of the target leaf; must be of class 1.
    cf_index : int
        Number used in the column prefixes of the returned Series.

    Returns
    -------
    tuple
        ``(prox, round_prox, counterfactual_conditions)``. The Series are
        prefixed with ``prox{cf_index}_`` and ``round_prox{cf_index}_``; the
        conditions are ``(feature_name, threshold, go_left)`` tuples in
        ascending node-index order.

    Raises
    ------
    AssertionError
        If ``x0`` is not in a class-0 leaf or ``blue_leaf`` is not class 1.
    """
    # Step each continuous feature is snapped to in the "rounded" strategy.
    rounding_rules = {
        'credit_score': 10,
        'income': 500,
        'amount_requested': 500
    }

    red_path = get_path(x0)
    red_leaf = get_leaf(red_path)
    assert get_node_class(tree, red_leaf)==0
    red_path_conditions = set(get_conditions(tree, red_path))

    assert get_node_class(tree, blue_leaf)==1
    blue_path = construct_path(tree, blue_leaf)
    blue_path_conditions = get_conditions(tree, blue_path)
    # Keep the blue-path order (and drop duplicates) instead of going through
    # a set: set iteration order depends on string hashing, which made both
    # the reported conditions and the order of the mutations vary per run.
    counterfactual_conditions = list(dict.fromkeys(
        cond for cond in blue_path_conditions if cond not in red_path_conditions
    ))

    # Independent copies of the original data point, one per strategy
    x = x0.copy()
    x_round = x0.copy()

    # Apply mutations based on conditions
    for (feature, threshold, direction) in counterfactual_conditions:
        if feature not in x0:
            continue

        # Categorical (one-hot) features
        if feature not in rounding_rules:
            x = mutate_cat(feature, threshold, direction, x0, x)
            x_round = mutate_cat(feature, threshold, direction, x0, x_round)
            continue

        # Continuous features
        step = rounding_rules[feature]

        if direction:
            # Need value <= threshold; only act if x0 is currently above it.
            if x0[feature] > threshold:
                # Largest multiple of `step` that is still <= threshold.
                x_round[feature] = threshold // step * step
                if feature == 'credit_score':
                    # Largest integer <= threshold. round() is avoided because
                    # it rounds halves to even and so moves further than needed.
                    x[feature] = math.floor(threshold)
                else:
                    x[feature] = threshold - 0.01
        else:
            # Need value > threshold; only act if x0 is currently at/below it.
            if x0[feature] <= threshold:
                # Smallest multiple of `step` strictly above the threshold.
                # ceil() alone returns the threshold itself when the threshold
                # already is a multiple of `step`, which stays on the left side.
                x_round[feature] = (math.floor(threshold / step) + 1) * step
                if feature == 'credit_score':
                    # Smallest integer strictly above the threshold. round()
                    # gives e.g. round(650.5) == 650, which does not cross it.
                    x[feature] = math.floor(threshold) + 1
                else:
                    x[feature] = threshold + 0.01

    # Invalidate candidates that do not actually reach a blue (class-1) leaf.
    if not is_blue_leaf_point(tree, x):
        x[:] = np.nan

    if not is_blue_leaf_point(tree, x_round):
        x_round[:] = np.nan

    return x.add_prefix(f'prox{cf_index}_'), x_round.add_prefix(f'round_prox{cf_index}_'), counterfactual_conditions

In [None]:
def get_cfs(tree, x):
    """Generate one ``mutate_prox`` result per selected blue leaf.

    The first ``leaf_stop`` entries of ``blue_leaf_nodes`` are used, numbered
    from 1; the results are returned as a list in that order.
    """
    selected_leaves = blue_leaf_nodes[:leaf_stop]
    return [
        mutate_prox(tree, x, blue_leaf, cf_index)
        for cf_index, blue_leaf in enumerate(selected_leaves, start=1)
    ]

In [None]:
# Show the dimensions of the feature frame as (rows, columns).
X.shape

In [None]:
rows = []
cont = 0  # examined points that are already classified blue (class 1)

# Examine exactly the first num_orig_pts rows of X (or all rows if X is
# shorter). A trailing `if i > num_orig_pts: break` let the loop run past the
# limit, because it was only evaluated after a red point had been processed.
for i in range(min(num_orig_pts, X.shape[0])):

    x = X.iloc[i]
    path = get_path(x)
    leaf = get_leaf(path)
    leaf_class = get_node_class(tree, leaf)

    # Points that are already blue need no counterfactual.
    if leaf_class==1:
        cont+=1
        continue

    new_x = get_cfs(tree, x)
    cfs_list = [x.add_prefix('orig_')]
    conds_dict = dict()

    # Walk over what get_cfs actually returned rather than range(leaf_stop):
    # the tree may have fewer blue leaves than leaf_stop.
    for j, (prox, rounded, conditions) in enumerate(new_x, start=1):
        cfs_list.append(prox) # prox
        cfs_list.append(rounded) # rounded
        conds_dict[f'prox{j}_conditions'] = conditions # conditions

    # One wide row: original point, all counterfactuals, then the conditions.
    cfs = pd.concat(cfs_list)
    conds = pd.Series(conds_dict)
    row = pd.concat([cfs, conds])
    rows.append(row)

cf_df = pd.DataFrame(rows)

In [None]:
# `i` is the zero-based index of the last row the generation loop visited, so
# i + 1 rows were examined; `cont` of them were already blue and were skipped.
print('Total number of original points: ', i + 1)
print('Number of red-leaf points with counterfactuals generated:', i + 1 - cont)

In [None]:
# Column prefixes of every counterfactual stored in cf_df, in the order
# prox1_, round_prox1_, prox2_, round_prox2_, ...
prefixes = [
    tag
    for k in range(1, leaf_stop + 1)
    for tag in (f"prox{k}_", f"round_prox{k}_")
]

# One-hot columns belonging to each categorical group
education_features = [
    "education_type_advanced",
    "education_type_associate",
    "education_type_bachelors",
    "education_type_high_school",
    "education_type_less_than_high_school",
]
employment_features = [
    "employment_type_govt",
    "employment_type_private",
    "employment_type_self",
]

# Running totals over all prefixes
total_education_false_count = 0
total_employment_false_count = 0

# For each counterfactual, count the rows whose whole group is False
for prefix in prefixes:
    education_columns = [prefix + feature for feature in education_features]
    employment_columns = [prefix + feature for feature in employment_features]

    # Row-wise flag: every education column of this counterfactual is False
    education_all_false = cf_df[education_columns].eq(False).all(axis=1)

    # Row-wise flag: every employment column of this counterfactual is False
    employment_all_false = cf_df[employment_columns].eq(False).all(axis=1)

    total_education_false_count += education_all_false.sum()
    total_employment_false_count += employment_all_false.sum()

In [None]:
# Output the counts
print("Total instances where all education types are False:", total_education_false_count)
print("Total instances where all employment types are False:", total_employment_false_count)

# Every row of cf_df holds one counterfactual per prefix, so the number of
# counterfactuals is rows * len(prefixes) (a hard-coded 8 does not follow
# leaf_stop).
num_counterfactuals = cf_df.shape[0] * len(prefixes)
print(f'Proportion of all education types False: {total_education_false_count/num_counterfactuals}')
# Uses the employment count; reporting the education count here was a copy-paste slip.
print(f'Proportion of all employment types False: {total_employment_false_count/num_counterfactuals}')

# Count total NaN values in the DataFrame
nan_count = cf_df.isna().sum().sum()
print(f"Number of NaN values: {nan_count}")
print(f'Proportion of NaN values: {nan_count/cf_df.size}')

In [None]:
# Persist the counterfactual table to cfs.csv in the working directory
# (the DataFrame index is written as the first column, pandas' default).
cf_df.to_csv('cfs.csv')