In [16]:
!pip install ucimlrepo scikit-learn pandas numpy matplotlib seaborn --quiet

# %%
# 2. Imports
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, classification_report
from sklearn.tree import DecisionTreeClassifier as SklearnDT
from sklearn.preprocessing import LabelEncoder
import math

In [17]:
# Load the Adult dataset: prefer the ucimlrepo client, fall back to the raw UCI CSV.
try:
    # Imported here (not at the top) so a missing package triggers the fallback below.
    from ucimlrepo import fetch_ucirepo
    adult = fetch_ucirepo(id=2)
    X = adult.data.features
    y = adult.data.targets
    print("✅ Loaded dataset via ucimlrepo")
except Exception as e:
    # Deliberate best-effort: any failure above (import, network, parsing) is reported
    # and the data is read straight from the UCI archive instead.
    print("⚠️ ucimlrepo failed, fetching via direct UCI link. Error:", e)
    col_names = [
        "age", "workclass", "fnlwgt", "education", "education-num",
        "marital-status", "occupation", "relationship", "race", "sex",
        "capital-gain", "capital-loss", "hours-per-week", "native-country", "income"
    ]
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
    # Separator is the regex "comma followed by one whitespace"; '?' cells become NaN.
    df = pd.read_csv(url, names=col_names, sep=',\\s', engine='python', na_values='?')
    X = df.drop("income", axis=1)
    y = df[["income"]]

# Convert targets to numeric (0/1)
# A single-column DataFrame is reduced to a Series first.
if isinstance(y, pd.DataFrame):
    y_col = y.columns[0]
    y = y[y_col]
# Any non-numeric label column (object, string or categorical dtype) is mapped to 0/1.
# Checking "not numeric" instead of "dtype == object" keeps this working when pandas
# stores text with its dedicated string dtype. The substring test also accepts label
# variants that merely contain ">50K" (for example with trailing punctuation).
if isinstance(y, pd.Series) and not pd.api.types.is_numeric_dtype(y):
    y = y.apply(lambda x: 1 if ">50K" in str(x) else 0)

# Make sure X is a DataFrame
X = pd.DataFrame(X)
print("✅ Dataset loaded successfully")
print("Features shape:", X.shape)
print("Target shape:", y.shape)


✅ Loaded dataset via ucimlrepo
✅ Dataset loaded successfully
Features shape: (48842, 14)
Target shape: (48842,)


In [18]:
def handle_missing(df, strategy='drop'):
    """Return ``df`` with missing values handled according to ``strategy``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is never modified in place.
    strategy : str, default 'drop'
        * ``'drop'`` - remove every row that contains at least one missing value.
        * ``'mode'`` - fill missing cells with the first mode of their column.
        * anything else - return ``df`` itself, unchanged.

    Returns
    -------
    pandas.DataFrame
        A new frame for ``'drop'``/``'mode'``; the same object for other strategies.
    """
    if strategy == 'drop':
        return df.dropna()
    if strategy == 'mode':
        modes = df.mode()
        if modes.empty:
            # No non-null value anywhere (empty or all-NaN frame): there is no mode to
            # fill with, and ``modes.iloc[0]`` would raise IndexError. Return a copy so
            # the "new object" contract of this branch still holds.
            return df.copy()
        # ``modes.iloc[0]`` is a Series indexed by column name, so each column is
        # filled with its own most frequent value.
        return df.fillna(modes.iloc[0])
    return df

# Encode categorical columns to numeric
def encode_categoricals(df):
    """Label-encode every text-like column of ``df``.

    A column is encoded when its dtype is ``object``, a pandas categorical, or a pandas
    string dtype. The last case matters because pandas may store text columns with a
    dedicated string dtype, for which ``dtype == object`` is False; such columns would
    otherwise reach the model as raw strings.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is copied, not modified.

    Returns
    -------
    (pandas.DataFrame, dict)
        The encoded copy, and a mapping ``column name -> fitted LabelEncoder`` for the
        columns that were encoded.
    """
    encoded = df.copy()
    encoders = {}
    for col in encoded.columns:
        dtype = encoded[col].dtype
        is_text_like = (
            dtype == object
            or str(dtype).startswith('category')
            or pd.api.types.is_string_dtype(dtype)
        )
        if not is_text_like:
            continue
        le = LabelEncoder()
        # Cast to str first so mixed-type cells are comparable for the encoder.
        # Note that this also turns a missing value into its string representation,
        # which then becomes a category of its own.
        encoded[col] = le.fit_transform(encoded[col].astype(str))
        encoders[col] = le
    return encoded, encoders

