# Decision Trees



In [4]:
import numpy as np

## Algorithm

We treat features as nodes. To build a decision tree, we need to recursively/iteratively choose the best node for each level. The best node is the one that yields the most information gain after splitting.
The ideal case is a threshold on the feature that separates all the labels perfectly (100%).

Some pseudocode for choosing the best node for each level:
- for each feature in features:
  - thresholds = unique(data[feature])
  - for each threshold in thresholds:
    - information gain = split(data, feature, threshold)
- pick the feature that leads to the most information gain

**Entropy**
- entropy is a way to measure impurity. An ideal split would separate the labels so that each subtree contains only one label, which leads to an entropy of 0.

$$
H = -\sum_{i} p_i\log p_i
$$

$p_i$ is the fraction of the `ith` label in the current dataset being split, i.e., the probability of each label

- for regression problem, this could be represented as `MSE`

**Information Gain**
- information gain is the reduction of entropy when a split is made

$$
g = H(p^{node}) - (w^{left} H(p^{left}) + w^{right} H(p^{right}))
$$

where $H(p^{node})$ is the entropy at the node, and $H(p^{left})$ and $H(p^{right})$ are the entropies of the left and right subtrees after splitting.
$w^{left}$ and $w^{right}$ are the fractions of samples that fall into the left and right subtrees, relative to the total number of samples before splitting.

