# Import tools

In [1]:
import numpy as np
import pandas as pd
import random
from collections import Counter


# Import data

In [2]:
data = pd.read_csv("wine_dataset.csv", header = None, skiprows = 1)
data.columns = ["citric acid"] + ["residual sugar"] + ["pH"] + ["sulphates"] + ["alcohol"] + ["type"] 
data.iloc[:, :-1] = data.iloc[:, :-1].astype(np.float64)
data.iloc[:, -1] = data.iloc[:, -1].astype(str)
data.head()


Unnamed: 0,citric acid,residual sugar,pH,sulphates,alcohol,type
0,0.13,1.6,3.34,0.59,9.2,1
1,0.1,2.8,3.6,0.66,10.2,1
2,0.32,1.9,3.2,0.55,9.5,1
3,0.29,13.65,3.0,0.6,9.5,0
4,0.26,2.0,3.41,0.74,9.2,1


# Node and Tree class

In [3]:
class Node():
    def __init__(self, feature = None,threshold = None, left = None, right = None, gain = None, value = None, maj_label = None):
        # check node
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.gain = gain
        # leaf node
        self.value = value
        self.maj_label = maj_label  # To store labels that reach this node

        
        # printing of node
    def print(self):
        print("feature: " + str(self.feature) + "threshold: " + str(self.threshold) + "left: " + str(self.left)
              + "right: " + str(self.right) + "gain: " + str(self.gain))
    

class Dtree():
    def __init__(self):
        # the root of tree
        self.root=None
        
        
        # used for setting the root of the tree
    def set_root(self, root):
        self.root = root
        
    
        # used to calculate the entropy
    def entropy(self, y): # y is labels
        # entropy formula E=−∑i=Npilog2pi
        classes, class_count = np.unique(y, return_counts = True)
        entropy = 0
        for i in classes:
            pi = len(y[y == i]) / len(y)
            entropy -= pi * np.log2(pi)
        return entropy
            
        
        # builds a decision tree based on either the GINI index or entropy 
    def make_tree(self, dataset, is_gini):
        X= dataset[:, :-1] 
        Y= dataset[:, -1]
        temp_flat_data = np.ravel(X)
        
        # if all of the labels have the same value return that value
        if np.all(Y == Y[0]):
            value_leaf = Y[0]
            return Node(value = value_leaf)
        # else if all the data points have equal feature value return the most common label
        elif np.all(X == temp_flat_data[0]):
            value_leaf = self.most_common(Y)
            return Node(value = value_leaf)
        # else get best split based on entropy
        else:
            (best_feature, best_threshold, best_data_left, best_data_right, best_gain) = self.best_split(X, Y, is_gini)
            if best_gain is None:
                return Node(value = self.most_common(Y))
            
            left_subtree = self.make_tree(best_data_left, is_gini)
            right_subtree = self.make_tree(best_data_right, is_gini)
            majority_label = self.most_common(Y)
            return Node(feature = best_feature, threshold = best_threshold, left = left_subtree, right = right_subtree, gain = best_gain, maj_label = majority_label)
    
        # finds the most common value for label or majority label
    def most_common(self, lst):
        y = list(lst)
        return max(y, key = y.count)
    
    
        # returns the best split based on either the GINI index or entropy impurity
    def best_split(self, X, y, is_gini):
        samples, features = np.shape(X)     
        # calculate entropy/gini of parent
        measure = self.gini if is_gini else self.entropy
        parent_measure = measure(y)    
        feature = None
        threshold = None
        gain = None
        data_left = None
        data_right = None
        data_left_out = None
        data_right_out = None
         
        for f in range(features):
            mean = np.mean(X[:,f])
            # can use np.median but is slower   
            data_left = np.where(X[:, f] <= mean)
            data_right = np.where(X[:, f] > mean)
            measure_left = measure(y[data_left])
            measure_right = measure(y[data_right]) 
            summ = (len(data_left[0]) + len(data_right[0]))
            weighted_measure =(measure_left * (len(data_left[0]) / summ)) + (measure_right * (len(data_right[0]) / summ))
            # if the weighted entropy/gini is smaller than the parent entropy/gini we want to split on this node    
            if weighted_measure < parent_measure:
                gain = parent_measure - weighted_measure
                parent_measure = weighted_measure
                feature = f
                threshold = mean
                data_left_out = np.concatenate((X[data_left], y[data_left].reshape(-1, 1)), axis = 1)
                data_right_out = np.concatenate((X[data_right], y[data_right].reshape(-1, 1)), axis = 1)          
        return feature, threshold, data_left_out, data_right_out,  gain
    
        # returns an array of predictions
    def predict(self, y):
        predictions = [self.make_predictions(x, self.root) for x in y]
        return predictions
    
        # checks the thresholds and the feature value and makes a prediction based on them recursively
    def make_predictions(self, x, node):
        if node.value != None: return node.value 
        feature_v = x[node.feature]
        if feature_v <= node.threshold:
            return self.make_predictions(x, node.left)
        else:
            return self.make_predictions(x, node.right)
        
        
        # prints the tree with indentations
    def print_tree(self, node, spacing = ""):
        if node.value is not None:
            print(f"{spacing}Predict: {node.value}")
            return

        print(f"{spacing}[Feature {node.feature} <= {node.threshold}] (Gain: {node.gain})")
    
        print(f"{spacing}--> Left:")
        self.print_tree(node.left, spacing + "  ")
    
        print(f"{spacing}--> Right:")
        self.print_tree(node.right, spacing + "  ")
        
        
        # used to calculate the gini
    def gini(self, y):
        # gini formula
        classes, class_count = np.unique(y, return_counts = True)
        gini = 0
        for i in classes:
            pi = len(y[y == i]) / len(y)
            gini += pi ** 2
        return 1 - gini

