In [579]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
import numpy as np
from itertools import combinations
import random
from sklearn.linear_model import ridge_regression
from collections import Counter

import pandas as pd

import bisect
from graphviz import Digraph

In [580]:
df = pd.read_csv('Bengaluru_House_Data.csv')

df.drop(['area_type', 'society', 'location'], axis=1, inplace=True)
df['availability'] = [1 if x.strip() == ('Ready To Move' or 'Immediate Possession')
                       else 0 for x in df['availability']]
df['size'] = df['size'].str.extract(r'(\d+)').astype('float16')
df['size'] = df['size'].fillna(df['size'].mean())
df['total_sqft'] = df['total_sqft'].str.extract(r'(\d+)').astype('float16')
df['bath'] = df['bath'].fillna(df['bath'].mean())
df['balcony'] = df['balcony'].fillna(df['balcony'].mean())
df.reset_index(drop=True, inplace=True)
df

  has_large_values = (abs_vals > 1e6).any()


Unnamed: 0,availability,size,total_sqft,bath,balcony,price
0,0,2.0,1056.0,2.0,1.000000,39.07
1,1,4.0,2600.0,5.0,3.000000,120.00
2,1,3.0,1440.0,2.0,3.000000,62.00
3,1,3.0,1521.0,3.0,1.000000,95.00
4,1,2.0,1200.0,2.0,1.000000,51.00
...,...,...,...,...,...,...
13315,1,5.0,3452.0,4.0,0.000000,231.00
13316,1,4.0,3600.0,5.0,1.584376,400.00
13317,1,2.0,1141.0,2.0,1.000000,60.00
13318,0,4.0,4688.0,4.0,1.000000,488.00


In [581]:
import torch as tc
import pandas as pd
import numpy as np

num_amostras = 5000


dados = pd.DataFrame()
dados['x1'] = 6 + tc.randn(num_amostras)
dados['x2'] = [1.5*np.sin(0.002*idx) + n for idx, n in enumerate(dados['x1'])]
dados['x3'] = [0.001*idx+n for idx, n in enumerate(dados['x1'])]
dados['x4'] = [abs(n*np.sin(0.0025*idx))  for idx, n in enumerate(dados['x1'])]
dados['x5'] = [n + float(tc.randn(1)) for idx, n in enumerate(dados['x1'])]
dados['x6'] = [0.0003*idx + 0.8*np.sin(0.0008*idx) + n + float(tc.randn(1)) for idx, n in enumerate(dados['x1'])]
dados['x7'] = tc.rand(num_amostras)
dados['x8'] = [-np.log(n) for idx, n in enumerate(dados['x7'])]
#dados['y'] = [10 + n*np.sin(0.0025*idx) + dados['x7'][idx]*np.sin(0.001*idx) + float(tc.randn(1)) for idx, n in enumerate(dados['x1'])]
dados['price'] = [n + dados['x2'][idx] + dados['x3'][idx] + dados['x4'][idx] + dados['x5'][idx] + dados['x6'][idx] + dados['x7'][idx] + dados['x8'][idx] for idx, n in enumerate(dados['x1'])]

df = dados

In [582]:
X_train, X_test, y_train, y_test = train_test_split(
    df.drop(columns=['price']), df['price'], test_size=0.2, random_state=42)

In [583]:
class Node:
    count=0
    def __init__(self, left, right, data=None, w=None, 
                 b=None, criterion_value=None, _result=None):
        self.id=Node.count
        Node.count += 1
    
        self.left: Node = left
        self.right: Node = right

        self.data = data
        self.w: int = w
        self.b: float = b
        self.criterion_value: float = criterion_value
        self.n_sample: int = len(data) if data is not None else 0
        self._result = _result

    def predict(self, x):
        z = x @ self.w - self.b
        if z < 0:
            return self.left.predict(x)
        
        elif z >= 0:
            return self.right.predict(x)

class LeafNode(Node):
    def __init__(self, data, criterion_value, _result):
        super().__init__(None, None, data=data, 
                         criterion_value=criterion_value, 
                         _result=_result)

    def predict(self, X=None):
        return self._result
    

