# 1. Линейная регрессия


Первый урок - всегда линейные модели, в частности линейная регрессия, напишем класс линейной регресии, обучаемый с помощью Mini-Batch Gradient Descent
Так же добавим возможность выбрать регуляризацию


In [None]:
from typing import Callable, Literal, get_args

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.metrics import r2_score

sns.set_theme()

Алгоритм градиентного спуска выглядит следующим образом:

$$
{\displaystyle \mathbf {w} _{n+1}=\mathbf {w} _{n}-\gamma \nabla L(\mathbf {w} _{n})}
$$

где $\gamma$ - скорость градиентного спуска, гиперпараметр, настраеваемый для обучения

Наша Loss-функция

$$
L(f_w, y, X) = \frac{1}{N} \sum_{i=1}^{N} (y_i - \langle x_i, w \rangle - b)^2
$$

здесь подразумевается, что в матрицу признаков добавлен столбец, состоящий из единиц для свободного члена $w_0$


## Градиент без регуляризации


Выведем градиент функции потерь:

$$
    \nabla_w L(f_w, y, X) = (\frac{\partial L}{\partial w_0}, \frac{\partial L}{\partial w_1}, ... , \frac{\partial L}{\partial w_N})
$$

$$
    \frac{\partial L}{\partial w_0} = \frac{2}{N} \sum_{i=1}^N (y_i - w_0 \cdot x_{i0} - b) \cdot x_{i0}
$$

$$
    \frac{\partial L}{\partial b} = \frac{2}{N} \sum_{i=1}^N (y_i - \langle x_i, w \rangle - b)
$$

Вспомним формулу умножения матрицы на вектор:

$$
{\displaystyle v'_{i}=\sum \limits _{j}A_{ij}v_{j}}
$$

Можно заметить, что в формуле присутствуют соотвествующие операцие, единственное отличие - индексы, по которым идёт суммирование
пусть

$$
    \nabla_w L = \frac{2}{N} \sum_{i=1}^N (y_i - w_j \cdot x_{ij}) \cdot x_{ij} = \frac{2}{N} \sum_{i=1}^N y_i \cdot x_{ij} - \sum_{i=1}^N w_j \cdot x_{ij} \cdot x_{ij} = X^Ty - X^T X w = X^T(y - Xw)
$$

$$
    \nabla_b L = \frac{2}{N} \sum (Xw + b - y)
$$


## Градиент с L2 регуляризацией


Наша Loss-функция c L2 регуляризацией выглядит следующим образом:

$$
L(f_w, y, X) = \frac{1}{N} \sum_{i=1}^{N} (y_i - \langle x_i, w \rangle - b)^2 + \lambda \sum_{j=1}^D w_j ^2
$$

Соотвественно градиент для MSE с L2 регуляризацией в матричном виде:

$$
\nabla_w L = - X^T(y - Xw - b) + \lambda w
$$

$$
\nabla_b L = - X^T(y - Xw - b) + \lambda w
$$


## Немного о оптимизациях градиентного спуска


Градиентный спуск по всей выборке кушает много времени, но есть возможности для оптимизации:

1. Стохастический градиентый спуск - выбираем один элемент из выборки и сходимся за E итераций (эпох) или пока шаг не станет меньше некоторого delta_converged
2. MiniBatch - выбираем подвыборку из B элементов и сходимся за E итераций (эпох) или пока шаг не станет меньше некоторого delta_converged

Использую второй вариант


## Сам коооооодЪ


In [598]:
Matrix = pd.DataFrame | pd.Series | np.ndarray
Vector = Matrix


def mse_gradient(X: Matrix, y: Vector, w: Vector, b, lambda_=None) -> Vector:
    return X.T @ (X @ w + b - y) / X.shape[0]


def mse_l2_gradient(X: Matrix, y: Vector, w: Vector, b, lambda_: float) -> Vector:
    return mse_gradient(X, y, w, b) + lambda_ * w


gradients = {
    None: mse_gradient,
    "L2": mse_l2_gradient,
}


class MySGDLinearRegression:
    def __init__(
        self,
        learning_rate: float = 0.1,
        regularization: Literal["L1", "L2"] | None = None,
        lambda_: float = 1,
        delta_converged=1e-3,
        batch_size: int = 64,
        max_steps: int = 1000,
        verbose: int | None = None,
    ):
        if regularization and regularization not in ["L2"]:
            raise ValueError("Регуляризация должна принимать типы L2 или отсуствовать")

        if lambda_ and lambda_ < 0:
            raise ValueError(
                "Коэффициент регуляризации должен быть в интервале [0, inf)"
            )

        self._lr = learning_rate
        self._lambda = lambda_

        self._grad: Callable[[Matrix, Vector, Vector, float], Vector] = gradients[
            regularization
        ]
        self.batch_size = batch_size
        self.delta_converged = delta_converged
        self.max_steps = max_steps

        self.verbose = verbose

        self.epoch_ = None
        self.coef_ = None
        self.intercept_ = None

    def fit(self, X: Matrix, y: Vector):
        X = np.asarray(X)
        y = np.asarray(y)

        if self.verbose > 0:
            print("Начало обучения")

        n, _ = X.shape

        n, features = X.shape

        w = np.random.normal(0, 1, features)
        b = 0

        for e in range(self.max_steps):
            # Генерируем индексы подвыборки
            indices = np.random.permutation(n)[: self.batch_size]
            X_batch = X[indices]
            y_batch = y[indices]

            grad = self._grad(X_batch, y_batch, w, b, self._lambda)
            b_grad = (X_batch @ w + b - y_batch).sum() / self.batch_size

            w -= 2 * self._lr * grad
            b -= 2 * self._lr * b_grad

            if np.linalg.norm(grad) < self.delta_converged:
                break

            if self.verbose > 0:
                print(f"Эпоха {e} норма градиента: {np.linalg.norm(grad)}")

        if self.verbose > 0:
            print(f"Обучение закончено за {e} эпох")

        self.epoch_ = e

        self.coef_ = w
        self.intercept_ = b

        return self

    def predict(self, X: Matrix):
        return X @ self.coef_ + self.intercept_

## Сравнение на данных, хоба


In [None]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

X, y = fetch_california_housing(return_X_y=True, as_frame=True)
display(X.head())

X_train, X_test, y_train, y_test = train_test_split(
    X.drop(["AveBedrms"], axis=1),
    y,
    test_size=0.25,
    random_state=42,
)

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude
0,8.3252,41.0,6.984127,1.02381,322.0,2.555556,37.88,-122.23
1,8.3014,21.0,6.238137,0.97188,2401.0,2.109842,37.86,-122.22
2,7.2574,52.0,8.288136,1.073446,496.0,2.80226,37.85,-122.24
3,5.6431,52.0,5.817352,1.073059,558.0,2.547945,37.85,-122.25
4,3.8462,52.0,6.281853,1.081081,565.0,2.181467,37.85,-122.25


In [None]:
# from sklearn.datasets import make_regression
# from sklearn.model_selection import train_test_split

# X, y = make_regression(n_samples=100, n_features=100, noise=4, random_state=42)
# X_train, X_test, y_train, y_test = train_test_split(
#     X, y, test_size=0.25, random_state=42
# )

In [None]:
ind = np.random.permutation(X.shape[0])[:64]
X_batch = np.asarray(X)[ind]
y_batch = np.asarray(y)[ind]

w = np.random.normal(0, 1, X.shape[1])

grad = X_batch.T @ (X_batch @ w - y_batch)
-2 * 0.00001 * grad

array([-7.89306800e+00, -5.23808774e+01, -1.08924236e+01, -2.24914186e+00,
       -4.82443490e+03, -5.91829433e+00, -7.43931118e+01,  2.51364452e+02])

In [None]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# scaler = StandardScaler()
scaler = MinMaxScaler()
scaler.fit(X_train)

In [None]:
from sklearn.linear_model import LinearRegression, SGDRegressor

my_linear_model = MySGDLinearRegression(verbose=-1)

# my_linear_model.fit(X_train, y_train)
# my_y_pred = my_linear_model.predict(X_test)

my_linear_model.fit(scaler.transform(X_train), y_train)
my_y_pred = my_linear_model.predict(scaler.transform(X_test))

my_r2_score = r2_score(y_test, my_y_pred)

lr_model = SGDRegressor(penalty=None)

# lr_model.fit(X_train, y_train)
# y_pred = lr_model.predict(X_test)

lr_model.fit(scaler.transform(X_train), y_train)
y_pred = lr_model.predict(scaler.transform(X_test))

sk_r2_score = r2_score(y_test, y_pred)


res = {
    "SKLearn LinearRegression coef norm": np.linalg.norm(lr_model.coef_),
    "My SGDLinearRegression coef norm": np.linalg.norm(my_linear_model.coef_),
    "SKLearn LinearRegression R2": sk_r2_score,
    "My SGDLinearRegression R2": my_r2_score,
}
res

{'SKLearn LinearRegression coef norm': np.float64(6.40457598558952),
 'My SGDLinearRegression coef norm': np.float64(6.782421505094185),
 'SKLearn LinearRegression R2': 0.5632317055512235,
 'My SGDLinearRegression R2': 0.5599722931371789}

In [None]:
from sklearn.linear_model import Ridge

my_linear_model = MySGDLinearRegression(regularization="L2", verbose=-1)

# my_linear_model.fit(X_train, y_train)
# my_y_pred = my_linear_model.predict(X_test)

my_linear_model.fit(scaler.transform(X_train), y_train)
my_y_pred = my_linear_model.predict(scaler.transform(X_test))

my_r2_score = r2_score(y_test, my_y_pred)

lr_model = Ridge()

lr_model.fit(X_train, y_train)
y_pred = lr_model.predict(X_test)

# lr_model.fit(scaler.transform(X_train), y_train)
# y_pred = lr_model.predict(scaler.transform(X_test))

sk_r2_score = r2_score(y_test, y_pred)


res = {
    "SKLearn Ridge coef norm": np.linalg.norm(lr_model.coef_),
    "My SGDLinearRegression(L2) coef norm": np.linalg.norm(my_linear_model.coef_),
    "SKLearn Ridge R2": sk_r2_score,
    "My SGDLinearRegression(L2) R2": my_r2_score,
}
res

{'SKLearn Ridge coef norm': np.float64(0.7501030424774919),
 'My SGDLinearRegression(L2) coef norm': np.float64(0.09544773986971777),
 'SKLearn Ridge R2': 0.5951183417728905,
 'My SGDLinearRegression(L2) R2': 0.013964009200292948}

# Некоторые выводы

Мою имплементацию, к сожалению, нельзя использовать на реальном датасете без мастшабирования признаков, т.к. веса только растут

Сравнивая результат работы на данных без мастшабирования SGDRegressor(penalty=None) из пакета sklearn не выдаёт ошибку, вероятнее всего, внутри коробки лежат оптимизации, позволяющие наложить ограничения на веса
