In [1]:
import pandas
import numpy
import sklearn
import warnings

from dataclasses import dataclass
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, make_scorer
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.exceptions import DataConversionWarning
from typing import Protocol, Callable
from enum import Enum

In [2]:
def double_dim_converter(x: numpy.ndarray) -> numpy.ndarray:
    """Return ``x`` as a column vector when it is one-dimensional.

    Args:
        x: Array of any rank.

    Returns:
        ``x.reshape(-1, 1)`` when ``x.ndim == 1``; otherwise ``x`` itself,
        untouched (the very same object, not a copy).
    """
    if x.ndim == 1:
        return x.reshape(-1, 1)
    return x

In [3]:
@dataclass
class Metrics:
    """Container for the test-set error metrics of a single model."""

    # Mean squared error.
    mse: float
    # Mean absolute error.
    mae: float
    # Coefficient of determination (R^2).
    r2_score: float

class Model(Protocol):
    """Structural type of the estimators stored in a pipeline's ``_model``.

    The pipelines in this notebook call ``fit`` and ``predict`` on their
    ``_model`` attribute, so ``predict`` is declared here alongside ``fit``.

    NOTE(review): ``transform`` and ``fit_transform`` are kept only for
    backward compatibility; nothing visible in this notebook calls them on a
    ``Model``. Consider dropping them once no other code depends on them.
    """

    def fit(self, x: numpy.ndarray, y: numpy.ndarray) -> None:
        """Fit the model on features ``x`` and targets ``y``."""
        raise NotImplementedError

    def predict(self, x: numpy.ndarray) -> numpy.ndarray:
        """Return the model's predictions for features ``x``."""
        raise NotImplementedError

    def transform(self, x: numpy.ndarray) -> numpy.ndarray:
        """Transform ``x`` and return the result."""
        raise NotImplementedError

    def fit_transform(self, x: numpy.ndarray, y: numpy.ndarray) -> numpy.ndarray:
        """Fit on ``x`` / ``y`` and return the transformed ``x``."""
        raise NotImplementedError
        

class ModelTypes(Enum):
    """Identifiers for the regression models compared in this notebook.

    Each member's value is the human-readable label used in printed output.
    """

    LINEAR = "Linear"
    RANDOM_FORREST = "Random Forrest"
    RIDGE = "Ridge"
    LASSO = "Lasso"

class DataTransformer(Protocol):
    """Interface of a fit-once, invertible dataset normalizer."""

    def fit(self, data: numpy.ndarray) -> None:
        """Learn the normalization parameters from ``data``."""
        raise NotImplementedError

    def fit_transform(self, data: numpy.ndarray) -> numpy.ndarray:
        """Learn the parameters from ``data`` and return it normalized."""
        raise NotImplementedError

    def transform(self, data: numpy.ndarray) -> numpy.ndarray:
        """Return ``data`` normalized with the already-learned parameters."""
        raise NotImplementedError

    def inv_transform(self, data: numpy.ndarray) -> numpy.ndarray:
        """Map normalized ``data`` back to the original scale."""
        raise NotImplementedError

class STDScaler:
    """Thin adapter exposing sklearn's ``StandardScaler`` through the
    ``fit`` / ``fit_transform`` / ``transform`` / ``inv_transform`` interface."""

    def __init__(self) -> None:
        # The wrapped sklearn scaler; every method below delegates to it.
        self._transformer: StandardScaler = StandardScaler()

    def fit(self, data: numpy.ndarray) -> None:
        """Fit the wrapped scaler on ``data``; returns nothing."""
        self._transformer.fit(data)

    def fit_transform(self, data: numpy.ndarray) -> numpy.ndarray:
        """Fit the wrapped scaler on ``data`` and return the scaled data."""
        scaled = self._transformer.fit_transform(data)
        return scaled

    def transform(self, data: numpy.ndarray) -> numpy.ndarray:
        """Scale ``data`` with the wrapped scaler."""
        scaled = self._transformer.transform(data)
        return scaled

    def inv_transform(self, data: numpy.ndarray) -> numpy.ndarray:
        """Undo the scaling via the wrapped scaler's ``inverse_transform``."""
        restored = self._transformer.inverse_transform(data)
        return restored

