Модель логистической регрессии, реализованная на Python с использованием библиотек Pandas и NumPy.

In [1]:
import random
import pandas as pd
import numpy as np
import time
import functools

from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

In [2]:
X, y = make_classification(n_samples=10000, n_features=14, n_informative=10, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.20)

In [3]:
class MyLogReg():
    """
    Логистическая регрессия

    Parameters
    ----------
    n_iter : int, optional
        Количество шагов градиентного спуска, by default 10
    learning_rate : float, optional
        Коэффициент скорости обучения градиентного спуска, by default 0.1
        Если на вход пришла lambda-функция, то learning_rate вычисляется на каждом шаге на основе переданной функцией
    weights : np.ndarray, optional
        Веса модели
    metric : str, optional
        Метрика, которая будет вычисляться параллельно с функцией потерь.
        Принимает одно из следующих значений: accuracy, precision, recall, f1, roc_auc, by default None
    reg : str, optional
        Вид регуляризации. Принимает одно из следующих значений: l1, l2, elasticnet, by default None
    l1_coef : float, optional
        Коэффициент L1 регуляризации. Принимает значения от 0.0 до 1.0, by default 0
    l2_coef : float, optional
        Коэффициент L2 регуляризации. Принимает значения от 0.0 до 1.0, by default 0
    sgd_sample : Union[int, float], optional
        Количество образцов, которое будет использоваться на каждой итерации обучения.
        Может принимать целые числа, либо дробные от 0.0 до 1.0, by default None
    random_state : int, optional
        Сид для воспроизводимости результата, by default 42
    
    """
    @staticmethod
    def timer(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            val = func(*args, **kwargs)
            end = time.perf_counter()
            work_time = end - start
            print(f'Время выполнения {func.__name__}: {round(work_time, 4)} сек.')
            return val
        return wrapper
    
    @staticmethod
    def indicator(numbers: pd.Series) -> pd.Series:
        numbers[numbers > 0.5] = 1
        numbers[numbers <= 0.5] = 0
        return numbers
    
    @staticmethod
    def accuracy(y_pred: pd.Series, y_true: pd.Series) -> float:
        numerator = MyLogReg.indicator(y_pred)[MyLogReg.indicator(y_true) == MyLogReg.indicator(y_pred)].count()
        denominator = y_true.shape[0]
        return round(numerator / denominator, 10)
    
    @staticmethod
    def precision(y_pred: pd.Series, y_true: pd.Series) -> float:
        numerator = MyLogReg.indicator(y_true)[(MyLogReg.indicator(y_pred) == 1) & (MyLogReg.indicator(y_true) == 1)].count()
        denominator = MyLogReg.indicator(y_pred)[MyLogReg.indicator(y_pred) == 1].count()
        return round(numerator / denominator, 10)
    
    @staticmethod
    def recall(y_pred: pd.Series, y_true: pd.Series) -> float:
        numerator = MyLogReg.indicator(y_pred)[(MyLogReg.indicator(y_pred) == 1) & (MyLogReg.indicator(y_true) == 1)].count()
        denominator = MyLogReg.indicator(y_pred)[MyLogReg.indicator(y_true) == 1].count()
        return round(numerator / denominator, 10)
    
    @staticmethod
    def f1(y_pred: pd.Series, y_true: pd.Series) -> float:
        result = 2 * MyLogReg.precision(y_true, y_pred) * MyLogReg.recall(y_true, y_pred) / (MyLogReg.precision(y_true, y_pred) + MyLogReg.recall(y_true, y_pred))
        return round(result, 10)
    
    @staticmethod
    def roc_auc(y_pred: pd.Series, y_true: pd.Series) -> float:
        sqr = 0
        n_ones = np.sum(y_true == 1)
        n_zeroes = np.sum(y_true == 0)
        m = n_ones * n_zeroes
        trip = sorted(zip(y_pred, y_true), reverse=True)
        for _, true in trip:
            if true == 1:
                sqr += n_zeroes
            else:
                n_zeroes -= 1
        return sqr / m        
    
    @staticmethod
    def get_metric_score(y_true: pd.Series, y_pred: pd.Series, metric: str) -> float:
        if metric == 'accuracy':
            return MyLogReg.accuracy(y_true, y_pred)
        elif metric == 'precision':
            return MyLogReg.precision(y_true, y_pred)
        elif metric == 'recall':
            return MyLogReg.recall(y_true, y_pred)
        elif metric == 'f1':
            return MyLogReg.f1(y_true, y_pred)
        elif metric == 'roc_auc':
            return MyLogReg.roc_auc(y_true, y_pred)    
        
    def __calc_grad(self, X: pd.DataFrame, y: pd.Series) -> float:
        y_ = (1 + np.exp(- X @ self.weights)) ** (-1)
        gradient = (y_ - y) @ X / y.size       
        if self.reg:
            if self.reg == 'l1' or self.reg == 'elasticnet':
                gradient += self.l1_coef * np.sign(self.weights)
            if self.reg == 'l2' or self.reg == 'elasticnet':
                gradient += self.l2_coef * 2 * self.weights        
        return gradient
    
    def __calc_loss(self, X: pd.DataFrame, y: pd.Series) -> float:
        eps = 1e-15    
        y_ = (1 + np.exp(- X @ self.weights)) ** (-1)
        log_loss = (-(y * np.log(y_ + eps) - (1 - y) * np.log(1 - y_ + eps))).mean()
        return log_loss
       
    def __init__(self, 
                 n_iter: int = 10, 
                 learning_rate: float = 0.1, 
                 weights: np.ndarray = None, 
                 metric: str = None, 
                 reg: str = None, 
                 l1_coef: float = None, 
                 l2_coef: float = None,
                 sgd_sample: float = None,
                 random_state: int = 42) -> None:
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = weights
        self.metric = metric
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        self.random_state = random_state
    
    @timer
    def fit(self, X: pd.DataFrame, y: pd.Series, verbose: int = False) -> None:   
         """Обучение линейной регрессии

        Parameters
        ----------
        X : pd.DataFrame
            Все фичи
        y : pd.Series
            Целевая переменная
        verbose : int, optional
            Указывает через сколько итераций градиентного спуска будет выводиться лог
        """
         random.seed(self.random_state)
         X = X.copy()
         X.insert(value=1, loc=0, column='w0')
         self.weights = np.ones(X.shape[1])      
         for i in range(1, self.n_iter + 1):
            
            #разделение трейна на батчи
            if isinstance(self.sgd_sample, float):
                sample_rows_idx = random.sample(range(X.shape[0]), round(X.shape[0] * self.sgd_sample))                
            if isinstance(self.sgd_sample, int):
                sample_rows_idx = random.sample(range(X.shape[0]), self.sgd_sample)            
            if not self.sgd_sample:
                sample_rows_idx = range(X.shape[0])                                                
            X_batch = X.iloc[sample_rows_idx]
            y_batch = y.iloc[sample_rows_idx]
            
            #вычисление лосса и учет регулиризации
            log_loss = self.__calc_loss(X_batch, y_batch)            
            if self.reg == 'l1' or self.reg == 'elasticnet':
                log_loss += self.l1_coef * np.sign(self.weights)
            if self.reg == 'l2' or self.reg == 'elasticnet':
                log_loss += self.l1_coef * self.weights * 2
                
            #вычисление весов и скорости градиентного спуска    
            if isinstance(self.learning_rate, (int, float)):    
                self.weights -= self.__calc_grad(X_batch, y_batch) * self.learning_rate
            else:
                self.weights -= self.__calc_grad(X_batch, y_batch) * self.learning_rate(i)
            
            #вычисление последнего значения метрики
            if self.metric:
                y_ = (1 + np.exp(- X @ self.weights)) ** (-1)
                self.score = MyLogReg.get_metric_score(y_, y, self.metric)
            
            #вывод лога
            if verbose and metric:
                if (i + 1) % verbose == 0:
                    print(f'{i} | loss: {MSE} | {self.metric}: ')
                elif i == 0:
                    print(f'start | loss: {MSE} | {self.metric}: ')                
    
    @timer
    def predict(self, X: pd.DataFrame) -> pd.Series:
        """Выдача предсказаных классов моделью

        Parameters
        ----------
        X : pd.DataFrame
            Матрица фичей
        """
        X.insert(value=1, loc=0, column='w0')
        s = 0
        y_ = (1 + np.exp(- X @ self.weights)) ** (-1)
        for i in range(y_.shape[0]):
            if y_[i] > 0.5:
                s += 1
        return s
    
    @timer
    def predict_proba(self, X: pd.DataFrame) -> pd.Series:
        """Выдача предсказаных вероятностей моделью

        Parameters
        ----------
        X : pd.DataFrame
            Матрица фичей
        """
        return (1 + np.exp(- X @ self.weights[1:])) ** (-1)
                        
    def get_coef(self) -> pd.Series:
        """Возвращает значений весов"""
        
        return self.weights[1:]

    def get_best_score(self) -> float:
        """Возвращает последнее значение метрики"""
        
        return self.score     
    
    def __str__(self) -> str:
        return f'MyLogReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}'

In [4]:
LogReg = MyLogReg(
    n_iter = 100, 
    learning_rate = lambda iter: 0.7 * (0.9 ** iter), 
    metric = 'roc_auc', 
    reg = 'elasticnet', 
    l1_coef = 0.1, 
    l2_coef = 0.001,
    sgd_sample = 0.1
)

In [5]:
LogReg.fit(X_train, y_train)
y_pred = LogReg.predict_proba(X_test)
print(LogReg.get_best_score())

Время выполнения fit: 0.6558 сек.
Время выполнения predict_proba: 0.0006 сек.
0.8788968944099379


In [6]:
y_pred_sklearn = LogisticRegression().fit(X_train, y_train).predict(X_test)
print(roc_auc_score(y_test, y_pred))

0.8164727954971859
