<a href="https://colab.research.google.com/github/adamd1985/Lectures_On_MLAI/blob/main/4_6_DecisionTrees_Lecture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Decision Trees

In this notebook, we will explore how decision trees work, their fundamental concepts, and their application to real-world datasets. We will begin by understanding the structure of a decision tree, how it splits data based on features, and the mathematical criteria used to determine the best splits, such as Gini impurity and entropy.

In [None]:
from collections import Counter
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Classification Tree

Decision trees are most often used for classification, that is, assigning each data point to a class.

## Impurity Metrics

Decision trees rely on impurity measures to determine the best feature splits.

The **Gini Impurity** measures how often a randomly chosen sample from the node would be incorrectly classified if it were labeled at random according to the class distribution in that node.

$$
G = 1 - \sum_{k=1}^{K} p_{i,k}^2
$$

where:
- $ p_{i,k} $ represents the proportion of data points belonging to class $ k $ at node $ i $.
- $ K $ is the total number of classes in the dataset.

A Gini impurity of **0** means that the node is **pure**: all instances belong to the same class. Higher values indicate greater impurity, up to a maximum of $ 1 - 1/K $ when all $ K $ classes are equally represented.
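
The formula can also be read as the expected error rate of a labeler that assigns each sample a label drawn at random from the node's class proportions. The sketch below checks that both readings give the same number; the function names and the three-class proportions are illustrative and not part of the lesson's code.

```python
import numpy as np

def gini_from_formula(p):
    """Gini impurity 1 - sum_k p_k^2 for class proportions p."""
    p = np.asarray(p, dtype=float)
    return 1.0 - np.sum(p ** 2)

def expected_random_label_error(p):
    """Probability that a sample of class k (drawn with prob p_k) receives
    a different label when labels are drawn from the same proportions."""
    p = np.asarray(p, dtype=float)
    return np.sum(p * (1.0 - p))

proportions = [0.5, 0.3, 0.2]  # hypothetical three-class node
print(gini_from_formula(proportions))
print(expected_random_label_error(proportions))
```

Both prints agree because $ \sum_k p_k (1 - p_k) = 1 - \sum_k p_k^2 $ whenever the proportions sum to 1.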


Entropy, derived from **information theory**, measures the **uncertainty** of the dataset and is defined as:

$$
H = -\sum_{k=1}^{K} p_{i,k} \log_2(p_{i,k})
$$

where:
- $ p_{i,k} $ is the probability of class $ k $ at node $ i $.
- $ K $ is the total number of classes.

Entropy is **0** when all samples belong to a single class, meaning the dataset is completely **pure**. The maximum entropy value occurs when all classes are equally probable, indicating **maximum disorder**.
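
As a small numerical illustration of these two extremes, the sketch below evaluates the entropy of uniform distributions over $ K $ classes, where it should equal $ \log_2 K $. The helper `entropy_bits` is a hypothetical name used only here, and it works on class probabilities rather than raw labels.

```python
import numpy as np

def entropy_bits(p):
    """Shannon entropy in bits; zero-probability classes contribute 0."""
    p = np.asarray(p, dtype=float)
    p = p[p > 0]  # drop zeros so log2 is defined
    return float(-np.sum(p * np.log2(p)))

# Uniform distributions reach the maximum entropy log2(K).
for k in (2, 4, 8):
    uniform = np.full(k, 1.0 / k)
    print(k, entropy_bits(uniform), np.log2(k))

# A skewed node sits strictly between 0 and the maximum.
print(entropy_bits([0.75, 0.25]))
```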


In [None]:
def gini_impurity(y):
    _, counts = np.unique(y, return_counts=True)
    probabilities = counts / len(y)
    return 1 - np.sum(probabilities ** 2)

def entropy(y):
    _, counts = np.unique(y, return_counts=True)
    probabilities = counts / len(y)
    return -np.sum(probabilities * np.log2(probabilities + 1e-10))  # small epsilon guards against log2(0)

y1 = np.array([0, 0, 1, 1])  # 50% class 0, 50% class 1
y2 = np.array([0, 0, 0, 1])  # 75% class 0, 25% class 1

print("Gini Impurity and Entropy for y1:")
print("Gini:", gini_impurity(y1))
print("Entropy:", entropy(y1))

print("\nGini Impurity and Entropy for y2:")
print("Gini:", gini_impurity(y2))
print("Entropy:", entropy(y2))

## Information Gain

Decision trees use **information gain** to determine which feature provides the most **useful split** at each step. Information gain is based on the reduction in **impurity** (measured by either **Gini Impurity** or **Entropy**) after splitting a dataset.


$$
IG = H_{\text{parent}} - \sum_{j} \frac{N_j}{N_{\text{total}}} H_j
$$

where:
- $ H_{\text{parent}} $ is the impurity (Entropy or Gini) of the **parent node**.
- $ H_j $ is the impurity of the $ j $-th **child node**.
- $ N_j $ is the number of samples in the $ j $-th child node.
- $ N_{\text{total}} $ is the total number of samples in the parent node.
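
To make the formula concrete, here is a minimal sketch that evaluates it with entropy for two splits of the same four-sample parent: one that separates the classes perfectly and one that separates nothing. The names `entropy_of` and `weighted_gain` are illustrative assumptions, and the helper accepts any number of children to mirror the sum over $ j $.

```python
import numpy as np

def entropy_of(labels):
    """Entropy in bits of an array of class labels."""
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return float(-np.sum(p * np.log2(p)))  # counts are > 0, so log2 is safe

def weighted_gain(parent, children):
    """Parent impurity minus the size-weighted impurity of the children."""
    n = len(parent)
    weighted = sum(len(c) / n * entropy_of(c) for c in children)
    return entropy_of(parent) - weighted

parent = np.array([0, 0, 1, 1])
perfect_split = [parent[:2], parent[2:]]           # [0, 0] | [1, 1]
useless_split = [parent[[0, 2]], parent[[1, 3]]]   # [0, 1] | [0, 1]

gain_perfect = weighted_gain(parent, perfect_split)
gain_useless = weighted_gain(parent, useless_split)
print(gain_perfect, gain_useless)
```

The perfect split recovers the full parent entropy of 1 bit, while the useless split leaves both children as mixed as the parent and gains nothing.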

In [None]:
def information_gain(y, left_y, right_y, cost_fn):
    weight_left = len(left_y) / len(y)
    weight_right = len(right_y) / len(y)

    return cost_fn(y) - (weight_left * cost_fn(left_y) + weight_right * cost_fn(right_y))

Let's test it on the lesson's toy salary dataset.

In [None]:
salary_df = pd.DataFrame({
    'ID': [1, 2, 3, 4, 5],
    'Age': [25, 32.5, 40, 37.5, 35],
    'Salary': [50000, 60000, 80000, 90000, 70000],
    'Gender': ['Male', 'Female', 'Male', 'Female', 'Male']
})
y = pd.Categorical(salary_df['Gender']).codes
print(y)

salary_df

## Recursive Partitioning and Information Gain

Decision trees use recursive partitioning, a method where the dataset is iteratively split into binary subsets until a stopping condition is met. The process follows a structured sequence to ensure that each decision maximizes the separation of different target classes or minimizes variance in regression.

The classification and regression trees (CART) algorithm is a widely used implementation of recursive partitioning:

1. Start at the root node, which contains the entire dataset.
2. Identify the best feature to split on by evaluating all possible splits using a cost function such as Gini impurity for classification or variance reduction for regression.
3. Perform binary splitting, dividing the dataset into two subsets at each step.
4. Continue splitting nodes until a predefined stopping criterion is met, such as reaching a maximum depth or achieving pure leaf nodes.
5. Apply pruning techniques to refine the tree structure and prevent overfitting by removing unnecessary branches.
6. Finalize the model, ensuring it generalizes well to unseen data.
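
Steps 2 and 3 can be sketched for a single numeric feature as an exhaustive threshold search. This is a simplified illustration under stated assumptions: candidate thresholds are taken as midpoints between consecutive distinct values (one common convention), and the names `gini`, `best_threshold`, `demo_feature`, and `demo_labels` are hypothetical.

```python
import numpy as np

def gini(labels):
    """Gini impurity of an array of class labels."""
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return 1.0 - np.sum(p ** 2)

def best_threshold(x, y):
    """Return (threshold, gain) of the best binary split x <= t.

    Returns (None, 0.0) when no split reduces the impurity."""
    best_t, best_gain = None, 0.0
    values = np.unique(x)  # sorted distinct values
    # Midpoints guarantee that both children are non-empty.
    for t in (values[:-1] + values[1:]) / 2:
        left, right = y[x <= t], y[x > t]
        children = (len(left) * gini(left) + len(right) * gini(right)) / len(y)
        gain = gini(y) - children
        if gain > best_gain:
            best_t, best_gain = t, gain
    return best_t, best_gain

demo_feature = np.array([1.0, 2.0, 3.0, 10.0, 11.0, 12.0])
demo_labels = np.array([0, 0, 0, 1, 1, 1])
print(best_threshold(demo_feature, demo_labels))
```

A full tree repeats this search over every feature, keeps the winner, and then recurses into the two resulting subsets.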



In [None]:
gini_root = gini_impurity(y)

print("Root: ", salary_df['ID'].values)
print("Gini before split:", gini_root)

left_mask = salary_df['Age'] <= 35
right_mask = ~left_mask

left_y = y[left_mask]
right_y = y[right_mask]

info_gain = information_gain(y, left_y, right_y, gini_impurity)
print("Information Gain for Age <= 35 split:", info_gain)

In [None]:
gini_left = gini_impurity(left_y)
gini_right = gini_impurity(right_y)
print("Root: ", salary_df['ID'].values)
print("Left: ", salary_df.loc[left_mask, 'ID'].values)
print("Right: ", salary_df.loc[right_mask, 'ID'].values)

print("Gini for left node (Age ≤ 35):", gini_left)
print("Gini for right node (Age > 35):", gini_right)

In [None]:
left_left_mask = salary_df.loc[left_mask, 'Salary'] <= 60000
left_right_mask = ~left_left_mask

left_left_y = left_y[left_left_mask]
left_right_y = left_y[left_right_mask]

info_gain_second_split = information_gain(left_y, left_left_y, left_right_y, gini_impurity)
print("Information Gain for Salary ≤ 60,000 split in Left Node:", info_gain_second_split)

In [None]:
gini_left_left = gini_impurity(left_left_y)
gini_left_right = gini_impurity(left_right_y)
print("Left IDs (≤ 60,000):", salary_df.loc[left_mask].loc[left_left_mask, 'ID'].values if len(left_left_y) > 0 else "Empty")
print("Right IDs (> 60,000):", salary_df.loc[left_mask].loc[left_right_mask, 'ID'].values if len(left_right_y) > 0 else "Empty")

print("Gini for left node (≤ 60,000):", gini_left_left if gini_left_left is not None else "N/A")
print("Gini for right node (> 60,000):", gini_left_right if gini_left_right is not None else "N/A")

## Full Classification Tree Build

Let's build a full tree from the ground up using these concepts.

In [None]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, prediction=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.prediction = prediction

    def is_leaf(self):
        return self.prediction is not None


In [None]:
class DecisionTree:
    def __init__(self, max_depth=None, cost_function=None):
        self.max_depth = max_depth
        self.cost_function = cost_function
        self.root = None  # Root node from class Node

    def fit(self, X, y):
        print("\nStarting to build the Decision Tree...\n")
        self.root = self._build_tree(X, y, depth=0)
        print("\nFinished building the Decision Tree!\n")

    def _build_tree(self, X, y, depth):
        num_samples, num_features = X.shape
        unique_classes = np.unique(y)
        if len(unique_classes) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            leaf_class = Counter(y).most_common(1)[0][0]
            print(f"{'  ' * depth}Leaf node created at depth {depth}: Class = {leaf_class}")
            return Node(prediction=leaf_class)

        best_feature, best_threshold, feature_scores_history = self._best_split(X, y)
        if best_feature is None:
            leaf_class = Counter(y).most_common(1)[0][0]
            print(f"{'  ' * depth}Leaf node created at depth {depth}: Class = {leaf_class}")
            return Node(prediction=leaf_class)

        print(f"{'  ' * depth}Feature evaluation at depth {depth}:")
        for feat, (thresh, score) in feature_scores_history.items():
            print(f"{'  ' * depth}  Feature {feat} -> Best Threshold: {thresh:.4f}, Score: {score:.6f}")

        print(f"{'  ' * depth}Depth {depth}: Selected Feature {best_feature} at Threshold {best_threshold}")

        left_indices = X[:, best_feature] <= best_threshold
        right_indices = ~left_indices
        left_subtree = self._build_tree(X[left_indices], y[left_indices], depth + 1)
        right_subtree = self._build_tree(X[right_indices], y[right_indices], depth + 1)

        return Node(feature=best_feature, threshold=best_threshold, left=left_subtree, right=right_subtree)

    def _best_split(self, X, y):
        num_samples, num_features = X.shape
        best_gain = -1
        best_feature, best_threshold = None, None
        feature_scores_history = {}

        for feature in range(num_features):
            thresholds = np.unique(X[:, feature])
            best_feature_score = -1
            best_feature_threshold = None

            for threshold in thresholds:
                left_y = y[X[:, feature] <= threshold]
                right_y = y[X[:, feature] > threshold]

                if len(left_y) == 0 or len(right_y) == 0:
                    continue

                gain = information_gain(y, left_y, right_y, self.cost_function)
                if gain > best_feature_score:
                    best_feature_score = gain
                    best_feature_threshold = threshold

                if gain > best_gain:
                    best_gain, best_feature, best_threshold = gain, feature, threshold

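            if best_feature_threshold is not None:  # skip features with no valid split (avoids formatting None)
                feature_scores_history[feature] = (best_feature_threshold, best_feature_score)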

        return best_feature, best_threshold, feature_scores_history

    def predict(self, X):
        return np.array([self._traverse_tree(sample, self.root) for sample in X])

    def _traverse_tree(self, sample, node):
        if node.is_leaf():
            return node.prediction
        if sample[node.feature] <= node.threshold:
            return self._traverse_tree(sample, node.left)
        else:
            return self._traverse_tree(sample, node.right)


In [None]:
from sklearn.datasets import load_iris

iris = load_iris(as_frame=True)
X, y = iris.data, iris.target
iris.frame.sample(3)

Train and test it.

In [None]:
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split

X, y = iris.data.values, iris.target.values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
tree = DecisionTree(max_depth=3, cost_function=gini_impurity)
tree.fit(X_train, y_train)

predictions = tree.predict(X_test)
print("Decision Tree Accuracy:", accuracy_score(y_test, predictions))
print("Decision Tree Classification Report:\n", classification_report(y_test, predictions))

In [None]:
class PrunedDecisionTree(DecisionTree):
    def __init__(self, max_depth=None, cost_function=None, alpha=0.01):
        super().__init__(max_depth, cost_function)
        self.alpha = alpha

    def _evaluate_accuracy(self, X_val, y_val):
        predictions = self.predict(X_val)
        return np.mean(predictions == y_val)


    def prune(self, X_val, y_val):
        print("\nStarting pruning process...\n")
        self._prune_tree(self.root, X_val, y_val, depth=0)
        print("\nPruning completed!\n")

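    def _prune_tree(self, node, X_val, y_val, depth, reach=None):
        if node.is_leaf():
            return
        if reach is None:
            # At the root, every validation sample reaches the node.
            reach = np.ones(len(y_val), dtype=bool)

        # Prune bottom-up, tracking which validation samples reach each child.
        goes_left = X_val[:, node.feature] <= node.threshold
        self._prune_tree(node.left, X_val, y_val, depth + 1, reach & goes_left)
        self._prune_tree(node.right, X_val, y_val, depth + 1, reach & ~goes_left)

        if node.left.is_leaf() and node.right.is_leaf():
            # Tentatively collapse this split into a leaf and compare accuracy.
            pre_prune_accuracy = self._evaluate_accuracy(X_val, y_val)
            left_node, right_node = node.left, node.right

            # Label the leaf with the majority class of the validation samples
            # that reach this node (fall back to all of them if none do).
            y_here = y_val[reach] if reach.any() else y_val
            node.prediction = Counter(y_here).most_common(1)[0][0]
            node.left = None
            node.right = None

            post_prune_accuracy = self._evaluate_accuracy(X_val, y_val)

            if post_prune_accuracy < pre_prune_accuracy - self.alpha:
                # Revert: accuracy dropped by more than alpha.
                node.prediction = None
                node.left = left_node
                node.right = right_node
                print(f"{'  ' * depth}Retained split at depth {depth}, feature {node.feature}, threshold {node.threshold}")
            else:
                print(f"{'  ' * depth}Pruned subtree at depth {depth}, feature {node.feature}, threshold {node.threshold}")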


In [None]:
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

tree = PrunedDecisionTree(max_depth=3, cost_function=gini_impurity, alpha=0.01)
tree.fit(X_train, y_train)
tree.prune(X_val, y_val)

predictions = tree.predict(X_val)
accuracy = np.mean(predictions == y_val)
print(f"Final Accuracy after pruning: {accuracy:.4f}")
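
The accuracy above is measured on the same validation split that guided pruning, so it is likely to be optimistic. One common remedy, sketched here as an assumption rather than as part of the lesson, is a three-way split: train on one part, prune on a second, and report accuracy on a third that was never used. The variable names are hypothetical and chosen so they do not overwrite the notebook's own `X_train` and `X_val`.

```python
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

X_full, y_full = load_iris(return_X_y=True)

# First carve out a held-out test set, then split the rest into
# training and validation parts (stratified to keep class balance).
X_rest, X_te, y_rest, y_te = train_test_split(
    X_full, y_full, test_size=0.2, random_state=42, stratify=y_full)
X_tr, X_va, y_tr, y_va = train_test_split(
    X_rest, y_rest, test_size=0.25, random_state=42, stratify=y_rest)

print(len(X_tr), len(X_va), len(X_te))
```

With this layout, the tree would be fit on `X_tr`, pruned against `X_va`, and scored once on `X_te`.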

Compare with scikit-learn's `DecisionTreeClassifier`, which prunes via cost-complexity pruning (`ccp_alpha`).

In [None]:
from sklearn.tree import DecisionTreeClassifier

tree = DecisionTreeClassifier(max_depth=3, ccp_alpha=0.01, random_state=42)
tree.fit(X_train, y_train)

predictions = tree.predict(X_val)
accuracy = accuracy_score(y_val, predictions)
print(f"Final Accuracy after pruning: {accuracy:.4f}")
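
To inspect what scikit-learn actually fits, the sketch below prints a text rendering of a depth-3 tree with `export_text` and lists the candidate pruning strengths with `cost_complexity_pruning_path`. Both are scikit-learn functions; the variable names are illustrative, and the tree is refit on the full iris data purely for brevity.

```python
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier, export_text

iris_data = load_iris()
clf = DecisionTreeClassifier(max_depth=3, random_state=42)
clf.fit(iris_data.data, iris_data.target)

# Text rendering of the learned splits, one line per node.
tree_text = export_text(clf, feature_names=list(iris_data.feature_names))
print(tree_text)

# Effective alphas at which successive subtrees would be pruned away,
# together with the total leaf impurity at each stage.
path = clf.cost_complexity_pruning_path(iris_data.data, iris_data.target)
print(path.ccp_alphas)
print(path.impurities)
```

Larger `ccp_alpha` values prune more aggressively, so scanning `path.ccp_alphas` is one way to choose a value instead of fixing it at 0.01.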

# Regression Tree

Trees can be used for regression, with minor alterations.

## Variance Reduction

Regression trees use **variance reduction** to determine the best splits, ensuring that the target variable is more homogeneous within each resulting node. Unlike classification trees, which use impurity measures like Gini impurity or entropy, regression trees aim to minimize the **mean squared error (MSE)** within each subset.


$$
VR = \sigma^2_{\text{parent}} - \left( \frac{N_{\text{left}}}{N_{\text{total}}} \sigma^2_{\text{left}} + \frac{N_{\text{right}}}{N_{\text{total}}} \sigma^2_{\text{right}} \right)
$$

where:
- $ \sigma^2_{\text{parent}} $ is the variance of the target variable before the split.
- $ \sigma^2_{\text{left}} $ and $ \sigma^2_{\text{right}} $ are the variances of the left and right child nodes.
- $ N_{\text{left}} $, $ N_{\text{right}} $, and $ N_{\text{total}} $ represent the number of samples in the left, right, and parent nodes, respectively.
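
Variance and MSE are linked because a regression leaf predicts the mean of its targets, and the MSE of that prediction is exactly the variance of the node. The sketch below checks this on a made-up set of targets; `node_targets` and the other names are hypothetical.

```python
import numpy as np

node_targets = np.array([3.0, 5.0, 8.0, 12.0])  # hypothetical targets in one node

# A leaf predicts the mean; its MSE equals the (population) variance.
mean_prediction = node_targets.mean()
mse_of_mean = np.mean((node_targets - mean_prediction) ** 2)
print(mse_of_mean, np.var(node_targets))

# Any other constant prediction has a larger MSE.
other_mse = np.mean((node_targets - (mean_prediction + 1.0)) ** 2)
print(other_mse)
```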

In [None]:
def variance_reduction(y, left_y, right_y):
    weight_left = len(left_y) / len(y)
    weight_right = len(right_y) / len(y)

    return np.var(y) - (weight_left * np.var(left_y) + weight_right * np.var(right_y))

In [None]:
y = salary_df["Salary"].values

variance_root = np.var(y)
print("Root Node IDs: ", salary_df["ID"].values)
print("Variance before split:", variance_root)


left_mask = salary_df["Age"] <= 35
right_mask = ~left_mask

left_y = y[left_mask]
right_y = y[right_mask]


info_gain = variance_reduction(y, left_y, right_y)
print("Variance Reduction for Age <= 35 split:", info_gain)

print("\nLeft Subset (Age <= 35):", salary_df[left_mask]["ID"].values)
print("Right Subset (Age > 35):", salary_df[right_mask]["ID"].values)

Root Node IDs:  [1 2 3 4 5]
Variance before split: 200000000.0
Variance Reduction for Age <= 35 split: 150000000.0

Left Subset (Age <= 35): [1 2 5]
Right Subset (Age > 35): [3 4]
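
These figures can be reproduced by hand. `np.var` computes the population variance (dividing by $ N $), the parent mean is 70,000, the left mean is 60,000, and the right mean is 85,000:

$$
\begin{aligned}
\sigma^2_{\text{parent}} &= \tfrac{1}{5}\left(20000^2 + 10000^2 + 10000^2 + 20000^2 + 0^2\right) = 2 \times 10^8 \\
\sigma^2_{\text{left}} &= \tfrac{1}{3}\left(10000^2 + 0^2 + 10000^2\right) = \tfrac{2 \times 10^8}{3} \approx 6.67 \times 10^7 \\
\sigma^2_{\text{right}} &= \tfrac{1}{2}\left(5000^2 + 5000^2\right) = 2.5 \times 10^7 \\
VR &= 2 \times 10^8 - \left(\tfrac{3}{5} \cdot \tfrac{2 \times 10^8}{3} + \tfrac{2}{5} \cdot 2.5 \times 10^7\right) = 2 \times 10^8 - 5 \times 10^7 = 1.5 \times 10^8
\end{aligned}
$$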


## Full Regression Tree Build

Let's build a full tree from the ground up using these concepts.

In [None]:
class RegressionTree:
    def __init__(self, max_depth=5):
        self.max_depth = max_depth
        self.root = None

    def fit(self, X, y):
        print("\nStarting to build the Regression Tree...\n")
        self.root = self._build_tree(X, y, depth=0)
        print("\nFinished building the Regression Tree!\n")

    def _build_tree(self, X, y, depth):
        num_samples, num_features = X.shape
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            leaf_value = np.mean(y)
            print(f"{'  ' * depth}Leaf node created at depth {depth}: Value = {leaf_value:.4f}")
            return Node(prediction=leaf_value)

        best_feature, best_threshold, best_score, feature_scores_history = self._best_split(X, y)
        if best_feature is None:
            leaf_value = np.mean(y)
            print(f"{'  ' * depth}Leaf node created at depth {depth}: Value = {leaf_value:.4f}")
            return Node(prediction=leaf_value)

        print(f"{'  ' * depth}Feature evaluation at depth {depth}:")
        for feat, (thresh, score) in feature_scores_history.items():
            print(f"{'  ' * depth}  Feature {feat} -> Best Threshold: {thresh:.4f}, Variance Reduction: {score:.6f}")

        print(f"{'  ' * depth}Depth {depth}: Selected Feature {best_feature} at Threshold {best_threshold}, Variance Reduction: {best_score:.6f}")

        left_indices = X[:, best_feature] <= best_threshold
        right_indices = ~left_indices

        left_subtree = self._build_tree(X[left_indices], y[left_indices], depth + 1)
        right_subtree = self._build_tree(X[right_indices], y[right_indices], depth + 1)

        return Node(feature=best_feature, threshold=best_threshold, left=left_subtree, right=right_subtree)

    def _best_split(self, X, y):
        num_samples, num_features = X.shape
        best_score = float("-inf")  # variance reduction is maximized, not minimized
        best_feature, best_threshold = None, None
        feature_scores_history = {}

        for feature in range(num_features):
            thresholds = np.unique(X[:, feature])
            best_feature_score = float("-inf")
            best_feature_threshold = None

            for threshold in thresholds:
                left_y = y[X[:, feature] <= threshold]
                right_y = y[X[:, feature] > threshold]

                if len(left_y) == 0 or len(right_y) == 0:
                    continue

                score = variance_reduction(y, left_y, right_y)
                if score > best_feature_score:
                    best_feature_score = score
                    best_feature_threshold = threshold

                if score > best_score:
                    best_score, best_feature, best_threshold = score, feature, threshold

            if best_feature_threshold is not None:  # skip features with no valid split
                feature_scores_history[feature] = (best_feature_threshold, best_feature_score)

        return best_feature, best_threshold, best_score, feature_scores_history

    def predict(self, X):
        """Predicts the target values for a given dataset."""
        return np.array([self._traverse_tree(sample, self.root) for sample in X])

    def _traverse_tree(self, sample, node):
        """Recursively traverses the tree to make predictions."""
        if node.is_leaf():
            return node.prediction
        if sample[node.feature] <= node.threshold:
            return self._traverse_tree(sample, node.left)
        else:
            return self._traverse_tree(sample, node.right)


In [None]:
from sklearn.datasets import fetch_california_housing

cal_df = fetch_california_housing(as_frame=True)
cal_df.frame.sample(3)

| | MedInc | HouseAge | AveRooms | AveBedrms | Population | AveOccup | Latitude | Longitude | MedHouseVal |
|---|---|---|---|---|---|---|---|---|---|
| 16773 | 4.2421 | 43.0 | 5.835526 | 0.997807 | 1384.0 | 3.035088 | 37.69 | -122.48 | 2.575 |
| 1138 | 3.13 | 8.0 | 5.539062 | 1.019531 | 1163.0 | 2.271484 | 39.69 | -121.56 | 1.683 |
| 12081 | 4.7965 | 5.0 | 5.84016 | 1.033966 | 3258.0 | 3.254745 | 33.76 | -117.54 | 1.608 |


In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error

X, y = cal_df.data.values, cal_df.target.values
scaler = StandardScaler()
X = scaler.fit_transform(X)

X_train, y_train = X[:200], y[:200]
X_test, y_test = X[-20:], y[-20:]

tree = RegressionTree(max_depth=5)
tree.fit(X_train, y_train)

predictions = tree.predict(X_test)

mse = mean_squared_error(y_test, predictions)
print(f"\nMean Squared Error on Test Set: {mse:.4f}")


Starting to build the Regression Tree...

Feature evaluation at depth 0:
  Feature 0 -> Best Threshold: -1.7743, Variance Reduction: 0.008262
  Feature 1 -> Best Threshold: 1.6178, Variance Reduction: 0.000050
  Feature 2 -> Best Threshold: -1.3675, Variance Reduction: 0.005838
  Feature 3 -> Best Threshold: -0.7223, Variance Reduction: 0.000095
  Feature 4 -> Best Threshold: 1.0045, Variance Reduction: 0.000013
  Feature 5 -> Best Threshold: 0.0921, Variance Reduction: 0.000007
  Feature 6 -> Best Threshold: 1.0010, Variance Reduction: 0.017737
  Feature 7 -> Best Threshold: -1.3628, Variance Reduction: 0.034647
Depth 0: Selected Feature 5 at Threshold 0.09205300097808652, Variance Reduction: 0.000007
  Feature evaluation at depth 1:
    Feature 0 -> Best Threshold: -1.7743, Variance Reduction: 0.008480
    Feature 1 -> Best Threshold: 1.6973, Variance Reduction: 0.000047
    Feature 2 -> Best Threshold: -1.3675, Variance Reduction: 0.005996
    Feature 3 -> Best Threshold: -0.7223, 

Compare with scikit-learn's `DecisionTreeRegressor`.

In [None]:
from sklearn.tree import DecisionTreeRegressor

tree = DecisionTreeRegressor(max_depth=5, ccp_alpha=0.01, random_state=42)
tree.fit(X_train, y_train)
predictions = tree.predict(X_test)
mse = mean_squared_error(y_test, predictions)
print(f"\nMean Squared Error on Test Set: {mse:.4f}")


Mean Squared Error on Test Set: 0.6846


# Conclusion

Decision trees are among the most intuitive machine learning models, providing clear, interpretable decision structures for both **classification** and **regression** problems. By recursively splitting data into subsets, they build a hierarchical model that can capture non-linear relationships in the data.

What we have learned:

- **Classification Trees** use impurity measures such as **Gini Impurity** and **Entropy** to find the best splits, ensuring that classes are well separated.
- **Regression Trees** use **variance reduction** to minimize prediction error, selecting splits that reduce the spread of target values within each node.
- **Information Gain** drives split selection: at each node, the tree greedily picks the feature and threshold that most reduce impurity.
- **Pruning Techniques** help prevent overfitting by removing unnecessary branches and ensuring the model generalizes well to new data.

