 # Project 1: Implementing Decision Trees

In [1]:
# Imports
import numpy as np
import copy as copy
import timeit
from sklearn import tree as sk_tree
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

## Impurity_calculator class

Contains all the methods used to calculate the impurity of a feature. The calculator object is given an array of the values of the feature it is calculating impurity for, and an array with the corresponding labels. It is also given an impurity measure, which can be 'entropy' or 'gini'.

In [2]:
class Impurity_calculator:
    def __init__(self, feature_column, y_values, impurity_measure):
        """
        Class used to calculate the impurity value of a feature. 
        This allows the algorithm to find the best splitting feature.
        
        :param feature_column: array of values for the feature we want to find the impurity value of
        :param y_values: array of labels corresponding to the feature values
        :param impurity_measure: either 'gini' or 'entropy'. Determines how impurity is measured by the calcuator.
        """
        self.feature_column = feature_column
        self.y = y_values
        self.impurity_measure = impurity_measure

    def prob_feat_relation_to_bound(self, feature_is_less_than):
        """
        P(feature_column[i] < / >= mean(feature_column))
        Calculates the probability of a feature being less than the mean,
        or the probability of it being greater than or equal to the mean.

        :param feature_is_less_than: True if we are looking for the probability that a feature
        value is less than the mean of the feature, False otherwise

        :return : probability of feature
        """
        counter = 0
        for data_point in self.feature_column:
            if feature_is_less_than and data_point < np.mean(self.feature_column):
                    counter += 1
            elif not(feature_is_less_than) and data_point >= np.mean(self.feature_column):
                    counter += 1
        return counter/(len(self.feature_column))

    def prob_true_given_conditional(self, feature_is_less_than):
        """
        Calculates the probability of a data point in the feature being less than / greater or equal to its mean.

        :param feature_less_than_bound: boolean which is True if the the relation of the feature to the 
        bound is "less than". False is it is "greater than or equal to"
        :return : proability of a data point in the feature being smaller / larger than its mean.
        """
        total_counter = 0
        true_counter = 0
        for i in range(len(self.feature_column)):
            if not(feature_is_less_than) and (self.feature_column[i] >= np.mean(self.feature_column)):
                total_counter +=1
                if (self.y[i] == 1):
                    true_counter = true_counter + 1
            elif feature_is_less_than and (self.feature_column[i] < np.mean(self.feature_column)):
                total_counter += 1
                if (self.y[i] == 1):
                    true_counter = true_counter + 1
        return true_counter / total_counter    

    def entpy_of_feat_in_relation_to_bound(self, feature_less_than_bound):
        """
        Calculates conditional entropy, so the entropy of a feature given that it is less / greater or equal to a bound (the mean).
        
        :param feature_less_than_bound: boolean which is True if the the relation of the feature to the 
        bound is "less than". False is it is "greater than or equal to"
        :return : etnropy of feature given that it is greater /less than or equal to a bound.
        """
        p_true = self.prob_true_given_conditional(feature_less_than_bound)
        if p_true == 0:
            pos_part = 0
        else:
            pos_part = -p_true* np.log2(p_true)
        p_false = 1-p_true
        if p_false == 0:
            neg_part = 0
        else:
            neg_part = -p_false*np.log2(p_false)
        h = pos_part + neg_part
        return h

    def gini_of_feat_in_relation_to_bound(self, feature_less_than_bound):
        """
        Calculated the conditional gini, ie the gini of a feature given that it is less / greater or equal to a bound (the mean).

        :param feature_column: array of values for the feature
        :param y: array of corresponding labels
        :param feature_less_than_bound: boolean which is True if the the relation of the feature to the 
        bound is "less than". False is it is "greater than or equal to"
        :return : gini of a feature given that it is less / greater or equal to a bound
        """
        p_true = self.prob_true_given_conditional(feature_less_than_bound)
        p_false = 1-p_true
        return 1 - (p_true**2 + p_false**2)

    def gini_of_feature(self):
        """
        Calculates the gini index of the feature of this calculator.
        
        :return : gini index of the feature
        """
        neg = self.prob_feat_relation_to_bound(True) * self.gini_of_feat_in_relation_to_bound(True)
        pos = self.prob_feat_relation_to_bound(False) * self.gini_of_feat_in_relation_to_bound(False)
        return neg + pos

    def entpy_of_feature(self):
        """
        Calculates the entropy of a feature

        :return : entropy of feature
        """
        neg = self.prob_feat_relation_to_bound(True) * self.entpy_of_feat_in_relation_to_bound(True)
        pos = self.prob_feat_relation_to_bound(False) * self.entpy_of_feat_in_relation_to_bound(False)
        return neg + pos

    def impurity_of_feature(self):
        """
        Calculates the impurity of a feature, based on the impurity measure the calculator is given.

        :return : impurity measure of feature given calcuator object.
        """
        if self.impurity_measure == 'entropy':
            return self.entpy_of_feature()
        elif self.impurity_measure == 'gini':
            return self.gini_of_feature()
        else:
            print("Error: invalid impurity measure given:", self.impurity_measure)
            return    

