# Decision Trees and Forests

In [None]:
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Optional
from typing import Type
from matplotlib import pyplot as plt


In [None]:
# base class for the criterion

class Criterion:
    OPTIMIZATION = ""

    @staticmethod
    def impurity(y) -> float: ...

    @staticmethod
    def split_impurity(y_left, y_right) -> float: ...

    @staticmethod
    def is_tolerance_reached(node_impurity, split_impurity, min_improvement) -> bool: ...


# specifical class for Gini

class GiniCriterion(Criterion):
    OPTIMIZATION = "minimize"

    @staticmethod
    def impurity(y):
        class_count = np.unique(y, return_counts=True)[1]
        probs = class_count / len(y)
        return 1 - np.sum(probs ** 2)
    
    @staticmethod
    def split_impurity(y_left, y_right):
        """Weighted sum of the gini impurity of two subsets"""
        nL = len(y_left)
        nR = len(y_right)
        n = nL + nR
        g_left = GiniCriterion.impurity(y_left)
        g_right = GiniCriterion.impurity(y_right)
        return (nL * g_left + nR * g_right) / n
    
    @staticmethod
    def is_tolerance_reached(node_impurity, split_impurity, min_improvement):
        improvement = node_impurity - split_impurity
        return improvement < min_improvement

In [None]:

# define an example of target
y1 = np.array(["cat", "cat", "dog", "dog", "dog"])

# access the moethods of the class using the class name and the dot operator
impurity = GiniCriterion.impurity(y1)
print(f"Gini impurity of y1 is {impurity:.2f}")

y2 = np.array(["cat", "cat"])

impurity = GiniCriterion.impurity(y2)
print(f"Gini impurity of y2 is {impurity:.2f}")

total_impurity = GiniCriterion.impurity(np.concatenate((y1, y2)))
print(f"Gini impurity of y = concat(y1, y2) is {total_impurity:.2f}")

split_impurity = GiniCriterion.split_impurity(y1, y2)
print(f"Gini impurity of the split (y1, y2) is: {split_impurity:.2f}")

In [None]:
@dataclass
class Node:
    impurity: Optional[float]         # None if leaf
    split_impurity: Optional[float]   # None if leaf
    feature: Optional[int]            # None if leaf
    threshold: Optional[float]        # None if leaf
    prediction: Optional[float]       # None if not leaf
    n_samples: int

# usage
node = Node(impurity=0.5, split_impurity = 0.3, feature=1, threshold=0.5, prediction=None, n_samples=10)


In [None]:
# la classe più generale possibile
def node_prediction_classification(y):
    """ritorna la classe più comune nel nodo"""
    classes, counts = np.unique(y, return_counts = True)
    return classes[np.argmax(counts)]

In [None]:
# Tree classifier model

