# In [1]:
#Import Libraries
import numpy as np
import pandas as pd
from collections import Counter

# In [2]:
#Node
class DTRegression():
    """A single node of a regression decision tree.

    Each node stores the observations routed to it, summary statistics of
    the target (mean, residuals, MSE) and, once ``grow_tree`` has been
    called, the split rule plus references to its left/right child nodes.

    Parameters
    ----------
    X : pd.DataFrame
        Feature matrix of the observations in this node.
    Y : list
        Target values, positionally aligned with the rows of ``X``.
    min_samples_split : int, optional
        Minimum number of observations a node needs to be split further.
        Defaults to 20 when ``None``.
    max_depth : int, optional
        Maximum depth of the tree. Defaults to 5 when ``None``.
    depth : int, optional
        Depth of this node; defaults to 0 (the root) when ``None``.
    node_type : str, optional
        Label of the node; defaults to ``'root'``.
    rule : str, optional
        Human-readable rule that led to this node; defaults to ``""``.

    Raises
    ------
    ValueError
        If ``Y`` is empty or its length differs from the number of rows
        in ``X``.
    """

    def __init__(
        self,
        X: pd.DataFrame,
        Y: list,
        min_samples_split=None,
        max_depth=None,
        depth=None,
        node_type=None,
        rule=None):

        if len(Y) == 0:
            raise ValueError("Y must contain at least one observation")
        if len(X) != len(Y):
            raise ValueError("X and Y must have the same number of observations")

        # Data that goes to the node
        self.Y = Y
        self.X = X

        # Hyper parameters. Compare against None explicitly so that a
        # legitimate 0 (e.g. max_depth=0) is not replaced by the default.
        self.min_samples_split = 20 if min_samples_split is None else min_samples_split
        self.max_depth = 5 if max_depth is None else max_depth

        # Depth of this node (0 for the root)
        self.depth = 0 if depth is None else depth

        # Features from the dataset
        self.features = list(self.X.columns)

        # Node label
        self.node_type = node_type if node_type else 'root'

        # Rule that produced this node
        self.rule = rule if rule else ""

        # Numeric view of the target, so arithmetic works even for a list
        y_arr = np.asarray(Y, dtype=float)

        # Mean of Y (the prediction of this node)
        self.y_mean = np.mean(y_arr)

        # Residuals
        self.residuals = y_arr - self.y_mean

        # MSE of the node
        self.mse = self.mse_cal(y_arr, self.y_mean)

        # Count of observations in the node
        self.n = len(Y)

        # Left and right nodes initialisation
        self.left = None
        self.right = None

        # Initialisation of splits
        self.best_feature = None
        self.best_value = None

    @staticmethod
    def mse_cal(y_act, y_hat) -> float:
        """Return the mean squared error between ``y_act`` and ``y_hat``.

        ``y_act`` may be any array-like; ``y_hat`` may be a scalar or an
        array-like broadcastable against ``y_act``.
        """
        # Residuals (converted so that plain lists are supported)
        res = np.asarray(y_act, dtype=float) - np.asarray(y_hat, dtype=float)

        # Mean of the squared residuals
        return float(np.mean(res * res))

    @staticmethod
    def ma(x: np.array, window: int) -> np.array:
        """Return the moving average of ``x`` over ``window`` elements.

        Implemented as a 'valid' convolution with a box filter. When ``x``
        has fewer than ``window`` elements there is no full window, so an
        empty array is returned (np.convolve would otherwise swap its
        operands and return meaningless values).

        Raises
        ------
        ValueError
            If ``window`` is smaller than 1.
        """
        if window < 1:
            raise ValueError("window must be at least 1")

        x = np.asarray(x, dtype=float)
        if x.size < window:
            return np.empty(0, dtype=float)

        return np.convolve(x, np.ones(window), 'valid') / window

    def best_split(self) -> tuple:
        """Find the (feature, threshold) pair that minimises the split MSE.

        Candidate thresholds are the midpoints between consecutive sorted
        unique values of each feature. Rows with a missing value in any
        feature or in the target are ignored. A candidate is accepted only
        if its MSE is strictly lower than the best found so far, starting
        from the MSE of the node itself.

        Returns
        -------
        tuple
            ``(best_feature, best_value)``; both are ``None`` when no
            split improves on the node's own MSE.
        """
        y_all = np.asarray(self.Y, dtype=float)

        # Drop rows with missing values once (loop-invariant), without
        # adding a helper column that could clash with a feature name.
        complete = self.X.notna().all(axis=1).to_numpy() & ~np.isnan(y_all)
        X = self.X[complete]
        y = y_all[complete]
        n = y.size

        # MSE to beat
        mse_best = self.mse

        best_feature = None
        best_value = None

        for feature in self.features:
            col = X[feature].to_numpy()

            # Midpoints between consecutive sorted unique values
            candidates = self.ma(np.unique(col), 2)

            for value in candidates:
                # Same predicate as grow_tree: left is <=, right is >
                left_mask = col <= value
                y_left = y[left_mask]
                y_right = y[~left_mask]

                # A midpoint can collapse onto a data value through float
                # rounding; skip splits that leave one side empty.
                if y_left.size == 0 or y_right.size == 0:
                    continue

                # Squared residuals of each side around its own mean
                res_left = y_left - np.mean(y_left)
                res_right = y_right - np.mean(y_right)

                sse = np.sum(res_left * res_left) + np.sum(res_right * res_right)
                mse_new = sse / n

                # Keep the split with the lowest MSE seen so far
                if mse_new < mse_best:
                    best_feature = feature
                    best_value = float(value)
                    mse_best = mse_new

        return (best_feature, best_value)

    def grow_tree(self):
        """Recursively split this node until a stopping rule is met.

        A node is split only while ``depth < max_depth``, it holds at least
        ``min_samples_split`` observations, and ``best_split`` finds a
        split that lowers the MSE. Rows whose split-feature value is
        missing satisfy neither ``<=`` nor ``>`` and go to neither child.
        """
        # Stopping rules
        if self.depth >= self.max_depth or self.n < self.min_samples_split:
            return

        best_feature, best_value = self.best_split()
        if best_feature is None:
            return

        # Store the best split on the current node
        self.best_feature = best_feature
        self.best_value = best_value

        col = self.X[best_feature].to_numpy()
        y = np.asarray(self.Y)
        left_mask = col <= best_value
        right_mask = col > best_value

        # Left node
        self.left = DTRegression(
            X=self.X.loc[left_mask, self.features],
            Y=y[left_mask].tolist(),
            depth=self.depth + 1,
            max_depth=self.max_depth,
            min_samples_split=self.min_samples_split,
            node_type='left_node',
            rule=f"{best_feature} <= {round(best_value, 3)}"
            )
        self.left.grow_tree()

        # Right node
        self.right = DTRegression(
            X=self.X.loc[right_mask, self.features],
            Y=y[right_mask].tolist(),
            depth=self.depth + 1,
            max_depth=self.max_depth,
            min_samples_split=self.min_samples_split,
            node_type='right_node',
            rule=f"{best_feature} > {round(best_value, 3)}"
            )
        self.right.grow_tree()