## Node class

The node object is used by the DecisionTree class to create a tree in the form of a linked-list-type datastucture of Node object. Each node (apart from the root) has a parent in the form of a Node object, and may have two children in the form of new Node objects. If a Node has no children it is assigned a leaf value, which in this case is either a 0 or a 1 int.

Additionally, the index of the feature it is splitting on, and the data it is splitting is stored in the node. This makes recursive pruning possible.

In [3]:
class Node:
    def __init__(self, feature_index, name, X, y, left = None, right = None, leaf = None):
        """
        Represents a node used in a decision tree. Used by the DecisionTree class.
        
        :param feature_index: int representing the index of the column of the feature that this node is splitting on
        :param name: string representing the node. Used for printing.
        :param X: 2d array of feature values that are to be split by this node.
        :param y: array of labels corresponding to the feature values.
        :param left: left child of node
        :param right: right child of node
        :param leaf: int representing the leaf value of the node
        """
        self.feature_index = feature_index
        self.name = name
        self.left = left
        self.right = right
        self.leaf = leaf
        self.X = X #keep track of the X-values used for calculating entropy for later pruning
        self.y = y #used for pruning
        self.feature_mean = np.mean(X[:,self.feature_index])
        self.majority_label = np.bincount(y.astype(int)).argmax() # used for pruning
        
        
    def is_pure(self, y_values):
        """
        Checks if a node is pure, ie if all the training data at this point in the tree creation process 
        has the same y-values.
        
        :param y values: y values of the training data potentially use to expand the tree at this node
        :return : pure value, boolean representing if node is pure
        """
        if (all(y == 0 for y in y_values)):
            return 0, True
        elif (all(y == 1 for y in y_values)):
            return 1, True
        return 0, False
    
    def accuracy_majority_label(self, y_unseen):
        """
        Calculates the accuracy of using the majority label of the node to predict the y values in the pruning data.
        
        :param y_unseen: the y values used to test the accuracy of using the majority label to predict
        :return : accuracy of the majority label
        """
        corretly_labeled_count = 0
        for i in range (len(y_unseen)):
            if (y_unseen[i] == self.majority_label):
                corretly_labeled_count += 1
        return corretly_labeled_count/len(y_unseen)
    
    def print_tree_from_node(self):
        """
        Used for testing. Recursively prints a tree from the node this function was first called on.
        So, if this function is called on the root of a tree, the entire tree is printed.
        """
        if (self.name == "empty tree"):
            print(self.name)
            return
        if ((self.left is None) and (self.right is None)):
            print("⟨" +  str(self.leaf) + "⟩", end ="")
            return
        else:
            print(self.name, end = ": ")
        print('(', end = "")
        if (self.left is not None):
             self.left.print_tree_from_node()
        print(') (', end = "")
        if (self.right is not None):
            self.right.print_tree_from_node()
        print(' )', end = "")

## DecisionTree class

This class is initialized only with the training data it recieves. The *learn* method takes in *impurity_measure* and *prune_tree* as parameters, which determine how the decision tree is generate.

