In [28]:
# Loading dependencies
from typing import Any, Dict, List, Optional

import cloudpickle
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from xgboost import XGBRegressor

In [13]:
# Read the raw CSV into a DataFrame, then print its column dtypes and non-null counts
df = pd.read_csv(filepath_or_buffer="./data/data.csv")
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8128 entries, 0 to 8127
Data columns (total 13 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   name           8128 non-null   object 
 1   year           8128 non-null   int64  
 2   selling_price  8128 non-null   int64  
 3   km_driven      8128 non-null   int64  
 4   fuel           8128 non-null   object 
 5   seller_type    8128 non-null   object 
 6   transmission   8128 non-null   object 
 7   owner          8128 non-null   object 
 8   mileage        7907 non-null   object 
 9   engine         7907 non-null   object 
 10  max_power      7913 non-null   object 
 11  torque         7906 non-null   object 
 12  seats          7907 non-null   float64
dtypes: float64(1), int64(3), object(9)
memory usage: 825.6+ KB


In [None]:
# Cleaning data: discard every row that contains at least one missing value.
# The frame is modified in place because later cells keep using the name `df`.
df.dropna(axis="index", how="any", inplace=True)

### Defining data transformers

In [3]:
class CarModelExtractionTransformer(BaseEstimator, TransformerMixin):
    """Add a ``model`` column derived from the second word of ``name``.

    ``fit`` records the most frequent second words found in the training
    frame. ``transform`` keeps those values and replaces every other value
    with the literal ``"Other"``.
    """

    def __init__(self) -> None:
        # Despite the attribute name, this holds the retained *model* tokens
        # (second word of ``name``). It stays empty until ``fit`` is called, in
        # which case ``transform`` labels every row "Other".
        self._popular_brands: List[str] = []

    @staticmethod
    def _model_from_name(name: Any) -> Optional[str]:
        """Return the second whitespace-separated token of ``name``.

        Returns ``None`` (instead of raising) when ``name`` is not a string,
        e.g. NaN, or when it has fewer than two tokens.
        """
        if not isinstance(name, str):
            return None
        tokens = name.split()
        return tokens[1] if len(tokens) > 1 else None

    def fit(self, X: pd.DataFrame, y: Optional[Any] = None, max_models_count: int = 30):
        """Learn the ``max_models_count`` most frequent model tokens in ``X``.

        Args:
            X: Frame with a ``name`` column.
            y: Ignored; present for scikit-learn API compatibility.
            max_models_count: Upper bound on the number of tokens to keep.

        Returns:
            The fitted transformer (``self``).
        """
        models = X["name"].apply(self._model_from_name)
        # groupby drops missing keys, so names without a model token are not counted.
        counts = models.groupby(models).size().sort_values(ascending=False)
        self._popular_brands = counts.index[:max_models_count].tolist()
        return self

    def transform(self, X: pd.DataFrame, y: Optional[Any] = None) -> pd.DataFrame:
        """Return a copy of ``X`` with an added ``model`` column.

        Tokens that were not retained during ``fit`` (including names from
        which no token could be extracted) are replaced with ``"Other"``.
        """
        data = X.copy()
        data["model"] = data["name"].apply(self._model_from_name)
        data.loc[~data["model"].isin(self._popular_brands), "model"] = "Other"
        return data

In [4]:
# NOTE: the same class is defined again in the following cell; that later
# definition is the one in effect once the notebook has run top to bottom.
class CarTechnicalInfoTransformer(BaseEstimator, TransformerMixin):
    """Parse free-text technical columns into numbers and derive ``age``.

    ``transform`` rewrites ``max_power``, ``mileage`` and ``engine`` by parsing
    the leading number of each string, and adds ``age`` computed from ``year``.
    Values that are not strings are passed through untouched; strings that
    cannot be parsed become NaN.

    Args:
        reference_year: Year from which ``year`` is subtracted to obtain
            ``age``. Defaults to 2022.
    """

    # Multiplier applied to mileage readings whose unit token is "km/kg".
    # NOTE(review): presumably an approximate km/kg -> km/l conversion; confirm.
    _KM_PER_KG_FACTOR = 1.4

    def __init__(self, reference_year: int = 2022) -> None:
        self.reference_year = reference_year

    def fit(self, X: pd.DataFrame, y: Optional[Any] = None):
        """Nothing is learned from the data; return ``self``."""
        return self

    def _extract_engine_info(self, engine_info: str) -> float:
        """Return the leading integer of ``engine_info``.

        Non-string input is returned unchanged. Blank strings or strings whose
        first token is not an integer yield NaN.
        """
        if not isinstance(engine_info, str):
            return engine_info
        try:
            return int(engine_info.split()[0])
        except (IndexError, ValueError):
            return float("nan")

    def _parse_mileage(self, mileage: str) -> float:
        """Return the numeric mileage, scaled when the unit token is ``km/kg``.

        Non-string input is returned unchanged. Blank strings or strings whose
        first token is not a number yield NaN. A missing unit token is treated
        like any unit other than ``km/kg`` (no scaling).
        """
        if not isinstance(mileage, str):
            return mileage
        tokens = mileage.split()
        try:
            measurement = float(tokens[0])
        except (IndexError, ValueError):
            return float("nan")
        if len(tokens) > 1 and tokens[1] == "km/kg":
            return measurement * self._KM_PER_KG_FACTOR
        return measurement

    def _extract_max_power(self, power: str) -> float:
        """Return the leading number of ``power``.

        Non-string input is returned unchanged. Blank strings or strings whose
        first token is not a number yield NaN.
        """
        if not isinstance(power, str):
            return power
        try:
            return float(power.split()[0])
        except (IndexError, ValueError):
            return float("nan")

    def _parse_year_to_age(self, year: int) -> int:
        """Return ``reference_year - year``."""
        return self.reference_year - year

    def transform(self, X: pd.DataFrame, y: Optional[Any] = None) -> pd.DataFrame:
        """Return a copy of ``X`` with parsed technical columns and ``age``.

        Requires the columns ``max_power``, ``mileage``, ``year`` and
        ``engine``. The input frame is not modified.
        """
        data = X.copy()
        data["max_power"] = data["max_power"].apply(self._extract_max_power)
        data["mileage"] = data["mileage"].apply(self._parse_mileage)
        data["age"] = data["year"].apply(self._parse_year_to_age)
        data["engine"] = data["engine"].apply(self._extract_engine_info)

        return data

In [5]:
# NOTE: this cell repeats the CarTechnicalInfoTransformer definition from the
# previous cell. Being defined later, this is the version the pipeline uses.
class CarTechnicalInfoTransformer(BaseEstimator, TransformerMixin):
    """Parse free-text technical columns into numbers and derive ``age``.

    ``transform`` rewrites ``max_power``, ``mileage`` and ``engine`` by parsing
    the leading number of each string, and adds ``age`` computed from ``year``.
    Values that are not strings are passed through untouched; strings that
    cannot be parsed become NaN.

    Args:
        reference_year: Year from which ``year`` is subtracted to obtain
            ``age``. Defaults to 2022.
    """

    # Multiplier applied to mileage readings whose unit token is "km/kg".
    # NOTE(review): presumably an approximate km/kg -> km/l conversion; confirm.
    _KM_PER_KG_FACTOR = 1.4

    def __init__(self, reference_year: int = 2022) -> None:
        self.reference_year = reference_year

    def fit(self, X: pd.DataFrame, y: Optional[Any] = None):
        """Nothing is learned from the data; return ``self``."""
        return self

    def _extract_engine_info(self, engine_info: str) -> float:
        """Return the leading integer of ``engine_info``.

        Non-string input is returned unchanged. Blank strings or strings whose
        first token is not an integer yield NaN.
        """
        if not isinstance(engine_info, str):
            return engine_info
        try:
            return int(engine_info.split()[0])
        except (IndexError, ValueError):
            return float("nan")

    def _parse_mileage(self, mileage: str) -> float:
        """Return the numeric mileage, scaled when the unit token is ``km/kg``.

        Non-string input is returned unchanged. Blank strings or strings whose
        first token is not a number yield NaN. A missing unit token is treated
        like any unit other than ``km/kg`` (no scaling).
        """
        if not isinstance(mileage, str):
            return mileage
        tokens = mileage.split()
        try:
            measurement = float(tokens[0])
        except (IndexError, ValueError):
            return float("nan")
        if len(tokens) > 1 and tokens[1] == "km/kg":
            return measurement * self._KM_PER_KG_FACTOR
        return measurement

    def _extract_max_power(self, power: str) -> float:
        """Return the leading number of ``power``.

        Non-string input is returned unchanged. Blank strings or strings whose
        first token is not a number yield NaN.
        """
        if not isinstance(power, str):
            return power
        try:
            return float(power.split()[0])
        except (IndexError, ValueError):
            return float("nan")

    def _parse_year_to_age(self, year: int) -> int:
        """Return ``reference_year - year``."""
        return self.reference_year - year

    def transform(self, X: pd.DataFrame, y: Optional[Any] = None) -> pd.DataFrame:
        """Return a copy of ``X`` with parsed technical columns and ``age``.

        Requires the columns ``max_power``, ``mileage``, ``year`` and
        ``engine``. The input frame is not modified.
        """
        data = X.copy()
        data["max_power"] = data["max_power"].apply(self._extract_max_power)
        data["mileage"] = data["mileage"].apply(self._parse_mileage)
        data["age"] = data["year"].apply(self._parse_year_to_age)
        data["engine"] = data["engine"].apply(self._extract_engine_info)

        return data

In [6]:
class CarOwnerParsingTransformer(BaseEstimator, TransformerMixin):
    """Replace the textual ``owner`` category with an ordinal integer code."""

    # Ordinal code assigned to each known ``owner`` label.
    _OWNER_CODES = {
        "Test Drive Car": 0,
        "First Owner": 1,
        "Second Owner": 2,
        "Third Owner": 3,
        "Fourth & Above Owner": 4,
    }

    def fit(self, X: pd.DataFrame, y: Optional[Any] = None):
        """Nothing is learned from the data; return ``self``."""
        return self

    def transform(self, X: pd.DataFrame, y: Optional[Any] = None) -> pd.DataFrame:
        """Return a copy of ``X`` whose ``owner`` column holds ordinal codes.

        Labels that are not in the code table (including missing values)
        become NaN instead of raising ``KeyError``, so an unseen category at
        prediction time does not abort the whole batch.
        """
        data = X.copy()

        # Series.map with a dict leaves NaN for keys it does not contain.
        data["owner"] = data["owner"].map(self._OWNER_CODES)

        return data

### Defining pipeline

In [7]:
# Continuous columns: fill missing entries with the column median.
cont_pipeline = Pipeline(steps=[("imputer", SimpleImputer(strategy="median"))])

# Categorical columns: fill missing entries with the most frequent value, then
# one-hot encode; categories unseen during fitting are ignored at transform time.
cat_pipeline = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("one_hot_encode", OneHotEncoder(handle_unknown="ignore")),
    ]
)

