<a href="https://colab.research.google.com/github/Chuguevskij/ML_from_scratch/blob/main/decision_tree.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# imports
from __future__ import annotations
from typing import Tuple
from abc import ABC,abstractmethod
from scipy import stats
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_breast_cancer,make_regression
from sklearn.metrics import accuracy_score,precision_score,recall_score,mean_squared_error,mean_absolute_error,r2_score

In [None]:
class Node(object):
    """
    Class to define & control tree nodes
    """

    def __init__(self) -> None:
        """
        Initializer for a Node class instance
        """
        self.__split    = None
        self.__feature  = None
        self.__left     = None
        self.__right    = None
        self.leaf_value = None

    def set_params(self, split: float, feature: int) -> None:
        """
        Set the split & feature parameters for this node

        Input:
            split   -> value to split feature on
            feature -> index of feature to be used in splitting
        """
        self.__split   = split
        self.__feature = feature

    def get_params(self) -> Tuple[float,int]:
        """
        Get the split & feature parameters for this node

        Output:
            Tuple containing (split,feature) pair
        """
        return(self.__split, self.__feature)

    def set_children(self, left: Node, right: Node) -> None:
        """
        Set the left/right child nodes for the current node

        Inputs:
            left  -> LHS child node
            right -> RHS child node
        """
        self.__left  = left
        self.__right = right

    def get_left_node(self) -> Node:
        """
        Get the left child node

        Output:
            LHS child node
        """
        return(self.__left)

    def get_right_node(self) -> Node:
        """
        Get the RHS child node

        Output:
            RHS child node
        """
        return(self.__right)

In [None]:
class DecisionTree(ABC):
    """
    Base class to encompass the CART algorithm
    """

    def __init__(self, max_depth: int=None, min_samples_split: int=2) -> None:
        """
        Initializer

        Inputs:
            max_depth         -> maximum depth the tree can grow
            min_samples_split -> minimum number of samples required to split a node
        """
        self.tree              = None
        self.max_depth         = max_depth
        self.min_samples_split = min_samples_split

    @abstractmethod
    def _impurity(self, D: np.array) -> None:
        """
        Protected function to define the impurity
        """
        pass

    @abstractmethod
    def _leaf_value(self, D: np.array) -> None:
        """
        Protected function to compute the value at a leaf node
        """
        pass

    def __grow(self, node: Node, D: np.array, level: int) -> None:
        """
        Private recursive function to grow the tree during training

        Inputs:
            node  -> input tree node
            D     -> sample of data at node
            level -> depth level in the tree for node
        """
        # are we in a leaf node?
        depth = (self.max_depth is None) or (self.max_depth >= (level+1))
        msamp = (self.min_samples_split <= D.shape[0])
        n_cls = np.unique(D[:,-1]).shape[0] != 1

        # not a leaf node
        if depth and msamp and n_cls:

            # initialize the function parameters
            ip_node = None
            feature = None
            split   = None
            left_D  = None
            right_D = None
            # iterate through the possible feature/split combinations
            for f in range(D.shape[1]-1):
                for s in np.unique(D[:,f]):
                    # for the current (f,s) combination, split the dataset
                    D_l = D[D[:,f]<=s]
                    D_r = D[D[:,f]>s]
                    # ensure we have non-empty arrays
                    if D_l.size and D_r.size:
                        # calculate the impurity
                        ip  = ((D_l.shape[0]/D.shape[0])*self._impurity(D_l) +
                               (D_r.shape[0]/D.shape[0])*self._impurity(D_r))
                        # now update the impurity and choice of (f,s)
                        if (ip_node is None) or (ip < ip_node):
                            ip_node = ip
                            feature = f
                            split   = s
                            left_D  = D_l
                            right_D = D_r
            # set the current node's parameters
            node.set_params(split,feature)
            # declare child nodes
            left_node  = Node()
            right_node = Node()
            node.set_children(left_node,right_node)
            # investigate child nodes
            self.__grow(node.get_left_node(),left_D,level+1)
            self.__grow(node.get_right_node(),right_D,level+1)

        # is a leaf node
        else:
            # set the node value & return
            node.leaf_value = self._leaf_value(D)
            return

    def __traverse(self, node: Node, Xrow: np.array) -> int | float:
        """
        Private recursive function to traverse the (trained) tree

        Inputs:
            node -> current node in the tree
            Xrow -> data sample being considered
        Output:
            leaf value corresponding to Xrow
        """
        # check if we're in a leaf node?
        if node.leaf_value is None:
            # get parameters at the node
            (s,f) = node.get_params()
            # decide to go left or right?
            if (Xrow[f] <= s):
                return(self.__traverse(node.get_left_node(),Xrow))
            else:
                return(self.__traverse(node.get_right_node(),Xrow))
        else:
            # return the leaf value
            return(node.leaf_value)

    def train(self, Xin: np.array, Yin: np.array) -> None:
        """
        Train the CART model

        Inputs:
            Xin -> input set of predictor features
            Yin -> input set of labels
        """
        # prepare the input data
        D = np.concatenate((Xin,Yin.reshape(-1,1)),axis=1)
        # set the root node of the tree
        self.tree = Node()
        # build the tree
        self.__grow(self.tree,D,1)

    def predict(self, Xin: np.array) -> np.array:
        """
        Make predictions from the trained CART model

        Input:
            Xin -> input set of predictor features
        Output:
            array of prediction values
        """
        # iterate through the rows of Xin
        p = []
        for r in range(Xin.shape[0]):
            p.append(self.__traverse(self.tree,Xin[r,:]))
        # return predictions
        return(np.array(p).flatten())