In [4]:
class DecisionTree:
    def __init__(self, X_train, y_train):
        self.X = X_train
        self.y = y_train
        self.total_number_of_features = len(self.X[0])
        self.root = Node(0, "empty tree", X_train, y_train)
    
    
    def learn(self, impurity_measure, prune_tree):
        """
        Calls on id3 with the appropriate parameters, given those it recieves. 
        If the tree should be pruned, training data is split into training and pruning data.
        
        :param impurity_measure: 'entropy' or 'gini'. Determines how the features are split.
        :param prune_tree: boolean which is True if the tree should be pruned, False otherwise
        """
        self.impurity_measure = impurity_measure
        self.prune_tree = prune_tree
        used_features = []
        if prune_tree:
            X_train, X_prune, y_train, y_prune = train_test_split(self.X, self.y,
                                                        test_size=0.2,
                                                        shuffle=True,
                                                        random_state=seed)
            self.root = self.id3(X_train, y_train, X_prune, y_prune, used_features)
        else:
            self.root = self.id3(self.X, self.y, [], [], used_features)

    
    
    def id3(self, X, y, X_prune, y_prune, used_features):
        """
        This method peforms a version of the id3 algorithm described in the project description, with some additions.
        
        :param X: feature values used to create the tree
        :param y: label values used to create the tree
        :param X_prune: feature values used to prune
        :param y_prune: label values used to prune 
        :param used_features: list of the indexes of features that have already been split on
        :return current_node: node that is a root of a fully built tree. This is in most cases a subtree.
        """
        current_node = Node(0, "unfinished node" ,X = X, y = y)
        label, pure = current_node.is_pure(y)
        if pure:
            current_node.leaf = label
            return current_node
        elif (self.all_feature_values_are_the_same(X)):
            current_node.leaf = current_node.majority_label
            return current_node
        elif (len(used_features) >= self.total_number_of_features): #no more features to split
            leaf_node = Node(0, "leaf", X = X, y = y) #initiate a leaf node
            leaf_node.leaf = leaf_node.majority_label
            return leaf_node
        else: #split node, if possible
            best_splitting_feature_index = self.get_best_splitting_feature_index(X, y, used_features)
            current_node = Node(best_splitting_feature_index, "X"+str(best_splitting_feature_index),X = X, y = y)
            neg_X, neg_y, pos_X, pos_y = self.split_data(current_node, X, y)
            data_is_unsplittable, end_node = self.check_for_unsplittable_data(neg_X, pos_X, current_node)
            if (data_is_unsplittable): return end_node #check if data was split properly
            left_child, right_child = self.generate_children(current_node, neg_X, neg_y, pos_X, pos_y, X_prune, y_prune, used_features)
            #if the generated children aren't identical leaves, assignm them to the current node
            if (left_child.leaf == None or (left_child.leaf != right_child.leaf)):
                current_node.left = left_child
                current_node.right = right_child
            elif (left_child.leaf != None and (left_child.leaf == right_child.leaf)): #if both children are identical leaves, node becomes a leaf
                current_node.leaf = self.most_common_label(y)
                current_node.name = 'leaf'
        if self.prune_tree:
            self.prune_from_node(current_node, X_prune, y_prune)
        return current_node
    
    def generate_children(self, parent_node, neg_X, neg_y, pos_X, pos_y, X_prune, y_prune, used_features):
        """
        Prepares data needed to create left and right child node, and returns the child nodes.
        
        :param parent_node: node that the children are generated from
        :param neg_X, neg_y: training data to be given to the left child
        :param pos_X, pos_y: training data to be given to the right child
        :param X_prune, y_prune: prune data to be split and passed on to children
        :param used_features: list of features that have been split on
        :return left_child, right_child: children of the parent_node
        """
        neg_X_prune, neg_y_prune, pos_X_prune, pos_y_prune = self.split_data(parent_node, X_prune, y_prune)
        #deepcopy so as to not affect the used_features list in other parts of the tree
        left_used_features = copy.deepcopy(used_features)
        right_used_features = copy.deepcopy(used_features)
        left_child = self.id3(neg_X, neg_y, neg_X_prune, neg_y_prune, left_used_features)
        right_child = self.id3(pos_X, pos_y, pos_X_prune, pos_y_prune, right_used_features)
        return left_child, right_child
    
    def check_for_unsplittable_data(self, neg_X, pos_X, current_node):
        """
        Checks if when splitting the data, all the data ended up on one side. Ie, the data was not split.
        Since we split by the mean, this should never happen.
        
        :param neg_X: data split to the left
        :param pos_X: data split to the right
        :param current_node: node we are splitting on
        :return : True if data is unsplittable, node to return if data is not splittable
        """
        if len(neg_X) == 0 or len(pos_X) == 0:
            current_node.leaf = self.most_common_label(y)
            current_node.name = 'leaf'
            print("Data cannot be split by the mean", current_node.feature_mean,", so node X", current_node.feature_index, " becomes leaf with label", current_node.leaf, "\n")
            print("ERROR: this should never happen, since if we split by the mean there is always data on each side of the split")
            return True, current_node
        return False, None
    
    def prune_from_node(self, current_node, X_prune, y_prune):
        """
        If the accuracy of the majority label on this node is better than the predictions of the subtree,
        the children of the node are removes, and the node becomes a leaf.
        
        :param current_node: node that might be pruned
        :param X_prune: feature values for pruning data
        :param y_prune: label values for pruning data
        """
        if current_node.leaf != None: # If current node already is a leaf, there is nothing to do.
            return
        if (current_node.accuracy_majority_label(y_prune) >= (self.accuracy_full_tree_from_node(X_prune, y_prune, current_node))):
            current_node.left = None
            current_node.right = None
            current_node.leaf = current_node.majority_label
            
    def split_data(self, current_node, X, y):
        """
        Splits data by whether the value of a certian feature of less than the mean of the feature, or not.
        
        :param current_node: node on which the data will be split
        :param X: feature data
        :param y: label data
        :return : "neg_X" and "neg_y" contain all rows where the feature in question was less than the mean,
        while "pos_X" and "pos_y" contain the rest of the rows.
        """
        if ((len(X) == 0) or (len(y) == 0)): #nothing to split
            return [], [], [], []
        feature_index = current_node.feature_index
        new_X = copy.deepcopy(X) # make deepcopy to avoid affecting the data of other nodes
        new_y = copy.deepcopy(y)
        merged_data = np.insert(new_X, len(new_X[0]), new_y, axis=1)
        # mean of the features is taken from the node
        #as it containes the subset of the total dataset from which the mean at this split should be calculated
        mean = current_node.feature_mean 
        neg_merged = merged_data[np.where(merged_data[:, feature_index] < mean)]
        pos_merged = merged_data[np.where(merged_data[:, feature_index] >= mean)]
        neg_X = neg_merged[:,:-1]
        neg_y = neg_merged[:, -1]
        pos_X = pos_merged[:,:-1]
        pos_y = pos_merged[:, -1]
        return neg_X, neg_y, pos_X, pos_y
        
            
    def get_best_splitting_feature_index(self, X, y, used_features):
        """
        Returns the index of the feature that has the highest information gain. Ie, lowest entropy or gini.
        
        :param X: feature data
        :param y: label data
        :param used_features: list of indexes that already have been split on.
        :return : index of feature with least impurity, aka highest information gain
        """
        min_impurity = None
        best_feature = None
        for i in range(len(X[0])):
            if(i in used_features): #if the feature already has been used to split, skip it
                continue
            feature_column = np.array(X)[:,i]
            calculator = Impurity_calculator(feature_column, y, self.impurity_measure)
            impurity_of_feature_with_index_i = calculator.impurity_of_feature()
            if ((min_impurity is None) or (min_impurity > impurity_of_feature_with_index_i)):
                    min_impurity = impurity_of_feature_with_index_i
                    best_feature = i    
        used_features.append(best_feature)
        return best_feature
    
    def most_common_label(self, Y):
        """
        Finds the most common label in the label-array. 
        Only takes the labels True and False (ie 1 and 0) into account.
        
        :param Y: array of y-values
        :return : most common value in y
        """
        true_count = 0
        false_count = 0
        for y in Y:
            if y == 0:
                false_count += 1
            else:
                true_count += 1
        if (false_count > true_count):
            return 0
        else:
            return 1
    
    def predict(self, new_data_point, current_node = None):
        """
        Predicts the label of a given new data point. 
        This is done recursively, passing the next node to be searched from in the parameter.
        If no current node is given, the default is to start searching from the root.
        
        :param new_data_point: array of feature values
        :param current node: node to search from.
        :return : the predicted y value of the given x-values
        """
        if (len(new_data_point) != len(self.X[0])):
            print("invalid data point to predict")
            return
        elif (current_node is None):
            current_node = self.root
        if (current_node.left == None): # the right child will be None too
            return current_node.leaf # this is a leaf node
        else:
            mean = current_node.feature_mean
            if (new_data_point[current_node.feature_index] < mean):
                return self.predict(new_data_point, current_node.left)
            else:
                return self.predict(new_data_point, current_node.right)
            
            
    def accuracy_full_tree_from_node(self, X_unseen, y_unseen, current_node):
        """
        Extracts all the elements in the column of the 2d array X

        :param X_unseen: new feature data to used to predict values
        :param y_unseen: new labels to compare predictions to
        :param current_node: node that it is being predicted from
        :return :
        """
        corretly_labeled_count = 0
        for i in range (len(y_unseen)):
            if (self.predict(X_unseen[i], current_node) == y_unseen[i]):
                corretly_labeled_count += 1
        return corretly_labeled_count/len(y_unseen)
    
    def all_feature_values_are_the_same(self, X):
        """
        Checks if all rows of feature values are the same.
        
        :param X: 2d array of feature values
        :return : True of all feature values are the same, False otherwise
        """
        if (len(X) == 1):
            return True
        for i in range(len(X)-1):
            if not(np.array_equal(X[i], X[i+1])):
                return False
        print("All features are the same")
        return True        