In [584]:
class DecisionTree:
    def __init__(self, max_depth=5, min_samples_to_split=4, min_samples_leaf=2,
                 n_feature_split=2, n_feature_comb_split=10, min_cost_decrease=0.1,
                 prob_oblique_split=0.7, hist=True):
        self.root: Node = Node(None, None)
        self.max_depth = max_depth
        self.min_samples_to_split = min_samples_to_split
        self.min_samples_leaf = min_samples_leaf
        self.n_feature_split = n_feature_split
        self.n_feature_comb_split = n_feature_comb_split
        self.min_cost_decrease = min_cost_decrease
        self.prob_oblique_split = prob_oblique_split
        self.hist = hist

        
    
    def fit(self, X: np.ndarray, y: np.ndarray):
        self.n_sample, self.n_feature = X.shape

        if self.hist:
            #Converte y em bins usando a regra de Freedman-Diaconis
            bins = int((np.max(y) - np.min(y)) / (2 * (np.percentile(y, 75) - np.percentile(y, 25)) * len(y)**(-1/3)))
            edge = np.linspace(np.min(y), np.max(y), bins)
            y_bins = []
            for yi in y:
                bin = bisect.bisect_left(edge, yi)
                y_bins.append(edge[bin-1] + (edge[bin] - edge[bin-1]) / 2)
            y = y_bins       

        #Gera todas as combinações de features
        if self.n_feature > self.n_feature_split:
            self.feature_combination = list(combinations(np.arange(self.n_feature), self.n_feature_split))
        else:
            self.feature_combination = [np.arange(self.n_feature + 1)]

        self.root = self._grow(np.hstack((X.to_numpy(), np.array(y).reshape(-1,1))))
    
    def predict(self, X):
        y_pred = []
        for x in X.to_numpy():
            y_pred.append(self.root.predict(x))
        return y_pred

   
    def _split(self, feature_data, labels):
    
        #Binariza as labels
        if len(np.unique(labels)) == 2:
            binary_labels = labels
        else: 
            unique, counts = np.unique(labels, return_counts=True)
            most_frequent_class = unique[np.argmax(counts)]
            binary_labels = (labels == most_frequent_class).astype(int)

        binary_labels = (labels >= np.median(labels)).astype(int)

        if feature_data.ndim != 1:

            #Aplica regressão ridge para encontrar o hiperplano de separação
            if (Counter(binary_labels)[0] > self.min_samples_leaf and 
                Counter(binary_labels)[1] > self.min_samples_leaf):
                w = ridge_regression(feature_data, binary_labels, alpha=0.9)
            else:
                return None, None, np.inf
            
            #Soft Theresholding
            th = 0.1*np.max(np.abs(w))
            for i, weight in enumerate(w):
                if weight > th:
                    w[i] = weight - th
                elif weight < -th:
                    w[i] = weight + th
                else:
                    w[i] = 0.0

            #Define o b
            z = feature_data @ w
           
        else:
            w = np.array([1.0])
            z = feature_data

        

        sorted_z_idx = np.argsort(z)
        sorted_z = z[sorted_z_idx]
        sorted_y = labels[sorted_z_idx]

        best_b = None
        best_cost = np.inf

        for i in range(1, len(sorted_y)):
            if sorted_z[i] != sorted_z[i-1]:
                b_candidate = (sorted_z[i] + sorted_z[i-1]) / 2.0

                left_labels = labels[z > b_candidate]
                right_labels = labels[z <= b_candidate]

                m_left, m_right = len(left_labels), len(right_labels)
                m = m_left + m_right

                s_left = np.sum((left_labels - np.mean(left_labels))**2)/m_left
                s_right = np.sum((right_labels - np.mean(right_labels))**2)/m_right

                cost_value = s_left*(m_left/m) + s_right*(m_right/m)

                if cost_value < best_cost:
                    best_cost = cost_value
                    best_b = b_candidate
        
       
        return w, best_b, best_cost
    
    
    def _best_feature(self, data):

        min_feature_cost = np.inf
        min_w = None
        min_b = None
        selected_feature = []

        
        
        
        if random.uniform(0, 1) > self.prob_oblique_split:
            sampled_feature_list = random.sample(self.feature_combination, 
                                             self.n_feature_comb_split)
        else:
            sampled_feature_list = np.arange(self.n_feature)

        for feature_idxs in sampled_feature_list:

            w, b, split_cost = self._split(data[:, feature_idxs], data[:, -1])

            if split_cost < min_feature_cost:
                min_feature_cost = split_cost
                min_w = w
                min_b = b
                selected_feature = feature_idxs

        if min_feature_cost != np.inf:
            w_out = np.zeros(self.n_feature)
            if isinstance(selected_feature, tuple):
                for idx, f_idx in enumerate(selected_feature):
                    w_out[f_idx] = min_w[idx]
            else:
                w_out[selected_feature] = 1
        else:
            w_out = None


        return w_out, min_b, min_feature_cost
        

    def _grow(self, data, depth=1):

        y = data[:, -1]

        criterion_value = np.sum((y - np.mean(y))**2)
        result = np.mean(y)

        if ((depth >= self.max_depth) or
            (criterion_value < np.finfo('float32').eps)):
            return LeafNode(data, criterion_value=criterion_value, 
                            _result = result)
       
        w_out, min_b, min_feature_cost = self._best_feature(data)

        if min_feature_cost == np.inf:
            return LeafNode(data, criterion_value=criterion_value, 
                            _result=result)
        
        z = np.dot(data[:, :-1], w_out) - min_b
    
        left_data = data[z < 0]
        right_data = data[z >= 0]

        if ((len(left_data) < self.min_samples_leaf) or
            (len(right_data) < self.min_samples_leaf) or
            ((criterion_value/len(y)) - min_feature_cost <= self.min_cost_decrease)):           
            return LeafNode(data, criterion_value=criterion_value, 
                            _result=result)
        
        if len(left_data) < self.min_samples_to_split:
            left_y = left_data[:, -1]

            left_criterion_value = np.sum((left_y - np.mean(left_y))**2)
            left_result = np.mean(left_y)

            left_node = LeafNode(left_data, criterion_value=left_criterion_value, 
                                 _result = left_result)
        else:
            left_node = self._grow(left_data, depth=depth+1)


        if len(right_data) < self.min_samples_to_split:
            right_y = right_data[:, -1]

            right_criterion_value = np.sum((right_y - np.mean(right_y))**2)
            right_result = np.mean(right_y)

            right_node = LeafNode(right_data, criterion_value=right_criterion_value, 
                                 _result = right_result)
        else:
            right_node = self._grow(right_data, depth=depth+1)

        return Node(left_node, 
                    right_node,
                    data, 
                    w_out, 
                    min_b, 
                    criterion_value = criterion_value,
                    _result=result)
    

