## MLflow Train, Cross Validation, Hyperparameter Tuning and deploy to databricks as a REST API

In [0]:
%pip install hyperopt
%pip install mlflow
%pip install xgboost

Python interpreter will be restarted.
Collecting hyperopt
  Using cached hyperopt-0.2.7-py2.py3-none-any.whl (1.6 MB)
Collecting cloudpickle
  Using cached cloudpickle-2.2.1-py3-none-any.whl (25 kB)
Collecting networkx>=2.2
  Using cached networkx-3.1-py3-none-any.whl (2.1 MB)
Collecting future
  Using cached future-0.18.3-py3-none-any.whl
Collecting py4j
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Collecting tqdm
  Using cached tqdm-4.65.0-py3-none-any.whl (77 kB)
Installing collected packages: tqdm, py4j, networkx, future, cloudpickle, hyperopt
Successfully installed cloudpickle-2.2.1 future-0.18.3 hyperopt-0.2.7 networkx-3.1 py4j-0.10.9.7 tqdm-4.65.0
Python interpreter will be restarted.
Python interpreter will be restarted.
Collecting mlflow
  Using cached mlflow-2.4.1-py3-none-any.whl (18.1 MB)
Collecting gunicorn<21
  Using cached gunicorn-20.1.0-py3-none-any.whl (79 kB)
Collecting docker<7,>=4.0.0
  Using cached docker-6.1.3-py3-none-any.whl (148 kB)
Collecting py

In [0]:
import pandas as pd
import requests
import seaborn as sns
from pyspark.sql.functions import col

from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope
from math import exp
import mlflow.xgboost
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
from mlflow.tracking import MlflowClient
import cloudpickle
import time

In [0]:
def get_historical_weather(
    lat=52.377956,
    lon=4.897070,
    start_date="2022-01-01",
    end_date="2022-12-31",
    feature_list=[
        "temperature_2m",
        "relativehumidity_2m",
        "windspeed_10m",
        "rain",
    ],
):
    """
    Get historical weather data from open-meteo.com

    Parameters
    ----------
    lat: float
        Latitude of the location you want to get the weather for.
    lon: float
        Longitude of the location you want to get the weather for.
    start_date: str
        Start date of the period you want to get the weather for.
    end_date: str
        End date of the period you want to get the weather for.
    feature_list: list
        List of features you want to get the weather for.
        Options: "temperature_2m", "relativehumidity_2m",
        "windspeed_10m", "rain"

    Returns
    -------
    df: pd.DataFrame
        Data frame containing the weather data.

    Examples
    --------
    >>> import pandas as pd
    >>> from utils import get_historical_weather
    >>> df = get_historical_weather()
    >>> df.head()

    """
    url = f"https://archive-api.open-meteo.com/v1/era5?latitude={lat}&longitude={lon}&start_date={start_date}&end_date={end_date}&hourly={','.join(feature_list)}"
    response = requests.get(url)
    return pd.DataFrame(response.json()["hourly"])

In [0]:
train_df = (
    spark.read.load("dbfs:/user/hive/warehouse/disruptions_2011_2021")
    .withColumn("start_time", col("start_time").cast("string"))
    .withColumn("end_time", col("end_time").cast("string"))
    .toPandas()
    .assign(
        **{
            "start_time": lambda x: pd.to_datetime(x["start_time"]),
            # "end_time": lambda x: pd.to_datetime(x["end_time"]),
            "date": lambda x: pd.to_datetime(x["start_time"]).dt.date,
        }
    )
    .groupby("date")
    .agg({"duration_minutes": "sum"})
    # .reset_index()
)

In [0]:
weather_df = (
    get_historical_weather(
        lat=52.520008,
        lon=13.404954,
        start_date=str(train_df.index.min()),
        end_date=str(train_df.index.max()),
    )
    .assign(**{"date": lambda x: pd.to_datetime(x["time"]).dt.date})
    .groupby("date")
    .agg({"temperature_2m": ["mean", "min", "max"], "rain": "sum"})
)
weather_df.columns = ["_".join(col) for col in weather_df.columns]

In [0]:
df = (pd.merge(train_df, weather_df, on="date", how="left")
      .loc[lambda x: x["duration_minutes"] < 20000] # remove outliers
      .dropna()
      )

In [0]:
target = "duration_minutes"
X = df.drop([target], axis=1)
y = df[target]
 
# Split out the training data
X_train, X_rem, y_train, y_rem = train_test_split(X, y, train_size=0.6, random_state=123)
 
# Split the remaining data equally into validation and test
X_val, X_test, y_val, y_test = train_test_split(X_rem, y_rem, test_size=0.5, random_state=123)

In [0]:
search_space = { 
    'boosting_type': hp.choice('boosting_type', ['gbdt','goss']),
    'metric': hp.choice('metric',['rmse']),
    'max_depth':scope.int(hp.quniform('max_depth', 2, 16, 1)),
    'min_data_in_leaf': scope.int(hp.quniform('min_data_in_leaf', 30, 150, 1)),
    'num_leaves': scope.int(hp.quniform('num_leaves', 30, 150, 1)),
    'learning_rate': hp.loguniform('learning_rate', np.log(0.01), np.log(0.2)),
    'min_child_samples': hp.quniform('min_child_samples', 20, 500, 5),
    'reg_alpha': hp.uniform('reg_alpha', 0.0, 10),
    'reg_lambda': hp.uniform('reg_lambda', 0.0, 10),
    'colsample_bytree': hp.uniform('colsample_by_tree', 0.6, 1.0),
    "feature_pre_filter": hp.choice("feature_pre_filter",[False]),
    'seed': 123, # Set a seed for deterministic training
}

