In [None]:
from abc import ABC, abstractmethod
from typing import Literal

import numpy as np
import pandas as pd
from numpy.typing import NDArray
from sklearn.model_selection import KFold, train_test_split

In [3]:
# np.number is a base class for all integers, floating numbers in numpy
# In Python 3.12, type keyword wass introduced to replace typing.TypeAlias
# read more about this https://docs.astral.sh/ruff/rules/non-pep695-type-alias/
type Vector = NDArray[np.integer | np.floating]
type Matrix = NDArray[np.integer | np.floating]
type Scalar = int | float | np.integer | np.floating

# These are old alternatives
# Vector: TypeAlias = NDArray[np.integer | np.floating]
# Matric: TypeAlias = NDArray[np.integer | np.floating]
# Scalar: TypeAlias = int | float | np.integer | np.floating

In [4]:
class BasePenalty(ABC):
    @abstractmethod
    def __call__(self, theta: Vector) -> Vector:
        pass

    @abstractmethod
    def derivation(self, theta: Vector) -> Vector:
        pass

## Lasso Penalty
$$
\text{L1} = \lambda\sum_{j=1}^{n}|\theta_{j}|

## Ridge Penalty
$$
\text{L2} = \lambda\sum_{j=1}^{n}\theta_{j}^2

In [263]:
class LassoPenalty(BasePenalty):
    def __init__(self, l: Scalar) -> None:
        self.l = l  # lambda value

    # __call__ allows call class as method
    def __call__(self, theta: Vector) -> Scalar:
        return self.l * np.sum(np.abs(theta))

    def derivation(self, theta: Vector) -> Vector:
        return self.l * np.sign(theta)


class RidgePenalty(BasePenalty):
    def __init__(self, l: Scalar) -> None:
        self.l = l

    def __call__(self, theta: Vector) -> Scalar:
        mask = np.ones_like(theta)
        mask[0] = 0.0
        return self.l * np.sum(np.square(theta * mask))

    def derivation(self, theta: Vector) -> Vector:
        mask = np.ones_like(theta)
        mask[0] = 0.0
        return self.l * 2 * (theta * mask)


class ElasticPenalty(BasePenalty):
    def __init__(self, l: Scalar = 0.1, l_ratio: Scalar = 0.5) -> None:
        self.l = l
        self.l_ratio = l_ratio

    def __call__(self, theta: Vector) -> Scalar:
        l1_contribution = self.l_ratio * LassoPenalty(1)(theta)
        l2_contribution = (1 - self.l_ratio) * RidgePenalty(1)(theta)
        return self.l * (l1_contribution + l2_contribution)

    def derivation(self, theta: Vector) -> Vector:
        l1_derivation = self.l_ratio * LassoPenalty(1).derivation(theta)
        l2_derivation = (1 - self.l_ratio) * RidgePenalty(1).derivation(theta)
        return self.l * (l1_derivation + l2_derivation)

In [6]:
l = 6
lasso = LassoPenalty(l)
arr = np.array([1, 2, 3, 4])
print(type(arr))
lasso(arr)

<class 'numpy.ndarray'>


np.int64(60)

In [264]:
type TrainMethod = Literal["batch", "mini", "sto"]
type WeightInitMethod = Literal["zero", "xavier"]


