<a href="https://colab.research.google.com/github/Hos96/Decision-Tree-from-scratch/blob/main/parameter%20tuning%20code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
! pip install graphviz

In [None]:
import itertools

import pandas as pd
import numpy as np
import time

from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.metrics import make_scorer, confusion_matrix, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt

from graphviz import Digraph
from joblib import Parallel, delayed

In [None]:
class TreeNode:
    """A single node of a binary decision tree.

    An internal node carries a ``feature`` index, a ``threshold`` and its
    ``left``/``right`` children; a leaf carries the predicted ``value``.
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        # Split description (meaningful for internal nodes).
        self.feature, self.threshold = feature, threshold
        # Child subtrees (meaningful for internal nodes).
        self.left, self.right = left, right
        # Prediction stored in the node; only leaves set it.
        self.value = value

    def is_leaf(self):
        """Return True when the node stores a prediction (``value`` is not None)."""
        return not (self.value is None)

In [None]:
class DecisionTree:
    """Binary decision-tree classifier grown greedily, depth first.

    Each internal node tests ``x[feature] <= threshold``. Labels must be
    non-negative integers because class frequencies are counted with
    ``np.bincount``.

    Parameters
    ----------
    maximum_depth : int or None
        Nodes at this depth are not split further. ``None`` disables the limit.
    max_leaf_nodes : int or None
        Upper bound on the number of leaves. ``None`` disables the limit.
    entropy_threshold : float or None
        A node whose impurity is below this value becomes a leaf.
    spliting_function : {'scaled_entropy', 'gini', 'squared'}
        Name of the impurity measure used to score splits.
    min_samples_split : int
        A node with fewer samples than this becomes a leaf.
    feature_names : sequence or None
        Optional names, indexed by feature position, used by ``visualize_tree``.

    Attributes
    ----------
    root : TreeNode or None
        Root of the fitted tree (``None`` before ``fit``).
    leaf_count : int
        Number of leaves of the last fitted tree (0 before ``fit``).
    depth : int
        Depth of the deepest node of the last fitted tree (root is depth 0).
    criterion_func : callable or None
        Impurity function resolved from ``spliting_function``.
    """

    def __init__(self, maximum_depth=None, max_leaf_nodes=None, entropy_threshold=None, spliting_function=None, min_samples_split=2, feature_names=None):
        self.maximum_depth = maximum_depth
        self.max_leaf_nodes = max_leaf_nodes
        self.entropy_threshold = entropy_threshold
        self.spliting_function = spliting_function
        self.min_samples_split = min_samples_split
        self.root = None
        self.feature_names = feature_names
        self.leaf_count = 0
        self.depth = 0
        self.criterion_func = self._resolve_criterion(spliting_function)

    def _resolve_criterion(self, name):
        """Map an impurity name to the matching method, or None if unknown."""
        return {
            'scaled_entropy': self._scaled_entropy,
            'gini': self._gini_impurity,
            'squared': self._squared_impurity,
        }.get(name)

    def get_parameters(self, deep=True):
        """Return the constructor hyper-parameters as a dict (``deep`` is unused)."""
        return {
            'maximum_depth': self.maximum_depth,
            'max_leaf_nodes': self.max_leaf_nodes,
            'entropy_threshold': self.entropy_threshold,
            'spliting_function': self.spliting_function,
            'min_samples_split': self.min_samples_split,
            'feature_names': self.feature_names
        }

    def set_parameters(self, **params):
        """Set attributes from keyword arguments (for hyper-parameter tuning).

        Returns the instance itself so calls can be chained.
        """
        for param, value in params.items():
            setattr(self, param, value)
        # Keep the resolved impurity function in sync with its name;
        # otherwise the tree would silently keep using the old criterion.
        if 'spliting_function' in params:
            self.criterion_func = self._resolve_criterion(self.spliting_function)
        return self

    def fit(self, X, y):
        """Grow the tree on a 2-D feature array ``X`` and integer labels ``y``.

        Raises
        ------
        ValueError
            If no valid ``spliting_function`` is configured or ``y`` is empty.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if self.criterion_func is None:
            raise ValueError(
                f"unknown spliting_function {self.spliting_function!r}; "
                "expected 'scaled_entropy', 'gini' or 'squared'"
            )
        if len(y) == 0:
            raise ValueError("cannot fit a decision tree on an empty dataset")

        # Reset the statistics so refitting the same instance does not
        # accumulate values from a previous fit.
        self.depth = 0
        # The unsplit root is one leaf; every split adds exactly one more.
        self.leaf_count = 1
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        """Recursively build the subtree for the samples ``X``/``y``."""
        num_samples, num_features = X.shape

        if depth > self.depth:
            self.depth = depth

        stop = (
            (self.maximum_depth is not None and depth >= self.maximum_depth)
            or (self.max_leaf_nodes is not None and self.leaf_count >= self.max_leaf_nodes)
            or (self.entropy_threshold is not None
                and self.criterion_func(y) < self.entropy_threshold)
            or num_samples < self.min_samples_split
            or np.unique(y).size == 1
        )
        if stop:
            return TreeNode(value=self._most_common_label(y))

        # All features are candidates; the random order only affects which
        # feature wins when several splits have exactly the same gain.
        feat_idxs = np.random.choice(num_features, num_features, replace=False)
        best_feat, best_thresh = self._best_criteria(X, y, feat_idxs)
        if best_feat is None:
            # No threshold separates the samples (e.g. identical rows).
            return TreeNode(value=self._most_common_label(y))

        # Splitting replaces one leaf by two, i.e. one additional leaf.
        self.leaf_count += 1
        left_idxs, right_idxs = self._split(X[:, best_feat], best_thresh)
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return TreeNode(best_feat, best_thresh, left, right)

    def _best_criteria(self, X, y, feat_idxs):
        """Return ``(feature_index, threshold)`` of the highest-gain split.

        Returns ``(None, None)`` when no threshold puts samples on both sides.
        """
        # The parent impurity is the same for every candidate: compute it once.
        parent_criterion = self.criterion_func(y)
        best_gain = -np.inf
        split_idx, split_thresh = None, None
        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            # np.unique is sorted; the largest value can never be a useful
            # threshold because no sample is strictly greater than it.
            for threshold in np.unique(X_column)[:-1]:
                gain = self._gain(y, X_column, threshold, parent_criterion)
                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_thresh = threshold
        return split_idx, split_thresh

    def _split(self, X_column, split_thresh):
        """Return the row positions going left (``<=``) and right (``>``)."""
        left_idxs = np.flatnonzero(X_column <= split_thresh)
        right_idxs = np.flatnonzero(X_column > split_thresh)
        return left_idxs, right_idxs

    def _gain(self, y, X_column, split_thresh, parent_criterion=None):
        """Return the impurity decrease obtained by splitting at ``split_thresh``.

        A split that leaves one side empty yields ``-inf`` so that it can
        never be selected; selecting it would recurse on unchanged data.
        """
        if parent_criterion is None:
            parent_criterion = self.criterion_func(y)

        left_idxs, right_idxs = self._split(X_column, split_thresh)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return -np.inf

        child_criterion = self._weighted_criterion(y[left_idxs], y[right_idxs], self.criterion_func)
        return parent_criterion - child_criterion

    def _weighted_criterion(self, y_left, y_right, criterion_func):
        """Return the size-weighted average impurity of the two children."""
        n = len(y_left) + len(y_right)
        p_left = len(y_left) / n
        p_right = len(y_right) / n
        return p_left * criterion_func(y_left) + p_right * criterion_func(y_right)

    def _scaled_entropy(self, y):
        """Return ``-sum((p / 2) * log2(p))`` over the non-zero class frequencies."""
        probs = np.bincount(y) / len(y)
        probs = probs[probs > 0]  # log2(0) is undefined; such terms contribute 0
        return -np.sum((probs / 2) * np.log2(probs))

    def _gini_impurity(self, y):
        """Return the Gini impurity ``1 - sum(p ** 2)``."""
        probs = np.bincount(y) / len(y)
        return 1.0 - np.sum(probs ** 2)

    def _squared_impurity(self, y):
        """Return ``sum(sqrt((p + eps) * (1 - p + eps)))`` over the class frequencies."""
        probs = np.bincount(y) / len(y)
        epsilon = 1e-10  # small offset added to both factors under the square root
        return np.sum(np.sqrt((probs + epsilon) * (1 - probs + epsilon)))

    def predict(self, X):
        """Return an array with the predicted label for each row of ``X``.

        The tree must have been fitted first.
        """
        return np.array([self._investigate_tree(x, self.root) for x in np.asarray(X)])

    def _most_common_label(self, y):
        """Return the most frequent label in ``y`` (smallest label wins ties)."""
        return np.bincount(y).argmax()

    def _investigate_tree(self, x, node):
        """Walk from ``node`` down to a leaf for sample ``x`` and return its value."""
        while not node.is_leaf():
            node = node.left if x[node.feature] <= node.threshold else node.right
        return node.value

    def visualize_tree(self, dot=None):
        """Add the tree's nodes and edges to a graphviz ``Digraph`` and return it.

        A new ``Digraph`` is created when ``dot`` is None. Leaves are drawn as
        ellipses labelled with their class, internal nodes as boxes labelled
        with their split condition.
        """
        if dot is None:
            dot = Digraph()

        def add_nodes_edges(dot, node):
            if node is None:
                return
            if node.is_leaf():
                dot.node(str(id(node)), f"Class {node.value}", shape='ellipse')
            else:
                if self.feature_names is not None:
                    feature_name = self.feature_names[node.feature]
                else:
                    feature_name = f"Feature {node.feature}"
                dot.node(str(id(node)), f"{feature_name} <= {node.threshold}", shape='box')
                if node.left is not None:
                    add_nodes_edges(dot, node.left)
                    dot.edge(str(id(node)), str(id(node.left)), '<=')
                if node.right is not None:
                    add_nodes_edges(dot, node.right)
                    dot.edge(str(id(node)), str(id(node.right)), '>')

        add_nodes_edges(dot, self.root)
        return dot

