## 5.1

In [10]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Ridge as SklearnRidge
from sklearn.metrics import mean_absolute_percentage_error


In [22]:
df = pd.read_csv("../ToyotaCorolla.csv")
df = df.drop(columns=['Id', 'Model', 'Mfg_Month', 'Mfg_Year', 'Cylinders'])
df['Fuel_Type'] = df['Fuel_Type'].astype('category')
df['Color'] = df['Color'].astype('category')
# df['Gears'] = df['Gears'].astype('category')
# df['Doors'] = df['Doors'].astype('category')

df.loc[df['CC'] == 16000, 'CC'] = 1600
df.rename(columns={'Age_08_04': 'Age'}, inplace=True)

for col in df.columns:
    if df[col].nunique() == 2:
        df[col] = df[col].astype('bool')

df['Combined_Color'] = df.apply(
    lambda row:
    f'Met_{row["Color"]}' if row['Met_Color']
    else row['Color'], axis=1
).astype('category')

df['Age^2'] = df['Age']**2
df['KM^2'] = df['KM']**2
df['eAge'] = np.exp(-df['Age'])
df['eKM'] = np.exp(-df['KM'])
df['Weight^2'] = df['Weight'] ** 2

features = ['Sport_Model', 'Fuel_Type', 'CC', 'Gears', 'Weight', 'HP', 'KM', 'KM^2', 'eKM', 'Age', 'Age^2', 'eAge',
            'Quarterly_Tax', 'Guarantee_Period']
extras = ['Automatic', 'ABS', 'Airbag_1', 'Airbag_2', 'Airco', 'Automatic_airco', 'Boardcomputer', 'CD_Player', 'Central_Lock',
          'Powered_Windows', 'Power_Steering', 'Radio', 'Mistlamps', 'Backseat_Divider', 'Metallic_Rim', 'Radio_cassette', 
          'Parking_Assistant', 'Tow_Bar', 'Combined_Color', 'Mfr_Guarantee', 'BOVAG_Guarantee']
features.extend(extras)

cat_features = list(set(df.select_dtypes('category').columns).intersection(set(features)))
num_features = list(set(df.select_dtypes('number').columns).intersection(set(features)))
bool_features = list(set(df.select_dtypes('bool').columns).intersection(set(features)))

X = df[num_features].copy()
y = df['Price']

# X = pd.get_dummies(X)
# X = pd.DataFrame(MinMaxScaler().fit_transform(X), columns=X.columns)

X_train, X_test, y_train, y_test = train_test_split(X , y, shuffle=True, random_state=42, test_size=0.2)

In [23]:
X.describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Age,1436.0,55.94708,18.59999,1.0,44.0,61.0,70.0,80.0
Guarantee_Period,1436.0,3.81546,3.011025,3.0,3.0,3.0,3.0,36.0
Age^2,1436.0,3475.794,1836.373,1.0,1936.0,3721.0,4900.0,6400.0
eKM,1436.0,0.002049468,0.02739122,0.0,0.0,0.0,0.0,0.3678794
CC,1436.0,1566.828,187.1824,1300.0,1400.0,1600.0,1600.0,2000.0
HP,1436.0,101.5021,14.98108,69.0,90.0,110.0,110.0,192.0
KM^2,1436.0,6102562000.0,6954814000.0,1.0,1849000000.0,4018232000.0,7572612000.0,59049000000.0
Gears,1436.0,5.026462,0.1885104,3.0,5.0,5.0,5.0,6.0
Weight,1436.0,1072.46,52.64112,1000.0,1040.0,1070.0,1085.0,1615.0
KM,1436.0,68533.26,37506.45,1.0,43000.0,63389.5,87020.75,243000.0