class ModelPipeline(Protocol):
    """Interface of an end-to-end model pipeline (scale, fit, predict)."""

    # Scaler applied to the input features.
    feature_scaler: DataTransformer
    # Scaler applied to the targets (and inverted on predictions).
    target_scaler: DataTransformer

    def fit(self, feature: numpy.ndarray, target: numpy.ndarray, hparams: dict[str, int|float]=dict()) -> None:
        """Fit the pipeline on ``feature`` / ``target``.

        ``hparams`` carries optional hyper-parameters for the implementation.
        """
        raise NotImplementedError

    def forward(self, feature: numpy.ndarray) -> numpy.ndarray:
        """Run the fitted pipeline on ``feature`` and return its output."""
        raise NotImplementedError

class LinearModel(ModelPipeline):
    """Linear-regression pipeline working on scaled features and targets.

    The ``feature_scaler`` / ``target_scaler`` attributes are only *applied*
    here (``transform`` / ``inv_transform``); this class never fits them.
    """

    def __init__(self) -> None:
        # Annotation-only declarations: the attributes are created by fit().
        self._model: Model
        self._pipeline: Callable[[numpy.ndarray], numpy.ndarray]

    def fit(self, feature: numpy.ndarray, target: numpy.ndarray, hparams: dict[str, int|float]=dict()) -> None:
        """Fit a ``LinearRegression`` on the scaled training data.

        Args:
            feature: Training features (unscaled).
            target: Training targets (unscaled).
            hparams: Accepted for interface compatibility; not used here.
        """
        scaled_feature = self.feature_scaler.transform(feature)
        scaled_target = self.target_scaler.transform(target)

        estimator = LinearRegression()
        estimator.fit(scaled_feature, scaled_target)

        # Publish only after a successful fit so a failing re-fit cannot leave
        # an unfitted estimator behind an already working pipeline.
        self._model = estimator
        self._pipeline = self._predict

    def _predict(self, feature: numpy.ndarray) -> numpy.ndarray:
        """Scale ``feature``, predict, and map the result back to target units."""
        scaled_prediction = self._model.predict(self.feature_scaler.transform(feature))
        return self.target_scaler.inv_transform(double_dim_converter(scaled_prediction))

    def forward(self, feature: numpy.ndarray) -> numpy.ndarray:
        """Return predictions for ``feature`` in the original target scale."""
        return self._pipeline(feature)

class RandomForrestModel(ModelPipeline):
    """Random-forest pipeline tuned with a grid search on scaled data.

    The ``feature_scaler`` / ``target_scaler`` attributes are only *applied*
    here (``transform`` / ``inv_transform``); this class never fits them.
    """

    def __init__(self) -> None:
        # Annotation-only declarations: the attributes are created by fit().
        self._model: Model
        self._pipeline: Callable[[numpy.ndarray], numpy.ndarray]

    def fit(self, feature: numpy.ndarray, target: numpy.ndarray, hparams: dict[str, int|float]=dict()) -> None:
        """Grid-search a ``RandomForestRegressor`` and keep the best estimator.

        Args:
            feature: Training features (unscaled).
            target: Training targets (unscaled).
            hparams: Parameter grid forwarded unchanged to ``GridSearchCV``.
                The shared default dict is never mutated by this method.
        """
        scaled_feature = self.feature_scaler.transform(feature)
        scaled_target = self.target_scaler.transform(target)

        # verbose=3 makes the search log every fold as it finishes.
        grid_search = GridSearchCV(RandomForestRegressor(), hparams, verbose=3)
        grid_search.fit(scaled_feature, scaled_target)

        # Publish only after a successful search: `_model` never holds the
        # estimator class, and a failing re-fit cannot break a working pipeline.
        self._model = grid_search.best_estimator_
        self._pipeline = self._predict

    def _predict(self, feature: numpy.ndarray) -> numpy.ndarray:
        """Scale ``feature``, predict, and map the result back to target units."""
        scaled_prediction = self._model.predict(self.feature_scaler.transform(feature))
        return self.target_scaler.inv_transform(double_dim_converter(scaled_prediction))

    def forward(self, feature: numpy.ndarray) -> numpy.ndarray:
        """Return predictions for ``feature`` in the original target scale."""
        return self._pipeline(feature)

