# Utilities

This notebook collects code that has been factored out of the chapter notebooks. This serves two purposes. First, the same code can be reused in several other notebooks, which avoids repeating identical code. Second, sections of code that are used only once but would disrupt the reading flow of a chapter can be moved here. As a result, the chapters can tell a coherent story.

## 1. Import Statements

In [2]:
# Import dependencies
import os
import time
import datetime as dt
from collections import defaultdict
import itertools

import ipywidgets as widgets
from IPython.display import display, clear_output

import numpy as np
import pandas as pd
import lightgbm as lgb
import xgboost as xg
import optuna

from sklearn.datasets import make_blobs
from sklearn.preprocessing import LabelEncoder, PolynomialFeatures
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.linear_model import LinearRegression
from sklearn import tree
from sklearn.tree import DecisionTreeRegressor

import matplotlib
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
import networkx as nx

## 2. Helper Functions and Classes

### 2.1 Visualization

In [8]:
def display_graph(graph: pd.DataFrame) -> None:
    """Plot the weighted graph described by an adjacency matrix

    Args:
        graph (pd.DataFrame): the adjacency matrix to plot; NaN entries are replaced by 0 before plotting

    Returns:
        None
    """
    # NaN cells become 0 so that every entry of the matrix is numeric
    network = nx.from_pandas_adjacency(graph.fillna(0))
    layout = nx.circular_layout(network)

    # Edges, nodes and node labels are drawn in separate passes on the same layout
    nx.draw_networkx_edges(network, layout)
    nx.draw_networkx_nodes(network, layout, node_size=700)
    nx.draw_networkx_labels(network, layout, font_size=12, font_family="sans-serif")

    # Label only those edges that carry a weight greater than 0
    positive_weights = {}
    for edge, weight in nx.get_edge_attributes(network, "weight").items():
        if weight > 0:
            positive_weights[edge] = weight
    nx.draw_networkx_edge_labels(network, layout, edge_labels=positive_weights)

    # Finish and show the figure
    plt.title("Weighted Graph from Adjacency Matrix")
    plt.axis("off")
    plt.show()


def style_scatterplot(fig: go.Figure) -> None:
    """Apply the uniform scatter plot styling to a plotly figure

    The figure is modified in place: white plot background, centered title and,
    on both axes, a thin black axis line with light grey grid lines.

    Args:
        fig (go.Figure): the figure which needs to be styled

    Returns:
        None
    """
    # Both axes share exactly the same appearance
    axis_style = {
        "showline": True,
        "linewidth": 1,
        "linecolor": "black",
        "showgrid": True,
        "gridcolor": "lightgrey",
    }

    fig.update_layout(plot_bgcolor="#FFF", title_x=0.5)
    fig.update_xaxes(**axis_style)
    fig.update_yaxes(**axis_style)


def style_bar_chart(fig: go.Figure) -> None:
    """Apply the uniform bar chart styling to a plotly figure

    The figure is modified in place: white plot background, centered title and a thin
    black line on both axes. Grid lines are shown on the y axis only.

    Args:
        fig (go.Figure): the figure which needs to be styled

    Returns:
        None
    """
    # Axis line settings common to both axes
    axis_line = {"showline": True, "linewidth": 1, "linecolor": "black"}

    fig.update_layout(plot_bgcolor="#FFF", title_x=0.5)
    # No grid on the x axis, light grey grid on the y axis
    fig.update_xaxes(showgrid=False, **axis_line)
    fig.update_yaxes(showgrid=True, gridcolor="lightgrey", **axis_line)


def add_crises(fig: go.Figure) -> None:
    """A helper function which adds major economic crises of the last 25 years as shaded areas to a chart

    For every crisis a red, semi-transparent rectangle spanning the full plot height is added
    between its first and last year, together with a red text label at the top of the plot.
    The x coordinates are given in years, so the x axis of the figure is expected to be a year axis.

    Args:
        fig (go.Figure): the figure the crises are added to (modified in place)

    Returns:
        None
    """
    # (first year, last year, label text, x position of the label)
    crises = (
        (2000, 2002, "Dot-com Bubble", 2001),
        (2007, 2009, "Global Financial Crisis", 2008),
        (2020, 2021, "COVID-19 Pandemic", 2020),
    )

    # Add shaded areas for major crises; yref="paper" makes y0/y1 span the whole plot height
    for first_year, last_year, _, _ in crises:
        fig.add_shape(
            type="rect",
            x0=first_year,
            x1=last_year,
            y0=0,
            y1=1,
            line=dict(color="red", width=0),
            fillcolor="red",
            opacity=0.2,
            xref="x",
            yref="paper",
        )

    # Add annotations for the crises
    for _, _, label, label_x in crises:
        fig.add_annotation(
            x=label_x,
            y=1,
            text=label,
            showarrow=False,
            xref="x",
            yref="paper",
            font=dict(color="red"),
        )