In [24]:
class LassoCoordinateDescent:
    def __init__(self, lambda_=0.1, max_iter=1000, tol=1e-4, scaler=None):
        self.lambda_ = lambda_
        self.max_iter = max_iter
        self.tol = tol
        self.coef_ = None
        if scaler == None:
            self.scaler_ = StandardScaler()
        else:
            self.scaler_ = scaler
    
    def soft_thresholding(self, rho, lambda_):
        if rho < -lambda_:
            return rho + lambda_
        elif rho > lambda_:
            return rho - lambda_
        else:
            return 0
    
    def fit(self, X, y):
        # Standardize the features
        # self.scaler_ = StandardScaler()
        X_scaled = self.scaler_.fit_transform(X)
        
        # Initialize coefficients
        n, d = X_scaled.shape
        self.coef_ = np.zeros(d)
        
        for iteration in range(self.max_iter):
            coef_old = self.coef_.copy()
            
            for j in range(d):
                X_j = X_scaled[:, j]
                residual = y - X_scaled @ self.coef_ + self.coef_[j] * X_j
                rho = X_j.T @ residual
                
                # Update coefficient for feature j using soft-thresholding
                self.coef_[j] = self.soft_thresholding(rho / (X_j.T @ X_j), self.lambda_)
            
            # Check for convergence
            if np.max(np.abs(self.coef_ - coef_old)) < self.tol:
                print(f'Converged after {iteration} iterations.')
                break
    
    def predict(self, X):
        # Standardize the test data using the scaler fitted on training data
        X_scaled = self.scaler_.transform(X)
        
        # Compute predictions
        return X_scaled @ self.coef_

    def mean_absolute_percentage_error(self, y_true, y_pred):
        """
        Compute the Mean Absolute Percentage Error (MAPE)
        """
        return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

    def score(self, X_test, y_test):
        """
        Compute the MAPE on the test set
        """
        y_pred = self.predict(X_test)
        return self.mean_absolute_percentage_error(y_test, y_pred)

# Create a Lasso model
lambda_ = 0.1  # Regularization strength
# The difference between MinMax and Standard is insane
lasso = LassoCoordinateDescent(lambda_=lambda_, scaler=MinMaxScaler())

# Fit the model to the training data
lasso.fit(X_train, y_train)

# Predict on the test data
y_pred = lasso.predict(X_test)
#print('Predicted prices:', y_pred)

# Evaluate the model using MAPE
mape = lasso.score(X_test, y_test)
print('Mean Absolute Percentage Error on test set:', mape)

Mean Absolute Percentage Error on test set: 10.66361550830001


In [26]:
class RidgeRegression:
    def __init__(self, lambda_=1.0, scaler=None):
        self.lambda_ = lambda_  # Regularization strength
        self.coef_ = None
        if scaler == None:
            self.scaler_ = StandardScaler()
        else:
            self.scaler_ = scaler
    
    def fit(self, X, y):
        # Standardize the features
        #self.scaler_ = StandardScaler()
        X_scaled = self.scaler_.fit_transform(X)
        
        # Solve the normal equation (X'X + lambda*I) * beta = X'y
        n, d = X_scaled.shape
        I = np.eye(d)  # Identity matrix of size d
        XTX = X_scaled.T @ X_scaled
        XTy = X_scaled.T @ y
        
        # Solve for beta (ridge coefficients)
        self.coef_ = np.linalg.solve(XTX + self.lambda_ * I, XTy)
    
    def predict(self, X):
        # Standardize the test data using the same scaler as training data
        X_scaled = self.scaler_.transform(X)
        
        # Compute predictions
        return X_scaled @ self.coef_

    def score(self, X_test, y_test):
        """
        Compute the Mean Squared Error on the test set
        """
        y_pred = self.predict(X_test)
        return np.mean((y_test - y_pred) ** 2)

    def mean_absolute_percentage_error(self, y_true, y_pred):
        """
        Compute the Mean Absolute Percentage Error (MAPE)
        """
        return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

# Create a Ridge regression model
ridge = RidgeRegression(lambda_=1.0, scaler=MinMaxScaler())

# Fit the model
ridge.fit(X_train, y_train)

# Predict on the test set
y_pred = ridge.predict(X_test)
#print('Predicted prices:', y_pred)

# Evaluate the model
mse = ridge.score(X_test, y_test)
print('Mean Squared Error on test set:', mse)

# Evaluate using MAPE
mape = ridge.mean_absolute_percentage_error(y_test, y_pred)
print('Mean Absolute Percentage Error on test set:', mape)