class DecisionTree:
    def __init__(self, criterion: Type[Criterion], node_pref_fnc, max_depth, min_impurity_improvement):
        self.criterion = criterion
        self.node_pref_fnc = node_pref_fnc
        self.max_depth = max_depth
        self.min_impurity_improvement = min_impurity_improvement

        self.categorical_feat_idxs = []
        self.nodes = dict()

    @staticmethod
    def _split_dataset(X, feature, threshold, is_categorical):
        if is_categorical:
            # categorical feature
            left_mask = X[:, feature] == threshold
            right_mask = X[:, feature] != threshold
        else:
            # numerical feature
            left_mask = X[:, feature] <= threshold
            right_mask = X[:, feature] > threshold
        return left_mask, right_mask
    
    @staticmethod
    def _child_id(node_id, is_left):
        if is_left:
            return 2 * node_id + 1
        else:
            return 2 * node_id + 2
        
    def _find_best_split(self, X, y):
        """trovo la miglior feature e la migliro soglia per dividere i dati"""
        n_features = X.shape[1]

        best_feature:int = -1
        best_threshold:float = 0.0
        best_impurity = np.inf if self.criterion.OPTIMIZATION == "minimize" else -np.inf

        for f in range(n_features):
            is_categorical = f in self.categorical_feat_idxs
            split_impurity, threshold = self._find_best_threshold(X[:, f], y, is_categorical)

            if self.criterion.OPTIMIZATION == "minimize":
                is_better = split_impurity < best_impurity
            else: 
                is_better = split_impurity > best_impurity

            if is_better:
                best_feature = f
                best_threshold = threshold
                best_impurity = split_impurity

        return best_impurity, best_feature, best_threshold
    
    def _find_best_threshold(self, X_f, y, is_categorical):
        """Try all possible thresholds/categories and return the best one"""
        unique_values = np.unique(X_f)
        best_impurity = np.inf if self.criterion.OPTIMIZATION == "minimize" else -np.inf

        if len(unique_values) < 2:
            return best_impurity, unique_values[0]  # no split possible

        for value in unique_values:
            # divide according to threshold/category
            left_mask, right_mask = self._split_dataset(X_f, ..., value, is_categorical)
            y_left, y_right = y[left_mask], y[right_mask]

            # skip if the value does not split the data
            if len(y_left) == 0 or len(y_right) == 0:
                continue

            # evaluate impurity of the split
            split_impurity = self.criterion.split_impurity(y_left, y_right)

            if self.criterion.OPTIMIZATION == "minimize":
                is_better = split_impurity < best_impurity  # minimize
            else:
                is_better = split_impurity > best_impurity  # maximize

            if is_better:
                best_impurity = split_impurity
                best_threshold = value

        return best_impurity, best_threshold
    
    def _build_tree(self, X, y, depth, node_id):
        n_samples_node = len(y)

        # -- LEAF CONDITIONS --- posso splittare ancora visto le cose che mi sono arrivate?
        # 0. All the feature values of all the samples are the same (no useful split possible)
        # 1. The node has 0 or 1 samples (no split possible)
        # 2. Max depth is reached
        # 3. The impurity improvement < min_impurity_improvement (implemented after)

        # leaf condition 0
        X_all_same = all(len(np.unique(X[:, feat])) == 1 for feat in range(X.shape[1]))

        if len(y) <= 1 or depth >= self.max_depth or X_all_same:
            prediction = self.node_pred_fnc(y)
            self.node[node_id] = Node(None, None, None, None, prediction, n_samples_node)
            return
        
        # -- Internal Node: find BEST SPLIT and create children --
        best_split_impurity, best_feature, best_threshold = self._find_best_split(X, y)

        # (leaf condition 3)
        current_node_impurity = self.criterion.impurity(y)
        if self.criterion.is_tolerance_reached(current_node_impurity, best_split_impurity, self.min_impurity_improvement):
            prediction = self.node_pred_fnc(y)
            self.nodes[node_id] = Node(None, None, None, None, prediction, n_samples_node)    # leaf node
            return

        # -- Save node -- Se arriviamo qua non siamo in una foglia, salvo il nodo
        self.nodes[node_id] = Node(current_node_impurity, best_split_impurity, best_feature, best_threshold, None, n_samples_node)

        # -- Create children --
        is_feat_cat = best_feature in self.categorical_feat_idxs
        left_mask, right_mask = self._split_dataset(X, best_feature, best_threshold, is_feat_cat)

        left_id = self._child_id(node_id, True)
        right_id = self._child_id(node_id, False)

        X_left, y_left = X[left_mask], y[left_mask]
        self._build_tree(X_left, y_left, depth + 1, left_id) #ricorsiva

        X_right, y_right = X[right_mask], y[right_mask]
        self._build_tree(X_right, y_right, depth + 1, right_id) #ricorsiva

    def fit(self, X, y, categorical_feat_idxs = None):
        """
        Fit the decision tree to the data. The tree is built recursively by splitting the data at each node
        based on the impurity criterion.
        Args:
            X: input matrix of shape (n_samples, n_features)
            y: true target/labels of shape (n_samples,)
            categorical_feat_idxs: list of categorical feature indices
        """
        self.categorical_feat_idxs = categorical_feat_idxs if categorical_feat_idxs is not None else []

        self.nodes = dict()     # Reset the nodes
        self._build_tree(X, y, 0, 0)

    def predict(self, X):
        """
        Predict the target variable for the given input data X.
        Keep the same column order as in the training data.

        Args:
            X: input matrix of shape (n_samples, n_features)
        """
        predictions = []
        for x in X:
            node_id = 0
            node : Node = self.nodes[0]
            while True:
                if node.prediction is not None:
                    predictions.append(node.prediction) # leaf node
                    break
                # else we need to go down the tree
                if node.feature in self.categorical_feat_idxs:
                    # categorical features
                    is_left = x[node.feature] == node.threshold
                else:
                    # continuous features
                    is_left = x[node.feature] <= node.threshold
                node_id = self._child_id(node_id, is_left)
                node = self.nodes[node_id]
        return np.array(predictions)


    def print_tree(self, node_id=0, prefx="", is_left=True, feat_names=None):
        """
        Prints the binary tree in a hierarchical format.
        """
        if node_id not in self.nodes:
            return

        node = self.nodes[node_id]
        conn = "├── " if is_left else "└── "

        if node.feature is None:
            print(f"{prefx}{conn}{node_id}-Pred: {node.prediction}")
            return
        else:
            if feat_names is None:
                f = f"X[{node.feature}]"
            else:
                f = feat_names[node.feature]
            print(f"{prefx}{conn}{node_id}-{f} <= {node.threshold} | Impurity:{node.impurity:.3f}")

        prefx += "│   " if is_left else "    "
        l_id = self._child_id(node_id, is_left=True)
        r_id = self._child_id(node_id, is_left=False)

        self.print_tree(l_id, prefx, is_left=True, feat_names=feat_names)
        self.print_tree(r_id, prefx, is_left=False, feat_names=feat_names)

