In [1]:
import numpy as np
import pandas as pd
from math import ceil
from numpy.core.multiarray import ndarray
from numpy.linalg import inv
from typing import Optional

np.random.seed(33)

# Linear regression (analytical solution)

## Mathematical description 

$\mathbf{y}$ - vector with target variable values  

$\mathbf{w}$ - vector with linear regression weights

$\mathbf{X}$ - train matrix

$\mathbf{y} = \mathbf{X} \mathbf{w} + \mathbf{e} $ 

$ E = \sum_{i=1}^{N} ( \mathbf{x}_i^{T} \mathbf{w} - y_i ) ^2 = (\mathbf{X} \mathbf{w} - \mathbf{y}) ^2 = (\mathbf{X} \mathbf{w} - \mathbf{y} ) ^T (\mathbf{X} \mathbf{w} - \mathbf{y}) = (\mathbf{w}^T\mathbf{X}^T - \mathbf{y}^T)(\mathbf{X} \mathbf{w} - \mathbf{y}) = \mathbf{w}^T\mathbf{X}^T\mathbf{X}\mathbf{w} - \mathbf{w}^T\mathbf{X}^T\mathbf{y} - \mathbf{y}^T\mathbf{X}\mathbf{w} + \mathbf{y}^T\mathbf{y} $

find the derivative

$ \frac{\partial E}{\partial w} = \mathbf{w}^T(\mathbf{X}^T\mathbf{X} + \mathbf{X}^T\mathbf{X}) - (\mathbf{X}^T\mathbf{y})^T - \mathbf{y}^T\mathbf{X} + 0 = 2\mathbf{w}^T\mathbf{X}^T\mathbf{X} - 2\mathbf{y}^T\mathbf{X} $ 

equate the derivative to zero and solve the resulting equation

$ 2\mathbf{w}^T\mathbf{X}^T\mathbf{X} - 2\mathbf{y}^T\mathbf{X} = 0 $ 

$ \mathbf{w}^T\mathbf{X}^T\mathbf{X} = \mathbf{y}^T\mathbf{X} $

$ \mathbf{X}^T\mathbf{X}\mathbf{w} = \mathbf{X}^T\mathbf{y} $ 

$ \mathbf{w} = (\mathbf{X}^T\mathbf{X})^{-1}\mathbf{X}^T\mathbf{y} $ 

## Implementation in Python

In [2]:
class LinearRegressionAnalytical:
    def __init__(self) -> None:
        self.weights = None
        self.intercept = None

    def fit(self, X_train: ndarray, y: ndarray) -> None:
        X = np.hstack((X_train, np.ones((len(X_train), 1))))
        coefs = inv(X.T @ X) @ X.T @ y
        self.weights = coefs[:-1]
        self.intercept = coefs[-1]

    def predict(self, X_test: ndarray) -> ndarray:
        if self.weights is None:
            raise Exception("This linear regression instance is not fitted yet.")
        y_pred = X_test @ self.weights + self.intercept
        return y_pred

# Linear regression (stochastic gradient descent)

## Mathematical description 

$ y $ - predicted target variable

$ t $ - target variable

$ \mu $ - learning rate

$ \mathbf{w} $ - vector with linear regression weights

$ N $ - batch size

$  $

$ \mathbf{w}(i+1) = \mathbf{w}(i) - \Delta \mathbf{w}(i) $

$ \Delta \mathbf{w} = \mu \frac{\partial \xi}{\partial \mathbf{w}} $

$ \frac{\partial \xi_i}{\partial \mathbf{w}} = \frac{\partial \xi_i}{\partial y_i} \frac{\partial y_i}{\partial \mathbf{w}} $

$ \frac{\partial \xi_i}{\partial y_i} = \frac{\partial (t_i - y_i)^2}{\partial y_i} = - 2 (t_i - y_i) = 2 (y_i - t_i) $

$ \frac{\partial y_i}{\partial \mathbf{w}} = \frac{\partial (\mathbf{x}_i \cdot \mathbf{w})}{\partial \mathbf{w}} = \mathbf{x}_i $

$ \Delta \mathbf{w} = \mu \cdot \frac{\partial \xi_i}{\partial \mathbf{w}} = \mu \cdot 2 \mathbf{x}_i (y_i - t_i) $

$ \Delta \mathbf{w} = 2 * \mu * \frac{1}{N} \sum_{i=1}^{N} \mathbf{x}_i (y_i - t_i) $

It makes sense to use a variable learning rate: first, when the minimum point is still far away, move quickly, and later, after a certain number of iterations, take slower steps.
One way to set the step size is as follows:

$ \mu_t = \frac{\mu}{t} $

$ \mu_t $ - learning rate at epoch t

$ \mu $ - initial learning rate

$ t $ - epoch number

There can be many conditions for stopping the algorithm learning:
- reaching the specified number of iterations;
- euclidean distance between weights in two adjacent iterations below a certain threshold;
- the last n iterations, the loss function has not improved
- and etc.
In this implementation, I will use the first two criteria.

## Implementation in Python

In [4]:
def euclid_distance(vec_1: ndarray, vec_2: ndarray) -> float:
    return np.sqrt(np.power(vec_1 - vec_2, 2).sum())


def mse(y_true: ndarray, y_pred: ndarray) -> float:
    return np.power(y_true - y_pred, 2).mean()


class LinearRegressionSGD:
    def __init__(self, learning_rate: float) -> None:
        self.weights = None
        self.intercept = None
        self.errors = None
        self.lr = learning_rate
        self.errors = list()

    def fit(
        self,
        X_train: ndarray,
        y: ndarray,
        n_epochs: int,
        min_weight_dist: float = 1e-8,
        batch_size: Optional[int] = None,
    ) -> None:
        n_samples = len(y)
        if batch_size is None:
            batch_size = n_samples

        X = np.hstack((X_train, np.ones((len(X_train), 1))))
        w = np.random.normal(size=X_train.shape[1] + 1)
        w_prev = w + min_weight_dist * 10e5

        epoch_num = 0

        while euclid_distance(w_prev, w) > min_weight_dist and epoch_num < n_epochs:
            epoch_num += 1
            cur_lr = self.lr / epoch_num
            inds = np.random.choice(np.arange(n_samples), size=min(batch_size, n_samples), replace=False)

            y_pred = X[inds] @ w
            y_true = y[inds]
            N = len(inds)
            gradient_step = (X[inds] * (y_pred - y_true).reshape((-1, 1))).sum(axis=0) * cur_lr / N
            w_prev = w
            w = w - gradient_step
            self.errors.append(mse(y, X @ w))

        self.weights = w[:-1]
        self.intercept = w[-1]

    def predict(self, X_test: ndarray) -> ndarray:
        if self.weights is None:
            raise Exception("This linear regression instance is not fitted yet.")
        y_pred = X_test @ self.weights + self.intercept
        return y_pred