Mean Squared Error on test set: 3151555.363609479
Mean Absolute Percentage Error on test set: 10.619804484642712


In [27]:
# Custom Ridge Regression implementation
ridge_custom = RidgeRegression(lambda_=1.0, scaler=MinMaxScaler())
ridge_custom.fit(X_train, y_train)
beta_custom = ridge_custom.coef_

# Sklearn Ridge Regression
ridge_sklearn = SklearnRidge(alpha=1.0, fit_intercept=False)
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train)
ridge_sklearn.fit(X_train_scaled, y_train)
beta_sklearn = ridge_sklearn.coef_

# Compare coefficients
print('Custom Ridge Coefficients:\n', beta_custom)
print('Sklearn Ridge Coefficients:\n', beta_sklearn)

Custom Ridge Coefficients:
 [-2507.27129255  3433.75862619 -3502.07202551  -395.37711319
 -2888.56095915  6057.60499375 -2386.8805472  16839.40370955
 14402.74411779 -2723.94476838  2041.25236416  4396.88171115]
Sklearn Ridge Coefficients:
 [-2507.27129255  3433.75862619 -3502.07202551  -395.37711319
 -2888.56095915  6057.60499375 -2386.8805472  16839.40370955
 14402.74411779 -2723.94476838  2041.25236416  4396.88171115]


In [30]:
# Predictions and comparison
y_pred_custom = ridge_custom.predict(X_test)
y_pred_sklearn = ridge_sklearn.predict(scaler.transform(X_test))

# Compare predictions
# print('Custom Ridge Predictions:\n', y_pred_custom)
# print('Sklearn Ridge Predictions:\n', y_pred_sklearn)

# Evaluate both models (MSE)
mape_custom = mean_absolute_percentage_error(y_test, y_pred_custom)
mape_sklearn = mean_absolute_percentage_error(y_test, y_pred_sklearn)

print('Custom Ridge Mape:', mape_custom)
print('Sklearn Ridge Mape:', mape_sklearn)

Custom Ridge Mape: 0.10619804484642713
Sklearn Ridge Mape: 0.10619804484642725


In [41]:
class RidgeGradientDescent:
    def __init__(self, lambda_=1.0, alpha=0.01, max_iter=1000, tol=1e-6, scaler=None):
        self.lambda_ = lambda_  # Regularization strength
        self.alpha = alpha  # Learning rate
        self.max_iter = max_iter  # Maximum number of iterations
        self.tol = tol  # Tolerance for stopping criterion
        self.coef_ = None  # Coefficients (beta)
        if scaler == None:
            self.scaler_ = StandardScaler()
        else:
            self.scaler_ = scaler
    
    def fit(self, X, y):
        # Standardize the features
        # self.scaler_ = StandardScaler()
        X_scaled = self.scaler_.fit_transform(X)
        
        n, d = X_scaled.shape
        self.coef_ = np.zeros(d)  # Initialize coefficients
        
        for iteration in range(self.max_iter):
            # Compute predictions
            y_pred = X_scaled @ self.coef_
            
            # Compute the gradient
            gradient = -(X_scaled.T @ (y - y_pred)) / n + (self.lambda_ / n) * self.coef_
            
            # Update coefficients (gradient descent step)
            self.coef_ -= self.alpha * gradient
            
            # Check convergence (if the norm of the gradient is small enough)
            if np.linalg.norm(gradient, ord=2) < self.tol:
                print(f'Converged after {iteration} iterations.')
                break
            # self.alpha *= 1 - iteration/self.max_iter
    
    def predict(self, X):
        # Standardize the test data using the same scaler as training data
        X_scaled = self.scaler_.transform(X)
        return X_scaled @ self.coef_

    def mean_absolute_percentage_error(self, y_true, y_pred):
        """
        Compute the Mean Absolute Percentage Error (MAPE)
        """
        return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

    def score(self, X_test, y_test):
        # Compute predictions and MAPE
        y_pred = self.predict(X_test)
        return self.mean_absolute_percentage_error(y_test, y_pred)

