In [12]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

**DecisionTree Class**

In [14]:
import numpy as np

def to_one_hot(yX: np.ndarray): # Used while testing. There is a internal to_one_hot function in DecisionTree class
    y = yX[:, 0]
    X = yX[:, 1:] 
    
    n_attributes = X.shape[1]
    n_instances = X.shape[0]

    listx = []
    for attribute in range(n_attributes):
        listx.append(list(set(X[:, attribute])))
    
    one_hot_attributes = np.empty((0, n_instances))
    for i in range(n_attributes):
        for j in listx[i][:-1]:
            att = (X[:, i] <= j)
            one_hot_attributes = np.vstack((one_hot_attributes, att))
    
    one_hot_attributes = np.vstack((y, one_hot_attributes))
    return listx, one_hot_attributes.T


def node_score_gini(y: list):

    classes = list(set(y))
    gini = 1
    for i in classes:
        prob = y.count(i) / len(y)
        gini -= prob ** 2
    return gini

class Node:
 
    def __init__(self, left=None, right=None, depth=0, index_split_on=0, isleaf=False, label=1):
        self.left = left
        self.right = right
        self.depth = depth
        self.index_split_on = index_split_on
        self.isleaf = isleaf
        self.label = label
        self.info = {} # used for visualization


    def _set_info(self, gain, num_samples):

        self.info['gain'] = gain
        self.info['num_samples'] = num_samples