In [585]:
model_dt = DecisionTree(max_depth=100, min_samples_to_split=10, min_samples_leaf=3, 
                        n_feature_split=2, n_feature_comb_split=10, min_cost_decrease=0.1,
                        prob_oblique_split=0.7, hist=True)
model_dt.fit(X_train, y_train)
y_pred_dt = model_dt.predict(X_test)
mse_dt = mean_squared_error(y_test, y_pred_dt)
mae_dt = mean_absolute_error(y_test, y_pred_dt)

In [586]:
def print_tree(node=None, dot=None):
    if dot is None:
        dot = Digraph()
        dot.attr('graph', fontsize='80')
        dot.attr('node', fontsize='80')
    if node.left != None:
        dot.node(f'{node.id}', f'{node.w}\n{node.criterion_value}')
        print_tree(node.left, dot=dot)
        print_tree(node.right, dot=dot)
        dot.edge(f'{node.id}', f'{node.left.id}', constraint='true')
        dot.edge(f'{node.id}', f'{node.right.id}', constraint='true')
    else:
        dot.node(f'{node.id}', f'{node._result}\n{node.criterion_value}\n{node.n_sample}')
    

    return dot

In [587]:
d = print_tree(model_dt.root)
d.render('flowchart', format='pdf')

'flowchart.pdf'

In [588]:
from sklearn.ensemble import RandomForestRegressor
model_sklearn = RandomForestRegressor()
model_sklearn.fit(X_train, y_train)
y_pred_sklearn = model_sklearn.predict(X_test)
mse_sklearn = mean_squared_error(y_test, y_pred_sklearn)
mae_sklearn = mean_absolute_error(y_test, y_pred_sklearn)

In [589]:
from sklearn.tree import DecisionTreeRegressor
model_sklearn_dt = DecisionTreeRegressor()
model_sklearn_dt.fit(X_train, y_train)
y_pred_sklearn_dt = model_sklearn_dt.predict(X_test)
mse_sklearn_dt = mean_squared_error(y_test, y_pred_sklearn_dt)
mae_sklearn_dt = mean_absolute_error(y_test, y_pred_sklearn_dt)

In [590]:
print(f'Modelo                |     MSE    |   MAE   ')
print(f'----------------------|------------|----------')
print(f'Decision Tree         | {mse_dt:10.4f} | {mae_dt:8.4f} ')
print(f'Sklearn Decision Tree | {mse_sklearn_dt:10.4f} | {mae_sklearn_dt:8.4f} ')
print(f'Random Forest')
print(f'Sklearn Random Forest | {mse_sklearn:10.4f} | {mae_sklearn:8.4f} ')

Modelo                |     MSE    |   MAE   
----------------------|------------|----------
Decision Tree         |     2.4015 |   1.1907 
Sklearn Decision Tree |     2.5552 |   1.2373 
Random Forest
Sklearn Random Forest |     0.9505 |   0.7040 


In [591]:
print(f'{model_dt.root.left} - {model_dt.root.right}')

<__main__.Node object at 0x7875f88d25d0> - <__main__.Node object at 0x7874c8d456d0>
