In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.linear_model import LinearRegression, Ridge, Lasso, LogisticRegression
from sklearn.datasets import make_regression, make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score, roc_auc_score
from sklearn.preprocessing import StandardScaler

from abc import ABC, abstractmethod

# Linear Regression settings

Предполагаемая функция модели:
$h(X) = Xw$ <p>

Функция потерь:
$J(\theta|X,Y) = \frac{1}{n}||Y - Xw||_2^2$ <p>

Оптимизационная проблема:
$argmin_\theta J(\theta|X,Y)$ <p>
    
Градиент:
$\nabla J(\theta) = \frac{2}{n}X^T(Xw - Y)$

L2 функция потерь:
$J(\theta|X,Y) = \frac{1}{n}||Y - Xw||_2^2  + \lambda\theta^2$ <p>
    
L2 градиент:
$\nabla J(\theta) = \frac{2}{n}X^T(Xw - Y) + 2\lambda\theta$

L1 функция потерь:
$J(\theta|X,Y) = \frac{1}{2n}||Y - Xw||_2^2  + \lambda|\theta|$ <p>
    
L1 градиент:
$\nabla J(\theta) = \frac{1}{n}X^T(Xw - Y) + \lambda sign(\theta)$

# Logistic Regression settings

Предполагаемая функция модели:
$h(X) = g(z) = \frac{1}{1 + e^{-z}}; \ z=Xw$

Связывающая функция:
$g^{-1}(z)=log(\frac{p(Y=1|X)}{1-p(Y=1|X)})=Xw$

Функция потерь:
$J(\theta|X,Y) = -\frac{1}{n}\sum_i^n y_i*log(p(x_i)) + (1-y_i)*log(1-p(x_i))$

Оптимизационная проблема:
$argmin_\theta J(\theta|X,Y)$

Градиент:
$\nabla J(\theta) = \frac{1}{n}X^T(g(z) - Y)$

# Implementation

In [7]:
class BaseLinearEstimator(ABC):
    '''
    Базовый класс для линейной и логистической регрессии с регуляризацией.
    
    Реализовано два метода оптимизации - обычный градиентный и стохастический градиентый спуск
    '''
    
    def __init__(
        self,
        sgd_sample: int = None,
        n_iter: int = 1000, 
        learning_rate: float = 0.1, 
        tolerance: float = 1e-4, 
        verbose: bool = False,
        alpha: float = 0.,
        penalty: str = 'l2'
    ) -> None:
        self.sgd_sample = sgd_sample
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.tolerance = tolerance
        self.verbose = verbose
        self.alpha = alpha
        if penalty not in ['l1','l2']:
            raise ValueError('penalty must be l1 or l2')
        self.penalty = penalty
        
    @staticmethod
    def add_constant(X: np.array) -> np.array:
        '''
        Добавляет intercept к матрице X в качестве нового поля с индексом 0 
        '''
        return np.hstack((np.ones(X.shape[0]).reshape(-1,1), X))
    
    @abstractmethod
    def _compute_loss(self, weights: np.array) -> float:
        '''
        Рассчитывает функцию потерь
        '''
        pass
    
    @abstractmethod
    def _compute_gradient(self, weights: np.array, X: np.array = None, y: np.array = None) -> np.array:
        '''
        Рассчитывает градиент функции потерь. X и y должны быть не None для SGD
        '''
        pass

    @staticmethod
    def shuffle(X: np.array, y: np.array) -> np.array:
        '''
        Перемешивает данные X и y
        '''
        shuffled_idx = np.random.permutation(X.shape[0])
        return X[shuffled_idx, :], y[shuffled_idx]
    
    def fit(self, X: np.array, y: np.array) -> None:
        self._X, self._y = self.add_constant(X), y
        self._n_obs, self._n_dim = self._X.shape
        self._init_weights = np.random.normal(size=self._n_dim, scale=1)
        if self.sgd_sample is None:
            self.weights, self.losses, self.epoch = self._initiate_gradient_descent()    
        else:
            self.weights, self.losses, self.epoch = self._initiate_stochastic_gradient_descent()

    def _initiate_gradient_descent(self) -> tuple[np.array, np.array, int]:
        '''
        Инициализирует алгоритм градиентного спуска
        '''
        weights = self._init_weights
        losses = [self._compute_loss(weights)]
        for epoch in range(self.n_iter):
            if callable(self.learning_rate):
                step = self.learning_rate(epoch) * self._compute_gradient(weights)
            else:
                step = self.learning_rate * self._compute_gradient(weights)
            if np.all(np.abs(step) < self.tolerance):
                print(f'Convergence has been reached. Epochs taken - {epoch}')
                break
            weights -= step
            losses.append(self._compute_loss(weights))
            if self.verbose == True and epoch % 10 == 0:
                print('Epoch %s; Loss %s' % (epoch, self._compute_loss(weights)))
        return weights, losses, epoch
    
    def _initiate_stochastic_gradient_descent(self) -> tuple[np.array, np.array, int]:
        '''
        Инициализирует алгоритм стохастического градиентного спуска
        '''
        weights = self._init_weights
        losses = [self._compute_loss(weights)]
        for epoch in range(self.n_iter):
            X, y = self.shuffle(self._X, self._y)
            losses_ = []
            for start in range(0, self._n_obs, self.sgd_sample):
                end = start + self.sgd_sample
                step = self.learning_rate * self._compute_gradient(weights, X[start:end, :], y[start:end])
                if np.all(np.abs(step) < self.tolerance):
                    print(f'Convergence has been reached.  Epochs taken - {epoch}')
                    break
                weights -= step
                losses_.append(self._compute_loss(weights))
            losses.append(np.mean(losses_))
            if self.verbose == True and epoch % 10 == 0:
                print('Epoch %s; Loss %s' % (epoch, self._compute_loss(weights)))
        return weights, losses_, epoch

    @abstractmethod
    def predict(self, X: np.array) -> np.array:
        pass