class CARTClassifier:
    
    def __init__(self, data, validation_data=None, gain_function=node_score_gini, max_depth=100):

        self.listx = []
        # to_one_hot
        one_hot_data = self.to_one_hot(data)
        # print('one_hot_data', one_hot_data)
        
        y = [row[0] for row in one_hot_data]
        self.classes = list(set(y))
        self.majority_class = max(self.classes, key=y.count)

        self.max_depth = max_depth
        self.root = Node(label = self.majority_class)
        self.gain_function = gain_function

        indices = list(range(1, len(one_hot_data[0])))
        # print('indices', indices)

        self._split_recurs(self.root, one_hot_data, indices)
 
        # Pruning
        if validation_data is not None:
            self._prune_recurs(self.root, validation_data)


    def to_one_hot(self, yX: np.ndarray):
        y = yX[:, 0]
        X = yX[:, 1:]
        
        n_attributes = X.shape[1]
        n_instances = X.shape[0]
        one_hot_attributes = np.empty((0, n_instances))
        
        if self.listx == []:
            for attribute in range(n_attributes):
                self.listx.append(list(set(X[:, attribute])))
        
        for i in range(n_attributes):
            for j in self.listx[i][:-1]:
                att = (X[:, i] <= j)
                one_hot_attributes = np.vstack((one_hot_attributes, att))
        
        one_hot_attributes = np.vstack((y, one_hot_attributes))
        return one_hot_attributes.T
    

    def predict(self, features):

        return self._predict_recurs(self.root, features)


    def accuracy(self, data):

        return 1 - self.loss(data)


    def loss(self, data):

        one_hot_data = self.to_one_hot(data)

        cnt = 0.0
        test_Y = [row[0] for row in one_hot_data]
        for i in range(len(one_hot_data)):
            prediction = self.predict(one_hot_data[i])
            if (prediction != test_Y[i]):
                cnt += 1.0
        return cnt/len(one_hot_data)


    def _predict_recurs(self, node, row):
        
        if node.isleaf or node.index_split_on == 0:
            return node.label
        split_index = node.index_split_on
        if not row[split_index]:
            return self._predict_recurs(node.left, row)
        else:
            return self._predict_recurs(node.right, row)


    def _prune_recurs(self, node, validation_data):
        
        if not node.isleaf:
            if node.left is not None:
                self._prune_recurs(node.left, validation_data)
            
            if node.right is not None:
                self._prune_recurs(node.right, validation_data)
            
            if (node.left.isleaf) and (node.right.isleaf):
                original_loss = self.loss(validation_data)
                original_label = node.label
                left = node.left
                right = node.right

                node.isleaf = True
                node.left = None
                node.right = None
                loss = self.loss(validation_data)
                if original_loss < loss:
                    node.isleaf = False
                    node.label = original_label
                    node.left = left
                    node.right = right
        return    
                    
        
    def _is_terminal(self, node, data, indices):

        y = [row[0] for row in data]
        # print('y', y)        
        
        is_terminal = node.isleaf
        if len(data) == 0 or len(indices) == 0 or len(set(y)) == 1 or node.depth == self.max_depth:
            is_terminal = True            
        # print(len(data) == 0, len(indices) == 0, len(set(y)) == 1, node.depth == self.max_depth)
        

        if len(data) == 0: 
            label = self.majority_class
            # print('if', label)
        else:
            label = max(list(set(y)), key=y.count)
            # print('else', label)
        
        return is_terminal, label
        

    def _split_recurs(self, node, data, indices):

        is_terminal, label = self._is_terminal(node, data, indices)
        node.label = label
        # print('nodelabel', node.label)

        if is_terminal:
            node.isleaf = True
            node.left = None
            node.right = None
            return
        
        if not node.isleaf:
            best_gain = -float('inf')
            best_index = None

            for index in indices:
                gain = self._calc_gain(data, index, self.gain_function)
                if gain > best_gain:
                    best_gain = gain
                    best_index = index

            # print('best_gain', best_gain, 'best_index', best_index)
            
            node.index_split_on = best_index
            indices.remove(best_index)
            node._set_info(best_gain, len(data))

            left_data = [row for row in data if row[best_index] == 0]
            right_data = [row for row in data if row[best_index] == 1]

            node.left = Node(depth=node.depth + 1)
            node.right = Node(depth=node.depth + 1)

            self._split_recurs(node.left, left_data, indices)
            self._split_recurs(node.right, right_data, indices)            
            

    def _calc_gain(self, data, split_index, gain_function=node_score_gini):

        y = [row[0] for row in data]
        xi = [row[split_index] for row in data]
        y_x0 = [row[0] for row in data if row[split_index] == 0]
        y_x1 = [row[0] for row in data if row[split_index] == 1]

        # print('y', y, 'xi', xi, 'y_x0', y_x0, 'y_x1', y_x1)
        if len(y) != 0 and len(xi) != 0:
            Px1 = xi.count(1) / len(xi)
            Px0 = xi.count(0) / len(xi)
            
            gain = gain_function(y) - (Px0 * gain_function(y_x0) + Px1 * gain_function(y_x1))

        else:
            gain = 0

        # print('gain', gain)
        return gain
    

    def print_tree(self):

        print('---START PRINT TREE---')
        def print_subtree(node, indent=''):
            if node is None:
                return str("None")
            if node.isleaf:
                return str(node.label)
            else:
                decision = 'split attribute = {:d}; gain = {:f}; number of samples = {:d}'.format(node.index_split_on, node.info['gain'], node.info['num_samples'])
            left = indent + '0 -> '+ print_subtree(node.left, indent + '\t\t')
            right = indent + '1 -> '+ print_subtree(node.right, indent + '\t\t')
            return (decision + '\n' + left + '\n' + right)

        print(print_subtree(self.root))
        print('----END PRINT TREE---')


    def loss_plot_vec(self, data):

        self._loss_plot_recurs(self.root, data, 0)
        loss_vec = []
        q = [self.root]
        num_correct = 0
        while len(q) > 0:
            node = q.pop(0)
            num_correct = num_correct + node.info['curr_num_correct']
            loss_vec.append(num_correct)
            if node.left != None:
                q.append(node.left)
            if node.right != None:
                q.append(node.right)

        return 1 - np.array(loss_vec)/len(data)


    def _loss_plot_recurs(self, node, rows, prev_num_correct):

        labels = [row[0] for row in rows]
        curr_num_correct = labels.count(node.label) - prev_num_correct
        node.info['curr_num_correct'] = curr_num_correct

        if not node.isleaf:
            left_data, right_data = [], []
            left_num_correct, right_num_correct = 0, 0
            for row in rows:
                if not row[node.index_split_on]:
                    left_data.append(row)
                else:
                    right_data.append(row)

            left_labels = [row[0] for row in left_data]
            left_num_correct = left_labels.count(node.label)
            right_labels = [row[0] for row in right_data]
            right_num_correct = right_labels.count(node.label)

            if node.left != None:
                self._loss_plot_recurs(node.left, left_data, left_num_correct)
            if node.right != None:
                self._loss_plot_recurs(node.right, right_data, right_num_correct)


