# Parallel demand forecasting at scale using Ray Tune and Ray Data

Batch training and tuning are common tasks in machine learning use-cases. They require training simple models, on data batches, typcially corresponding to different locations, products, etc. Batch training can take less time to process all the data at once, but only if those batches can run in parallel!

This notebook showcases how to conduct batch forecasting with NeuralProphet. NeuralProphet is a popular open-source library developed by Facebook and designed for automatic forecasting of univariate time series data. 
<br></br>
<div style="text-align: center; line-height: 5; padding-top: 20px;  padding-bottom: 20px;">
  <img src="https://docs.ray.io/en/master/_images/batch-training.svg" alt='Push compute' height="300" width="400">
</div>

For the data, we will use the M5 walmart dataset.This popular tabular dataset contains historical sales of products for different locations and regions in USA

## This notebook has been tested with ML DBR 16.1 with the below cluster config
**Head node** : 28GB 4 Cores <br>
**Worker nodes** : 64GB 16 Cores  <br>
**max workers** : 3  <br>

In [0]:
%pip install neuralprophet ray[default,tune]==2.41.0 #update to latest version for better support to logging
%restart_python

In [0]:
# Specify catalog Dependencies
CATALOG = 'mlops_pj'
SCHEMA = 'many_model_forecasting_example'
VOLUME = 'walmart'

In [0]:
# Check if UC assets exists and create them if they do not
# spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}")
spark.sql(f"USE SCHEMA {SCHEMA}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.{VOLUME}")

### Import the libraries

In [0]:
import random
import multiprocessing
import numpy as np
from datetime import timedelta, date
import traceback
import math
import timeit
import torch
import mlflow
from mlflow.tracking import MlflowClient

import pandas as pd
import neuralprophet
from neuralprophet import NeuralProphet, set_log_level


# importing hyperopt and ray
from hyperopt import hp
import ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.hyperopt import HyperOptSearch
from ray.tune.stopper import TimeoutStopper
from ray.tune.search.concurrency_limiter import ConcurrencyLimiter
from ray.runtime_env import RuntimeEnv

from hyperopt.pyll import scope
from itertools import product
from ray.util.multiprocessing import Pool

import multiprocessing
num_cpus = multiprocessing.cpu_count()
print(num_cpus)

import logging

from delta.tables import DeltaTable
from pyspark.sql.functions import col
from pyspark.sql import Window
from sklearn.metrics import mean_squared_error
from datetime import datetime
import json
import time, os
from ast import literal_eval
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
import pyspark.sql.functions as F

## Get The cluster information

In [0]:
import requests
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
host_name = ctx.tags().get("browserHostName").get()
host_token = ctx.apiToken().get()
cluster_id = ctx.tags().get("clusterId").get()

response = requests.get(
    f'https://{host_name}/api/2.1/clusters/get?cluster_id={cluster_id}',
    headers={'Authorization': f'Bearer {host_token}'}
  ).json()

if "autoscale" in response:
  min_node = response['autoscale']["min_workers"]
  max_node = response['autoscale']["max_workers"]

## Start Ray Cluster

In [0]:
import os
import ray
import numpy as np 
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES


# Cluster cleanup
restart = True
if restart is True:
  try:
    shutdown_ray_cluster()
  except:
    pass

  try:
    ray.shutdown()
  except:
    pass

# Set configs based on your cluster size
num_cpu_cores_per_worker = 15 # total cpu to use in each worker node (total_cores - 1 to leave one core for spark)
num_cpus_head_node = 2 # Cores to use in driver node (total_cores - 1)

# Set databricks credentials as env vars
mlflow_dbrx_creds = get_databricks_env_vars("databricks")
os.environ["DATABRICKS_HOST"] = mlflow_dbrx_creds['DATABRICKS_HOST']
os.environ["DATABRICKS_TOKEN"] = mlflow_dbrx_creds['DATABRICKS_TOKEN']

