In [1]:
import random
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn import tree
from sklearn import ensemble

In [2]:
def mse_index(x):
    # return ((x - x.mean()) ** 2).mean()
    # return np.square(x - x.mean()).mean()
    return x.var()

def mse_gain(parent_node, splits):   
    splits_mse = np.sum([mse_index(split)*(len(split)/len(parent_node)) for split in splits])
    return mse_index(parent_node) - splits_mse

def split(X, y, value):      
    left_mask = X < value
    right_mask = X >= value
    return y[left_mask], y[right_mask]

def split_dataset(X, y, column, value):     
    left_mask = X[:, column] < value
    right_mask = X[:, column] >= value
    left_y, right_y = y[left_mask], y[right_mask]
    left_X, right_X = X[left_mask], X[right_mask]
    return left_X, right_X, left_y, right_y

In [3]:
class DecisionTreeRegressor(object):

    def __init__(self, criterion=None):
        self.impurity = None
        self.threshold = None
        self.column_index = None
        self.outcome_probs = None
        self.criterion = criterion
        self.left_child = None
        self.right_child = None

    @property
    def is_terminal(self):         
        return not bool(self.left_child and self.right_child)

    def _find_splits(self, X):
        split_values = set()
        x_unique = list(np.unique(X))
        for i in range(1, len(x_unique)):
            # average = (x_unique[i - 1] + x_unique[i]) / 2.
            average = np.mean((x_unique[i - 1], x_unique[i]))
            split_values.add(average)

        return list(split_values)

    def _find_best_split(self, X, y, n_features):
        random.seed(42)
        subset = random.sample(list(range(0, X.shape[1])), n_features)
        max_gain, max_col, max_val = None, None, None
        for column in subset:
            split_values = self._find_splits(X[:, column])
            for value in split_values:
                splits = split(X[:, column], y, value)
                gain = self.criterion(y, splits)
                if (max_gain is None) or (gain > max_gain):
                    max_col, max_val, max_gain = column, value, gain
        return max_col, max_val, max_gain

    def fit(self, X, y, n_features=None, max_depth=None):    
        try:
            if max_depth is not None:
                assert max_depth > 0
                max_depth -= 1

            if n_features is None:
                n_features = X.shape[1]

            column, value, gain = self._find_best_split(X, y, n_features)
            assert gain is not None

            self.column_index = column
            self.threshold = value
            self.impurity = gain

            left_X, right_X, left_target, right_target = split_dataset(X, y, column, value)

            self.left_child = DecisionTreeRegressor(self.criterion)
            self.left_child.fit(
                left_X, left_target, n_features, max_depth
            )

            self.right_child = DecisionTreeRegressor(self.criterion)
            self.right_child.fit(
                right_X, right_target, n_features, max_depth
            )
        except AssertionError:
            self.outcome_probs = np.sum(y) / y.shape[0] #@!
            
    def predict_row(self, row):
        
        if not self.is_terminal:
            if row[self.column_index] < self.threshold:
                return self.left_child.predict_row(row)
            else:
                return self.right_child.predict_row(row)
        return self.outcome_probs

    def predict(self, X):
        result = np.zeros(X.shape[0])
        for i in range(X.shape[0]):
            result[i] = self.predict_row(X[i, :])
        return result

In [4]:
class RandomForestRegressor(object):

    def __init__(self, n_estimators=10, max_depth=None, n_features=None, criterion="mse", bootstrap=True):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.n_features = n_features
        self.bootstrap = bootstrap
        
        if criterion == "mse": #@!
            self.criterion = mse_gain
        else:
            raise ValueError(f"Unknown criterion '{criterion}'")
            
        self.trees = [DecisionTreeRegressor(criterion=self.criterion) for _ in range(n_estimators)]
        
    def _init_data(self, X, y):

        self.size = len(X)
        
        if not isinstance(X, np.ndarray):
            self.X = np.array(X)
        else:
            self.X = X

        if not isinstance(y, np.ndarray):
            self.y = np.array(y)
        else:
            self.y = y
            
    def bootstrap_data(self, size):
        np.random.seed(42)
        return np.random.randint(size, size=size)
    
    def fit(self, X, y):
       
        if self.n_features is None:
            self.n_features = int(np.sqrt(X.shape[1]))
        elif X.shape[1] < self.n_features:
            raise ValueError(f"'n_features should be <= n_features'")
            
        self._init_data(X, y)
        
        for tree in self.trees:
            if self.bootstrap:
                idxs = self.bootstrap_data(self.size)
                X = self.X[idxs]
                y = self.y[idxs]
            else:
                X = self.X
                y = self.y
                
            tree.fit(
                X,
                y,
                n_features=self.n_features,
                max_depth=self.max_depth,
            )
            
    def predict(self, X):
           
        if not isinstance(X, np.ndarray):
            X = np.array(X)

        if self.X is not None:
            predictions = np.zeros(len(X))
            for i in range(len(X)):
                row_pred = 0.
                for tree in self.trees:
                    row_pred += tree.predict_row(X[i, :])

                row_pred /= self.n_estimators
                predictions[i] = row_pred #@!
            return predictions  
        else:
            raise ValueError("You should fit a model before `predict`")

In [5]:
def f(X): 
    return X[:, 0]**3 + np.log(np.exp(X[:, 1]) + np.exp(X[:, 2])) + np.sqrt(abs(X[:, 3])) * X[:, 4]

n_samples = 100
stdv = 1. / np.sqrt(5)
X = np.random.uniform(-stdv, stdv, size = (n_samples, 5))
y = f(X)

In [31]:
t1 = DecisionTreeRegressor(criterion=mse_gain)
t1.fit(X, y, max_depth=5)
y1 = t1.predict(X)
print(f"MSE is: {mean_squared_error(y, y1)}")

MSE is: 0.001869299243728372


In [32]:
t2 = tree.DecisionTreeRegressor(criterion='mse', max_depth=5, random_state=42, max_features=5, splitter='best', min_samples_split=2, min_samples_leaf=1)
t2.fit(X, y)
y2 = t2.predict(X)
print(f"MSE is: {mean_squared_error(y, y2)}")

MSE is: 0.001869299243728372


In [33]:
t1.threshold, t1.column_index

(-0.21197722203507108, 2)

In [34]:
t2.tree_.threshold[0], t2.tree_.feature[0]

(-0.21197722107172012, 2)

In [35]:
# y1[y1 != y2], y2[y1 != y2]
# y1 != y2
y1[3], y2[3]

(0.6075477180903875, 0.6075477180903875)

In [40]:
f1 = RandomForestRegressor(n_estimators=10, max_depth=4, criterion='mse')
f1.fit(X, y)
y1 = f1.predict(X)
print(f"MSE is: {mean_squared_error(y, y1)}")

MSE is: 0.02817857721432049


In [41]:
f2 = ensemble.RandomForestRegressor(n_estimators=10, max_depth=4, criterion='mse', random_state=42)
f2.fit(X, y)
y2 = f2.predict(X)
print(f"MSE is: {mean_squared_error(y, y2)}")

MSE is: 0.004190721858253212


In [52]:
y1[:2], y2[:2]

(array([0.10267444, 0.85609033]), array([0.51914222, 0.87432236]))