In [19]:
# Columns routed to the categorical sub-pipeline.
cat_features = [
    "fuel",
    "seller_type",
    "transmission",
    "model",
]

# Columns routed to the continuous sub-pipeline.
cont_features = [
    "km_driven",
    "owner",
    "mileage",
    "engine",
    "max_power",
    "seats",
    "age",
]

# Name of the target column.
label = "selling_price"

In [11]:
# End-to-end pipeline: the three custom feature transformers run first, then the
# column-wise imputing/encoding, and finally the XGBoost regressor.
pipeline = Pipeline(
    steps=[
        ("model_extraction", CarModelExtractionTransformer()),
        ("technical_info_extraction", CarTechnicalInfoTransformer()),
        ("owner_extraction", CarOwnerParsingTransformer()),
        (
            "ColumnTransformer",
            ColumnTransformer(
                transformers=[
                    ("cont_transformer", cont_pipeline, cont_features),
                    ("cat_transformer", cat_pipeline, cat_features),
                ]
            ),
        ),
        ("model", XGBRegressor()),
    ]
)

### Training and hyperparameter search

In [14]:
# Splitting data. A fixed seed keeps the train/test partition (and therefore the
# metrics reported below) reproducible across kernel restarts.
train_df, test_df = train_test_split(df, random_state=42)