def zero_one_loss(y_true, y_pred):
    """Return the fraction of predictions that differ from the true labels.

    Both inputs are converted to NumPy arrays first, so plain lists are
    compared element by element (a list ``!=`` list comparison would yield a
    single bool) and pandas objects are compared by position, not by index.

    An empty input yields ``nan`` (mean of an empty array).

    Raises
    ------
    ValueError
        If the two inputs do not have the same shape.
    """
    true_arr = np.asarray(y_true)
    pred_arr = np.asarray(y_pred)
    if true_arr.shape != pred_arr.shape:
        # Refuse to broadcast: a silent (n,) vs (n, 1) comparison would
        # produce an n-by-n matrix and a meaningless loss.
        raise ValueError(
            f"y_true and y_pred must have the same shape, "
            f"got {true_arr.shape} and {pred_arr.shape}"
        )
    return np.mean(pred_arr != true_arr)

def grid_search(X_train, y_train, param_grid, scoring_func, n_splits=5, n_jobs=-1, random_state=42):
    """Cross-validated exhaustive search over DecisionTree hyper-parameters.

    Parameters
    ----------
    X_train : pandas.DataFrame
        Training features (accessed through ``.iloc``, ``.columns``, ``.values``).
    y_train : pandas.Series
        Training labels (accessed through ``.iloc`` and ``.values``).
    param_grid : list of dict
        Each dict maps a parameter name to a list of candidate values; the
        Cartesian product of every dict is evaluated. Every combination must
        contain ``'spliting_function'``. ``'maximum_depth'``,
        ``'max_leaf_nodes'``, ``'entropy_threshold'`` and
        ``'min_samples_split'`` are optional.
    scoring_func : callable
        Called as ``scoring_func(y_true, y_pred)``; lower is better.
    n_splits : int
        Number of shuffled K-fold splits.
    n_jobs : int
        Number of joblib workers (-1 uses all available cores).
    random_state : int
        Seed for the K-fold shuffling.

    Returns
    -------
    tuple
        ``(results, best_params, best_score)`` where ``results`` is a list of
        ``(params, mean_score)`` for every combination.

    Raises
    ------
    ValueError
        If ``param_grid`` yields no parameter combination.
    """
    cv = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    def evaluate_parameters(params):
        fold_scores = []
        fold_depths = []
        fold_leafs = []

        # Each fold holds out 1/n_splits of the data for validation.
        for train_idxs, val_idxs in cv.split(X_train, y_train):
            X_train_cv, y_train_cv = X_train.iloc[train_idxs], y_train.iloc[train_idxs]
            X_val_cv, y_val_cv = X_train.iloc[val_idxs], y_train.iloc[val_idxs]

            model = DecisionTree(
                maximum_depth=params.get('maximum_depth'),
                max_leaf_nodes=params.get('max_leaf_nodes'),
                entropy_threshold=params.get('entropy_threshold'),
                spliting_function=params['spliting_function'],
                # Honour the grid when it tunes this parameter; default to 2.
                min_samples_split=params.get('min_samples_split', 2),
                feature_names=X_train.columns
            )

            model.fit(X_train_cv.values, y_train_cv.values)
            y_val_pred = model.predict(X_val_cv.values)
            fold_scores.append(scoring_func(y_val_cv, y_val_pred))
            fold_depths.append(model.depth)
            fold_leafs.append(model.leaf_count)

        mean_score = np.mean(fold_scores)
        mean_depth = np.mean(fold_depths)
        mean_leafs = np.mean(fold_leafs)

        print(f"zero one loss: {mean_score:.5f} with params: {params} \t mean depth: {mean_depth:.1f} and mean leafs: {mean_leafs:.1f}")

        return params, mean_score

    # Expand every grid dict into the Cartesian product of its value lists.
    param_combinations = []
    for param_dict in param_grid:
        names = list(param_dict.keys())
        for values in itertools.product(*param_dict.values()):
            param_combinations.append(dict(zip(names, values)))

    if not param_combinations:
        raise ValueError("param_grid does not contain any parameter combination")

    total = len(param_combinations)
    print(f"\nTotal number of combinations: {total}  x  {n_splits} cv = {n_splits*total} iterations\n")

    # Parallelizing the grid search: one task per parameter combination.
    results = Parallel(n_jobs=n_jobs)(delayed(evaluate_parameters)(params) for params in param_combinations)

    # Lowest mean score first; keep the ten best for the report.
    sorted_results = sorted(results, key=lambda x: x[1])[:10]

    print("\nTop 10 Results:")
    for rank, (params, mean_score) in enumerate(sorted_results, 1):
        print(f"Rank {rank}: Mean zero one loss: {mean_score:.6f} with params: {params}")

    best_params, best_score = sorted_results[0]
    return results, best_params, best_score