# Helper functions

In [4]:
from sklearn.metrics import accuracy_score

    # returns a decision tree based on features X and labels y
def learn(X, y, impurity_measure, is_prune = False, prune_data_ratio = 0.2): 
    if is_prune == True:
        X_train_prune, X_test_prune, Y_train_prune, Y_test_prune = train_test_split(X, y, test_size = prune_data_ratio, random_state = 41)
        data_prune = np.concatenate((X_train_prune, Y_train_prune), axis = 1)
        tree = Dtree()
        root = tree.make_tree(data_prune, impurity_measure == "gini")
        tree.set_root(root)
        
        new_root = prune_tree(tree, X_test_prune,  Y_test_prune)
        tree.set_root(new_root)        
        return tree
    else:
        print("Entropy impurity" if impurity_measure == "entropy" else "Gini impurity")
        dataset = np.concatenate((X, y), axis = 1)  
        tree = Dtree()
        root = tree.make_tree(dataset, impurity_measure == "gini")
        tree.set_root(root)
        return tree
        
    # prunes the tree by replacing subtrees with leaf nodes that have the majority label
def prune_tree(tree, X_p, Y_p, node = None):
    if node is None:
        node = tree.root
    if node.value is not None:
        return node

    # combine identical values
    if (node.left.value is not None) and (node.right.value is not None) and (node.left.value == node.right.value):
        return Node(value=node.left.value)
    
    # Recursively prune the left and right children
    if node.left is not None:
        node.left = prune_tree(tree, X_p, Y_p, node = node.left)
    if node.right is not None:
        node.right = prune_tree(tree, X_p, Y_p, node = node.right)
    # Save old node information
    old_value = node.value
    old_left = node.left
    old_right = node.right

    # Set node to a leaf node with majority label
    node.value = node.maj_label
    node.left = None
    node.right = None

    # Test the accuracy with the node converted to a leaf
    pruned_accuracy = accuracy_score(Y_p, tree.predict(X_p))

    # Revert the node back to its original state
    node.value = old_value
    node.left = old_left
    node.right = old_right

    # If pruning did not reduce accuracy, replace the subtree with the majority label
    original_accuracy = accuracy_score(Y_p, tree.predict(X_p))
    if pruned_accuracy >= original_accuracy:
        node.value = node.maj_label
        node.left = None
        node.right = None
        return node
    return node   

    
    # prints and returns the accuracy of the decision tree
