In [3]:
import numpy as np
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

RANDOM_SEED = 42

housing = sklearn.datasets.fetch_california_housing()

x_train, x_test, y_train, y_test = train_test_split(housing.data, housing.target, random_state=RANDOM_SEED)
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)

(20640, 8)
(15480, 8) (15480,)
(5160, 8) (5160,)


# Baseline

In [7]:
regressor = DecisionTreeRegressor(
    random_state=RANDOM_SEED,
    min_samples_leaf=5,
    min_samples_split=20
)
regressor.fit(x_train, y_train)
baseline_acc = regressor.score(x_test, y_test)
print(baseline_acc)

0.6985753014500338


In [19]:
def mean_squared_error(y_true, y_pred):
    return np.sum((y_true - y_pred) ** 2) / len(y_true)

def coefficient_of_correlation(y_true, y_pred):
    u = np.sum(np.square(y_true - y_pred))
    v = np.sum(np.square(y_true - np.mean(y_true)))
    return 1 - u/v

class DecisionTreeRegressorNode:
    def __init__(self, min_samples_leaf=5, min_samples_split=20):
        self.min_samples_leaf = min_samples_leaf
        self.min_samples_split = min_samples_split
        self.best_feature_index = None
        self.best_threshold = None
        self.prediction = None
        self.left = None
        self.right = None

    def fit(self, x, y):
        if x.shape[0] < self.min_samples_split:
            self.prediction = np.mean(y)
            return

        num_features = x.shape[1]
        best_mse = np.inf
        for feature_index in range(num_features):
            unique_values = np.sort(np.unique(x[:, feature_index]))
            for i in range(self.min_samples_leaf, unique_values.shape[0]-self.min_samples_leaf-1):
                threshold = (unique_values[i+1] + unique_values[i]) / 2

                x_left, y_left, x_right, y_right = self._split_by_threshold(x, y, feature_index, threshold)

                mean_left = np.mean(y_left)
                mean_right = np.mean(y_right)

                mse_left = mean_squared_error(y_left, mean_left)
                mse_right = mean_squared_error(y_right, mean_right)

                mse = (mse_left * len(y_left) + mse_right * len(y_right)) / (len(y_left) + len(y_right))
                if best_mse > mse:
                    best_mse = mse
                    self.best_feature_index = feature_index
                    self.best_threshold = threshold

        if self.best_feature_index is None or self.best_threshold is None:
            self.prediction = np.mean(y)
            return

        x_left, y_left, x_right, y_right = self._split_by_threshold(x, y, self.best_feature_index, self.best_threshold)
        print(f"Node split: feature_index={self.best_feature_index}, threshold={self.best_threshold}, left_size={x_left.shape[0]}, right_size={x_right.shape[0]}")
        self.left = DecisionTreeRegressorNode(
            min_samples_leaf=self.min_samples_leaf,
            min_samples_split=self.min_samples_split
        )
        self.left.fit(x_left, y_left)
        self.right = DecisionTreeRegressorNode(
            min_samples_leaf=self.min_samples_leaf,
            min_samples_split=self.min_samples_split
        )
        self.right.fit(x_right, y_right)

    def predict(self, x):
        if self.prediction is not None:
            return self.prediction

        if x[self.best_feature_index] <= self.best_threshold:
            return self.left.predict(x)
        else:
            return self.right.predict(x)

    @staticmethod
    def _split_by_threshold(x, y, feature_index, threshold):
        x_left, y_left = x[x[:, feature_index] <= threshold], y[x[:, feature_index] <= threshold]
        x_right, y_right = x[x[:, feature_index] > threshold], y[x[:, feature_index] > threshold]
        return x_left, y_left, x_right, y_right

class CustomDecisionTreeRegressor:
    def __init__(
            self,
            min_samples_leaf=5,
            min_samples_split=20
    ):
        self.min_samples_leaf = min_samples_leaf
        self.min_samples_split = min_samples_split
        self.root = DecisionTreeRegressorNode(
            min_samples_leaf = self.min_samples_leaf,
            min_samples_split = self.min_samples_split
        )

    def fit(self, x, y):
        self.root.fit(x, y)

    def predict(self, x):
        return self.root.predict(x)

    def score(self, x, y):
        y_pred = [self.predict(x) for x in x]
        return coefficient_of_correlation(y, y_pred)


custom_regressor = CustomDecisionTreeRegressor()
custom_regressor.fit(x_train, y_train)
print(custom_regressor.score(x_test, y_test))



Feature 0
Feature 1
Feature 2
Feature 3
Feature 4
Feature 5
Feature 6
Feature 7
Node split: feature_index=0, threshold=5.03165, left_size=12147, right_size=3333
Feature 0
Feature 1
Feature 2
Feature 3
Feature 4
Feature 5
Feature 6
Feature 7
Node split: feature_index=0, threshold=3.0743, left_size=5883, right_size=6264
Feature 0
Feature 1
Feature 2
Feature 3
Feature 4
Feature 5
Feature 6
Feature 7
Node split: feature_index=2, threshold=4.200790513833992, left_size=2165, right_size=3718
Feature 0
Feature 1
Feature 2
Feature 3
Feature 4
Feature 5
Feature 6
Feature 7
Node split: feature_index=5, threshold=2.4993826802449393, left_size=676, right_size=1489
Feature 0
Feature 1
Feature 2
Feature 3
Feature 4
Feature 5
Feature 6
Feature 7
Node split: feature_index=0, threshold=2.2851, left_size=336, right_size=340
Feature 0
Feature 1
Feature 2
Feature 3
Feature 4
Feature 5
Feature 6
Feature 7
Node split: feature_index=2, threshold=3.3259634244836738, left_size=135, right_size=201
Feature 0
Feat