In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math

data_train = pd.read_csv('../data/q1/train.csv')
data_test = pd.read_csv('../data/q1/test.csv')
data_validation = pd.read_csv('../data/q1/validation.csv')

categorical_cols = data_train.select_dtypes(include='object').columns.tolist()

data_train = pd.get_dummies(data_train, columns=categorical_cols, dtype=int)
data_validation = pd.get_dummies(data_validation, columns=categorical_cols, dtype=int)
data_test = pd.get_dummies(data_test, columns=categorical_cols, dtype=int)

data_test = data_test.reindex(columns=data_train.columns, fill_value=0)
data_validation = data_validation.reindex(columns=data_train.columns, fill_value=0)


X_train = data_train.drop('result', axis=1).values
y_train = data_train['result'].values

X_validation = data_validation.drop('result', axis=1).values
y_validation = data_validation['result'].values

X_test = data_test.drop('result', axis=1).values
y_test = data_test['result'].values

ERROR! Session/line number was not unique in database. History logging moved to new session 277


In [5]:
import numpy as np
import math

class Node:
    def __init__(self, feature=None, threshold=None, is_leaf=False, target=None):
        self.feature = feature
        self.threshold = threshold
        self.is_leaf = is_leaf
        self.target = target
        self.left = None
        self.right = None

        # Validation stats used for fast pruning
        self.val_total = 0
        self.val_correct = 0


