# Линейная регрессия
## Самописный класс MyLineReg

In [1]:
import pandas as pd
import numpy as np
import random

In [2]:
class MyLineReg:
    """Linear regression trained with (mini-batch) gradient descent.

    The model minimises mean squared error, optionally with an L1, L2 or
    elastic-net penalty.  An intercept column of ones is prepended to the
    feature matrix internally, so ``weights[0]`` is the bias term.

    Parameters
    ----------
    n_iter : int
        Number of gradient steps.
    learning_rate : float or callable
        Constant step size, or a function mapping the 1-based iteration
        number to a step size.
    metric : {'mse', 'mae', 'rmse', 'mape', 'r2'} or None
        Metric reported in the log and stored as ``best_score``.  Any other
        value (including ``None``) falls back to MSE.
    reg : {'l1', 'l2', 'elasticnet'} or None
        Regularisation type.
    l1_coef, l2_coef : float
        Penalty strengths for the L1 / L2 terms.
    sgd_sample : int, float or None
        Mini-batch size: an absolute row count (int), a fraction of the
        training set (float), or ``None`` for the full data set.
    random_state : int
        Seed for the private generator that draws the mini-batches.
    """

    def __init__(
        self,
        n_iter=100,
        learning_rate=0.1,
        metric=None,
        reg=None,
        l1_coef=0,
        l2_coef=0,
        sgd_sample=0.1,
        random_state=42,
    ):
        """Store the hyper-parameters; no training happens here."""
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = None
        self.metric = metric
        self.best_score = None
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        self.random_state = random_state

    def fit(self, X, Y, verbose=False):
        """Fit the model to the training data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Feature matrix (pandas DataFrame, NumPy array or nested list).
        Y : array-like of shape (n_samples,)
            Target values (pandas Series, NumPy array or list).
        verbose : int or bool
            When truthy, a log line is printed every ``verbose`` iterations.

        Raises
        ------
        ValueError
            If the training set contains no rows.
        """
        X = self._add_intercept(X)
        Y = np.asarray(Y, dtype=float)
        n_rows = X.shape[0]
        if n_rows == 0:
            raise ValueError("Cannot fit MyLineReg on an empty training set.")

        # A private generator yields the same batches as seeding the global
        # `random` module would, without clobbering the caller's RNG state.
        rng = random.Random(self.random_state)
        # The batch size does not depend on the iteration, so resolve it once.
        sample_size = self._resolve_sample_size(n_rows)
        self.weights = np.full(X.shape[1], 0.01)

        for i in range(self.n_iter):
            sample_row_idx = rng.sample(range(n_rows), sample_size)
            batch_X = X[sample_row_idx]
            batch_Y = Y[sample_row_idx]

            if callable(self.learning_rate):
                current_learning_rate = self.learning_rate(i + 1)
            else:
                current_learning_rate = self.learning_rate

            Y_pred = self._predict(batch_X)

            # Log before the update so that the loss (including the penalty
            # term) and the metric describe the same set of weights.
            if verbose and i % verbose == 0:
                error = self._calculate_error(batch_Y, Y_pred)
                print(
                    f"Iteration {i} | Loss: {self._calculate_loss(batch_Y, Y_pred)} | Metric: {error}")

            gradient = self._calculate_gradient(
                batch_X, batch_Y, Y_pred, sample_size)
            self.weights -= current_learning_rate * gradient

        # Score the final weights on the whole training set; a pre-update
        # mini-batch metric would not describe the model that is returned.
        self.best_score = self._calculate_error(Y, self._predict(X))

    @staticmethod
    def _add_intercept(X):
        """Return X as a float array with a leading column of ones."""
        X = np.asarray(X, dtype=float)
        return np.hstack((np.ones((X.shape[0], 1)), X))

    def _resolve_sample_size(self, n_rows):
        """Translate ``sgd_sample`` into a row count within [1, n_rows]."""
        if self.sgd_sample is None:
            return n_rows
        if isinstance(self.sgd_sample, (int, np.integer)):
            size = int(self.sgd_sample)
        else:
            size = int(n_rows * self.sgd_sample)
        # A tiny fraction would otherwise round down to an empty batch
        # (division by zero in the gradient); an oversized request would
        # make the sampler raise.
        return min(max(size, 1), n_rows)

    def _calculate_error(self, Y, Y_pred):
        """Return the configured metric for predictions ``Y_pred`` vs ``Y``.

        Unknown metric names (and ``None``) fall back to MSE.
        """
        Y = np.asarray(Y, dtype=float)
        Y_pred = np.asarray(Y_pred, dtype=float)
        residual = Y_pred - Y

        if self.metric == 'mae':
            return np.mean(np.abs(residual))
        if self.metric == 'rmse':
            return (residual ** 2).mean() ** 0.5
        if self.metric == 'mape':
            # NOTE: undefined (inf/nan) when a target value equals zero.
            return (np.abs(residual / Y) * 100).mean()
        if self.metric == 'r2':
            ss_res = (residual ** 2).sum()
            ss_tot = ((Y - Y.mean()) ** 2).sum()
            return 1 - ss_res / ss_tot
        return (residual ** 2).mean()

    def _calculate_loss(self, Y, Y_pred):
        """Return MSE plus the penalty term implied by ``reg``."""
        base_loss = ((Y_pred - Y) ** 2).mean()
        l1_penalty = self.l1_coef * np.sum(np.abs(self.weights))
        l2_penalty = self.l2_coef * np.sum(self.weights ** 2)

        if self.reg == 'l1':
            return base_loss + l1_penalty
        if self.reg == 'l2':
            return base_loss + l2_penalty
        if self.reg == 'elasticnet':
            return base_loss + l1_penalty + l2_penalty
        return base_loss

    def _calculate_gradient(self, X, Y, Y_pred, N):
        """Return the gradient of the (regularised) MSE loss w.r.t. weights.

        ``N`` is the number of rows in the batch.
        """
        base_gradient = (2 / N) * X.T.dot(Y_pred - Y)
        l1_gradient = self.l1_coef * np.sign(self.weights)
        l2_gradient = self.l2_coef * (2 * self.weights)

        if self.reg == 'l1':
            return base_gradient + l1_gradient
        if self.reg == 'l2':
            return base_gradient + l2_gradient
        if self.reg == 'elasticnet':
            return base_gradient + l1_gradient + l2_gradient
        return base_gradient

    def _predict(self, X):
        """Return predictions for a matrix that already has the ones column."""
        return X.dot(self.weights)

    def predict(self, X):
        """Return predictions for raw features (DataFrame, array or list)."""
        return self._predict(self._add_intercept(X))

    def get_coef(self):
        """Return the feature coefficients, excluding the intercept."""
        return self.weights[1:]

    def get_best_score(self):
        """Return the metric of the fitted model on the full training set."""
        return self.best_score

