In [1]:
import numpy as np
from collections import Counter
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from aif360.sklearn.datasets import fetch_adult
from sklearn import tree

pip install 'aif360[LawSchoolGPA]'


In [2]:
# class Node:
#     '''
#     Helper class which implements a single tree node.
#     '''
#     def __init__(self, feature=None, threshold=None, data_left=None, data_right=None, gain=None, value=None):
#         self.feature = feature
#         self.threshold = threshold
#         self.data_left = data_left
#         self.data_right = data_right
#         self.gain = gain
#         self.value = value


class Node:
    def __init__(self, leaf=None, feature=None, threshold=None, greater_node=None, less_node=None, gain=None, value=None):
        self.leaf = leaf
        self.feature = feature
        self.threshold = threshold
        self.greater_node = greater_node
        self.less_node = less_node
        self.data = []
        self.height = None
        self.gain = gain
        self.value = value
    
    def add_greater_node(self, greater_node):
        self.greater_node = greater_node

    def add_less_node(self, less_node):
        self.less_node = less_node
    
    def is_leaf(self):
        return self.leaf != None

    def __repr__(self):
        if self.is_leaf():
            return "{class="+str(self.leaf)+"}"
        return "{<="+str(self.threshold)+":"+str(self.less_node)+",>"+str(self.threshold)+":"+str(self.greater_node)+"}"
    
    def __str__(self):
        if self.is_leaf():
            return "{class="+str(self.leaf)+"}"
        return "{<="+str(self.threshold)+":"+str(self.less_node)+",>"+str(self.threshold)+":"+str(self.greater_node)+"}"
    
    def export_json(self):
        if self.is_leaf():
            return {"class":self.leaf}
        return {str(self.feature)+"<="+str(self.threshold):self.less_node.export_json(), str(self.feature)+">"+str(self.threshold):self.greater_node.export_json()}
    
    def add_data(self, data):
        if not self.is_leaf():
            if data[self.feature] <= self.threshold:
                leaf = self.less_node.add_data(data)
            else:
                leaf = self.greater_node.add_data(data)
            data = np.append(data, leaf)
            self.data.append(data)
            return leaf
        else:
            data = np.append(data, self.leaf)
            self.data.append(data)
            return self.leaf

    
    def get_leafs(self):
        if self.is_leaf():
            return [self]
        leafs = []
        leafs.extend(self.less_node.get_leafs())
        leafs.extend(self.greater_node.get_leafs())
        return leafs

    def get_accuracy(self, index=None, value=None):
        if self.is_leaf():
            correct = 0
            total = 0
            for data in self.data:
                if index==None:
                    correct+=data[-1] == self.leaf
                    total+=1
                elif data[index] == value:
                    correct+=data[-1] == self.leaf
                    total+=1
            if total == 0:
                print('no matches for index:',index,'value:',value)
                return -1
            return correct/total
        
        print("not a leaf")
        return -1

    def get_prediction(self, data):
        if self.is_leaf():
            return self.leaf
        else:
            if data[self.feature] <= self.threshold:
                return self.less_node.get_prediction(data)
            else:
                return self.greater_node.get_prediction(data)
    
    def relabel(self):
        if self.is_leaf():
            self.leaf = [1.0,0.0][round(self.leaf)]
        else:
            print("not a leaf")

    def flip_nodes(self):
        if self.is_leaf():
            print("is leaf")
        else:
            temp = self.greater_node
            self.greater_node = self.less_node
            self.less_node = temp
            self.reset_data()

    def get_height(self):
        if self.height != None:
            return self.height
        elif self.is_leaf():
            return 0
        else:
            return max(self.less_node.get_height(), self.greater_node.get_height()) + 1

    def get_layer(self, height):
        if self.get_height() == height:
            return [self]
        elif self.get_height() < height:
            return []
        else:
            temp = self.less_node.get_layer(height)
            temp.extend(self.greater_node.get_layer(height))
            return temp

    def reset_data(self):
        if self.is_leaf():
            self.data = []
        else:
            self.less_node.reset_data()
            self.greater_node.reset_data()

    def copy(self):
        if self.is_leaf():
            copy = Node(self.leaf, self.feature, self.threshold)
            return copy
        else:
            copy = Node(self.leaf, self.feature, self.threshold)
            if self.less_node != None:
                copy.add_greater_node(self.greater_node.copy())
            if self.greater_node != None:
                copy.add_less_node(self.less_node.copy())
            return copy

    def relabel_acc(self):
        if self.is_leaf():
            if self.data != []:
                self.leaf = round(np.average(np.array(self.data)[:,-1]))
        else:
            if self.less_node != None:
                self.less_node.relabel_acc()
            if self.greater_node != None:
                self.greater_node.relabel_acc()

    def discrimination(self, discriminatory_index):
        return discrimination(self.data, discriminatory_index)

    def accuracy(self):
        return accuracy(self.data)

    def get_bad_nodes(self, comp, discriminatory_index):
        if comp == None:
            comp = self.discrimination(discriminatory_index)
        if (self.less_node.is_leaf() and self.discrimination(discriminatory_index) < comp) or (not self.less_node.is_leaf() and self.less_node.discrimination(discriminatory_index) < comp):
            if (self.greater_node.is_leaf() and self.discrimination(discriminatory_index) < comp) or (not self.greater_node.is_leaf() and self.greater_node.discrimination(discriminatory_index) < comp):
                # RETRAIN sub tree
                print("retrain node:\n"+str(self))
                print("discrimination:", self.discrimination(discriminatory_index))
                return
        if not self.less_node.is_leaf():
            self.less_node.get_bad_nodes(comp, discriminatory_index)
        if not self.greater_node.is_leaf():
            self.greater_node.get_bad_nodes(comp, discriminatory_index)
        

        