# Script entry point: one-hot encode the data, tune a DecisionTree with
# grid_search, refit the best configuration on the full training split and
# report accuracy, zero-one loss, a confusion matrix and a rendered tree.
if __name__ == '__main__':
    # Load the dataset
    # NOTE(review): `df` is not defined anywhere in the visible code --
    # presumably it is loaded in another notebook cell; confirm.
    data = df

    # One-hot encode every column, then cast the dummy columns to integers
    # (DecisionTree counts labels with np.bincount, which needs integers).
    data_encoded = pd.get_dummies(data)
    data_encoded = data_encoded.astype(int)


    # Separate features and target: both dummy columns derived from 'class'
    # are dropped from the features and 'class_p' is used as the binary target.
    X = data_encoded.drop(['class_p', 'class_e'], axis=1)  # Assuming 'class_p' is the target
    y = data_encoded['class_p']

    # Stratified 80/20 train/test split with a fixed seed.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

    # Three independent grids: each one enables a single stopping rule and
    # tries every impurity function with it.
    param_grid = [
        {
            'maximum_depth': [40],
            'spliting_function': ['scaled_entropy', 'gini', 'squared']
        },
        {
            'max_leaf_nodes': [150],
            'spliting_function': ['scaled_entropy', 'gini', 'squared']
        },
        {
            'entropy_threshold': [0.0001],
            'spliting_function': ['scaled_entropy', 'gini', 'squared']
        }
    ]

    # NOTE(review): custom_scorer is not used in the visible code; grid_search
    # below receives the plain zero_one_loss function instead.
    custom_scorer = make_scorer(zero_one_loss, greater_is_better=False)

    # Time the whole grid search.
    start_time = time.time()
    results, best_params, best_score = grid_search(X_train, y_train, param_grid, zero_one_loss)
    end_time = time.time()

    execution_time = end_time - start_time
    print(f"Execution time: {execution_time} seconds")

    print(f"\nBest score: : {best_score:.6f}")
    print("Best Hyperparameters:", best_params)


    # Rebuild a tree with the best combination; parameters missing from that
    # combination fall back to None via dict.get.
    best_tree = DecisionTree(
        maximum_depth=best_params.get('maximum_depth'),
        max_leaf_nodes=best_params.get('max_leaf_nodes'),
        entropy_threshold=best_params.get('entropy_threshold'),
        spliting_function=best_params['spliting_function'],
        min_samples_split=2,  # This is fixed as per the original configuration
        feature_names=X_train.columns
    )


    # Fit on the full training split (NumPy arrays, not DataFrames).
    best_tree.fit(X_train.values, y_train.values)

    # Predictions on the training data
    y_pred = best_tree.predict(X_train.values)

    # Evaluate the model on the training data
    accuracy = accuracy_score(y_train, y_pred)
    print(f"\nTrain accuracy: {accuracy:.6f}")

    y_test_pred = best_tree.predict(X_test.values)    # Predictions on the testing data


    # Evaluate the model on the testing data
    accuracy = accuracy_score(y_test, y_test_pred)
    print(f"Test accuracy: {accuracy:.6f}")

    # Zero-one loss on the test split.
    # NOTE(review): despite its name, train_error holds the TEST loss.
    train_error = zero_one_loss(y_test.values, y_test_pred)
    print(f"zero one loss on test set with best params: {train_error:.6f}")

    conf_matrix = confusion_matrix(y_test.values, y_test_pred)  # confusion matrix of the test predictions


    # Plot the confusion matrix as an annotated heatmap
    plt.figure(figsize=(12, 8))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Reds', xticklabels=['Class 0', 'Class 1'],
                yticklabels=['Class 0', 'Class 1'])
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()

    print("\n\n")

    # Visualize the tree: build the graph and render it to a PNG file
    # (view=True also asks graphviz to open the rendered file).
    dot = Digraph()
    dot = best_tree.visualize_tree(dot)
    dot.render('png/mushroom_tree', format='png', view=True)
