# Линейная регрессия (Linear Regression)
Метод машинного обучения, в котором ищется отношение зависимой переменной от одной или нескольких независимых переменных (регрессоров) посредством линейной функции. Стохастический градиентный спуск основан на функции MSE. Также модель дополнена регуляризацией, чтобы бороться со сложностью модели.

Реализованы методы:
- fit для обучения модели (со стохастическим градиентным спуском)
- predict для предсказания таргетов
- _init_ - конструктор
- calculate_loss - подсчитывает функцию потерь c учетом регуляризации
- get_best_score - возвращает последнее значение метрики (т.е. уже полностью обученной модели)
- calculate_metric - подсчитывает метрики
- calculate_regularization_grad - подсчитывает производную для регуляризации

Дополнительно реализован подсчет метрик:
- mae
- mse
- rmse
- mape
- r2

In [36]:
import numpy as np
import pandas as pd
import random

In [37]:
class MyLineReg():
    
    def __init__(self, n_iter=100, learning_rate=0.1, weights=False, metric=None, reg=None, l1_coef=0, l2_coef=0,
                 sgd_sample=None, random_state=42):
        '''
        Input:
        n_iter: the number of steps of gradient descent (default = 100)
        learning_rate: gradient descent learning rate coefficient or lambda function (default = 0.1)
        weights: array of model weights (default = empty)
        metric: string, name of the metric from array ['mae', 'mse', 'rmse', 'mape', 'r2'], (default = None)
        reg: string, regularization for model, name from ['l1', 'l2', 'elasticnet'], (default = None)
        l1_coef: value between 0.0 and 1.0 for L1_reg (default = 0)
        l2_coef: value between 0.0 and 1.0 for L2_reg (default = 0)
        sgd_sample: the number of samples that will be used in each iteration of the training. 
        It can accept either integers or fractions from 0.0 to 1.0.
        random_state: integer
        '''
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = weights
        self.metric = metric
        self.metric_values = []
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        self.random_state = random_state
    
    def __str__(self):
        '''
        Output: 
        string - info about class parameters
        '''
        return f"MyLineReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}"
    
    def fit(self, X, y, verbose):
        '''
        Input:
        X: DataFrame of features
        y: Series of targets
        verbose (int): indicates which iteration to output the log (default = False)
        '''
        random.seed(self.random_state)
        # Complete the passed feature matrix with a single column on the left.
        X = pd.concat([pd.Series(1, index=X.index), X], axis=1)
        self.weights = np.ones(X.shape[1])
        
        if verbose and not(self.metric is None):
            print(f"start | loss: {self.calculate_loss(X, y)} | {self.metric}: {self.calculate_metric(X, y)}")

        for i in range(1, self.n_iter + 1):
            
            # Check lambda function
            if callable(self.learning_rate):
                lr = self.learning_rate(i)
            else:
                lr = self.learning_rate
            
            # Form the sequence numbers of the lines that should be selected for Stochastic gradient descent
            if self.sgd_sample is not None:
                if isinstance(self.sgd_sample, int):
                    sample_size = self.sgd_sample
                elif 0 < self.sgd_sample < 1:
                    sample_size = int(self.sgd_sample * X.shape[0])
                
                sample_rows_idx = random.sample(range(X.shape[0]), sample_size)
                X_sample = X.iloc[sample_rows_idx]
                y_sample = y.iloc[sample_rows_idx]
            else:
                X_sample = X
                y_sample = y

            y_pred = np.dot(X_sample, self.weights)
            
            # Calculate the gradient based on MSE and regularization
            gradient = 2 * np.dot(y_pred - y_sample, X_sample) / X_sample.shape[0] + self.calculate_regularization_grad()
                        
            # Make a step with the size of the lr in the opposite direction from the gradient
            self.weights -= lr * gradient
            
            if verbose and i % verbose == 0 and not(self.metric is None):
                print(f"start | loss: {self.calculate_loss(X, y)} | {self.metric}: {self.calculate_metric(X, y)}")
        
        if self.metric:
            self.metric_values.append(self.calculate_metric(X, y))
    
    def calculate_regularization_grad(self):
        '''
        Output:
        float - regularization gradient
        '''
        if self.reg == "l1":
            return self.l1_coef * np.sign(self.weights)      
        if self.reg == "l2":
            return 2 * self.l2_coef * self.weights    
        if self.reg == "elasticnet":
            return self.l1_coef * np.sign(self.weights) + 2 * self.l2_coef * self.weights
        return 0

    
    def calculate_loss(self, X, y):
        '''
        Input:
        X: DataFrame of features
        y: Series of targets
        Output: MSE and regularization -> float
        '''
        y_pred = np.dot(X, self.weights)
        loss = np.mean((y_pred - y) ** 2)
        if self.reg == "l1":
            reg_loss = self.l1_coef *  np.sum(np.abs(self.weights)) 
        elif self.reg == "l2":
            reg_loss = self.l2_coef * np.sum(self.weights ** 2)
        elif self.reg == "elasticnet":
            reg_loss = self.l1_coef *  np.sum(np.abs(self.weights)) + self.l2_coef * np.sum(self.weights ** 2)
        else:
            reg_loss = 0
        loss += reg_loss
        return reg_loss
    
    def get_coef(self):
        '''
        Output:
        np.array of weights without first 
        '''
        return self.weights[1:]
    
    def predict(self, X):
        '''
        Input:
        X: DataFrame of features
        Output:
        y_pred: array of prediction
        '''
        X = pd.concat([pd.Series(1, index=X.index), X], axis=1)
        y_pred = np.dot(X, self.weights)
        return y_pred
    
    def get_best_score(self):
        return self.metric_values[-1] if self.metric_values else None
    
    def calculate_metric(self, X, y):
        y_pred = np.dot(X, self.weights)
        
        if self.metric == 'mae':
            return np.mean(np.abs(y_pred - y))
        
        elif self.metric == 'mse':
            return np.mean((y_pred - y) ** 2)
        
        elif self.metric == 'rmse':
            return np.sqrt(np.mean((y_pred - y) ** 2))
        
        elif self.metric == 'mape':
            return np.mean(np.abs((y - y_pred) / y)) * 100
        
        elif self.metric == 'r2':
            ss_res = np.sum((y - y_pred) ** 2)
            ss_tot = np.sum((y - np.mean(y)) ** 2)
            return 1 - (ss_res / ss_tot)


## Протестируем модель

Входные данные: датасет с различными параметрами (сгенерированными посредством метода make_regression из scikit-learn)

Выходные данные: возвращенные предсказания и MAE


In [38]:
from sklearn.datasets import make_regression

X, y = make_regression(n_samples=1000, n_features=14, n_informative=10, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]

In [42]:
from sklearn.model_selection import train_test_split

model = MyLineReg(metric='mae', reg='elasticnet', sgd_sample=0.3)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model.fit(X_train, y_train, 50)
y_pred = model.predict(X_test)
np.mean((y_pred - y_test) ** 2)

start | loss: 0.0 | mae: 112.80906582514544
start | loss: 0.0 | mae: 0.008318875593259337
start | loss: 0.0 | mae: 2.276170724027221e-06


1.097254555204565e-11

Как ожидалось, наша модель работает корректно. Следовательно, алгоритм реализован верно.