# Parallel demand forecasting at scale using Ray Tune and Ray Data

Batch training and tuning are common tasks in machine learning use cases. They involve training simple models on batches of data that typically correspond to different locations, products, and so on. Training one model per batch can take less time than processing all the data at once, but only if those batches run in parallel!

This notebook showcases how to conduct batch forecasting with NeuralProphet. NeuralProphet is a popular open-source library built on PyTorch, inspired by Facebook's Prophet, and designed for automatic time series forecasting.
<br></br>
<div style="text-align: center; line-height: 5; padding-top: 20px;  padding-bottom: 20px;">
  <img src="https://docs.ray.io/en/master/_images/batch-training.svg" alt='Push compute' height="300" width="400">
</div>

For the data, we will use the M5 Walmart dataset. This popular tabular dataset contains historical product sales for different store locations and regions in the USA.
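The batch-training pattern described above can be illustrated without any cluster at all. The following is a minimal sketch in plain Python, not the Ray pipeline used later in this notebook: the toy `records`, the helper names, and the "mean forecaster" are all hypothetical stand-ins chosen only to show that independent batches can be fitted concurrently.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from statistics import mean

# Hypothetical toy records: (store_id, daily_sales). This is not the M5 data.
records = [
    ("CA_1", 10.0), ("CA_1", 14.0),
    ("TX_1", 3.0), ("TX_1", 5.0), ("TX_1", 7.0),
    ("WI_1", 8.0),
]


def split_into_batches(rows):
    """Group rows by key so that each batch can be modeled independently."""
    batches = defaultdict(list)
    for key, value in rows:
        batches[key].append(value)
    return dict(batches)


def train_one_batch(item):
    """Stand-in for model training: 'fit' a mean forecaster on one batch."""
    key, values = item
    return key, mean(values)


batches = split_into_batches(records)

# The batches share no state, so they can be trained concurrently.
with ThreadPoolExecutor(max_workers=3) as pool:
    forecasts = dict(pool.map(train_one_batch, batches.items()))

print(forecasts)
```

In the rest of the notebook, Ray Data plays the role of the grouping step and Ray Tune replaces the trivial training function.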

## Tested on Databricks Runtime 16.4 ML with the following cluster configuration
**Head node** : 28GB 4 Cores <br>
**Worker nodes** : 64GB 16 Cores  <br>
**max workers** : 3  <br>

## Manage Dependencies

In [0]:
%pip install neuralprophet ray[default,tune]==2.41.0
%restart_python

In [0]:
# Base Python imports
import json
import logging
import multiprocessing
import os
import time
import timeit
from datetime import date, datetime, timedelta
from typing import Any, Dict, Optional

# External libraries
import mlflow
import neuralprophet
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
import ray
import requests
import torch
from mlflow.utils.databricks_utils import get_databricks_env_vars
from neuralprophet import NeuralProphet
from pyspark.sql import Window
from pyspark.sql.functions import col
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback
from ray.tune.search.concurrency_limiter import ConcurrencyLimiter
from ray.tune.search.optuna import OptunaSearch
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
from sklearn.metrics import mean_squared_error

In [0]:
config = mlflow.models.ModelConfig(development_config='config.yaml')
CATALOG = config.get('catalog')
SCHEMA = config.get('schema')
VOLUME = config.get('volume')

# Assumes 01_data has been run
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE SCHEMA {SCHEMA}")

## Get cluster information

In [0]:
# Get cluster info
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
host_name = ctx.tags().get("browserHostName").get()
host_token = ctx.apiToken().get()
cluster_id = ctx.tags().get("clusterId").get()
response = requests.get(
    f'https://{host_name}/api/2.1/clusters/get?cluster_id={cluster_id}',
    headers={'Authorization': f'Bearer {host_token}'}
  ).json()

# Get autoscale stats
if "autoscale" in response:
  min_workers = response['autoscale']["min_workers"]
  max_workers = response['autoscale']["max_workers"]
else:
  min_workers = 1
  max_workers = 1

# Get CPU information
current_workers = len(response.get('executors',[]))
driver_cpus = multiprocessing.cpu_count()
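# cluster_cores covers the whole cluster, so subtract the driver and divide by the worker count to get CPUs per worker
worker_cpus = (response.get('cluster_cores') - driver_cpus) / max(current_workers, 1)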

## Start Ray Cluster

In [0]:
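if config.get('restart_ray_each_run'):
  # Best-effort cleanup: ignore errors if no Ray-on-Spark cluster is running
  try:
    shutdown_ray_cluster()
  except Exception:
    pass

  # Best-effort cleanup: ignore errors if Ray was never initialized
  try:
    ray.shutdown()
  except Exception:
    pass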

# We need to set these environment variables BEFORE calling setup_ray_cluster so that MLflow works
mlflow_dbrx_creds = get_databricks_env_vars("databricks")
os.environ["DATABRICKS_HOST"] = mlflow_dbrx_creds['DATABRICKS_HOST']
os.environ["DATABRICKS_TOKEN"] = mlflow_dbrx_creds['DATABRICKS_TOKEN']

# We keep two CPUs on each worker and half of the driver CPUs for Spark
ray_conf = setup_ray_cluster(
  min_worker_nodes=min_workers,
  max_worker_nodes=max_workers,
  num_cpus_head_node=int(driver_cpus/2),
  num_cpus_per_node=int(worker_cpus-2),
  num_gpus_head_node=0,
  num_gpus_worker_node=0
)
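The CPU split passed to `setup_ray_cluster` can be sanity-checked with plain arithmetic. The sketch below is illustrative only: `plan_ray_cpus` is a hypothetical helper, and it assumes that the cluster core count includes both driver and worker cores and that all workers are the same size. The numbers correspond to the tested configuration listed at the top (one 4-core head node and three 16-core workers).

```python
def plan_ray_cpus(cluster_cores, driver_cpus, num_workers, reserved_per_worker=2):
    """Split CPUs between Spark and Ray.

    Assumes cluster_cores counts driver plus worker cores and that all
    workers have the same number of cores (hypothetical helper).
    """
    worker_total = cluster_cores - driver_cpus
    cpus_per_worker = worker_total // max(num_workers, 1)
    return {
        # Half of the driver CPUs go to the Ray head node
        "num_cpus_head_node": driver_cpus // 2,
        # Each worker keeps `reserved_per_worker` CPUs for Spark
        "num_cpus_per_node": max(cpus_per_worker - reserved_per_worker, 1),
    }


# 4-core head node + 3 workers x 16 cores = 52 cores in total
plan = plan_ray_cpus(cluster_cores=52, driver_cpus=4, num_workers=3)
print(plan)
```

Under these assumptions, Ray would get 2 CPUs on the head node and 14 CPUs on each worker, leaving the remainder for Spark.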

## Distribute dataset

In [0]:
sdf_walmart = spark.read.table(config.get("final_table"))

window_spec = Window.orderBy(
    "state_id", "store_id", "cat_id", "dept_id", "item_id"
)

sdf_walmart_with_model_num = (
    sdf_walmart.withColumn(
        "item_num", F.dense_rank().over(window_spec)
    )
    .filter(F.col("item_num") <= config.get("num_items"))
    .withColumn(
        "model_num",
        F.ceil(F.col("item_num") / config.get("items_per_model")),
    )
    .withColumn("y", F.col("sell_price") * F.col("sale_quantity"))
    .cache()
)

print(sdf_walmart.count())
display(sdf_walmart_with_model_num)
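The `dense_rank` and `ceil` steps above decide which items end up in which model group. The following plain-Python sketch mirrors that logic on a handful of made-up item keys; `assign_model_numbers` and the sample keys are hypothetical and only meant to make the grouping arithmetic concrete.

```python
import math


def assign_model_numbers(item_keys, num_items, items_per_model):
    """Mimic dense_rank() over sorted keys followed by ceil(rank / items_per_model)."""
    distinct_keys = sorted(set(item_keys))
    # Dense rank: 1-based position among the distinct, sorted keys
    item_num = {key: position + 1 for position, key in enumerate(distinct_keys)}
    return {
        key: math.ceil(rank / items_per_model)
        for key, rank in item_num.items()
        if rank <= num_items  # keep only the first `num_items` items
    }


# Hypothetical item keys; repeated keys stand for multiple rows of the same item
keys = ["item_c", "item_a", "item_b", "item_a", "item_e", "item_d"]
groups = assign_model_numbers(keys, num_items=4, items_per_model=2)
print(groups)
```

With `num_items=4` and `items_per_model=2`, the first two items share model 1, the next two share model 2, and the fifth item is filtered out.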

## There are multiple ways to convert data from the lakehouse to Ray Data; refer to the [documentation](https://docs.databricks.com/en/machine-learning/ray/connect-spark-ray.html) for more details

In Databricks Runtime, ray.data.from_spark() can use the DataFrame chunk API to transfer data from Spark to Ray. The chunk API reads Spark DataFrames in chunks, which is more memory-efficient for large datasets. The cell below sets use_spark_chunk_api=False and points RAY_UC_VOLUMES_FUSE_TEMP_DIR at a Unity Catalog Volume for Ray's temporary files.

In [0]:
# Set the temporary directory for Ray to use Unity Catalog Volumes
os.environ['RAY_UC_VOLUMES_FUSE_TEMP_DIR'] = f'/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/ray_temp'

# Convert spark Dataframe to Ray 
sdf_ray = ray.data.from_spark(
    sdf_walmart_with_model_num, 
    use_spark_chunk_api=False
)

# Many-model forecasting with Ray Tune and Ray Data
Ray Tune is a library for distributed hyperparameter tuning built on Ray. It samples hyperparameters from a search space, runs trials in parallel, and returns the configuration that optimizes your objective function. Ray Tune also integrates with a variety of state-of-the-art search algorithms, such as Optuna, which this notebook uses.

To use Ray Tune for hyperparameter tuning, you can follow these steps:
- Define your training function and objective function.
- Specify the hyperparameters and their search space.
- Define the Ray Data map_groups function that runs Ray Tune for each hierarchical model with the chosen search algorithm and scheduler.
- Run the Ray Data job and collect the results.
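The steps above can be sketched with a deliberately simple tuner. The example below uses plain random search in pure Python, not Ray Tune or Optuna's sampler; the `objective` is a hypothetical stand-in for a real training run, and the two parameter ranges are borrowed from the search space defined later in this notebook.

```python
import random

# Search space: each entry draws one value from its range
search_space = {
    "learning_rate": lambda rng: rng.uniform(0.001, 0.1),
    "newer_samples_weight": lambda rng: rng.uniform(1, 7),
}


def objective(params):
    """Hypothetical stand-in for 'train a model and return its validation error'."""
    return (params["learning_rate"] - 0.01) ** 2 + (params["newer_samples_weight"] - 4) ** 2


def random_search(space, objective_fn, num_samples, seed=0):
    """Sample configurations at random and keep the one with the lowest score."""
    rng = random.Random(seed)
    best_params, best_score = None, float("inf")
    for _ in range(num_samples):
        params = {name: sample(rng) for name, sample in space.items()}
        score = objective_fn(params)
        if score < best_score:
            best_params, best_score = params, score
    return best_params, best_score


best_params, best_score = random_search(search_space, objective, num_samples=50)
print(best_params, best_score)
```

Ray Tune follows the same loop conceptually, but it distributes the trials across the cluster and lets a search algorithm propose configurations instead of sampling them blindly.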

## Step 1 : Define the training and objective function

In [0]:
def ray_trial(config, df, cpu_resources_per_trial):
  """
  Single Ray Tune trial for one hyperparameter config.
  Trains a NeuralProphet model with the given config, evaluates it on the
  held-out days, and reports the test RMSE and training loss to Ray Tune.
  """

  torch.set_num_threads(int(cpu_resources_per_trial)) # Match PyTorch threads to the CPUs allocated to this trial
  test_cutoff = df['ds'].max() - pd.Timedelta(days=7) # Hold out the most recent days, starting 7 days before the last date
  df_train = df[df['ds'] < test_cutoff]
  trainer_config = {}

  # Hardcode other Neural prophet parameters
  config['n_changepoints'] = 10
  config['n_lags'] = 3
  config['drop_missing'] = True 
  config['impute_rolling'] = 1000
  config['batch_size'] = 128
  config['epochs'] = 10

  # Define the Model (it can be any model in our case we use NeuralProphet)
  model = NeuralProphet(
      accelerator='auto',
      trainer_config=trainer_config,
      **config
  )
  start = timeit.default_timer()
  
  # Train model
  progress = model.fit(
      df=df_train,
      checkpointing=True,
      freq="D",
      metrics=['RMSE'],
      progress='bar'
    )
  total_time = timeit.default_timer()-start
  if np.isnan(progress['RMSE'][0]):
    progress.fillna(1000, inplace = True)

  d_p = progress.loc[progress['RMSE'] == progress['RMSE'].min()].to_dict(orient='records')
  print("Loss :",d_p[0]['Loss'])

  # Validate the model and get the RMSE Score
  forecast_week = model.predict(df[df['ds'] >= (df['ds'].max() - pd.Timedelta(days=360))])
  forecast_week = forecast_week[forecast_week['ds'] >= test_cutoff]
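  # Assign the result back; chained inplace fillna on a filtered frame may not update it
  forecast_week = forecast_week.fillna({'y': 0, 'yhat1': 0})
  # Argument order is (y_true, y_pred); np.sqrt avoids the `squared` argument, which is deprecated in recent scikit-learn releases
  test_rmse = np.sqrt(mean_squared_error(forecast_week.y.tolist(), forecast_week.yhat1.tolist()))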
  if np.isnan(test_rmse):
    test_rmse = 1000
  print("test_rmse:",test_rmse)
  
  #Push the final metric to track
  ray.train.report({"RMSE":test_rmse, "Loss" :d_p[0]['Loss']})
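The holdout logic in `ray_trial` is easy to check on a toy series. The sketch below reproduces the same cutoff rule (last date minus 7 days, with the cutoff day itself in the test set) and the RMSE formula in plain Python. The series, the naive "repeat the last value" forecast, and the helper names are hypothetical.

```python
import math
from datetime import date, timedelta

# Hypothetical toy series: 30 daily points where y equals the day index
series = [(date(2024, 1, 1) + timedelta(days=i), float(i)) for i in range(30)]


def split_by_cutoff(rows, holdout_days=7):
    """Train on rows strictly before the cutoff, test on the cutoff day onwards."""
    cutoff = max(ds for ds, _ in rows) - timedelta(days=holdout_days)
    train = [(ds, y) for ds, y in rows if ds < cutoff]
    test = [(ds, y) for ds, y in rows if ds >= cutoff]
    return train, test


def rmse(y_true, y_pred):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


train, test = split_by_cutoff(series)

# Naive forecast: repeat the last training value for every test day
last_train_value = train[-1][1]
y_true = [y for _, y in test]
y_pred = [last_train_value] * len(test)
score = rmse(y_true, y_pred)
print(len(train), len(test), score)
```

Because the cutoff day is included in the test set, a 7-day offset yields eight test days on daily data, which is worth keeping in mind when comparing RMSE values across notebooks.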

## Step 2 : Define the search space

In [0]:
space_str = """
{
  "learning_rate": tune.uniform(0.001, 0.1),
  "n_changepoints": 10,
  "n_lags": 3, 
  'drop_missing': True,
  'impute_rolling': 1000,
  'newer_samples_weight': tune.uniform(1, 7),
  'batch_size': 128,
  "ar_layers": tune.choice([[64,64,64],[128,128,128],[256,256,256]]),
  'epochs': 10
}
"""

## Step 3 : Define the Ray Data map_groups function that runs Ray Tune for each hierarchical model with the chosen search algorithm and scheduler

In [0]:
max_trials = config.get('max_concurrent_trials')
num_batches = int(config.get('num_items') / config.get('items_per_model'))
num_models = sdf_walmart_with_model_num.select(F.max('model_num')).collect()[0][0]
total_concurrent_trials = num_models * max_trials
print(f"Total concurrent Trials: {total_concurrent_trials}")
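Whether all of these trials can actually run at the same time depends on how many CPUs Ray has. The following back-of-the-envelope sketch uses a hypothetical helper, `trial_cpu_budget`, and rests on two assumptions that are not stated in this notebook: each `map_groups` task holds one CPU while its Tune run is active, and each trial requests a fixed number of CPUs.

```python
def trial_cpu_budget(
    num_models,
    max_concurrent_trials,
    cpus_per_trial,
    ray_cpus_available,
    cpus_per_group_task=1,  # assumption: one CPU held by each map_groups task
):
    """Compare the CPUs requested at peak concurrency with what Ray has available."""
    requested = num_models * (
        cpus_per_group_task + max_concurrent_trials * cpus_per_trial
    )
    return {"requested_cpus": requested, "fits": requested <= ray_cpus_available}


# Example: 3 workers x 14 Ray CPUs + 2 head-node CPUs = 44 CPUs for Ray
small = trial_cpu_budget(
    num_models=4, max_concurrent_trials=2, cpus_per_trial=2, ray_cpus_available=44
)
large = trial_cpu_budget(
    num_models=10, max_concurrent_trials=2, cpus_per_trial=2, ray_cpus_available=44
)
print(small, large)
```

If the request exceeds the available CPUs, the extra trials would be expected to wait for resources rather than run concurrently, so the effective parallelism is lower than the printed total.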

In [0]:
def udf_parallel_tune(
    df,
    experiment_id,
    parent_id=None,
    cpu_resources_per_trial=2,
    gpu_resources_per_trial=0,
):
    """
    Runs a Ray Tune hyperparameter search for one model group.
    Called once per `model_num` group; logs the best trial to MLflow as a
    child run of `parent_id` and returns a one-row summary DataFrame.
    """

    def define_by_run_func(trial) -> Optional[Dict[str, Any]]:
        """
        Define-by-run function to create the search space.
        Not used below: OptunaSearch receives the `space` dict instead.
        For more information, see https://optuna.readthedocs.io/en/stable\
        /tutorial/10_key_features/002_configurations.html
        """
        trial.suggest_float("learning_rate", 0.001, 0.1)
        trial.suggest_float("newer_samples_weight", 1, 7)
        trial.suggest_categorical(
            "ar_layers", [[64, 64, 64], [128, 128, 128], [256, 256, 256]]
        )

    start = timeit.default_timer()
    model_num = df["model_num"].iloc[0]  # positional access; the group's index may not start at 0
    df["date_time"] = pd.to_datetime(df["date_time"], format="%Y-%m-%d")
    df = df.sort_values(by="date_time", ascending=True)
    df = df.rename(columns={"date_time": "ds", "item_num": "ID"})
    df = df[["ID", "ds", "y"]]

    # The search space is kept as a string so it serializes easily; evaluate it here
    space = eval(space_str)

    tune_resources = (
        {"CPU": cpu_resources_per_trial}
        if gpu_resources_per_trial == 0
        else {"CPU": cpu_resources_per_trial, "GPU": gpu_resources_per_trial}
    )

    # Define Optuna search algo
    searcher = OptunaSearch(space=space, metric="RMSE", mode="min")

    # Tune with callback
    tuner = tune.Tuner(
        ray.tune.with_resources(
            ray.tune.with_parameters(
                ray_trial, df=df, cpu_resources_per_trial=cpu_resources_per_trial
            ),
            tune.PlacementGroupFactory([tune_resources]),
        ),
        tune_config=tune.TuneConfig(
            search_alg=searcher,
            max_concurrent_trials=max_trials,
            num_samples=max_trials * num_batches,
            reuse_actors=True,  # Highly recommended for short training jobs (NOT RECOMMENDED FOR GPU AND LONG TRAINING JOBS)
        ),
    )
    multinode_results = tuner.fit()
    best_trial = multinode_results.get_best_result(
        metric="RMSE", mode="min", scope="last"
    )
    with mlflow.start_run(
        run_name=f"model_{str(model_num)}",
        experiment_id=experiment_id,
        tags={"mlflow.parentRunId": parent_id.info.run_id},
        description="Run inside Ray Data map_groups",
    ) as child_run:
        for key, value in best_trial.config.items():
            mlflow.log_param(key=key, value=str(value))
        mlflow.log_metric(key="rmse", value=best_trial.metrics["RMSE"])
        mlflow.log_metric(key="Loss", value=best_trial.metrics["Loss"])
        # mlflow.pyfunc.log_model(best_trial.last_result['checkpoint'], "model")

    best_rmse = best_trial.metrics["RMSE"]

    return pd.DataFrame(
        [
            {
                "model_num": model_num,
                "model_HPT_time": str(timeit.default_timer() - start),
                "num_datapoints": df["y"].count(),
                "RMSE": best_rmse,
                "space": str(best_trial.config),
            }
        ]
    )

## Step 4 : Run map_groups inside an MLflow parent run and get the results

In [0]:
import mlflow

experiment_name = config.get('experiment_name')

# set_experiment creates the experiment if it does not exist and returns it
experiment_id = mlflow.set_experiment(experiment_name).experiment_id

with mlflow.start_run(
    run_name="ray_tune_native_mlflow_callback", experiment_id=experiment_id
) as parent_run:
    result_df = (
        sdf_ray.groupby("model_num")
        .map_groups(
            udf_parallel_tune,
            concurrency=num_models,
            fn_kwargs={
                "parent_id": parent_run,
                "experiment_id": experiment_id,
                "cpu_resources_per_trial": 2,
                "gpu_resources_per_trial": 0,
            },
            batch_format="pandas",
        )
        .to_pandas()
    )
    
result_df