In [1]:
import os
import io
import sys
import json
import time
import random
import logging

import numpy as np
import pandas as pd
import datetime

import plotly.io as pio
import streamlit as st
import seaborn as sns
import zipfile

import plotly.graph_objects as go
from sklearn.metrics import mean_squared_error

from utils.manager.login import *
from utils.inputs.validation import *
from utils.inputs.preprocess import *
from utils.inputs.ads import *
from utils.modeling.search import *
from utils.modeling.general import *
from utils.modeling.plot import *
from utils.analysis.tables import *
from utils.analysis.plot import *

# Set up the logging configuration for cmdstanpy
logger = logging.getLogger()

# Add NullHandler with CRITICAL log level
null_handler = logging.NullHandler()
null_handler.setLevel(logging.CRITICAL)
logger.addHandler(null_handler)

# Add StreamHandler with INFO log level
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
stream_handler.setLevel(logging.INFO)
logger.addHandler(stream_handler)

logger.propagate = False

## Inputs

In [2]:
# Add dropdown for Country
country_name = "CA"

# Add dropdown for frequency
forecast_freq = "D"

# Add dropdown for data selection
data_selection = False

# Add dropdown for data selection
external_features = False

# Add file uploader to the sidebar
uploaded_file = 'Agency Services.csv'

In [3]:
if forecast_freq == "D":
    forecast_period = 92
elif forecast_freq == "B":
    forecast_period = 66
elif forecast_freq == "W":
    forecast_period = 26
elif forecast_freq == "M":
    forecast_period = 12

## Validation

In [4]:
try:
    # Validate the input file
    df = validate_input_file(uploaded_file, external_features)
    logging.info(f"Train Data Size: {df.shape}")
except Exception as e:
    # Log this exception or handle it further up the call stack
    raise Exception(f"An error occurred while validating the file: {str(e)}")

In [5]:
df

Unnamed: 0,ds,y
0,2018-01-01,0.0
1,2018-01-02,367.0
2,2018-01-03,391.0
3,2018-01-04,431.0
4,2018-01-05,395.0
...,...,...
360,2018-12-27,280.0
361,2018-12-28,278.0
362,2018-12-29,0.0
363,2018-12-30,0.0


## Processing

In [6]:
try:
    # Process the input file
    processed_df, forecast_df = process_input_file(df)
    logging.info(f"Train Data Size: {processed_df.shape}")
    logging.info(f"Forecast Data Size: {forecast_df.shape}")
except Exception as e:
    # Log this exception or handle it further up the call stack
    raise Exception(f"An error occurred while processing the file: {str(e)}")

-1


## Automated Data Selection

In [7]:
try:
    if data_selection:
        
        # Find optimal window 
        optimal_window_size = find_optimal_window(processed_df)
            
        logging.info(f"Optimal Window Size: {optimal_window_size}")

        # Add 180 days for feature engineering to optimal window
        optimal_window_size += 180

    else:
        optimal_window_size = len(processed_df)
        
except Exception as e:
    # Log this exception or handle it further up the call stack
    raise Exception(f"An error occurred while finding the optimal window: {str(e)}")
    

# Truncate the train set based on optimal window
optimal_df = processed_df[-optimal_window_size:].copy(deep=True)
    
logging.info(f"Optimal Train Data Size: {optimal_df.shape}")

# Find the min data for optimal train data
optimal_window_date = optimal_df['ds'].min()

In [8]:
optimal_window_size

365

In [9]:
optimal_df

Unnamed: 0,ds,y
0,2018-01-01,0.0
1,2018-01-02,367.0
2,2018-01-03,391.0
3,2018-01-04,431.0
4,2018-01-05,395.0
...,...,...
360,2018-12-27,280.0
361,2018-12-28,278.0
362,2018-12-29,0.0
363,2018-12-30,0.0


In [10]:
forecast_df.shape

(0, 2)

## Final Data Checks