**Dataset Import**

In [1]:
import pandas as pd

file = r'../data/car_evaluation.csv'
df = pd.read_csv(file, header=None)
df

Unnamed: 0,0,1,2,3,4,5,6
0,vhigh,vhigh,2,2,small,low,unacc
1,vhigh,vhigh,2,2,small,med,unacc
2,vhigh,vhigh,2,2,small,high,unacc
3,vhigh,vhigh,2,2,med,low,unacc
4,vhigh,vhigh,2,2,med,med,unacc
...,...,...,...,...,...,...,...
1723,low,low,5more,more,med,med,good
1724,low,low,5more,more,med,high,vgood
1725,low,low,5more,more,big,low,unacc
1726,low,low,5more,more,big,med,good


In [2]:
col_names = ['buying', 'maint', 'doors', 'persons', 'lug_boot', 'safety', 'class']
df.columns = col_names
df

Unnamed: 0,buying,maint,doors,persons,lug_boot,safety,class
0,vhigh,vhigh,2,2,small,low,unacc
1,vhigh,vhigh,2,2,small,med,unacc
2,vhigh,vhigh,2,2,small,high,unacc
3,vhigh,vhigh,2,2,med,low,unacc
4,vhigh,vhigh,2,2,med,med,unacc
...,...,...,...,...,...,...,...
1723,low,low,5more,more,med,med,good
1724,low,low,5more,more,med,high,vgood
1725,low,low,5more,more,big,low,unacc
1726,low,low,5more,more,big,med,good


In [7]:
X = df.drop(['class'], axis=1)

y = df['class']

**Some preprocess**

In [8]:
# replace +- with 1 and 0
y.replace({'unacc': 0, 'acc': 1, 'good': 2, 'vgood': 3}, inplace=True)
display(y)

0       0
1       0
2       0
3       0
4       0
       ..
1723    2
1724    3
1725    0
1726    2
1727    3
Name: class, Length: 1728, dtype: int64

In [9]:
# merge y and X
yX = pd.concat([y, X], axis=1)
display(yX)

Unnamed: 0,class,buying,maint,doors,persons,lug_boot,safety
0,0,vhigh,vhigh,2,2,small,low
1,0,vhigh,vhigh,2,2,small,med
2,0,vhigh,vhigh,2,2,small,high
3,0,vhigh,vhigh,2,2,med,low
4,0,vhigh,vhigh,2,2,med,med
...,...,...,...,...,...,...,...
1723,2,low,low,5more,more,med,med
1724,3,low,low,5more,more,med,high
1725,0,low,low,5more,more,big,low
1726,2,low,low,5more,more,big,med


In [10]:
# drop all NaN values
display(yX.dropna())

yX_fixed = yX.dropna()

yX_array = yX_fixed.to_numpy()
print(yX_array.shape)

Unnamed: 0,class,buying,maint,doors,persons,lug_boot,safety
0,0,vhigh,vhigh,2,2,small,low
1,0,vhigh,vhigh,2,2,small,med
2,0,vhigh,vhigh,2,2,small,high
3,0,vhigh,vhigh,2,2,med,low
4,0,vhigh,vhigh,2,2,med,med
...,...,...,...,...,...,...,...
1723,2,low,low,5more,more,med,med
1724,3,low,low,5more,more,med,high
1725,0,low,low,5more,more,big,low
1726,2,low,low,5more,more,big,med


