In [3]:
import pandas as pd
import numpy as np
from typing import Literal
import random

In [13]:
class MyLineReg():
    """Linear regression fitted by (optionally mini-batch) gradient descent.

    The model minimises mean squared error, optionally with an L1, L2 or
    elastic-net penalty.  A column of ones is prepended to the features, so
    ``weights[0]`` is the intercept and ``weights[1:]`` are the feature
    coefficients.  The penalty is applied to every weight, intercept included.

    Parameters
    ----------
    n_iter : int
        Number of gradient steps.
    learning_rate : float or callable
        Constant step size, or a callable receiving the 1-based iteration
        number and returning the step size for that iteration.
    metric : {"mae", "mse", "rmse", "mape", "r2"} or None
        Metric reported while logging and stored after training.
    reg : {"l1", "l2", "elasticnet"} or None
        Kind of regularisation; any falsy value disables it.
    l1_coef : float
        Weight of the L1 penalty.  With ``reg="elasticnet"`` the L2 penalty
        is weighted by ``1 - l1_coef``.
    l2_coef : float or None
        Weight of the L2 penalty for ``reg="l2"`` (``None`` is treated as 0).
        It is not used by ``reg="elasticnet"``.
    sgd_sample : int, float or None
        Mini-batch size: an integer is a row count, a float is a fraction of
        the training rows.  A falsy value means full-batch gradient descent.
    random_state : int
        Seed passed to ``random.seed`` at the start of ``fit``.
    """

    _REG_KINDS = ('l1', 'l2', 'elasticnet')

    def __init__(self, n_iter = 100, learning_rate = 0.1, metric = None, reg = None, l1_coef = 0, l2_coef = None, sgd_sample = None, random_state = 42):
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = None      # set by fit(); index 0 is the intercept
        self.metric = metric
        self.best_score = None   # metric value on the training data after the last step
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        self.random_state = random_state

    def __str__(self):
        return f"MyLineReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}"

    @staticmethod
    def _to_array(data):
        """Return ``data`` as a NumPy array (DataFrames via ``.values``)."""
        if isinstance(data, pd.DataFrame):
            return data.values
        return np.asarray(data)

    def _calculate_metric(self, y_true, y_pred):
        """Compute ``self.metric`` for the given targets and predictions.

        Returns ``None`` when no metric is configured or its name is unknown.
        """
        if self.metric is None:
            return None
        y_true = np.array(y_true).flatten()
        y_pred = np.array(y_pred).flatten()

        if self.metric == "mae":
            return np.mean(np.abs(y_true - y_pred))
        elif self.metric == "rmse":
            return np.sqrt(np.mean((y_pred - y_true) ** 2))
        elif self.metric == "mape":
            return np.mean(np.abs((y_true - y_pred) / y_true)) * 100
        elif self.metric == "r2":
            ss_res = np.sum((y_true - y_pred) ** 2)
            ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
            if ss_tot == 0:
                # Constant target: the ratio is undefined.  Use the finite
                # convention 1.0 for a perfect fit and 0.0 otherwise instead
                # of dividing by zero.
                return 1.0 if ss_res == 0 else 0.0
            return 1 - (ss_res / ss_tot)
        elif self.metric == "mse":
            return np.mean((y_true - y_pred) ** 2)
        else:
            return None

    def _get_learning_rate(self, iteration):
        """Return the step size for ``iteration`` as a float."""
        if callable(self.learning_rate):
            return float(self.learning_rate(iteration))
        return float(self.learning_rate)

    def _check_reg(self):
        """Raise ``ValueError`` if ``self.reg`` names an unsupported penalty."""
        if self.reg and self.reg not in self._REG_KINDS:
            raise ValueError(
                f"Неизвестный тип регуляризации: {self.reg!r}. "
                "Допустимые значения: 'l1', 'l2', 'elasticnet'."
            )

    def _regularization(self):
        """Return ``(penalty, penalty_gradient)`` for the current weights."""
        if not self.reg:
            return 0.0, 0.0
        self._check_reg()
        w = self.weights
        if self.reg == 'l1':
            return self.l1_coef * np.sum(np.abs(w)), self.l1_coef * np.sign(w)
        if self.reg == 'l2':
            l2 = 0 if self.l2_coef is None else self.l2_coef
            return l2 * np.sum(w ** 2), l2 * 2 * w
        # elasticnet: l1_coef is the L1 share, the remainder goes to L2.
        l2 = 1 - self.l1_coef
        penalty = self.l1_coef * np.sum(np.abs(w)) + l2 * np.sum(w ** 2)
        gradient = self.l1_coef * np.sign(w) + l2 * 2 * w
        return penalty, gradient

    def _batch_size(self, n_samples):
        """Translate ``sgd_sample`` into a row count, or ``None`` for full batch.

        Raises ``ValueError`` for a non-numeric or negative ``sgd_sample``.
        """
        size = self.sgd_sample
        if not size:
            return None
        if isinstance(size, (int, np.integer)):
            size = int(size)
        elif isinstance(size, (float, np.floating)):
            size = int(round(n_samples * float(size)))
        else:
            raise ValueError("SGD sample должен быть целым или дробным числом!")
        if size < 0:
            raise ValueError("SGD sample должен быть положительным числом!")
        # Keep the batch within [1, n_samples]: a tiny fraction must not yield
        # an empty batch, and random.sample cannot draw more rows than exist.
        return max(1, min(size, n_samples))

    def fit(self, X, y, verbose = False):
        """Train the model on ``X`` (2-D features) and ``y`` (targets).

        ``verbose`` is the logging period: when truthy, a progress line is
        printed every ``verbose`` iterations.  The logged loss and metric are
        computed on the full training data with the weights *before* that
        iteration's update.

        Raises ``ValueError`` for an unsupported ``reg`` or ``sgd_sample``.
        """
        # Fail before touching the weights so a bad setting cannot leave a
        # model that looks trained but still holds its initial values.
        self._check_reg()
        random.seed(self.random_state)

        X_array = self._to_array(X)
        # Flatten so an (n, 1) target cannot broadcast residuals to (n, n).
        y_array = self._to_array(y).ravel()

        n_samples, n_features = X_array.shape
        batch_size = self._batch_size(n_samples)
        X_array = np.c_[np.ones(n_samples), X_array]
        self.weights = np.ones(n_features + 1)

        for i in range(1, self.n_iter + 1):
            lr = self._get_learning_rate(i)
            penalty, penalty_grad = self._regularization()

            if verbose and i % verbose == 0:
                # Full-data statistics are only needed for the log line.
                y_pred_full = X_array @ self.weights
                mse_loss = np.sum((y_pred_full - y_array) ** 2) / n_samples
                if self.reg:
                    mse_loss = mse_loss + penalty
                metric_value = self._calculate_metric(y_true = y_array, y_pred = y_pred_full)
                print(f" {i} | loss: {mse_loss:.2f} | learning rate: {lr}",  f" {self.metric}|{metric_value}" if self.metric else "")

            if batch_size is None:
                X_batch, y_batch = X_array, y_array
            else:
                rows = random.sample(range(n_samples), batch_size)
                X_batch, y_batch = X_array[rows], y_array[rows]

            errors_batch = X_batch @ self.weights - y_batch
            grad = (2 / len(X_batch)) * (errors_batch @ X_batch)
            if self.reg:
                grad = grad + penalty_grad

            self.weights -= lr * grad

        if self.metric:
            self.best_score = self._calculate_metric(y_array, X_array @ self.weights)

    def get_coef(self):
        """Return the feature coefficients (weights without the intercept)."""
        if self.weights is None:
            raise ValueError("Модель не обучена. Сначала вызовите fit().")
        return self.weights[1:]

    def get_best_score(self):
        """Return the metric stored by the last ``fit`` call, or ``None``."""
        return self.best_score

    def predict(self, X):
        """Return predictions for ``X``; raises ``ValueError`` if not fitted."""
        if self.weights is None:
            raise ValueError("Модель не обучена. Сначала вызовите fit().")
        X_array = self._to_array(X)
        X_array = np.c_[np.ones(X_array.shape[0]), X_array]
        return X_array @ self.weights

