# First Draft of Decision Tree
This Notebook experiments with my first implementation of a decision tree with entropy criterion. 
It uses best practices such as radix sort and O(1) entropy calculation for quantitative features.
Categorical features are one-hot-encoded and missing values are replaced with the mode of the feature. 

In [41]:
from numpy.ma.core import indices
from sympy import pprint
%load_ext autoreload
%autoreload 2

import numpy as np
from sklearn.model_selection import cross_val_score, GridSearchCV
import time 
from sklearn.tree import DecisionTreeClassifier

from collections import Counter
from sklearn.base import BaseEstimator, ClassifierMixin
from typing import Optional, List


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## First Implementation of a classification Decision Tree

In [113]:
class DecisionTree(BaseEstimator, ClassifierMixin):
    def __init__(self, max_depth: int = 3, feature_labels: Optional[List[str]]=None, max_features: Optional[int]=None, min_samples_leaf: int = 2,  sortfnc = None):
        
        self.sortfnc = sortfnc
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf
        self.max_features = max_features
        self.feature_labels = feature_labels
        
        self.left: Optional['DecisionTree'] = None
        self.right: Optional['DecisionTree'] = None
        self.split_idx: Optional = None
        self.thresh: Optional =  None
        self.prediction: Optional[int] = None

    @staticmethod
    def entropy(y: np.ndarray[int]) -> float:
        length = len(y)
        if length == 0:
            return 0

        counts = np.bincount(y)
        probabilities = counts[counts > 0] / length
        return -np.sum(probabilities * np.log2(probabilities))

    @staticmethod
    def entropy_o1(counter: Counter[int], labels: np.ndarray, total_count: int) -> float:
        entropy = sum(
            count * np.log2(count / total_count) for count in (counter[label] for label in labels) if count > 0)
        return entropy

    @staticmethod
    def split(X: np.ndarray, y: np.ndarray[int], idx: int, thresh: float) -> tuple:
        left_mask = X[:, idx] < thresh
        right_mask = ~left_mask
        return X[left_mask], y[left_mask], X[right_mask], y[right_mask]

    def fit(self, X: np.ndarray, y: np.ndarray[int]) -> 'DecisionTree':
        nb_samples, nb_features = X.shape
        labels = np.unique(y)

        if self.max_depth == 0 or self.entropy(y) == 0 or nb_samples < self.min_samples_leaf:
            self.prediction = Counter(y).most_common(1)[0][0]
            return self

        best_split = {'gain': 0}
        features_indices = np.random.choice(nb_features, size=min(self.max_features, nb_features),
                                            replace=False) if self.max_features else range(nb_features)

        for idx in features_indices:
            X_j = np.copy(X[:, idx])
            if self.sortfnc: 
                sorted_indices = self.sortfnc(X_j)
                X_j_sorted = X_j
                y_sorted = y[sorted_indices]
            else:
                sorted_indices = np.argsort(X_j)
                X_j_sorted, y_sorted = X_j[sorted_indices], y[sorted_indices]
            
            unique_values, indices = np.unique(X_j, return_index=True)
            y_unique = y[indices]
            if len(unique_values) < 2:
                continue

            y_entropy = self.entropy(y_sorted)
            total_sum = len(y_sorted)

            left_classes, right_classes = Counter(), Counter(y_sorted)
            counts_left, counts_right = 0, total_sum

            for i in range(len(unique_values) - 1):
                threshold = (unique_values[i] + unique_values[i + 1]) / 2
                if i == 0:
                    left_classes = Counter(y_sorted[X_j_sorted < threshold])
                    right_classes = Counter(y_sorted[X_j_sorted >= threshold])
                    counts_left, counts_right = sum(left_classes.values()), sum(right_classes.values())
                else:
                    left_classes[y_unique[i]] += 1
                    counts_left += 1
                    right_classes[y_unique[i]] = right_classes[y_unique[i]] - 1
                    counts_right -= 1

                weighted_entropy = (-1 / total_sum) * (
                            self.entropy_o1(left_classes, labels, counts_left) + self.entropy_o1(right_classes, labels,
                                                                                                 counts_right))
                information_gain = y_entropy - weighted_entropy

                if information_gain > best_split['gain']:
                    best_split = {'gain': information_gain, 'threshold': threshold, 'feature': idx}

        if best_split['gain'] == 0:
            self.prediction = Counter(y).most_common(1)[0][0]
        else:
            self.split_idx = best_split['feature']
            self.thresh = best_split['threshold']
            X_left, y_left, X_right, y_right = self.split(X, y, self.split_idx, self.thresh)

            self.left = DecisionTree(self.max_depth - 1, self.feature_labels, self.max_features, self.min_samples_leaf).fit(X_left, y_left)
            self.right = DecisionTree(self.max_depth - 1, self.feature_labels, self.max_features, self.min_samples_leaf).fit(X_right, y_right)

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        if self.prediction is not None:
            return np.full(X.shape[0], self.prediction)

        predictions = np.empty(X.shape[0], dtype=int)
        feature_values = X[:, self.split_idx]
        left_mask = feature_values < self.thresh
        right_mask = ~left_mask

        predictions[left_mask] = self.left.predict(X[left_mask])
        predictions[right_mask] = self.right.predict(X[right_mask])

        return predictions