(1728, 7)


**Main**

In [22]:
# Main
from sklearn.model_selection import train_test_split

# Split the data into training and testing sets
train, test = train_test_split(yX_array, test_size=0.2, random_state=123)
test, val = train_test_split(test, test_size=0.5, random_state=123)

# Train the model
test_model = CARTClassifier(train, max_depth=1000)
test_model_pruned = CARTClassifier(train, validation_data=val, max_depth=1000)

# Print the metrics
print(test_model.accuracy(train))
print(test_model.accuracy(test))
print(test_model_pruned.accuracy(train))
print(test_model_pruned.accuracy(test))

0.7083936324167872
0.7283236994219653
0.7069464544138929
0.7283236994219653


**Test Model 1**

In [None]:
import pytest
np.random.seed(0)
random.seed(0)

# Tests for node_score_gini with binary class
assert node_score_gini([1,1,1,1,1]) == node_score_gini([0,0,0,0,0]) == 0
assert node_score_gini([1,1,0,0,0]) == .48

# Creates Test Model and Dummy Data
x1 = np.array(([[1, 1, 0, 3], [1, 4, 1, 2], [1, 4, 1, 0], [0, 1, 0, 1], [0, 2, 1, 1], [1, 4, 0, 0]]))
test_model1 = CARTClassifier(x1)

# Test for majority_class
assert test_model1.majority_class == 1

# Tests for _is_terminal (threshold are values, if value<=threshold then to the left child)
test1_node1 = Node(left=None, right=None, depth=0, index_split_on=2, isleaf=False, label=1)
test1_node2 = Node(left=None, right=None, depth=1, index_split_on=None, isleaf=True, label=1)
test1_node3 = Node(left=None, right=None, depth=1, index_split_on=5, isleaf=False, label=0)
test1_node4 = Node(left=None, right=None, depth=2, index_split_on=None, isleaf=True, label=1)
test1_node5 = Node(left=None, right=None, depth=2, index_split_on=None, isleaf=True, label=0)

x1_one_hot = np.array([[1,1,1,1,0,0,0],
                       [1,0,0,0,0,0,1],
                       [1,0,0,0,1,1,1],
                       [0,1,1,1,0,1,1],
                       [0,0,1,0,0,1,1],
                       [1,0,0,1,1,1,1]]
)
x1_filtered_node2 = np.array([row for row in x1_one_hot if row[2] == 0])
x1_filtered_node3 = np.array([row for row in x1_one_hot if row[2] == 1])
x1_filtered_node4 = np.array([row for row in x1_filtered_node3 if row[5] == 0])
x1_filtered_node5 = np.array([row for row in x1_filtered_node3 if row[5] == 1])

assert test_model1._is_terminal(node=test1_node1, data=x1_one_hot, indices=[1,2,3,4,5,6]) == (False, 1)
assert test_model1._is_terminal(node=test1_node2, data=x1_filtered_node2, indices=[1,3,4,5,6]) == (True, 1)
assert test_model1._is_terminal(node=test1_node3, data=x1_filtered_node3, indices=[1,3,4,5,6]) == (False, 0)
assert test_model1._is_terminal(node=test1_node4, data=x1_filtered_node4, indices=[1,3,4,6]) == (True, 1)
assert test_model1._is_terminal(node=test1_node5, data=x1_filtered_node5, indices=[1,3,4,6]) == (True, 0)

# Tests _calc_gain
x1_original_node3 = np.array([row for row in x1 if row[2] == 1])
# Testing gain for index 2 in root, which is the max gain when splitting x1
assert test_model1._calc_gain(x1_one_hot, 2, node_score_gini) == pytest.approx(0.222, .01)
# Testing gain for index 5 in node3, which is the max gain when splitting data in node 3
assert test_model1._calc_gain(x1_filtered_node3, 5, node_score_gini) == pytest.approx(0.444, .01)

# Check Tree is created Properly, Compare with text below
test_model1.print_tree()