In [0]:
def train_model(params):
    # With MLflow autologging, hyperparameters and the trained model are automatically logged to MLflow.
    mlflow.xgboost.autolog()
    with mlflow.start_run(nested=True):
        train = xgb.DMatrix(data=X_train, label=y_train)
        validation = xgb.DMatrix(data=X_val, label=y_val)
        # Pass in the validation set so xgb can track an evaluation metric. XGBoost terminates training when the evaluation metric
        # is no longer improving.
        booster = xgb.train(
            params=params,
            dtrain=train,
            num_boost_round=1000,
            evals=[(validation, "validation")],
            early_stopping_rounds=50,
        )
        validation_predictions = booster.predict(validation)
        metric_score = np.sqrt(mean_squared_error(y_val, validation_predictions))
        mlflow.log_metric("RMSE", metric_score)

        signature = infer_signature(X_train, booster.predict(train))
        mlflow.xgboost.log_model(booster, "model", signature=signature)

        # Set the loss to -1*auc_score so fmin maximizes the auc_score
        return {
            "status": STATUS_OK,
            "loss": -1 * metric_score,
            "booster": booster.attributes(),
        }

In [0]:
# Greater parallelism will lead to speedups, but a less optimal hyperparameter sweep. 
# A reasonable value for parallelism is the square root of max_evals.
spark_trials = SparkTrials(parallelism=10)
# mlflow.set_experiment("/Users/james.twose@cinqict.nl/disruptions_xgboost")

In [0]:
 # Run fmin within an MLflow run context so that each hyperparameter configuration is logged as a child run of a parent
# run called "xgboost_models" .
with mlflow.start_run(run_name='xgboost_models'):
  best_params = fmin(
    fn=train_model, 
    space=search_space, 
    algo=tpe.suggest, 
    max_evals=96,
    trials=spark_trials,
  )

  0%|          | 0/96 [00:00<?, ?trial/s, best loss=?]  1%|          | 1/96 [00:14<22:15, 14.06s/trial, best loss: -1712.1590894160886]  3%|▎         | 3/96 [00:15<06:13,  4.02s/trial, best loss: -1789.2112967147025]  4%|▍         | 4/96 [00:16<04:33,  2.98s/trial, best loss: -1789.2112967147025]  6%|▋         | 6/96 [00:17<02:37,  1.75s/trial, best loss: -1800.2259562156223]  7%|▋         | 7/96 [00:19<02:41,  1.82s/trial, best loss: -1800.2259562156223]  8%|▊         | 8/96 [00:20<02:20,  1.60s/trial, best loss: -1800.2259562156223] 10%|█         | 10/96 [00:25<02:51,  2.00s/trial, best loss: -1800.2259562156223] 11%|█▏        | 11/96 [00:26<02:29,  1.76s/trial, best loss: -1800.2259562156223] 12%|█▎        | 12/96 [00:27<02:12,  1.57s/trial, best loss: -1800.2259562156223] 15%|█▍        | 14/96 [00:28<01:32,  1.13s/trial, best loss: -1800.2259562156223] 16%|█▌        | 15/96 [00:30<01:48,  1.34s/trial, best loss: -1800.2259562156223] 17%|█▋        | 16/96 [00:31<01:40, 

Total Trials: 96: 96 succeeded, 0 failed, 0 cancelled.


In [0]:
best_run = mlflow.search_runs(order_by=['metrics.RMSE ASC']).iloc[0]
print(f'RMSE of Best Run: {best_run["metrics.RMSE"]}')

RMSE of Best Run: 1555.615725330372


In [0]:
best_run

Out[14]: run_id                                                        9b4fb095dc994a1faafa91ac2aab1ea9
experiment_id                                                                  376818718792203
status                                                                                FINISHED
artifact_uri                                 dbfs:/databricks/mlflow-tracking/3768187187922...
start_time                                                    2023-06-15 10:50:50.232000+00:00
end_time                                                      2023-06-15 10:50:58.999000+00:00
metrics.stopped_iteration                                                                245.0
metrics.RMSE                                                                       1555.615725
metrics.best_iteration                                                                   195.0
metrics.validation-rmse                                                            1554.834269
params.seed                              

In [0]:
model_name = "disruption_prediction"
new_model_version = mlflow.register_model(f"runs:/{best_run.run_id}/model", model_name)
 
# Registering the model takes a few seconds, so add a small delay
time.sleep(15)

Successfully registered model 'disruption_prediction'.
2023/06/15 10:55:35 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: disruption_prediction, version 1
Created version '1' of model 'disruption_prediction'.


In [0]:
client = MlflowClient()
# Promote the new model version to Production
client.transition_model_version_stage(
  name=model_name,
  version=new_model_version.version,
  stage="Production"
)

Out[23]: <ModelVersion: aliases=[], creation_timestamp=1686826535248, current_stage='Production', description='', last_updated_timestamp=1686826676807, name='disruption_prediction', run_id='9b4fb095dc994a1faafa91ac2aab1ea9', run_link='', source='dbfs:/databricks/mlflow-tracking/376818718792203/9b4fb095dc994a1faafa91ac2aab1ea9/artifacts/model', status='READY', status_message='', tags={}, user_id='5109889680170896', version='1'>

In [0]:
# Try the model that is saved in the production models
model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")
 
# Sanity-check: This should match the RMSE logged by MLflow
print(f'RMSE: {np.sqrt(mean_squared_error(y_test, model.predict(X_test)))}')

 - mlflow (current: 2.4.1, required: mlflow==2.4)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.


RMSE: 1626.2048003495559
