In [1]:
import numpy as np
import pandas as pd
from numpy.linalg import inv
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

In [3]:
class MyLinearRegression:
    def __init__(self, regularization=None, weight_calc='matrix', lambda_1=None, lambda_2=None, batch_size=20):

        if regularization not in [None, 'l1', 'l2', 'l1l2']:
            raise TypeError(f"Параметр regularization не может принимать значение '{regularization}'")
        if weight_calc not in ['matrix', 'gd', 'sgd']:
            raise TypeError(f"Параметр weight_calc не может принимать значение '{weight_calc}'")
        if regularization in ['l1', 'l1l2'] and lambda_1 is None:
            raise TypeError(f"Значение коэффициента регуляризации l1 не задано")
        if regularization in ['l2', 'l1l2'] and lambda_2 is None:
            raise TypeError(f"Значение коэффициента регуляризации l2 не задано")
        
        self.regularization = regularization
        self.weight_calc = weight_calc
        self.lambda_1 = lambda_1
        self.lambda_2 = lambda_2
        self.batch_size = batch_size
        self.coefs_ = None
        self.feature_names_in_ = None

    def fit(self, X: pd.DataFrame, y: pd.DataFrame):
        if isinstance(X, pd.DataFrame):
            feature_names = X.columns.tolist()
            X = np.array(X)
        else:
            feature_names = ['x' + str(i) for i in range(X.shape[1])]
        y = np.array(y)
        n_samples, n_features = X.shape
        X = np.hstack((np.ones((n_samples, 1)), X))
        self.feature_names_in_ = ['bias'] + feature_names

        if self.weight_calc == 'matrix':
            if self.regularization in ['l1', 'l1l2']:
                raise ValueError("Матричный метод не совместим с l1 и l1l2 регуляризацией")

            A = X.T.dot(X)
            b = X.T.dot(y)

            if self.regularization == 'l2':
                A += self.lambda_2 * np.eye(n_features + 1)

            self.coefs_ = inv(A).dot(b)

        elif self.weight_calc in ['gd', 'sgd']:
            self.coefs_ = np.array(np.random.rand(n_features + 1)).reshape(-1, 1)
            learning_rate = 0.05

            for _ in range(100000):
                if self.weight_calc == 'gd':
                    X_batch = X
                    y_batch = y
                elif self.weight_calc == 'sgd':
                    indices = np.random.choice(n_samples, self.batch_size, replace=False)
                    X_batch = X[indices]
                    y_batch = y[indices]

                predictions = np.array(X_batch.dot(self.coefs_)).reshape(-1, 1)
                error = predictions - y_batch
                gradient = X_batch.T.dot(error) / X_batch.shape[0]

                if self.regularization == 'l1':
                    gradient += self.lambda_1 * np.sign(self.coefs_)
                elif self.regularization == 'l2':
                    gradient += 2 * self.lambda_2 * self.coefs_
                elif self.regularization == 'l1l2':
                    gradient += self.lambda_1 * np.sign(self.coefs_) + 2 * self.lambda_2 * self.coefs_

                self.coefs_ -= learning_rate * gradient

    def predict(self, X):
        X = np.array(X)
        n_samples = X.shape[0]
        X = np.hstack((np.ones((n_samples, 1)), X))
        return X.dot(self.coefs_)

    def score(self, X, y):
        predictions = self.predict(X)
        y = np.array(y)
        u = ((y - predictions) ** 2).sum()
        v = ((y - y.mean()) ** 2).sum()
        return 1 - u/v

In [4]:
df = pd.read_csv('Used_fiat_500_in_Italy_dataset.csv', sep=',')
df

Unnamed: 0,model,engine_power,transmission,age_in_days,km,previous_owners,lat,lon,price
0,pop,69,manual,4474,56779,2,45.071079,7.46403,4490
1,lounge,69,manual,2708,160000,1,45.069679,7.70492,4500
2,lounge,69,automatic,3470,170000,2,45.514599,9.28434,4500
3,sport,69,manual,3288,132000,2,41.903221,12.49565,4700
4,sport,69,manual,3712,124490,2,45.532661,9.03892,4790
...,...,...,...,...,...,...,...,...,...
375,lounge,69,manual,4474,55976,2,45.610050,9.24234,5500
376,lounge,69,manual,4200,134717,1,44.102020,9.82024,5500
377,lounge,69,manual,3470,113344,1,41.003799,16.87294,5500
378,pop,69,automatic,3712,130000,1,45.810501,8.96474,5500


In [13]:
y = df[['price']]
X =df.drop(columns=['price', 'model', 'transmission', 'lat', 'lon'])

In [14]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=10)

In [15]:
scaler = MinMaxScaler()
scaler.fit(X_train)
X_train = pd.DataFrame(scaler.transform(X_train), columns=X_train.columns)
X_test = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns)
X_train.head()

Unnamed: 0,engine_power,age_in_days,km,previous_owners
0,0.0,0.786731,0.683228,0.5
1,0.0,0.113422,0.088231,0.0
2,0.0,0.040289,0.0,0.5
3,0.0,0.079921,0.070342,0.0
4,0.0,0.813006,0.638714,0.5


In [16]:
reg = MyLinearRegression()
reg.fit(X_train, y_train)

print('train:', reg.score(X_train, y_train))
print('test:', reg.score(X_test, y_test))

train: 0.8462165600436948
test: 0.8392187425404127


In [17]:
reg = MyLinearRegression(weight_calc='sgd', batch_size=10)
reg.fit(X_train, y_train)

print('train:', reg.score(X_train, y_train))
print('test:', reg.score(X_test, y_test))

train: 0.8460192801480236
test: 0.8382381606204097


In [18]:
reg = MyLinearRegression(weight_calc='gd', regularization='l1', lambda_1=0.001)
reg.fit(X_train, y_train)

print('train:', reg.score(X_train, y_train))
print('test:', reg.score(X_test, y_test))

train: 0.8462165600254054
test: 0.839218603088049