## Algorithm evaluation

We load in the dataset, and split it into training, valuation and testing sets.

Then we create our decision tree object, and try our the four different hyperparameter combinations of *entropy*/*gini* and *pruning*/*no pruning*. We evaluate each version with the valuation dataset, using the *accuracy_full_tree_from_node* method, and calling it with the root of the tree.

In [5]:
# Load dataset
data = np.genfromtxt('data_banknote_authentication.txt', delimiter = ',')
X_data = data[:,:-1]
y_data = data[:,-1]

#split data
seed = 123
X_train, X_val_test, y_train, y_val_test = train_test_split(X_data, y_data,
                                       test_size = 0.2,
                                       shuffle = True, 
                                       random_state = seed)

X_val, X_test, y_val, y_test = train_test_split(X_val_test, y_val_test,
                                                        test_size=0.5,
                                                        shuffle=True,
                                                        random_state=seed)

#Train tree with different combinations, and print accuracies
my_tree = DecisionTree(X_train, y_train)
my_tree.learn('entropy', False)
print("\n\nentropy + no prune")
my_tree.root.print_tree_from_node()
print("\nValidation Accuracy:", my_tree.accuracy_full_tree_from_node(X_val, y_val, my_tree.root))

my_tree.learn('gini', False)
print("\n\ngini + no prune")
my_tree.root.print_tree_from_node()
print("\nValidation Accuracy:", my_tree.accuracy_full_tree_from_node(X_val, y_val, my_tree.root))

