In [17]:
import numpy as np
import math
from itertools import combinations

In [18]:
def gini(counts):
    """ Calculates Gini Impurity 
    
        Args:
            counts: The number of samples in each class.
    """
    # counts: array of shape (K,)
    total = counts.sum()
    if total == 0:
        return 0.0
    p = counts / total
    return 1.0 - np.sum(p * p)

def entropy(counts):
    """ Calculate Entropy
    
        Args:
            counts: The number of samples in each class.
    """
    p = counts/counts.sum()
    p = p[p > 0]   # avoid log(0)
    return -np.sum(p * np.log2(p))


In [19]:
def best_split_one_feature(x, y):
    x = np.asarray(x)
    y = np.asarray(y)

    # Sort by feature to scan thresholds once
    order = np.argsort(x, kind="mergesort")
    x = x[order]
    classes, y_enc = np.unique(y[order], return_inverse=True)   # Encode labels to 0...K-1
    K = len(classes)
    N = len(x)
    if N <= 1:
        return None, np.inf  # no split

    parent_counts = np.bincount(y_enc, minlength=K)
    parent_imp = gini(parent_counts)

    # Prefix sums of class counts
    prefix = np.zeros((N+1, K), dtype=int)
    for i in range(1, N+1):
        prefix[i] = prefix[i-1]
        prefix[i, y_enc[i-1]] += 1
    
    best_t = None
    best_after_imp = np.inf  # we minimize weighted impurity after

    for i in range(1, N):
        # Skip if they are the same
        if x[i-1]==x[i]:
            continue

        t = (x[i-1] + x[i]) / 2   # The threshold is the middle point
        
        n_left = i
        n_right = N - i
        left_counts = prefix[i]
        right_counts = parent_counts - left_counts

        if n_left==0 or n_right==0:
            continue

        left_imp = gini(left_counts)
        right_imp = gini(right_counts)
        weighted_after = (n_left/N) * left_imp + (n_right/N) * right_imp

        if weighted_after < best_after_imp:
            best_after_imp = weighted_after
            best_t = t
    
    return best_t, best_after_imp

x=[1, 3, 5, 6, 8]
y=[0, 1, 0, 0, 2]

best_split_one_feature(x, y)

(np.float64(7.0), np.float64(0.30000000000000004))

In [20]:
def best_split(X, y):
    X = np.asarray(X)
    y = np.asarray(y)
    N, D = X.shape  # N samples, D features
    
    best_feat = None
    best_t = None
    best_imp = np.inf
    for i in range(D):
        x_i = X[:,i]  # get feature i (column i of the matrix)
        t, imp = best_split_one_feature(x_i, y)

        if imp < best_imp:
            best_imp = imp
            best_feat = i
            best_t = t
    return best_feat, best_t, best_imp


X = [
    [1, 6],
    [2, 4],
    [3, 3],
    [1, 0]
]

y = [0, 0, 1, 1]

print(best_split(X, y))

(1, np.float64(3.5), np.float64(0.0))


In [49]:
from dataclasses import dataclass
from typing import Optional, Tuple, Union, List
@dataclass
class Node:
    feature:Optional[int] = None
    threshold: Optional[float] = None
    left: Optional["Node"] = None
    right: Optional["Node"] = None

    # for the leaves
    proba: Optional[np.ndarray] = None
    label: Optional[int] = None

    depth: int = 0
    n_samples: int = 0
    impurity: float = 0.0

    is_categorical = False

    def is_leaf(self) -> bool:
        return self.proba is not None

