In [23]:
import pandas as pd
import numpy as np
import mlflow
import mlflow.xgboost
from xgboost import XGBRegressor
from sklearn.model_selection import GridSearchCV
import subprocess
from mlflow.tracking import MlflowClient
from sklearn.metrics import mean_absolute_percentage_error, mean_absolute_error

In [24]:
class Preprocessing:
    """
        Class functions to retrieve and preprocess time series data
    """
    
    def get_data(path: str) -> pd.DataFrame:
        data = pd.read_csv(path, index_col=0)
        return data

    def preprocessing(data: pd.DataFrame) -> pd.DataFrame:
        """
        Returns data with index and frequency of index set

        Parameters
        ----------
        data: pd.DataFrame

        col: str
            name of the column that will be kept
        """
        data.index = pd.to_datetime(data.index)
        #data = data[col]
        data = data.div(1000)
        data.index.freq = pd.infer_freq(data.index)
        return data

    def train_test_split_series(data: pd.DataFrame, n_test: int) -> pd.DataFrame:
        return data.iloc[:-n_test], data.iloc[-n_test:]

    def train_test_split_df(data: pd.DataFrame, n_test: int) -> pd.DataFrame:
        return data.iloc[:-n_test], data.iloc[-n_test:]

    def series_to_supervised(
        data: pd.Series, n_in: int = 1, dropnan: bool = True
    ) -> np.array:
        """
        Converts a sequence of numbers, i.e. a univariate time series, into a matrix
        with one array (series at time t) plus one more array for each n_in
        (lags at times t-1, t-2, .., t-n_in).

        Parameters
        ----------
        data: pd.Series

        n_in: int
            number of lags to create from the original series.
            For each lag required, one more column will be added,
            at the cost of one row of observations.

        dropnan: bool

        """
        df = pd.DataFrame(data)
        cols = list()
        # input sequence (t-n, ... t-1)
        for i in range(n_in, 0, -1):
            cols.append(df.shift(i))
        cols.append(df)
        # put it all together
        agg = pd.concat(cols, axis=1)
        # drop rows with NaN values (in particular the first and the last rows)
        if dropnan:
            agg.dropna(inplace=True)

        return agg