class RidgeModel(ModelPipeline):
    """Ridge-regression pipeline tuned with a grid search on scaled data.

    The ``feature_scaler`` / ``target_scaler`` attributes are only *applied*
    here (``transform`` / ``inv_transform``); this class never fits them.
    """

    def __init__(self) -> None:
        # Annotation-only declarations: the attributes are created by fit().
        self._model: Model
        self._pipeline: Callable[[numpy.ndarray], numpy.ndarray]

    def fit(self, feature: numpy.ndarray, target: numpy.ndarray, hparams: dict[str, int|float]=dict()) -> None:
        """Grid-search a ``Ridge`` regressor and keep the best estimator.

        Args:
            feature: Training features (unscaled).
            target: Training targets (unscaled).
            hparams: Parameter grid forwarded unchanged to ``GridSearchCV``.
                The shared default dict is never mutated by this method.
        """
        scaled_feature = self.feature_scaler.transform(feature)
        scaled_target = self.target_scaler.transform(target)

        grid_search = GridSearchCV(Ridge(), hparams)
        grid_search.fit(scaled_feature, scaled_target)

        # Publish only after a successful search: `_model` never holds the
        # estimator class, and a failing re-fit cannot break a working pipeline.
        self._model = grid_search.best_estimator_
        self._pipeline = self._predict

    def _predict(self, feature: numpy.ndarray) -> numpy.ndarray:
        """Scale ``feature``, predict, and map the result back to target units."""
        scaled_prediction = self._model.predict(self.feature_scaler.transform(feature))
        return self.target_scaler.inv_transform(double_dim_converter(scaled_prediction))

    def forward(self, feature: numpy.ndarray) -> numpy.ndarray:
        """Return predictions for ``feature`` in the original target scale."""
        return self._pipeline(feature)

class LassoModel(ModelPipeline):
    """Lasso-regression pipeline tuned with a grid search on scaled data.

    The ``feature_scaler`` / ``target_scaler`` attributes are only *applied*
    here (``transform`` / ``inv_transform``); this class never fits them.
    """

    def __init__(self) -> None:
        # Annotation-only declarations: the attributes are created by fit().
        self._model: Model
        self._pipeline: Callable[[numpy.ndarray], numpy.ndarray]

    def fit(self, feature: numpy.ndarray, target: numpy.ndarray, hparams: dict[str, int|float]=dict()) -> None:
        """Grid-search a ``Lasso`` regressor and keep the best estimator.

        Args:
            feature: Training features (unscaled).
            target: Training targets (unscaled).
            hparams: Parameter grid forwarded unchanged to ``GridSearchCV``.
                The shared default dict is never mutated by this method.
        """
        scaled_feature = self.feature_scaler.transform(feature)
        scaled_target = self.target_scaler.transform(target)

        grid_search = GridSearchCV(Lasso(), hparams)
        grid_search.fit(scaled_feature, scaled_target)

        # Publish only after a successful search: `_model` never holds the
        # estimator class, and a failing re-fit cannot break a working pipeline.
        self._model = grid_search.best_estimator_
        self._pipeline = self._predict

    def _predict(self, feature: numpy.ndarray) -> numpy.ndarray:
        """Scale ``feature``, predict, and map the result back to target units."""
        # double_dim_converter turns a 1-D prediction into a column before the
        # target scaler inverts it.
        scaled_prediction = self._model.predict(self.feature_scaler.transform(feature))
        return self.target_scaler.inv_transform(double_dim_converter(scaled_prediction))

    def forward(self, feature: numpy.ndarray) -> numpy.ndarray:
        """Return predictions for ``feature`` in the original target scale."""
        return self._pipeline(feature)

In [4]:
# Silence sklearn's DataConversionWarning for the rest of the notebook.
# NOTE(review): presumably triggered by passing targets as (n, 1) column
# vectors to the estimators -- confirm before relying on this filter.
warnings.filterwarnings("ignore", category=DataConversionWarning)

# Dataset location and the name of the column to predict.
DS_PATH = "1.xlsx"
TARGET = "Vs"

# Hold-out fraction and the seed shared by every stochastic step.
TEST_SIZE = 0.2
SEED = 0

# Grid-search spaces. `random_state` is pinned through a single-value grid
# entry so the random forest is reproducible across runs; it does not add
# candidates (still one per max_depth in 1..19).
RANDOM_FORREST_HPARAMS = {"max_depth": list(range(1, 20)), "random_state": [SEED]}
# alpha from 0.1 up to (but excluding) 2.0 in steps of 0.1.
RIDGE_HPARAMS = {"alpha":list(numpy.arange(0.1, 2.0, 0.1))}
LASSO_HPARAMS = {"alpha":list(numpy.arange(0.1, 2.0, 0.1))}

# Load the raw dataset once; later cells read from `df`.
with open(DS_PATH, "rb") as afile:
    df = pandas.read_excel(afile)