In [3]:
class DecisionTreeFair:
    '''
    Class which implements a decision tree classifier algorithm.
    '''
    def __init__(self, min_samples_split=2, max_depth=5):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.root = None
        
    @staticmethod
    def _entropy(s):
        '''
        Helper function, calculates entropy from an array of integer values.
        
        :param s: list
        :return: float, entropy value
        '''
        # Convert to integers to avoid runtime errors
        counts = np.bincount(np.array(s, dtype=np.int64))
        # Probabilities of each class label
        percentages = counts / len(s)

        # Caclulate entropy
        entropy = 0
        for pct in percentages:
            if pct > 0:
                entropy += pct * np.log2(pct)
        return -entropy
    
    def _statisticalParity(self, left_child, right_child, left_pred, right_pred, protectedIndex):
        left_child = np.insert(left_child, -1, left_pred, axis=1)
        right_child = np.insert(right_child, -1, right_pred, axis=1)
        data = np.append(left_child, right_child, axis=0)
        protectedClass = data[np.where(data[:,protectedIndex]==0)]
        elseClass = data[np.where(data[:,protectedIndex]!=0)]
    
        if len(protectedClass) == 0 or len(elseClass) == 0:
            # print("protectedClass or elseClass is empty")
            return -1
        else:
            protectedProb = np.count_nonzero(protectedClass[:,-1]) / len(protectedClass)
            elseProb = np.count_nonzero(elseClass[:,-1]) / len(elseClass)
    
        statParity = protectedProb-elseProb

        protectedClass = protectedClass[np.where(protectedClass[:,-1]==protectedClass[:,-2])]
        elseClass = elseClass[np.where(elseClass[:,-1]==elseClass[:,-2])]
    
        if len(protectedClass) == 0 or len(elseClass) == 0:
            # print("protectedClass or elseClass is empty")
            return -1
        else:
            protectedProb = np.count_nonzero(protectedClass[:,-1]) / len(protectedClass)
            elseProb = np.count_nonzero(elseClass[:,-1]) / len(elseClass)

        equalOdds = protectedProb-elseProb
        # if left_pred!=right_pred:
        #     print('disc', protectedClass[:,[protectedIndex, -2, -1]], elseClass[:,[protectedIndex, -2, -1]])

        return (equalOdds + statParity)/2

    def _information_gain(self, parent, left_child, right_child):
        '''
        Helper function, calculates information gain from a parent and two child nodes.
        
        :param parent: list, the parent node
        :param left_child: list, left child of a parent
        :param right_child: list, right child of a parent
        :return: float, information gain
        '''
        num_left = len(left_child) / len(parent)
        num_right = len(right_child) / len(parent)
        
        # One-liner which implements the previously discussed formula
        return self._entropy(parent) - (num_left * self._entropy(left_child) + num_right * self._entropy(right_child))
    
    def _best_split(self, X, y, protected_index):
        '''
        Helper function, calculates the best split for given features and target
        
        :param X: np.array, features
        :param y: np.array or list, target
        :return: dict
        '''
        best_split = {}
        best_gain = -float('inf')
        best_info_gain = 0
        best_discrimination = 0
        n_rows, n_cols = X.shape
        
        # For every dataset feature
        for f_idx in range(n_cols):
            X_curr = X[:, f_idx]
            # For every unique value of that feature
            for threshold in np.unique(X_curr):
                # Construct a dataset and split it to the left and right parts
                # Left part includes records lower or equal to the threshold
                # Right part includes records higher than the threshold
                df = np.concatenate((X, y.reshape(1, -1).T), axis=1)
                df_left = np.array([row for row in df if row[f_idx] <= threshold])
                df_right = np.array([row for row in df if row[f_idx] > threshold])

                # Do the calculation only if there's data in both subsets
                if len(df_left) > 0 and len(df_right) > 0:
                    # Obtain the value of the target variable for subsets
                    y = df[:, -1]
                    y_left = df_left[:, -1]
                    y_right = df_right[:, -1]

                    pred_left = np.average(y_left) > 0.5
                    pred_right = np.average(y_right) > 0.5


                    sensitivity = df[:, protected_index]
                    sensitivity_left = df_left[:, protected_index]
                    sensitivity_right = df_right[:, protected_index]

                    # Caclulate the information gain and save the split parameters
                    # if the current split if better then the previous best
                    info_gain = self._information_gain(y, y_left, y_right)
                    # sensitivity_gain = self._information_gain(sensitivity, sensitivity_left, sensitivity_right)
                    discrimination = self._statisticalParity(df_left, df_right, pred_left, pred_right, protected_index)
                    gain = (info_gain + discrimination + 1)/2
                    # print(info_gain, discrimination, gain)
                    if gain > best_gain:
                        best_split = {
                            'feature_index': f_idx,
                            'threshold': threshold,
                            'df_left': df_left,
                            'df_right': df_right,
                            'gain': gain
                        }
                        best_gain = gain
                        best_info_gain = info_gain
                        best_discrimination = discrimination
                        # print('temp:',best_split['feature_index'],int(best_split['threshold']),'\t', best_gain, best_info_gain, best_discrimination, pred_left==pred_right)
        # print(best_split['feature_index'],int(best_split['threshold']),'\t', best_gain, best_info_gain, best_discrimination)
        return best_split
    
    def _build(self, X, y, sensitive_index, depth=0):
        '''
        Helper recursive function, used to build a decision tree from the input data.
        
        :param X: np.array, features
        :param y: np.array or list, target
        :param depth: current depth of a tree, used as a stopping criteria
        :return: Node
        '''
        n_rows, n_cols = X.shape
        
        # Check to see if a node should be leaf node
        if n_rows >= self.min_samples_split and depth <= self.max_depth:
            # Get the best split
            best = self._best_split(X, y, sensitive_index)
            # If the split isn't pure
            if best['gain'] > 0:
                # Build a tree on the left
                left = self._build(
                    X=best['df_left'][:, :-1], 
                    y=best['df_left'][:, -1], 
                    sensitive_index=sensitive_index,
                    depth=depth + 1
                )
                right = self._build(
                    X=best['df_right'][:, :-1], 
                    y=best['df_right'][:, -1], 
                    sensitive_index=sensitive_index,
                    depth=depth + 1
                )
                return Node(
                    feature=best['feature_index'], 
                    threshold=best['threshold'], 
                    less_node=left, 
                    greater_node=right, 
                    gain=best['gain']
                )
        # Leaf node - value is the most common target value 
        return Node(
            value=Counter(y).most_common(1)[0][0]
        )
    
    def fit(self, X, y, sensitive_index):
        '''
        Function used to train a decision tree classifier model.
        
        :param X: np.array, features
        :param y: np.array or list, target
        :return: None
        '''
        # Call a recursive function to build the tree
        self.root = self._build(X, y, sensitive_index)
        
    def _predict(self, x, tree):
        '''
        Helper recursive function, used to predict a single instance (tree traversal).
        
        :param x: single observation
        :param tree: built tree
        :return: float, predicted class
        '''
        # Leaf node
        if tree.value != None:
            return tree.value
        feature_value = x[tree.feature]
        
        # Go to the left
        if feature_value <= tree.threshold:
            return self._predict(x=x, tree=tree.less_node)
        
        # Go to the right
        if feature_value > tree.threshold:
            return self._predict(x=x, tree=tree.greater_node)
        
    def predict(self, X):
        '''
        Function used to classify new instances.
        
        :param X: np.array, features
        :return: np.array, predicted classes
        '''
        # Call the _predict() function for every observation
        return [self._predict(x, self.root) for x in X]

