## A decision tree implementation using the CART algorithm.

In [1]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

import os
from collections import namedtuple

In [2]:
DATA_HOME = './data'

In [3]:
Node = namedtuple('Node', ['threshold', 'feature_index', 'majority_class', 'left', 'right'])


class DecisionTreeClassifier:
    """ A decision tree classifier.
    """
    
    def __init__(self):
        self.n_classes = None
        self.n_features = None
        self.max_depth = None
        self.root = None
    
    def predict(self, X):
        """ Assigns a class to each observation. 
        
        :param X: A matrix of observations of shape (n_observations, n_features).
        :returns: An array of predicted classes corresponding to each observation.
        """
        predictions = []
        for observation in X:
            node = self.root
            while node.left is not None:
                if observation[node.feature_index] <= node.threshold:
                    node = node.left
                else:
                    node = node.right
            predictions.append(node.majority_class)
        return predictions
        
    def fit(self, X, y, max_depth=10):
        """ Builds a tree based on the given observations.
        
        :param X: A matrix of observations of shape (n_observations, n_features).
        :param y: A vector of output classes corresponding to the observations in ''X'', 
        of shape (n_observations,).
        :param max_depth: The maximum depth of the tree.
        """
        self.n_features = X.shape[1]
        self.n_classes = np.max(y) + 1
        self.max_depth = max_depth
        self.root = self.build_tree(X, y, 0)
        
        
    def build_tree(self, X, y, depth):
        """ Recursively splits each node and builds the left and right subtrees
        on a subset of observations, dictated by the feature and threshold returned
        by ''self.best_node_split''.
        
        :param X: A matrix of observations, limited to the ones that satisfy the conditions
        of the above nodes, of shape (n_observations, n_features).
        :param y: A vector of output classes corresponding to the observations in ''X'', 
        of shape (n_observations,).
        :param depth: The current depth of the tree.
        """
        class_counts = np.array([np.sum(y==c) for c in range(self.n_classes)])
        majority_class = np.argmax(class_counts)
        best_feature, best_threshold = None, None
        left, right = None, None
        
        if depth < self.max_depth:
            best_feature, best_threshold = self.best_node_split(X, y)
            if best_feature is not None:
                left_mask = X[:, best_feature] < best_threshold
                left = self.build_tree(X[left_mask, :], y[left_mask], depth+1)
                right_mask = X[:, best_feature] > best_threshold
                right = self.build_tree(X[right_mask, :], y[right_mask], depth+1)
        
        return Node(best_threshold, best_feature, majority_class, left, right)
        
    
    def best_node_split(self, X, y):
        """ Finds a feature and a threshold that leads to the smallest gini impurity
        of the child nodes (weighted by the number of observations in each of the child
        nodes). Considers every midpoint between two adjacent values (in sorted order) 
        of each feature as a possible split threshold.
        
        :param X: A matrix of observations to consider for the split, of shape 
        (n_observations, n_features).
        :param y: A vector of output classes corresponding to the observations in ''X'', 
        of shape (n_observations,).
        """
        n_observations = X.shape[0]
        class_counts = np.array([np.sum(y==c) for c in range(self.n_classes)])
        
        # Parent's gini index
        best_gini = 1 - np.sum((class_counts / n_observations) ** 2)
        best_feature, best_threshold = None, None
        
        for feature in range(self.n_features):
            # Sorting ''X'' and ''y'' in ascending order of the current feature
            sorted_indices = np.argsort(X[:, feature])
            sorted_observations = X[sorted_indices, feature]
            sorted_classes = y[sorted_indices]
            
            # Starting with all observations larger than the threshold (in the right child node)
            left_class_counts = np.array([0] * self.n_classes)
            right_class_counts = class_counts.copy()
            
            # For every midpoint between two adjacent observations (after sorting)
            for midpoint_id in range(1, n_observations):
                # Observation class directly below the considered midpoint value
                prev_observation_class = sorted_classes[midpoint_id - 1]
                # Moving this observation to the left child node
                left_class_counts[prev_observation_class] += 1
                # And removing it from the right child node
                right_class_counts[prev_observation_class] -= 1
                # Nr. of observations in the left child is the enumerated midpoint index 
                n_observations_left = midpoint_id
                # Nr. of observations in the right child is the rest of the observations
                n_observations_right = n_observations - n_observations_left
                
                # Calculating gini index for both child nodes
                gini_left = 1 - np.sum((left_class_counts / n_observations_left) ** 2)
                gini_right = 1 - np.sum((right_class_counts / n_observations_right) ** 2)
                # And their weighted average
                gini = (1 / n_observations) * (n_observations_left * gini_left + n_observations_right * gini_right)
                
                # Skipping midpoints between observations with the same values of the current feature
                if sorted_observations[midpoint_id] == sorted_observations[midpoint_id - 1]:
                    continue
                    
                if gini < best_gini:
                    best_gini = gini
                    best_feature = feature
                    # The threshold is an average of the two adjacent observations
                    best_threshold = (sorted_observations[midpoint_id - 1] + sorted_observations[midpoint_id]) / 2
                    
        return best_feature, best_threshold   
    
    def print_tree(self):
        """ Prints the tree to console.
        """
        lines, _, _, _ = self._print_tree_aux(self.root)
        for line in lines:
            print(line)

    def _print_tree_aux(self, node):
        """ Recursively creates a string representation of the tree. ''self.print_tree'' helper method. 
        :param node: Node from which to recursively begin creating the string representation.
        :returns: A list of lines, the width, the height, and the horizontal coordinate of the root.
        """
        if node.right is None and node.left is None:
            line = f'c: {node.majority_class}'
            width = len(line)
            height = 1
            middle = width // 2
            return [line], width, height, middle
        else:
            left, n, p, x = self._print_tree_aux(node.left)
            right, m, q, y = self._print_tree_aux(node.right)
            s = f'f: {node.feature_index}, t: {node.threshold}'
            u = len(s)
            first_line = (x + 1) * ' ' + (n - x - 1) * '_' + s + y * '_' + (m - y) * ' '
            second_line = x * ' ' + '/' + (n - x - 1 + u + y) * ' ' + '\\' + (m - y - 1) * ' '
            if p < q:
                left += [n * ' '] * (q - p)
            elif q < p:
                right += [m * ' '] * (p - q)
            zipped_lines = zip(left, right)
            lines = [first_line, second_line] + [a + u * ' ' + b for a, b in zipped_lines]
            return lines, n + m + u, max(p, q) + 2, n + u // 2

