### This notebook builds the Decision Tree algorithm from scratch. The dataset used is the sklearn diabetes dataset, which is a regression problem

In [133]:
from sklearn import datasets
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

In [134]:
ds = datasets.load_diabetes()

In [135]:
X = np.array(ds.data)
y = np.array(ds.target)
X_train, X_test, y_train, y_test = train_test_split(X,y, random_state=42)

Now that we have the data, we need to build the decision tree. So let's look at the first step.

The first step is to decide how we are going to split the data. Since we don't know the best split criterion in advance, we have to search for it. We do this by iterating through each column and, for each column, testing every candidate value to see which one gives the best split. However, if a column can take any positive value, there are infinitely many candidate thresholds. To circumvent this, for each column we only try the values of that column that appear in our training data. But how do we determine which split is best? Different metrics can be used; in this case, we calculate the variance of the labels in each child node and add the variances together, each weighted by the fraction of samples that falls in that child. The best split is the one with the lowest weighted variance.
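To make the criterion concrete, here is a minimal sketch on made-up toy data (the arrays and the helper name `weighted_child_variance` are assumptions for illustration, not part of the notebook). It scores every observed value of one column as a threshold and keeps the one with the lowest size-weighted variance.

```python
import numpy as np

# Toy data, made up for illustration: one feature column and its labels.
feature = np.array([1.0, 2.0, 3.0, 10.0, 11.0, 12.0])
labels = np.array([5.0, 6.0, 7.0, 50.0, 52.0, 54.0])

def weighted_child_variance(split_value, feature, labels):
    """Size-weighted sum of the label variances of the two child nodes."""
    left = labels[feature < split_value]
    right = labels[feature >= split_value]
    if len(left) == 0 or len(right) == 0:
        return float("inf")  # one side is empty, so this is not a real split
    n = len(labels)
    return len(left) / n * np.var(left) + len(right) / n * np.var(right)

# Only values observed in the column are tried as thresholds.
scores = {float(v): float(weighted_child_variance(v, feature, labels)) for v in feature}
best_value = min(scores, key=scores.get)
print(best_value, scores[best_value])
```

On this toy column the threshold 10.0 separates the small labels from the large ones, so it gets the lowest score.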

So, let's build 5 functions:
 1) Calculates the variance of a list of values, returning a float
 2) For a given list of label lists, sums their variances, each weighted by the list's share of the samples, returning a float
 3) For a given column and its labels, loops through all the column's values and calculates the variance of the split produced by each value, returning a tuple with (best value, weighted variance)
 4) For a given value v, returns the indexes of the column values that are less than v and the indexes of those that are equal to or greater than v, as a list of two lists
 5) For a matrix X and labels y, loops through all the columns and executes function 3), finding the best column and value to split on, returning a dictionary with the index of the column, the value and the variance

In [136]:
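def calculate_variance(number_list:np.ndarray[float])->float:
    n = len(number_list)
    if n == 0:
        # An empty child is not a valid split, so give it the worst possible score
        return float('inf')
    sum_list = 0
    for number in number_list:
        sum_list += number
    average = sum_list/n
    # Named squared_diff_sum to avoid shadowing the built-in sum()
    squared_diff_sum = 0
    for number in number_list:
        squared_diff_sum+=(number-average)**2
    variance = squared_diff_sum/n  # population variance (divide by n)
    return variance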

In [137]:
def calculate_lists_variances(list_of_lists:list[list[float]])->float:
    variance_sum = 0
    len_all = sum([len(ls) for ls in list_of_lists])
    for number_list in list_of_lists:
        variance_sum+=(len(number_list)/len_all)*calculate_variance(number_list)
    return variance_sum

In [138]:
def calculate_split_indexes(split_value:float, list_column_values:list[float])->list[list[int]]:
    indexes_smaller = []
    indexes_eq_bigger = []
    for index, value in enumerate(list_column_values):
        if value<split_value:
            indexes_smaller.append(index)
        else:
            indexes_eq_bigger.append(index)
    return [indexes_smaller, indexes_eq_bigger]


In [139]:
def calculate_best_column_variance(float_list:np.ndarray[float], labels:np.ndarray[float], min_variance:float)->tuple[float, float]:
    best_split = None
    for value in float_list:
        list_of_lists_of_indexes= calculate_split_indexes(value, float_list)
        labels_lists = [labels[indexes] for indexes in list_of_lists_of_indexes]
        value_variance = calculate_lists_variances(labels_lists)
        if value_variance< min_variance:
            min_variance = value_variance
            best_split = value
    return (best_split, min_variance)


In [140]:
def calculate_matrix_best_split(matrix: np.ndarray[float], labels: np.ndarray[float])->dict[str, float]:
    result_dictionary = {
        "column_index":-1,
        "split":-1,
        "variance":calculate_variance(labels)
    }
    for column_index in range(matrix.shape[1]):
        column_values = matrix[:, column_index] #select all values from the column of index column_index
        split_value, variance = calculate_best_column_variance(column_values, labels, result_dictionary['variance'])
        if variance<result_dictionary['variance']:
            result_dictionary['column_index'] = column_index
            result_dictionary['split'] = split_value
            result_dictionary['variance'] = variance
    return result_dictionary

### Now let's try to determine the best split for our data!

In [141]:
calculate_matrix_best_split(X_train, y_train)

{'column_index': 2,
 'split': 0.005649978676881689,
 'variance': 4288.930696708918}
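The same search can be written more compactly with NumPy boolean masks. The sketch below is an assumption-laden alternative, not the notebook's implementation: `best_split_numpy` is a hypothetical name, it uses `np.var` (population variance, like `calculate_variance`), and it visits each column's values in sorted order via `np.unique`, so ties between equally good splits may be resolved differently than in the loop-based functions.

```python
import numpy as np

def best_split_numpy(X, y):
    """Return (column_index, split_value, weighted_variance) for the best split.

    Hypothetical helper: returns (-1, None, parent variance) when no split
    strictly lowers the variance of the labels.
    """
    n = len(y)
    best = (-1, None, float(np.var(y)))
    for col in range(X.shape[1]):
        column = X[:, col]
        for value in np.unique(column):
            mask = column < value          # True for rows going to the left child
            n_left = int(mask.sum())
            if n_left == 0:
                continue                   # empty left child, not a real split
            score = (n_left * np.var(y[mask]) + (n - n_left) * np.var(y[~mask])) / n
            if score < best[2]:
                best = (col, float(value), float(score))
    return best

# Toy data, made up for illustration.
X_toy = np.array([[1, 7], [2, 3], [3, 9], [10, 1], [11, 8], [12, 2]], dtype=float)
y_toy = np.array([5.0, 6.0, 7.0, 50.0, 52.0, 54.0])
toy_result = best_split_numpy(X_toy, y_toy)
print(toy_result)
```

Calling `best_split_numpy(X_train, y_train)` is one way to cross-check the dictionary returned above; the two should agree up to floating-point rounding when the best split is unique.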

Now we just need to keep doing this. The question is: until when do we keep doing this? We can continue until no further split is possible (when every leaf node has only one label, several labels that all have the same value, or different labels whose feature rows are all identical), until we reach a maximum number of iterations that we define, or until we reach a maximum number of leaves that we define.

Let's first build a tree that keeps splitting until it can't anymore, and then add the constraints. First we have to alter the functions calculate_matrix_best_split and calculate_best_column_variance to also return the splits.

In [142]:
def calculate_best_column_variance(float_list:np.ndarray[float], labels:np.ndarray[float], min_variance:float)->tuple[float, float, list[int], list[int]]:
    best_split = None
    indexes_1 = np.empty((0))
    indexes_2 = np.empty((0))
    for value in float_list:
        list_of_lists_of_indexes= calculate_split_indexes(value, float_list)
        labels_lists = [labels[indexes] for indexes in list_of_lists_of_indexes]
        value_variance = calculate_lists_variances(labels_lists)
        if value_variance< min_variance:
            min_variance = value_variance
            best_split = value
            indexes_1 = list_of_lists_of_indexes[0]
            indexes_2 = list_of_lists_of_indexes[1]
    return (best_split, min_variance,indexes_1, indexes_2)


In [143]:
def calculate_matrix_best_split(matrix: np.ndarray[float], labels: np.ndarray[float])->dict[str, float]:
    result_dictionary = {
        "column_index":-1,
        "split":None,
        "variance":calculate_variance(labels),
        "X1":np.empty((0)),
        "X2":np.empty((0)),
        "y1":np.empty((0)),
        "y2":np.empty((0))
    }
    for column_index in range(matrix.shape[1]):
        column_values = matrix[:, column_index] #select all values from the column of index column_index
        split_value, variance, indexes_1, indexes_2 = calculate_best_column_variance(column_values, labels, result_dictionary['variance'])
        if variance<result_dictionary['variance']:
            result_dictionary['column_index'] = column_index
            result_dictionary['split'] = split_value
            result_dictionary['variance'] = variance
            result_dictionary["X1"] = matrix[indexes_1]
            result_dictionary["X2"] = matrix[indexes_2]
            result_dictionary["y1"] = labels[indexes_1]
            result_dictionary["y2"] = labels[indexes_2]
    return result_dictionary

In [144]:
def split_data_best_split(X: np.ndarray, y: np.ndarray, tree: dict = None):
    if tree is None:
        tree = {}

    best_split_dct = calculate_matrix_best_split(X, y)
    if best_split_dct["split"] is None:
        # No split lowers the variance: keep all the labels in a leaf
        tree["leaf"] = {"value":y}
    else:
        branch_1 = (best_split_dct["X1"], best_split_dct["y1"])  # rows below the split value
        branch_2 = (best_split_dct["X2"], best_split_dct["y2"])  # rows at or above the split value

        if len(branch_1[0]) > 1:
            tree['left'] = split_data_best_split(branch_1[0], branch_1[1])
            tree['column_index'] = best_split_dct['column_index']
            tree['split'] = best_split_dct['split']
        else:
            # A single-sample branch is stored under 'leaf' on this node itself
            tree['leaf'] = {'value': branch_1[1]}

        if len(branch_2[0]) > 1:
            tree['right'] = split_data_best_split(branch_2[0], branch_2[1])
            tree['column_index'] = best_split_dct['column_index']
            tree['split'] = best_split_dct['split']
        else:
            # Same for the right branch; this overwrites a 'leaf' set just above
            tree['leaf'] = {'value': branch_2[1]}

    return tree
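The stopping constraints mentioned earlier can be sketched as a variant of this builder. What follows is a self-contained sketch under stated assumptions, not the notebook's code: the names `find_best_split`, `build_tree`, `predict_one`, `max_depth` and `min_samples_split` are hypothetical, and the constraint shown is a maximum depth rather than a maximum number of iterations or leaves. Unlike the function above, it stores both children of every internal node under `left` and `right`, including single-sample branches, so a lookup that tests for the `leaf` key first still descends to the correct side.

```python
import numpy as np

def find_best_split(X, y):
    """Best (column, threshold, left-mask) by size-weighted child variance, or None."""
    n = len(y)
    best_score, best = float(np.var(y)), None
    for col in range(X.shape[1]):
        for value in np.unique(X[:, col]):
            mask = X[:, col] < value
            n_left = int(mask.sum())
            if n_left == 0:
                continue  # empty left child, not a real split
            score = (n_left * np.var(y[mask]) + (n - n_left) * np.var(y[~mask])) / n
            if score < best_score:
                best_score, best = score, (col, float(value), mask)
    return best

def build_tree(X, y, depth=0, max_depth=None, min_samples_split=2):
    """Grow a tree in which every internal node has a 'left' and a 'right' child."""
    depth_reached = max_depth is not None and depth >= max_depth
    split = None
    if not depth_reached and len(y) >= min_samples_split:
        split = find_best_split(X, y)
    if split is None:
        return {"leaf": {"value": y}}  # stop: constraint hit or no useful split
    col, value, mask = split
    return {
        "column_index": col,
        "split": value,
        "left": build_tree(X[mask], y[mask], depth + 1, max_depth, min_samples_split),
        "right": build_tree(X[~mask], y[~mask], depth + 1, max_depth, min_samples_split),
    }

def predict_one(x, tree):
    """Follow the splits until a leaf is reached and return the mean of its labels."""
    while "leaf" not in tree:
        tree = tree["left"] if x[tree["column_index"]] < tree["split"] else tree["right"]
    return float(np.mean(tree["leaf"]["value"]))

# Toy data, made up for illustration.
X_toy = np.array([[1.0], [2.0], [3.0], [10.0], [11.0], [12.0]])
y_toy = np.array([5.0, 6.0, 7.0, 50.0, 52.0, 54.0])
full_tree = build_tree(X_toy, y_toy)           # grown until no split helps
stump = build_tree(X_toy, y_toy, max_depth=1)  # a single split, two leaves
print(predict_one(np.array([2.0]), full_tree), predict_one(np.array([2.0]), stump))
```

With `max_depth=1` the toy tree keeps only the root split, so each leaf averages three labels instead of memorising a single one.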


In [145]:
res = split_data_best_split(X_train, y_train)

In [146]:
res

{'left': {'left': {'left': {'left': {'left': {'left': {'left': {'left': {'leaf': {'value': array([143.])}},
        'column_index': 5,
        'split': -0.07866154748823384,
        'right': {'left': {'left': {'left': {'left': {'left': {'left': {'left': {'leaf': {'value': array([71., 71.])}},
               'column_index': 0,
               'split': -0.06000263174410134,
               'leaf': {'value': array([72.])}},
              'column_index': 0,
              'split': -0.056370093293081916,
              'leaf': {'value': array([70.])}},
             'column_index': 0,
             'split': -0.020044708782887707,
             'leaf': {'value': array([74.])}},
            'column_index': 2,
            'split': -0.004050329988045492,
            'leaf': {'value': array([61.])}},
           'column_index': 9,
           'split': -0.03007244590430716,
           'right': {'leaf': {'value': array([83.])}}},
          'column_index': 5,
          'split': -0.05423596746865012,
       

Nice, we were able to separate the samples and reduce the variance. To calculate the label y for an observation, we just follow the tree until we reach a leaf node. The prediction is the average of the label values stored in that leaf. Let's try:

In [147]:
x = X_test[0]

In [148]:
def predict(x:np.ndarray, tree:dict)->float:
    if "leaf" in tree.keys():
        return np.mean(tree['leaf']['value'])
    else:
        if x[tree['column_index']]<tree["split"]:
            return predict(x, tree['left'])
        else:
            return predict(x, tree['right'])

In [156]:
predict(X_test[1], res)

265.0

In [157]:
y_test[1]

70.0

As we can see, the prediction is pretty bad, but it works! Why is it so bad? Because the tree was grown without any stopping constraint, so it is overfitted to the training data.

In [175]:
# Training error: predict on X_train so the predictions line up with y_train
y_pred_train = [predict(x, res) for x in X_train]
error_train = sum((y_hat - y_true)**2 for y_hat, y_true in zip(y_pred_train, y_train)) / len(X_train)
error_train

In [176]:
# Test error: mean squared error of the predictions on X_test
y_pred = [predict(x, res) for x in X_test]
error_test = sum((y_hat - y_true)**2 for y_hat, y_true in zip(y_pred, y_test)) / len(X_test)
error_test

12172.018018018018

In [177]:
from sklearn.tree import DecisionTreeRegressor
tr = DecisionTreeRegressor()
tr.fit(X_train, y_train)

In [178]:
preds = tr.predict(X_train)
error_train = sum((y_train_pred -y_true)**2 for y_train_pred, y_true in zip(preds, y_train)) / len(X_train)
error_train

0.0

In [180]:
preds = tr.predict(X_test)
error_test = sum((y_train_pred -y_true)**2 for y_train_pred, y_true in zip(preds, y_test)) / len(X_test)
error_test

6049.846846846847
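A training error of 0.0 next to a much larger test error is what overfitting looks like. One way to probe this is to limit the depth of the sklearn tree and compare the errors again. The sketch below uses `DecisionTreeRegressor(max_depth=...)` and `sklearn.metrics.mean_squared_error`; the depth values and `random_state=0` are arbitrary choices for illustration, and no particular outcome is assumed.

```python
from sklearn import datasets
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

# Same data and split as at the top of the notebook.
X, y = datasets.load_diabetes(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

results = {}
for max_depth in (None, 2, 3, 5):  # None means the tree grows without a depth limit
    model = DecisionTreeRegressor(max_depth=max_depth, random_state=0)
    model.fit(X_train, y_train)
    results[max_depth] = (
        mean_squared_error(y_train, model.predict(X_train)),
        mean_squared_error(y_test, model.predict(X_test)),
    )

for max_depth, (mse_train, mse_test) in results.items():
    print(f"max_depth={max_depth}: train MSE={mse_train:.1f}, test MSE={mse_test:.1f}")
```

If the shallower trees show a higher training error but a lower test error than the unrestricted tree, that supports the overfitting explanation.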