def measure_accuracy(tree, measure, is_pruning, data_type, X, Y):
    Y_PRED = tree.predict(X)
    acc = accuracy_score(Y, Y_PRED)
    print(f"decision tree with {measure} and {'post' if is_pruning else 'no'} pruning has a {data_type} accuracy of: {acc}")
    return {"measure": measure, "is_pruning": is_pruning, "accuracy": acc}

    # by providing the different settings with their accuracy measures (on the validation data) it prints and returns the best setting among them
def get_best_setting(settings):
    max_value = -1
    best_setting = None
    for setting in settings:
        if setting["accuracy"] > max_value:
            max_value = setting["accuracy"]
            best_setting = setting
    print('best setting is: ')
    print(best_setting)
    print()
    return best_setting

    # used for evaluating the best setting by measuring its accuracy on the test data and printing out the results
def evaluate_best_setting(best_setting, gini_tree, gini_tree_pr, entropy_tree, entropy_tree_pr):
    if best_setting["measure"] == 'gini':
        if best_setting["is_pruning"] == False:
            measure_accuracy(gini_tree, 'gini', False, 'test', X_test, Y_test)
        else:
            measure_accuracy(gini_tree_pr, 'gini', True, 'test', X_test, Y_test)
    elif best_setting["is_pruning"] == False:
        measure_accuracy(entropy_tree, 'entropy', False, 'test', X_test, Y_test)
    else:
        measure_accuracy(entropy_tree_pr, 'entropy', True, 'test', X_test, Y_test)

# 1.4 - Test the implementation with wine data

In [5]:
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import time

# Import data
X = data.iloc[:, :-1]
Y = data.iloc[:, -1].values.reshape(-1, 1)


X = np.asarray(X, dtype = object)
Y = np.asarray(Y, dtype = object)
np.random.seed(41)

# Split data
X_train, X_val_test, Y_train, Y_val_test = train_test_split(X, Y, test_size = 0.2, random_state = 2)
X_train_val, X_test, Y_train_val, Y_test = train_test_split(X_val_test, Y_val_test, test_size = 0.5, random_state = 2)

# training data: X_train, y_train
# validation data: X_train_val, y_train_val
# test data: X_test, Y_test

# entropy without pruning
entropy_tree = learn(X_train, Y_train, "entropy", False)
entropy_val_acc = measure_accuracy(entropy_tree, 'entropy', False, 'validation', X_train_val, Y_train_val)
measure_accuracy(entropy_tree, 'entropy', False, 'training', X_train, Y_train)
print()


start_time = time.time()
# entropy with pruning
entropy_tree_pr = learn(X_train, Y_train, "entropy", True, 0.2)
end_time = time.time()
elapsed_time = end_time - start_time
entropy_pr_val_acc = measure_accuracy(entropy_tree_pr, 'entropy', True, 'validation', X_train_val, Y_train_val)
measure_accuracy(entropy_tree_pr, 'entropy', True, 'training', X_train, Y_train)
print()


# gini without pruning
gini_tree = learn(X_train, Y_train, "gini", False)
gini_val_acc = measure_accuracy(gini_tree, 'gini', False, 'validation', X_train_val, Y_train_val)
measure_accuracy(gini_tree, 'gini', False, 'training', X_train, Y_train)
print()


# gini with pruning
gini_tree_pr = learn(X_train, Y_train, "gini", True, 0.2)
gini_pr_val_acc = measure_accuracy(gini_tree_pr, 'gini', True, 'validation', X_train_val, Y_train_val)
measure_accuracy(gini_tree_pr, "gini", True, 'training', X_train, Y_train)
print()