In [4]:
iris_data = pd.read_csv(os.path.join(DATA_HOME, 'iris.data'), header=None)

In [5]:
label_encoder = LabelEncoder()

In [6]:
X = iris_data.iloc[:, :-1].to_numpy()
y = label_encoder.fit_transform(iris_data.iloc[:, -1])

In [7]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, shuffle=True, random_state=0)

In [8]:
dtc = DecisionTreeClassifier()

In [9]:
dtc.fit(X_train, y_train)

In [10]:
predictions = dtc.predict(X_test)

In [11]:
print(classification_report(predictions, y_test))

              precision    recall  f1-score   support

           0       1.00      1.00      1.00        16
           1       0.94      1.00      0.97        17
           2       1.00      0.92      0.96        12

    accuracy                           0.98        45
   macro avg       0.98      0.97      0.98        45
weighted avg       0.98      0.98      0.98        45



In [12]:
# f = feature index, t = decision threshold, c = class index
dtc.print_tree()

   _f: 2, t: 2.35___________________________________________                                            
  /                                                         \                                           
c: 0                        __________________________f: 2, t: 4.95__________________________           
                           /                                                                 \          
                    _f: 3, t: 1.65__________                                  _________f: 2, t: 5.05__  
                   /                        \                                /                        \ 
                 c: 1                _f: 1, t: 3.1__                  _f: 0, t: 6.5__               c: 2
                                    /               \                /               \                  
                                  c: 2            c: 1             c: 2            c: 1                 
