In [1]:
import pandas as pd
import numpy as np
from sklearn.datasets import make_regression
import random

In [2]:
# Synthetic regression problem: 50 rows, 20 features of which only 2 are informative.
X, y = make_regression(
    n_samples=50,
    n_features=20,
    n_informative=2,
    noise=5,
    random_state=42,
)
# Wrap the raw arrays in pandas containers; feature columns are named col_0 ... col_19.
X = pd.DataFrame(X, columns=[f'col_{position}' for position in range(X.shape[1])])
y = pd.Series(y)

In [3]:
def _mse(y, y_pred):
    return np.mean((y - y_pred) ** 2) 

def _mae(y, y_pred):
    return np.mean(abs(y - y_pred))

def _rmse(y, y_pred):
    return np.sqrt(mse(y, y_pred))

def _r2(y, y_pred):
    mean_y = np.mean(y)
    return 1 - np.sum((y-y_pred)**2)/np.sum((y-mean_y)**2)

def _mape(y, y_pred):
    return 100 * np.mean(abs((y-y_pred)/y))

class MyLineReg:
    """Linear regression fitted with full-batch gradient descent on the MSE loss.

    Parameters
    ----------
    n_iter : int
        Number of gradient steps.
    learning_rate : float or callable
        Step size; a callable receives the 0-based iteration number and
        returns the step size for that iteration.
    metric : str or None
        Name of a module-level metric function taking ``(y, y_pred)``.
        Both ``'mse'`` and ``'_mse'`` style names are accepted.
    reg : {'l1', 'l2', 'elasticnet'} or None
        Regularisation term added to the gradient.
    l1_coef, l2_coef : float
        Regularisation strengths.
    random_state : int
        Seed passed to ``random.seed`` at the start of ``fit``.
    """

    def __init__(self, n_iter, learning_rate, metric = None, reg = None, l1_coef=0, l2_coef=0, random_state=42):
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = None
        self.metric = metric
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.random_state = random_state
        self.scores = []

    def __str__(self):
        return f"MyLineReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}"

    def l1(self):
        """Gradient of the L1 penalty for the current weights."""
        return self.l1_coef * np.sign(self.weights)

    def l2(self):
        """Gradient of the L2 penalty for the current weights."""
        return self.l2_coef * 2*self.weights

    def elasticnet(self):
        """Gradient of the combined L1 + L2 penalty."""
        return self.l1() + self.l2()

    @staticmethod
    def _add_bias(X):
        """Return X as a float array with a leading column of ones."""
        features = np.asarray(X, dtype=float)
        return np.hstack((np.ones((features.shape[0], 1)), features))

    def _metric_function(self):
        """Resolve ``self.metric`` to a callable, or None when no metric is set.

        The underscore-prefixed helper is tried first so that ``'mse'`` maps
        to ``_mse(y, y_pred)`` rather than to an unrelated global of the same
        name. Raises KeyError when nothing callable is found.
        """
        if self.metric is None:
            return None
        namespace = globals()
        for candidate in (f"_{self.metric}", self.metric):
            function = namespace.get(candidate)
            if callable(function):
                return function
        raise KeyError(self.metric)

    def fit(self, X, y, verbose=False):
        """Fit the weights on features ``X`` and targets ``y``.

        ``verbose`` is accepted for interface compatibility and is not used.
        Weights start at one; ``self.scores`` holds the metric value computed
        before each step (only when a metric is configured) and is reset on
        every call.
        """
        random.seed(self.random_state)

        self.y = y
        self.X = self._add_bias(X)
        n_samples = self.X.shape[0]
        self.weights = np.ones(self.X.shape[1])
        self.scores = []

        metric_function = self._metric_function()
        penalties = {'l1': self.l1, 'l2': self.l2, 'elasticnet': self.elasticnet}

        for i in range(self.n_iter):
            y_pred = np.dot(self.X, self.weights)

            if metric_function is not None:
                self.scores.append(metric_function(y, y_pred))

            gradient = (2 / n_samples) * np.dot(self.X.T, (y_pred - y))

            penalty = penalties.get(self.reg)
            if penalty is not None:
                gradient += penalty()

            if callable(self.learning_rate):
                step = self.learning_rate(i)
            else:
                step = self.learning_rate

            self.weights -= step * gradient

    def predict(self, X):
        """Predict targets for feature matrix ``X``.

        The bias column is added automatically. A matrix that already has as
        many columns as there are weights is used unchanged, so inputs with a
        pre-attached ones column keep working.
        """
        if self.weights is None:
            raise ValueError("MyLineReg.predict called before fit")
        features = np.asarray(X, dtype=float)
        if features.ndim == 2 and features.shape[1] == self.weights.shape[0]:
            design = features
        else:
            design = self._add_bias(features)
        return np.dot(design, self.weights)

    def get_coef(self):
        """Return the feature weights without the bias term."""
        return self.weights[1:]

    def get_best_score(self):
        """Metric value of the final weights on the training data, or None without a metric."""
        metric_function = self._metric_function()
        if metric_function is None:
            return None
        y_pred = np.dot(self.X, self.weights)
        return metric_function(self.y, y_pred)

