In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import pickle

In [2]:
df = pd.read_csv('../data/vehical.csv')
df.head(3)

Unnamed: 0,Brand,Year,Model,Car/Suv,Title,UsedOrNew,Transmission,Engine,DriveType,FuelType,FuelConsumption,Kilometres,ColourExtInt,Location,CylindersinEngine,BodyType,Doors,Seats,Price
0,Ssangyong,2022.0,Rexton,Sutherland Isuzu Ute,2022 Ssangyong Rexton Ultimate (awd),DEMO,Automatic,"4 cyl, 2.2 L",AWD,Diesel,8.7 L / 100 km,5595,White / Black,"Caringbah, NSW",4 cyl,SUV,4 Doors,7 Seats,51990
1,MG,2022.0,MG3,Hatchback,2022 MG MG3 Auto Excite (with Navigation),USED,Automatic,"4 cyl, 1.5 L",Front,Premium,6.7 L / 100 km,16,Black / Black,"Brookvale, NSW",4 cyl,Hatchback,5 Doors,5 Seats,19990
2,BMW,2022.0,430I,Coupe,2022 BMW 430I M Sport,USED,Automatic,"4 cyl, 2 L",Rear,Premium,6.6 L / 100 km,8472,Grey / White,"Sylvania, NSW",4 cyl,Coupe,2 Doors,4 Seats,108988


In [3]:
def save_with_pickle(model, file_path: str) -> None:
    with open(file_path, "wb") as file:
        pickle.dump(model, file)

In [4]:
from typing import List

cols = ['Title', "Model", "Car/Suv", "Location", "Engine", "ColourExtInt"]