def update_interactive_chart(button: widgets.widget_button.Button) -> None:
    """Redraw the interactive scatter plot for the currently selected axes

    Reads the notebook-level names `controls`, `x_dropdown`, `y_dropdown` and `df_cars`,
    which have to be defined in the notebook that uses this function.

    Args:
        button (widgets.widget_button.Button): button enabling submission (not read by the function)

    Returns:
        None
    """
    # Reset the cell output and show the controls again
    clear_output(wait=True)
    display(controls)

    # Build the chart from the current dropdown values, then style and display it
    fig = px.scatter(df_cars, x=x_dropdown.value, y=y_dropdown.value)
    style_scatterplot(fig)
    fig.show()

### 2.2 Optuna Optimization Objective Functions

In [None]:
def objective_dt(trial: optuna.trial.Trial) -> float:
    """Optuna objective for tuning a decision tree regressor.

    Lets the trial suggest a hyperparameter configuration, fits a decision tree regressor with it on the
    notebook-level training data (X_train, y_train) and evaluates it on the notebook-level test data (X_test, y_test).

    Args:
        trial (optuna.trial.Trial): Trial object which suggests hyperparameter values and manages optimization

    Returns:
        rmse (float): RMSE of the predictions on the test set, the value which needs to be minimized
    """
    # criterion, splitter and random_state are fixed, everything else is suggested by the trial
    params = dict(
        criterion="squared_error",
        splitter="best",
        max_depth=trial.suggest_int("max_depth", 30, 50),
        min_samples_split=trial.suggest_int("min_samples_split", 2, 20),
        min_samples_leaf=trial.suggest_int("min_samples_leaf", 1, 20),
        min_weight_fraction_leaf=trial.suggest_float(
            "min_weight_fraction_leaf", 0.0, 0.5
        ),
        max_features=trial.suggest_int("max_features", 5, 12),
        random_state=42,
    )

    # Train the tree with the sampled configuration
    tree_model = DecisionTreeRegressor(**params)
    tree_model.fit(X_train, y_train)

    # Score the predictions on the test set
    predictions = tree_model.predict(X_test)
    return metrics.root_mean_squared_error(y_test, predictions)


def objective_xgb(trial: optuna.trial.Trial) -> float:
    """Optuna objective for tuning an XGBoost regressor.

    Lets the trial suggest a hyperparameter configuration, fits an XGBoost regressor with it on the
    notebook-level training data (X_train, y_train) and evaluates it on the notebook-level test data (X_test, y_test).

    Args:
        trial (optuna.trial.Trial): Trial object which suggests hyperparameter values and manages optimization

    Returns:
        rmse (float): RMSE of the predictions on the test set, the value which needs to be minimized
    """
    # Hyperparameters suggested by the trial
    tuned = {
        "eta": trial.suggest_float("eta", 0.0, 0.2),
        "gamma": trial.suggest_int("gamma", 30, 50),
        "max_depth": trial.suggest_int("max_depth", 14, 20),
        "min_child_weight": trial.suggest_int("min_child_weight", 0, 10),
        "max_delta_step": trial.suggest_int("max_delta_step", 0, 1),
        "subsample": trial.suggest_float("subsample", 1e-3, 1.0, log=True),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.1, 1.0, log=True),
        "lambda": trial.suggest_int("lambda", 1, 10),
        "alpha": trial.suggest_int("alpha", 0, 10),
        "n_estimators": trial.suggest_int("n_estimators", 50, 300),
    }
    # Settings that stay the same in every trial
    fixed = {"objective": "reg:squarederror", "eval_metric": "rmse"}

    # Train the booster with the sampled configuration
    booster = xg.XGBRegressor(**{**tuned, **fixed})
    booster.fit(X_train, y_train)

    # Score the predictions on the test set
    predictions = booster.predict(X_test)
    return metrics.root_mean_squared_error(y_test, predictions)