In [None]:
# applicazione
data = pd.read_csv('iris.csv')
display(data.head())

feature_names = data.columns[:-1]
target_col = 'species'

X = data.drop(target_col, axis=1).to_numpy()
y = data[target_col].to_numpy()

print("X.shape_", X.shape, "y.shape_", y.shape)

# define model using the Gini impurity criterion
tree = DecisionTree(criterion=GiniCriterion,
                    node_pred_fnc = node_prediction_classification,
                    max_depth = 3,
                    min_impurity_improvement=0.0)

est_gini, best_feature, best_threshold = tree._find_best_split(X, y)
print(f"Best gini impurity for the first split: {best_gini:.2e}")
print(f"Best feature: '{feature_names[best_feature]}'")
print(f"Best threshold: {best_threshold:.2f}")

plt.scatter(X[:, best_feature], y)
plt.axvline(best_threshold, color="red", linestyle="--")
plt.xlabel(feature_names[best_feature])
plt.ylabel("species")
plt.title(f"Best split for '{feature_names[best_feature]}' feature")

tree.fit(X, y, categorical_feat_idxs=None) # fit the model

tree.print_tree()

# training prediction and accuracy
y_pred = tree.predict(X)
print("Training accuracy:", accuracy(y, y_pred))

In [None]:
from utils import plot_decision_boundary_2d, create_2d_meshpoints

def create_proba(dtree):
    def tree_proba(X):
        y_pred = dtree.predict(X)
        y_pred = np.unique(y_pred, return_inverse=True)[1]
        proba = np.zeros((X.shape[0], len(np.unique(y))))
        for i, pred in enumerate(y_pred):
            proba[i, pred] = 1
        return proba
    return tree_proba

max_depth = 4
print("Max depth:", max_depth)
tree2 = DecisionTree(GiniCriterion, node_prediction_classification, max_depth, 0.0)
tree2.fit(X, y, categorical_feat_idxs=None)
y_pred = tree2.predict(X)
print("Train Accuracy:", accuracy(y, y_pred))

X_grid, xx, yy, X_2d = create_2d_meshpoints(X, 200)

probability_func = create_proba(tree2)

n_features = X.shape[1]
plot_decision_boundary_2d(
    X_grid,
    np.unique(y, return_inverse=True)[1],
    probability_func,
    xx,
    yy,
    X_2d,
    n_features,
)