class LinearRegression:
    kfold = KFold(n_splits=3)

    def __init__(  # noqa: PLR0913
        self,
        regularization: BasePenalty,
        lr: Scalar = 0.001,
        train_method: TrainMethod = "batch",
        weight_init_method: WeightInitMethod = "zero",
        num_epochs: int = 500,
        batch_size: int = 50,
        cv: KFold = kfold,
    ) -> None:
        self.lr = lr
        self.num_epochs = num_epochs
        self.batch_size = batch_size
        self.train_method = train_method
        self.weight_init_method = weight_init_method
        self.cv = cv
        self.regularization = regularization
        self.training_history = {}

    def mse(self, ytrue: Vector, ypred: Vector) -> Scalar:
        return ((ypred - ytrue) ** 2).sum() / ytrue.shape[0]

    def r_squared(self, ytrue: Vector, ypred: Vector) -> Scalar:
        # Formula is 1 - SSres / SStot
        # SSres = Sum of squared residual
        # SStot = Sum of squared totsl (compared to the mean or ybar)
        return 1 - np.sum(np.square(ytrue - ypred)) / np.sum(np.square(ytrue - ytrue.mean()))

    def xavier_weight_init(self, m: int, n: int) -> Vector:
        # Since this method use number of samples to calculate a new uniform range,
        # it requires both m and n to initialize theta (n, )
        upper = 1 / np.sqrt(m)
        lower = -(upper)
        numbers = np.random.rand(n)
        # The scaled output is calculated by
        # 1. streching U(0, 1) into U(0, upper - lower)
        #    with upper - lower = length between lower and upper
        # 2. Add lower to shift all the numbers back to U(lower, upper)
        return lower + numbers * (upper - lower)

    def fit(self, X_train: Matrix, y_train: Vector) -> None:  # noqa: C901
        # create a list of kfold scores
        self.kfold_scores = []

        # reset val loss
        self.val_loss_old = np.inf

        # kfold.split in the sklearn.....
        # 5 splits
        for fold, (train_idx, val_idx) in enumerate(self.cv.split(X_train)):
            X_cross_train = X_train[train_idx]
            y_cross_train = y_train[train_idx]
            X_cross_train = np.hstack([np.ones((X_cross_train.shape[0], 1)), X_cross_train])
            # add bias column to validation set as well
            X_cross_val = X_train[val_idx]
            X_cross_val = np.hstack([np.ones((X_cross_val.shape[0], 1)), X_cross_val])
            y_cross_val = y_train[val_idx]

            if self.weight_init_method == "zero":
                self.theta = np.zeros(X_cross_train.shape[1])
            elif self.weight_init_method == "xavier":
                self.theta = self.xavier_weight_init(X_cross_train.shape[0], X_cross_train.shape[1])

            # define X_cross_train as only a subset of the data
            # how big is this subset?  => mini-batch size ==> 50

            # one epoch will exhaust the WHOLE training set
            # with mlflow.start_run(run_name=f"Fold-{fold}", nested=True):
            self.training_history[f"Fold-{fold}"] = {}
            params = {
                "method": self.train_method,
                "lr": self.lr,
                "reg": type(self).__name__,
            }
            self.training_history[f"Fold-{fold}"]["params"] = params
            self.training_history[f"Fold-{fold}"]["epoch"] = []
            # mlflow.log_params(params=params)

            for epoch in range(self.num_epochs):
                # with replacement or no replacement
                # with replacement means just randomize
                # with no replacement means 0:50, 51:100, 101:150, ......300:323
                # shuffle your index
                # https://docs.astral.sh/ruff/rules/numpy-legacy-random/
                perm = np.random.permutation(X_cross_train.shape[0])

                X_cross_train = X_cross_train[perm]
                y_cross_train = y_cross_train[perm]

                if self.train_method == "sto":
                    for batch_idx in range(X_cross_train.shape[0]):
                        # (11,) ==> (1, 11) ==> (m, n)
                        X_method_train = X_cross_train[batch_idx].reshape(1, -1)
                        y_method_train = y_cross_train[batch_idx]
                        train_loss = self._train(X_method_train, y_method_train)
                elif self.train_method == "mini":
                    for batch_idx in range(0, X_cross_train.shape[0], self.batch_size):
                        # batch_idx = 0, 50, 100, 150
                        X_method_train = X_cross_train[batch_idx : batch_idx + self.batch_size, :]
                        y_method_train = y_cross_train[batch_idx : batch_idx + self.batch_size]
                        train_loss = self._train(X_method_train, y_method_train)
                elif self.train_method == "batch":
                    X_method_train = X_cross_train
                    y_method_train = y_cross_train
                    train_loss = self._train(X_method_train, y_method_train)
                else:
                    print("Error: Please select correct training method.")

                # mlflow.log_metric(key="train_loss", value=train_loss, step=epoch)

                yhat_val = self.predict(X_cross_val)
                val_loss_new = self.mse(y_cross_val, yhat_val)
                # mlflow.log_metric(key="val_loss", value=val_loss_new, step=epoch)
                self.training_history[f"Fold-{fold}"][epoch] = {
                    "train_loss": train_loss,
                    "val_loss": val_loss_new,
                }

                # early stopping
                if np.allclose(val_loss_new, self.val_loss_old):
                    break
                self.val_loss_old = val_loss_new

            self.kfold_scores.append(val_loss_new)
            print(f"Fold {fold}: {val_loss_new}")

    def _train(self, X: Matrix, y: Vector) -> Scalar:
        yhat = self.predict(X)
        m = X.shape[0]
        grad_loss = (2 / m) * X.T @ (yhat - y)
        grad_penalty = self.regularization.derivation(self.theta) / m
        print("Gradient Data\t\t", grad_loss[:4])
        print("Gradient Penalty\t", grad_penalty[:4])
        grad = grad_loss + grad_penalty
        step = self.lr * grad
        self.theta = self.theta - step
        # self.theta = self.theta - step + self.momentum * self.prev_step
        # self.prev_step = step
        return self.mse(y, yhat)

    def predict(self, X: Matrix) -> Vector:
        return X @ self.theta  # ===>(m, n) @ (n, )

    def _coef(self) -> Vector:
        return self.theta[1:]  # remind that theta is (w0, w1, w2, w3, w4.....wn)
        # w0 is the bias or the intercept
        # w1....wn are the weights / coefficients / theta

    def _bias(self) -> Scalar:
        return self.theta[0]