In [8]:
class LinearRegressionRegularized(BaseLinearEstimator):
       
    def _compute_loss(self, weights: np.array) -> float:
        if self.penalty == 'l1':
            return np.square(self._y - np.dot(self._X, weights)).mean() \
                    + self.alpha * np.sum(np.abs(weights[1:]))
        if self.penalty == 'l2':
            return np.square(self._y - np.dot(self._X, weights)).mean() \
                    + self.alpha * np.sum(np.square(weights[1:])) 
    
    def _compute_gradient(self, weights: np.array, X: np.array = None, y: np.array = None) -> np.array:
        X_ = X if X is not None else self._X
        y_ = y if y is not None else self._y
        if self.penalty == 'l1':
            return (np.dot(X_.T, (np.dot(X_, weights) - y_)) \
                    + np.hstack([0, self.alpha * np.sign(weights[1:])])) / X_.shape[0]
        if self.penalty == 'l2':
            return (2 * np.dot(X_.T, (np.dot(X_, weights) - y_)) \
                    + np.hstack([0, 2 * self.alpha * weights[1:]])) / X_.shape[0]
        
    def predict(self, X: np.array = None) -> np.array:
        if X is None:
            return np.dot(self._X, self.weights)
        else:
            X = self.add_constant(X)
            return np.dot(X, self.weights)

In [9]:
class LogisticRegressionRegularized(BaseLinearEstimator):

    @staticmethod
    def sigmoid(X: np.array, weights: np.array) -> np.array:
        return 1 / (1 + np.exp(-np.dot(X, weights)))
    
    def _compute_loss(self, weights: np.array) -> float:
        if self.penalty == 'l1':
            return -(self._y*(np.log(self.sigmoid(self._X, weights))) \
                    + (1-self._y)*(np.log(1-self.sigmoid(self._X, weights)))).mean() \
                    + self.alpha * np.sum(np.abs(weights[1:]))
        if self.penalty == 'l2':
            return -(self._y*(np.log(self.sigmoid(self._X, weights))) \
                    + (1-self._y)*(np.log(1-self.sigmoid(self._X, weights)))).mean() \
                    + self.alpha * np.sum(np.square(weights[1:])) 
    
    def _compute_gradient(self, weights: np.array, X: np.array = None, y: np.array = None) -> np.array:
        X_ = X if X is not None else self._X
        y_ = y if y is not None else self._y
        if self.penalty == 'l1':
            return (np.dot(X_.T, (self.sigmoid(X_, weights) - y_)) \
                    + np.hstack([0, self.alpha * np.sign(weights[1:])])) / X_.shape[0]
        if self.penalty == 'l2':
            return (np.dot(X_.T, (self.sigmoid(X_, weights) - y_)) \
                    + np.hstack([0, 2 * self.alpha * weights[1:]])) / X_.shape[0]
        
    def predict(self, X: np.array = None) -> np.array:
        if X is None:
            probabilities = self.sigmoid(self._X, self.weights)
            return [1 if p > 0.5 else 0 for p in probabilities]
        else:
            X = self.add_constant(X)
            probabilities = self.sigmoid(X, self.weights)
            return [1 if p > 0.5 else 0 for p in probabilities]
    
    def predict_proba(self, X: np.array = None) -> np.array:
        '''
        Предсказывает вероятность принадлежности к положительному классу
        '''
        if X is None:
            probabilities = self.sigmoid(self._X, self.weights)
            return probabilities
        else:
            X = self.add_constant(X)
            probabilities = self.sigmoid(X, self.weights)
            return probabilities

In [10]:
X, y = make_regression(n_samples=100_000, n_features=5, n_informative=4, noise=20, random_state=69)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=69)

In [11]:
model_custom = LinearRegressionRegularized(learning_rate=0.1, verbose=True)
model_sklearn = LinearRegression()
model_custom.fit(X_train,y_train)
model_sklearn.fit(X_train,y_train)
custom_pred = model_custom.predict(X_test)
sklearn_pred = model_sklearn.predict(X_test)

print('\n---Results---\n')
print(f'custom mse: {mean_squared_error(y_test, custom_pred).round(2)}')
print(f'sklearn mse: {mean_squared_error(y_test, sklearn_pred).round(2)}')
print(f'custom r2: {r2_score(y_test, custom_pred).round(2)}')
print(f'sklearn r2: {r2_score(y_test, sklearn_pred).round(2)}')

