# Car Price Prediction Model Training

In [None]:
# Libraries
import pandas as pd
import numpy as np
import joblib
from datetime import date
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, PowerTransformer, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from category_encoders import TargetEncoder

# Model
from lightgbm import LGBMRegressor
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, root_mean_squared_error

## Define Custom Classes
### These classes allow the pipeline to handle raw data automatically.

In [None]:
class CarAgeTransformer(BaseEstimator, TransformerMixin):
    """Convert registration year/month into a single ``CarAge`` column (years).

    The transformer is stateless: ``fit`` learns nothing and ``transform``
    derives the age from the registration date alone.

    Parameters
    ----------
    reference_date : date-like or None, default=None
        Timezone-naive date the age is measured against. ``None`` keeps the
        historical behaviour of using the moment ``transform`` is called;
        passing a fixed date makes the output reproducible.

    Notes
    -----
    * Input may be a DataFrame holding ``RegistrationYear`` and
      ``RegistrationMonth`` columns, or a 2-column array-like in that order.
    * Months that are missing, non-numeric or outside 1..12 are treated as
      June (6), i.e. mid-year.
    * Rows whose year is missing, non-numeric or cannot be parsed as a date
      receive the fallback age of 10 years instead of raising.
    """

    _DEFAULT_MONTH = 6   # mid-year stand-in for unknown/invalid months
    _FALLBACK_AGE = 10   # age assigned when no registration date can be built

    def __init__(self, reference_date=None):
        self.reference_date = reference_date

    def fit(self, X, y=None):
        """No-op fit; present only to satisfy the estimator API."""
        return self

    def transform(self, X):
        """Return a one-column DataFrame ``CarAge`` indexed like ``X``."""
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X, columns=['RegistrationYear', 'RegistrationMonth'])

        # Work on derived Series so the caller's frame is never mutated.
        year = pd.to_numeric(X['RegistrationYear'], errors='coerce').astype('float64')
        month = pd.to_numeric(X['RegistrationMonth'], errors='coerce').astype('float64')

        # NaN/inf compare False in `between`, so they also fall back to June.
        month = month.where(month.between(1, 12), self._DEFAULT_MONTH)

        # Only years a '%Y' date can carry are formatted; everything else
        # (NaN, inf, <= 0, > 9999) is masked out *before* the integer cast,
        # which would otherwise raise on non-finite values.
        usable_year = year.between(1, 9999)
        date_text = (
            year.where(usable_year, 0).astype(int).astype(str)
            + '-'
            + month.astype(int).astype(str)
        ).where(usable_year)

        registration_date = pd.to_datetime(date_text, format='%Y-%m', errors='coerce')

        # getattr keeps instances pickled before `reference_date` existed usable.
        reference = getattr(self, 'reference_date', None)
        now = pd.to_datetime('today') if reference is None else pd.Timestamp(reference)

        car_age = ((now - registration_date).dt.days / 365.25).round(2)
        car_age = car_age.fillna(self._FALLBACK_AGE)

        return car_age.to_frame('CarAge')

    def get_feature_names_out(self, input_features=None):
        """Name of the single output feature, regardless of the input names."""
        return ["CarAge"]

class WeightedEnsemble(BaseEstimator):
    """Blend the predictions of several regressors with fixed weights.

    Parameters
    ----------
    models : sequence of estimators
        Objects exposing ``fit(X, y)`` and ``predict(X)``. They are fitted
        in place by :meth:`fit`.
    weights : sequence of float or None, default=None
        One weight per model. ``None`` (or an empty sequence) means equal
        weights of ``1 / len(models)``.
    """

    def __init__(self, models, weights=None):
        self.models = models
        # Explicit None/length test rather than truthiness: `if weights` raises
        # for multi-element numpy arrays.
        if weights is None or len(weights) == 0:
            n_models = len(models)
            weights = [1 / n_models] * n_models if n_models else []
        self.weights = weights

    def _check_configuration(self):
        """Raise ValueError if the ensemble is empty or weights don't match."""
        if len(self.models) == 0:
            raise ValueError("WeightedEnsemble requires at least one model.")
        if len(self.weights) != len(self.models):
            raise ValueError(
                f"Got {len(self.weights)} weights for {len(self.models)} models; "
                "exactly one weight per model is required."
            )

    def fit(self, X, y):
        """Fit every model on the same data and return ``self``."""
        self._check_configuration()
        X_arr = np.array(X)
        for model in self.models:
            model.fit(X_arr, y)
        return self

    def predict(self, X):
        """Return the weighted sum of the individual model predictions."""
        # Without this check, zip() would silently ignore surplus models/weights.
        self._check_configuration()
        X_arr = np.array(X)
        predictions = np.zeros(X_arr.shape[0])
        for model, weight in zip(self.models, self.weights):
            predictions += weight * np.asarray(model.predict(X_arr))
        return predictions

## Load and Clean Data

### Loading the data

In [22]:
# Raw listings; the path is relative to the notebook's working directory.
file_path = 'data/CarPrices.csv'
df = pd.read_csv(filepath_or_buffer=file_path)

### Drop useless columns

In [23]:
# Columns dropped before modelling. errors='ignore' lets the cell be re-run
# (or run on a file that lacks some of them) without raising.
useless_cols = [
    'DateCrawled',
    'DateCreated',
    'LastSeen',
    'NumberOfPictures',
    'PostalCode',
]
df = df.drop(useless_cols, axis=1, errors='ignore')

### Filter Years

In [24]:
# Keep registrations after 1900 and no later than the current calendar year.
current_year = date.today().year
df = df.loc[(df['RegistrationYear'] > 1900) & (df['RegistrationYear'] <= current_year)]

### Filter Price Outliers