## Cross Validation and comparison of Validation scores and running time between My implementation and Sklearn's

In [117]:
X = np.genfromtxt('../data/processed/titanic_training_cleaned.csv', delimiter=",", dtype=float)[1:, :]
y = np.genfromtxt('../data/processed/titanic_training_labels.csv', delimiter=",", dtype=int)[1:]


print("sklearn's decision tree")
clf = DecisionTreeClassifier(random_state=0, max_depth=3, min_samples_leaf= 7, criterion='entropy')
clf.fit(X, y)
start_time = time.time()  # Record the start time
print("Cross validation", cross_val_score(clf, X, y))
end_time = time.time()  # Record the end time
execution_time = end_time - start_time
print(f"Execution time sklearn: {execution_time} seconds")


print("\n\nMy decision tree")
clf = DecisionTree(max_depth=3, min_samples_leaf=1)
start_time = time.time()  # Record the start time
clf.fit(X, y)
print("Cross validation", cross_val_score(clf, X, y))
end_time = time.time()  # Record the end time
execution_time = end_time - start_time
print(f"Execution time my implementation: {execution_time} seconds")


sklearn's decision tree
Cross validation [0.82673267 0.83663366 0.79207921 0.78712871 0.7761194 ]
Execution time sklearn: 0.011657238006591797 seconds


My decision tree
Cross validation [0.83663366 0.82673267 0.79207921 0.81188119 0.80597015]
Execution time my implementation: 0.3010091781616211 seconds


Cross validation score is better than Sklearn's but running time is not good, my decision tree is still relatively slow, a potential improvement is to implement radix search for sorting quantitative features. But the O(1) information gain calculation dropped training time by half. An improvement for the score can be achieved with grid search to tune the hyperparameters.

## Grid search for Hyperparameters

In [50]:
param_grid = {
    'max_depth': [3, 5, 7, 8],
    'min_samples_leaf': [1, 2, 7, 8, 9, 10],
}


print("sklearn's decision tree")
model = DecisionTreeClassifier()
grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=5)
grid_search.fit(X, y)
print("Sklearn's Best hyperparameters:",grid_search.best_params_)



print('\n\n My decision tree')
model = DecisionTree()
grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=5)
grid_search.fit(X, y)
print("My tree Best hyperparameters:",grid_search.best_params_)


sklearn's decision tree
Sklearn's Best hyperparameters: {'max_depth': 3, 'min_samples_leaf': 7}


 My decision tree
My tree Best hyperparameters: {'max_depth': 3, 'min_samples_leaf': 1}


# Implementation of radix sort.

In [114]:
def counting_sort(arr, exp, indices):
    n = len(arr)
    count = np.zeros(10, dtype=int)
    
    # Calculate count of occurrences
    index = (arr // exp) % 10
    np.add.at(count, index.astype(int), 1)
    
    # Calculate cumulative count
    np.cumsum(count, out=count)
    
    # Create output arrays
    output = np.zeros(n, dtype=arr.dtype)
    output_indices = np.zeros(n, dtype=int)
    
    # Build the output array using the cumulative count
    for i in range(n-1, -1, -1):
        idx = int(index[i])
        count[idx] -= 1
        output[count[idx]] = arr[i]
        output_indices[count[idx]] = indices[i]
    
    # Copy the output array to arr and indices
    np.copyto(arr, output)
    np.copyto(indices, output_indices)

def radix_sort(arr):
    n = len(arr)
    indices = np.arange(n)
    max1 = np.max(arr)
    exp = 1

    while max1 // exp > 0:
        counting_sort(arr, exp, indices)
        exp *= 10
    
    return indices


In [116]:

print("\n\nMy decision tree")
clf = DecisionTree(max_depth=3, min_samples_leaf=1, sortfnc=radix_sort)
start_time = time.time()  # Record the start time
clf.fit(X, y)
print("Cross validation", cross_val_score(clf, X, y))
end_time = time.time()  # Record the end time
execution_time = end_time - start_time
print(f"Execution time my implementation: {execution_time} seconds")



My decision tree
Cross validation [0.83663366 0.82673267 0.79207921 0.81188119 0.80597015]
Execution time my implementation: 0.3266599178314209 seconds


There was no real improvement.Performance stayed practically the same.