Epoch 0; Loss 6128.496668450145
Epoch 10; Loss 464.8222547769154
Epoch 20; Loss 402.0686670094342
Epoch 30; Loss 401.3727915326166
Epoch 40; Loss 401.3650680326829
Epoch 50; Loss 401.3649822248559
Convergence has been reached. Epochs taken - 54

---Results---

custom mse: 396.34
sklearn mse: 396.34
custom r2: 0.96
sklearn r2: 0.96


In [12]:
model_custom = LinearRegressionRegularized(learning_rate=lambda iter: 0.5 * (0.80**iter), verbose=True)
model_sklearn = LinearRegression()
model_custom.fit(X_train,y_train)
model_sklearn.fit(X_train,y_train)
custom_pred = model_custom.predict(X_test)
sklearn_pred = model_sklearn.predict(X_test)

print('\n---Results---\n')
print(f'custom mse: {mean_squared_error(y_test, custom_pred).round(2)}')
print(f'sklearn mse: {mean_squared_error(y_test, sklearn_pred).round(2)}')
print(f'custom r2: {r2_score(y_test, custom_pred).round(2)}')
print(f'sklearn r2: {r2_score(y_test, sklearn_pred).round(2)}')

Epoch 0; Loss 402.1721136029288
Epoch 10; Loss 401.36500116118265
Convergence has been reached. Epochs taken - 15

---Results---

custom mse: 396.34
sklearn mse: 396.34
custom r2: 0.96
sklearn r2: 0.96


In [13]:
model_custom = LinearRegressionRegularized(sgd_sample=1000, n_iter=10, learning_rate=0.01, verbose=True)
model_sklearn = LinearRegression()
model_custom.fit(X_train,y_train)
model_sklearn.fit(X_train,y_train)
custom_pred = model_custom.predict(X_test)
sklearn_pred = model_sklearn.predict(X_test)

print('\n---Results---\n')
print(f'custom mse: {mean_squared_error(y_test, custom_pred).round(2)}')
print(f'sklearn mse: {mean_squared_error(y_test, sklearn_pred).round(2)}')
print(f'custom r2: {r2_score(y_test, custom_pred).round(2)}')
print(f'sklearn r2: {r2_score(y_test, sklearn_pred).round(2)}')

Epoch 0; Loss 729.871984944402

---Results---

custom mse: 396.35
sklearn mse: 396.34
custom r2: 0.96
sklearn r2: 0.96


In [14]:
model_custom = LinearRegressionRegularized(learning_rate=0.1, alpha=2000, verbose=True)
model_sklearn = Ridge(alpha=2000)
model_custom.fit(X_train,y_train)
model_sklearn.fit(X_train,y_train)
custom_pred = model_custom.predict(X_test)
sklearn_pred = model_sklearn.predict(X_test)

print('\n---Results---\n')
print(f'custom mse: {mean_squared_error(y_test, custom_pred).round(2)}')
print(f'sklearn mse: {mean_squared_error(y_test, sklearn_pred).round(2)}')
print(f'custom r2: {r2_score(y_test, custom_pred).round(2)}')
print(f'sklearn r2: {r2_score(y_test, sklearn_pred).round(2)}')

Epoch 0; Loss 649320.8885885312
Epoch 10; Loss 13926260.438056858
Epoch 20; Loss 16172987.961165404
Epoch 30; Loss 16404308.91330121
Epoch 40; Loss 16427279.775996312
Epoch 50; Loss 16429553.4247604
Convergence has been reached. Epochs taken - 53

---Results---

custom mse: 401.45
sklearn mse: 401.45
custom r2: 0.96
sklearn r2: 0.96


In [15]:
X, y = make_classification(n_samples=10000, n_features=10, n_informative=5, n_redundant=5, n_classes=2, random_state=69)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=69)

In [16]:
model_custom = LogisticRegressionRegularized(learning_rate=0.1, verbose=False)
model_sklearn = LogisticRegression(penalty=None)
model_custom.fit(X_train,y_train)
model_sklearn.fit(X_train,y_train)
custom_pred = model_custom.predict(X_test)
custom_prob = model_custom.predict_proba(X_test)
sklearn_pred = model_sklearn.predict(X_test)
sklearn_prob = model_sklearn.predict_proba(X_test)[:,1]

print('\n---Results---\n')
print(f'custom accuracy: {accuracy_score(y_test, custom_pred).round(2)}')
print(f'sklearn accuracy: {accuracy_score(y_test, sklearn_pred).round(2)}')
print(f'custom roc-auc: {roc_auc_score(y_test, custom_prob).round(2)}')
print(f'sklearn roc-auc: {roc_auc_score(y_test, sklearn_prob).round(2)}')

Convergence has been reached. Epochs taken - 324

---Results---

custom accuracy: 0.86
sklearn accuracy: 0.86
custom roc-auc: 0.92
sklearn roc-auc: 0.92