In [4]:
def mse(y):
    """Mean squared deviation of `y` from its own mean (population variance)."""
    centered = y - y.mean()
    return (centered ** 2).mean()

def msep(y, X, Q):
    """MSE reduction obtained by splitting targets `y` on feature `X` at threshold `Q`.

    Rows are matched between `X` and `y` by index label, so `X` may be in a
    different order than `y`. Values `<= Q` go left, values `> Q` go right.
    """
    total = len(y)

    # Select each side's targets via the index labels of the matching feature values.
    left_targets = y.loc[X[X <= Q].index]
    right_targets = y.loc[X[X > Q].index]

    left_size, right_size = len(left_targets), len(right_targets)
    left_mse = mse(left_targets)
    right_mse = mse(right_targets)

    # Parent impurity minus the size-weighted impurity of the two children.
    children_mse = left_size*left_mse/total + right_size*right_mse/total
    return mse(y) - children_mse

class TreeNode:
    """Internal node of a decision tree: a split rule plus two child links."""

    def __init__(self, split_column, split_value, predicted_classes):
        # Children are attached later by whoever builds the tree.
        self.left = None
        self.right = None
        # The split rule and the value stored alongside it.
        self.split_column, self.split_value = split_column, split_value
        self.predicted_classes = predicted_classes
        