In [None]:
class Node:
    """A single vertex of a binary decision tree.

    Internal nodes describe a split (``feature``, ``threshold``, ``gain``)
    and point at their two children; leaf nodes carry the prediction in
    ``value``.
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None, gain=None, value=None):
        """Store the split description, the children and the prediction.

        Args:
            feature (optional): feature represented at this node. Defaults to None.
            threshold (optional): splitting threshold for that feature. Defaults to None.
            left (optional): left child. Defaults to None.
            right (optional): right child. Defaults to None.
            gain (optional): information gain obtained by the split. Defaults to None.
            value (optional): predicted value; only set on leaf nodes. Defaults to None.
        """
        # How this node splits the data.
        self.feature, self.threshold = feature, threshold
        # Subtrees reached on either side of the split.
        self.left, self.right = left, right
        # Split quality and (for leaves) the prediction.
        self.gain, self.value = gain, value

In [None]:
class DecisionTree():
    """Binary decision tree classifier grown by greedy entropy-based splits.

    Every internal node tests ``x[feature] <= threshold``; samples that pass
    go to the left child, all others to the right child. Leaves store the
    predicted label in ``Node.value``.
    """

    def __init__(self):
        """Initialize an empty decision tree
        """
        self.root = None

    def build_tree(self, X, y):
        """Build the decision tree recursively

        Args:
            X (array-like): Input features, shape (n_samples, n_features)
            y (array-like): Target labels, shape (n_samples,)

        Raises:
            ValueError: If the dataset is empty, ``X`` is not two-dimensional,
                or ``X`` and ``y`` have different lengths.
        """
        # Accept plain lists as well: the splitting code relies on numpy
        # column slicing and boolean-mask indexing.
        X = np.asarray(X)
        y = np.asarray(y)

        if y.size == 0:
            raise ValueError("Cannot build a decision tree from an empty dataset.")
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n_samples, n_features).")
        if len(X) != len(y):
            raise ValueError("X and y must contain the same number of samples.")

        self.root = self._build_tree_recursive(X, y)

    def _build_tree_recursive(self, X, y):
        """Recursively build the decision tree

        Args:
            X (numpy.ndarray): Input features of the samples at this node
            y (numpy.ndarray): Target labels of the samples at this node

        Returns:
            Node: Root node of the (sub)tree built from ``X`` and ``y``
        """
        labels, counts = np.unique(y, return_counts=True)

        # Base case: if all labels are the same, create a leaf node with the predicted value
        if len(labels) == 1:
            return Node(value=y[0])

        # Find the best feature and threshold to split the data
        best_feature, best_threshold, best_gain = self._find_best_split(X, y)

        # No split puts samples on both sides (e.g. identical feature rows
        # with conflicting labels). Splitting further would recurse on the
        # same data forever, so emit a majority-vote leaf instead. Ties go to
        # the smallest label because np.unique returns labels sorted.
        if best_feature is None:
            return Node(value=labels[np.argmax(counts)])

        # Create a new node with the best feature and threshold
        node = Node(feature=best_feature, threshold=best_threshold, gain=best_gain)

        # Split the data the same way predict() routes samples: "<=" goes
        # left, everything else goes right.
        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask

        # Recursively build the left and right subtrees; both children are
        # strictly smaller than the current node, so the recursion terminates.
        node.left = self._build_tree_recursive(X[left_mask], y[left_mask])
        node.right = self._build_tree_recursive(X[right_mask], y[right_mask])

        return node

    def _find_best_split(self, X, y):
        """Find the best feature and threshold to split the data

        Only splits that leave at least one sample on each side are
        considered.

        Args:
            X (numpy.ndarray): Input features
            y (numpy.ndarray): Target labels

        Returns:
            int: Index of the best feature, or None if no valid split exists
            float: Best threshold value, or None if no valid split exists
            float: Information gain of the best split (-1 if no valid split exists)
        """
        best_feature = None
        best_threshold = None
        best_gain = -1

        # Iterate over each feature
        for feature in range(X.shape[1]):
            column = X[:, feature]

            # Every distinct value of the feature is a candidate threshold
            for threshold in np.unique(column):
                left_mask = column <= threshold

                # Skip degenerate splits with an empty child: they carry no
                # information and would make the tree builder loop forever.
                if left_mask.all() or not left_mask.any():
                    continue

                gain = self._calculate_information_gain(y, y[left_mask], y[~left_mask])

                # Strict ">" keeps the first of several equally good splits
                if gain > best_gain:
                    best_feature = feature
                    best_threshold = threshold
                    best_gain = gain

        return best_feature, best_threshold, best_gain

    def _calculate_information_gain(self, y, y_left, y_right):
        """Calculate the information gain of a split

        Args:
            y (array-like): Target labels before splitting
            y_left (array-like): Target labels in the left subtree after splitting
            y_right (array-like): Target labels in the right subtree after splitting

        Returns:
            float: Information gain (0.0 when ``y`` is empty)
        """
        total = len(y)
        if total == 0:
            # Nothing to split; also avoids dividing by zero below.
            return 0.0

        # Entropy before splitting
        entropy_before = self._calculate_entropy(y)

        # Weighted average entropy of the two children
        weight_left = len(y_left) / total
        weight_right = len(y_right) / total
        entropy_after = (
            weight_left * self._calculate_entropy(y_left)
            + weight_right * self._calculate_entropy(y_right)
        )

        return entropy_before - entropy_after

    def _calculate_entropy(self, y):
        """Calculate the entropy (in bits) of a set of labels

        Args:
            y (array-like): Target labels; any type np.unique can sort
                (e.g. negative integers or strings), not only non-negative ints

        Returns:
            float: Entropy; exactly 0.0 for an empty or single-label set
        """
        # np.unique only reports labels that actually occur, so every
        # probability is > 0 and no epsilon is needed inside the log.
        _, counts = np.unique(np.asarray(y), return_counts=True)

        if counts.size <= 1:
            return 0.0

        probabilities = counts / counts.sum()
        return float(-np.sum(probabilities * np.log2(probabilities)))

    def predict(self, X):
        """Make predictions using the decision tree

        Args:
            X (array-like): Input features, shape (n_samples, n_features)

        Returns:
            numpy.ndarray: Predicted labels

        Raises:
            RuntimeError: If the tree has not been built yet.
        """
        if self.root is None:
            raise RuntimeError("The tree has not been built yet; call build_tree first.")

        return np.array([self._predict_recursive(x, self.root) for x in np.asarray(X)])

    def _predict_recursive(self, x, node):
        """Walk from ``node`` down to a leaf and return its prediction

        Args:
            x (array-like): Feature values of a single sample
            node (Node): Node to start from

        Returns:
            Predicted label stored in the leaf that ``x`` reaches
        """
        # A node is a leaf as soon as it carries a value. The descent is
        # written as a loop so deep trees cannot hit the recursion limit.
        while node.value is None:
            if x[node.feature] <= node.threshold:
                node = node.left
            else:
                node = node.right

        return node.value

## Improvements

How large should we grow the tree?
Tree size is a tuning parameter governing the model complexity.
- grow the tree as large as possible, say, $T_0$. Only stop the splitting process when some minimum node size (e.g., 5) is reached.
- prune $T_0$ using `cost-complexity` pruning


cost complexity criterion:

$$
C_{\alpha}(T) = \sum_{m=1}^{|T|}N_mQ_m(T) + \alpha |T|
$$

where $T$ is a subtree we want, $|T|$ is the number of terminal nodes (leaves) in $T$, and $N_m$ is the number of samples that fall into the region of terminal node `m`.
$Q_m(T)$ is the mean prediction error (node impurity) in that region.
The idea is to find, for each $\alpha$, the subtree $T_{\alpha}$ to minimize $C_{\alpha}(T)$. 
The tuning parameter $\alpha$ governs the tradeoff between tree size and its goodness of fit to the data. A large value leads to a small tree and vice versa.

For each $\alpha$, there will be a unique smallest subtree $T_{\alpha}$. 
To find the $T_{\alpha}$, we successively collapse the internal node that produces the smallest per-node increase in $\sum_{m=1}^{|T|}N_mQ_m(T)$.

In [None]:
def cost_complexity_pruning(self, X_val, y_val, alpha):
    """Perform cost complexity pruning on the decision tree

    Prunes the tree in place so that the criterion
    ``C_alpha(T) = sum_m N_m * Q_m(T) + alpha * |T|`` is minimal, where the
    error term is the number of misclassified validation samples and ``|T|``
    is the number of leaves. When keeping and collapsing a subtree cost the
    same, the subtree is collapsed, which yields the smallest minimizer.

    Args:
        X_val (array-like): Validation set features, shape (n_samples, n_features)
        y_val (array-like): Validation set labels, shape (n_samples,)
        alpha (float): Complexity parameter: the penalty per leaf, expressed
            in units of misclassified validation samples

    Returns:
        Node: Root of the pruned decision tree

    Raises:
        ValueError: If the tree has not been built yet.
    """
    if self.root is None:
        raise ValueError("Cannot prune an empty tree; build the tree first.")

    X_val = np.asarray(X_val)
    y_val = np.asarray(y_val)

    # Pass 1 annotates every node with its validation error; pass 2 collapses
    # subtrees bottom-up wherever a single leaf is at least as good.
    self._calculate_subtree_cost(self.root, X_val, y_val)
    self._prune_subtree(self.root, alpha)

    return self.root

def _calculate_subtree_cost(self, node, X_val, y_val):
    """Recursively annotate each node with its validation cost

    ``X_val``/``y_val`` must be the validation samples that reach ``node``.
    The following attributes are written on every visited node:

    - ``cost``: misclassified validation samples under the subtree
    - ``num_leaves``: number of leaves of the subtree
    - ``collapsed_value``: label the node would predict as a single leaf
    - ``collapsed_cost``: misclassified samples if it were that single leaf

    Args:
        node (Node): Current node
        X_val (array-like): Validation features of the samples reaching ``node``
        y_val (array-like): Validation labels of the samples reaching ``node``
    """
    X_val = np.asarray(X_val)
    y_val = np.asarray(y_val)
    has_samples = y_val.size > 0

    # Base case: a leaf's cost is its own misclassification count
    if node.left is None and node.right is None:
        node.cost = self._calculate_cost(node, X_val, y_val)
        node.num_leaves = 1
        node.collapsed_value = node.value
        node.collapsed_cost = node.cost
        return

    # Route the samples exactly like prediction does: "<=" goes left,
    # everything else goes right.
    if has_samples:
        goes_left = X_val[:, node.feature] <= node.threshold
    else:
        goes_left = np.zeros(0, dtype=bool)
    goes_right = ~goes_left

    self._calculate_subtree_cost(node.left, X_val[goes_left], y_val[goes_left])
    self._calculate_subtree_cost(node.right, X_val[goes_right], y_val[goes_right])

    # Subtree error and size are additive over the two children
    node.cost = node.left.cost + node.right.cost
    node.num_leaves = node.left.num_leaves + node.right.num_leaves

    # Internal nodes store no label of their own, so the label used when the
    # node is collapsed is the majority validation label reaching it. If no
    # validation sample reaches the node, inherit the left child's label.
    if has_samples:
        labels, counts = np.unique(y_val, return_counts=True)
        node.collapsed_value = labels[np.argmax(counts)]
        node.collapsed_cost = float(np.sum(y_val != node.collapsed_value))
    else:
        node.collapsed_value = node.left.collapsed_value
        node.collapsed_cost = 0.0

def _calculate_cost(self, node, X_val, y_val):
    """Calculate the validation error of a leaf node

    Args:
        node (Node): Leaf node
        X_val (array-like): Validation features of the samples reaching the
            leaf (unused; kept for a uniform signature)
        y_val (array-like): Validation labels of the samples reaching the leaf

    Returns:
        float: Number of those samples whose label differs from ``node.value``
    """
    y_val = np.asarray(y_val)

    if y_val.size == 0:
        return 0.0

    return float(np.sum(y_val != node.value))

def _prune_subtree(self, node, alpha):
    """Recursively prune the decision tree based on the complexity parameter

    Requires the annotations written by ``_calculate_subtree_cost``.

    Args:
        node (Node): Current node
        alpha (float): Complexity parameter (penalty per leaf)
    """
    # Base case: if the current node is a leaf node, return
    if node.left is None and node.right is None:
        return

    # Prune the children first so this node is judged against the best
    # version of its subtree.
    self._prune_subtree(node.left, alpha)
    self._prune_subtree(node.right, alpha)

    # The children may just have been collapsed: refresh the totals
    node.cost = node.left.cost + node.right.cost
    node.num_leaves = node.left.num_leaves + node.right.num_leaves

    keep_criterion = node.cost + alpha * node.num_leaves
    collapse_criterion = node.collapsed_cost + alpha  # a single leaf

    if collapse_criterion <= keep_criterion:
        node.left = None
        node.right = None
        # Give the new leaf a label so prediction stops here
        node.value = node.collapsed_value
        node.cost = node.collapsed_cost
        node.num_leaves = 1
    
# Attach the module-level pruning functions to DecisionTree as methods, each
# under the name it was defined with.
for _pruning_method in (
    cost_complexity_pruning,
    _calculate_subtree_cost,
    _calculate_cost,
    _prune_subtree,
):
    setattr(DecisionTree, _pruning_method.__name__, _pruning_method)
del _pruning_method