In [11]:
# Set forecast start and end dates
min_forecast_date = optimal_df['ds'].min() + pd.Timedelta(days=1)
max_forecast_date = min_forecast_date + pd.Timedelta(days=forecast_period)
logging.info(f"Forecast Range: {min_forecast_date} to {max_forecast_date}")

try:
    # Validate column counts based on whether external features are used
    if external_features:
        assert optimal_df.shape[1] > 2 and forecast_df.shape[1] > 2
    else:
        assert optimal_df.shape[1] == 2 and forecast_df.shape[1] == 2
    # Ensure non-empty data structure
    assert optimal_df.shape[1] > 0
    # Ensure same number of columns
    assert optimal_df.shape[1] == forecast_df.shape[1]
except Exception as e:
    raise ValueError("Invalid input data format.")

try:
    # Check coverage of forecast period by data
    if external_features:
        assert forecast_df['ds'].max() > max_forecast_date
except Exception as e:
    raise Exception("Incomplete external variable coverage for forecast period.")

In [12]:
# Get the names of the exogenous variables from the train data
exog_cols = list((optimal_df.columns).difference(['y', 'ds']))

In [13]:
def resample_dataframe(df, forecast_freq='D'):
    """
    Resample and compute the mean for the dataframes based on a specified frequency.
    """
    
    df['ds'] = pd.to_datetime(df['ds'])
    df.set_index('ds', inplace=True)
    df = df.resample(forecast_freq).mean()
    
    return df.reset_index()

In [14]:
try:
    # Generate date features
    optimal_df = resample_dataframe(optimal_df, forecast_freq)
    forecast_df = resample_dataframe(forecast_df, forecast_freq)
except Exception as e:
    raise Exception(f"Failed to set the data frequency to {forecast_freq}: {e}")

In [15]:
import pandas as pd
import warnings
import holidays  # Ensure the holidays library is installed and imported

def generate_date_features(df: pd.DataFrame, freq='D', country_name=None) -> pd.DataFrame:
    """
    Add time-based features to a DataFrame based on its DateTime index, considering the frequency of data.
    """
    
    df['ds'] = pd.to_datetime(df['ds'])
    df.set_index('ds', inplace=True)
    
    if not isinstance(df.index, pd.DatetimeIndex):
        error_message = "DataFrame must have a DateTimeIndex"
        logger.error("DataFrame must have a DateTimeIndex")
        raise ValueError(error_message)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")  # Suppress warnings during feature generation

        # Generate features based on the specified frequency
        if freq in ['D', 'B']:
            # Features specific to daily data
            df['day_of_week'] = df.index.dayofweek + 1  # Monday=1, Sunday=7
            df['day_of_year'] = df.index.dayofyear
            df['is_weekend'] = df.index.dayofweek.isin([5, 6]).astype(int)

        if freq in ['D', 'B', 'W']:  # Weekly features include week_of_year
            df['week_of_year'] = df.index.isocalendar().week.astype(int)

        # Features applicable to all frequencies
        df['quarter'] = df.index.quarter
        df['month'] = df.index.month
        df['year'] = df.index.year

        # Calculate holidays if country_name is provided
        if country_name:
            country_holidays = holidays.CountryHoliday(country_name)
            if freq in ['D', 'B']:
                # Mark holidays for daily data
                df['is_holiday'] = df.index.map(lambda date: int(date in country_holidays))
            elif freq == 'W':
                # Count holidays in a week for weekly data
                df['is_holiday'] = df.index.map(lambda week_start: sum(
                    1 for day in pd.date_range(start=week_start - pd.Timedelta(days=6), end=week_start)
                    if day in country_holidays))
            elif freq == 'M':
                # Count holidays in a month for monthly data
                df['is_holiday'] = df.index.map(lambda month_start: sum(
                    1 for day in pd.date_range(start=month_start.replace(day=1), end=month_start)
                    if day in country_holidays))

    return df.reset_index()

In [16]:
try:
    # Generate date features
    optimal_df = generate_date_features(optimal_df, forecast_freq, country_name)
    forecast_df = generate_date_features(forecast_df, forecast_freq, country_name)
