In [1]:
from scripts.DecisionTree import DecisionTree
from scripts.utils import *
import pandas as pd
import numpy as np
from ucimlrepo import fetch_ucirepo #for importing data
from summarytools import dfSummary

In [3]:
# fetch dataset 
secondary_mushroom = fetch_ucirepo(id=848) 
  
# data (as pandas dataframes) 
X_loaded = secondary_mushroom.data.features 
y_loaded = secondary_mushroom.data.targets

In [4]:
X = X_loaded.copy()
y = y_loaded.copy()

In [5]:
X = X.drop(['spore-print-color', 'veil-type', 'veil-color', 'stem-root'], axis=1)

In [6]:
for col in X:
    X.loc[:,col]=X.loc[:,col].astype(str) if X.loc[:,col].dtype == 'object' else X.loc[:,col]

In [7]:
#encode y into 0s and 1s. 
y_mapping = encode_labels(y)
print(y_mapping)

{'class': {'e': 0, 'p': 1}}


In [10]:
#split dataset into train and test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [23]:
import numpy as np
import pandas as pd
from collections import Counter
from typing import Optional

class Node:
    def __init__(self, feature: str = None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    def is_leaf_node(self):
        return self.value is not None

class DecisionTree:
    def __init__(self, min_samples_split=2, max_depth=10, n_features=None, split_using="entropy"):
        if split_using not in ('entropy', 'gini', 'class_error'):
            raise ValueError(f"split_using argument must be one of ('entropy', 'gini', 'class_error')")
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_features = n_features
        self.root = None
        self.split_using = split_using

    def fit(self, X, y):
        X = np.array(X)
        y = np.array(y)     
        self.leaves = []
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1], self.n_features)
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        n_samples, n_feats = X.shape
        n_labels = len(np.unique(y))

        if depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split:
            leaf_value = self._most_common_label(y)
            self.leaves.append(leaf_value)
            return Node(value=leaf_value)

        feat_idxs = np.random.choice(n_feats, self.n_features, replace=False)
        best_feature, best_thresh, best_gain = self._best_split(X, y, feat_idxs)
        if best_gain == 0:
            leaf_value = self._most_common_label(y)
            self.leaves.append(leaf_value)
            return Node(value=leaf_value)

        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh)
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return Node(best_feature, best_thresh, left, right)

    def _best_split(self, X, y, feat_idxs):
        best_gain = -1
        split_idx, split_threshold = None, None

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column.astype(str)) if self._iscategorical(X_column) else np.unique(X_column)
            #thresholds = np.unique(X_column.astype(str)) if X_column.dtype == 'object' else np.unique(X_column)

            for thr in thresholds:
                gain = self._gain(y, X_column, thr)
                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_threshold = thr

        return split_idx, split_threshold, best_gain
    
    def _iscategorical(self, X_column):
        try:
            X_column = X_column.astype(float)
            return False
        except ValueError:
            return True


    def _gain(self, y, X_column, threshold):
        if self.split_using == 'entropy':
            parent_entropy = self._entropy(y)
            left_idxs, right_idxs = self._split(X_column, threshold)

            if len(left_idxs) == 0 or len(right_idxs) == 0:
                return 0

            n = len(y)
            n_l, n_r = len(left_idxs), len(right_idxs)
            e_l, e_r = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
            child_entropy = (n_l / n) * e_l + (n_r / n) * e_r

            information_gain = parent_entropy - child_entropy
            return information_gain

        elif self.split_using == 'gini':
            parent_gini = self._gini(y)
            left_idxs, right_idxs = self._split(X_column, threshold)

            if len(left_idxs) == 0 or len(right_idxs) == 0:
                return 0

            n = len(y)
            n_l, n_r = len(left_idxs), len(right_idxs)
            gini_l, gini_r = self._gini(y[left_idxs]), self._gini(y[right_idxs])
            child_gini = (n_l / n) * gini_l + (n_r / n) * gini_r

            information_gain = parent_gini - child_gini
            return information_gain

        elif self.split_using == 'class_error':
            parent_error = self._class_error(y)
            left_idxs, right_idxs = self._split(X_column, threshold)

            if len(left_idxs) == 0 or len(right_idxs) == 0:
                return 0

            n = len(y)
            n_l, n_r = len(left_idxs), len(right_idxs)
            error_l, error_r = self._class_error(y[left_idxs]), self._class_error(y[right_idxs])
            child_error = (n_l / n) * error_l + (n_r / n) * error_r

            information_gain = parent_error - child_error
            return information_gain

    def _split(self, X_column, split_thresh):
        try:
            split_thresh = float(split_thresh)
            X_column = X_column.astype(float)
            left_idxs = np.argwhere(X_column <= split_thresh).flatten()
            right_idxs = np.argwhere(X_column > split_thresh).flatten()

        except ValueError:
            #since all thresholds have been converted into strings, for correct handling of nulls, X_column will need to be converted into strings too
            split_thresh = str(split_thresh)
            X_column = X_column.astype(str)
            left_idxs = np.argwhere(X_column.astype(str) == split_thresh).flatten()
            right_idxs = np.argwhere(X_column.astype(str) != split_thresh).flatten()

        return left_idxs, right_idxs

    def _class_error(self, y):
        counter = Counter(y.flatten())  # Convert to list
        most_common_label, count = counter.most_common(1)[0]
        return 1 - (count / len(y))

    def _entropy(self, y):
        counts = np.bincount(y.flatten())
        total_count = len(y)
        probabilities = counts / total_count
        
        # Filter out zero probabilities to avoid log2(0)
        probabilities = probabilities[probabilities > 0]
        
        entropy = -np.sum(probabilities * np.log2(probabilities) + (1 - probabilities) * np.log2(1 - probabilities)) / 2
        return entropy


    def _gini(self, y):
        counts = np.bincount(y.flatten())
        total_count = len(y)
        probabilities = counts / total_count
        
        # Filter out zero probabilities to avoid unnecessary calculations
        probabilities = probabilities[probabilities > 0]
        
        gini = 2 * np.sum(probabilities * (1 - probabilities))
        return gini


    def _most_common_label(self, y):
        counter = Counter(y)
        value = counter.most_common(1)[0][0]
        return value

    def predict(self, X):
        y_pred = np.array([self._traverse_tree(x, self.root) for x in X])
        return y_pred

    def _traverse_tree(self, x, node):
        if node.is_leaf_node():
            return node.value
        if type(x[node.feature]) == str:
            if x[node.feature] == node.threshold:
                return self._traverse_tree(x, node.left)

            return self._traverse_tree(x, node.right)
        else:
            if x[node.feature] <= node.threshold:
                return self._traverse_tree(x, node.left)

            return self._traverse_tree(x, node.right)           

