In [53]:
import numpy as np
import pandas as pd
import random
from sklearn.datasets import make_regression

# Synthetic regression problem: 1000 rows, 14 features (10 informative), Gaussian noise.
X, y = make_regression(
    n_samples=1000,
    n_features=14,
    n_informative=10,
    noise=15,
    random_state=42,
)
# Wrap the raw arrays in pandas containers; features are named col_0 ... col_13.
X = pd.DataFrame(X, columns=[f'col_{col}' for col in range(X.shape[1])])
y = pd.Series(y)

In [54]:
class MyLineReg():
    """Linear regression trained by (mini-batch) gradient descent on the MSE loss.

    Parameters
    ----------
    n_iter : int
        Number of gradient steps performed by ``fit``.
    learning_rate : number or callable
        Constant step size, or a callable ``f(epoch) -> step`` that is evaluated
        with the 1-based epoch number.
    metric : {None, 'mse', 'rmse', 'mae', 'mape', 'r2'}
        Metric reported during training and stored as the final score.
        ``None`` is treated as ``'mse'``.
    reg : {None, 'l1', 'l2', 'elasticnet'}
        Regularization added to the gradient and to the reported score.
    l1_coef, l2_coef : float
        Regularization strengths used by the selected ``reg`` mode.
    sgd_sample : int, float or None
        ``int`` -> number of rows per batch, ``float`` -> fraction of rows per
        batch, anything else -> the full dataset is used on every step.
    random_state : int
        Seed passed to ``random.seed`` at the start of ``fit``.
    """

    def __init__(self, n_iter, learning_rate=0.1, metric=None, reg=None, l1_coef=None, l2_coef=None, sgd_sample=None, random_state=42):
        """ 
        metric: r2, mse, mae, mape, rmse
        """
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = None
        self.metric = metric
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        self.random_state = random_state

    def predict(self, X):
        """Return ``X @ weights`` for a DataFrame ``X`` (a bias column is prepended to a copy)."""
        X = X.copy()
        X.insert(0, "bias", 1)
        return X @ self.weights

    def get_loss_metric(self, y_pred, y):
        """Compute the configured metric for predictions ``y_pred`` against targets ``y``.

        Raises
        ------
        ValueError
            If ``self.metric`` is not one of the supported names.
        """
        error = y_pred - y
        n = len(y_pred)
        if self.metric is None or self.metric == 'mse':
            return np.sum(error ** 2) / n
        elif self.metric == 'rmse':
            return np.sqrt(np.sum(error ** 2) / n)
        elif self.metric == 'mae':
            return np.sum(np.abs(error)) / n
        elif self.metric == 'mape':
            return 100 / n * np.sum(np.abs(error / y))
        elif self.metric == 'r2':
            return 1 - np.sum(error ** 2) / np.sum((y - np.mean(y)) ** 2)
        raise ValueError('Invalid metric. Supported metrics: mae, mse, rmse, mape, r2')

    def get_loss_regularization(self):
        """Return the regularization penalty for the current weights (0 when ``reg`` is None).

        Raises
        ------
        ValueError
            If ``self.reg`` is not one of the supported names.
        """
        if self.reg is None:
            return 0
        elif self.reg == 'l1':
            return self.l1_coef * np.sum(np.abs(self.weights))
        elif self.reg == 'l2':
            return self.l2_coef * np.sum(self.weights ** 2)
        elif self.reg == 'elasticnet':
            return self.l1_coef * np.sum(np.abs(self.weights)) + self.l2_coef * np.sum(self.weights ** 2)
        raise ValueError('Invalid regularization. Supported methods: l1, l2, elasticnet')

    def _reg_gradient(self):
        """Return the gradient of the regularization penalty w.r.t. the weights."""
        if self.reg is None:
            return 0
        if self.reg == 'l1':
            # np.sign is 0 at w == 0, whereas w / |w| would produce NaN there.
            return self.l1_coef * np.sign(self.weights)
        if self.reg == 'l2':
            return self.l2_coef * 2 * self.weights
        if self.reg == 'elasticnet':
            return self.l1_coef * np.sign(self.weights) + self.l2_coef * 2 * self.weights
        raise ValueError('Invalid regularization. Supported methods: l1, l2, elasticnet')

    def gr_mse(self, X, y_pred, y):
        """Return the gradient of the MSE loss plus the regularization gradient.

        ``X`` is the design matrix (bias column included), ``y_pred`` the current
        predictions for it and ``y`` the matching targets.

        Raises
        ------
        ValueError
            If ``self.reg`` is not one of the supported names.
        """
        # Resolve the regularization term first so an invalid mode fails before any work is done.
        reg_gradient = self._reg_gradient()
        error = y_pred - y
        return (2 / len(y_pred)) * (X.T @ error) + reg_gradient

    def fit(self, X, y, verbose=False):
        """Run ``n_iter`` gradient steps on ``X`` / ``y``.

        Rows of ``X`` and ``y`` are matched by position. If ``self.weights`` is
        already set, training continues from it; otherwise weights start at ones.
        When ``verbose`` is truthy, the score is printed for the first epoch and
        for every epoch divisible by ``verbose``.

        Raises
        ------
        ValueError
            If ``X`` and ``y`` have different numbers of rows, or if the metric
            or regularization name is unsupported.
        """
        random.seed(self.random_state)

        # Work on positional NumPy copies so the full-batch and mini-batch paths behave
        # the same way and pandas index labels cannot misalign X and y.
        features = np.column_stack([np.ones(len(X)), np.asarray(X, dtype=float)])
        target = np.asarray(y, dtype=float).ravel()
        if features.shape[0] != target.shape[0]:
            raise ValueError(
                f'X and y have inconsistent lengths: {features.shape[0]} != {target.shape[0]}'
            )

        if self.weights is None:
            self.weights = np.ones(features.shape[1])

        # Score the starting point: validates the metric/regularization settings before
        # any weight update and keeps best_score defined even when n_iter == 0.
        self.loss_func_score = self.get_loss_metric(features @ self.weights, target) + self.get_loss_regularization()

        metric_name = self.metric if self.metric else 'mse'
        for epoch in range(1, self.n_iter + 1):
            X_batch, y_batch = self.get_batch(features, target, self.sgd_sample)

            # A callable is a schedule; anything else (including NumPy scalars) is a constant step.
            step = self.learning_rate(epoch) if callable(self.learning_rate) else self.learning_rate
            y_pred_batch = X_batch @ self.weights
            self.weights -= step * self.gr_mse(X_batch, y_pred_batch, y_batch)

            # The loss and the metric are computed on the full dataset, not on the batch.
            y_pred = features @ self.weights
            self.loss_func_score = self.get_loss_metric(y_pred, target) + self.get_loss_regularization()
            if verbose and (epoch % verbose == 0 or epoch == 1):
                print(f"{epoch} | {metric_name} loss: {self.loss_func_score} | lr: {np.round(step, 5)}")
        self.best_score = self.loss_func_score

    def get_batch(self, X, y, sgd):
        """Return the rows of ``X`` and ``y`` used for one gradient step.

        An ``int`` ``sgd`` is the number of rows to sample, a ``float`` is the
        fraction of rows to sample (at least one row); for any other value the
        inputs are returned unchanged. Sampling is without replacement via
        ``random.sample``.
        """
        n_rows = X.shape[0]
        if isinstance(sgd, int):
            batch_size = sgd
        elif isinstance(sgd, float):
            # Never draw an empty batch: it would divide by zero in the gradient.
            batch_size = max(1, round(n_rows * sgd))
        else:
            return X, y
        sample_rows_idx = random.sample(range(n_rows), batch_size)
        return np.asarray(X)[sample_rows_idx], np.asarray(y)[sample_rows_idx]

    def get_coef(self):
        """Return the feature weights, i.e. the weights without the leading bias term."""
        return self.weights[1:]

    def get_best_score(self):
        """Return the score stored at the end of the last ``fit`` call."""
        return self.best_score

    def __str__(self) -> str:
        return f"{self.__class__.__name__} class: " + ", ".join("%s=%s" % item for item in vars(self).items())

    # repr and str intentionally share one implementation.
    __repr__ = __str__