In [None]:
class DecisionTreeClassifier(DecisionTree):
    """
    Decision Tree Classifier
    """

    def __init__(self, max_depth: int=None, min_samples_split: int=2, loss: str='gini') -> None:
        """
        Initializer

        Inputs:
            max_depth         -> maximum depth the tree can grow
            min_samples_split -> minimum number of samples required to split a node
            loss              -> loss function to use during training
        """
        DecisionTree.__init__(self,max_depth,min_samples_split)
        self.loss = loss

    def __gini(self, D: np.array) -> float:
        """
        Private function to define the gini impurity

        Input:
            D -> data to compute the gini impurity over
        Output:
            Gini impurity for D
        """
        # initialize the output
        G = 0
        # iterate through the unique classes
        for c in np.unique(D[:,-1]):
            # compute p for the current c
            p = D[D[:,-1]==c].shape[0]/D.shape[0]
            # compute term for the current c
            G += p*(1-p)
        # return gini impurity
        return(G)

    def __entropy(self, D: np.array) -> float:
        """
        Private function to define the shannon entropy

        Input:
            D -> data to compute the shannon entropy over
        Output:
            Shannon entropy for D
        """
        # initialize the output
        H = 0
        # iterate through the unique classes
        for c in np.unique(D[:,-1]):
            # compute p for the current c
            p = D[D[:,-1]==c].shape[0]/D.shape[0]
            # compute term for the current c
            H -= p*np.log2(p)
        # return entropy
        return(H)

    def _impurity(self, D: np.array) -> float:
        """
        Protected function to define the impurity

        Input:
            D -> data to compute the impurity metric over
        Output:
            Impurity metric for D
        """
        # use the selected loss function to calculate the node impurity
        ip = None
        if self.loss == 'gini':
            ip = self.__gini(D)
        elif self.loss == 'entropy':
            ip = self.__entropy(D)
        # return results
        return(ip)

    def _leaf_value(self, D: np.array) -> int:
        """
        Protected function to compute the value at a leaf node

        Input:
            D -> data to compute the leaf value
        Output:
            Mode of D
        """
        return(stats.mode(D[:,-1],keepdims=False)[0])

In [None]:
class DecisionTreeRegressor(DecisionTree):
    """
    Decision Tree Regressor
    """

    def __init__(self, max_depth: int=None, min_samples_split: int=2, loss: str='mse') -> None:
        """
        Initializer

        Inputs:
            max_depth         -> maximum depth the tree can grow
            min_samples_split -> minimum number of samples required to split a node
            loss              -> loss function to use during training
        """
        DecisionTree.__init__(self,max_depth,min_samples_split)
        self.loss = loss

    def __mse(self, D: np.array) -> float:
        """
        Private function to define the mean squared error

        Input:
            D -> data to compute the MSE over
        Output:
            Mean squared error over D
        """
        # compute the mean target for the node
        y_m = np.mean(D[:,-1])
        # compute the mean squared error wrt the mean
        E = np.sum((D[:,-1] - y_m)**2)/D.shape[0]
        # return mse
        return(E)

    def __mae(self, D: np.array) -> float:
        """
        Private function to define the mean absolute error

        Input:
            D -> data to compute the MAE over
        Output:
            Mean absolute error over D
        """
        # compute the mean target for the node
        y_m = np.mean(D[:,-1])
        # compute the mean absolute error wrt the mean
        E = np.sum(np.abs(D[:,-1] - y_m))/D.shape[0]
        # return mae
        return(E)

    def _impurity(self, D: np.array) -> float:
        """
        Protected function to define the impurity

        Input:
            D -> data to compute the impurity metric over
        Output:
            Impurity metric for D
        """
        # use the selected loss function to calculate the node impurity
        ip = None
        if self.loss == 'mse':
            ip = self.__mse(D)
        elif self.loss == 'mae':
            ip = self.__mae(D)
        # return results
        return(ip)

    def _leaf_value(self, D: np.array) -> float:
        """
        Protected function to compute the value at a leaf node

        Input:
            D -> data to compute the leaf value
        Output:
            Mean of D
        """
        return(np.mean(D[:,-1]))

In [None]:
#test = np.array([[1, 4, 2, 3], [1, 4, 2, 6]])
test = np.array([1, 4, 2, 3])
#np.unique(test[:,-1]).shape[0]
print(test.shape)
print(test)
print()
print(test.reshape(-1,1).shape)
test.reshape(-1,1)

(4,)
[1 4 2 3]

(4, 1)


array([[1],
       [4],
       [2],
       [3]])

In [None]:
np.unique(test[:,-1]).shape

(2,)

In [None]:
test

array([[1, 4, 2, 4],
       [2, 4, 4, 4]])

In [None]:
test[:,:2]

array([[1, 4],
       [2, 4]])