class Lasso(LinearRegression):
    def __init__(
        self,
        train_method: TrainMethod,
        weight_init_method: WeightInitMethod,
        lr: Scalar,
        l: Scalar,
    ) -> None:
        self.regularization = LassoPenalty(l)
        super().__init__(self.regularization, lr, train_method, weight_init_method)


class Ridge(LinearRegression):
    def __init__(
        self,
        train_method: TrainMethod,
        weight_init_method: WeightInitMethod,
        lr: Scalar,
        l: Scalar,
    ) -> None:
        self.regularization = RidgePenalty(l)
        super().__init__(self.regularization, lr, train_method, weight_init_method)


class ElasticNet(LinearRegression):
    def __init__(
        self,
        train_method: TrainMethod,
        weight_init_method: WeightInitMethod,
        lr: Scalar,
        l: Scalar,
        l_ratio: Scalar = 0.5,
    ) -> None:
        self.regularization = ElasticPenalty(l, l_ratio)
        super().__init__(self.regularization, lr, train_method, weight_init_method)

In [253]:
df = pd.read_csv("Cars.csv")
df_train, df_test = train_test_split(df, random_state=42, test_size=0.2)

In [254]:
df_train.head()

Unnamed: 0,name,year,selling_price,km_driven,fuel,seller_type,transmission,owner,mileage,engine,max_power,torque,seats
6518,Tata Tiago NRG Petrol AMT,2019,520000,2560,Petrol,Individual,Automatic,First Owner,24.0 kmpl,1199 CC,83.81 bhp,114Nm@ 3500rpm,5.0
6144,Honda Brio S MT,2013,300000,80000,Petrol,Individual,Manual,Second Owner,19.4 kmpl,1198 CC,86.8 bhp,109Nm@ 4500rpm,5.0
6381,Hyundai i20 1.4 CRDi Asta,2011,380000,150000,Diesel,Individual,Manual,Fourth & Above Owner,23.0 kmpl,1396 CC,90 bhp,22.4 kgm at 1750-2750rpm,5.0
438,Maruti Swift Dzire VDI,2013,530000,120000,Diesel,Individual,Manual,Second Owner,23.4 kmpl,1248 CC,74 bhp,190Nm@ 2000rpm,5.0
5939,Maruti Alto K10 VXI,2017,335000,25000,Petrol,Individual,Manual,First Owner,23.95 kmpl,998 CC,67.05 bhp,90Nm@ 3500rpm,5.0


In [122]:
def data_cleaning(input_df: pd.DataFrame) -> pd.DataFrame:
    df = input_df.copy()

    # Drop unused columns first to reduce size
    df = df.drop(columns=["torque"])

    # Drop rows that will definitely not be used
    df = df[~df["fuel"].isin(["CNG", "LPG"])]
    df = df[df["owner"] != "Test Drive Car"]

    # Remove Units
    df["mileage"] = df["mileage"].str.split(" ").str[0]
    df["engine"] = df["engine"].str.split(" ").str[0]
    df["max_power"] = df["max_power"].str.split(" ").str[0]

    # Numeric Type conversion
    df["mileage"] = df["mileage"].astype(float)
    df["engine"] = df["engine"].astype(float)
    df["max_power"] = df["max_power"].astype(float)

    # Fix Brand column
    df = df.rename(columns={"name": "brand"})
    df["brand"] = df["brand"].str.split(" ").str[0]

    return df