# Clean and encode
# Rows with any missing feature are dropped; the target is re-aligned through the
# surviving index labels, then text-like feature columns are label-encoded.
X_clean = handle_missing(X, strategy='drop')
y_clean = y.loc[X_clean.index]
X_enc, encs = encode_categoricals(X_clean)

# Split into Train (60%), Validation (20%), Test (20%)
# Step 1 holds out 20% of all rows as the test set.
# Step 2 takes 25% of the remaining 80% (= 20% of all rows) as the validation set.
# Both splits are stratified on the target and use a fixed random_state.
X_temp, X_test, y_temp, y_test = train_test_split(
    X_enc, y_clean, test_size=0.2, random_state=42, stratify=y_clean
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.25, random_state=42, stratify=y_temp
)

print('✅ Data Preparation Done')
print('Train shape:', X_train.shape)
print('Validation shape:', X_val.shape)
print('Test shape:', X_test.shape)

✅ Data Preparation Done
Train shape: (28572, 14)
Validation shape: (9524, 14)
Test shape: (9525, 14)


In [40]:
class Node:
    """One node of the binary decision tree.

    A node is a leaf when ``value`` is not None; otherwise it is an internal node that
    sends a sample left when ``x[feature] <= threshold`` and right otherwise.
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None, majority=None):
        self.feature = feature      # column index tested at this node (internal nodes)
        self.threshold = threshold  # split value: x[feature] <= threshold goes left
        self.left = left            # child for samples satisfying the test
        self.right = right          # child for the remaining samples
        self.value = value          # predicted label for a leaf; None marks an internal node
        self.majority = majority    # most common training label at this node (set by the tree builder)


class DecisionTreeScratch:
    """Binary decision-tree classifier built with exhaustive threshold search.

    Parameters
    ----------
    max_depth : int or None
        Maximum depth of the tree; ``None`` means no depth limit.
    min_samples_split : int
        A node holding fewer samples than this becomes a leaf.
    criterion : str
        ``"gini"`` selects Gini impurity; any other value selects entropy.
    """

    def __init__(self, max_depth=None, min_samples_split=2, criterion="gini"):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.criterion = criterion
        self.root = None

    # --- Impurity Functions ---
    def _gini(self, y):
        """Gini impurity of the non-negative integer label array ``y`` (0 if empty)."""
        m = len(y)
        if m == 0:
            return 0
        probs = np.bincount(y) / m
        return 1 - np.sum(probs ** 2)

    def _entropy(self, y):
        """Shannon entropy in bits of the non-negative integer label array ``y`` (0 if empty)."""
        m = len(y)
        if m == 0:
            return 0
        probs = np.bincount(y) / m
        probs = probs[probs > 0]  # skip absent classes: log2(0) is undefined
        return -np.sum(probs * np.log2(probs))

    def _impurity(self, y):
        """Impurity of ``y`` under the configured criterion."""
        return self._gini(y) if self.criterion == "gini" else self._entropy(y)

    def _weighted_impurity(self, left_y, right_y):
        """Size-weighted average impurity of the two sides of a split."""
        total = len(left_y) + len(right_y)
        if total == 0:
            return 0
        return (len(left_y)/total)*self._impurity(left_y) + (len(right_y)/total)*self._impurity(right_y)

    def _majority_label(self, y):
        """Most common label in ``y``; ties go to the label encountered first."""
        return Counter(y).most_common(1)[0][0]

    # --- Find Best Split ---
    def _best_split(self, X, y):
        """Search every feature/threshold pair for the lowest weighted impurity.

        Returns ``(feature_index, threshold, best_impurity, current_impurity)``.
        ``feature_index`` and ``threshold`` are None when no candidate split exists
        (every feature is constant on ``X``).
        """
        _, n = X.shape
        current_impurity = self._impurity(y)
        best_feat, best_thresh = None, None
        best_impurity = float("inf")

        for feature_idx in range(n):
            column = X[:, feature_idx]
            # The largest value is skipped: "<= max" sends every sample left, which
            # leaves the impurity unchanged and can never be a useful split.
            for t in np.unique(column)[:-1]:
                left_mask = column <= t
                imp = self._weighted_impurity(y[left_mask], y[~left_mask])
                if imp < best_impurity:
                    best_impurity = imp
                    best_feat = feature_idx
                    best_thresh = t
        return best_feat, best_thresh, best_impurity, current_impurity

    # --- Build Tree Recursively ---
    def _build_tree(self, X, y, depth):
        """Recursively grow the subtree for samples ``X``/``y`` located at ``depth``."""
        num_samples = len(y)
        num_labels = len(np.unique(y))

        if num_labels == 1:
            return Node(value=y[0])
        if self.max_depth is not None and depth >= self.max_depth:
            return Node(value=self._majority_label(y))
        if num_samples < self.min_samples_split:
            return Node(value=self._majority_label(y))

        feat, thresh, best_imp, current_imp = self._best_split(X, y)
        # Stop when no split exists or the best one does not reduce impurity.
        if feat is None or current_imp - best_imp <= 1e-7:
            return Node(value=self._majority_label(y))

        left_mask = X[:, feat] <= thresh
        right_mask = ~left_mask
        left = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right = self._build_tree(X[right_mask], y[right_mask], depth + 1)
        # Remember the training majority here so pruning can turn this node into a
        # correctly labelled leaf without access to the training data.
        return Node(feature=feat, threshold=thresh, left=left, right=right,
                    majority=self._majority_label(y))

    # --- Fit ---
    def fit(self, X, y):
        """Grow the tree from features ``X`` and labels ``y``.

        ``X`` is converted with ``np.array`` and ``y`` is cast to ``int``; labels must
        be non-negative because impurities are computed with ``np.bincount``.

        Raises
        ------
        ValueError
            If ``y`` is empty.
        """
        X = np.array(X)
        y = np.array(y, dtype=int)
        if y.size == 0:
            raise ValueError("Cannot fit a decision tree on an empty dataset")
        self.root = self._build_tree(X, y, 0)

    # --- Predict ---
    def _predict_one(self, x, node):
        """Walk from ``node`` down to a leaf for the single sample ``x``; return its label."""
        while node.value is None:
            node = node.left if x[node.feature] <= node.threshold else node.right
        return node.value

    def predict(self, X):
        """Return an array with the predicted label of every row of ``X``."""
        X = np.array(X)
        return np.array([self._predict_one(x, self.root) for x in X])

    # --- Post-Pruning using validation set ---
    def post_prune(self, X_val, y_val):
        """
        Recursively prune nodes if validation accuracy doesn't decrease.

        Works bottom-up and in place. An internal node whose two children are leaves is
        replaced by a leaf predicting the node's training-majority label whenever that
        leaf classifies the validation samples reaching the node at least as well as
        the split does. Only those samples are affected by the replacement, so
        comparing their correct counts is equivalent to comparing accuracy on the whole
        validation set. A node reached by no validation sample is pruned (0 >= 0).
        """
        X_val = np.array(X_val)
        y_val = np.array(y_val, dtype=int)

        def prune_node(node, X_sub, y_sub):
            if node is None or node.value is not None:
                return node
            # Route the validation samples the same way prediction would.
            goes_left = X_sub[:, node.feature] <= node.threshold
            # Recursively prune children first
            node.left = prune_node(node.left, X_sub[goes_left], y_sub[goes_left])
            node.right = prune_node(node.right, X_sub[~goes_left], y_sub[~goes_left])

            if node.left and node.right and node.left.value is not None and node.right.value is not None:
                majority = node.majority
                if majority is None:
                    # Node was not produced by _build_tree (no stored training
                    # majority): fall back to the labels of its two leaves.
                    majority = Counter([node.left.value, node.right.value]).most_common(1)[0][0]
                split_pred = np.where(goes_left, node.left.value, node.right.value)
                correct_with_split = np.sum(split_pred == y_sub)
                correct_as_leaf = np.sum(y_sub == majority)
                if correct_as_leaf >= correct_with_split:
                    # Prune permanently
                    node.feature, node.threshold, node.left, node.right, node.value = None, None, None, None, majority
            return node

        self.root = prune_node(self.root, X_val, y_val)

# --- Evaluation function ---
def evaluate_model(model, X_tr, y_tr, X_v, y_v, X_te, y_te):
    """Fit ``model`` on the training split and score it on validation and test data.

    ``model`` only needs ``fit(X, y)`` and ``predict(X)``. Precision, recall and F1 are
    computed on the test split with ``average='binary'``; undefined ratios are
    reported as 0.

    Returns a dict with the keys ``val_acc``, ``test_acc``, ``precision``, ``recall``,
    ``f1`` and ``confusion_matrix`` (the last one computed on the test split).
    """
    model.fit(X_tr, y_tr)

    val_predictions = model.predict(X_v)
    test_predictions = model.predict(X_te)

    # The fourth element (support) is not reported.
    precision, recall, f_score, _ = precision_recall_fscore_support(
        y_te, test_predictions, average='binary', zero_division=0
    )

    report = {}
    report['val_acc'] = accuracy_score(y_v, val_predictions)
    report['test_acc'] = accuracy_score(y_te, test_predictions)
    report['precision'] = precision
    report['recall'] = recall
    report['f1'] = f_score
    report['confusion_matrix'] = confusion_matrix(y_te, test_predictions)
    return report

# --- Experiments for different depths and criteria ---
# Train and score one scratch tree per (criterion, max_depth) pair; each result dict is
# stored in `results` under "<criterion>_depth_<depth>" and summarised on one line.
results = {}
for crit in ('gini', 'entropy'):
    for depth in (2, 4, 6, None):
        key = f'{crit}_depth_{depth}'
        clf = DecisionTreeScratch(criterion=crit, max_depth=depth, min_samples_split=5)
        res = evaluate_model(clf, X_train, y_train, X_val, y_val, X_test, y_test)
        results[key] = res
        print(f"{key} --> Val Acc: {res['val_acc']:.4f}, Test Acc: {res['test_acc']:.4f}, F1: {res['f1']:.4f}")

# --- Sklearn comparison ---
# Baseline: scikit-learn's DecisionTreeClassifier (imported as SklearnDT) trained on the
# same training split. Only random_state is set here; no depth limit or other
# hyperparameter is passed, so the library defaults apply.
sk_clf = SklearnDT(random_state=42)
sk_clf.fit(X_train, y_train)
sk_pred = sk_clf.predict(X_test)
# Scores below are computed on the test split only; the validation split is not used.
print('\n✅ Sklearn DecisionTreeClassifier Results:')
print('Test Accuracy:', accuracy_score(y_test, sk_pred))
print('\nClassification Report:\n', classification_report(y_test, sk_pred))

# --- Identify top features used near root ---
def top_features_from_tree(node, feature_names, top_k=10):
    """List the splits closest to the root of a fitted tree.

    The tree is walked level by level (root first, then its children left to right,
    and so on), so the result really contains the ``top_k`` shallowest splits. A
    depth-first walk would instead follow the left-most branch downwards and could
    report deep splits while skipping the root's right child.

    Parameters
    ----------
    node : object or None
        Root of the tree. Nodes expose ``value`` (not None for leaves), ``feature``,
        ``threshold``, ``left`` and ``right``.
    feature_names : sequence
        Name of each feature, indexed by the node's ``feature``.
    top_k : int, default 10
        Maximum number of splits to return.

    Returns
    -------
    list of (depth, feature_name, threshold)
        Ordered by increasing depth, left to right within a level.
    """
    out = []
    level = [node]
    depth = 0
    while level and len(out) < top_k:
        next_level = []
        for n in level:
            if len(out) >= top_k:
                break
            # Leaves and missing children contribute no split.
            if n is None or n.value is not None:
                continue
            out.append((depth, feature_names[n.feature], n.threshold))
            next_level.append(n.left)
            next_level.append(n.right)
        level = next_level
        depth += 1
    return out

# Column order of X_train gives the name for each feature index stored in the tree.
feature_names = list(X_train.columns)
# Grow an unrestricted tree (no depth limit, split down to 2 samples) on the training split.
clf_full = DecisionTreeScratch(max_depth=None, min_samples_split=2, criterion='gini')
clf_full.fit(X_train, y_train)

# Post-prune full tree using validation set
# post_prune rewrites clf_full's nodes in place, so clf_full.root is the pruned tree afterwards.
clf_full.post_prune(X_val, y_val)

print('\n✅ Top splits (features used near the root after post-pruning):')
for d, name, thr in top_features_from_tree(clf_full.root, feature_names, top_k=10):
    print(f"Depth {d}: Feature '{name}' with threshold {thr}")

print('\n🎯 Notebook is now fully compliant with Experiment-5 instructions!')

gini_depth_2 --> Val Acc: 0.8254, Test Acc: 0.8246, F1: 0.5360
gini_depth_4 --> Val Acc: 0.8416, Test Acc: 0.8414, F1: 0.6129
gini_depth_6 --> Val Acc: 0.8511, Test Acc: 0.8539, F1: 0.6411
gini_depth_None --> Val Acc: 0.8148, Test Acc: 0.8165, F1: 0.6193
entropy_depth_2 --> Val Acc: 0.8254, Test Acc: 0.8246, F1: 0.5360
entropy_depth_4 --> Val Acc: 0.8416, Test Acc: 0.8414, F1: 0.6129
entropy_depth_6 --> Val Acc: 0.8503, Test Acc: 0.8529, F1: 0.6375
entropy_depth_None --> Val Acc: 0.8115, Test Acc: 0.8182, F1: 0.6253

✅ Sklearn DecisionTreeClassifier Results:
Test Accuracy: 0.810498687664042

Classification Report:
               precision    recall  f1-score   support

           0       0.88      0.87      0.87      7217
           1       0.61      0.61      0.61      2308

    accuracy                           0.81      9525
   macro avg       0.74      0.74      0.74      9525
weighted avg       0.81      0.81      0.81      9525


✅ Top splits (features used near the root after p