In [4]:
def avg_statParity_equalOdds(data, protectedIndex):
    if type(data) == list:
        data = np.array(data)
    protectedClass = data[np.where(data[:,protectedIndex]==0)]
    elseClass = data[np.where(data[:,protectedIndex]!=0)]
 
    if len(protectedClass) == 0 or len(elseClass) == 0:
        print("protectedClass or elseClass is empty")
        return 0
    else:
        protectedProb = np.count_nonzero(protectedClass[:,-1]) / len(protectedClass)
        elseProb = np.count_nonzero(elseClass[:,-1]) / len(elseClass)
 
    statParity = protectedProb-elseProb

    protectedClass = protectedClass[np.where(protectedClass[:,-1]==protectedClass[:,-2])]
    elseClass = elseClass[np.where(elseClass[:,-1]==elseClass[:,-2])]
 
    if len(protectedClass) == 0 or len(elseClass) == 0:
        print("protectedClass or elseClass is empty")
        return 0
    else:
        protectedProb = np.count_nonzero(protectedClass[:,-1]) / len(protectedClass)
        elseProb = np.count_nonzero(elseClass[:,-1]) / len(elseClass)

    equalOdds = protectedProb-elseProb

    return (equalOdds + statParity)/2

In [5]:
adult = fetch_adult(numeric_only=True)
X = adult.X.values
y = adult.y.values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