In [59]:
class DecisionTreeClassifier:
    def __init__(self, *, criterion: str = "gini", max_depth: Optional[int] = None,
                min_samples_split: int = 2, min_samples_leaf: int = 1, 
                min_impurity_decrease: float = 0.0
    ):
        """CART Decision Tree classifier
        Args:
            criterion: 'gini' or 'entropy'.
            max_depth: maximum depth of the tree (root=0). None => unlimited.
            min_samples_split: minimum samples to consider splitting a node.
            min_samples_leaf: minimum samples required at any child node.
            min_impurity_decrease: required impurity decrease to accept a split.
        """
        if criterion not in {"gini", "entropy"}:
            raise ValueError("criterion must be 'gini' or 'entropy'")
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.min_impurity_decrease = min_impurity_decrease

        # Set during fit
        self.n_classes = None
        self.n_features = None
        self.tree = None
        self.classes = None

    def fit(self, X: np.array, y: np.array) -> "DecisionTreeClassifier":
        X = np.asarray(X)
        y = np.asarray(y)

        if X.ndim != 2: raise ValueError("X must be 2D")
        if y.ndim != 1: raise ValueError("y must be 1D")
        if X.shape[0] != y.shape[0]: raise ValueError("rows mismatch")

        classes, y_enc = np.unique(y, return_inverse=True)
        self.classes = classes
        self.n_classes = len(classes)
        self.n_features = X.shape[1]

        idx = np.arange(X.shape[0])
        self.tree = self._build(X, y_enc, idx, depth=0)
        return self
    
    def predict(self, X: np.array) -> np.array:
        proba = self.predict_proba(X)
        pred_idx = np.argmax(proba, axis=1)
        return self.classes[pred_idx]
    
    def predict_proba(self, X: np.array) -> np.array:
        if self.tree is None: raise RuntimeError("Model is not fitted")
        X = np.asarray(X)
        out = np.zeros(shape=(X.shape[0], self.n_classes), dtype=float)
        
        for i in range(X.shape[0]):
            head = self.tree

            while not head.is_leaf():
                feature = head.feature
                # For categorical nodes use in operator
                if head.is_categorical:
                    if X[i,feature] in head.threshold:
                        head = head.left
                    else:
                        head = head.right
                # For numerical nodes use < operator
                else:
                    if X[i,feature] < head.threshold:
                        head = head.left
                    else:
                        head = head.right
            out[i] = head.proba
        
        return out
    
    def _gini(self, counts: np.array) -> float:
        """ Calculates Gini Impurity 
        
            Args:
                counts: The number of samples in each class.
        """
        # counts: array of shape (K,)
        total = counts.sum()
        if total == 0:
            return 0.0
        p = counts / total
        return 1.0 - np.sum(p * p)

    def _entropy(self, counts: np.array) -> float:
        """ Calculate Entropy
        
            Args:
                counts: The number of samples in each class.
        """
        total = counts.sum()
        if total == 0.0: return 0.0 # check it's not empty
        p = counts/total
        p = p[p > 0]   # avoid log(0)
        return -np.sum(p * np.log2(p))

    def _impurity(self, counts: np.array):
        if self.criterion=="gini":
            return self._gini(counts)
        elif self.criterion=="entropy":
            return self._entropy(counts)
        else:
            return None
    
    def _best_split_one_feature_numerical(self, x, y):
        x = np.asarray(x)
        y = np.asarray(y)

        # Sort by feature to scan thresholds once
        order = np.argsort(x, kind="mergesort")
        x = x[order]
        classes, y_enc = np.unique(y[order], return_inverse=True)   # Encode labels to 0...K-1
        K = len(classes)
        N = len(x)
        if N <= 1:
            return None, np.inf  # no split

        parent_counts = np.bincount(y_enc, minlength=K)
        parent_imp = self._impurity(parent_counts)

        # Prefix sums of class counts
        prefix = np.zeros((N+1, K), dtype=int)
        for i in range(1, N+1):
            prefix[i] = prefix[i-1]
            prefix[i, y_enc[i-1]] += 1
        
        best_t = None
        best_after_imp = np.inf  # we minimize weighted impurity after

        for i in range(1, N):
            # Skip if they are the same
            if x[i-1]==x[i]:
                continue

            t = (x[i-1] + x[i]) / 2   # The threshold is the middle point
            
            n_left = i
            n_right = N - i
            left_counts = prefix[i]
            right_counts = parent_counts - left_counts

            if n_left==0 or n_right==0:
                continue

            left_imp = self._impurity(left_counts)
            right_imp = self._impurity(right_counts)
            weighted_after = (n_left/N) * left_imp + (n_right/N) * right_imp

            if weighted_after < best_after_imp:
                best_after_imp = weighted_after
                best_t = t
        
        return best_t, best_after_imp
    
    def _best_split_one_feature_categorical(self, x, y):
        cats, x_enc = np.unique(x, return_inverse=True)
        
        K = len(cats)
        if K <= 1:
            return None, np.inf # No split possible
        
        parent_counts = np.bincount(y, minlength=self.n_classes)
        parent_imp = self._impurity(parent_counts)

        best_subset = None
        best_after_imp = np.inf

        for r in range(1, K):
            for subset in combinations(range(K), r):
                mask_left = np.isin(x_enc, subset)
                left_counts = np.bincount(y[mask_left], minlength=self.n_classes)
                right_counts = np.bincount(y[~mask_left], minlength=self.n_classes)

                n_left = mask_left.sum()
                n_right = (~mask_left).sum()

                if n_left < self.min_samples_leaf or n_right < self.min_samples_leaf:
                    continue
                
                left_imp = self._impurity(left_counts)
                right_imp = self._impurity(right_counts)
                N = n_left + n_right
                weighted_after = (n_left/N) * left_imp + (n_right/N) * right_imp

                if weighted_after < best_after_imp:
                    best_after_imp = weighted_after
                    best_subset = subset
        
        if best_subset is None:
            return None, np.inf

        return cats[list(best_subset)], best_after_imp
    
    def _best_split_one_feature(self, x, y):
        if np.issubdtype(x.dtype, np.number):   # If it's a numerical feature we use the numerical split
            return self._best_split_one_feature_numerical(x, y)
        else:                                   # If it's a categorical feature we use the categorical split
            return self._best_split_one_feature_categorical(x, y)
    
    def _best_split(self, X, y):
        X = np.asarray(X)
        y = np.asarray(y)
        N, D = X.shape  # N samples, D features
        
        best_feat = None
        best_t = None
        best_imp = np.inf
        for i in range(D):
            x_i = X[:,i]  # get feature i (column i of the matrix)
            t, imp = self._best_split_one_feature(x_i, y)
            if t is None:
                continue

            if imp < best_imp:
                best_imp = imp
                best_feat = i
                best_t = t
        return best_feat, best_t, best_imp
    
    def _build(self, X: np.array, y: np.array, idx: np.array, depth: int=0) -> Node:
        node = Node(depth=depth, n_samples=idx.size)
        
        counts = np.bincount(y[idx], minlength=self.n_classes)
        proba = counts / counts.sum()
        imp = self._impurity(counts)
        node.impurity = imp

        # Stopping criteria
        if (
            (self.max_depth is not None and depth >= self.max_depth)
            or (idx.size < self.min_samples_split)
            or (counts.max() == counts.sum())   # pure node
        ):
            node.proba = proba
            node.label = int(np.argmax(proba))
            return node
        
        # Find best split
        best_feat, best_t, best_imp = self._best_split(X[idx], y[idx])
        gain = imp - best_imp

        if best_feat is None or gain < self.min_impurity_decrease:   # No valid split on any feature or gain smaller than min decrease
            node.proba = proba
            node.label = int(np.argmax(proba))
            return node
        
        # Partition indices
        if np.issubdtype(X[:, best_feat].dtype, np.number):
            mask_left = X[idx, best_feat] <= best_t
        else:
            mask_left = np.isin(X[idx, best_feat], best_t)  # if the feature was categorical the threshold will be a subset of categories returned by the best_split method
            node.is_categorical = True                      # Set the is_categorical attribute of the node to True
        
        left_idx = idx[mask_left]
        right_idx = idx[~mask_left]
        
        # Check min_samples_leaf
        if (left_idx.size < self.min_samples_leaf) or (right_idx.size < self.min_samples_leaf):
            node.proba = proba
            node.label = int(np.argmax(proba))
            return node
        
        left_node = self._build(X, y, left_idx, depth+1)
        right_node = self._build(X, y, right_idx, depth+1)
        
        node.feature = int(best_feat)
        # node.threshold = float(best_t)
        node.threshold = best_t
        node.left = left_node
        node.right = right_node
        return node
    
    def print_tree(self):
        if self.tree is None: raise RuntimeError("Model is not fitted")
        self._print(self.tree)

    def _print(self, node, indent=""):
        if node.is_leaf():
            print(f"{indent}Leaf(depth={node.depth}, n={node.n_samples}, proba={np.round(node.proba, 3)})")
        elif node.is_categorical:
            print(f"{indent}[X{node.feature} in {node.threshold}] imp={node.impurity:.3f}")
        else:
            print(f"{indent}[X{node.feature} < {node.threshold:.3f}] imp={node.impurity:.3f}")
            self._print(node.left, indent+"  ")
            self._print(node.right, indent+"  ")