except Exception as e:
    raise ValueError(f"Failed to generate features using 'ds': {e}")

In [17]:
# Get the names of the exogenous variables from the train data
exog_cols_all = list((optimal_df.columns).difference(['y', 'ds']))

In [18]:
def determine_params(forecast_freq):
    """
    Determines lag window and test step size based on the frequency and weekend inclusion.
    """
    # Define settings for different scenarios using dictionaries
    # initial_window_size, lag_window_range, rolling_window_range, test_size, test_steps
    freq_settings = {
        "D": (90, [7, 15, 30, 60, 90], [3, 7, 15, 30, 60, 90], 30, 3),
        "B": (60, [5, 10, 20, 40, 60], [3, 5, 10, 20, 40, 60], 20, 2),
        "W": (12, [4, 8, 12], [4, 8, 12, 16, 20, 24], 6, 1),
        "M": (3, [3], [3, 6, 9, 12], 3, 1)
    }
    
    try:
        # Select the appropriate settings based on forecast frequency and weekend drop
        if forecast_freq in freq_settings:
            if isinstance(freq_settings[forecast_freq], dict):
                # Handle daily frequency differently based on weekend inclusion
                return freq_settings[forecast_freq][weekend_drop]
            else:
                return freq_settings[forecast_freq]
        else:
            raise ValueError(f"Unknown Frequency: {forecast_freq}")
    except Exception as e:
        error_message = f"Failed to determine lag window and test set size: {e}"
        logger.error(error_message)
        raise Exception(error_message)

In [19]:
try:
    initial_window_size, lag_window_range, rolling_window_range, test_size, test_steps = determine_params(forecast_freq)
    logger.info(f"Initial Window Size: {initial_window_size}, Lag Window Range: {lag_window_range}")
    logger.info(f"Test Size: {test_size}, Test Steps: {test_steps}")
except Exception as e:
    raise Exception(e)

In [20]:
try:
    test_df = optimal_df[-test_size:].copy(deep=True)
    train_df = optimal_df[:-test_size].copy(deep=True)
    assert len(train_df) + len(test_df) == len(optimal_df)
except Exception as e:
    raise ValueError(f"Failed to split into train and test: {e}")

In [21]:
train_df['ds'] = pd.to_datetime(train_df['ds']) 
train_df.set_index('ds', inplace=True)

In [22]:
if 'is_holiday' in train_df:
    train_holiday_mask = train_df['is_holiday'].values
if 'is_holiday' in test_df:
    test_holiday_mask = test_df['is_holiday'].values

In [23]:
import os
import yaml
from sklearn.base import BaseEstimator
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor
from sktime.forecasting.fbprophet import Prophet
from sktime.forecasting.naive import NaiveForecaster, NaiveVariance
from skforecast.ForecasterAutoregCustom import ForecasterAutoregCustom
from skforecast.model_selection import grid_search_forecaster
from typing import Optional, Tuple, List, Dict, Any, Callable, Union

def load_model_params_and_create_instance(model_type, current_dir):
    """
    Load model parameters from a YAML file and create a model instance based on model type.
    """
    # Dictionary to map model types to their respective classes and YAML files
    model_config = {
        'random_forest': (RandomForestRegressor, 'random_forest.yaml'),
        'xgboost': (XGBRegressor, 'xgboost.yaml'),
        'prophet': (Prophet, 'prophet.yaml'),
        'naive': (NaiveForecaster, 'naive.yaml')
    }
    
    # Ensure the model type is supported
    if model_type not in model_config:
        raise ValueError(f"Unsupported model type: {model_type}")
    
    model_class, yaml_file = model_config[model_type]
    file_path = os.path.join(current_dir, 'params', yaml_file)
    
    # Load parameters from the YAML file, handling file and parsing errors
    try:
        with open(file_path, 'r') as file:
            param_grid = yaml.safe_load(file)
    except FileNotFoundError:
        raise FileNotFoundError(f"The configuration file {yaml_file} was not found in {file_path}")
    except yaml.YAMLError as e:
        raise Exception(f"Error parsing the YAML file: {e}")

    return model_class(), param_grid