def objective_lgb(trial: optuna.trial.Trial) -> float:
    """Optuna objective for tuning a LightGBM regressor.

    Lets the trial suggest a hyperparameter configuration, trains a LightGBM booster with it on the
    notebook-level dataset `train_set` and evaluates it on the notebook-level test data (X_test, y_test).

    Args:
        trial (optuna.trial.Trial): Trial object which suggests hyperparameter values and manages optimization

    Returns:
        rmse (float): RMSE of the predictions on the test set, the value which needs to be minimized
    """
    # Hyperparameters suggested by the trial
    tuned = {
        "num_iterations": trial.suggest_int("num_iterations", 50, 500),
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.1, log=True),
        "num_leaves": trial.suggest_int("num_leaves", 2, 2**10),
        "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 1, 100),
        "bagging_fraction": trial.suggest_float(
            "bagging_fraction", 0.05, 1.0, log=True
        ),
        "bagging_freq": trial.suggest_int("bagging_freq", 0, 20),
        "feature_fraction": trial.suggest_float(
            "feature_fraction", 0.05, 1.0, log=True
        ),
    }
    # The learning task and the metric are the same in every trial
    params = {"objective": "regression", **tuned, "metric": "rmse"}

    # Train the booster with the sampled configuration
    booster = lgb.train(params=params, train_set=train_set)

    # Score the predictions on the test set
    predictions = booster.predict(X_test)
    return metrics.root_mean_squared_error(y_test, predictions)

### 2.3 Benchmarking Helper Functions

In [3]:
# Hyperparameters for training a LightGBM Regressor, hardcoded in two variants:
# params_lgb holds the non-optimized (default) values, params_lgb_opt the optimized ones.
# "verbose": -1 is set in both variants.
params_lgb = dict(
    objective="regression",
    num_iterations=100,
    learning_rate=0.1,
    num_leaves=31,
    min_data_in_leaf=20,
    bagging_fraction=1.0,
    bagging_freq=0,
    feature_fraction=1.0,
    metric="rmse",
    verbose=-1,
)

params_lgb_opt = dict(
    objective="regression",
    num_iterations=376,
    learning_rate=0.02037086620821208,
    num_leaves=527,
    min_data_in_leaf=72,
    bagging_fraction=0.9944312804038378,
    bagging_freq=16,
    feature_fraction=0.5204062794631757,
    metric="rmse",
    verbose=-1,
)


def track_training_time(model_name: str, optimized: bool) -> float:
    """Measures how long training one of the three benchmarking models takes (Decision Tree Regressor, XGBoost Regressor, LightGBM Regressor)

    The duration is taken with time.process_time() and covers only the fit/train call,
    not the instantiation of the model. Hyperparameters and training data are read from
    notebook-level names (params_dt, params_xgb, params_lgb and their *_opt variants,
    X_train, y_train, train_set_lgb).

    Args:
        model_name (str): "dt_regressor" or "xgb_regressor"; any other value selects the LightGBM Regressor
        optimized (bool): states whether the previously optimized hyperparameters should be used

    Returns:
        float: the training time of the stated model
    """
    if model_name == "dt_regressor":
        regressor = DecisionTreeRegressor(
            **(params_dt_opt if optimized else params_dt)
        )
    elif model_name == "xgb_regressor":
        regressor = xg.XGBRegressor(**(params_xgb_opt if optimized else params_xgb))
    else:
        # LightGBM is trained through lgb.train, so it is timed right here
        lgb_params = params_lgb_opt if optimized else params_lgb
        started = time.process_time()
        lgb.train(params=lgb_params, train_set=train_set_lgb)
        finished = time.process_time()
        return finished - started

    # Decision Tree and XGBoost share the fit interface and are timed together
    started = time.process_time()
    regressor.fit(X_train, y_train)
    finished = time.process_time()
    return finished - started


def normalize_importance(values: np.ndarray | dict) -> np.ndarray:
    """Normalizes feature importance values by converting absolute to relative values. This allows us to compare diffently scaled importances of different models.

    Args:
        values (np.ndarray | dict): feature importance values

    Returns:
        np.ndarray: normalized feature importance values
    """
    if isinstance(values, np.ndarray):
        return values / np.sum(values)
    elif isinstance(values, dict):
        values_array = np.array(list(values.values()))
        return values_array / np.sum(values_array)