my_tree.learn('entropy', True)
print("\n\nentropy + prune")
my_tree.root.print_tree_from_node()
print("\nValidation Accuracy:", my_tree.accuracy_full_tree_from_node(X_val, y_val, my_tree.root))

my_tree.learn('gini', True)
print("\n\ngini + prune")
my_tree.root.print_tree_from_node()
print("\nValidation Accuracy:", my_tree.accuracy_full_tree_from_node(X_val, y_val, my_tree.root))
print()



entropy + no prune
X0: (X1: (⟨1⟩) (X2: (⟨1⟩) (⟨0⟩ ) )) (X2: (X1: (X3: (⟨1⟩) (⟨0⟩ )) (⟨0⟩ )) (⟨0⟩ ) )
Validation Accuracy: 0.8394160583941606


gini + no prune
X0: (X1: (⟨1⟩) (X2: (⟨1⟩) (⟨0⟩ ) )) (X2: (X1: (X3: (⟨1⟩) (⟨0⟩ )) (⟨0⟩ )) (⟨0⟩ ) )
Validation Accuracy: 0.8394160583941606


entropy + prune
X0: (⟨1⟩) (X2: (X1: (X3: (⟨1⟩) (⟨0⟩ )) (⟨0⟩ )) (⟨0⟩ ) )
Validation Accuracy: 0.8467153284671532


gini + prune
X0: (⟨1⟩) (X2: (X1: (X3: (⟨1⟩) (⟨0⟩ )) (⟨0⟩ )) (⟨0⟩ ) )
Validation Accuracy: 0.8467153284671532



