# Стохастический градиентный и координатный спуски

Для каждого задания указано количество баллов (если они оцениваются отдельно) + 1 балл за аккуратное и полное выполнение всего задания

In [67]:
from pathlib import Path

import pandas as pd
import numpy as np
import plotly.express as px
from plotly.graph_objects import Figure
from rich.jupyter import print
from rich.table import Table
from pylatexenc.latex2text import LatexNodes2Text

## Загрузка и подготовка данных

**Загрузите уже знакомый вам файл *Advertising.csv* как объект DataFrame.** 

In [2]:
DATA_PATH = "Advertising.csv"

DATA_PATH = Path(DATA_PATH)
assert DATA_PATH.exists()

advertising_df = pd.read_csv(DATA_PATH)
advertising_df.head()

Unnamed: 0.1,Unnamed: 0,TV,radio,newspaper,sales
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9


**Проверьте, есть ли в данных пропуски и, если они есть - удалите их**

In [3]:
advertising_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Unnamed: 0  200 non-null    int64  
 1   TV          200 non-null    float64
 2   radio       200 non-null    float64
 3   newspaper   200 non-null    float64
 4   sales       200 non-null    float64
dtypes: float64(4), int64(1)
memory usage: 7.9 KB


**Преобразуйте ваши признаки в массивы NumPy и разделите их на переменные X (предикторы) и y(целевая переменная)** 

In [4]:
predictors = advertising_df[['TV', 'radio', 'newspaper']].values
target = advertising_df['sales'].values

## Координатный спуск (3 балла)

**Добавим единичный столбец для того, чтобы у нас был свободный коэффициент в уравнении регрессии:**

In [5]:
predictors_with_bias = np.hstack([np.ones(predictors.shape[0]).reshape(-1, 1), predictors])

**Нормализуем данные: обычно это необходимо для корректной работы алгоритма**

In [6]:
normalized_predictors = predictors_with_bias / np.sqrt(np.sum(np.square(predictors_with_bias), axis=0))

**Реализуйте алгоритм координатного спуска:** (3 балла)

Ниже приведен алгоритм:

<a href="https://ibb.co/Th3BQFn"><img src="https://i.ibb.co/DK2DBS6/zascas.jpg" alt="zascas" border="0"></a>

Примечание: 1000 итераций здесь указаны для этого задания, на самом деле их может быть намного больше, нет детерменированного значения.

Вам необходимо реализовать координатный спуск, и вывести веса в модели линейной регрессии.

In [7]:
# Реализация пакетного градиентного спуска
latex_to_text = LatexNodes2Text().latex_to_text

eta = 10 ** 1
number_of_iterations = 10 ** 3
number_of_samples = len(normalized_predictors)

omega = np.random.randn(normalized_predictors.shape[1])

for _ in range(number_of_iterations):
    gradients = 2/number_of_samples * normalized_predictors.T.dot(normalized_predictors.dot(omega) - target)
    omega -= eta * gradients

table = Table(title="Коэффициенты Линейной Регрессии (Пакетный Градиентный Спуск)")
table.add_column(latex_to_text(r"\Omega"), justify="center", style="cyan", no_wrap=True)
table.add_column("Значение", justify="center")

for i, w in enumerate(omega):
    table.add_row(latex_to_text(fr"\omega_{{{i}}}"), f"{w:.4f}")

print(table)

Сравните результаты с реализацией линейной регрессии из библиотеки sklearn:

In [8]:
from sklearn.linear_model import LinearRegression
 
model = LinearRegression(fit_intercept=False)
model.fit(normalized_predictors, target)
 
table = Table(title="Коэффициенты Линейной Регрессии (sklearn)")
table.add_column(latex_to_text(r"\Omega"), justify="center", style="cyan", no_wrap=True)
table.add_column("Значение", justify="center")

for i, w in enumerate(model.coef_):
    table.add_row(latex_to_text(fr"\omega_{{{i}}}"), f"{w:.4f}")

print(table)

Если вы все сделали верно, они должны практически совпасть!

## Стохастический градиентный спуск (6 баллов)