In [22]:
# Searching for the best hyperparams


def extract_best_model_params(params: Dict[str, Any]) -> Dict[str, Any]:
    """Select the parameters of the ``model`` pipeline step.

    Args:
        params: Mapping of pipeline parameter names to values, using the
            ``<step>__<parameter>`` naming convention.

    Returns:
        A new dict containing only the entries whose key starts with
        ``"model__"``, with that prefix removed. Keys that merely contain
        ``"model__"`` somewhere else (for example ``"base_model__depth"``)
        are ignored.
    """
    prefix = "model__"
    return {
        # Slice off only the leading prefix so nested names stay intact.
        key[len(prefix):]: value
        for key, value in params.items()
        if key.startswith(prefix)
    }

In [26]:
# Candidate values for the XGBoost step, addressed as "<step>__<parameter>".
params = {
    "model__max_depth": [6, 10],
    "model__learning_rate": [0.01, 0.08],
    "model__colsample_bylevel": [0.3, 0.4],
}

# NOTE: despite its name, this is the XGBoost pipeline defined above.
forrest = pipeline

# Keep the target out of the feature frames. The ColumnTransformer only selects
# the listed feature columns, but removing the label here rules out accidental
# leakage if the pipeline changes, and matches prediction-time input, which
# has no price column.
train_features = train_df.drop(columns=[label])
test_features = test_df.drop(columns=[label])
train_target = train_df[label].values

