## Cash Liquidity Forecast
We want to extend the Cash Flow data product by forecasting the cash flow for upcoming periods. This notebook shows an example workflow for enriching the Cash Flow data product, which is then exposed back to SAP Datasphere in Business Data Cloud (BDC).
The overall prediction involves the following steps:
- Consume the exposed data product via Delta Sharing
- Prepare the data for time series forecasting
- Perform hyperparameter optimization with model selection for time series prediction
- Log the best model to MLflow

### Install packages
All packages required by this notebook are installed in the following cell. To make the results reproducible, the following packages are installed:
- mlflow: tracking of our ML model
- AutoTS: time series package that performs hyperparameter tuning over multiple time series models

In [0]:
%pip install mlflow
%pip install "autots[additional]"
%restart_python

### Import packages

In [0]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from autots import AutoTS
from delta import *
import mlflow
from mlflow.models import infer_signature
from mlflow.client import MlflowClient
import os
import pickle
from pathlib import Path
import re
from functools import reduce

### Set up the Spark session and consume the prepared data product from the feature store

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS <CATALOG_NAME>;
SET CATALOG <CATALOG_NAME>;
CREATE SCHEMA IF NOT EXISTS <SCHEMA_NAME>;
USE SCHEMA <SCHEMA_NAME>;

### Feature store table
We read the table `prepared_cash_flow_time_series`, generated by our data preparation scripts, from the feature store.

In [0]:
spark = SparkSession.builder.appName("Time Series Training").getOrCreate()
data = spark.read.table("prepared_cash_flow_time_series")

In [0]:
# Convert the Spark DataFrame to a pandas time series with the correct data types
# ds: datetime dtype, y: float
time_series_data = data.toPandas().astype({"ds": "datetime64[ns]", "y": float})
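
To make the expected input shape concrete, the following is a small sketch with hypothetical sample data that only assumes the three columns used in this notebook (`ds`, `y`, `CompanyCode`). It applies the same dtype conversion and then pivots the long table into the wide layout (one column per company code, indexed by date), which is, as far as we understand AutoTS, the layout it uses internally when `id_col` is passed to `.fit`.

```python
import pandas as pd

# Hypothetical long-format sample: one row per month and company code,
# with values arriving as strings to show the dtype conversion
raw = pd.DataFrame({
    "ds": ["2024-01-01", "2024-02-01", "2024-03-01"] * 2,
    "CompanyCode": ["1010"] * 3 + ["1710"] * 3,
    "y": ["100.5", "98.0", "120.25", "50", "55.5", "61"],
})

# Same conversion as in the notebook: ds -> datetime64[ns], y -> float
time_series_data = raw.astype({"ds": "datetime64[ns]", "y": float})

# Wide layout: DatetimeIndex plus one column per company code
# (assumed to mirror what AutoTS builds from date_col/value_col/id_col)
wide = time_series_data.pivot(index="ds", columns="CompanyCode", values="y")
print(time_series_data.dtypes)
print(wide)
```

Each company code becomes its own series, so every code needs a complete, regularly spaced history.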

### Time Series Forecasting
We train our time series models with the [AutoTS library](https://github.com/winedarksea/autots). AutoTS searches, in parallel, over many combinations of data preprocessing steps and time series algorithms, validates each candidate, and selects the best one. We log the model to MLflow under the experiment `Time Series Forecasting`, which is created in the same workspace folder as this notebook. Because AutoTS does not support MLflow autologging, we wrap the fitted model in a custom Python class based on `mlflow.pyfunc.PythonModel`, which lets us use MLflow's prediction capabilities.

In [0]:
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
root_path = str(Path(notebook_path).parent)

In [0]:
mlflow.set_tracking_uri("databricks")
mlflow.set_experiment(f"{root_path}/Time Series Forecasting")
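
The experiment name is derived from the notebook's parent folder. A minimal sketch with a hypothetical notebook path (on Databricks the real value comes from the `notebookPath()` call above) shows the resulting experiment name; `PurePosixPath` is used here only so the example behaves the same on any OS, since workspace paths are POSIX-style.

```python
from pathlib import PurePosixPath

# Hypothetical workspace path of this notebook
notebook_path = "/Users/jane.doe@example.com/cash_flow/time_series_training"

# Parent folder of the notebook, as computed in the cell above
root_path = str(PurePosixPath(notebook_path).parent)
experiment_name = f"{root_path}/Time Series Forecasting"
print(experiment_name)
```

The experiment therefore lives next to the notebook, so moving the notebook to another folder starts a new experiment.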

In [0]:
class AutoTSPredictor(mlflow.pyfunc.PythonModel):
    """
    A custom MLflow Python model wrapper for time series forecasting using the AutoTS model.

    This class is designed to integrate an AutoTS forecasting model with MLflow's model
    deployment capabilities. The `AutoTSPredictor` class inherits from `mlflow.pyfunc.PythonModel`,
    allowing it to be used in MLflow pipelines or deployed as an MLflow model.

    Attributes:
        autots_model (AutoTS): The fitted AutoTS model used to generate forecasts.
    """
    def __init__(self, autots_model: AutoTS):
        """
        Initializes the AutoTSPredictor with a trained AutoTS model.

        Args:
            autots_model (AutoTS): An instance of the AutoTS model that has been trained
                for time series forecasting.
        """
        self.autots_model = autots_model
    def predict(self, context, model_input) -> pd.DataFrame:
        """
        Generates forecasts using the AutoTS model and formats the output.

        The `predict` method uses the fitted `AutoTS` model to produce forecasts,
        including the central forecast and the upper and lower bounds of the
        prediction interval.

        Args:
            context: MLflow context, provided during deployment; typically unused here.
            model_input: Model input provided to the MLflow model (not used in this function).

        Returns:
            pd.DataFrame: A long-format DataFrame with one row per forecast date and
            company code, containing the central forecast and the upper and lower
            forecast bounds.

        The returned DataFrame has the following columns:
            - `date`: The forecast period.
            - `CompanyCode`: The company code identifying the time series.
            - `forecast`: The central forecast values.
            - `upper_forecast`: The upper bound of the forecast interval.
            - `lower_forecast`: The lower bound of the forecast interval.
        """
        predictions = self.autots_model.predict()
        forecasts_df = predictions.forecast.reset_index(names=["date"])
        forecasts_df = forecasts_df.melt(id_vars=["date"], var_name="CompanyCode", value_name="forecast")
        upper_forecast = predictions.upper_forecast.reset_index(names=["date"])
        upper_forecast = upper_forecast.melt(id_vars=["date"], var_name="CompanyCode", value_name="upper_forecast")
        lower_forecast = predictions.lower_forecast.reset_index(names=["date"])
        lower_forecast = lower_forecast.melt(id_vars=["date"], var_name="CompanyCode", value_name="lower_forecast")
        prediction_dataframes = [forecasts_df, upper_forecast, lower_forecast]
        prediction_merged = reduce(lambda left, right: pd.merge(left, right, on=["date", "CompanyCode"], how="inner"), prediction_dataframes)
        return prediction_merged
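
The reshaping inside `predict` can be tried in isolation. This sketch uses hypothetical wide DataFrames standing in for `predictions.forecast`, `predictions.upper_forecast` and `predictions.lower_forecast` (a date index plus one column per company code) and applies the same `reset_index`/`melt`/inner-merge sequence; the helper `to_long` is introduced only for this illustration.

```python
from functools import reduce

import pandas as pd

# Hypothetical wide forecast frames: DatetimeIndex, one column per company code
index = pd.date_range("2025-01-01", periods=2, freq="MS")
forecast = pd.DataFrame({"1010": [100.0, 110.0], "1710": [50.0, 55.0]}, index=index)
upper = forecast + 10
lower = forecast - 10

def to_long(wide: pd.DataFrame, value_name: str) -> pd.DataFrame:
    """Move the date index into a column and unpivot the company code columns."""
    long = wide.reset_index(names="date")
    return long.melt(id_vars=["date"], var_name="CompanyCode", value_name=value_name)

frames = [
    to_long(forecast, "forecast"),
    to_long(upper, "upper_forecast"),
    to_long(lower, "lower_forecast"),
]

# Inner-join the three long frames on (date, CompanyCode), as in predict()
merged = reduce(
    lambda left, right: pd.merge(left, right, on=["date", "CompanyCode"], how="inner"),
    frames,
)
print(merged)
```

The long format keeps one row per date and company code, which maps directly onto a table that can be written back to the data product.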

For training with AutoTS, we use six parameters, which can be adjusted to the individual characteristics of the data:
- FORECAST_LENGTH: Number of periods over which to evaluate the forecast. Can be overridden later in `.predict`. When you don't have much historical data, using a small forecast length for `.fit` and the full desired forecast length for `.predict` is usually the best approach given the limitations.
- MAX_GENERATIONS: Number of genetic algorithm generations to run. More generations mean a longer runtime and generally better accuracy.
- NUM_VALIDATIONS: Number of cross-validations to perform; 0 performs only a single train/test evaluation on the best split. Note that `num_validations` counts the validations performed after the first evaluation segment, so the total number of evaluations is this value + 1. The aliases **auto** and **max** are also available; **max** is capped at **50**.
- ENSEMBLE: None, a list, or a comma-separated string containing: **auto**, **simple**, **distance**, **horizontal**, **horizontal-min**, **horizontal-max**, **mosaic**, **subsample**
- NUM_JOBS: Number of parallel jobs AutoTS uses for the hyperparameter tuning.
- MODEL_LIST: Name of a predefined model list (or an explicit list of model names). Importing `model_lists` from `autots.models.model_list` returns a dictionary that maps each list name (dictionary key) to the models it contains.
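
The interaction between FORECAST_LENGTH and NUM_VALIDATIONS determines how much history is held out for evaluation. The following rough estimate assumes backwards validation, where the first evaluation segment and each additional validation segment are `forecast_length` periods long and are stacked back from the end of the series; this is a simplification for planning purposes, not AutoTS's exact splitting logic. The helper `backtest_budget` and the 36-month history are hypothetical.

```python
def backtest_budget(forecast_length: int, num_validations: int, history_length: int) -> dict:
    """Rough estimate of how backwards validation consumes history.

    Assumption: one evaluation segment for the initial train/test split plus
    one per validation, each `forecast_length` periods, placed back-to-back
    from the end of the series.
    """
    eval_segments = num_validations + 1  # "this + 1" evaluations in total
    held_out = eval_segments * forecast_length
    return {
        "eval_segments": eval_segments,
        "held_out_periods": held_out,
        # Periods available for training in the earliest (oldest) split
        "earliest_train_periods": history_length - held_out,
    }

# Notebook settings (FORECAST_LENGTH = 6, NUM_VALIDATIONS = 2), 36 months of history
budget = backtest_budget(forecast_length=6, num_validations=2, history_length=36)
print(budget)
```

Under this assumption, three years of monthly data leave 18 months of training data for the oldest split, which is one reason longer histories make the model search more reliable.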

In [0]:
from autots.models.model_list import model_lists
model_lists

In [0]:
FORECAST_LENGTH = 6
MAX_GENERATIONS = 2
NUM_VALIDATIONS = 2
ENSEMBLE = "simple"
NUM_JOBS = 30
MODEL_LIST = "fast"

In [0]:
with mlflow.start_run() as run:
    model = AutoTS(
        forecast_length=FORECAST_LENGTH,
        max_generations=MAX_GENERATIONS,
        num_validations=NUM_VALIDATIONS,
        ensemble=ENSEMBLE,
        verbose=0,
        n_jobs=NUM_JOBS,
        model_list=MODEL_LIST,
    )
    try:
        # Only the fit is guarded: failures here usually point to unsuitable input data
        model.fit(time_series_data, date_col="ds", value_col="y", id_col="CompanyCode")
    except Exception as e:
        raise ValueError(
            "The provided data does not form a coherent time series that allows the models to generalize. "
            "Please provide a time series with at least 3 years of data and check whether the prepared "
            "dataset contains many 0 values due to monthly gaps or seasonality issues."
        ) from e

    # Save the fitted AutoTS model as a serialized file
    model_path = "autots_model.pkl"
    with open(model_path, "wb") as f:
        pickle.dump(model, f)
    try:
        # Log the serialized AutoTS model as a plain artifact
        mlflow.log_artifact(model_path)
        # Log and register the custom wrapper with MLflow's pyfunc API
        mlflow.pyfunc.log_model(
            artifact_path="model",
            python_model=AutoTSPredictor(model),
            artifacts={"autots_model": model_path},
            input_example=time_series_data[:5],
            registered_model_name="cashflow_ts_model",
        )
    finally:
        # Remove the local pickle file even if logging fails
        os.remove(model_path)

    model_uri = f"runs:/{run.info.run_id}/model"
    best_model_id = model.best_model_id
    print(f"Model saved in MLflow Run ID: {run.info.run_id}")
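
The error message above names two common causes of a failed fit: too little history and many zero values. A quick pre-check on the prepared data can surface both before training. This is a sketch under assumptions: the helper `check_series_quality` and the 50% zero-share threshold are hypothetical, and the 36-month minimum is taken from the error message.

```python
import pandas as pd

MIN_MONTHS = 36        # "at least 3 years of data", from the error message
MAX_ZERO_SHARE = 0.5   # hypothetical threshold for "many 0 values"

def check_series_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Summarise distinct months and share of zero values per CompanyCode."""
    summary = df.groupby("CompanyCode").agg(
        months=("ds", lambda s: s.dt.to_period("M").nunique()),
        zero_share=("y", lambda s: (s == 0).mean()),
    )
    summary["ok"] = (summary["months"] >= MIN_MONTHS) & (summary["zero_share"] <= MAX_ZERO_SHARE)
    return summary

# Hypothetical sample: 1010 has 3 full years, 1710 only 12 months with every other value 0
dates = pd.date_range("2021-01-01", periods=36, freq="MS")
sample = pd.concat([
    pd.DataFrame({"ds": dates, "CompanyCode": "1010", "y": 100.0}),
    pd.DataFrame({"ds": dates[:12], "CompanyCode": "1710", "y": [0.0, 5.0] * 6}),
])
report = check_series_quality(sample)
print(report)
```

Running such a check on `time_series_data` before `model.fit` makes it easier to tell which company codes trigger the error.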