In [24]:
def _get_best_model_parameters_and_metrics(search_results: pd.DataFrame, metric_key: str) -> Tuple[Dict, float, int]:
    """
    Get the parameters and metrics for the best model.
    """
    best_params = search_results.iloc[0]['params']
    best_score = search_results.iloc[0][metric_key]
    lag_window = search_results.iloc[0]['lag_window']
    return best_params, best_score, lag_window

def _filter_holidays(y_true: pd.Series, y_pred: np.ndarray, country: str) -> Tuple[pd.Series, np.ndarray]:
    """
    Filters out holidays from the true and predicted series based on the specified country.
    """
    country_holidays = holidays.CountryHoliday(country)
    holiday_mask = y_true.index.map(lambda date: int(date in country_holidays))
    holiday_mask = (holiday_mask == 1)

    y_true = pd.Series([act for act, mask in zip(y_true, holiday_mask) if not mask])
    y_pred = np.array([pred for pred, mask in zip(y_pred, holiday_mask) if not mask])

    return y_true, y_pred

def _custom_mape(y_true: pd.Series, y_pred: np.ndarray) -> float:
    """
    Custom MAPE calculation considering country holidays.
    """
    # Filter out holidays
    if country:
        y_true, y_pred = _filter_holidays(y_true, y_pred, country)

    # Calculate errors
    return mean_absolute_percentage_error(y_true, y_pred)

def _custom_mspe(y_true: pd.Series, y_pred: np.ndarray) -> float:
    """
    Custom MSPE calculation considering country holidays.
    """
    # Filter out holidays
    if country:
        y_true, y_pred = _filter_holidays(y_true, y_pred, country)

    # Calculate errors
    return mean_squared_percentage_error(y_true, y_pred)

def _custom_predictors(y: pd.Series) -> np.ndarray:
    """
    Function to create custom predictors for a time series.
    """
    predictors = []

    # Calculate rolling statistics for specific rolling window sizes
    for rolling_window in rolling_window_range:
        predictors.extend([np.mean(y[-rolling_window:]), 
                           np.std(y[-rolling_window:]), 
                           np.min(y[-rolling_window:]), 
                           np.max(y[-rolling_window:])])

    # Create lags
    predictors.extend(y[-1:-lag_window-1:-1])

    # Combine all predictors into one array
    return np.hstack(predictors)

def _custom_weights(index: pd.DatetimeIndex, country: str=None) -> np.ndarray:
    """
    Return a list of weights for each index in the DataFrame.
    """
    print(len(index))
    print(index.values)
    
    
    # Start with all weights as 1
    weights = np.ones(len(index))
    
    print(index)

    # Find indices of weekends
    weekend_indices = index.dayofweek.isin([5, 6])

    # Check for holidays if country is specified
    if country:
        country_holidays = holidays.CountryHoliday(country)
        holiday_indices = index.map(lambda date: date in country_holidays)
        weights[holiday_indices] = 10

    # Set weights to 2 if either a weekend or a holiday
    weights[weekend_indices] = 5

    return weight

In [25]:

def _create_forecaster(model: BaseEstimator, y: pd.Series, exog_cols: pd.DataFrame, 
                       param_grid: Dict, lag_window: int, test_steps: int, test_size: int) -> pd.DataFrame:
    """
    Create a forecaster and perform a grid search to find the best model.
    """
    
    forecaster = ForecasterAutoregCustom(regressor=model,
                                         fun_predictors=_custom_predictors,
                                         window_size=initial_window_size,
                                         weight_func=_custom_weights)
    
    return grid_search_forecaster(forecaster=forecaster,
                                  y=y,
                                  exog=exog_cols,
                                  param_grid=param_grid,
                                  steps=test_steps,
                                  fixed_train_size=False,
                                  refit=True,
                                  metric=[_custom_mape, _custom_mspe],
                                  initial_train_size=len(y)-test_size,
                                  return_best=False,
                                  verbose=False)