In [55]:
# 100 gradient steps, RMSE as the reported metric, mini-batches of 6 rows.
reg = MyLineReg(n_iter=100, metric='rmse', sgd_sample=6)

In [56]:
# Train and log the score on the first epoch and on every 2nd epoch.
reg.fit(X, y, verbose=2)

1 | rmse loss: 137.7622142187194 | lr: 0.1
2 | rmse loss: 122.80546499612828 | lr: 0.1
4 | rmse loss: 80.86024030449997 | lr: 0.1
6 | rmse loss: 57.049175535776754 | lr: 0.1
8 | rmse loss: 42.233238174096535 | lr: 0.1
10 | rmse loss: 34.67522291963384 | lr: 0.1
12 | rmse loss: 29.192685239096733 | lr: 0.1
14 | rmse loss: 22.68350911708069 | lr: 0.1
16 | rmse loss: 22.106307548479073 | lr: 0.1
18 | rmse loss: 20.51574504737952 | lr: 0.1
20 | rmse loss: 19.301256105344166 | lr: 0.1
22 | rmse loss: 19.522649768707023 | lr: 0.1
24 | rmse loss: 19.566587366970882 | lr: 0.1
26 | rmse loss: 19.718701345272724 | lr: 0.1
28 | rmse loss: 19.722676330522944 | lr: 0.1
30 | rmse loss: 18.961610030199385 | lr: 0.1
32 | rmse loss: 18.940774459150067 | lr: 0.1
34 | rmse loss: 17.978058778477752 | lr: 0.1
36 | rmse loss: 17.46136067300869 | lr: 0.1
38 | rmse loss: 18.44026896338236 | lr: 0.1
40 | rmse loss: 16.800739522744312 | lr: 0.1
42 | rmse loss: 16.585130325386054 | lr: 0.1
44 | rmse loss: 17.376

In [243]:
# Score stored by the final fit() iteration (metric value plus the regularization term).
reg.get_best_score()

11.997442020227814