
The goal of this task is to create a linear regression tree that can predict a numerical value for a new sample based on linear regression models in the leaves.

The tree follows these rules:


*   Each node in the tree will have a linear regression model and an MSE score.
*   Each split will be made on the feature that gives the lowest MSE score. A feature's MSE score is the lowest score over all candidate splits of that feature's values.
*   The MSE score will be normalized by the number of instances.
*   Candidate split values for a numeric feature will be the means of each pair of consecutive unique values.
*   All features will be considered at every depth.
*   Nominal features will be split into one branch per value.
*   The tree will receive a minimum-samples-to-split parameter and a maximum depth.
*   The regression algorithm will be chosen by the user.
*   The tree will be built from top to bottom.
*   No pruning will be performed.
*   The data is assumed to have no missing values.
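
Two of the rules above are easy to check in isolation: the candidate split values and the normalized MSE. The following is a minimal sketch, not the notebook's own code; the helper names `candidate_split_values` and `normalized_mse` are hypothetical and only illustrate the rules as stated.

```python
import numpy as np

def candidate_split_values(values):
    """Midpoints between consecutive unique values of a numeric feature."""
    unique_sorted = np.unique(values)  # np.unique returns sorted unique values
    return (unique_sorted[:-1] + unique_sorted[1:]) / 2

def normalized_mse(y_true, y_pred):
    """MSE divided once more by the number of instances, as the rules describe."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    mse = np.mean((y_true - y_pred) ** 2)
    return float(mse / len(y_true))

print(candidate_split_values([1, 3, 3, 7]))
print(normalized_mse([1, 2], [1, 4]))
```

Because the midpoints always fall strictly between two observed values, no training row ever sits exactly on a split value.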



In [1]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import LinearRegression

In [2]:
# global counter used to number nodes (note: this name shadows the built-in id())
id = 0

class Node:
  def __init__(self, parent_node_id, parent_depth, df, split_node, linear_regression_model, columns, mse_score):
    global id
    self.id = id
    id += 1
    self.depth = parent_depth + 1
    self.linear_regression_model = linear_regression_model
    self.mse_score = mse_score
    self.columns = columns

    results = split_node(df, self.depth)

    # no split
    if results is None:
      self.is_leaf = True
      print('Created node #' + str(self.id), ' (leaf), child of node #' + str(parent_node_id))
      return

    # split
    best_split_feature, best_split_value, best_split_models, best_split_models_columns, best_split_mse_scores = results
    self.is_leaf = False
    self.split_feature = best_split_feature

    print('Created node #' + str(self.id), ' (' + str(self.split_feature) + ' - ' + str(best_split_value) + '), child of node #' + str(parent_node_id))

    # categorical or numeric
    if isinstance(best_split_value, list):
      self.is_numeric = False
      self.children = {child_value: Node(self.id, self.depth, df[df[self.split_feature] == child_value], split_node, child_model, child_model_columns, child_mse_score) for 
                       child_value, child_model, child_model_columns, child_mse_score in zip(best_split_value, best_split_models, best_split_models_columns, best_split_mse_scores)}
    else:
      self.is_numeric = True
      self.split_value = best_split_value
      self.children = [
        Node(self.id, self.depth, df[df[self.split_feature] < self.split_value], split_node, best_split_models[0], best_split_models_columns[0], best_split_mse_scores[0]),
        Node(self.id, self.depth, df[df[self.split_feature] > self.split_value], split_node, best_split_models[1], best_split_models_columns[1], best_split_mse_scores[1])
      ]

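  def predict(self, X):
    # NOTE: predictions are returned grouped by branch,
    # not in the row order of X
    # a leaf
    if self.is_leaf:
      # nothing to predict for an empty subset
      if len(X) == 0:
        return []
      # align the dummy columns with the ones the leaf model was fitted on
      X_dummies = pd.get_dummies(X).reindex(columns=self.columns, fill_value=0)
      return list(self.linear_regression_model.predict(X_dummies))

    # numeric
    if self.is_numeric:
      lower_values = X[X[self.split_feature] < self.split_value]
      # >= so that rows equal to the split value are not silently dropped
      greater_values = X[X[self.split_feature] >= self.split_value]
      return self.children[0].predict(lower_values) + self.children[1].predict(greater_values)

    # categorical (a value not seen during fit has no child and raises KeyError)
    y_pred = []
    for value in X[self.split_feature].unique():
      equal_values = X[X[self.split_feature] == value]
      y_pred += self.children[value].predict(equal_values)

    return y_pred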
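
As written, `Node.predict` concatenates the predictions of its children, so the returned list is grouped by branch rather than ordered like the rows of `X`. Comparing that list directly with `y` can therefore pair predictions with the wrong rows. Below is a minimal sketch of one way to keep predictions aligned with the input, assuming `X` has a unique index. The function `predict_grouped` and the two lambda predictors are hypothetical stand-ins for a single numeric split, not part of the notebook.

```python
import pandas as pd

def predict_grouped(X, split_feature, split_value, predict_lower, predict_greater):
    """Predict branch by branch, but keep each row's index with its prediction."""
    lower = X[X[split_feature] < split_value]
    greater = X[X[split_feature] >= split_value]
    parts = [
        pd.Series(predict_lower(lower), index=lower.index),
        pd.Series(predict_greater(greater), index=greater.index),
    ]
    # Concatenation yields branch-grouped order; reindex restores the order of X.
    return pd.concat(parts).reindex(X.index)

X = pd.DataFrame({'temp': [30.0, 5.0, 25.0, 10.0]})
y_pred = predict_grouped(
    X, 'temp', 17.5,
    predict_lower=lambda part: [0.0] * len(part),    # stand-in for the lower child
    predict_greater=lambda part: [1.0] * len(part),  # stand-in for the greater child
)
print(y_pred.tolist())
```

Without the final `reindex`, the same call would give the branch-grouped order `[0.0, 0.0, 1.0, 1.0]`.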

In [3]:
class LinearRegressionTree:
  def __init__(self, linear_regression_model, min_samples_for_split, max_depth, target_variable):
    self.linear_regression_model = linear_regression_model
    self.min_samples_for_split = min_samples_for_split
    self.max_depth = max_depth
    self.target_variable = target_variable
    self.root = None

  def split_to_X_and_y(self, df):
    X = df.drop(columns=[self.target_variable])
    y = df[self.target_variable]

    return X, y

  def linear_model_fit_and_predict(self, df):
    X, y = self.split_to_X_and_y(df)
    
    # categorical features to numerical
    X = pd.get_dummies(X)

    model = self.linear_regression_model().fit(X, y)
    y_pred = model.predict(X)

    return model, list(X.columns), y, y_pred

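  def compute_mse_score(self, y_true, y_pred):
    # mean_squared_error already averages over the samples; the extra division
    # applies the per-instance normalization described in the rules
    return mean_squared_error(y_true, y_pred) / len(y_true)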

  def fit(self, df):
    model, model_columns, y_true, y_pred = self.linear_model_fit_and_predict(df)
    mse_score = self.compute_mse_score(y_true, y_pred)
    self.root = Node(None, 0, df, self.split_node, model, model_columns, mse_score)

  def numeric_feature_best_mse_score(self, df, feature):
    best_weighted_mse_score = float('inf')
    best_split_value = None
    best_split_models = []
    best_split_models_columns = []
    best_split_mse_scores = []

    feature_values = df[feature]
    feature_unique_values = np.sort(feature_values.unique())
    for value_index in range(len(feature_unique_values) - 1):
      value = (feature_unique_values[value_index] + 
              feature_unique_values[value_index + 1]) / 2

      lower_values = df[feature_values.lt(value)]
      greater_values = df[feature_values.gt(value)]

      lower_model, lower_model_columns, lower_model_y_true, lower_model_y_pred = self.linear_model_fit_and_predict(lower_values)
      greater_model, greater_model_columns, greater_model_y_true, greater_model_y_pred = self.linear_model_fit_and_predict(greater_values)

      lower_mse = self.compute_mse_score(lower_model_y_true, lower_model_y_pred)
      greater_mse = self.compute_mse_score(greater_model_y_true, greater_model_y_pred)

      # despite the name, this is the plain (unweighted) mean of the two child scores
      weighted_mse_score = (lower_mse + greater_mse) / 2
      if weighted_mse_score < best_weighted_mse_score:
        best_weighted_mse_score = weighted_mse_score
        best_split_value = value
        best_split_models = [lower_model, greater_model]
        best_split_models_columns = [lower_model_columns, greater_model_columns]
        best_split_mse_scores = [lower_mse, greater_mse]
    
    return best_weighted_mse_score, best_split_value, best_split_models, best_split_models_columns, best_split_mse_scores

  def categorical_feature_mse_score(self, df, feature):
    models = []
    models_columns = []
    mse_scores = []

    feature_values = df[feature]
    feature_unique_values = feature_values.unique()
    for value in feature_unique_values:
      equal_values = df[feature_values.eq(value)]
      model, model_columns, y_true, y_pred = self.linear_model_fit_and_predict(equal_values)
      models.append(model)
      models_columns.append(model_columns)
      mse_score = self.compute_mse_score(y_true, y_pred)
      mse_scores.append(mse_score)

    weighted_mse_score = np.mean(mse_scores)
    return weighted_mse_score, list(feature_unique_values), models, models_columns, mse_scores

  def is_feature_numeric(self, df, feature):
    # covers every numeric dtype, not only int64 and float64
    return pd.api.types.is_numeric_dtype(df[feature])

  def features_best_mse_score(self, df):
    best_weighted_mse_score = float('inf')
    best_split_feature = None
    best_split_value = None
    best_split_models = None
    best_split_models_columns = None
    best_split_mse_scores = None

    features = list(df.columns)
    features.remove(self.target_variable)
    for feature in features:
      if self.is_feature_numeric(df, feature):
        weighted_mse_score, split_value, models, models_columns, mse_scores = self.numeric_feature_best_mse_score(df, feature)
      else:
        weighted_mse_score, split_value, models, models_columns, mse_scores = self.categorical_feature_mse_score(df, feature)
      
      if weighted_mse_score < best_weighted_mse_score:
        best_weighted_mse_score = weighted_mse_score
        best_split_feature = feature
        best_split_value = split_value
        best_split_models = models
        best_split_models_columns = models_columns
        best_split_mse_scores = mse_scores
    
    return best_split_feature, best_split_value, best_split_models, best_split_models_columns, best_split_mse_scores

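  def split_node(self, df, depth):
    # stop when there are too few samples or the depth limit is exceeded
    if len(df) < self.min_samples_for_split or depth > self.max_depth:
      return None

    results = self.features_best_mse_score(df)
    # no feature offers a valid split (e.g. every feature is constant)
    if results[0] is None:
      return None

    return results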

  def predict(self, X):
    return self.root.predict(X)
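
The split score above averages the child MSE scores with equal weight, regardless of how many samples each child holds. One possible alternative, shown here only as a sketch and not as what the notebook does, is to weight each child's score by its sample count. The helper `sample_weighted_score` is hypothetical, and switching to it would likely change which splits are chosen.

```python
import numpy as np

def sample_weighted_score(scores, sizes):
    """Average of child scores, weighted by the number of samples in each child."""
    scores = np.asarray(scores, dtype=float)
    sizes = np.asarray(sizes, dtype=float)
    return float(np.sum(scores * sizes) / np.sum(sizes))

# Two children with scores 1.0 and 9.0 holding 90 and 10 samples.
# The unweighted mean is 5.0; the weighted score leans toward the larger child.
print(sample_weighted_score([1.0, 9.0], [90, 10]))
```

With equal weights, a tiny child that happens to fit very well or very badly has as much influence on the score as a child holding most of the data.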

In [4]:
df = pd.read_csv('forestfires.csv')
linear_regression_tree = LinearRegressionTree(
    linear_regression_model=LinearRegression, 
    min_samples_for_split=5,
    max_depth=4,
    target_variable='area'
)
linear_regression_tree.fit(df)
X = df.drop(columns=['area'])
y = df['area']
y_pred = linear_regression_tree.predict(X)
mse_score = mean_squared_error(y, y_pred)
print(mse_score)

Created node #0  (DMC - 290.65), child of node #None
Created node #1  (FFMC - 34.55), child of node #0
Created node #2  (leaf), child of node #1
Created node #3  (DMC - 2.7), child of node #1
Created node #4  (leaf), child of node #3
Created node #5  (temp - 3.2), child of node #3
Created node #6  (leaf), child of node #5
Created node #7  (leaf), child of node #5
Created node #8  (leaf), child of node #0
4042.1161388601436


Let's compare it to a regular decision tree regressor with the same `min_samples_split` and `max_depth`:

In [6]:
from sklearn.tree import DecisionTreeRegressor

df = pd.get_dummies(pd.read_csv('forestfires.csv'))
X = df.drop(columns=['area'])
y = df['area']
dtr = DecisionTreeRegressor(min_samples_split=5, max_depth=4)
dtr.fit(X, y)
y_pred = dtr.predict(X)
mse_score = mean_squared_error(y, y_pred)
print(mse_score)

1059.882993558152


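On the training data, the linear regression tree's MSE (about 4042) is far higher than the decision tree's (about 1060), so there's a lot to improve.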