Now we train the tree using each of the three splitting methods

Model 1: Entropy

In [24]:
#training the decision tree
entropy_model = DecisionTree(split_using='entropy', max_depth=10)
entropy_model.fit(X_train, y_train)

  entropy = -np.sum(probabilities * np.log2(probabilities) + (1 - probabilities) * np.log2(1 - probabilities)) / 2
  entropy = -np.sum(probabilities * np.log2(probabilities) + (1 - probabilities) * np.log2(1 - probabilities)) / 2


In [25]:
#performance of entropy model
entropy_pred = entropy_model.predict(X_test)
accuracy(y_test, entropy_pred)
precision(y_test, entropy_pred)
recall(y_test, entropy_pred)

Accuracy: 90.34%
0.9033895529719993
Precision: 93.04%
0.930352798053528
Recall: 89.44%
0.8944444444444445


Model 2: Gini impurity

In [26]:
#training the decision tree
gini_model = DecisionTree(split_using='gini', max_depth=10)
gini_model.fit(X_train, y_train)

In [27]:
#performance of gini model
gini_pred = gini_model.predict(X_test)
accuracy(y_test, gini_pred)
precision(y_test, gini_pred)
recall(y_test, gini_pred)

Accuracy: 90.43%
Precision: 95.45%
Recall: 87.06%


0.8706140350877193

Model 3: Train Error

Next we adopt the training error using zero-one loss as a splitting criteria

In [28]:
#training the decision tree
class_error_model = DecisionTree(split_using='class_error', max_depth=10)
class_error_model.fit(X_train, y_train)

In [29]:
#performance of class_error model
class_error_pred = class_error_model.predict(X_test)
accuracy(y_test, class_error_pred)
precision(y_test, class_error_pred)
recall(y_test, class_error_pred)

Accuracy: 80.61%
Precision: 87.78%
Recall: 75.95%


0.7595029239766082

In [31]:
#training errors of each model
entropy_train = entropy_model.predict(X_train)
gini_train = gini_model.predict(X_train)
class_error_train = class_error_model.predict(X_train)
print(f"entropy train error: {zero_one_loss(y_train, entropy_train)}")
print(f"gini train error: {zero_one_loss(y_train, gini_train)}")
print(f"class_error train error: {zero_one_loss(y_train, class_error_train)}")

entropy train error: 0.09041039811687647
gini train error: 0.09415617644048715
class_error train error: 0.18579469859789172


It's easy to see that the training errors are similar to the test errors, showing that the model didn't overfit considering the stopping criteria.

finally we perform hyper parameter tuning to optimize the model on the max depth stopping criterion

In [33]:
#tryna see if at some point the validation starts to rise
#tuning the tree on the max_depth criterion

tuned_tree = tune(X_train, y_train, tune_on='max_depth',split_using='gini', start=10, stop=25)

tree depth: 10, validation error: 0.13632176849861835
tree depth: 11, validation error: 0.09620304984136731
tree depth: 12, validation error: 0.06273666973697677
tree depth: 13, validation error: 0.04175621737795517


KeyboardInterrupt: 