class MyTreeReg:
    """Regression tree grown greedily by maximising the MSE reduction of each split.

    Parameters
    ----------
    max_depth : int
        Maximum depth at which a node may still be split.
    min_samples_split : int
        Minimum number of rows a node needs to be split.
    max_leafs : int
        Upper bound on the number of leaves (values below 2 are raised to 2).
    bins : int or None
        When set and smaller than ``n_rows - 2``, candidate thresholds are the
        interior histogram edges of each column instead of every midpoint.

    After ``fit``: ``node`` is the tree root (a ``TreeNode`` or a float leaf),
    ``leafs_cnt`` the number of leaves, ``fi`` the per-column importance.
    """

    def __init__(self, max_depth=5, min_samples_split=2, max_leafs=20, bins=None):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.max_leafs = max_leafs if max_leafs > 1 else 2
        self.bins = bins
        self.fi = None
        # Threshold table built per fit; kept apart from `bins` so the
        # hyper-parameter survives repeated calls to fit.
        self._thresholds = None

    def __str__(self):
        return f"MyTreeReg class: max_depth={self.max_depth}, min_samples_split={self.min_samples_split}, max_leafs={self.max_leafs}"

    def fit(self, X, y):
        """Grow the tree on DataFrame ``X`` and Series ``y`` (sharing one index)."""
        self.n = len(y)
        self.leafs_cnt = 1
        self.fi = {key: 0 for key in X.columns}
        if self.bins is not None and self.bins < X.shape[0] - 2:
            self._thresholds = self.get_bins(X)
        else:
            self._thresholds = None
        self.node = self.build_tree(X, y, 0)

    def predict(self, X):
        """Return a float Series of predictions indexed like ``X``."""
        self.y_predict = pd.Series(index=X.index, dtype=float)
        self.prediction(self.node, X)
        return self.y_predict

    def get_bins(self, X):
        """Return a DataFrame holding, per column, the interior edges of a ``self.bins``-bin histogram."""
        return pd.DataFrame({
            column: np.histogram(X[column], bins=self.bins)[1][1:-1]
            for column in X.columns
        })

    @staticmethod
    def _is_leaf(node):
        """Leaves are stored as plain floating-point numbers (np.float64 included)."""
        return isinstance(node, (float, np.floating))

    def prediction(self, node, X):
        """Route the rows of ``X`` down the subtree at ``node`` and write leaf values into ``self.y_predict``."""
        if X.shape[0] == 0:
            return
        if self._is_leaf(node):
            self.y_predict[X.index] = node
            return

        left_indexes = X[node.split_column] <= node.split_value
        right_indexes = X[node.split_column] > node.split_value
        self.prediction(node.left, X[left_indexes])
        self.prediction(node.right, X[right_indexes])

    def build_tree(self, X, y, current_depth):
        """Recursively build the subtree for rows ``X``/``y`` and return a ``TreeNode`` or a float leaf."""
        must_stop = (
            self.leafs_cnt >= self.max_leafs
            or X.shape[0] <= 1
            or len(np.unique(y)) <= 1
            or current_depth >= self.max_depth
            or len(y) < self.min_samples_split
        )
        if must_stop:
            return self.build_leaf(X, y)

        split_column, Q, ig = self.get_best_split(X, y)
        if split_column is None:
            # No threshold gives a positive gain (e.g. all features constant).
            return self.build_leaf(X, y)

        node = TreeNode(split_column, Q, -1)

        left_indices = X[split_column] <= Q
        right_indices = X[split_column] > Q

        X_left = X[left_indices]
        y_left = y[left_indices]
        X_right = X[right_indices]
        y_right = y[right_indices]

        # A split turns one leaf into two, so the leaf count grows by one.
        self.leafs_cnt += 1
        node.left = self.build_tree(X_left, y_left, current_depth + 1)
        node.right = self.build_tree(X_right, y_right, current_depth + 1)

        # Importance: share of training rows in this node times its MSE reduction.
        self.fi[split_column] += X.shape[0]/self.n*(
            mse(y) -
            len(y_left)/len(y)*mse(y_left) -
            len(y_right)/len(y)*mse(y_right)
        )
        return node

    def build_leaf(self, X, y):
        """Return the leaf value: the mean of ``y`` as np.float64 (NaN for empty ``y``)."""
        if len(y) == 0:
            return np.float64('nan')
        return np.float64(np.sum(y) / len(y))

    def get_best_split(self, X, y):
        """Return ``(column_name, threshold, gain)`` of the best split.

        ``column_name`` is None when no candidate has a positive gain.
        """
        best_ig = 0
        best_Q = 0
        best_column_name = None

        if self._thresholds is None:
            for column in X.columns:
                column = X[column].sort_values()
                # Start afresh for each column so a threshold is never formed
                # from values belonging to two different features.
                prev_value = None
                for index, value in column.items():
                    if prev_value is not None:
                        Q = (prev_value + value)/2
                        ig = msep(y, column, Q)
                        if ig > best_ig:
                            best_ig = ig
                            best_Q = Q
                            best_column_name = column.name
                    prev_value = value
        else:
            for column in X.columns:
                sample = self._thresholds[column]
                column = X[column]
                for Q in sample:
                    # A threshold leaving one side empty yields NaN and never wins.
                    ig = msep(y, column, Q)
                    if ig > best_ig:
                        best_ig = ig
                        best_Q = Q
                        best_column_name = column.name

        return best_column_name, best_Q, best_ig

    def print_tree(self, node):
        """Print the subtree at ``node`` in pre-order: split rules, then leaf values."""
        if self._is_leaf(node):
            print(node)
            return
        print(node.split_column, node.split_value)
        self.print_tree(node.left)
        self.print_tree(node.right)