## Cost Complexity Pruning

In [None]:
def compute_cost_complexity(X, y, node_id, tree, ccps, total_N):
    node = tree.nodes[node_id]

    node_impurity = tree.criterion.impurity(y) * len(y) / total_N

    if node.feature is None: # leaf node
        return node_impurity, 1 # impurity, number of leaf
    
    # parent node: splitta il dataset e processa i figli
    left_id = tree._child_id(node_id, is_left=True)
    right_id = tree._child_id(node_id, is_left=False)
    left_mask, right_mask = tree._split_dataset(
        X, node.feature, node.threshold, node.feature in tree.categorical_feat_idxs
    )

    left_impurity, left_leaves = compute_cost_complexity(
        X[left_mask], y[left_mask], left_id, tree, ccps, total_N
    )
    right_impurity, right_leaves = compute_cost_complexity(
        X[right_mask], y[right_mask], right_id, tree, ccps, total_N
    )

    total_leaves = left_leaves + right_leaves

    impurity_sum = left_impurity + right_impurity
    ccps[node_id] = (node_impurity - impurity_sum) / (total_leaves - 1)

    return impurity_sum, total_leaves

In [None]:
# applicazione sempre con irisi di questa parte
subtrees_impurities = {}
compute_cost_complexity(X, y, 0, tree, subtrees_impurities, total_N=len(y))

# keep only the internal nodes
subtrees_impurities = {node_id: imp for node_id, imp in subtrees_impurities.items() if tree.nodes[node_id].feature is not None}


for node_id, imp in subtrees_impurities.items():  # Reusing the same impurity dictionary
    print(f"Subtree {node_id} imp: {imp:.4f}")

print("-" * 50)
tree.print_tree()

In [None]:
# given the node_id, we can remove the node and all its children
# to get the prediction, we still need the training dataset
def prune_tree(X, y, node_id, tree, ids_to_prune):
    node = tree.nodes[node_id]

    if node_id in ids_to_prune:
        tree.nodes[node_id] = Node(None, None, None, None, tree.node_pred_fnc(y), node.n_samples)
        return
    
    if node.feature is None:
        return
    
    left_mask, right_mask = tree._split_dataset(X, node.feature, node.threshold, node.feature in tree.categorical_feats_idxs)
    left_id = tree._child_id(node_id, is_left = True)
    right_id = tree._child_id(node_id, is_left=False)
    prune_tree(X[left_mask], y[left_mask], left_id, tree, ids_to_prune)
    prune_tree(X[right_mask], y[right_mask], right_id, tree, ids_to_prune)

In [None]:
# come cambia l accuracy variando il numero di sotto alberi pruned
tree = DecisionTree(
    criterion=GiniCriterion,
    node_pred_fnc=node_prediction_classification,
    max_depth=3,
    min_impurity_improvement=0.0,
)

# order the ccps by increasing ccp
sorted_ccps = sorted(subtrees_impurities.items(), key=lambda x: x[1])
accuracies = []
for i in range(len(sorted_ccps)):
    nodes_to_prune = [node_id for node_id, _ in sorted_ccps[:i + 1]]
    print(f"Pruning nodes: {nodes_to_prune}")
    # start from the original tree
    tree.fit(X, y, categorical_feat_idxs=None)
    # prune the tree
    prune_tree(X, y, 0, tree, nodes_to_prune)
    # get the predictions
    y_pred = tree.predict(X)
    # compute the accuracy
    acc = accuracy(y, y_pred)
    print(f"Accuracy after pruning nodes {nodes_to_prune}: {acc:.4f}")
    # plot the tree
    tree.print_tree()
    print("-" * 50)
    accuracies.append((acc, len(nodes_to_prune)))

accuracies = np.array(accuracies)
plt.plot(accuracies[:, 1], accuracies[:, 0], marker="o")
plt.xlabel("Number of pruned nodes")
plt.ylabel("Accuracy")
plt.title("Accuracy vs Number of pruned nodes")
plt.grid()