# Tests _prune_recurs
# Pruned tree should be smaller
# with higher training loss and lower validation loss
# The third feature of the second point in x_val1 is 5, which does not exist in our training set. We should still be able to classify it.
x_val1 = np.array([[1,4,0,2],[0,2,1,5]])

print('training loss not pruned:', test_model1.loss(x1))
print('validation loss not pruned:', test_model1.loss(x_val1), '\n')

test_model1_pruned = CARTClassifier(x1,validation_data=x_val1)
test_model1_pruned.print_tree()
print('training loss pruned:', test_model1_pruned.loss(x1))
print('validation loss pruned:', test_model1_pruned.loss(x_val1))


---START PRINT TREE---
split attribute = 2; gain = 0.222222; number of samples = 6
0 -> 1.0
1 -> split attribute = 5; gain = 0.444444; number of samples = 3
		0 -> 1.0
		1 -> 0.0
----END PRINT TREE---
training loss not pruned: 0.0
validation loss not pruned: 0.5 

---START PRINT TREE---
split attribute = 2; gain = 0.222222; number of samples = 6
0 -> 1.0
1 -> 0.0
----END PRINT TREE---
training loss pruned: 0.16666666666666666
validation loss pruned: 0.0


**Test Model 2**

In [None]:
# Tests for node_score_gini with multiclass
assert node_score_gini([0,0,0,0,0]) == node_score_gini([1,1,1,1,1]) == node_score_gini([2,2,2,2,2]) == 0
assert node_score_gini([1,1,0,0,2]) == pytest.approx(.64, .01)

# Creates Test Model and Dummy Data
x2 = np.array([[1, 1, 0, 3], [0, 1, 1, 2], [2, 0, 1, 1], [2, 0, 0, 3], 
               [0, 0, 1, 3], [1, 1, 0, 1], [1, 1, 1, 3], [0, 1, 0, 2]])
test_model2 = CARTClassifier(x2)

# Test for majority_class
assert test_model2.majority_class == 0

# Tests for _is_terminal (threshold are values, if value<=threshold then to the left child)
test2_node1 = Node(left=None, right=None, depth=0, index_split_on=1, isleaf=False, label=0)
test2_node2 = Node(left=None, right=None, depth=1, index_split_on=4, isleaf=False, label=1)
test2_node3 = Node(left=None, right=None, depth=1, index_split_on=2, isleaf=False, label=2)
test2_node4 = Node(left=None, right=None, depth=2, index_split_on=None, isleaf=True, label=1)
test2_node5 = Node(left=None, right=None, depth=2, index_split_on=3, isleaf=False, label=0)
test2_node6 = Node(left=None, right=None, depth=2, index_split_on=3, isleaf=False, label=0)
test2_node7 = Node(left=None, right=None, depth=2, index_split_on=None, isleaf=True, label=2)
test2_node8 = Node(left=None, right=None, depth=3, index_split_on=None, isleaf=True, label=0)
test2_node9 = Node(left=None, right=None, depth=3, index_split_on=None, isleaf=True, label=1)
test2_node10 = Node(left=None, right=None, depth=3, index_split_on=None, isleaf=True, label=0)
test2_node11 = Node(left=None, right=None, depth=3, index_split_on=None, isleaf=True, label=2)

x2_one_hot = np.array([[1,0,1,0,0],
                       [0,0,0,0,1],
                       [2,1,0,1,1],
                       [2,1,1,0,0],
                       [0,1,0,0,0],
                       [1,0,1,1,1],
                       [1,0,0,0,0],
                       [0,0,1,0,1]]
)
x2_filtered_node2 = np.array([row for row in x2_one_hot if row[1] == 0])
x2_filtered_node3 = np.array([row for row in x2_one_hot if row[1] == 1])
x2_filtered_node4 = np.array([row for row in x2_filtered_node2 if row[4] == 0])
x2_filtered_node5 = np.array([row for row in x2_filtered_node2 if row[4] == 1])
x2_filtered_node6 = np.array([row for row in x2_filtered_node3 if row[2] == 0])
x2_filtered_node7 = np.array([row for row in x2_filtered_node3 if row[2] == 1])
x2_filtered_node8 = np.array([row for row in x2_filtered_node5 if row[3] == 0])
x2_filtered_node9 = np.array([row for row in x2_filtered_node5 if row[3] == 1])
x2_filtered_node10 = np.array([row for row in x2_filtered_node6 if row[3] == 0])
x2_filtered_node11 = np.array([row for row in x2_filtered_node6 if row[3] == 1])

