DATA COLLECTION

In [None]:
!pip install ucimlrepo

Collecting ucimlrepo
  Downloading ucimlrepo-0.0.7-py3-none-any.whl.metadata (5.5 kB)
Downloading ucimlrepo-0.0.7-py3-none-any.whl (8.0 kB)
Installing collected packages: ucimlrepo
Successfully installed ucimlrepo-0.0.7


In [None]:
from ucimlrepo import fetch_ucirepo
import numpy as np
import pandas as pd
from ucimlrepo import fetch_ucirepo
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from sklearn.tree import DecisionTreeClassifier as SklearnDTC # For comparison
import matplotlib.pyplot as plt
import seaborn as sns

adult = fetch_ucirepo(id=2) #
X = adult.data.features #
y = adult.data.targets #

X = X.replace(' ?', np.nan)
X = X.replace('?', np.nan)
initial_rows = len(X)
data = pd.concat([X, y], axis=1).dropna()
X = data.drop(columns=y.columns)
y = data[y.columns.tolist()[0]]

print(f"Dropped {initial_rows - len(data)} rows with missing values.")

Dropped 3620 rows with missing values.


In [None]:
categorical_features = X.select_dtypes(include=['object']).columns
numeric_features = X.select_dtypes(include=['int64', 'float64']).columns

for col in categorical_features:
    le = LabelEncoder()
    X[col] = X[col].astype('category').cat.codes

y_encoded = y.apply(lambda x: 1 if x.strip() == '>50K' else 0)

X_train_val, X_test, y_train_val, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded)

X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25, random_state=42, stratify=y_train_val)

X_train_np = X_train.values
y_train_np = y_train.values
X_val_np = X_val.values
y_val_np = y_val.values
X_test_np = X_test.values
y_test_np = y_test.values

print(f"Train samples: {len(X_train)} (60%)")
print(f"Validation samples: {len(X_val)} (20%)")
print(f"Test samples: {len(X_test)} (20%)")

Train samples: 27132 (60%)
Validation samples: 9045 (20%)
Test samples: 9045 (20%)


DECISION TREE

In [None]:
class Node:
    def __init__(self, feature_idx=None, threshold=None, value=None, left=None, right=None):

        self.feature_idx = feature_idx
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