# Create the Ridge Regression model using Gradient Descent
ridge_gd = RidgeGradientDescent(lambda_=1.0, alpha=0.05, max_iter=100_000, tol=1e-3, scaler=MinMaxScaler())

# Fit the model to the training data
ridge_gd.fit(X_train, y_train)

# Predict on the test data
y_pred_gd = ridge_gd.predict(X_test)

mape_gd = ridge_gd.score(X_test, y_test)
print(f'MAPE ridge gd: {mape_gd}')
print(f'Coefficients: {ridge_gd.coef_}')

Converged after 93126 iterations.
MAPE ridge gd: 10.619794902454258
Coefficients: [-2507.44970258  3433.77746929 -3501.94836693  -395.28506891
 -2888.55146574  6057.59845803 -2387.06330472 16839.45701475
 14402.69243661 -2723.7887348   2040.74523291  4396.88616884]


Even after 100_000 iterations it has not completely converged.
Making alpha (the learning rate) depended on beta may be a good idea...

In [48]:
from sklearn.neural_network import MLPRegressor

X = df[features].copy()
y = df['Price']

X = pd.get_dummies(X)

X_train, X_test, y_train, y_test = train_test_split(X , y, shuffle=True, random_state=42, test_size=0.2)

scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Define the MLPRegressor model
mlp = MLPRegressor(hidden_layer_sizes=(200, 10),  # Two hidden layers with 100 and 50 neurons
                   activation='relu',  # Activation function
                   solver='adam',  # Optimization method
                   max_iter=5000,  # Maximum number of iterations
                   random_state=42)

# Fit the model to the training data
mlp.fit(X_train_scaled, y_train)

# Predict on the train set
y_train_pred = mlp.predict(X_train_scaled)

# Predict on the test set
y_test_pred = mlp.predict(X_test_scaled)

# Evaluate the model using MAPE on both train and test sets
mape_train = mean_absolute_percentage_error(y_train, y_train_pred) * 100
mape_test = mean_absolute_percentage_error(y_test, y_test_pred) * 100

print(f'Train set MAPE: {mape_train:.2f}%')
print(f'Test set MAPE: {mape_test:.2f}%')

Train set MAPE: 7.08%
Test set MAPE: 8.04%


##### mape of 8% for random forest vs 9.5% for mlp

RandomForestRegression does quite well. It is faster to train and there are less difficult hyper parameter choices to make.

In [49]:
from sklearn.ensemble import RandomForestRegressor

rf = RandomForestRegressor(
    n_estimators=200,
    min_samples_split=10,
    min_samples_leaf=4,
    max_features=0.6,
    n_jobs=-1
)

rf.fit(X_train_scaled, y_train)

# Predict on the train set
y_train_pred = rf.predict(X_train_scaled)

# Predict on the test set
y_test_pred = rf.predict(X_test_scaled)

# Evaluate the model using MAPE on both train and test sets
mape_train = mean_absolute_percentage_error(y_train, y_train_pred) * 100
mape_test = mean_absolute_percentage_error(y_test, y_test_pred) * 100

print(f'Train set MAPE: {mape_train:.2f}%')
print(f'Test set MAPE: {mape_test:.2f}%')
                           

Train set MAPE: 5.54%
Test set MAPE: 7.88%


This is a special type of boosting model

In [62]:
from sklearn.ensemble import HistGradientBoostingRegressor

est = HistGradientBoostingRegressor(
    max_leaf_nodes=10, # prevents overfitting
    learning_rate=0.1,
    max_iter=1000,
    min_samples_leaf=2,
    max_features=0.6,
)

est.fit(X_train_scaled, y_train)

# Predict on the train set
y_train_pred = est.predict(X_train_scaled)

# Predict on the test set
y_test_pred = est.predict(X_test_scaled)

# Evaluate the model using MAPE on both train and test sets
mape_train = mean_absolute_percentage_error(y_train, y_train_pred) * 100
mape_test = mean_absolute_percentage_error(y_test, y_test_pred) * 100

print(f'Train set MAPE: {mape_train:.2f}%')
print(f'Test set MAPE: {mape_test:.2f}%')
                           

Train set MAPE: 1.16%
Test set MAPE: 7.56%