In [36]:
class XGBForecaster:
    """
        XGBoost model used for univariate or multivariate forecasting.
    """

    def __init__(self):
        self.model = XGBRegressor()

    def fit(self, train_ensamble: pd.DataFrame) -> XGBRegressor:
        data = np.asarray(train_ensamble)
        X, y = data[:, :-1], data[:, -1]
        self.fitted_model = self.model.fit(X, y)
        return self.fitted_model

    def forecast(self, row_just_before: int, steps_ahead: int) -> list:
        """
            Rolling prediction with the model_fitted for predicting n=steps_ahead new instances.
            This instances will immediately follow row_just_before, which is the last row of the dataframe available
        """
        row_just_before = np.asarray(row_just_before)[1:]
        current_row = row_just_before.reshape(1, -1)
        forecast = []
        for _ in range(steps_ahead):
            pred = self.fitted_model.predict(current_row)
            forecast.append(pred[0])
            current_row = np.concatenate((current_row[0][1:], pred)).reshape(1, -1)
        return forecast

    def grid_search(self, parameters: dict, n_folds: int, train_df: pd.DataFrame, test_size, n_jobs=1, verbose=0):
        """
            Grid Search for time series forecasting with XGBoost
        """
        grid = GridSearchCV(
            xgb, parameters, cv=n_folds, n_jobs=n_jobs, verbose=verbose
        )
        grid = XGBForecaster.fit(train_df, grid)
        predictions = XGBForecaster.forecast(train_df.iloc[-1, :], grid, test_size)
        return grid, predictions

    def preprocess(self, data: pd.DataFrame, experiment_name: str, frac: float):
        """
            Creates an experiment run for the model to be trained and preprocess the data

        Parameters
        ----------
        data: pd.DataFrame
            data to use for training.
        experiment_name: str
            name of the experiment for training the model; might refer to the commodity to forecast.
        frac: float
            percentage of data to hold out for testing the model.

        """
        # Create the experiment if it does not exist
        experiment = mlflow.get_experiment_by_name(experiment_name)
        if experiment is None:
            mlflow.create_experiment(experiment_name)
            experiment = mlflow.get_experiment_by_name(experiment_name)

        with mlflow.start_run(experiment_id=experiment.experiment_id):
            # logging information on input data
            data = Preprocessing.preprocessing(data)

            train, test = Preprocessing.train_test_split_df(
                data=data, n_test=round(len(data) * frac)
            )

            proc_training_data = Preprocessing.series_to_supervised(
                data=train, n_in=1, dropnan=True
            )
            proc_testing_data = Preprocessing.series_to_supervised(
                data=test, n_in=1, dropnan=False
            )

            mlflow.log_param(key="pct_data_for_training", value=(1 - frac))
            mlflow.log_param(key="pct_data_for_testing", value=(frac))

            return proc_training_data, proc_testing_data
        
    def train_model(
        self, experiment_name: str, train_data: pd.DataFrame, test_data: pd.DataFrame
    ) -> XGBRegressor:

        test_data.fillna(train_data.iloc[-1, -1])
        X_train = train_data.iloc[:, :-1].values
        X_test = test_data.iloc[:, :-1].values
        y_train = train_data.iloc[:, -1].values
        y_test = test_data.iloc[:, -1].values

        # n-folds
        effective_df_length = len(train_data) - len(test_data)
        max_folds = effective_df_length // len(test_data)
        n_folds = min(max_folds, 10)

        # Create the experiment if it does not exist
        client = MlflowClient()
        experiment = mlflow.get_experiment_by_name(experiment_name)
        experiment_id = experiment.experiment_id
        latest_run = client.search_runs(
            experiment_id, order_by=["start_time desc"], max_results=1
        )[0]
            
        # enable auto logging
        mlflow.xgboost.autolog()

        with mlflow.start_run(run_id=latest_run.info.run_id):
            # log the script
            #mlflow.log_artifact(__file__)

            # Get current commit hash
            commit_hash = (
                subprocess.check_output(["git", "rev-parse", "HEAD"])
                .strip()
                .decode("utf-8")
            )
            # Log Git commit hash as a parameter
            mlflow.log_param("commit_hash", commit_hash)

            #xgb = XGBRegressor()
            parameters_xgb = {
                "gamma": [0, 30, 100, 200],
                "eta": [0.3, 0.03, 0.003],
                "max_depth": [6, 12, 30],
            }
            xgb_grid, predictions_xgb = XGBForecaster.grid_search(
                parameters=parameters_xgb,
                n_folds=n_folds,
                train_df=train_data,
                test_size=len(test_data),
                n_jobs=-1,
                verbose=1,
            )
            mae = mean_absolute_error(y_test, predictions_xgb)
            mape = mean_absolute_percentage_error(y_test, predictions_xgb)

            # log metrics
            mlflow.log_metrics({"MAE": mae, "MAPE": mape})

            return xgb_grid

In [26]:
gasoline = pd.read_csv("../data/fuel_prices.csv", index_col=0)
gasoline = gasoline[["BENZINA"]]
gasoline.head()

Unnamed: 0_level_0,BENZINA
DATA_RILEVAZIONE,Unnamed: 1_level_1
2005-01-03,1115.75
2005-01-10,1088.0
2005-01-17,1088.14
2005-01-24,1090.01
2005-01-31,1132.11


In [40]:
experiment_name = "xgboost_predictor_gasoline"
frac = 0.2
xgb_forecaster = XGBForecaster()

training_data, testing_data= xgb_forecaster.preprocess(
    experiment_name=experiment_name, 
    data=gasoline,
    frac=frac
)

xgb_forecaster.train_model(
    experiment_name=experiment_name,
    train_data=training_data, 
    test_data=testing_data
)

TypeError: grid_search() missing 1 required positional argument: 'self'