# Droping unnecessary columns and NA values
def drop_unnecessary_and_NA_values(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    df = df.copy()
    df.drop(columns=cols, inplace=True)
    df.dropna(inplace=True)
    try:
        if df.isnull().sum().sum() == 0:
            print("All NaN values have been removed.")
            return df   
        else:
            print("There are still NaN values in the DataFrame.")
    except KeyError as e:
        raise KeyError(f"One or more columns specified in 'cols' were not found in the DataFrame: {e}")
    
    except ValueError as e:
        raise ValueError(f"A value error occurred, possibly due to mismatched data types: {e}")
    
    except Exception as e:
        raise RuntimeError(f"An error occurred during the operation: {e}")
    
df = drop_unnecessary_and_NA_values(df, cols)

All NaN values have been removed.


In [5]:
# Build a function with mask to remove "-" values from df
def remove_dash_symbol(df: pd.DataFrame) -> pd.DataFrame:
    mask = df.apply(lambda col: col.astype(str).str.contains('-')).any(axis=1)
    df = df[~mask]
    if df.apply(lambda col: col.astype(str).str.contains('-')).any().any():
            raise ValueError("Not all dash symbols ('-') have been removed from the DataFrame.")
    
    return df

df = remove_dash_symbol(df)

In [6]:
# Removing all "POA" values from Price column
def remove_POA_values(df: pd.DataFrame) -> pd.DataFrame:
    df = df[df['Price'] != 'POA'].reset_index(drop=True)
    if (df['Price'] == 'POA').any():
        raise ValueError("Not all 'POA' values have been removed from the Price column.")
        
    return df

df = remove_POA_values(df)

In [7]:
from typing import Union
int_cols = ['Seats', 'Doors', 'CylindersinEngine', 'Kilometres']
float_cols = ['FuelConsumption']

# Converting columns from obj to int/float
def from_cat_to_num(df: pd.DataFrame, col: str, dtypes: Union[int, float]) -> pd.DataFrame:
    if dtypes == int:
        df[col] = df[col].str.replace('[^0-9]', '', regex=True).astype(dtypes)
    else:
       df[col] = df[col].str.extract(r'(\d+\.?\d*)').astype(float)
       
for col in int_cols:
    from_cat_to_num(df, col, int)

for col in float_cols:
    from_cat_to_num(df, col, float)

df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13752 entries, 0 to 13751
Data columns (total 13 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   Brand              13752 non-null  object 
 1   Year               13752 non-null  float64
 2   UsedOrNew          13752 non-null  object 
 3   Transmission       13752 non-null  object 
 4   DriveType          13752 non-null  object 
 5   FuelType           13752 non-null  object 
 6   FuelConsumption    13752 non-null  float64
 7   Kilometres         13752 non-null  int64  
 8   CylindersinEngine  13752 non-null  int64  
 9   BodyType           13752 non-null  object 
 10  Doors              13752 non-null  int64  
 11  Seats              13752 non-null  int64  
 12  Price              13752 non-null  object 
dtypes: float64(2), int64(4), object(7)
memory usage: 1.4+ MB


In [8]:
int_cols = ['Price', 'Year']

# Convert Kilometres and Price columns to int
def convert_to_int(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    for col in int_cols:
        df[col] = df[col].astype(int)
    
    return df

df = convert_to_int(df, int_cols)
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13752 entries, 0 to 13751
Data columns (total 13 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   Brand              13752 non-null  object 
 1   Year               13752 non-null  int64  
 2   UsedOrNew          13752 non-null  object 
 3   Transmission       13752 non-null  object 
 4   DriveType          13752 non-null  object 
 5   FuelType           13752 non-null  object 
 6   FuelConsumption    13752 non-null  float64
 7   Kilometres         13752 non-null  int64  
 8   CylindersinEngine  13752 non-null  int64  
 9   BodyType           13752 non-null  object 
 10  Doors              13752 non-null  int64  
 11  Seats              13752 non-null  int64  
 12  Price              13752 non-null  int64  
dtypes: float64(1), int64(6), object(6)
memory usage: 1.4+ MB


In [9]:
from typing import List

cols_to_remove_outliers = ['Year', 'FuelConsumption', 'CylindersinEngine', 'Seats']

def remove_outliers(df: pd.DataFrame, cols_to_remove_outliers: List[str]) -> pd.DataFrame:
    df = df.copy()
    df = df[df[cols_to_remove_outliers[0]] > 1990]
    df = df[(df[cols_to_remove_outliers[1]] >= 1) & (df[cols_to_remove_outliers[1]] < 25)]
    df = df[df[cols_to_remove_outliers[2]] > 1]
    df = df[df[cols_to_remove_outliers[3]] < 15]
    df.reset_index(drop=True, inplace=True)
    
    return df

df = remove_outliers(df, cols_to_remove_outliers)

In [10]:
categorical_cols = ['Brand', "UsedOrNew", "Transmission", "DriveType", "FuelType", "BodyType"]

# Unique values in columns
def show_unique_values(df: pd.DataFrame, cols: List[str]) -> None:
    for col in cols:
        print(len(df[col].unique()))
        
show_unique_values(df, categorical_cols)

59
3
2
5
5
10


In [11]:
from sklearn.preprocessing import OneHotEncoder
import pandas as pd

categorical_cols_for_one_hot = ["UsedOrNew", "Transmission", "DriveType", "FuelType"]

def oneHot_encodeing(categorical_cols: List[str], df: pd.DataFrame) -> pd.DataFrame:
    try:
        df = df.copy()
        one_hot_encoder = OneHotEncoder(sparse_output=False)
        one_hot_encoded = one_hot_encoder.fit_transform(df[categorical_cols])
        one_hot_df = pd.DataFrame(one_hot_encoded, 
                                  columns=one_hot_encoder.get_feature_names_out(categorical_cols), 
                                  index=df.index) 
        df_encoded = pd.concat([df.drop(categorical_cols_for_one_hot, axis=1),
                                one_hot_df], axis=1)
        
        save_with_pickle(one_hot_encoder, "../models/OneHot_encoder.pkl")
        
        return df_encoded
    except KeyError as e:
        raise KeyError(f"One or more columns specified in 'categorical_cols' were not found in the DataFrame: {e}")
    
    except Exception as e:
        raise RuntimeError(f"An error occurred during one-hot encoding: {e}")


df = oneHot_encodeing(categorical_cols_for_one_hot, df)

In [12]:
# Using LabelEncoder to encode categorical values (More then 7 unique values in column)
from sklearn.preprocessing import LabelEncoder

categorical_cols_for_label_encoder = ['Brand', "BodyType"]

def label_encoder(categorical_cols: List[str], df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    label_encoder = LabelEncoder()
    df[categorical_cols[0]] = label_encoder.fit_transform(df[categorical_cols[0]])
    df[categorical_cols[1]] = label_encoder.fit_transform(df[categorical_cols[1]])
    
    save_with_pickle(label_encoder, "../models/Label_encoder.pkl")
    
    return df

df = label_encoder(categorical_cols_for_label_encoder, df)

In [13]:
from sklearn.model_selection import train_test_split
from typing import Tuple

def split_data(df: pd.DataFrame, target: str, test_size: float) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    X = df.drop(columns=target)
    y = df[target]
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size)
    
    return X_train, X_test, y_train, y_test

X_train, X_test, y_train, y_test = split_data(df, "Price", 0.2)

In [None]:
# This script performs hyperparameter tuning for an XGBoost regression model using GridSearchCV with
# cross-validation to find the best parameter combination for minimizing mean squared error.

from typing import Dict
from xgboost import XGBRegressor
from sklearn.model_selection import GridSearchCV

reg = XGBRegressor(n_estimators=1000, objective='reg:squarederror', random_state=1234)

param_grid = {
    'n_estimators': [500, 1000],
    'learning_rate': [0.01, 0.1, 0.2],
    'max_depth': [3, 5, 7],
    'subsample': [0.8, 1.0],
    'colsample_bytree': [0.8, 1.0],
    'gamma': [0, 0.1, 0.2],
}

def train_model(X_train: pd.DataFrame,
                y_train: pd.Series,
                reg: XGBRegressor,
                param_grid: Dict[str, list[Union[int, float]]]) -> XGBRegressor:
    
    grid_search = GridSearchCV(estimator=reg,
                           param_grid=param_grid,
                           cv=3, n_jobs=-1, verbose=2,
                           scoring='neg_mean_squared_error')
    
    grid_search.fit(X_train, y_train)
    
    # Showing the best model
    print(f'Best params: {grid_search.best_params_}')
    print(f'Best score: {grid_search.best_score_}')
    
    best_model = grid_search.best_estimator_
    
    return best_model 

model = train_model(X_train, y_train, reg, param_grid)

In [17]:
# Testing the model on the following metrics
# - Mean Squared Error
# - Mean Absolute Error
# - Root Mean Squared Error
# - R2 Score

from sklearn.metrics import mean_squared_error, mean_absolute_error, root_mean_squared_error, r2_score

def Evaluate_model(X_test: pd.DataFrame,
                   y_test: pd.Series,
                   model: XGBRegressor) -> None:
    # Predict on test set
    reg_pred = model.predict(X_test)
    
    print(f'Mean Squared Error: {mean_squared_error(y_test, reg_pred)}')
    print(f'Mean Absolute Error: {root_mean_squared_error(y_test, reg_pred)}')
    print(f'Root Mean Squared Error: {mean_absolute_error(y_test, reg_pred)}')
    print(f'R2 Score: {r2_score(y_test, reg_pred)}')
    
Evaluate_model(X_test, y_test, model)

Mean Squared Error: 80653536.11512727
Mean Absolute Error: 8980.731379744484
Root Mean Squared Error: 4468.910143218565
R2 Score: 0.8963158523279421


In [18]:
import pickle

def save_with_pickle(model, file_path: str) -> None:
    with open(file_path, "wb") as file:
        pickle.dump(model, file)
        
save_with_pickle(model, "../models/01_model.pkl")