# model selection
# (settings are in this order because with same accuracy: pruning > no pruning, gini > entropy)
settings = [gini_pr_val_acc, entropy_pr_val_acc, gini_val_acc, entropy_val_acc]
best_setting = get_best_setting(settings)


# evaluation
evaluate_best_setting(best_setting, gini_tree, gini_tree_pr, entropy_tree, entropy_tree_pr)
print('time needed for creating tree: ' + str(elapsed_time))

Entropy impurity
decision tree with entropy and no pruning has a validation accuracy of: 0.865625
decision tree with entropy and no pruning has a training accuracy of: 1.0

decision tree with entropy and post pruning has a validation accuracy of: 0.86875
decision tree with entropy and post pruning has a training accuracy of: 0.9057857701329164

Gini impurity
decision tree with gini and no pruning has a validation accuracy of: 0.85625
decision tree with gini and no pruning has a training accuracy of: 1.0

decision tree with gini and post pruning has a validation accuracy of: 0.8625
decision tree with gini and post pruning has a training accuracy of: 0.9089132134480062

best setting is: 
{'measure': 'entropy', 'is_pruning': True, 'accuracy': 0.86875}

decision tree with entropy and post pruning has a test accuracy of: 0.921875
time needed for creating tree: 1.7682678699493408


# 1.5 - Compare to other implementation

In [6]:
from sklearn.preprocessing import LabelEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn import tree
import time

# create entropy tree from sklearn
start_time = time.time()
classifier = DecisionTreeClassifier(criterion = 'entropy', random_state = 2)
classifier.fit(X_train, Y_train)
end_time = time.time()
elapsed_time = end_time - start_time
pred_val = classifier.predict(X_train_val)
pred_train = classifier.predict(X_train)
pred_test = classifier.predict(X_test)
print("accuracy of sklearn DecisionTreeClassifier with impurity measure entropy on validation data: ", accuracy_score(Y_train_val, pred_val))

print("accuracy of sklearn DecisionTreeClassifier with impurity measure entropy on training data: ", accuracy_score(Y_train, pred_train))
print()

# create gini tree from sklearn
start_time_gini = time.time()
classifier = DecisionTreeClassifier(criterion = 'gini', random_state = 2)
classifier.fit(X_train, Y_train)
end_time_gini = time.time()
elapsed_time_gini = end_time_gini - start_time_gini
pred_val = classifier.predict(X_train_val)
pred_train = classifier.predict(X_train)
pred_test = classifier.predict(X_test)
print("accuracy of sklearn DecisionTreeClassifier with impurity measure gini on validation data: ", accuracy_score(Y_train_val, pred_val))
print("accuracy of sklearn DecisionTreeClassifier with impurity measure gini on training data: ", accuracy_score(Y_train, pred_train))
print('time needed for creating tree: ' + str(elapsed_time))
print()
print("We need to compare the accuracy on test data to our implementation's test accuracy")
print("accuracy of sklearn DecisionTreeClassifier with impurity measure entropy on test data: ", accuracy_score(Y_test, pred_test))
print("accuracy of sklearn DecisionTreeClassifier with impurity measure gini on test data: ", accuracy_score(Y_test, pred_test))
print('time needed for creating tree: ' + str(elapsed_time_gini))

accuracy of sklearn DecisionTreeClassifier with impurity measure entropy on validation data:  0.903125
accuracy of sklearn DecisionTreeClassifier with impurity measure entropy on training data:  1.0

accuracy of sklearn DecisionTreeClassifier with impurity measure gini on validation data:  0.89375
accuracy of sklearn DecisionTreeClassifier with impurity measure gini on training data:  1.0
time needed for creating tree: 0.00797891616821289

We need to compare the accuracy on test data to our implementation's test accuracy
accuracy of sklearn DecisionTreeClassifier with impurity measure entropy on test data:  0.91875
accuracy of sklearn DecisionTreeClassifier with impurity measure gini on test data:  0.91875
time needed for creating tree: 0.005984067916870117