## Сравнение самописного класса со встроенным в библиотеку scikit-learn
Для сравнения используем датасет fetch_california_housing

In [3]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error as mse
from sklearn.linear_model import LinearRegression

# Load the California Housing dataset; as_frame=True requests pandas objects.
california_housing_data = fetch_california_housing(as_frame=True)
# `.frame` is a single DataFrame holding the features together with the
# target column (MedHouseVal).
california_housing_df = california_housing_data.frame
# Print column names, dtypes and non-null counts as a quick sanity check.
california_housing_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20640 entries, 0 to 20639
Data columns (total 9 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   MedInc       20640 non-null  float64
 1   HouseAge     20640 non-null  float64
 2   AveRooms     20640 non-null  float64
 3   AveBedrms    20640 non-null  float64
 4   Population   20640 non-null  float64
 5   AveOccup     20640 non-null  float64
 6   Latitude     20640 non-null  float64
 7   Longitude    20640 non-null  float64
 8   MedHouseVal  20640 non-null  float64
dtypes: float64(9)
memory usage: 1.4 MB


In [4]:
from sklearn.preprocessing import StandardScaler
# Separate the regression target from the feature columns.
y = california_housing_df['MedHouseVal']
X = california_housing_df.drop(columns=['MedHouseVal'])

# Hold out 20% of the rows for evaluation; the seed keeps the split fixed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.2)

print(f"Размеры обучающей выборки: {X_train.shape}, {y_train.shape}")
print(f"Размеры тестовой выборки: {X_test.shape}, {y_test.shape}")

# Learn the scaling statistics from the training part only, then apply the
# same transformation to both parts so no test information leaks in.
scaler = StandardScaler()
scaler.fit(X_train)
X_train_scaled = pd.DataFrame(scaler.transform(X_train))
X_test_scaled = pd.DataFrame(scaler.transform(X_test))

Размеры обучающей выборки: (16512, 8), (16512,)
Размеры тестовой выборки: (4128, 8), (4128,)


In [5]:
# Baseline: scikit-learn's closed-form linear regression on the scaled data.
sklearn_regression = LinearRegression()
sklearn_regression.fit(X_train_scaled, y_train)
sklearn_prediction = sklearn_regression.predict(X_test_scaled)
# mean_squared_error expects (y_true, y_pred). MSE is symmetric, so the value
# is unchanged, but the correct order keeps the call safe if the metric is
# ever swapped for an asymmetric one.
print(
    f'Ошибка предсказания модели из библиотеки sklearn: {mse(y_test, sklearn_prediction)}')

Ошибка предсказания модели из библиотеки sklearn: 0.5558915986952444


In [6]:
# Exponentially decaying step size. The parameter is named `step` so that it
# does not shadow the builtin `iter`.
# NOTE(review): 0.85 ** 100 is about 1e-7, so updates after roughly the first
# hundred iterations are negligible; later changes in the logged loss come
# from mini-batch sampling, not from learning.
my_model = MyLineReg(n_iter=900, learning_rate=lambda step: 0.85 * (0.85 ** step))
my_model.fit(X_train_scaled, y_train, verbose=100)
my_model_prediction = my_model.predict(X_test_scaled)
# Score with the same public metric function as the scikit-learn baseline
# instead of reaching into the model's private helper.
print(
    f"Ошибка предсказания самописной модели: {mse(y_test, my_model_prediction)}")

Iteration 0 | Loss: 5.504583989238672 | Metric: 5.504583989238672
Iteration 100 | Loss: 0.5608681110697722 | Metric: 0.5608681110697722
Iteration 200 | Loss: 0.5502441789164211 | Metric: 0.5502441789164211
Iteration 300 | Loss: 0.5226595309423533 | Metric: 0.5226595309423533
Iteration 400 | Loss: 0.5778929925640858 | Metric: 0.5778929925640858
Iteration 500 | Loss: 0.5401883627853428 | Metric: 0.5401883627853428
Iteration 600 | Loss: 0.5537862868180132 | Metric: 0.5537862868180132
Iteration 700 | Loss: 0.5101366901601977 | Metric: 0.5101366901601977
Iteration 800 | Loss: 0.5084999224794219 | Metric: 0.5084999224794219
Ошибка предсказания самописной модели: 0.5698712523981718


# Вывод
Самописная модель не идеальна. Она даёт результат чуть хуже, чем стандартная модель из библиотеки. Также проблемой модели является то, что для её работы необходимо масштабировать данные (это несложно добавить в класс, но модель создана для понимания работы линейной регрессии «под капотом», поэтому не считаю это необходимым).

Разница в метриках моделей достаточно мала, чтобы утверждать, что самописная модель написана правильно.