def track_feature_importance(model_name: str, optimized: bool) -> np.ndarray:
    """Tracks the feature importance of one of the three benchmarking models (Decision Tree Regressor, XGBoost Regressor, LightGBM Regressor)

    The model is trained on the notebook-level training data and its feature importance is
    returned as relative values (see normalize_importance).

    Args:
        model_name (str): "dt_regressor" or "xgb_regressor"; any other value selects the LightGBM Regressor
        optimized (bool): states whether the previously optimized hyperparameters should be used

    Returns:
        np.ndarray: the normalized feature importance of the stated model
    """
    if model_name == "dt_regressor":
        # Fit the Decision Tree Regressor and read its importances
        regressor = DecisionTreeRegressor(
            **(params_dt_opt if optimized else params_dt)
        )
        regressor.fit(X_train, y_train)
        feature_importance = regressor.feature_importances_

    elif model_name == "xgb_regressor":
        # Fit the XGBoost Regressor
        regressor = xg.XGBRegressor(**(params_xgb_opt if optimized else params_xgb))
        regressor.fit(X_train, y_train)

        # Gain per feature; features missing from the score dict get an importance of 0
        # so that there is one entry per column of X_train, in column order
        gain_by_feature = regressor.get_booster().get_score(importance_type="gain")
        feature_importance = {
            feature: gain_by_feature.get(feature, 0) for feature in X_train.columns
        }

    else:
        # Train the LightGBM Regressor. No validation sets or evaluation callbacks are passed,
        # because only the feature importance of the trained booster is needed here.
        regressor = lgb.train(
            params=params_lgb_opt if optimized else params_lgb,
            train_set=train_set_lgb,
        )
        feature_importance = regressor.feature_importance(importance_type="gain")

    return normalize_importance(feature_importance)


def track_learning_curve(
    model_name: str, optimized: bool
) -> tuple[list[float], list[float]]:
    """Tracks the learning curve (RMSE per boosting iteration) of one of the two boosting models (XGBoost Regressor, LightGBM Regressor)

    The model is trained on the notebook-level training data while the RMSE is recorded
    on two evaluation sets: the training set is passed first, the test set second.

    Args:
        model_name (str): "xgb_regressor" selects the XGBoost Regressor; any other value selects the LightGBM Regressor
        optimized (bool): states whether the previously optimized hyperparameters should be used

    Returns:
        tuple[list[float], list[float]]: the recorded RMSE values of the first and of the second
            evaluation set, as extracted by standardize_evaluation
    """
    if model_name == "xgb_regressor":
        # Instantiate XGBoost Regressor
        regressor = xg.XGBRegressor(**(params_xgb_opt if optimized else params_xgb))

        # Fit XGBoost Regressor and evaluate on train and test set per iteration
        regressor.fit(
            X_train,
            y_train,
            eval_set=[(X_train, y_train), (X_test, y_test)],
            verbose=False,
        )
        eval_values = regressor.evals_result()

    else:
        # Train LightGBM Regressor; the callback fills eval_values during training
        eval_values = {}
        lgb.train(
            params=params_lgb_opt if optimized else params_lgb,
            train_set=train_set_lgb,
            valid_sets=[train_set_lgb, test_set_lgb],
            callbacks=[lgb.record_evaluation(eval_values)],
        )

    return standardize_evaluation(eval_values)


def standardize_evaluation(eval_values):
    """Extracts the RMSE values of the first two evaluation sets from an evaluation result

    Args:
        eval_values: mapping from evaluation set name to a mapping that contains an "rmse" entry

    Returns:
        tuple: the "rmse" entries of the first and of the second evaluation set (in key order)
    """
    # The set names differ between libraries, so the sets are addressed by position
    set_names = list(eval_values)
    first_curve = eval_values[set_names[0]]["rmse"]
    second_curve = eval_values[set_names[1]]["rmse"]
    return first_curve, second_curve

### 2.4 Custom LightGBM Regressor

