In [3]:
import numpy as np
from numpy.linalg import norm
from numpy.random import choice
from numpy.typing import ArrayLike, NDArray
from typing import Callable

## Стохастический градиентный спуск

In [69]:
def StochasticGradientDescent(
    start: ArrayLike,
    X: NDArray,
    y: NDArray,
    L: Callable,
    L_grad: Callable,
    learning_rate: float = 0.01,
    batch_size: int = 64,
    max_iter=1000,
    tol=1e-7,
    **kwargs
) -> dict:
    """Minimise ``L`` with mini-batch stochastic gradient descent.

    On every iteration a batch of rows is drawn without replacement, the
    loss and its gradient are evaluated on that batch at the current point,
    and the point is moved by ``-learning_rate * gradient``.

    Parameters
    ----------
    start : initial parameter vector; it must hold ``X.shape[1] + 1``
        values because the result is reshaped to that length.
    X : 2-D array of samples (rows) by features (columns).
    y : targets, one per row of ``X``; a 1-D array or a single column.
        Each batch of targets is handed to ``L`` / ``L_grad`` as a column.
    L, L_grad : callables invoked as ``f(point, batch_X, batch_y, **kwargs)``
        returning the loss value and its gradient respectively.
    learning_rate : step multiplier applied to the gradient.
    batch_size : requested rows per batch; capped at ``X.shape[0]``.
    max_iter : upper bound on the number of updates (at least one update
        is always performed).
    tol : iteration stops once the norm of the last step falls below it.
    **kwargs : forwarded unchanged to ``L`` and ``L_grad``.

    Returns
    -------
    dict with keys ``"point"`` (final parameters, flattened), ``"L_value"``
    (loss on the last batch, evaluated *before* the final update),
    ``"grad_value"`` (gradient used for the final update, flattened) and
    ``"iterations"`` (number of updates performed).

    Raises
    ------
    ValueError
        If ``X`` has no rows or ``batch_size`` is smaller than one.
    """
    n_samples = X.shape[0]
    if n_samples == 0:
        raise ValueError("X must contain at least one sample")
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Sampling without replacement cannot draw more rows than exist, so a
    # batch_size above the dataset size degrades to full-batch descent
    # instead of raising inside ``choice``.
    effective_batch = min(batch_size, n_samples)

    curr_point = np.asarray(start)
    curr_iter = 0

    while True:
        idx = choice(n_samples, effective_batch, replace=False)
        batch_X = X[idx, :]
        # reshape(-1, 1) yields a column for both 1-D and (n, 1) targets.
        batch_y = np.asarray(y[idx]).reshape(-1, 1)

        curr_value = L(curr_point, batch_X, batch_y, **kwargs)
        curr_grad = L_grad(curr_point, batch_X, batch_y, **kwargs)

        step = learning_rate * curr_grad
        curr_point = curr_point - step
        W_error = norm(step)
        curr_iter += 1

        # ``not (W_error >= tol)`` also stops on NaN, i.e. when the
        # iteration has diverged and further steps are meaningless.
        if curr_iter >= max_iter or not (W_error >= tol):
            break

    return {
        "point": curr_point.reshape((X.shape[1] + 1)),
        "L_value": curr_value,
        "grad_value": curr_grad.reshape((X.shape[1] + 1)),
        "iterations": curr_iter,
    }

### Тест стохастического градиентного спуска

In [None]:
def L(w, X, y):
    """Mean squared error of the linear model with intercept.

    A column of ones is appended to ``X`` so the last entry of ``w`` acts
    as the intercept; the squared norm of the residual is divided by
    ``y.size``.
    """
    n_obs = y.size
    design = np.hstack([X, np.ones((n_obs, 1))])
    residual = np.dot(design, w) - y
    return norm(residual) ** 2 / n_obs


def L_grad(w, X, y):
    """Gradient of the mean squared error with respect to ``w``.

    Uses the same design matrix as the loss (``X`` with a trailing column
    of ones) and returns ``2 * A.T @ (A @ w - y) / y.size``.
    """
    n_obs = y.size
    design = np.hstack([X, np.ones((n_obs, 1))])
    residual = np.dot(design, w) - y
    doubled = 2 * np.dot(design.T, residual)
    return doubled / n_obs

# Reproducible synthetic problem: standard-normal features and targets,
# plus a random starting point with one extra weight for the intercept.
# np.random.normal is called through the already-imported ``np`` so the cell
# does not rely on a bare ``normal`` name being imported elsewhere.
np.random.seed(42)
nrow, ncol = 500, 10
X = np.random.normal(0, 1, size=(nrow, ncol))
y = np.random.normal(0, 1, size=nrow)
w_start = np.random.normal(0, 1, size=(ncol + 1, 1))

StochasticGradientDescent(
    start=w_start, X=X, y=y, L=L, L_grad=L_grad, batch_size=100
)

6.336279110461559e-06


In [None]:
class SGDLinearRegressor(RegressorMixin):
    """Linear regression with an L2 penalty on the weights, fitted by mini-batch SGD.

    Each step uses the gradient of
    ``mean((X @ W + b - Y) ** 2) + regularization * ||W||^2`` on a random
    batch; the intercept ``b`` is not penalised.

    Parameters
    ----------
    lr : step size applied to both gradients.
    regularization : coefficient of the squared-norm penalty on ``W``.
    delta_converged : fitting stops once the norm of the last weight step
        is no longer greater than this value.
    max_steps : upper bound on the number of updates (at least one update
        is always performed).
    batch_size : requested rows per batch; capped at the dataset size.
    """

    def __init__(
        self,
        lr=0.01,
        regularization=1.0,
        delta_converged=1e-3,
        max_steps=1000,
        batch_size=64,
    ):
        self.lr = lr
        self.regularization = regularization
        self.max_steps = max_steps
        self.delta_converged = delta_converged
        self.batch_size = batch_size
        # Learned parameters; populated by fit().
        self.W = None
        self.b = None

    def fit(self, X, Y):
        """Fit weights and intercept on ``X`` (2-D) and ``Y`` (1-D or one column).

        Parameters are initialised from a standard normal distribution.
        Returns ``self``.
        """
        n_samples, n_features = X.shape
        # Sampling without replacement cannot draw more rows than exist.
        batch = min(self.batch_size, n_samples)

        self.W = np.random.normal(size=(n_features, 1))
        self.b = np.random.normal()
        W_err = None
        steps = 0

        # ``steps < max_steps`` caps the number of updates at max_steps
        # (the previous ``<=`` allowed one extra step).
        while W_err is None or (
            W_err > self.delta_converged and steps < self.max_steps
        ):
            idx = choice(n_samples, batch, replace=False)
            batch_X = X[idx, :]
            # reshape(-1, 1) yields a column for both 1-D and (n, 1) targets.
            batch_Y = np.asarray(Y[idx]).reshape(-1, 1)

            err = batch_X.dot(self.W) + self.b - batch_Y
            # Average over the rows actually drawn, which may be fewer than
            # the configured batch_size.
            grad_W = (
                2 * batch_X.T.dot(err) / batch
                + 2 * self.regularization * self.W
            )
            grad_b = 2 * err.sum() / batch

            W_err = norm(self.lr * grad_W)
            self.W -= self.lr * grad_W
            self.b -= self.lr * grad_b
            steps += 1
        return self

    def predict(self, X):
        """Return 1-D predictions ``X @ W + b`` for the rows of ``X``."""
        return (X.dot(self.W) + self.b)[:, 0]