class DecisionTreeClassifierScratch:

    def __init__(self, max_depth=None, min_samples_split=2, min_impurity_decrease=0.0, impurity_measure='gini'):
        self.root = None
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_impurity_decrease = min_impurity_decrease
        self.impurity_measure = impurity_measure
        self.feature_importances_ = {}
        self.feature_names = None


    def _gini_impurity(self, y):
        if len(y) == 0:
            return 0
        classes, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        return 1.0 - np.sum(probabilities**2)

    def _entropy(self, y):
        if len(y) == 0:
            return 0
        classes, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        probabilities = probabilities[probabilities > 0]
        return -np.sum(probabilities * np.log2(probabilities))

    def _calculate_impurity(self, y):
        if self.impurity_measure == 'gini':
            return self._gini_impurity(y)
        else:
            return self._entropy(y)


    def _information_gain(self, y, y_left, y_right):
        parent_impurity = self._calculate_impurity(y)
        n = len(y)
        n_left, n_right = len(y_left), len(y_right)

        weighted_child_impurity = (n_left / n) * self._calculate_impurity(y_left) + \
                                  (n_right / n) * self._calculate_impurity(y_right)

        return parent_impurity - weighted_child_impurity

    def _find_best_split(self, X, y):
        best_gain = -1
        best_feature_idx = None
        best_threshold = None
        n_features = X.shape[1]

        if len(y) < self.min_samples_split:
            return None, None

        if len(np.unique(y)) == 1:
            return None, None

        parent_impurity = self._calculate_impurity(y)

        for feature_idx in range(n_features):
            unique_values = np.unique(X[:, feature_idx])

            sorted_unique = np.sort(unique_values)
            potential_thresholds = (sorted_unique[:-1] + sorted_unique[1:]) / 2

            if len(potential_thresholds) == 0:
                potential_thresholds = unique_values

            for threshold in potential_thresholds:
                left_indices = X[:, feature_idx] <= threshold
                right_indices = X[:, feature_idx] > threshold

                y_left, y_right = y[left_indices], y[right_indices]

                if len(y_left) > 0 and len(y_right) > 0:
                    gain = self._information_gain(y, y_left, y_right)

                    if gain >= self.min_impurity_decrease and gain > best_gain:
                        best_gain = gain
                        best_feature_idx = feature_idx
                        best_threshold = threshold

        return best_feature_idx, best_threshold


    def _build_tree(self, X, y, depth=0):
        if (len(np.unique(y)) == 1 or
            (self.max_depth is not None and depth >= self.max_depth) or
            len(y) < self.min_samples_split):

            classes, counts = np.unique(y, return_counts=True)
            majority_class = classes[np.argmax(counts)]
            return Node(value=majority_class)

        feature_idx, threshold = self._find_best_split(X, y)

        if feature_idx is None:
            classes, counts = np.unique(y, return_counts=True)
            majority_class = classes[np.argmax(counts)]
            return Node(value=majority_class)

        self.feature_importances_[feature_idx] = self.feature_importances_.get(feature_idx, 0) + 1

        left_indices = X[:, feature_idx] <= threshold
        right_indices = X[:, feature_idx] > threshold

        X_left, y_left = X[left_indices], y[left_indices]
        X_right, y_right = X[right_indices], y[right_indices]

        left_subtree = self._build_tree(X_left, y_left, depth + 1)
        right_subtree = self._build_tree(X_right, y_right, depth + 1)

        return Node(feature_idx=feature_idx, threshold=threshold,
                    left=left_subtree, right=right_subtree)

    def fit(self, X, y, feature_names=None):
        self.feature_names = feature_names
        self.root = self._build_tree(X, y)


    def _predict_single(self, x, node):
        if node.value is not None:
            return node.value

        if x[node.feature_idx] <= node.threshold:
            return self._predict_single(x, node.left)
        else:
            return self._predict_single(x, node.right)

    def predict(self, X):
        if self.root is None:
            raise ValueError("Tree not fitted yet.")

        predictions = [self._predict_single(x, self.root) for x in X]
        return np.array(predictions)


    def post_prune(self, X_val, y_val):
        if self.root is None: return

        def get_accuracy(model, X, y):
            return accuracy_score(y, model.predict(X))

        best_accuracy = get_accuracy(self, X_val, y_val)

        while True:
            improvement = False
            internal_nodes = self._get_internal_nodes(self.root)

            for node in internal_nodes:
                original_value = node.value
                original_feature_idx = node.feature_idx
                original_threshold = node.threshold
                original_left = node.left
                original_right = node.right

                y_subtree = self._get_subtree_labels(X_val, y_val, self.root, node)
                if len(y_subtree) == 0: continue

                classes, counts = np.unique(y_subtree, return_counts=True)
                majority_class = classes[np.argmax(counts)]

                node.feature_idx = None
                node.threshold = None
                node.left = None
                node.right = None
                node.value = majority_class

                pruned_accuracy = get_accuracy(self, X_val, y_val)

                if pruned_accuracy >= best_accuracy:
                    best_accuracy = pruned_accuracy
                    improvement = True
            else:
                    node.value = original_value
                    node.feature_idx = original_feature_idx
                    node.threshold = original_threshold
                    node.left = original_left
                    node.right = original_right

            if not improvement:
                break

    def _get_internal_nodes(self, node):
        nodes = []
        if node is not None and node.value is None:
            nodes.append(node)
            nodes.extend(self._get_internal_nodes(node.left))
            nodes.extend(self._get_internal_nodes(node.right))
        return nodes

    def _get_subtree_labels(self, X_val, y_val, current_root, target_node):

        reached_indices = []
        for i, x in enumerate(X_val):
            curr = current_root
            path_taken = []
            reached = False
            while curr is not None and curr.value is None:
                if curr == target_node:
                    reached = True
                    break

                if x[curr.feature_idx] <= curr.threshold:
                    curr = curr.left
                else:
                    curr = curr.right

            if reached:
                reached_indices.append(i)

        return y_val[reached_indices]

    def print_tree_structure(self, node=None, prefix="", is_left=True, feature_names=None):
        if node is None:
            node = self.root
        if node is not None:
            if node.value is not None:
                print(f"{prefix}└── Class {node.value} (LEAF)")
            else:
                feature_name = feature_names[node.feature_idx] if feature_names else f"Feature {node.feature_idx}"
                print(f"{prefix}├── IF {feature_name} <= {node.threshold:.2f}")
                self.print_tree_structure(node.left, prefix + ("|   " if is_left else "    "), True, feature_names)
                print(f"{prefix}└── ELSE {feature_name} > {node.threshold:.2f}")
                self.print_tree_structure(node.right, prefix + ("|   " if is_left else "    "), False, feature_names)

EVALUATION

In [None]:

def evaluate_model(model, X, y, label=""):
    y_pred = model.predict(X)

    accuracy = accuracy_score(y, y_pred)
    precision, recall, f1_score, _ = precision_recall_fscore_support(y, y_pred, average='binary', pos_label=1)
    conf_matrix = confusion_matrix(y, y_pred)

    print(f"{label} Results, ")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-score: {f1_score:.4f}")
    print(f"Confusion Matrix:\n{conf_matrix}")

    return accuracy, precision, recall, f1_score, conf_matrix


depths = [2, 4, 6, 1000]
results = []
feature_names_list = X_train.columns.tolist()

for impurity in ['gini', 'entropy']:
    for depth in depths:
        max_depth = depth if depth != 1000 else None

        dtc_scratch = DecisionTreeClassifierScratch(
            max_depth=max_depth,
            min_samples_split=5,
            impurity_measure=impurity
        )

        dtc_scratch.fit(X_train_np, y_train_np, feature_names=feature_names_list)

        acc, _, _, _, _ = evaluate_model(dtc_scratch, X_val_np, y_val_np, label=f"Val - Impurity: {impurity}, Depth: {depth if max_depth else 'Unlimited'}")

        results.append({
            'Type': 'Pre-Pruned',
            'Impurity': impurity,
            'Max_Depth': depth,
            'Val_Accuracy': acc,
            'Model': dtc_scratch
        })

best_pre_pruned = max(results, key=lambda x: x['Val_Accuracy'])
print(f"Best Pre-Pruned Model (Val): {best_pre_pruned['Impurity']} with Max Depth {best_pre_pruned['Max_Depth']}\n")

evaluate_model(best_pre_pruned['Model'], X_test_np, y_test_np, label=f"FINAL TEST - Best Pre-Pruned ({best_pre_pruned['Impurity']}, Depth {best_pre_pruned['Max_Depth']})")



full_tree = DecisionTreeClassifierScratch(max_depth=None, min_samples_split=2, impurity_measure='gini')
full_tree.fit(X_train_np, y_train_np)
print("Full Tree Validation Results")
full_acc, _, _, _, _ = evaluate_model(full_tree, X_val_np, y_val_np, label="Val - Full Tree")

post_pruned_tree = DecisionTreeClassifierScratch(max_depth=None, min_samples_split=2, impurity_measure='gini')
post_pruned_tree.fit(X_train_np, y_train_np)
post_pruned_tree.post_prune(X_val_np, y_val_np)

print("Post-Pruned Tree Validation Results")
post_pruned_acc, _, _, _, _ = evaluate_model(post_pruned_tree, X_val_np, y_val_np, label="Val - Post-Pruned Tree")

Val - Impurity: gini, Depth: 2 Results, 
Accuracy: 0.8379
Precision: 0.6324
Recall: 0.0573
F1-score: 0.1050
Confusion Matrix:
[[7493   50]
 [1416   86]]
Val - Impurity: gini, Depth: 4 Results, 
Accuracy: 0.8534
Precision: 0.6667
Recall: 0.2344
F1-score: 0.3468
Confusion Matrix:
[[7367  176]
 [1150  352]]
Val - Impurity: gini, Depth: 6 Results, 
Accuracy: 0.8519
Precision: 0.5925
Recall: 0.3455
F1-score: 0.4365
Confusion Matrix:
[[7186  357]
 [ 983  519]]


KeyboardInterrupt: 

In [None]:
# Final Test Set Comparison
print("FINAL TEST SET COMPARISON (Pre-Pruned vs. Post-Pruned vs. Full)")
print("Full Tree Test Results:")
evaluate_model(full_tree, X_test_np, y_test_np, label="Test - Full Tree")

print("Best Pre-Pruned Tree Test Results:")
evaluate_model(best_pre_pruned['Model'], X_test_np, y_test_np, label="Test - Best Pre-Pruned Tree")

print("Post-Pruned Tree Test Results:")
evaluate_model(post_pruned_tree, X_test_np, y_test_np, label="Test - Post-Pruned Tree")


print("Most Important Features (from Full Gini Tree)")
importance_df = pd.Series(full_tree.feature_importances_, index=feature_names_list).sort_values(ascending=False)
print(importance_df.head(5))

best_depth = best_pre_pruned['Max_Depth'] if best_pre_pruned['Max_Depth'] != 1000 else None
best_criterion = 'gini' if best_pre_pruned['Impurity'] == 'gini' else 'entropy'

sklearn_dtc = SklearnDTC(
    max_depth=best_depth,
    criterion=best_criterion,
    min_samples_split=5,
    random_state=42
)
sklearn_dtc.fit(X_train_np, y_train_np)

print(f"Sklearn DTC Comparison (Criterion: {best_criterion}, Max_Depth: {best_depth})")
evaluate_model(sklearn_dtc, X_test_np, y_test_np, label="Test - Sklearn DTC")