In [5]:
def calculate_err_metrics(y_true: numpy.ndarray, y_pred: numpy.ndarray) -> "Metrics":
    """Score predictions against ground truth.

    Args:
        y_true: Observed target values.
        y_pred: Predicted target values.

    Returns:
        A ``Metrics`` holding the mean squared error, the mean absolute error
        and the R^2 score of ``y_pred`` with respect to ``y_true``.
    """
    return Metrics(
        mse=mean_squared_error(y_true, y_pred),
        mae=mean_absolute_error(y_true, y_pred),
        r2_score=r2_score(y_true, y_pred),
    )

def data_train_test_split(
    df: pandas.DataFrame,
    test_size: float,
    seed: int,
    target: str | None = None,
) -> list[numpy.ndarray]:
    """Split a data frame into shuffled train/test feature and target arrays.

    Args:
        df: Data frame containing the target column and the feature columns.
        test_size: Fraction of rows assigned to the test split.
        seed: Random state used for shuffling.
        target: Name of the target column. Defaults to the notebook-level
            ``TARGET`` constant when omitted.

    Returns:
        ``[X_train, X_test, y_train, y_test]`` as produced by
        ``train_test_split``; targets are column vectors.
    """
    # Resolve the default at call time so the notebook constant is honoured.
    target_column = TARGET if target is None else target

    X = double_dim_converter(df.drop(target_column, axis=1).to_numpy())
    # A single column comes back 1-D; make it (n, 1).
    y = double_dim_converter(df[target_column].to_numpy())
    return train_test_split(X, y, test_size=test_size, shuffle=True, random_state=seed)

In [6]:
# Hold out the test split first so the scalers only ever see training rows.
train_features, test_features, train_targets, test_targets = data_train_test_split(
    df, TEST_SIZE, SEED
)

# Fit the feature scaler and expose it to every pipeline through the shared
# ModelPipeline class attribute.
feature_scaler = STDScaler()
feature_scaler.fit(train_features)
ModelPipeline.feature_scaler = feature_scaler

# Same for the target scaler.
target_scaler = STDScaler()
target_scaler.fit(train_targets)
ModelPipeline.target_scaler = target_scaler

In [7]:
# Fit every pipeline on the training split and score it on the held-out split.
model_metrics: dict[ModelTypes, Metrics] = {}

# The pipelines stay available under their own names for later inspection.
linear_pipeline = LinearModel()
random_forrest_pipeline = RandomForrestModel()
ridge_pipeline = RidgeModel()
lasso_pipeline = LassoModel()

# (model type, pipeline, hyper-parameter grid), processed in this order.
# The linear model takes no hyper-parameters, hence the empty grid.
experiments = [
    (ModelTypes.LINEAR, linear_pipeline, {}),
    (ModelTypes.RANDOM_FORREST, random_forrest_pipeline, RANDOM_FORREST_HPARAMS),
    (ModelTypes.RIDGE, ridge_pipeline, RIDGE_HPARAMS),
    (ModelTypes.LASSO, lasso_pipeline, LASSO_HPARAMS),
]

for model_type, pipeline, hparams in experiments:
    print(f"fitting the {model_type.value} model...")
    pipeline.fit(train_features, train_targets, hparams)
    model_metrics[model_type] = calculate_err_metrics(test_targets, pipeline.forward(test_features))

print(model_metrics)

fitting the Linear model...
fitting the Random Forrest model...
Fitting 5 folds for each of 19 candidates, totalling 95 fits
[CV 1/5] END .......................max_depth=1;, score=0.684 total time=   0.4s
[CV 2/5] END .......................max_depth=1;, score=0.666 total time=   0.3s
[CV 3/5] END .......................max_depth=1;, score=0.668 total time=   0.2s
[CV 4/5] END .......................max_depth=1;, score=0.690 total time=   0.2s
[CV 5/5] END .......................max_depth=1;, score=0.667 total time=   0.2s
[CV 1/5] END .......................max_depth=2;, score=0.923 total time=   0.3s
[CV 2/5] END .......................max_depth=2;, score=0.917 total time=   0.3s
[CV 3/5] END .......................max_depth=2;, score=0.908 total time=   0.4s
[CV 4/5] END .......................max_depth=2;, score=0.933 total time=   0.3s
[CV 5/5] END .......................max_depth=2;, score=0.912 total time=   0.3s
[CV 1/5] END .......................max_depth=3;, score=0.987 tot