In [16]:
from sklearn.datasets import make_regression
import pandas as pd
import numpy as np

# Synthetic training set: 20000 rows, 5 features (all informative), Gaussian noise.
x_train, y_train = make_regression(
    n_samples=20000, n_features=5, n_informative=5, noise=15, random_state=42,
)
x_train = pd.DataFrame(x_train).add_prefix('col_')
y_train = pd.Series(y_train)

# A separate 5-row sample with only 2 informative features.  It is built here
# but not used by the remaining statements of this cell.
# NOTE(review): it comes from an independent make_regression call, so it is
# presumably not a hold-out of the training distribution; confirm before use.
x_test, y_test = make_regression(
    n_samples=5, n_features=5, n_informative=2, noise=15, random_state=42,
)
x_test = pd.DataFrame(x_test).add_prefix('col_')
y_test = pd.Series(y_test)

# L1-regularised model trained on mini-batches of 50 rows, logging every 100 steps.
a = MyLineReg(
    n_iter=400,
    learning_rate=0.01,
    metric='mae',
    reg='l1',
    l1_coef=0.5,
    sgd_sample=50,
)
a.fit(x_train, y_train, verbose=100)
print(a.get_best_score())

 100 | loss: 523.99 | learning rate: 0.01  mae|16.64044728345547
 200 | loss: 327.45 | learning rate: 0.01  mae|12.02209550488457
 300 | loss: 324.83 | learning rate: 0.01  mae|11.89893978181818
 400 | loss: 324.64 | learning rate: 0.01  mae|11.893419996345433
11.891543017560846
