In [None]:
import pandas as pd
import numpy as np
import statistics
import math
import random


class LinearRegression:
    """
    Linear regression class.

    Typical use:
        model = MyLineReg(n_iter=50, learning_rate=0.15, metric="mae", reg="l1", l1_coef=0.25)
        model.fit(x=x, y=y, verbose=10)
        predictions = model.predict(samples=samples)

    Attributes:
        - reg: Type of regularization ("l1" for Lasso, "l2" for Ridge, "elasticnet" for ElsticNet).
        - metric: Metric for testing model ("mae" for Mean Absolute Error, "mse" for Mean Squared Error, "rmse" for Root Mean Squared Error, "r2" for Coefficient of Determination, "mape" for Mean Absolute Percentage Error).
        - _metric_value: The best metric score (If metric is not speicified, Loss function score is calculated).
        - n_iter: Count of gradient iterations.
        - _weights: Numpy array of model weights.
        - l1_coef: Coefficient of Lasslo regularization.
        - l2_coef: Coefficient of Ridge regularization.
        - learning_rate: Multiplier of gradient step (can be lambda function).

    Methods:
        - get_coef: Returns weights of a fitting model.
        - fit: Fitting a model to find the best weights (logging evry n step when verbose = n is specified).
        - get_best_score: Returns the best model metric score.
        - predict: Returns a numeric array of predicted target values.
    """

    def __init__(self, 
                 learning_rate: float = 0.1, 
                 n_iter: int = 100, 
                 metric: str = None, 
                 reg: str = None, 
                 l1_coef: float = 0, 
                 l2_coef: float = 0,
                 sgd_sample: float = None,
                 random_state: int = 42) -> None:
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self._weights = None
        self.metric = metric
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        self.random_state = random_state
        
    def __str__(self) -> str:
        return f"{__class__.__name__} class: n_iter={self.n_iter}, learning_rate={self.learning_rate}" 
    
    def fit(self, X: pd.DataFrame, y: pd.Series, verbose = False) -> None:
        #Reset index for X dataset
        X.reset_index(drop=True)
        
        # Fill the first column of feature matrix with "1" values (for intercept)
        X.insert(loc = 0, column = 'x0', value = 1)
        
        num_observations, num_features = X.shape
        
        # Set initial weights equal to 1
        weights = [1] * num_features
        
        # Convert sgd_sample share to integer
        if self.sgd_sample and self.sgd_sample <= 1:
            self.sgd_sample = round(self.sgd_sample*num_observations)
            
        
        # Fix random seed
        random.seed(self.random_state)
        
        # Make iterations
        for i in range(1, self.n_iter + 1):
            
            # Select the subset for calculations 
            if self.sgd_sample:
                sample_rows_idx = random.sample(range(num_observations), self.sgd_sample)
                subset_num_observations = self.sgd_sample
                subset_X = X.iloc[sample_rows_idx]
                subset_y = y[sample_rows_idx]

            else:
                subset_num_observations = num_observations
                subset_X = X
                subset_y = y
            
            # Make predictions and calculate errors
            y_predicted = X.dot(weights)
            errors = y_predicted - y
            
            # Calculate Loss function
            if self.reg:
                match self.reg:
                    case "l1":
                        Loss_function = sum(errors ** 2) / num_observations + self.l1_coef * sum(np.abs(weights))
                    case "l2":
                        Loss_function =  sum(errors ** 2) / num_observations + 2 * self.l2_coef * sum(np.array(weights)**2)
                    case "elasticnet":
                        Loss_function = sum(errors ** 2) / num_observations + self.l1_coef * sum(np.abs(weights)) + + 2*self.l2_coef*sum(np.array(weights)**2)
            
            # When regularization is not defined MSE is a Loss function
            else:
                Loss_function = sum(errors ** 2) / num_observations
            
            # Calculate metric
            if self.metric:
                match self.metric:
                    case "mae":
                        metric_temp_value = sum(abs(errors)) / num_observations
                    case "mse":
                        metric_temp_value = sum(errors ** 2) / num_observations
                    case "rmse":
                        metric_temp_value = sqrt(sum(errors ** 2) / num_observations)
                    case "mape":
                        metric_temp_value = (100 / num_observations) * sum(abs(errors / y))
                    case "r2":
                        mean_y = statistics.mean(y)
                        metric_temp_value = 1 - sum(errors ** 2) / sum((y - mean_y) ** 2)
                        
                self._metric_value = metric_temp_value
            else:
                self._metric_value = Loss_function
                
            # Log after verbose iterations 
            if verbose and i % verbose == 0:
                
                if self.metric:
                    print(f'{i} | loss: {Loss_function} | {self.metric}: {metric_temp_value}')
                         
                else:
                    print(f'{i} | loss: {Loss_function}')
            
            # Make subset predictions and calculate subset errors
            subset_y_predicted = subset_X.dot(weights)
            subset_errors = subset_y_predicted - subset_y
            
            # Calculate gradient based on subset
            if self.reg:
                match self.reg:
                    case "l1":
                        gradient = 2 /  subset_num_observations *  subset_X.T.dot(subset_errors) + self.l1_coef * np.sign(weights)
                    case "l2":
                        gradient = 2 /  subset_num_observations *  subset_X.T.dot(subset_errors) + 2 * self.l2_coef * np.array(weights)
                    case "elasticnet":
                        gradient = 2 /  subset_num_observations *  subset_X.T.dot(subset_errors) + self.l1_coef * np.sign(weights) + 2*self.l2_coef*np.array(weights)
            
            else:
                gradient = 2 /  subset_num_observations *  subset_X.T.dot( subset_errors)
            
            # Calculate new weights
            if callable(self.learning_rate):
                weights -= self.learning_rate(i) * gradient
            else:
                weights -= self.learning_rate * gradient
        
        # Final (best) weights
        self.weights = weights  


    
    def get_coef(self) -> np.array:
        return np.array(self.weights[1:])
    
    def predict(self, X) -> np.array:
        
        # Fill the first column of feature matrix with "1" values (for intercept)
        X.insert(loc = 0, column = 'x0', value = 1)
        y_predicted = X.dot(self.weights)
        return np.array(y_predicted)
    
    
    def get_best_score(self) -> float:
        return self._metric_value

In [86]:
# Create ficticious dataset

from sklearn.datasets import make_regression

X, y = make_regression(n_samples=100, n_features=5, n_informative=3, noise=5, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col_number}' for col_number in X.columns]

In [87]:
# Create a class instance
sample_one = MyLineReg(n_iter = 50, sgd_sample = 0.3, learning_rate = 0.1, metric = "mae", reg = "l1", l1_coef = 0.3)

In [88]:
sample_one.fit(X, y, 10)

10 | loss: 407.7536252194835 | mae: 15.586722346130946
20 | loss: 83.2392782476568 | mae: 4.849404419651548
30 | loss: 71.10224922935954 | mae: 3.8808217054231546
40 | loss: 70.41575391812177 | mae: 3.7501823824164204
50 | loss: 70.49019070453042 | mae: 3.7645359834704584


In [89]:
sample_one.get_coef()

array([57.07398193, 35.0790659 ,  0.14585922, 63.40005449, -0.2202788 ])

In [90]:
sample_one.get_best_score()

3.7645359834704584