In [255]:
df_train = data_cleaning(df_train)
df_test = data_cleaning(df_test)

In [256]:
df_train.head()


Unnamed: 0,brand,year,selling_price,km_driven,fuel,seller_type,transmission,owner,mileage,engine,max_power,seats
6518,Tata,2019,520000,2560,Petrol,Individual,Automatic,First Owner,24.0,1199.0,83.81,5.0
6144,Honda,2013,300000,80000,Petrol,Individual,Manual,Second Owner,19.4,1198.0,86.8,5.0
6381,Hyundai,2011,380000,150000,Diesel,Individual,Manual,Fourth & Above Owner,23.0,1396.0,90.0,5.0
438,Maruti,2013,530000,120000,Diesel,Individual,Manual,Second Owner,23.4,1248.0,74.0,5.0
5939,Maruti,2017,335000,25000,Petrol,Individual,Manual,First Owner,23.95,998.0,67.05,5.0


In [257]:
df_test.head()

Unnamed: 0,brand,year,selling_price,km_driven,fuel,seller_type,transmission,owner,mileage,engine,max_power,seats
1971,Honda,2004,198000,110000,Petrol,Individual,Manual,Third Owner,12.8,1493.0,100.0,5.0
4664,Tata,2014,500000,291977,Diesel,Individual,Manual,First Owner,14.0,2179.0,138.1,7.0
5448,Maruti,2016,425000,70000,Diesel,Individual,Manual,First Owner,23.2,1248.0,73.94,5.0
3333,Honda,2006,150000,120000,Petrol,Individual,Manual,Second Owner,16.9,1497.0,100.0,5.0
2316,Maruti,2013,525000,69000,Diesel,Individual,Manual,Second Owner,22.9,1248.0,74.0,5.0


In [258]:
X_train = df_train.drop(columns=["selling_price"])
X_test = df_test.drop(columns=["selling_price"])
y_train = df_train["selling_price"]
y_test = df_test["selling_price"]

In [259]:
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler

# Define transformers for each feature group
brand_le = (
    "brand_le",
    Pipeline(
        [
            # Pre-imputer for handling missing values before encoding
            ("pre_imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=np.nan)),
            ("post_imputer", SimpleImputer(strategy="most_frequent")),
            # Post-imputer for handling any NaN introduced by unknown categories during encoding
            # So the pipeline is robust to unseen categories in production data
            # This won't happen if frontend limits the choices to existing categories only
            # But it's better to be safe than sorry
        ],
    ),
    ["brand"],
)

fuel_le = (
    "fuel_le",
    Pipeline(
        [
            ("pre_imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=np.nan)),
            ("post_imputer", SimpleImputer(strategy="most_frequent")),
        ],
    ),
    ["fuel"],
)

transmission_le = (
    "transmission_le",
    Pipeline(
        [
            ("pre_imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=np.nan)),
            ("post_imputer", SimpleImputer(strategy="most_frequent")),
        ],
    ),
    ["transmission"],
)

owner_le = (
    "owner_le",
    Pipeline(
        [
            ("pre_imputer", SimpleImputer(strategy="most_frequent")),
            (
                "encoder",
                OrdinalEncoder(
                    categories=[
                        [
                            "First Owner",
                            "Second Owner",
                            "Third Owner",
                            "Fourth & Above Owner",
                        ],
                    ],
                    handle_unknown="use_encoded_value",
                    unknown_value=np.nan,
                ),
            ),
            ("post_imputer", SimpleImputer(strategy="most_frequent")),
        ],
    ),
    ["owner"],
)

seller_type_ohe = (
    "seller_type_ohe",
    Pipeline(
        [
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OneHotEncoder(sparse_output=False, drop="first", handle_unknown="ignore")),
        ],
    ),
    ["seller_type"],
)

num_scaler = (
    "num_scaler",
    Pipeline(
        [
            (
                "imputer",
                ColumnTransformer(
                    transformers=[
                        ("year_imputer", SimpleImputer(strategy="median"), ["year"]),
                        ("km_imputer", SimpleImputer(strategy="median"), ["km_driven"]),
                        ("mileage_imputer", SimpleImputer(strategy="mean"), ["mileage"]),
                        ("engine_imputer", SimpleImputer(strategy="median"), ["engine"]),
                        ("max_power_imputer", SimpleImputer(strategy="median"), ["max_power"]),
                        ("seats_imputer", SimpleImputer(strategy="most_frequent"), ["seats"]),
                    ],
                    remainder="passthrough",
                ),
            ),
            ("scaler", StandardScaler()),
        ],
    ),
    ["year", "km_driven", "mileage", "engine", "max_power", "seats"],
)

# Combine all transformers into the ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        brand_le,
        fuel_le,
        transmission_le,
        owner_le,
        seller_type_ohe,
        num_scaler,
    ],
    remainder="passthrough",
    verbose_feature_names_out=True,
)