In [12]:
class MyForestReg:
    """Random forest regressor built from ``MyTreeReg`` trees.

    Each tree is trained on a random subset of rows (drawn without
    replacement) and a random subset of columns.

    Parameters
    ----------
    n_estimators : int
        Number of trees.
    max_features, max_samples : float
        Fractions of columns / rows given to each tree.
    max_depth, min_samples_split, max_leafs, bins
        Passed through to every ``MyTreeReg``.
    random_state : int
        Seed passed to ``random.seed`` at the start of ``fit``.
    oob_score : str, callable or None
        Out-of-bag metric: one of 'mae', 'mse', 'rmse', 'mape', 'r2', or a
        callable ``f(y_true, y_pred)``. None disables the computation.

    After ``fit``: ``forest`` holds the trees, ``leafs_cnt`` the total number
    of leaves and ``oob_score_`` the out-of-bag score (None when disabled).
    ``fi`` is initialised to None and is not populated by this class.
    """

    def __init__(self, n_estimators=10, max_features=0.5, max_samples=0.5,
                 max_depth=5, min_samples_split=2, max_leafs=20, bins=16, random_state=42, oob_score=None):

        self.n_estimators = n_estimators
        self.max_features = max_features
        self.max_samples = max_samples
        self.random_state = random_state
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.max_leafs = max_leafs if max_leafs > 1 else 2
        self.bins = bins
        self.oob_score = oob_score
        self.oob_score_ = None
        self.fi = None
        self.forest = []
        self.leafs_cnt = 0

    def __str__(self):
        return (
            f"MyForestReg class: n_estimators={self.n_estimators}, max_features={self.max_features}, "
            f"max_samples={self.max_samples}, max_depth={self.max_depth}, "
            f"min_samples_split={self.min_samples_split}, max_leafs={self.max_leafs}, "
            f"bins={self.bins}, random_state={self.random_state}"
        )

    @staticmethod
    def _named_metrics():
        """Metrics selectable by name for the out-of-bag score."""
        return {
            'mae': lambda y, p: np.mean(np.abs(y - p)),
            'mse': lambda y, p: np.mean((y - p) ** 2),
            'rmse': lambda y, p: np.sqrt(np.mean((y - p) ** 2)),
            'mape': lambda y, p: 100 * np.mean(np.abs((y - p) / y)),
            'r2': lambda y, p: 1 - np.sum((y - p) ** 2) / np.sum((y - np.mean(y)) ** 2),
        }

    def _oob_scorer(self):
        """Return the out-of-bag metric as a callable, or None when disabled."""
        if self.oob_score is None:
            return None
        if callable(self.oob_score):
            return self.oob_score
        metrics = self._named_metrics()
        if self.oob_score not in metrics:
            raise ValueError(f"Unknown oob_score metric: {self.oob_score!r}")
        return metrics[self.oob_score]

    def fit(self, X, y):
        """Train the forest on DataFrame ``X`` and Series ``y``.

        Rows are addressed by position, so any index is supported. When an
        out-of-bag metric is configured, each row's prediction is averaged
        over the trees that did not train on it and the metric is evaluated
        once on the rows that were out-of-bag at least once.
        """
        random.seed(self.random_state)
        init_cols = X.columns.tolist()
        init_rows_cnt = X.shape[0]

        cols_smpl_cnt = int(np.round(len(init_cols) * self.max_features))
        rows_smpl_cnt = int(np.round(init_rows_cnt * self.max_samples))

        # Start from a clean state so refitting does not stack trees.
        self.forest = []
        self.leafs_cnt = 0
        self.oob_score_ = None

        scorer = self._oob_scorer()
        oob_sum = np.zeros(init_rows_cnt)
        oob_cnt = np.zeros(init_rows_cnt)

        for _ in range(self.n_estimators):
            cols_idx = random.sample(init_cols, cols_smpl_cnt)
            rows_idx = random.sample(range(init_rows_cnt), rows_smpl_cnt)

            forest_tree = MyTreeReg(max_depth=self.max_depth, min_samples_split=self.min_samples_split,
                                    max_leafs=self.max_leafs, bins=self.bins)
            forest_tree.fit(X.iloc[rows_idx][cols_idx], y.iloc[rows_idx])

            if scorer is not None:
                in_bag = np.zeros(init_rows_cnt, dtype=bool)
                in_bag[rows_idx] = True
                oob_rows = np.where(~in_bag)[0]
                if len(oob_rows) > 0:
                    oob_pred = forest_tree.predict(X.iloc[oob_rows][cols_idx])
                    oob_sum[oob_rows] += np.asarray(oob_pred, dtype=float)
                    oob_cnt[oob_rows] += 1

            self.leafs_cnt += forest_tree.leafs_cnt
            self.forest.append(forest_tree)

        if scorer is not None:
            seen = oob_cnt > 0
            if seen.any():
                y_true = np.asarray(y, dtype=float)[seen]
                self.oob_score_ = scorer(y_true, oob_sum[seen] / oob_cnt[seen])

    def predict(self, X):
        """Return the per-row mean of all trees' predictions as a numpy array."""
        if not self.forest:
            raise ValueError("MyForestReg.predict called before fit")
        total = np.zeros(X.shape[0])
        for forest_tree in self.forest:
            total += np.asarray(forest_tree.predict(X), dtype=float)
        # Average over the trees, not over the rows.
        return total / len(self.forest)