# The grid above has only 2 * 2 * 2 = 8 combinations, fewer than n_iter;
# scikit-learn then evaluates every combination (and emits a warning), so this
# search is effectively exhaustive.
search = RandomizedSearchCV(forrest, params, n_iter=100, cv=7, random_state=42, n_jobs=-1)
search.fit(train_features, train_target)


# Build a fresh pipeline whose regressor carries the parameters of the best
# estimator found by the search, then fit it on the whole training split.
pipeline = Pipeline(
    [
        ("model_extraction", CarModelExtractionTransformer()),
        ("technical_info_extraction", CarTechnicalInfoTransformer()),
        ("owner_extraction", CarOwnerParsingTransformer()),
        (
            "ColumnTransformer",
            ColumnTransformer(
                [
                    ("cont_transformer", cont_pipeline, cont_features),
                    ("cat_transformer", cat_pipeline, cat_features),
                ]
            ),
        ),
        ("model", XGBRegressor(**extract_best_model_params(search.best_estimator_.get_params()))),
    ]
)
pipeline.fit(train_features, train_target)

# Evaluate on the held-out split.
predictions = pipeline.predict(test_features)

print("MAE: ", mean_absolute_error(test_df[label], predictions))
print("MSE: ", mean_squared_error(test_df[label], predictions))



MAE:  64003.35230453371
MSE:  16487482942.967878


In [30]:
# Sanity check: predict the price for the first row of the held-out split.
pipeline.predict(test_df.iloc[:1])

array([100638.05], dtype=float32)

### Saving model

In [None]:
# Serialize the fitted pipeline. The context manager guarantees the file handle
# is flushed and closed even if pickling raises.
with open("./outputs/model.pkl", "wb") as model_file:
    cloudpickle.dump(pipeline, model_file)