In [None]:
class CarPriceRegressor:
    """Class to perform car price regression with custom user inputs

    Attributes:
        df_cars (pd.DataFrame): the cars data read from "data/cars_cleaned.csv"
        custom_data (dict): maps each feature name to the value entered by the user (None until a value is set)
        custom_df (pd.DataFrame): the values entered by the user as a single-row DataFrame in the column order of df_cars
        model_dropdown (widgets.widget_selection.Dropdown): contains the car models related to the selected car brand
        lgb_regressor (lgb.basic.Booster): trained LightGBM model
        label_encoders (dict): label encoders used for model training, keyed by column name
    """

    # Numerical features that are entered through an integer slider
    _DISCRETE_FEATURES = (
        "year",
        "power_ps",
        "power_kw",
        "mileage_in_km",
        "registration_month",
    )

    def __init__(self, lgb_regressor: lgb.basic.Booster, label_encoders: dict) -> None:
        """Initialize instance attributes

        Args:
            lgb_regressor (lgb.basic.Booster): trained LightGBM model
            label_encoders (dict): label encoders used for model training

        Returns:
            None
        """
        self.df_cars = pd.read_csv("data/cars_cleaned.csv")
        self.custom_data = None
        self.custom_df = None
        self.model_dropdown = None
        self.lgb_regressor = lgb_regressor
        self.label_encoders = label_encoders

    @staticmethod
    def _describe(feature: str) -> str:
        """Builds the widget description for a feature name, e.g. "fuel_type" -> "Fuel type of your car:" """
        return f"{feature.capitalize().replace('_', ' ')} of your car:"

    def update_model_options(
        self, change: dict, submit_button: widgets.widget_button.Button
    ) -> None:
        """Dynamically updates model options based on the selected brand

        Args:
            change (dict): contains current and previous states of dropdowns
            submit_button (widgets.widget_button.Button): button enabling submission

        Returns:
            None
        """
        if change["type"] == "change" and change["name"] == "value":
            selected_brand = change["new"]
            self.custom_data["brand"] = selected_brand

            # Filter models based on the selected brand
            brand_rows = self.df_cars[self.df_cars["brand"] == selected_brand]
            filtered_models = brand_rows["model"].unique()

            # Update the options of the model dropdown and clear the previous selection
            self.model_dropdown.options = filtered_models
            self.model_dropdown.value = None

            # Check if form is ready to submit
            self.check_submit_ready(submit_button)

    def on_change(
        self,
        change: dict,
        feature_name: str,
        submit_button: widgets.widget_button.Button,
    ) -> None:
        """Update custom data based on dropdown and slider selection

        Args:
            change (dict): contains current and previous states of dropdowns and sliders
            feature_name (str): name of the feature (key of custom_data) the changed widget belongs to
            submit_button (widgets.widget_button.Button): button enabling submission

        Returns:
            None
        """
        if change["type"] == "change" and change["name"] == "value":
            self.custom_data[feature_name] = change["new"]

            # Check if form is ready to submit
            self.check_submit_ready(submit_button)

    def check_submit_ready(self, submit_button: widgets.widget_button.Button) -> None:
        """Checks the state of dropdowns and sliders before allowing custom data to be submitted

        The button is enabled only when no value in custom_data is None.

        Args:
            submit_button (widgets.widget_button.Button): button enabling submission

        Returns:
            None
        """
        submit_button.disabled = not all(
            value is not None for value in self.custom_data.values()
        )

    def on_submit(self, b: widgets.widget_button.Button) -> None:
        """Displays the user input and prints the predicted car price

        Args:
            b (widgets.widget_button.Button): button enabling submission

        Returns:
            None
        """
        # Create a DataFrame from the custom_data dictionary, using the feature columns
        # of df_cars (everything except the target "price_in_euro") in their original order
        feature_columns = self.df_cars.drop("price_in_euro", axis=1).columns
        self.custom_df = pd.DataFrame([self.custom_data])[feature_columns]

        # Display the custom DataFrame
        print("User input data:")
        display(self.custom_df)

        # Call method to encode the custom data and return predicted price
        predicted_price = self.encode_and_predict()
        print(f"Predicted car price: {predicted_price}")

    def display_input_widgets(
        self,
        cat_features: list[str],
        num_features: list[str],
        submit_button: widgets.widget_button.Button,
    ) -> None:
        """Displays dropdowns and sliders, captures selections

        Dropdowns start without a selection. Sliders start at the mean of their column,
        and that start value is stored in custom_data right away.

        Args:
            cat_features (list[str]): contains the categorical features of df_cars as strings
            num_features (list[str]): contains the numerical features of df_cars as strings
            submit_button (widgets.widget_button.Button): button enabling submission

        Returns:
            None
        """
        # Initialize custom_data with None values
        self.custom_data = dict.fromkeys(cat_features + num_features)

        # Initialize empty list for input widgets
        input_widgets = []

        # Brand dropdown: a change of the brand updates the options of the model dropdown
        brand_dropdown = widgets.Dropdown(
            options=self.df_cars["brand"].unique(),
            value=None,
            description="Brand of your car:",
            style={"description_width": "initial"},
            layout=widgets.Layout(width="initial"),
        )
        # Model dropdown: stays empty until a brand has been selected
        self.model_dropdown = widgets.Dropdown(
            options=[],
            value=None,
            description="Model of your car:",
            style={"description_width": "initial"},
            layout=widgets.Layout(width="initial"),
        )
        brand_dropdown.observe(
            lambda change: self.update_model_options(change, submit_button),
            names="value",
        )
        input_widgets.append(brand_dropdown)
        self.model_dropdown.observe(
            lambda change, col="model": self.on_change(change, col, submit_button),
            names="value",
        )
        input_widgets.append(self.model_dropdown)

        # Dropdowns for categorical features (except brand and model)
        for cat_feature in cat_features:
            if cat_feature in ("model", "brand"):
                continue
            dropdown = widgets.Dropdown(
                options=self.df_cars[cat_feature].unique(),
                value=None,
                description=self._describe(cat_feature),
                style={"description_width": "initial"},
                layout=widgets.Layout(width="initial"),
            )
            # col is bound as a default argument so each callback keeps its own feature name
            dropdown.observe(
                lambda change, col=cat_feature: self.on_change(
                    change, col, submit_button
                ),
                names="value",
            )
            input_widgets.append(dropdown)

        # Sliders for numerical features, distinguishing between discrete and continuous ones
        for num_feature in num_features:
            column = self.df_cars[num_feature]
            if num_feature in self._DISCRETE_FEATURES:
                slider = widgets.IntSlider(
                    value=int(column.mean()),
                    min=column.min(),
                    max=column.max(),
                    step=1,
                    description=self._describe(num_feature),
                    style={"description_width": "initial"},
                    layout=widgets.Layout(width="initial"),
                )
            else:
                slider = widgets.FloatSlider(
                    value=column.mean(),
                    min=column.min(),
                    max=column.max(),
                    step=0.1,
                    description=self._describe(num_feature),
                    style={"description_width": "initial"},
                    layout=widgets.Layout(width="initial"),
                )
            # A slider always shows a value, but the observer below only fires when it is moved.
            # Store the start value so that an untouched slider does not block the submission.
            self.custom_data[num_feature] = slider.value
            slider.observe(
                lambda change, col=num_feature: self.on_change(
                    change, col, submit_button
                ),
                names="value",
            )
            input_widgets.append(slider)

        # Display all input widgets vertically aligned
        for input_widget in input_widgets:
            display(input_widget)

    def encode_and_predict(self):
        """Encodes categorical features and performs a regression using the trained LightGBM model

        The columns of custom_df that have a label encoder are replaced in place by their encoded values.

        Returns:
            the first element of the model's prediction for custom_df, i.e. the predicted price of the entered car
        """
        # Encode categorical features with the same labels as for training
        for column, encoder in self.label_encoders.items():
            self.custom_df[column] = encoder.transform(self.custom_df[column])

        # Perform prediction using the trained LightGBM model
        prediction = self.lgb_regressor.predict(self.custom_df)
        return prediction[0]

    def predict_car_price(
        self, cat_features: list[str], num_features: list[str]
    ) -> None:
        """Main function, creates custom data from user inputs and implements submission logic

        Args:
            cat_features (list[str]): contains the categorical features of df_cars as strings
            num_features (list[str]): contains the numerical features of df_cars as strings

        Returns:
            None
        """
        # Define the submit button; it starts disabled and is enabled by check_submit_ready
        submit_button = widgets.Button(description="Calculate", disabled=True)

        # Display input widgets (dropdowns and sliders)
        self.display_input_widgets(cat_features, num_features, submit_button)

        # Define the click event of the submit button and display the latter
        submit_button.on_click(self.on_submit)
        display(submit_button)