## Import tools

In [1]:
import numpy as np
import pandas as pd

## Node class

In [2]:
class Node():
    """
    Base Node class.
    If the tested input is below the threshold, go to the left tree, else go to the right tree.
    The value of the leaf node is the sought of target for classification or regression.
    """
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, purity=None, value=None):

        # Decision node
        self.feature_index = feature_index
        self.threshold = threshold
        self.left = left
        self.right = right
        self.purity = purity

        # Leaf node
        self.value = value
        
class DecisionNode(Node):
    def __init__(self, feature_index, threshold, left, right, purity):
        super().__init__()
        self.feature_index = feature_index
        self.threshold = threshold
        self.left = left
        self.right = right
        self.purity = purity
        
class LeafNode(Node):
    def __init__(self, value):
        super().__init__()
        self.value = value
        


## Decision Tree Base Class

In [14]:
class DecisionTree():
    def __init__(self, min_samples_split=2, max_depth=2):
        "Initialize the Decision Tree Classifier"

        # Initialize the root of the tree
        self.root = None

        # Stopping conditions
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        
    def build_tree(self, dataset, curr_depth=0):
        "Recursively build the decision tree"

        X, Y = dataset[:, :-1], dataset[:, -1]
        num_samples, num_features = np.shape(X)

        # Split until stopping conditions are met
        if num_samples >= self.min_samples_split and curr_depth <= self.max_depth:

            # Find  best split
            best_split = self.get_best_split(dataset, num_samples, num_features)

            # Check if IG is positive (0 being a leaf node)
            if best_split["purity"] > 0:

                # Recursively build left and right subtrees
                left_subtree = self.build_tree(
                    best_split["dataset_left"], curr_depth+1)
                right_subtree = self.build_tree(
                    best_split["dataset_right"], curr_depth+1)

                # Return decision node
                return DecisionNode(best_split["feature_index"], best_split["threshold"],
                            left_subtree, right_subtree, best_split["purity"])

        # Return leaf node
        leaf_value = self.get_leaf_value(Y)
        return LeafNode(value=leaf_value)
    
    def split(self, dataset, feature_index, threshold):

        dataset_left = np.array([row for row in dataset if row[feature_index] <= threshold])
        dataset_right = np.array([row for row in dataset if row[feature_index] > threshold])
        return dataset_left, dataset_right
    
    def print_tree(self, tree=None, indent=" "):
        "Recurvisely print the tree's nodes"

        if not tree:
            tree = self.root

        if tree.value is not None:
            print(tree.value)

        else:
            print()
            print("X_"+str(tree.feature_index), "<=", tree.threshold, "?", tree.purity)
            print("%sleft:" % (indent), end="")
            self.print_tree(tree.left, indent + indent)
            print("%sright:" % (indent), end="")
            self.print_tree(tree.right, indent + indent)
            print()

    def fit(self, X, Y):
        "Fit the input train data"

        print('Input Shape (Samples, Features): ', X.shape)
        print('Target Shape (Samples): ', Y.shape)

        if Y.ndim == 1:
            Y = Y[:, None]
        
        dataset = np.concatenate((X, Y), axis=1)
        self.root = self.build_tree(dataset)

    def predict(self, X):
        "Predict the target of the given input X"

        return [self.make_prediction(x, self.root) for x in X]

    def make_prediction(self, x, tree):
        "Predict the target of a single point x"
        
        assert isinstance(tree, Node)

        if isinstance(tree, LeafNode):
            return tree.value
        else:
            feature_val = x[tree.feature_index]

            if feature_val <= tree.threshold:
                return self.make_prediction(x, tree.left)
            else:
                return self.make_prediction(x, tree.right)


## Decision Tree Classifier

