import packages

In [1]:
import mlflow
from forecastflowml.meta_model import MetaModel
from forecastflowml.data.loader import load_walmart_m5
from pyspark.sql import SparkSession

  from .autonotebook import tqdm as notebook_tqdm


create spark session

In [2]:
# Build (or reuse) a local Spark session for the tutorial.
spark = (
    SparkSession.builder
    # run Spark in-process, using every available core
    .master("local[*]")
    .config("spark.driver.memory", "16g")
    # NOTE(review): Spark 3.x documents this key as deprecated in favour of
    # "spark.sql.execution.arrow.pyspark.enabled" - confirm the target Spark
    # version before switching.
    .config("spark.sql.execution.arrow.enabled", "true")
    # adaptive query execution is switched off for this session
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

load sample data from forecastflowml

In [3]:
# Load the bundled Walmart M5 sample through the Spark session; the loader
# returns two objects, unpacked here as the train and test splits.
df_train, df_test = load_walmart_m5(spark)

examine the lag features: their column names include the pattern lag_{i}

the model filters the lag features (matched with the regex pattern lag_{i}) based on the model forecast horizon

example: if the model horizon is [1, 2, 3] and lag_feature_range = 2, then lag_3, lag_4 and lag_5 will be used as features.

In [4]:
# List every column of the training frame; the lag features are the ones
# whose names contain lag_{i}.
print(df_train.columns)

['d', 'id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id', 'sales', 'date', 'item_store_window_1_lag_7_mean', 'item_store_window_1_lag_8_mean', 'item_store_window_1_lag_9_mean', 'item_store_window_1_lag_14_mean', 'item_store_window_1_lag_15_mean', 'item_store_window_1_lag_16_mean', 'item_store_window_1_lag_21_mean', 'item_store_window_1_lag_22_mean', 'item_store_window_1_lag_23_mean', 'item_store_window_1_lag_28_mean', 'item_store_window_1_lag_29_mean', 'item_store_window_1_lag_30_mean', 'item_store_window_7_lag_7_mean', 'item_store_window_7_lag_14_mean', 'item_store_window_7_lag_21_mean', 'item_store_window_7_lag_28_mean', 'item_store_window_15_lag_7_mean', 'item_store_window_15_lag_14_mean', 'item_store_window_15_lag_21_mean', 'item_store_window_15_lag_28_mean', 'item_store_window_30_lag_7_mean', 'item_store_window_30_lag_14_mean', 'item_store_window_30_lag_21_mean', 'item_store_window_30_lag_28_mean', 'item_store_window_90_lag_7_mean', 'item_store_window_90_lag_14_mean', 'i

define optuna hyperparameter space

In [5]:
def hyperparam_space_fn(trial):
    """Describe the hyperparameter search space for one Optuna trial.

    Parameters
    ----------
    trial
        Object exposing ``suggest_float`` and ``suggest_int`` (an Optuna trial).

    Returns
    -------
    dict
        ``{"learning_rate": <float suggested in [0.2, 0.3]>,
        "num_leaves": <int suggested in [30, 40]>}``.
    """
    # Suggestions are requested in the same order as the returned keys.
    learning_rate = trial.suggest_float("learning_rate", 0.2, 0.3)
    num_leaves = trial.suggest_int("num_leaves", 30, 40)
    return {"learning_rate": learning_rate, "num_leaves": num_leaves}

initialize the model

In [6]:
# Configure the forecasting meta model: dataset layout, horizon/lag settings,
# cross-validation + Optuna tuning, and the MLflow tracking location.
model = MetaModel(
    # dataset parameters
    group_col="cat_id",  # column to slice dataframe
    id_cols=["id"],  # columns to use as time series identifier
    date_col="date",  # date column
    target_col="sales",  # target column
    date_frequency="days",  # date frequency of dataset

    # model parameters
    model_horizon=7,  # horizon per model
    max_forecast_horizon=7*4,  # total forecast horizon (4 weeks of daily steps)
    lag_feature_range=2,  # lag window: horizon [1, 2, 3] with range 2 keeps lag_3, lag_4, lag_5

    # cross validation and optimisation parameters
    n_cv_splits=3,  # number of time-based cv splits
    cv_step_length=7*4,  # number of dates between each cv folds
    max_hyperparam_evals=2,  # total number of optuna trials
    scoring="neg_mean_squared_error",  # sklearn scoring metric
    hyperparam_space_fn=hyperparam_space_fn,  # optuna hyperparameter space

    # mlflow parameters
    tracking_uri="./mlruns",  # Mlflow tracking URI
)

train model

In [7]:
# Train on the training split.
# Presumably this also records an MLflow run, since a later cell loads the
# model back via model.run_id - confirm against the MetaModel docs.
model.train(df_train)



load model

In [None]:
# Reload the logged artifact "meta_model" from the run identified by model.run_id.
# NOTE(review): this rebinds `model` to the loaded pyfunc object, so re-running
# this cell reads `run_id` from that object instead of the MetaModel - confirm
# the attribute exists there, or load into a separate name.
model = mlflow.pyfunc.load_model(f"runs:/{model.run_id}/meta_model")

predict and save results

In [None]:
# Score the test split and persist the forecasts as Parquet under "forecast".
# mode("overwrite") replaces an existing "forecast" directory so this cell can
# be re-run; Spark's default save mode raises when the path already exists.
model.predict(df_test).write.mode("overwrite").parquet("forecast")