In [6]:
fair_model = DecisionTreeFair(max_depth=4)
fair_model.fit(X_train, y_train, 3)
fair_preds = fair_model.predict(X_test)

In [7]:
print(accuracy_score(y_test, fair_preds))
xyz_fair = np.append(X_test, y_test[:,None], axis=1)
xyz_fair = np.append(xyz_fair, np.array(fair_preds)[:,None], axis=1)
print(avg_statParity_equalOdds(xyz_fair, 3))

0.8109602129256808
-0.04143384402984229


In [8]:
model = DecisionTreeScratch(max_depth=4)
model.fit(X_train, y_train)
preds = model.predict(X_test)

NameError: name 'DecisionTreeScratch' is not defined

In [None]:
print(accuracy_score(y_test, preds))
xyz = np.append(X_test, y_test[:,None], axis=1)
xyz = np.append(xyz, np.array(preds)[:,None], axis=1)
print(avg_statParity_equalOdds(xyz, 3))

0.8242680679724289
-0.20955129719890703


In [None]:
clf = tree.DecisionTreeClassifier(max_depth=4)
clf = clf.fit(X_train, y_train)
sklearn_preds = clf.predict(X_test)
print(accuracy_score(y_test, sklearn_preds))
sklearn_xyz = np.append(X_test, y_test[:,None], axis=1)
sklearn_xyz = np.append(sklearn_xyz, np.array(sklearn_preds)[:,None], axis=1)
print(avg_statParity_equalOdds(sklearn_xyz, 3))

0.8289087558861666
-0.20525036901417382