## Sanity Checks

In [40]:
X = np.array([[0.],[1.],[2.]])
y = np.array([1,1,1])
clf = DecisionTreeClassifier().fit(X,y)
assert clf.tree.is_leaf() and clf.predict(X).tolist() == [1,1,1]


In [41]:
X = np.array([[0,0],[0,1],[1,0],[1,1]])
y = np.array([0,1,1,0])
acc1 = (DecisionTreeClassifier(max_depth=1).fit(X,y).predict(X) == y).mean()
acc3 = (DecisionTreeClassifier(max_depth=3).fit(X,y).predict(X) == y).mean()
assert acc3 > acc1 + 0.2

t, imp:   (np.float64(0.5), np.float64(0.5))
t, imp:   (np.float64(0.5), np.float64(0.5))
t, imp:   (np.float64(0.5), np.float64(0.5))
t, imp:   (np.float64(0.5), np.float64(0.5))
t, imp:   (None, inf)
t, imp:   (np.float64(0.5), np.float64(0.0))
t, imp:   (None, inf)
t, imp:   (np.float64(0.5), np.float64(0.0))


In [42]:
X = np.array([[0.0],[0.1],[0.2],[10.0]])
y = np.array([0,0,0,1])
clf = DecisionTreeClassifier(min_samples_leaf=2).fit(X,y)
assert clf.tree.is_leaf()  # split that isolates the outlier is rejected

