# Many Models Forecasting (MMF)
This notebook showcases how to run MMF with local models on multiple univariate time series of daily resolution. We will use [M4 competition](https://www.sciencedirect.com/science/article/pii/S0169207019301128#sec5) data.

### Cluster setup

We recommend using a cluster with [Databricks Runtime 16.4 LTS for ML](https://docs.databricks.com/en/release-notes/runtime/16.4lts-ml.html). The cluster can be either a single-node or a multi-node CPU cluster. MMF leverages [Pandas UDFs](https://docs.databricks.com/en/udf/pandas.html) under the hood and utilizes all the available resources. Make sure to set the following Spark configurations before you start your cluster: [`spark.sql.execution.arrow.enabled true`](https://spark.apache.org/docs/3.0.1/sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas) and [`spark.sql.adaptive.enabled false`](https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution). You can do this by specifying the [Spark configuration](https://docs.databricks.com/en/compute/configure.html#spark-configuration) in the advanced options on the cluster creation page.

### Install and import packages
Check out [requirements.txt](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/requirements.txt) if you're interested in the libraries we use.

In [0]:
%pip install -r ../../requirements.txt --quiet
dbutils.library.restartPython()

In [0]:
import logging
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
logging.getLogger("py4j.clientserver").setLevel(logging.ERROR)

In [0]:
import pathlib
import pandas as pd
from datasetsforecast.m4 import M4
from mmf_sa import run_forecast

### Prepare data 
We use the [`datasetsforecast`](https://github.com/Nixtla/datasetsforecast/tree/main/) package to download the M4 data. The M4 dataset contains a set of time series which we use for testing MMF. Below we have written a couple of custom functions to convert the M4 time series to the format MMF expects.

In [0]:
# Number of time series
n = 100


def create_m4_daily():
    y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Daily")
    _ids = [f"D{i}" for i in range(1, n + 1)]  # D1..Dn, i.e. exactly n series
    y_df = (
        y_df.groupby("unique_id")
        .filter(lambda x: x.unique_id.iloc[0] in _ids)
        .groupby("unique_id")
        .apply(transform_group)
        .reset_index(drop=True)
    )
    return y_df


def transform_group(df):
    unique_id = df.unique_id.iloc[0]
    if len(df) > 1020:
        df = df.iloc[-1020:]
    _start = pd.Timestamp("2020-01-01")
    # One synthetic calendar day per observation, starting at _start
    _end = _start + pd.DateOffset(days=len(df) - 1)
    date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")
    res_df = pd.DataFrame(data=[], index=date_idx).reset_index()
    res_df["unique_id"] = unique_id
    res_df["y"] = df.y.values
    return res_df

We are going to save this data in a Delta Lake table. Provide the catalog and database names where you want to store the data.

In [0]:
catalog = "mmf" # Name of the catalog we use to manage our assets
db = "m4" # Name of the schema we use to manage our assets (e.g. datasets)
user = spark.sql('select current_user() as user').collect()[0]['user'] # User email address

In [0]:
# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")

(
    spark.createDataFrame(create_m4_daily())
    .write.format("delta").mode("overwrite")
    .saveAsTable(f"{catalog}.{db}.m4_daily_train")
)

Let's take a peek at the dataset:

In [0]:
display(
  spark.sql(f"select * from {catalog}.{db}.m4_daily_train where unique_id in ('D1', 'D2', 'D3', 'D4', 'D5') order by unique_id, ds")
  )

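As a rule of thumb, when the number of time series is larger than the total number of cores, `spark.sql.shuffle.partitions` should be at least the number of cores (or a multiple of it) so that we don't under-utilize the available resources. The cell below reads the current setting, falls back to the Spark default of 200 when the value is `auto` (as on serverless compute), and raises the setting to `n` when there are more time series than shuffle partitions.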

In [0]:
# Get the current value of shuffle partitions
current = spark.conf.get("spark.sql.shuffle.partitions")

# If not set to 'auto' (serverless), convert to int; otherwise, use default 200
if current != "auto":
    current_val = int(current)
else:
    current_val = 200

# If n is greater than the current value, update the shuffle partitions setting
if n > current_val:
    spark.conf.set("spark.sql.shuffle.partitions", str(n))

### Models
Let's configure the list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/README.md](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/README.md). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast) and [sktime](https://github.com/sktime/sktime). Check their documentation for a detailed description of each model.

Some of these models optimize a subset of their hyperparameters on their own (see [statsforecast Automatic Forecasting](https://nixtlaverse.nixtla.io/statsforecast/index.html#automatic-forecasting)). For other hyperparameters or models, you can modify the hyperparameters in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml) or overwrite the default values in [mmf_sa/forecasting_conf_daily.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/forecasting_conf_daily.yaml). You can also introduce new hyperparameters that are supported by the base models. To do this, first add those hyperparameters under the model specification in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Then, pass these hyperparameters to the model instantiation, which happens in the model pipeline script: e.g. the `StatsFcAutoArima` class in [mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py).

In [0]:
active_models = [
    "StatsForecastBaselineWindowAverage",
    "StatsForecastBaselineSeasonalWindowAverage",
    "StatsForecastBaselineNaive",
    "StatsForecastBaselineSeasonalNaive",
    "StatsForecastAutoArima",
    "StatsForecastAutoETS",
    "StatsForecastAutoCES",
    "StatsForecastAutoTheta",
    "StatsForecastAutoTbats",
    "StatsForecastAutoMfles",
    "StatsForecastTSB",
    "StatsForecastADIDA",
    "StatsForecastIMAPA",
    "StatsForecastCrostonClassic",
    "StatsForecastCrostonOptimized",
    "StatsForecastCrostonSBA",
    "SKTimeProphet",
    ]

### Run MMF

Now we can run the evaluation and forecasting using the `run_forecast` function defined in [mmf_sa/models/__init__.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/__init__.py).

Refer to [README.md](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/README.md#parameters-description) for a comprehensive explanation of each parameter. Note that we are not providing any covariate field (i.e. `static_features`, `dynamic_future` or `dynamic_historical`) yet in this example. We will look into how we can add exogenous regressors to help our models in a different notebook: [examples/external_regressors/local_univariate_external_regressors_daily.ipynb](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/external_regressors/local_univariate_external_regressors_daily.ipynb).

While the following cell is running, you can check the status of your run on the Experiments page. Look for the experiment with the path you provided as `experiment_path` in `run_forecast`. On the Experiments page, you will see one entry per model (e.g. StatsForecastAutoArima). The metric shown there is a simple average over all backtesting trials and all time series. It is intended to give you an initial feeling for how well each model performs on your entire data mix. We will look into how you can scrutinize the evaluation using the `evaluation_output` table in a bit.

If you are interested in how Pandas UDF achieves parallel fitting and forecasting of multiple time series by distributing them across multiple executors, have a look at the two methods `evaluate_local_model` and `score_local_model` defined in the source code [`Forecaster.py`](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/Forecaster.py).

In [0]:
run_forecast(
    spark=spark,
    train_data=f"{catalog}.{db}.m4_daily_train",
    scoring_data=f"{catalog}.{db}.m4_daily_train",
    scoring_output=f"{catalog}.{db}.daily_scoring_output",
    evaluation_output=f"{catalog}.{db}.daily_evaluation_output",
    group_id="unique_id",
    date_col="ds",
    target="y",
    freq="D",
    prediction_length=10,
    backtest_length=30,
    stride=10,
    metric="smape",
    train_predict_ratio=1,
    data_quality_check=True,
    resample=False,
    active_models=active_models,
    experiment_path=f"/Users/{user}/mmf/m4_daily",
    use_case_name="m4_daily",
)
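
The `backtest_length`, `stride` and `prediction_length` arguments above jointly determine how many backtesting trials are run per time series. The following is a minimal sketch of that arithmetic for daily data, not MMF's implementation. It assumes that the first window starts `backtest_length` days before the end of the series and that a window is kept only if its full prediction horizon fits inside the series; the function name `backtest_windows` is hypothetical, and MMF's own `backtest` method may place the windows differently.

```python
from datetime import date, timedelta


def backtest_windows(series_end, backtest_length, prediction_length, stride):
    """Return (window_start, window_end) pairs for a rolling backtest.

    Assumption: windows are laid out over the last `backtest_length` days
    of the series and must fit entirely before `series_end` (inclusive).
    """
    windows = []
    start = series_end - timedelta(days=backtest_length - 1)
    while start + timedelta(days=prediction_length - 1) <= series_end:
        end = start + timedelta(days=prediction_length - 1)
        windows.append((start, end))
        start += timedelta(days=stride)  # move the window forward by `stride`
    return windows


# Same parameter values as in the run_forecast call; the end date is illustrative.
windows = backtest_windows(
    series_end=date(2020, 1, 31),
    backtest_length=30,
    prediction_length=10,
    stride=10,
)
for window_start, window_end in windows:
    print(window_start, "->", window_end)
```

Under these assumptions, the settings used in this notebook yield three non-overlapping trials per series, because `stride` equals `prediction_length`.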

### Evaluate
In the `evaluation_output` table, we store the evaluation results for all backtesting trials from all models. This information can be used to understand which models performed well on which time series in which backtesting periods. This is very important for selecting the final model for forecasting or the models for ensembling. It may be faster to simply take a look at the table:

In [0]:
display(
  spark.sql(f"""
    select * from {catalog}.{db}.daily_evaluation_output 
    where unique_id = 'D1'
    order by unique_id, model, backtest_window_start_date
    """))

For each backtesting trial, we train the model on the training dataset only up to `backtest_window_start_date`. We then use that fitted model to generate the forecasts for that specific horizon. We then move `backtest_window_start_date` forward by `stride` and repeat the exercise. This continues until `backtest_window_start_date` reaches the last day of the time series given in the training dataset. See how MMF implements backtesting in the `backtest` method in [mmf_sa/models/abstract_model.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/abstract_model.py).

We store the **as-if** forecasts together with the actuals for each backtesting period, so you can construct any metric you are interested in. We provide a few out-of-the-box metrics (e.g. smape), but the idea is that you construct your own metrics reflecting your business requirements and evaluate models based on those. For example, you may care more about the accuracy of the near-horizon forecasts than the far-horizon ones. In that case, you can apply decreasing weights across the horizon to compute a weighted aggregate metric.
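
As an illustration of such a custom metric, here is a minimal sketch of a horizon-weighted sMAPE computed from one backtesting trial's actuals and forecasts. The helper names `weighted_smape` and `linear_decay_weights` are hypothetical, the linearly decreasing weights are just one possible choice, and the sMAPE formula used here (mean of `2 * |f - a| / (|a| + |f|)`) may differ from the definition MMF uses for its built-in `smape`.

```python
def linear_decay_weights(horizon):
    """Weights horizon, horizon-1, ..., 1: the nearest step counts the most."""
    return [float(horizon - i) for i in range(horizon)]


def weighted_smape(actuals, forecasts, weights):
    """Weighted average of the per-step symmetric absolute percentage errors."""
    if not (len(actuals) == len(forecasts) == len(weights)):
        raise ValueError("actuals, forecasts and weights must have equal length")
    total = 0.0
    for a, f, w in zip(actuals, forecasts, weights):
        denom = abs(a) + abs(f)
        # Define the error as 0 when both actual and forecast are 0
        step_error = 0.0 if denom == 0 else 2.0 * abs(f - a) / denom
        total += w * step_error
    return total / sum(weights)


# Toy trial: the forecast is perfect except at the farthest horizon step.
actuals = [100.0, 100.0, 100.0, 100.0]
forecasts = [100.0, 100.0, 100.0, 150.0]

uniform = weighted_smape(actuals, forecasts, [1.0] * 4)
decayed = weighted_smape(actuals, forecasts, linear_decay_weights(4))
print(f"uniform weights: {uniform:.3f}, decaying weights: {decayed:.3f}")
```

With decreasing weights, an error at the far end of the horizon is penalized less than it would be under a plain average, which is the behavior described above.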

Note that if you run global and foundation models against the same time series with the same input parameters (except for those specific to global and foundation models), you will get the entries from those models in the same table and will be able to compare across all types of models, which is the biggest benefit of having all models integrated in one solution.

We also store each model in a binary format in this table (`model_pickle`). You can unpickle the models and access their specifications or produce forecasts. 
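
A minimal sketch of the unpickling step, assuming the `model_pickle` value is a byte string that Python's standard `pickle` module can read. The helper `load_model` is hypothetical and the dictionary below is only a stand-in for a fitted model; in practice the bytes would come from one row of the table. Unpickling requires the libraries that define the model class to be importable, and you should only unpickle data from a source you trust.

```python
import pickle


def load_model(blob: bytes):
    """Deserialize an object stored as pickle bytes (trusted sources only)."""
    return pickle.loads(blob)


# Stand-in for a fitted model; with real data, `blob` would be the
# `model_pickle` value of a single row collected from the table.
stand_in_model = {"name": "StatsForecastAutoArima", "season_length": 7}
blob = pickle.dumps(stand_in_model)

restored = load_model(blob)
print(restored["name"])
```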

### Forecast
In the `scoring_output` table, forecasts for each time series from each model are stored. Based on the evaluation exercise performed on the `evaluation_output` table, you can select the forecasts from the best performing models or a mix of models. We again store each model in a binary format in this table.

In [0]:
display(spark.sql(f"""
                  select * from {catalog}.{db}.daily_scoring_output 
                  where unique_id = 'D1'
                  order by unique_id, model, ds
                  """))
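
As a rough illustration of selecting forecasts based on the evaluation results, the sketch below picks, for each time series, the model with the lowest average metric across backtesting trials. It works on plain Python dictionaries rather than on the Delta table; the column name `metric_value` and the helper `best_model_per_series` are assumptions made for this example, and the numbers are made up.

```python
from collections import defaultdict


def best_model_per_series(rows):
    """Map each unique_id to (model, mean metric) with the lowest mean metric."""
    sums = defaultdict(lambda: [0.0, 0])  # (unique_id, model) -> [total, count]
    for row in rows:
        acc = sums[(row["unique_id"], row["model"])]
        acc[0] += row["metric_value"]
        acc[1] += 1

    best = {}
    for (unique_id, model), (total, count) in sums.items():
        mean_metric = total / count
        if unique_id not in best or mean_metric < best[unique_id][1]:
            best[unique_id] = (model, mean_metric)
    return best


# Made-up evaluation rows: one row per series, model and backtesting trial.
rows = [
    {"unique_id": "D1", "model": "StatsForecastAutoArima", "metric_value": 0.04},
    {"unique_id": "D1", "model": "StatsForecastAutoArima", "metric_value": 0.06},
    {"unique_id": "D1", "model": "StatsForecastAutoETS", "metric_value": 0.03},
    {"unique_id": "D1", "model": "StatsForecastAutoETS", "metric_value": 0.09},
    {"unique_id": "D2", "model": "StatsForecastAutoArima", "metric_value": 0.10},
    {"unique_id": "D2", "model": "StatsForecastAutoETS", "metric_value": 0.02},
]

best = best_model_per_series(rows)
for unique_id, (model, mean_metric) in sorted(best.items()):
    print(unique_id, model, round(mean_metric, 3))
```

The selected model names could then be used to filter `scoring_output` per `unique_id`; on large tables the same aggregation is better expressed in Spark SQL.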

Refer to the [notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/post-evaluation-analysis.ipynb) for guidance on performing fine-grained model selection after running `run_forecast`.

### Delete Tables
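Let's clean up the tables. The cells below are commented out; uncomment and run them to delete all rows from the output tables.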

In [0]:
#display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output"))

In [0]:
#display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output"))