In [26]:
initial_window_size, lag_window_range, rolling_window_range, test_size, test_steps

(90, [7, 15, 30, 60, 90], [3, 7, 15, 30, 60, 90], 30, 3)

In [27]:
test_steps = 15

In [28]:
try:
    current_dir = 'utils/modeling'
    model, param_grid = load_model_params_and_create_instance('random_forest', current_dir)
    # model_xgb = load_model_params_and_create_instance('xgboost', current_dir)
    # model_prophet = load_model_params_and_create_instance('prophet', current_dir)
    # model_naive = load_model_params_and_create_instance('naive', current_dir)
except Exception as e:
    raise Exception(f"Error initializing models: {e}")

In [29]:
train_df['y'] = train_df['y'] + 1

In [30]:
def grid_search_skforecast(lag_window_range: List[int], 
                           model: BaseEstimator, 
                           train_data: pd.DataFrame, 
                           exog_cols: List[str], 
                           param_grid: Dict) -> Tuple[pd.DataFrame, Dict, ForecasterAutoregCustom]:
    """
    Function to perform grid search over different window sizes.
    """
    window_size = 90
    rolling_window_range = [7, 15, 30, 60, 90]
    validation_steps = 15
    validation_size = 60
    
    def _load_params() -> Dict:
        """
        Load parameters from a yaml file.
        """
        current_dir = 'utils/modeling'
        file_path = os.path.join(current_dir, 'params', 'grid_search.yaml')
        with open(file_path, 'r') as f:
            return yaml.safe_load(f)

    def _create_forecaster(model: BaseEstimator, y: pd.Series, exog_cols: pd.DataFrame, 
                           param_grid: Dict, lag_window: int, validation_steps: int, validation_size: int) -> pd.DataFrame:
        """
        Create a forecaster and perform a grid search to find the best model.
        """
        
        forecaster = ForecasterAutoregCustom(regressor=model,
                                             fun_predictors=_custom_predictors,
                                             window_size=window_size,
                                             weight_func=_custom_weights)

        return grid_search_forecaster(forecaster=forecaster,
                                      y=y,
                                      exog=exog_cols,
                                      param_grid=param_grid,
                                      steps=validation_steps,
                                      fixed_train_size=False,
                                      refit=True,
                                      metric=[_custom_mape, _custom_mspe],
                                      initial_train_size=len(y)-validation_size,
                                      return_best=False,
                                      verbose=False)


    def _get_best_model_parameters_and_metrics(search_results: pd.DataFrame, metric_key: str) -> Tuple[Dict, float, int]:
        """
        Get the parameters and metrics for the best model.
        """
        best_params = search_results.iloc[0]['params']
        best_score = search_results.iloc[0][metric_key]
        lag_window = search_results.iloc[0]['lag_window']
        return best_params, best_score, lag_window
    
    def _filter_holidays(y_true: pd.Series, y_pred: np.ndarray, country: str) -> Tuple[pd.Series, np.ndarray]:
        """
        Filters out holidays from the true and predicted series based on the specified country.
        """
        country_holidays = holidays.CountryHoliday(country)
        holiday_mask = y_true.index.map(lambda date: int(date in country_holidays))
        holiday_mask = (holiday_mask == 1)

        y_true = pd.Series([act for act, mask in zip(y_true, holiday_mask) if not mask])
        y_pred = np.array([pred for pred, mask in zip(y_pred, holiday_mask) if not mask])

        return y_true, y_pred
    
    def _custom_mape(y_true: pd.Series, y_pred: np.ndarray) -> float:
        """
        Custom MAPE calculation considering country holidays.
        """
        # Filter out holidays
        if country:
            y_true, y_pred = _filter_holidays(y_true, y_pred, country)

        # Calculate errors
        return mean_absolute_percentage_error(y_true, y_pred)

    def _custom_mspe(y_true: pd.Series, y_pred: np.ndarray) -> float:
        """
        Custom MSPE calculation considering country holidays.
        """
        # Filter out holidays
        if country:
            y_true, y_pred = _filter_holidays(y_true, y_pred, country)

        # Calculate errors
        return mean_squared_percentage_error(y_true, y_pred)

    def _custom_predictors(y: pd.Series) -> np.ndarray:
        """
        Function to create custom predictors for a time series.
        """
        predictors = []

        # Calculate rolling statistics for specific rolling window sizes
        for rolling_window in rolling_window_range:
            predictors.extend([np.mean(y[-rolling_window:]), 
                               np.std(y[-rolling_window:]), 
                               np.min(y[-rolling_window:]), 
                               np.max(y[-rolling_window:])])

        # Create lags
        predictors.extend(y[-1:-lag_window-1:-1])

        # Combine all predictors into one array
        return np.hstack(predictors)

    def _custom_weights(index: pd.DatetimeIndex, country: str=None) -> np.ndarray:
        """
        Return a list of weights for each index in the DataFrame.
        """
        # Start with all weights as 1
        weights = np.ones(len(index))

        # Find indices of weekends
        weekend_indices = index.dayofweek.isin([5, 6])

        # Check for holidays if country is specified
        if country:
            country_holidays = holidays.CountryHoliday(country)
            holiday_indices = index.map(lambda date: date in country_holidays)
            weights[holiday_indices] = holiday_weight

        # Set weights to 2 if either a weekend or a holiday
        weights[weekend_indices] = weekend_weight

        return weights
    
    # Load parameters from yaml file
    params = _load_params()
    
    metric_key = '_custom_mspe'
    weekend_weight = params['weekend_weight']
    holiday_weight = params['holiday_weight']

    # Grid search over different window sizes
    total_iterations = len(lag_window_range)
    pbar = tqdm(total=total_iterations, desc="Grid Search Progress")

    # Perform grid search for each lag window and compile results
    results_list = []
    for lag_window in lag_window_range:
        results_grid = _create_forecaster(model, train_data['y'], train_data[exog_cols], 
                                          param_grid, lag_window, validation_steps, validation_size)
        results_grid['lag_window'] = lag_window
        results_list.append(results_grid)
        pbar.update()

    pbar.close()

    # Get the best model parameters and metrics
    search_results = pd.concat(results_list, ignore_index=True).sort_values(metric_key)
    best_params, best_score, lag_window = _get_best_model_parameters_and_metrics(search_results, metric_key)
    best_dict = {"best_params": best_params, "best_score": best_score, "lag_window": lag_window}

    # Instantiate and train the best model
    forecaster = ForecasterAutoregCustom(regressor=model.set_params(**best_params),
                                         fun_predictors=_custom_predictors,
                                         window_size=int(window_size))

    return search_results, best_dict, forecaster