ray_conf = setup_ray_cluster(
  min_worker_nodes=min_node,
  max_worker_nodes=max_node,
  num_cpus_head_node= num_cpus_head_node,
  num_cpus_per_node=num_cpu_cores_per_worker,
  num_gpus_head_node=0,
  num_gpus_worker_node=0
)
os.environ['RAY_ADDRESS'] = ray_conf[0]

# Reading walmart data

In [0]:
sdf_walmart = spark.read.table('final_cleaned_filtered')

### Create Random time-series per model for the Demo.
make sure num_items and items_per_model are divisible

In [0]:
# Experiment setup - make sure num_items and items_per_model are divisible
num_items = 600  # Max number of item time series to load, full dataset has 30490 which is overkill
items_per_model = 100  # Number of item time series per model

In [0]:

window_spec = Window.orderBy('state_id', 'store_id', 'cat_id', 'dept_id', 'item_id')
sdf_walmart_with_model_num = sdf_walmart.withColumn("item_num", F.dense_rank().over(window_spec))  # A unique item number based on the window
sdf_walmart_with_model_num = sdf_walmart_with_model_num.filter(sdf_walmart_with_model_num.item_num <= num_items)
sdf_walmart_with_model_num = sdf_walmart_with_model_num.withColumn("model_num", F.ceil(F.col("item_num") / items_per_model))
sdf_walmart_with_model_num = sdf_walmart_with_model_num.withColumn('y', F.col('sell_price')*F.col('sale_quantity'))
sdf_walmart_with_model_num.cache()
print(sdf_walmart.count())
sdf_walmart_with_model_num.display()

## There are multiple ways to convert data from the lakehouse to Ray Data , refer to the [documentation](https://docs.databricks.com/en/machine-learning/ray/connect-spark-ray.html) for more details 

In [0]:
# Convert spark Dataframe to Ray 
sdf_ray  = ray.data.from_spark(sdf_walmart_with_model_num)

# Many model Forecasting with Ray Tune and Ray Data
Ray Tune is a powerful library for hyperparameter tuning, designed to simplify the development of distributed applications. It allows you to efficiently sample hyperparameters and get optimized results on your objective function. Ray Tune provides a variety of state-of-the-art hyperparameter tuning algorithms for optimizing model performance. 

To use Ray Tune for hyperparameter tuning, you can follow these steps:
- Define your training function and objective function.
- Specify the hyperparameters and their search space.
- Define the pyspark udf function which runs ray tune for each Hierarchial model for the chosen search algorithm and scheduler.
- Run the pyspark job and get the result

## Step 1 : Define the training and objective function

In [0]:
import numpy as np 

def ray_trial(config, df,cpu_resources_per_trial):
  """
  Single ray trial of parameter config 
  This runs a NeuralProphet model based on the given config and then loads  
  """

  torch.set_num_threads(int(cpu_resources_per_trial)) # Pass the correct cpu to use to improve multi-threading
  test_cutoff = df['ds'].max() - pd.Timedelta(days=7) # Take 7 day test cut-ogg
  df_train = df[df['ds'] < test_cutoff]
  df_test = df
  trainer_config = {}

  config['n_changepoints'] = 10
  config['n_lags'] = 3
  config['drop_missing'] = True 
  config['impute_rolling'] = 1000
  config['batch_size'] = 128
  config['epochs'] = 10

  # Define the Model (it can be any model in our case we use NeuralProphet)
  model = NeuralProphet(
      accelerator='auto',
      trainer_config=trainer_config,
      **config
  )
  start = timeit.default_timer()
  # Train model
  progress = model.fit(
      df=df_train,
      checkpointing =True,
      freq="D",
      metrics=['RMSE'],
      progress='bar'
    )
  total_time = timeit.default_timer()-start
  if np.isnan(progress['RMSE'][0]):
    progress.fillna(1000, inplace = True)

  d_p = progress.loc[progress['RMSE'] == progress['RMSE'].min()].to_dict(orient='records')

  print("Loss :",d_p[0]['Loss'])
  # Validate the model and get the RMSE Score
  forecast_week = model.predict(df[df['ds'] >= (df['ds'].max() - pd.Timedelta(days=360))])
  forecast_week = forecast_week[forecast_week['ds'] >= test_cutoff]
  forecast_week.y.fillna(0, inplace=True)
  forecast_week.yhat1.fillna(0, inplace=True)
  test_rmse = mean_squared_error(forecast_week.yhat1.tolist(), forecast_week.y.tolist(), squared=False)
  if np.isnan(test_rmse):
    test_rmse = 1000
  print("test_rmse:",test_rmse)
  #Push the final metric to track
  ray.train.report({"RMSE":test_rmse,
              "Loss" :d_p[0]['Loss']})