assert test_model2._is_terminal(node=test2_node1, data=x2_one_hot, indices=[1,2,3,4]) == (False, 0)
assert test_model2._is_terminal(node=test2_node2, data=x2_filtered_node2, indices=[2,3,4]) == (False, 1)
assert test_model2._is_terminal(node=test2_node3, data=x2_filtered_node3, indices=[2,3,4]) == (False, 2)

# assert test_model2._is_terminal(node=test2_node4, data=x2_filtered_node4, indices=[2,3]) == (True, 1)
# assert test_model2._is_terminal(node=test2_node5, data=x2_filtered_node5, indices=[2,3]) == (False, 0)
# assert test_model2._is_terminal(node=test2_node6, data=x2_filtered_node6, indices=[3,4]) == (False, 0)
# assert test_model2._is_terminal(node=test2_node7, data=x2_filtered_node7, indices=[3,4]) == (True, 2)
# assert test_model2._is_terminal(node=test2_node8, data=x2_filtered_node8, indices=[2]) == (True, 0)
# assert test_model2._is_terminal(node=test2_node9, data=x2_filtered_node9, indices=[2]) == (True, 1)
# assert test_model2._is_terminal(node=test2_node10, data=x2_filtered_node10, indices=[4]) == (True, 0)
# assert test_model2._is_terminal(node=test2_node11, data=x2_filtered_node10, indices=[4]) == (True, 2)

# Tests _calc_gain
# Testing gain for index 1 in x2
assert test_model2._calc_gain(x2, 1, node_score_gini) == pytest.approx(0.18958333, .01)
# Testing gain for index 4 in x2_filtered_node2
assert test_model2._calc_gain(x2_filtered_node2, 4, node_score_gini) == pytest.approx(0.21333, .01)
# Testing gain for index 2 in x2_filtered_node3
assert test_model2._calc_gain(x2_filtered_node5, 2, node_score_gini) == pytest.approx(0.111, .01)

# Check Tree is created Properly, Compare with text below
test_model2.print_tree()

# Tests _prune_recurs
# Pruned tree should be smaller
# with higher training loss and lower validation loss
x_val2 = np.array([[2,0,0,2],[0,1,1,1]])

print('training loss not pruned:', test_model2.loss(x2))
print('validation loss not pruned:', test_model2.loss(x_val2), '\n')

test_model2_pruned = CARTClassifier(x2,validation_data=x_val2)
test_model2_pruned.print_tree()
print('training loss pruned:', test_model2_pruned.loss(x2))
print('validation loss pruned:', test_model2_pruned.loss(x_val2))

---START PRINT TREE---
split attribute = 1; gain = 0.189583; number of samples = 8
0 -> split attribute = 4; gain = 0.213333; number of samples = 5
		0 -> 1.0
		1 -> split attribute = 3; gain = 0.444444; number of samples = 3
				0 -> 0.0
				1 -> 1.0
1 -> split attribute = 2; gain = 0.111111; number of samples = 3
		0 -> 0.0
		1 -> 2.0
----END PRINT TREE---
training loss not pruned: 0.125
validation loss not pruned: 0.5 

---START PRINT TREE---
split attribute = 1; gain = 0.189583; number of samples = 8
0 -> split attribute = 4; gain = 0.213333; number of samples = 5
		0 -> 1.0
		1 -> 0.0
1 -> 2.0
----END PRINT TREE---
training loss pruned: 0.25
validation loss pruned: 0.0