In [31]:
grid_search_skforecast(
        lag_window_range, model, train_df, exog_cols_all, param_grid
    )

Grid Search Progress:   0%|                               | 0/5 [00:00<?, ?it/s]

Number of models compared: 1.


lags grid:   0%|          | 0/1 [00:00<?, ?it/s]

params grid:   0%|          | 0/1 [00:00<?, ?it/s]



AttributeError: 'Index' object has no attribute 'dayofweek'

In [None]:
# Grid search over different window sizes
total_iterations = len(lag_window_range)

# Perform grid search for each lag window and compile results
results_list = []
for lag_window in lag_window_range:
    results_grid = _create_forecaster(model, train_df['y'], train_df[exog_cols_all], 
                                      param_grid, lag_window, test_steps, test_size)
    results_grid['lag_window'] = lag_window
    results_list.append(results_grid)
    pbar.update()

In [None]:



def _get_best_model_parameters_and_metrics(search_results: pd.DataFrame, metric_key: str) -> Tuple[Dict, float, int]:
    """
    Get the parameters and metrics for the best model.
    """
    best_params = search_results.iloc[0]['params']
    best_score = search_results.iloc[0][metric_key]
    lag_window = search_results.iloc[0]['lag_window']
    return best_params, best_score, lag_window








# Load parameters from yaml file
params = _load_params()