t, imp:   (np.float64(5.1), np.float64(0.0))


In [43]:
# Trying the print method
X = np.array([[0,0],[0,1],[1,0],[1,1]])
y = np.array([0,1,1,0])
clf = DecisionTreeClassifier(max_depth=3).fit(X, y)
clf.print_tree()

t, imp:   (np.float64(0.5), np.float64(0.5))
t, imp:   (np.float64(0.5), np.float64(0.5))
t, imp:   (None, inf)
t, imp:   (np.float64(0.5), np.float64(0.0))
t, imp:   (None, inf)
t, imp:   (np.float64(0.5), np.float64(0.0))
[X0 < 0.500] imp=0.500
  [X1 < 0.500] imp=0.500
    Leaf(depth=2, n=1, proba=[1. 0.])
    Leaf(depth=2, n=1, proba=[0. 1.])
  [X1 < 0.500] imp=0.500
    Leaf(depth=2, n=1, proba=[0. 1.])
    Leaf(depth=2, n=1, proba=[1. 0.])


In [44]:
X = np.array([["red"], ["red"], ["red"]])
y = np.array([1, 1, 1])

clf = DecisionTreeClassifier().fit(X, y)
print("Pred:", clf.predict(X))
assert clf.tree.is_leaf()
assert clf.predict(X).tolist() == [1, 1, 1]


Pred: [1 1 1]


In [61]:
X = np.array([["red"], ["red"], ["blue"], ["blue"]])
y = np.array([0, 0, 1, 1])

clf = DecisionTreeClassifier().fit(X, y)
print("Pred:", clf.predict(X))
print("Feature:", clf.tree.feature, "Threshold:", clf.tree.threshold)

assert not clf.tree.is_leaf()
assert set(clf.tree.threshold) == {"red"} or set(clf.tree.threshold) == {"blue"}  # best subset
assert clf.predict(X).tolist() == [0, 0, 1, 1]


Pred: [0 0 1 1]
Feature: 0 Threshold: ['blue']