class DecisionTree:
    def __init__(self, max_depth=5, X=None, y=None):
        self.max_depth = max_depth
        self.X = X
        self.y = y
        self.root = None

    # ---------------- core building functions ---------------- #

    def calculate_entropy(self, y):
        if len(y) == 0:
            return 0
        p = np.mean(y == 1)
        return -(p * math.log2(p) + (1 - p) * math.log2(1 - p)) if p not in [0, 1] else 0

    def calculate_information_gain(self, X, y, feature_index):
        feature_values = X[:, feature_index]
        threshold = np.median(feature_values)
        left_mask = feature_values <= threshold
        right_mask = feature_values > threshold

        y_left, y_right = y[left_mask], y[right_mask]
        if len(y_left) == 0 or len(y_right) == 0:
            return 0.0, threshold
        H = (len(y_left) / len(y)) * self.calculate_entropy(y_left) + \
            (len(y_right) / len(y)) * self.calculate_entropy(y_right)
        return self.calculate_entropy(y) - H, threshold

    def get_best_split(self, X, y):
        best_gain = 0.0
        best_feature_index = None
        best_threshold = None

        if X is None or X.shape[0] == 0:
            return None, None

        n_features = X.shape[1]
        for feature_index in range(n_features):
            gain, threshold = self.calculate_information_gain(X, y, feature_index)
            if gain >= best_gain:
                best_gain = gain
                best_feature_index = feature_index
                best_threshold = threshold

        return best_feature_index, best_threshold

    def build_tree(self, X, y, depth):
        # leaf condition
        if depth == self.max_depth or len(y) == 0 or self.calculate_entropy(y) == 0:
            target = 1 if np.sum(y == 1) >= np.sum(y == 0) else 0
            return Node(is_leaf=True, target=target)

        feature_index, threshold = self.get_best_split(X, y)
        if feature_index is None:
            target = 1 if np.sum(y == 1) >= np.sum(y == 0) else 0
            return Node(is_leaf=True, target=target)

        root = Node(feature=feature_index, threshold=threshold)
        root.target = 1 if np.sum(y == 1) >= np.sum(y == 0) else 0

        left_mask = X[:, feature_index] <= threshold
        right_mask = X[:, feature_index] > threshold

        X_left, y_left = X[left_mask], y[left_mask]
        X_right, y_right = X[right_mask], y[right_mask]

        root.left = self.build_tree(X_left, y_left, depth + 1)
        root.right = self.build_tree(X_right, y_right, depth + 1)

        return root

    def fit(self):
        self.root = self.build_tree(self.X, self.y, 0)

    # ---------------- prediction / accuracy ---------------- #

    def predict(self, X):
        y_predictions = []
        for row in X:
            current_node = self.root
            while not current_node.is_leaf:
                feature = current_node.feature
                threshold = current_node.threshold
                if row[feature] <= threshold:
                    current_node = current_node.left
                else:
                    current_node = current_node.right
            y_predictions.append(current_node.target)
        return np.array(y_predictions)

    def accuracy(self, X, y):
        preds = self.predict(X)
        return np.mean(preds == y)

    # -------------- helpers for nodes & traversal -------------- #

    def get_nodes(self, node):
        """Return internal nodes (non-leaf) in pre-order."""
        if node is None or node.is_leaf:
            return []
        nodes = [node]
        nodes += self.get_nodes(node.left)
        nodes += self.get_nodes(node.right)
        return nodes

    def get_leaves(self, node):
        if node is None:
            return []
        if node.is_leaf:
            return [node]
        return self.get_leaves(node.left) + self.get_leaves(node.right)

    def new_pruned_node(self, node):
        """Turn node into a leaf: keep node.target already set by caller."""
        node.is_leaf = True
        node.left = None
        node.right = None

    # ----------- Validation annotation (single pass) ----------- #

    def _reset_val_stats(self, node):
        """Reset val stats recursively."""
        if node is None:
            return
        node.val_total = 0
        node.val_correct = 0
        if not node.is_leaf:
            self._reset_val_stats(node.left)
            self._reset_val_stats(node.right)

    def annotate_validation_stats(self, X_val, y_val):
        """
        Single pass over validation set:
        - increment val_total along path to leaf
        - if leaf prediction correct, increment val_correct along entire path
        This ensures ALL internal nodes get correct val_total and val_correct.
        """
        if self.root is None:
            return

        # clear previous stats
        self._reset_val_stats(self.root)

        for i, row in enumerate(X_val):
            true_label = y_val[i]
            path_nodes = []
            current = self.root

            # traverse and collect path
            while not current.is_leaf:
                path_nodes.append(current)
                feature, threshold = current.feature, current.threshold
                # increment total for internal node
                current.val_total += 1
                if row[feature] <= threshold:
                    current = current.left
                else:
                    current = current.right

            # now at leaf 'current'
            path_nodes.append(current)
            current.val_total += 1
            # check correctness at leaf
            leaf_correct = 1 if current.target == true_label else 0
            if leaf_correct:
                # increment val_correct for every node on the path (internal + leaf)
                for n in path_nodes:
                    n.val_correct += 1

        # Note: internal nodes' val_total already incremented while traversing.
        # For robustness, ensure internal.val_total equals sum(child.val_total)
        # (optional consistency check / recompute if needed). Not necessary with above logic.

    # --------------- fast pruning helpers ---------------- #

    def _node_ancestor_path(self, target_node):
        """
        Return list of nodes from root down to target_node (inclusive).
        Uses recursion to find path. O(depth).
        """
        path = []
        found = self._find_path(self.root, target_node, path)
        return path if found else []

    def _find_path(self, current, target_node, path):
        if current is None:
            return False
        path.append(current)
        if current is target_node:
            return True
        if current.left and self._find_path(current.left, target_node, path):
            return True
        if current.right and self._find_path(current.right, target_node, path):
            return True
        path.pop()
        return False

    def get_candidate_nodes(self, node):
        """
        Return nodes whose both children exist and are leaves.
        We will recompute candidates each iteration (cheap relative to N).
        """
        if node is None or node.is_leaf:
            return []
        candidates = []
        if node.left and node.right and node.left.is_leaf and node.right.is_leaf:
            candidates.append(node)
        candidates += self.get_candidate_nodes(node.left)
        candidates += self.get_candidate_nodes(node.right)
        return candidates

    # ---------------- optimized post-pruning ---------------- #

    def post_pruning(self, X_val, y_val, X_train, Y_train, X_test, Y_test, max_iters=100000):
        """
        Returns:
            count_nodes, train_accuracies, validation_accuracies, test_accuracies
        This function:
        - annotates validation stats in one pass,
        - chooses best candidate (largest improvement or non-decrease) each iteration,
        - applies pruning (updates node stats and propagates val_correct delta to ancestors),
        - updates summary accuracies lists.
        """
        if self.root is None:
            return [], [], [], []

        # 1) annotate validation stats (single pass)
        self.annotate_validation_stats(X_val, y_val)
        total_val = len(y_val)
        # initial global_correct is val_correct at root (counts correct predictions across validation set)
        global_correct = self.root.val_correct

        validation_accuracies = [global_correct / total_val]
        train_accuracies = [self.accuracy(X_train, Y_train)]
        test_accuracies = [self.accuracy(X_test, Y_test)]
        count_nodes = [2 * len(self.get_nodes(self.root)) + 1]

        iter_count = 0
        while True:
            iter_count += 1
            if iter_count > max_iters:
                break

            candidates = self.get_candidate_nodes(self.root)
            if not candidates:
                break

            best_gain = -1  # we allow zero gain as acceptable (>=0), choose best >=0
            best_node = None
            best_correct_after = None

            # evaluate each candidate in O(1) using stored stats
            for node in candidates:
                left, right = node.left, node.right
                before = left.val_correct + right.val_correct
                total = left.val_total + right.val_total

                # majority vote for new leaf target
                majority_1 = left.val_correct + right.val_correct
                majority_0 = total - majority_1
                if majority_1 >= majority_0:
                    target = 1
                    correct_after = majority_1
                else:
                    target = 0
                    correct_after = majority_0

                gain = correct_after - before

                # choose node with largest gain; tie-breaker: prefer larger total (prune bigger subtrees first)
                if gain > best_gain or (gain == best_gain and best_node is not None and total > (best_node.left.val_total + best_node.right.val_total)):
                    best_gain = gain
                    best_node = node
                    best_correct_after = correct_after

            # stop if no non-negative gain (you can allow zero gain too to reduce size)
            if best_node is None or best_gain < 0:
                break

            # apply pruning to best_node
            left, right = best_node.left, best_node.right
            before = left.val_correct + right.val_correct
            after = best_correct_after
            delta = after - before  # change in correct predictions for the subtree rooted at best_node

            # update global_correct and node stats
            global_correct += delta

            # set new stats for pruned node
            best_node.val_total = left.val_total + right.val_total
            best_node.val_correct = after
            best_node.target = 1 if after >= best_node.val_total / 2 else 0

            # make it a leaf
            self.new_pruned_node(best_node)

            # propagate delta up to ancestors (their val_total doesn't change, but val_correct changes)
            ancestor_path = self._node_ancestor_path(best_node)
            # ancestor_path includes root ... best_node; we already updated best_node.val_correct
            # we need to add delta to every ancestor except best_node (we already set it)
            for anc in ancestor_path[:-1]:
                anc.val_correct += delta

            # record metrics
            count_nodes.append(2 * len(self.get_nodes(self.root)) + 1)
            validation_accuracies.append(global_correct / total_val)
            train_accuracies.append(self.accuracy(X_train, Y_train))
            test_accuracies.append(self.accuracy(X_test, Y_test))

        return count_nodes, train_accuracies, validation_accuracies, test_accuracies


In [7]:
import sys
sys.setrecursionlimit(100000)
dpeth = [15,25,35,45]
for dpth in dpeth:
    Dtree = DecisionTree(max_depth=dpeth, X=X_train, y=y_train)
    Dtree.fit()
    cnt_nodes , train_acc , val_acc, test_acc = Dtree.post_pruning(X_validation, y_validation,X_train,y_train , X_test,y_test)
    print(f"Training Accuracy after pruning for depth {dpth} : ", train_acc[-1] , '%')
    print(f"Validation Accuracy after pruning for depth {dpth} : ", val_acc[-1] , '%')
    print(f"Test Accuracy after pruning for depth {dpth} : ", test_acc[-1] , '%')

    plt.plot(cnt_nodes , train_acc, label='Training Accuracy after Pruning')
    plt.plot(cnt_nodes , val_acc, label='Validation Accuracy after Pruning')
    plt.plot(cnt_nodes , test_acc, label='Test Accuracy after Pruning')
    plt.xlabel('Number of Nodes')
    plt.ylabel('Accuracy (%)')
    plt.title('Decision Tree Pruning')
    plt.legend()
    plt.show()

RecursionError: maximum recursion depth exceeded