In [25]:
# Trim both price tails: keep rows between the 8th and 92nd percentiles
# (bounds inclusive). Rows with a missing price fail the test and are dropped.
lower_q, upper_q = df['Price'].quantile([0.08, 0.92])
df = df[df['Price'].between(lower_q, upper_q)]

### Fill Missing Values

In [26]:
# Split the columns by dtype once, then impute each group:
#   text columns    -> the literal category 'unknown'
#   numeric columns -> that column's median
obj_cols = df.select_dtypes(include='object').columns
num_cols = df.select_dtypes(include=np.number).columns

df[obj_cols] = df[obj_cols].fillna('unknown')
df[num_cols] = df[num_cols].fillna(df[num_cols].median())

### Save Cleaned Data for App

In [27]:
# Persist the cleaned frame (without the index column).
df.to_csv(path_or_buf="data/CleanedCarPrices.csv", index=False)
print("Saved data/CleanedCarPrices.csv")

Saved data/CleanedCarPrices.csv


### Features (X) and Target (Y)

In [28]:
# Target is the listing price; every remaining column is a candidate feature.
Y = df['Price']
X = df.drop(columns='Price')

## Build Preprocessing Pipeline

### 1. Age Pipeline

In [29]:
# Registration year/month -> CarAge -> Yeo-Johnson power transform.
age_pipe = Pipeline(
    steps=[
        ('age_calc', CarAgeTransformer()),
        ('pt', PowerTransformer(method='yeo-johnson')),
    ]
)

### 2. Numerical Pipeline (Power -> Scale)

In [30]:
# Numeric features: Yeo-Johnson power transform, then standard scaling.
num_pipe = Pipeline(
    steps=[
        ('pt', PowerTransformer(method='yeo-johnson')),
        ('scaler', StandardScaler()),
    ]
)

## Encoding the columns

### 1. Categorical Encoding

In [31]:
# Low-cardinality categoricals are one-hot encoded (first level dropped,
# dense output, categories unseen during fit are ignored at transform time).
cat_ohe_cols = [
    'Gearbox',
    'Repaired',
    'FuelType',
    'VehicleType',
]
cat_ohe = OneHotEncoder(
    sparse_output=False,
    handle_unknown='ignore',
    drop='first',
)

### 2. Target Encoding

In [32]:
# Brand and Model are target-encoded with category_encoders' defaults.
cat_target_cols = [
    'Brand',
    'Model',
]
cat_target = TargetEncoder()

# Combine the Pipelines and Encoded Columns 

In [None]:
# One ColumnTransformer routes each column group to its own sub-pipeline;
# columns not listed here are dropped. set_output returns the transformer
# itself, so chaining it configures DataFrame output in the same statement.
preprocessor = ColumnTransformer(
    transformers=[
        ('age', age_pipe, ['RegistrationYear', 'RegistrationMonth']),
        ('num', num_pipe, ['Power', 'Kilometer']),
        ('cat_ohe', cat_ohe, cat_ohe_cols),
        ('cat_target', cat_target, cat_target_cols),
    ],
    remainder='drop',
    verbose_feature_names_out=False,
).set_output(transform="pandas")

## Split the Data into Train and Test Sets

In [34]:
# Hold out 20% of the rows for evaluation; the seed keeps the split repeatable.
X_train, X_test, Y_train, Y_test = train_test_split(
    X,
    Y,
    test_size=0.2,
    random_state=42,
)

## Optimized Models

In [35]:
# Tuned LightGBM regressor used as the final model of the pipeline.
lgbm_fast = LGBMRegressor(
    # Boosting schedule
    n_estimators=2000,
    learning_rate=0.05,
    # Tree shape (max_depth=-1 means no depth limit)
    num_leaves=63,
    max_depth=-1,
    min_child_samples=30,
    min_split_gain=0.01,
    # Row / column sampling
    subsample=0.9,
    colsample_bytree=0.85,
    # L1 / L2 regularisation
    reg_alpha=0.2,
    reg_lambda=0.4,
    # Runtime
    random_state=42,
    n_jobs=-1,
    verbose=-1,
)

## Create the FULL Pipeline (Preprocessing + Model)
### We use the single tuned LightGBM model (`lgbm_fast`) here, as it is faster for the web app than a full stacked ensemble

In [36]:
# End-to-end estimator: raw feature frame in, price prediction out.
full_pipeline = Pipeline(
    steps=[
        ('preprocessor', preprocessor),
        ('model', lgbm_fast),
    ]
)

# Fit preprocessing and model together on the training split.
print("Training LightGBM Pipeline...")
full_pipeline.fit(X_train, Y_train)
print("Training Complete.")

Training LightGBM Pipeline...
Training Complete.


## Evaluation Metrics

In [37]:
# Score the fitted pipeline on the held-out split.
Y_pred = full_pipeline.predict(X_test)
print(f"R2 Score: {r2_score(Y_test, Y_pred):.4f}")
print(f"MAE: {mean_absolute_error(Y_test, Y_pred):.2f}")
print(f"MSE: {mean_squared_error(Y_test, Y_pred):.2f}")
# root_mean_squared_error is the square root of the MSE, i.e. RMSE; the
# previous "R-MAE" label named a different (and non-standard) metric.
print(f"RMSE: {root_mean_squared_error(Y_test, Y_pred):.2f}")

R2 Score: 0.8480
MAE: 805.90
MSE: 1445066.87
R-MAE: 1202.11


## Save Pipeline in Joblib file

In [38]:
# Serialise the fitted pipeline (preprocessing + model) with zlib level-3
# compression. The target directory must already exist.
output_path = 'joblib/CarPricePipeline.joblib'
joblib.dump(value=full_pipeline, filename=output_path, compress=3)
print(f"Pipeline saved as {output_path}")

Pipeline saved as joblib/CarPricePipeline.joblib