metric_key = '_custom_mspe'
weekend_weight = params['weekend_weight']
holiday_weight = params['holiday_weight']

# Determine the country based on session state
country = _get_country()

# Grid search over different window sizes
total_iterations = len(lag_window_range)
pbar = tqdm(total=total_iterations, desc="Grid Search Progress")

# Perform grid search for each lag window and compile results
results_list = []
for lag_window in lag_window_range:
    results_grid = _create_forecaster(model, train_data['y'], train_data[exog_cols], 
                                      param_grid, lag_window, validation_steps, validation_size)
    results_grid['lag_window'] = lag_window
    results_list.append(results_grid)
    pbar.update()

pbar.close()

# Get the best model parameters and metrics
search_results = pd.concat(results_list, ignore_index=True).sort_values(metric_key)
best_params, best_score, lag_window = _get_best_model_parameters_and_metrics(search_results, metric_key)
best_dict = {"best_params": best_params, "best_score": best_score, "lag_window": lag_window}

# Instantiate and train the best model
forecaster = ForecasterAutoregCustom(regressor=model.set_params(**best_params),
                                     fun_predictors=_custom_predictors,
                                     window_size=int(window_size))

return search_results, best_dict, forecaster


In [None]:
# Load parameters from the xgboost.yaml file
    file_path = os.path.join(current_dir, 'params', 'xgboost.yaml')
    with open(file_path, 'r') as file:
        param_grid_xgb = yaml.safe_load(file)

    # Create an instance of the XGBoost regressor
    model = XGBRegressor()

    # Perform grid search on the XGBoost regressor
    search_results_xgb, best_dict_xgb, forecaster_xgb = grid_search_skforecast(
        lag_window_range, model, train_data, exog_cols, param_grid_xgb
    )

    logger.info(f"XGBOOST - Best score: {best_dict_xgb['best_score']}, Best params: {best_dict_xgb['best_params']}")

    # Store the best estimator and model parameters in the session state
    st.session_state.xgb_best_estimator = forecaster_xgb
    st.session_state.xgb_best_model_params = best_dict_xgb

    # Fit the best model on the train data and compute error metrics on the test data
    best_model_xgb = forecaster_xgb
    best_model_xgb.fit(y=train_data['y'], exog=train_data[exog_cols])

    # Generate prediction intervals for the test data
    predictions_xgb = forecaster_xgb.predict_interval(
        steps=test_steps, exog=test_data[exog_cols], interval=[pi_lower, pi_upper], n_boot=pi_n_boots
    )

    # Compute error metrics and prediction coverage for the test data
    st.session_state.xgb_test_metrics = compute_error_metrics(
        actual, predictions_xgb['pred'], train_data['y'], holiday_mask
    )
    st.session_state.xgb_test_coverage = compute_prediction_coverage(test_data, predictions_xgb, holiday_mask)

    # Fit the best model on the filtered train data
    best_model_xgb = forecaster_xgb
    best_model_xgb.fit(y=train_filtered['y'], exog=train_filtered[exog_cols])

    # Store the best model in the session state
    st.session_state.xgb_best_model = best_model_xgb
    
    # Store the feature importance of the best model
    st.session_state.xgb_best_model_fi = best_model_xgb.get_feature_importances()
    
    # Update the progress bar with the time taken
    end_time = time.time()
    remaining_time = round(6*(end_time - start_time)/60, 2)
    start_time = time.time()
    st.session_state.search_time = round((end_time - init_time)/60, 2)

    model_search_bar.progress(1/5, text=f"⌛ 1/5 Completed! - Estimated Time Remaining: {remaining_time} minutes")

    #####################################################################################

In [None]:
# Example usage
y = np.array([100, 150, 200, 250, 300, 350, 400])
rolling_window_range = [3, 5]  # Different window sizes for rolling calculations
lag_window = 3  # Number of lagged values to include
predictors = _custom_predictors(y)
print(predictors)