We see that there was no difference in performance when gini or entropy were used as impurity measure, as the two split the trees identically.
The addition of pruning only made a slight difference, improving the accurcy by less than a percent. Since the data sets contain a random sample, it can happen that the validation  data by chance favours some model over the other. So pruning does not always improve validation accuracy.

We pick "entropy + prune" as our selected model, though by these results we could equally well have picked "gini + prune".
If we wanted to make a more informed model selection, we could have done it with K-fold cross-validation.

Now we assess the final model on unseen testing data.

In [6]:
my_tree.learn('entropy', True)
print("entropy + prune")
my_tree.root.print_tree_from_node()
my_tree_estimated_accuracy = my_tree.accuracy_full_tree_from_node(X_test, y_test, my_tree.root)
print("\nAccuracy on unseen testing data:", my_tree_estimated_accuracy)

entropy + prune
X0: (⟨1⟩) (X2: (X1: (X3: (⟨1⟩) (⟨0⟩ )) (⟨0⟩ )) (⟨0⟩ ) )
Accuracy on unseen testing data: 0.8913043478260869


So my estimate for the performance of my algorithm on unseen data is about 89%, which isn't that great. If I had more time I'd like to figure out how to improve this value.
It is however better than the accuracy on validation data, so we can be fairly certian that the selected model was a good choice.


## Comparison to an existing implementation

As suggested in the assignment, I am comparing my algorithm with the desicion tree implementation DecisionTreeClassifier from sklearn. It can use either gini, or entropy as inpurity measure, so I will pick the one which gives highest accuracy.

In [7]:
sklearn_tree = sk_tree.DecisionTreeClassifier(criterion='entropy')
sklearn_tree = sklearn_tree.fit(X_train, y_train)
sk_predict = sklearn_tree.predict(X_val) 
sk_learn_accuracy = accuracy_score(y_val, sk_predict, normalize=True)
print("Sklearn with entropy accuracy:", sk_learn_accuracy)

sklearn_tree = sk_tree.DecisionTreeClassifier(criterion='gini')
sklearn_tree = sklearn_tree.fit(X_train, y_train)
sk_predict = sklearn_tree.predict(X_val) 
sk_learn_accuracy = accuracy_score(y_val, sk_predict, normalize=True)
print("Sklearn with gini accuracy:", sk_learn_accuracy)

Sklearn with entropy accuracy: 1.0
Sklearn with gini accuracy: 1.0


It seems that gini and entropy do equally well when used in sklearns DecisionTreeClassifier. We pick entopy to compare with my algorithm.

In [8]:
# Calculate runtime of my algorithm learning
start = timeit.default_timer()
my_tree.learn('entropy', True)
stop = timeit.default_timer()
print('My decision tree learning time: ', stop - start)  

#Calculate runtime of sklearn decision tree learning
sklearn_tree = sk_tree.DecisionTreeClassifier(criterion='entropy')
start = timeit.default_timer()
sklearn_tree = sklearn_tree.fit(X_train, y_train)
stop = timeit.default_timer()
print('Sklearn decision tree with gini learning time: ', stop - start)  

print("My decision tree accuracy:", my_tree_estimated_accuracy)
    
#Calculate accuracy of sklearn classifier
sk_predict = sklearn_tree.predict(X_test) 
sk_learn_accuracy = accuracy_score(y_test, sk_predict, normalize=True)
print("Sklearn accuracy:", sk_learn_accuracy)

My decision tree learning time:  0.6259505999999995
Sklearn decision tree with gini learning time:  0.006669499999999218
My decision tree accuracy: 0.8913043478260869
Sklearn accuracy: 1.0


sklearn clearly beats my implementation in both runtime and accuracy. 
I have not spent time improving the runtime of my algorithm, but I have an idea of where to start. I am sure that the hard workers that made DecisionTreeClassifier took much more care in ensuring the algorithm had a good runtime.
Unlike my algorithm, sklearn's does not prune it's decision trees.Unless a max depth parameter is set, an sklearn decision tree can potentially become very large on some datasets. While my algorith does not have a max-depth set, the pruning may prevent trees from being unnecessarily large.
Sklearn's decision tree does however split on a feature more than once, while my decision tree only splits once. This may be an explanation for why my implementation has so low accuracy, and is a main point of improvement for my algorithm. 