In [260]:
# Fit and transform the training data, transform the test data
X_train = preprocessor.fit_transform(X_train)
X_test = preprocessor.transform(X_test)


In [261]:
y_train = y_train.to_numpy()
y_test = y_test.to_numpy()

In [323]:
en = Ridge("batch", "xavier", 0.001, 0.1)
en.fit(X_train, y_train)

Gradient Data		 [ -1275360.00849662 -22234551.5944653    -398986.34848069
   -791175.92110441]
Gradient Penalty	 [0.00000000e+00 2.27020035e-07 6.73904696e-07 2.17432225e-07]
Gradient Data		 [ -465727.06905044 -5181723.71061854   -56520.06981662   -84360.74963571]
Gradient Penalty	 [0.         1.03899797 0.0186449  0.03697105]
Gradient Data		 [ -277107.16164492 -1209144.68627933    22787.60973604    79717.13668954]
Gradient Penalty	 [0.         1.28113454 0.02128602 0.04091314]
Gradient Data		 [-233153.9712788  -283657.33457241   40794.19071018  117364.98495863]
Gradient Penalty	 [0.         1.33763657 0.02022118 0.03718804]
Gradient Data		 [-222900.90032969  -68001.31828684   44523.09594001  125563.06617723]
Gradient Penalty	 [0.         1.35089152 0.01831491 0.0317037 ]
Gradient Data		 [-220497.88079222  -17703.36831082   44928.58866892  126903.27479092]
Gradient Penalty	 [0.         1.35406909 0.01623439 0.02583626]
Gradient Data		 [-219923.048759     -5926.46175036   44562.49073501

In [324]:
en.theta

array([ 104571.10391115,   24604.22351263,   13167.48225756,
         -9262.98511291,  -15577.78303082,   12646.81837742,
          1288.89812506,  157265.16610932, -110380.44282706,
         -4750.6913564 ,  132037.1471968 ,  322219.52926869,
        -56944.19701341])

In [326]:
y_pred = en.predict(np.hstack([np.ones((X_test.shape[0], 1)), X_test]))
y_pred

array([  -3001.54451607,  910172.00174891,  487893.22874702, ...,
        292457.25569611, 1084523.63499252,  -57823.57775088],
      shape=(1607,))

In [327]:
y_test[:20]

array([ 198000,  500000,  425000,  150000,  525000,  160000,  450000,
        570000,  170000,  135000,  580000,  525000,  100000, 1825000,
        900000,  400000,  135000,  420000,  850000,  385000])

In [320]:
y_pred_exp = np.exp(y_pred)
y_pred_exp[:20]

array([4.03464117e+03, 2.69595827e+06, 2.76554324e+05, 3.55327411e+03,
       3.03257683e+05, 3.42173072e+05, 2.60447687e+04, 7.56257498e+03,
       3.72208520e+05, 1.06859924e+05, 1.55231076e+03, 1.84956522e+05,
       1.59587589e+05, 3.61274804e+06, 3.10815538e+05, 1.79949750e+07,
       3.17376211e+05, 6.51721978e+05, 2.30794302e+04, 8.03537460e+02])

In [329]:
en.mse(y_test, y_pred)

np.float64(304081143292.8809)

In [330]:
en.r_squared(y_test, y_pred)

np.float64(0.5248816807162029)

In [None]:
en.training_history["Fold-0"]

In [None]:
import numpy as np
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

diabetes = load_diabetes()
print("Features: ", diabetes.feature_names)
X = diabetes.data
y = diabetes.target
m = X.shape[0]  # number of samples
n = X.shape[1]  # number of features

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# actually you can do like this too
# X = np.insert(X, 0, 1, axis=1)
intercept = np.ones((X_train.shape[0], 1))
X_train = np.concatenate((intercept, X_train), axis=1)
intercept = np.ones((X_test.shape[0], 1))
X_test = np.concatenate((intercept, X_test), axis=1)

Features:  ['age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6']