In [15]:
class DecisionTreeClassifier(DecisionTree):
    def __init__(self, min_samples_split=2, max_depth=2):
        "Initialize the Decision Tree Classifier"
        super().__init__(min_samples_split, max_depth)

    def get_best_split(self, dataset, num_samples, num_features):
        "Find the best splif among the given dataset"

        best_split = {'purity':0}
        max_info_gain = -np.inf

        # Loop over all the features
        for feature_index in range(num_features):

            # Consider only thresholds present in the current feature
            feature_values = dataset[:, feature_index]
            possible_thresholds = np.unique(feature_values)

            # Loop over the observed thresholds
            for threshold in possible_thresholds:

                # Get current split
                dataset_left, dataset_right = self.split(
                    dataset, feature_index, threshold)

                # Check split is not trivial
                if len(dataset_left) > 0 and len(dataset_right) > 0:

                    y, left_y, right_y = dataset[:, -
                                                 1], dataset_left[:, -1], dataset_right[:, -1]

                    # Compute information gain
                    information_gain = self.information_gain(
                        y, left_y, right_y, 'gini')

                    # Update the best split if needed
                    if information_gain > max_info_gain:
                        best_split["feature_index"] = feature_index
                        best_split["threshold"] = threshold
                        best_split["dataset_left"] = dataset_left
                        best_split["dataset_right"] = dataset_right
                        best_split["purity"] = information_gain
                        max_info_gain = information_gain

        return best_split

    def information_gain(self, parent, l_child, r_child, mode='entropy'):

        assert mode == 'gini' or mode == 'entropy', f"Function `information_gain` called with wrong `mode` kwarg: {mode}"

        if mode == 'gini':
            gain = self.gini_index(parent) - (len(l_child) * self.gini_index(
                l_child) + len(r_child) * self.gini_index(r_child)) / len(parent)
        else:
            gain = self.entropy(parent) - (len(l_child)*self.entropy(l_child) +
                                           len(r_child)*self.entropy(r_child)) / len(parent)

        return gain

    def entropy(self, y):
        "Compute -sum_i(p_i log(p_i))"

        labels, counts = np.unique(y, return_counts=True)
        pi = counts/len(y)
        return - np.sum(pi * np.log(pi))

    def gini_index(self, y):
        "Compute 1 - sum_i(p_i^2)"

        labels, counts = np.unique(y, return_counts=True)
        return 1. - np.sum((counts/len(y))**2)

    def get_leaf_value(self, Y):
        "Compute the most occurring element in Y"

        Y = list(Y)
        return max(Y, key=Y.count)


## Run Classifier

In [16]:
from sklearn.datasets import load_iris

# Load Data
iris = load_iris()
X = iris['data']
Y = iris['target']

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=.2, random_state=41)

print('Train input shape:', X_train.shape)
print('Train target shape:', Y_train.shape)
print('Test input shape:', X_test.shape)
print('Test target shape:', Y_test.shape)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Fit model
classifier = DecisionTreeClassifier(min_samples_split=3, max_depth=3)
classifier.fit(X_train, Y_train)
classifier.print_tree()

# Test model
Y_pred = classifier.predict(X_test)
print('Final accuracy: ', accuracy_score(Y_test, Y_pred))

## Decision Tree Regressor

In [18]:
class DecisionTreeRegressor(DecisionTree):
    def __init__(self, min_samples_split=2, max_depth=2):
        "Initialize the Decision Tree Regressor"

        super().__init__(min_samples_split,max_depth)

    def get_best_split(self, dataset, num_samples, num_features):
        "Find the best splif among the given dataset"

        best_split = {}
        max_var_red = -np.inf

        # Loop over all the features
        for feature_index in range(num_features):

            # Consider only thresholds present in the current feature
            feature_values = dataset[:, feature_index]
            possible_thresholds = np.unique(feature_values)

            # Loop over the observed thresholds
            for threshold in possible_thresholds:

                # Get current split
                dataset_left, dataset_right = self.split(
                    dataset, feature_index, threshold)

                # Check split is not trivial
                if len(dataset_left) > 0 and len(dataset_right) > 0:

                    y, left_y, right_y = dataset[:, -
                                                 1], dataset_left[:, -1], dataset_right[:, -1]

                    # Compute information gain
                    variance_reduction = self.variance_reduction(
                        y, left_y, right_y)

                    # Update the best split if needed
                    if variance_reduction > max_var_red:
                        best_split["feature_index"] = feature_index
                        best_split["threshold"] = threshold
                        best_split["dataset_left"] = dataset_left
                        best_split["dataset_right"] = dataset_right
                        best_split["purity"] = variance_reduction
                        max_var_red = variance_reduction

        return best_split

    def variance_reduction(self, parent, l_child, r_child):
        "Compute the variance reduction wrt the parent node"

        return np.var(parent) - (len(l_child) * np.var(l_child) + len(r_child) * np.var(r_child)) / len(parent)

    def get_leaf_value(self, Y):
        "Return the average of the values contained in the node"

        return np.mean(Y)

## Run Regressor

In [None]:
from sklearn.datasets import load_iris

# Load Data
iris = load_iris()
X = iris['data']
Y = iris['target']

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=.2, random_state=41)

print('Train input shape:', X_train.shape)
print('Train target shape:', Y_train.shape)
print('Test input shape:', X_test.shape)
print('Test target shape:', Y_test.shape)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Fit model
regressor = DecisionTreeRegressor(min_samples_split=3, max_depth=3)
regressor.fit(X_train, Y_train)
regressor.print_tree()

# Test model
Y_pred = regressor.predict(X_test) 

np.sqrt(mean_squared_error(Y_test, Y_pred))