**Отмасштабируйте столбцы исходной матрицы *X* (которую мы не нормализовали еще!). Для того, чтобы это сделать, надо вычесть из каждого значения среднее и разделить на стандартное отклонение** (0.5 баллов)

In [9]:
predictors_scalable = (predictors - predictors.mean(axis=0)) / predictors.std(axis=0)

**Добавим единичный столбец**

In [10]:
predictors_scalable_with_bias = np.hstack([np.ones(predictors_scalable.shape[0]).reshape(-1, 1), predictors_scalable])

**Создайте функцию mse_error для вычисления среднеквадратичной ошибки, принимающую два аргумента: реальные значения и предсказывающие, и возвращающую значение mse** (0.5 балла)

In [11]:
def mse_error(real_values: np.ndarray, predictive_values: np.ndarray) -> float:
    return np.mean((real_values - predictive_values) ** 2)

**Сделайте наивный прогноз: предскажите продажи средним значением. После этого рассчитайте среднеквадратичную ошибку для этого прогноза** (0.5 балла)

In [143]:
mean_prediction = np.mean(target)
mean_rmse = mse_error(target, mean_prediction) ** 0.5
print(f"Среднеквадратичная ошибка для наивного прогноза: {mean_rmse:.4f}")

**Создайте функцию *lin_pred*, которая может по матрице предикторов *X* и вектору весов линейной модели *w* получить вектор прогнозов** (0.5 балла)

In [13]:
def lin_pred(X: np.ndarray, w: np.ndarray) -> np.ndarray:
    return X.dot(w)


**Создайте функцию *stoch_grad_step* для реализации шага стохастического градиентного спуска. (1.5 балла) 
Функция должна принимать на вход следующие аргументы:**
* матрицу *X*
* вектора *y* и *w*
* число *train_ind* - индекс объекта обучающей выборки (строки матрицы *X*), по которому считается изменение весов
* число *$\eta$* (eta) - шаг градиентного спуска

Результатом будет вектор обновленных весов

Шаг для стохастического градиентного спуска выглядит следующим образом:

$$\Large w_j \leftarrow w_j - \frac{2\eta}{\ell} \sum_{i=1}^\ell{{x_{ij}((w_0 + w_1x_{i1} + w_2x_{i2} +  w_3x_{i3}) - y_i)}}$$

Для того, чтобы написать функцию, нужно сделать следующее:
    
*  посчитать направление изменения: умножить объект обучающей выборки на 2 и на разницу между предсказанным значением и реальным, а потом поделить на количество элементов в выборке.
* вернуть разницу между вектором весов и направлением изменения, умноженным на шаг градиентного спуска

In [79]:
from dataclasses import dataclass


@dataclass(frozen=True, kw_only=True, slots=True)
class SGDResponse:
    omegas: np.ndarray
    rmse: list[float]
    rmse_fig: Figure
    
    def __str__(self):
        return f"omegas: {self.omegas}\nrmse: {self.rmse[-1]:.4f}"
    
    __repr__ = __str__