## Step 2 : Define the search space

In [0]:
space_str = """
{
  "learning_rate": tune.uniform(0.001, 0.1),
  "n_changepoints": 10,
  "n_lags": 3, 
  'drop_missing': True,
  'impute_rolling': 1000,
  'newer_samples_weight': tune.uniform(1, 7),
  'batch_size': 128,
  "ar_layers": tune.choice([[64,64,64],[128,128,128],[256,256,256]]),
  'epochs': 10
}
"""

## Step 3 : Define the Ray Map Groups udf function which runs ray tune for each Hierarchial model for the chosen search algorithm and scheduler.

In [0]:
from typing import Dict, Optional, Any

from ray import tune,train
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.optuna import OptunaSearch
from ray.air.integrations.mlflow import MLflowLoggerCallback


def udf_parallel_hpt_tune(df,experiment_id,
                          parent_id =None,
                          cpu_resources_per_trial = 2,
                          gpu_resources_per_trial = 0):
  """
  Single ray trial of parameter config 
  This runs a NeuralProphet model based on the given config and then loads  
  """
  def define_by_run_func(trial) -> Optional[Dict[str, Any]]:
    """
    Define-by-run function to create the search space.
    For more information, see https://optuna.readthedocs.io/en/stable\
    /tutorial/10_key_features/002_configurations.html
    """
    trial.suggest_int("n_estimators", 10, 200, log=True)
    trial.suggest_float("newer_samples_weight", 1, 7)
    trial.suggest_categorical("ar_layers", [[64,64,64],[128,128,128],[256,256,256]])

  start = timeit.default_timer()
  model_num = df["model_num"][0]
  df['date_time'] = pd.to_datetime(df['date_time'], format='%Y-%m-%d')
  df = df.sort_values(by='date_time', ascending=True)
  df = df.rename(columns={'date_time': 'ds', 'item_num': 'ID'})
  df = df[['ID', 'ds', 'y']]
  space = eval(space_str)

  tune_resources = {"CPU": cpu_resources_per_trial} if \
                     gpu_resources_per_trial == 0 else \
                      {"CPU": cpu_resources_per_trial,\
                                     "GPU": gpu_resources_per_trial}

  # Define Optuna search algo
  searcher = OptunaSearch(space=space, metric="RMSE", mode="min")

  # Tune with callback
  tuner = tune.Tuner(
      ray.tune.with_resources(ray.tune.with_parameters(
          ray_trial, df=df,cpu_resources_per_trial = cpu_resources_per_trial),tune.PlacementGroupFactory([tune_resources])),
      tune_config=tune.TuneConfig(
          search_alg=searcher,
          max_concurrent_trials=max_concurrent_trials,
          num_samples=max_concurrent_trials * num_batches,
          reuse_actors = True, # Highly recommended for short training jobs (NOT RECOMMENDED FOR GPU AND LONG TRAINING JOBS)
          )
  )
  multinode_results = tuner.fit()
  best_trial = multinode_results.get_best_result(metric="RMSE", mode="min", scope='last')
  with mlflow.start_run(
    run_name=f"model_{str(model_num)}",
    experiment_id = experiment_id,
    tags={"mlflow.parentRunId": parent_run.info.run_id},
    description="run inside ray Map Batches",
) as child_run:
    for key ,value in best_trial.config.items():
      mlflow.log_param( key = key,
                        value = str(value))
    mlflow.log_metric(key = 'rmse', value = best_trial.metrics['RMSE'])
    mlflow.log_metric(key = 'Loss', value = best_trial.metrics['Loss'])
    # mlflow.pyfunc.log_model(best_trial.last_result['checkpoint'], "model")

  best_rmse = best_trial.metrics['RMSE']

  return pd.DataFrame([{
  'model_num': model_num,
  'model_HPT_time': str(timeit.default_timer()-start), 
  'num_datapoints': df['y'].count(),
  'RMSE': best_rmse,
  'space': str(best_trial.config)
  }])