In [13]:
import copy

In [14]:
class MyBaggingReg:
    """Bagging regressor: independent copies of a base estimator fitted on bootstrap samples.

    Parameters
    ----------
    estimator : object
        Base model exposing ``fit(X, y)`` (and ``predict(X)`` for prediction);
        it is deep-copied for every ensemble member.
    n_estimators : int
        Number of ensemble members.
    max_samples : float
        Size of each bootstrap sample as a fraction of the number of rows.
    random_state : int
        Seed passed to ``random.seed`` at the start of ``fit``.
    """

    def __init__(self, estimator=None, n_estimators=10, max_samples=1.0, random_state=42):
        self.estimator = estimator
        self.n_estimators = n_estimators
        self.max_samples = max_samples
        self.random_state = random_state
        self.estimators = []

    def __str__(self):
        return f"MyBaggingReg class: estimator={self.estimator}, n_estimators={self.n_estimators}, max_samples={self.max_samples}, random_state={self.random_state}"

    def fit(self, X, y):
        """Fit ``n_estimators`` copies of the base estimator on bootstrap samples.

        ``X`` is a DataFrame and ``y`` a Series. Rows are drawn with
        replacement by position, and every sample is handed to the estimator
        with a fresh 0..k-1 index so duplicated rows have unique labels.
        """
        if self.estimator is None:
            raise ValueError("MyBaggingReg requires a base estimator")

        random.seed(self.random_state)
        rows_cnt = X.shape[0]
        rows_smpl_cnt = int(np.round(rows_cnt * self.max_samples))

        # Draw every bootstrap sample before fitting anything: an estimator's
        # fit may reseed the global RNG, which would otherwise make all later
        # samples identical to the first.
        bootstrap_rows = [
            random.choices(range(rows_cnt), k=rows_smpl_cnt)
            for _ in range(self.n_estimators)
        ]

        self.estimators = []
        for sample_rows_idx in bootstrap_rows:
            estimator = copy.deepcopy(self.estimator)
            X_sample = X.iloc[sample_rows_idx].reset_index(drop=True)
            y_sample = y.iloc[sample_rows_idx].reset_index(drop=True)
            estimator.fit(X_sample, y_sample)
            self.estimators.append(estimator)

    def predict(self, X):
        """Return the per-row mean of the fitted estimators' predictions as a numpy array."""
        if not self.estimators:
            raise ValueError("MyBaggingReg.predict called before fit")
        total = np.zeros(X.shape[0])
        for estimator in self.estimators:
            total += np.asarray(estimator.predict(X), dtype=float)
        return total / len(self.estimators)

In [11]:
# Smoke run: bag ten-step linear regressors over the synthetic data set.
MyBaggingReg(
    estimator=MyLineReg(learning_rate=0.1, n_iter=10),
).fit(X=X, y=y)