# Импорты

In [175]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from collections import Counter
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm

# Модель 1 Аналитическое решение

In [176]:
class SimpleLinearRegression:
    def __init__(self, fit_intercept=True):
        self.fit_intercept = fit_intercept
        self.w = None

    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        фукнкция обучения - вычисляет параметры модели (веса) по данной выборке

        Inputs:
        X - матрица признаков
        y - вектор ответов

        Outputs:
        self - модель
        """

        if self.fit_intercept:
            ones_col = np.ones((X.shape[0], 1))
            # добавляем вектор единиц в x, чтобы не занулить смещение w0 (байес)
            X = np.hstack([ones_col, X])

        if not np.isclose(np.linalg.det(X.T @ X), 0.0):
            self.w = np.linalg.inv(X.T @ X) @ X.T @ y

        return self

    def predict(self, X: np.ndarray):
        """
        функция предсказания - предсказывает ответы модели по данной выборке

        Inputs:
        X - матрица признаков

        Outputs:
        y_pred - предсказания
        """

        if self.fit_intercept:
            ones_col = np.ones((X.shape[0], 1))
            X = np.hstack([ones_col, X])

        y_pred = X @ self.w

        return y_pred

    def get_weights(self):
        ''' 
        фун-я возвращает веса модели
        '''
        return self.w

## сравниваю модели

In [177]:
# сгенерируем простой датасет с одним признаком
n_objects = 100


def linear_func(x): return 3.2 * x + 8


X = np.linspace(-10, 10, n_objects)
y = linear_func(X) + np.random.randn(n_objects) * 5

In [178]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

In [179]:
simple_model = SimpleLinearRegression()
original_model = LinearRegression()

In [180]:
X_train.shape

(80,)

In [181]:
simple_model.fit(X_train[:, np.newaxis], y_train)
simple_model.predict(X_test[:, np.newaxis])

array([  8.80750756,  -9.82139648,   6.23800356,  -5.96714047,
        30.00591561,  -4.68238847, -20.0994125 ,  -5.32476447,
        24.8669076 , -19.4570365 , -11.74852448,   3.66849955,
        31.93304362, -18.1722845 ,  28.72116361, -10.46377248,
        19.72789959, -11.10614848,   6.88037956, -18.8146605 ])

In [182]:
original_model.fit(X_train[:, np.newaxis], y_train)
original_model.predict(X_test[:, np.newaxis])

array([  8.80750756,  -9.82139648,   6.23800356,  -5.96714047,
        30.00591561,  -4.68238847, -20.0994125 ,  -5.32476447,
        24.8669076 , -19.4570365 , -11.74852448,   3.66849955,
        31.93304362, -18.1722845 ,  28.72116361, -10.46377248,
        19.72789959, -11.10614848,   6.88037956, -18.8146605 ])

# Модель 2 с SGD

In [253]:
class SGDLinearRegression:
    def __init__(self, fit_intercept: bool = True, learning_rate: float = 0.01, 
                n_iter: int = 1000, batch_size: int = 10, random_state: int = 21): 
        
        self.fit_intercept = fit_intercept
        self.lr = learning_rate
        self.n_iter = n_iter 
        self.batch_size = batch_size 
        self.random_state = random_state
        self.w = None
        self.loss_history = [] 
        self.rng = np.random.RandomState(random_state)
    
    def _add_intercept(self, X: np.ndarray): 
        ones_col = np.ones((X.shape[0], 1)) # добавляем вектор единиц в x, чтобы не занулить смещение w0 (байес)
        X = np.hstack([ones_col, X])
        
        return X 
    
    def _gradient(self, X_batch: np.ndarray, y_batch: np.ndarray): 
        """
        Вычисление градиентов MSE loss для батча
        L = (1/n) * Σ(y_pred - y)^2
        ∇L = (2/n) * X.T @ (y_pred - y_true) = (2/n) * X.T @ (X @ w - y)
        """
        
        n = X_batch.shape[0] 
        y_pred = X_batch @ self.w 
        error = y_pred - y_batch
        
        gradient = (2/n) * X_batch.T @ error # type: ignore
        return gradient 
    
    def fit(self, X: np.ndarray, y: np.ndarray): 
        """
        Обучение модели методом стохастического градиентного спуска
        
        Параметры:
        -----------
        X : матрица признаков (n_samples, n_features)
        y : вектор целевых значений (n_samples,)
        """
        
        X_train = X.copy() 
        y_train = y.copy().ravel()
        
        if self.fit_intercept: 
            X_train = self._add_intercept(X_train)
            
        n_samples, n_features = X_train.shape 
        self.w = self.rng.randn(n_features) * 0.01 
        
        for epoch in tqdm(range(self.n_iter), desc='Training'):
            epoch_loss_sum = 0
            batch_count = 0
            
            indices = np.arange(n_samples) 
            
            for i in range(0, n_samples, self.batch_size): 
                batch_indices = indices[i:i + self.batch_size]
                
                if len(batch_indices) == 0:
                    continue
                
                X_batch = X_train[batch_indices]
                y_batch = y_train[batch_indices]
                
                grad = self._gradient(X_batch, y_batch)
                self.w = self.w - self.lr * grad
                
                y_pred_batch = X_batch @ self.w
                batch_loss = np.mean((y_pred_batch - y_batch) ** 2)
                
                epoch_loss_sum += batch_loss
                batch_count += 1
                
            if batch_count > 0:
                epoch_avg_loss = epoch_loss_sum / batch_count
                self.loss_history.append(epoch_avg_loss) 
            else: 
                self.loss_history.append(0.0)
            
                
        return self 

    def predict(self, X: np.ndarray):
        """
        функция предсказания - предсказывает ответы модели по данной выборке
        
        Inputs:
        X - матрица признаков
        
        Outputs:
        y_pred - предсказания
        """
        
        if self.fit_intercept: 
            X = self._add_intercept(X)

        return X @ self.w
               
    def get_weights(self):
        return self.w    
        

## проверка моделей

In [254]:
sgd_model = SGDLinearRegression(batch_size=10)

In [255]:
sgd_model.fit(X_train[:, np.newaxis], y_train)
sgd_model.predict(X_test[:, np.newaxis])

Training:   0%|          | 0/1000 [00:00<?, ?it/s]

array([  8.94658481,  -8.92009611,   6.48221503,  -5.22354143,
        29.27763551,  -3.99135654, -18.77757523,  -4.60744899,
        24.34889594, -18.16148279, -10.76837344,   4.01784525,
        31.12591284, -16.9292979 ,  28.04545062,  -9.53618855,
        19.42015638, -10.152281  ,   7.09830747, -17.54539034])

In [256]:
simple_model.predict(X_test[:, np.newaxis])

array([  8.80750756,  -9.82139648,   6.23800356,  -5.96714047,
        30.00591561,  -4.68238847, -20.0994125 ,  -5.32476447,
        24.8669076 , -19.4570365 , -11.74852448,   3.66849955,
        31.93304362, -18.1722845 ,  28.72116361, -10.46377248,
        19.72789959, -11.10614848,   6.88037956, -18.8146605 ])

# Модель 3 классический градиентный спуск

In [257]:
class ClassicalGDLinearRegression:
    def __init__(self, fit_intercept: bool = True, learning_rate: float = 0.01, 
                n_iter: int = 1000, random_state: int = 21): 
        
        self.fit_intercept = fit_intercept
        self.lr = learning_rate
        self.n_iter = n_iter 
        self.random_state = random_state
        self.w = None
        self.loss_history = [] 
        self.rng = np.random.RandomState(random_state)
    
    def _add_intercept(self, X: np.ndarray): 
        ones_col = np.ones((X.shape[0], 1)) # добавляем вектор единиц в x, чтобы не занулить смещение w0 (байес)
        X = np.hstack([ones_col, X])
        
        return X 
    
    def _gradient(self, X: np.ndarray, y: np.ndarray): 
        """
        Вычисление градиентов MSE loss для всей выборки
        L = (1/n) * Σ(y_pred - y)^2
        ∇L = (2/n) * X.T @ (y_pred - y_true) = (2/n) * X.T @ (X @ w - y)
        """
        
        n = X.shape[0] 
        y_pred = X @ self.w 
        error = y_pred - y
        
        gradient = (2/n) * X.T @ error # type: ignore
        return gradient 
    
    def fit(self, X: np.ndarray, y: np.ndarray): 
        """
        Обучение модели с помощью классического градиентного спуска
        
        Параметры:
        -----------
        X : матрица признаков (n_samples, n_features)
        y : вектор целевых значений (n_samples,)
        """
        eps = 1e-4
        best_loss = 10_000
        X_train = X.copy() 
        y_train = y.copy().ravel()
        
        if self.fit_intercept: 
            X_train = self._add_intercept(X_train)
            
        n_samples, n_features = X_train.shape 
        self.w = self.rng.randn(n_features) * 0.01 
        
        for epoch in tqdm(range(self.n_iter), desc='Training'):            
            grad = self._gradient(X_train, y_train)
            self.w = self.w - self.lr * grad
            
            y_pred_batch = X_train @ self.w
            loss = np.mean((y_pred_batch - y_train) ** 2)
            self.loss_history.append(loss)
            
            # if loss < best_loss - eps: 
            #     best_loss = loss
            # else: 
            #     print(f"Ранняя остановка (эпоха {epoch}): loss стабилизировался на {loss:.4f}")
            #     break
                
        return self 

    def predict(self, X: np.ndarray):
        """
        функция предсказания - предсказывает ответы модели по данной выборке
        
        Inputs:
        X - матрица признаков
        
        Outputs:
        y_pred - предсказания
        """
        
        if self.fit_intercept: 
            X = self._add_intercept(X)

        return X @ self.w
               
    def get_weights(self):
        return self.w    
        

In [258]:
gd_model = ClassicalGDLinearRegression() 
gd_model.fit(X_train[:, np.newaxis], y_train)

Training:   0%|          | 0/1000 [00:00<?, ?it/s]

<__main__.ClassicalGDLinearRegression at 0x1688a96a0>

In [259]:
sgd_model.get_weights()

array([8.02244614, 3.0496576 ])

In [260]:
simple_model.get_weights()

array([7.84394356, 3.17976121])

In [261]:
gd_model.get_weights()

array([7.84394355, 3.17976121])

# Фун-ии метрик

In [192]:
def R2(y_true, y_pred): 
    y_mean = np.mean(y_true)
    ss_res = np.sum((y_true - y_pred)**2)
    ss_tot = np.sum((y_true - y_mean)**2) 
    
    if ss_tot == 0: 
        return 0.0 
    
    return 1 - (ss_res / ss_tot)

In [193]:
def MAE(y_true, y_pred): 
    return np.mean(np.abs(y_true - y_pred))

In [194]:
def MSE(y_true, y_pred): 
    return np.mean((y_true - y_pred)**2)

In [195]:
def RMSE(y_true, y_pred): 
    return np.sqrt(np.mean((y_true - y_pred)**2))

In [196]:
y_pred = gd_model.predict(X_test[:, np.newaxis])

In [197]:
R2(y_test, y_pred), r2_score(y_test, y_pred)

(np.float64(0.9520543460531936), 0.9520543460531936)

In [198]:
MAE(y_test, y_pred), mean_absolute_error(y_test, y_pred)

(np.float64(3.2040642586746144), 3.2040642586746144)

In [199]:
RMSE(y_test, y_pred), np.sqrt(mean_squared_error(y_test, y_pred))

(np.float64(4.243925454289967), np.float64(4.243925454289967))

In [200]:
MSE(y_test, y_pred), mean_squared_error(y_test, y_pred)

(np.float64(18.010903261570306), 18.010903261570306)

# L2 - регуляризация

In [262]:
class RidgeLinearRegression(SGDLinearRegression): 
    def __init__(self, fit_intercept: bool = True, learning_rate: float = 0.01, 
                 n_iter: int = 1000, batch_size: int = 10, random_state: int = 21, alpha: float = 0):
        super().__init__(fit_intercept, learning_rate, n_iter, batch_size, random_state)
        
        self.alpha = alpha
        
    def _gradient(self, X_batch: np.ndarray, y_batch: np.ndarray):
        """
        Градиент с L2 регуляризацией
        ∇L = (2/n) * X.T @ (X @ w - y) + 2 * alpha * w
        """
        
        n = X_batch.shape[0] 
        y_pred = X_batch @ self.w 
        error = y_pred - y_batch
        
        mse_grad = (2/n) * (X_batch.T @ error)
        l2_grad = 2 * self.alpha * self.w # type: ignore
        
        if self.fit_intercept:
            l2_grad[0] = 0  # смещение не штрафуем
        
        return mse_grad + l2_grad

## Проверяю метрики модели

на изначальных параметрах alpha и learning_rate ridge показывает очень плохие результаты в предсказаниях, нужно было поиграться с learning_rate и alpha для точного поподания 

In [263]:
ridge_model = RidgeLinearRegression(alpha=0.0001, learning_rate=0.0001, batch_size=10, n_iter=3000)
ridge_model.fit(X_train[:, np.newaxis], y_train)
ridge_pred = ridge_model.predict(X_test[:, np.newaxis])

Training:   0%|          | 0/3000 [00:00<?, ?it/s]

In [264]:
MSE(y_test, ridge_pred)

np.float64(18.09409484015761)

In [265]:
MSE(y_test, original_model.predict(X_test[:, np.newaxis]))

np.float64(18.01090324233241)

# L1-регуляризация

In [266]:
class LassoLinearRegression(SGDLinearRegression): 
    def __init__(self, fit_intercept: bool = True, learning_rate: float = 0.01, n_iter: int = 1000, 
                 batch_size: int = 10, random_state: int = 21, alpha: float = 0.1):
        super().__init__(fit_intercept, learning_rate, n_iter, batch_size, random_state)
        self.alpha = 0.1
    
    def _gradient(self, X_batch: np.ndarray, y_batch: np.ndarray):
        """
        Градиент с L1 регуляризацией (субградиент)
        ∇L = (2/n) * X.T @ (X @ w - y) + alpha * sign(w)
        """
     
        n = X_batch.shape[0] 
        y_pred = X_batch @ self.w 
        error = y_pred - y_batch 
        
        mse_grad = (2/n) * X_batch.T @ error 
        l1_grad = self.alpha * np.sign(self.w)  # type: ignore
        
        if self.fit_intercept: 
            l1_grad[0] = 0 # смещение не штрафуем
        
        return mse_grad + l1_grad
        

## Проверяю метрики модели

Тоже самое! Нужно посмотреть разные параметры! 

In [267]:
lasso_model = LassoLinearRegression(alpha=0.01, learning_rate=0.001, batch_size=10, n_iter=3000)
lasso_model.fit(X_train[:, np.newaxis], y_train)
lasso_pred = lasso_model.predict(X_test[:, np.newaxis])

Training:   0%|          | 0/3000 [00:00<?, ?it/s]

In [268]:
MSE(y_test, lasso_pred), MSE(y_test, original_model.predict(X_test[:, np.newaxis]))

(np.float64(18.011537301836743), np.float64(18.01090324233241))

# Elastic Net

In [271]:
class ElasticNetLinearRegression(SGDLinearRegression): 
    def __init__(self, fit_intercept: bool = True, learning_rate: float = 0.01, 
                 n_iter: int = 1000, batch_size: int = 10, random_state: int = 21,
                 alpha: float = 0.01, l1_ratio : float = 0.5):
        super().__init__(fit_intercept, learning_rate, n_iter, batch_size, random_state)
        self.l1_ratio = l1_ratio
        self.alpha = alpha
        
    def _gradient(self, X_batch: np.ndarray, y_batch: np.ndarray):
        """
        Градиент ElasticNet
        ∇L = (2/n) * X.T @ (X @ w - y) + 
             alpha * l1_ratio * sign(w) + 
             alpha * (1 - l1_ratio) * w
        """
        n = X_batch.shape[0] 
        y_pred = X_batch @ self.w 
        error = y_pred - y_batch
        
        mse_grad = (2/n) * (X_batch.T @ error) 
        
        l1_grad = self.alpha * self.l1_ratio * np.sign(self.w) # type: ignore
        l2_grad = self.alpha * (1 - self.l1_ratio) * self.w  # type: ignore
        
        if self.fit_intercept: 
            l1_grad[0] = 0 
            l2_grad[0] = 0 
            
        return mse_grad + l1_grad + l2_grad
        

## Проверка метрик моделей

In [278]:
elastic_model = ElasticNetLinearRegression(alpha=0.0001, learning_rate=0.0001, batch_size=10, n_iter=3000, l1_ratio=0.01)
elastic_model.fit(X_train[:, np.newaxis], y_train)
elastic_pred = elastic_model.predict(X_test[:, np.newaxis])

Training:   0%|          | 0/3000 [00:00<?, ?it/s]

In [279]:
MSE(y_test, elastic_pred), MSE(y_test, original_model.predict(X_test[:, np.newaxis]))

(np.float64(18.094022608552972), np.float64(18.01090324233241))