## Step 4 : Run the Map_groups wrapped in MLFLOW and get the results.

In [0]:
max_concurrent_trials = 3
num_batches = 3  # num trials = max_concurrent_trials * num_batches
num_models = sdf_walmart_with_model_num.select(F.max('model_num')).collect()[0][0]
print(f"num models : {num_models}")
total_concurrent_trials = num_models*max_concurrent_trials
print(f"Total concurrent Trials: {total_concurrent_trials}")

In [0]:
import mlflow


# Grab experiment and model name
experiment_name = "/Users/puneet.jain@databricks.com/ray_test"

if not  mlflow.get_experiment_by_name(experiment_name):
  mlflow.set_experiment(experiment_name)
  experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
else:
  experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id

mlflow.set_experiment(experiment_name)

with mlflow.start_run(run_name ='ray_tune_native_mlflow_callback', experiment_id=experiment_id) as parent_run:
  result_df = sdf_ray.groupby('model_num').map_groups(udf_parallel_hpt_tune ,
                                                    concurrency = num_models,
                                                    fn_kwargs = {'parent_id' : parent_run,
                                                                 'experiment_id' : experiment_id,
                                                                 'cpu_resources_per_trial' : 2,
                                                                 'gpu_resources_per_trial' : 0},
                                                     batch_format = 'pandas').to_pandas()
result_df

Here we dynamically specify the resources to be used based on the cluster choosen betweem CPU and GPU

## #To Update
We see for each trial the cluster utilizes on 25% of the computation
<br></br>
<div style="text-align: center; line-height: 5; padding-top: 20px;  padding-bottom: 20px;">
  <img src="https://raw.githubusercontent.com/puneet-jain159/Image_dump/test/ray_25_dash_utilization.png" alt='Push compute' height="1000" width="1600">
</div>

In [0]:
# Here we see that only 25% percent of the GPU is being utlized to when running the trails

# Improving the cluster utilization

Since we are consuming 25% let us increase the number of trails to 4 to utlize the GPU Cluster properly

In [0]:
# max_concurrent_trials = 4
# num_models = sdf_walmart_with_model_num.select(F.max('model_num')).collect()[0][0]
# print(f"num models : {num_models}")
# total_concurrent_trials = num_models*max_concurrent_trials
# print(f"Total concurrent Trials: {total_concurrent_trials}")
# gpu_resources_per_trial = ray_resources['GPU']/total_concurrent_trials if 'GPU' in ray_resources.keys() else 0
# cpu_resources_per_trial = min(int(math.ceil((ray_resources['CPU']/num_models)/max_concurrent_trials)),16)
# print(f'gpu_resources_per_trial:{gpu_resources_per_trial}\ncpu_resources_per_trial:{cpu_resources_per_trial}')
# print(f"***Creating DF to ray tune on {num_models} models with {items_per_model} item time seies per model***")

## #To Update
We now see full utlization of the cluster
<br></br>
<div style="text-align: center; line-height: 5; padding-top: 20px;  padding-bottom: 20px;">
  <img src="https://raw.githubusercontent.com/puneet-jain159/Image_dump/test/ray_100_dash_utlization_max.png" alt='Push compute' height="1000" width="1600">
</div>