In [None]:
class TunedTree:
    def __init__(self, split_using, n_shuffles, start, stop, n_jobs=-1 ) -> None:
        self.split_using = split_using 
        self.n_shuffles = n_shuffles
        self.start = start
        self.stop = stop
        self.n_jobs = n_jobs


    def evaluate_model(X_train, y_train, X_validate, y_validate, tune_on, split_using, i, cv_index):
        params = {tune_on: i, 'split_using': split_using}
        tree = DecisionTree(**params)
        X_train = np.random.permutation
        tree.fit(X_train, y_train)
        y_pred = tree.predict(X_validate)
        validation_error = zero_one_loss(y_validate, y_pred)
        print(f"tree depth: {tree.max_depth}, , cv_index: {cv_index}, validation error: {validation_error}")
        return validation_error, i

    def _tune(X, y, tune_on, split_using, n_shuffles, start, stop, n_jobs=-1):
        for cv_index in range(n_shuffles):
            permuted_indices = np.random.permutation(X.shape[0])
            X_shuffled = X[permuted_indices]
            y_shuffled = y[permuted_indices]
            X_train, X_validate, y_train, y_validate = train_test_split(X_shuffled, y_shuffled, test_size=0.2)
        
            results = Parallel(n_jobs=n_jobs)(
                delayed(evaluate_model)(X_train, y_train, X_validate, y_validate, tune_on, split_using, n_shuffles, i)
                for i in range(start, stop)
            )
        
        best_error, best_depth = min(results, key=lambda x: x[0])
        
        print(f"best depth: {best_depth} split criterion: {split_using} validation error: {round(best_error * 100, 2)} %")
        return [(f"tree_depth: {i}", f"validation error:{error}") for error, i in results]

In [18]:
from joblib import Parallel, delayed
from scripts.utils import *
from scripts.DecisionTree import DecisionTree

def evaluate_model(X_train, y_train, X_validate, y_validate, tune_on, split_using, i, cv_index):
    params = {tune_on: i, 'split_using': split_using}
    tree = DecisionTree(**params)
    tree.fit(X_train, y_train)
    y_pred = tree.predict(X_validate)
    validation_error = zero_one_loss(y_validate, y_pred)
    print(f"Tree depth: {tree.max_depth}, Shuffle index: {cv_index}, Validation error: {validation_error}")
    return validation_error, i

def tune(X, y, tune_on, split_using, start, stop, n_shuffles=5, n_jobs=-1):
    results_per_depth = {i: [] for i in range(start, stop)}
    
    for cv_index in range(n_shuffles):
        # Shuffle the dataset
        permuted_indices = np.random.permutation(X.shape[0])
        X_shuffled = X[permuted_indices]
        y_shuffled = y[permuted_indices]
        
        # Split the shuffled data into training and validation sets
        X_train, X_validate, y_train, y_validate = train_test_split(X_shuffled, y_shuffled, test_size=0.2)
        
        # Evaluate the model for each depth
        results = Parallel(n_jobs=n_jobs)(
            delayed(evaluate_model)(X_train, y_train, X_validate, y_validate, tune_on, split_using, i, cv_index)
            for i in range(start, stop)
        )
        print(results)
        
        # Store the results for this shuffle
        for error, depth in results:
            results_per_depth[depth].append(error)
        
    
    # Compute mean validation error for each depth
    mean_errors = {depth: np.mean(errors) for depth, errors in results_per_depth.items()}
    
    best_depth = min(mean_errors, key=mean_errors.get)
    best_error = mean_errors[best_depth]
    
    print(f"Best depth: {best_depth}, Split criterion: {split_using}, Mean validation error: {round(best_error * 100, 2)} %")
    
    # Format results for return
    return [(f"tree_depth: {depth}", f"mean_validation_error: {error}") for depth, error in mean_errors.items()]


In [38]:
type(X)

pandas.core.frame.DataFrame

In [19]:
tuned_tree = tune(X_train,y_train, 'max_depth', split_using='gini', start=1, stop=5)

[(0.3900317265377136, 1), (0.34510285538839425, 2), (0.3131716303346638, 3), (0.3008903899293829, 4)]
[(0.3932043803090779, 1), (0.3485825401698905, 2), (0.31736772080646813, 3), (0.3059052297615392, 4)]
[(0.39044110121788966, 1), (0.34694504144918636, 2), (0.31951693787739227, 3), (0.30201617029986694, 4)]
[(0.38604032340599737, 1), (0.3445911370381742, 2), (0.31368334868488384, 3), (0.28072868693071334, 4)]
[(0.40047078088220245, 1), (0.35902159451437926, 2), (0.3280114624910449, 3), (0.3149114727254119, 4)]
Best depth: 4, Split criterion: gini, Mean validation error: 30.09 %


In [21]:
tuned_tree

[('tree_depth: 1', 'mean_validation_error: 0.3920376624705762'),
 ('tree_depth: 2', 'mean_validation_error: 0.3488486337120049'),
 ('tree_depth: 3', 'mean_validation_error: 0.3183502200388906'),
 ('tree_depth: 4', 'mean_validation_error: 0.30089038992938283')]

In [None]:
#retraining the tree with the best validation error
best_tree = DecisionTree(max_depth=20, split_using='gini')
best_tree.fit(X_train, y_train)

In [None]:
y_pred = best_tree.predict(X_test)
print(accuracy(y_test, y_pred))
print(precision(y_test, y_pred))
print(recall(y_test, y_pred))

Accuracy: 0.997789421974783
Precision: 0.9980991373007749
Recall: 0.997953216374269