# Random Forest

## Bootstrap

In [None]:
def bootstrap_sample(X, y):
    # select n samples from X with replacement
    # first parameter: values among which we are selecting, second parameter: number of values that we are
    # selecting, third parameter: with replacement
    indices = np.random.choice(len(X), size=len(X), replace=True)
    # filter X and y based on indices
    return X[indices], y[indices]

def fit_bagging(X, y, T, max_depth):
    """Fit T trees using bagging"""
    # forest
    trees = []
    for j in range(T):
        # alternative dataset
        X_sample, y_sample = bootstrap_sample(X, y)
        # initialize the tree
        tree = DecisionTree(GiniCriterion, node_prediction_classification, max_depth, 0.0)
        # fit the tree
        tree.fit(X_sample, y_sample, categorical_feat_idxs=None)
        # add the tree to the forest
        trees.append(tree)
    return trees

def predict_bagging(X, trees):
    # matrix where each row corresponds to a different tree (each column correspond to a different sample)
    predictions = np.array([tree.predict(X) for tree in trees]) # Shape (num_trees, num_samples)
    y_pred = []
    confidences = []
    # iterate through the columns
    for i in range(predictions.shape[1]):
        # for column (sample) i, consider all rows (all predictions of all trees)
        sample_predictions = predictions[:, i]
        # get the number of times each class appears in the column
        # (array containing the number of times class 0 appears, the number of times class 1 appears ...)
        class_counts = np.bincount(sample_predictions)
        # get the class that appears most frequently
        majority_class = np.argmax(class_counts)
        # we compute the confidence interval as the proportion of times that the majority class gets predicted
        confidence = class_counts[majority_class] / len(trees)
        # add the prediction to the prediction list
        y_pred.append(majority_class)
        # add the confidence to the confidence list
        confidences.append(confidence)
    return np.array(y_pred), np.array(confidences)

# number of trees in the forest
T = 4
# depth of each tree
max_depth = 2

accuracies = []

# train the forest
trees = fit_bagging(X_train, y_train, T, max_depth)
# get the vector of predictions
y_pred, confidences = predict_bagging(X_test, trees)
acc = accuracy(y_test, y_pred)
print("Accuracy: ", acc)
print("Predictions: ", y_pred)
print("True labels: ", y_test)
print("Confidence of the predictions: ", confidences)

for i in range(len(y_pred)):
    if y_pred[i] != y_test[i]: # predizioni sbagliate
        print("Index: ", i)
        print("Prediction: ", y_pred[i])
        print("True label: ", y_test[i])
        print("Confidence of the prediction: ", confidences[i])



In [None]:
from tqdm import trange  # to print a progress bar

np.random.seed(0)

# we make the same computation several times, reporting the mean and variance of the accuracy metric
trials = 5
# number of trees in the forest
T_sizes = [1, 4, 8, 16]
# depth of each tree
max_depth = 4

trial_accuracies = []
for trial in trange(trials, desc="Trials", leave=True): # while this loop is computed, print a progress bar
    accuracies = []
    for T in T_sizes:
        # train the forest
        trees = fit_bagging(X_train, y_train, T, max_depth)
        # get the vector of predictions
        y_pred, confidences = predict_bagging(X_test, trees)
        # compute the accuracy
        acc = accuracy(y_test, y_pred)
        accuracies.append(acc)
    trial_accuracies.append(accuracies)

# Calculate mean and std of accuracies
mean_accuracies = np.mean(trial_accuracies, axis=0)
std_accuracies = np.std(trial_accuracies, axis=0)

# Plot accuracies over T_sizes with error bars
plt.figure(figsize=(8, 5))
plt.errorbar(
    T_sizes, mean_accuracies, yerr=std_accuracies, fmt="o-", color="b", label="Accuracy"
)
plt.xlabel("Number of Trees (T)")
plt.ylabel("Accuracy")
plt.title("Bagging: Accuracy vs Number of Trees")
plt.grid(True)
plt.xticks(T_sizes)
plt.legend(loc="lower right")
plt.show()