class StochasticGradientDescent:
    
    __slots__ = (
        "predictors",
        "target",
        "sample_size",
        "number_features",
        "omegas",
        "tolerance",
        "learning_rate",
        "max_iterations",
    )

    def __init__(
        self,
        predictors: np.ndarray,
        target: np.ndarray,
        omegas: np.ndarray | None = None,
        tolerance: float = 10 ** -4,
        learning_rate: float = 5.5 * 10 ** 2,
        max_iterations: int = 10 ** 6,
    ):
        self.predictors = predictors
        self.target = target
        self.sample_size = len(predictors)
        self.number_features = predictors.shape[1]
        self.omegas = omegas if omegas is not None else np.zeros(self.number_features)
        self.tolerance = tolerance
        self.learning_rate = learning_rate
        self.max_iterations = max_iterations
    
    def __call__(self) -> SGDResponse:
        omegas = self.omegas.copy()
        
        rmse = []
        for _ in range(self.max_iterations):
        
            random_index = np.random.randint(self.sample_size)
            
            predictors_row = self.predictors[random_index]
            target_row = self.target[random_index]
            
            new_omegas = self.stoch_grad_step(
                predictors_row=predictors_row,
                target_row=target_row,
                omegas=omegas,
            )
            
            if np.linalg.norm(new_omegas - omegas) < self.tolerance:
                break
            
            omegas = new_omegas
            rmse.append(mse_error(self.target, self.predictors.dot(omegas)) ** 0.5)
        
        return SGDResponse(
            omegas=omegas,
            rmse=rmse,
            rmse_fig=self.create_rmse_fig(rmse),
        )
    
    def stoch_grad_step(
        self,
        predictors_row: np.ndarray,
        target_row: np.ndarray,
        omegas: np.ndarray,
    ) -> np.ndarray:
        forecast = self.lin_pred(predictors=predictors_row, omegas=omegas)
        gradient = 2 * self.lin_pred(predictors=predictors_row.T, omegas=forecast - target_row)
        eta = self.learning_rate / self.sample_size
        
        return omegas - eta * gradient
    
    @classmethod
    def lin_pred(cls, predictors: np.ndarray, omegas: np.ndarray) -> np.ndarray:
        return predictors.dot(omegas)
    
    @classmethod
    def create_rmse_fig(cls, rmse: list[float]) -> Figure:
        fig = px.line(
            x=range(len(rmse)), 
            y=rmse, 
            title="Стохастический Градиентный Спуск", 
            log_x=True,
        )
        fig.update_layout(
            xaxis_title="Итерация",
            yaxis_title="RMSE",
        )
        
        return fig



**Создайте функцию *stochastic_gradient_descent*, для реализации стохастического градиентного спуска (2.5 балла)**

**Функция принимает на вход следующие аргументы:**
- Матрицу признаков X
- Целевую переменнную
- Изначальную точку (веса модели)
- Параметр, определяющий темп обучения
- Максимальное число итераций
- Евклидово расстояние между векторами весов на соседних итерациях градиентного спуска,при котором алгоритм прекращает работу 

**На каждой итерации в вектор (список) должно записываться текущее значение среднеквадратичной ошибки. Функция должна возвращать вектор весов $w$, а также вектор (список) ошибок.**

Алгоритм сследующий:
    
* Инициализируйте расстояние между векторами весов на соседних итерациях большим числом (можно бесконечностью)
* Создайте пустой список для фиксации ошибок
* Создайте счетчик итераций
* Реализуйте оновной цикл обучения пока расстояние между векторами весов больше того, при котором надо прекратить работу (когда расстояния станут слишком маленькими - значит, мы застряли в одном месте) и количество итераций меньше максимально разрешенного: сгенерируйте случайный индекс, запишите текущую ошибку в вектор ошибок, запишите в переменную текущий шаг стохастического спуска с использованием функции, написанной ранее. Далее рассчитайте текущее расстояние между векторами весов и прибавьте к счетчику итераций 1.
* Верните вектор весов и вектор ошибок

In [138]:
stochastic_gradient_descent = StochasticGradientDescent(
    predictors=predictors_scalable_with_bias, 
    target=target,
    max_iterations=10 ** 5,
    learning_rate=6 * 10 ** -1,
    tolerance=10 ** -5,
    omegas=np.ones(predictors_scalable_with_bias.shape[1]) * 10 ** 10,
)

response = stochastic_gradient_descent()
response

omegas: [14.05979037  3.79835744  2.75064846 -0.04405857]
rmse: 1.6745

 **Запустите $10^5$ итераций стохастического градиентного спуска. Укажите вектор начальных весов, состоящий из нулей. Можете поэкспериментировать с параметром, отвечающим за темп обучения.**

**Постройте график зависимости ошибки от номера итерации**

In [139]:
response.rmse_fig.show()

**Выведите вектор весов, к которому сошелся метод.**

In [140]:
print("Вектор весов к которому сошелся метод:", response.omegas)

**Выведите среднеквадратичную ошибку на последней итерации.**

In [141]:
print("Среднеквадратичная ошибка на последней итерации:", response.rmse[-1])