## Chapter 16: Production Infrastructure
Author: Ben Wilson

#### Summary
In this notebook, we'll look at using MLflow's model registry to build a basic passive retraining system for a simple model. 
As a companion to the book's earlier discussions of hyperparameter tuning, we'll use Optuna, an optimization engine that builds upon the foundation of hyperopt. This package has a higher-level (and arguably far easier to use) API, an improved optimizer, and no dependency on comparatively complex partial function logic. <br>

#### Notes
The code in this notebook is displayed in a notebook format for teaching purposes only. The classes defined in the top portion of the notebook would, in a real-world use case, live in separate modules within a code base. As we move further along through this notebook, the subsequent iterative runs that simulate retraining would not be defined within a single code base either. Rather, these iterations would be separate executions of the training code modules. <br>
Basically, what I'm saying is... <i><b>don't use this as a template</b></i>. This is intended to showcase the model registry functionality, why it's so critical to production processes for retraining, and to illustrate how to manage model artifacts in an automated manner.

First, install Optuna from pip so that the library is available for automated hyperparameter tuning during passive retraining.

In [0]:
%pip install optuna

In [0]:
import requests
import math
import pandas as pd
import numpy as np
from datetime import datetime
from dataclasses import dataclass

import mlflow
import mlflow.sklearn
from mlflow.tracking.client import MlflowClient
from mlflow.models.signature import infer_signature

import optuna
from collections import namedtuple

from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.pipeline import make_pipeline

from pyspark.sql.types import StructType, StructField, DoubleType, StringType
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.functions import udf

A pseudo-module of our data ingestion logic for this example

In [0]:
class DataIngest:
  
  def __init__(self, url, local, source, sink, schema, database, table_name):
    self.url = url
    self.local = local
    self.source = source
    self.sink = sink
    self.schema = schema
    self.database = database
    self.table_name = table_name
    
  def _acquire_raw(self):
    response = requests.get(self.url, stream=True)
    with open(self.url.split("/")[-1], "wb") as data:
      data.write(response.content)
    response.close()
    
  def _transfer_local(self):
    dbutils.fs.mv(self.local, self.source)
    
  def _read_source(self):
    return spark.read.csv(self.source, header=True, inferSchema=False, schema=self.schema)
  
  def _write_source(self, data):
    data.write.format("delta").mode("overwrite").option("mergeSchema", "true").option("overwriteSchema", "true").save(self.sink)
    
  @staticmethod
  def _supplement_data(data):
    
    @udf("double")
    def _calculate_heat_index(t, h):
      f = ((t * 9/5) + 32)
      t2 = math.pow(f, 2)
      h2 = math.pow(h, 2)
      c = [ 0.363445176, 0.988622465, 4.777114035, -0.114037667, -0.000850208, -0.020716198, 0.000687678, 0.000274954, 0]
      heat_index =  c[0] + (c[1]*f) + (c[2]*h) + (c[3]*f*h) + (c[4]*t2) + (c[5]*h2) + (c[6]*t2*h) + (c[7]*f*h2) + (c[8]*t2*h2)
      return ((heat_index - 32) * 5/9)
    
    return (data.withColumn("month", F.from_unixtime(F.unix_timestamp(F.col("month"), 'MMM'), 'MM').cast(DoubleType()))
                .withColumn("day", when(F.col("day") == "sun", 0.0).when(F.col("day") == "mon", 1.0)
                                  .when(F.col("day") == "tue", 2.0).when(F.col("day") == "wed", 3.0)
                                  .when(F.col("day") == "thu", 4.0).when(F.col("day") == "fri", 5.0)
                                  .otherwise(6.0)
                           )
                .withColumn("heat_index", _calculate_heat_index(F.col("temperature"), F.col("relative_humidity")))
           )
  
  def _create_table(self):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {self.database};")
    spark.sql(f"""CREATE TABLE IF NOT EXISTS {self.database}.{self.table_name} USING DELTA LOCATION '{self.sink}';""")
    dbutils.fs.rm(self.source)
  
  def register_data(self):
    self._acquire_raw()
    self._transfer_local()
    data = self._read_source()
    supplemented = self._supplement_data(data)
    self._write_source(supplemented)
    self._create_table()
    
  def get_data(self):
    return spark.table(f"{self.database}.{self.table_name}")
  
  def get_data_as_pandas(self):
    return self.get_data().toPandas()
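
Because the heat index logic above is wrapped in a Spark udf, it can be awkward to check in isolation. The following is a minimal sketch that copies the same arithmetic into a plain Python function so it can be exercised without a Spark session. The function name `heat_index_celsius` and the constant name `COEFFS` are hypothetical; the coefficients and the sample inputs (temperature 8.2, relative humidity 51.0, taken from the first row of the prediction output at the end of this notebook) come from the notebook itself.

```python
import math

# Coefficients copied from the notebook's udf; the last one is zero there.
COEFFS = [0.363445176, 0.988622465, 4.777114035, -0.114037667,
          -0.000850208, -0.020716198, 0.000687678, 0.000274954, 0.0]

def heat_index_celsius(temp_c, humidity):
    """Plain-Python copy of the udf body: Celsius in, Celsius out."""
    f = (temp_c * 9 / 5) + 32           # convert to Fahrenheit
    f2 = math.pow(f, 2)
    h2 = math.pow(humidity, 2)
    c = COEFFS
    heat_index_f = (c[0] + c[1] * f + c[2] * humidity + c[3] * f * humidity
                    + c[4] * f2 + c[5] * h2 + c[6] * f2 * humidity
                    + c[7] * f * h2 + c[8] * f2 * h2)
    return (heat_index_f - 32) * 5 / 9  # back to Celsius

# Inputs from the first row of the prediction output later in the notebook.
print(heat_index_celsius(8.2, 51.0))
```

The result should agree, to within floating-point rounding, with the `heat_index` value of roughly 22.586 recorded for that row.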
  

A utility class that helps to simplify our configuration for the training code. The static path hard-coded into the class would be fixed by our project's configuration. 
##### NOTE: <b><i>To run this in your environment, change the workspace path to use your own registered email. As written, this will not execute in your environment. </i></b>

In [0]:
class PathHelper:
  def __init__(self, experiment_name):
    self.experiment_name = experiment_name
  def name_generator(self):
    return f"/Users/benjamin.wilson@databricks.com/Book/{self.experiment_name}" #Change the workspace path and email if you want to run this!

Define a dataclass for carrying data from the model registry, along with a helper that generates a Spark DataFrame from that data.

In [0]:
@dataclass
class Registry:
  model_name: str
  production_version: int
  updated: bool
  training_time: str
    
class RegistryStructure:
  def __init__(self, data):
    self.data = data
  def generate_row(self):
    spark_df = spark.createDataFrame(pd.DataFrame([vars(self.data)]))
    return (spark_df.withColumn("training_time", F.to_timestamp(F.col("training_time")))
            .withColumn("production_version", F.col("production_version").cast("long")))
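
To make the conversion step concrete without Spark, here is a small sketch of what `vars(self.data)` hands to pandas. The `Registry` definition is repeated so the block is self-contained, and the field values are taken from the first row of the audit table shown later in the notebook; the variable names `entry` and `row` are illustrative only.

```python
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class Registry:
    model_name: str
    production_version: int
    updated: bool
    training_time: str

entry = Registry("Forest_Fire_Model_5", 1, True,
                 datetime(2021, 7, 16, 17, 38, 54).strftime("%Y-%m-%d %H:%M:%S"))

# vars() is what the notebook uses; asdict() is the dataclasses equivalent
# and yields the same mapping for a flat dataclass like this one.
row = vars(entry)
assert row == asdict(entry)
print(row)
```

Each key becomes a column in the single-row DataFrame, which is why the later `withColumn` calls only need to fix the types of `training_time` and `production_version`.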

A mock module for handling the interface to the Model Registry. <br>
The class <i>RegistryLogging</i> provides the interface to record the state of passive retraining history (useful for externally checking the state of the current deployed model and the history of attempted overrides to prior best models) and to allow for a table-based audit trail. <br>
The class <i>ModelRegistration</i> handles retrieval of the best-performing model trained so far. It will retrieve the current production model's metrics from the model tracking server and compare those to the best metric run in the history of the project. If any run is better than current production performance, it will replace the production model with that new model. <br>
#### NOTE
A more thorough implementation (with far deeper complexity that would be distracting here for example purposes) would search only the current passive retraining event's metrics, pull the current production model's artifact, run a metrics calculation on the new holdout validation set, and determine whether the current production model performs better or worse than the new passive retraining model. <br>
This full implementation would be many times larger than the one shown below. As an exercise, if you are interested, I encourage you to attempt to implement the logic described above to see how a passive retraining system would look using MLflow.

In [0]:
class RegistryLogging:
  
  def __init__(self, database, table, delta_location, model_name, production_version, updated):
    self.database = database
    self.table = table
    self.delta_location = delta_location
    self.entry_data = Registry(model_name, production_version, updated, self._get_time())
  
  @staticmethod
  def _get_time():
    return datetime.today().strftime('%Y-%m-%d %H:%M:%S')
  
  def _check_exists(self):
    return spark._jsparkSession.catalog().tableExists(self.database, self.table)
  
  def write_entry(self):
    log_row = RegistryStructure(self.entry_data).generate_row()
    log_row.write.format("delta").mode("append").save(self.delta_location)
    if not self._check_exists():
      spark.sql(f"""CREATE TABLE IF NOT EXISTS {self.database}.{self.table} USING DELTA LOCATION '{self.delta_location}';""")
    

class ModelRegistration:
  
  def __init__(self, experiment_name, experiment_title, model_name, metric, direction):
    self.experiment_name = experiment_name
    self.experiment_title = experiment_title
    self.model_name = model_name
    self.metric = metric
    self.direction = direction
    self.client = MlflowClient()
    self.experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
    
  def _get_best_run_info(self, key):
    run_data = mlflow.search_runs(self.experiment_id, order_by=[f"metrics.{self.metric} {self.direction}"])
    return run_data.head(1)[key].values[0]
    
  def _get_registered_status(self):
    return self.client.get_registered_model(name=self.experiment_title)
    
  def _get_current_prod(self):
    return [x.run_id for x in self._get_registered_status().latest_versions if x.current_stage == "Production"][0]
  
  def _get_prod_version(self):
    return int([x.version for x in self._get_registered_status().latest_versions if x.current_stage == "Production"][0])
  
  def _get_metric(self, run_id):
    return mlflow.get_run(run_id).data.metrics.get(self.metric)
  
  def _find_best(self):
    try: 
      current_prod_id = self._get_current_prod()
      prod_metric = self._get_metric(current_prod_id)
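    except mlflow.exceptions.RestException:
      # No registered model yet: use a sentinel that any real run will beat,
      # whichever direction the metric is optimized in
      current_prod_id = -1
      prod_metric = float("inf") if self.direction == "ASC" else float("-inf")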
    
    best_id = self._get_best_run_info('run_id')
    best_metric = self._get_metric(best_id)
    
    if self.direction == "ASC":
      if prod_metric < best_metric:
        return current_prod_id
      else:
        return best_id
    else:
      if prod_metric > best_metric:
        return current_prod_id
      else:
        return best_id
  
  def _generate_artifact_path(self, run_id):
    return f"runs:/{run_id}/{self.model_name}"
    
  def register_best(self, registration_message, logging_location, log_db, log_table):
    best_id = self._find_best()
    try:
      current_prod = self._get_current_prod()
      current_prod_version = self._get_prod_version()
    except mlflow.exceptions.RestException:
      current_prod = -1
      current_prod_version = -1
    updated = current_prod != best_id
    
    if updated:
      register_new = mlflow.register_model(self._generate_artifact_path(best_id), self.experiment_title)
      self.client.update_registered_model(name=register_new.name, 
                                          description="Forest Fire Prediction for the National Park")
      self.client.update_model_version(name=register_new.name, 
                                       version=register_new.version, 
                                       description=registration_message)
      self.client.transition_model_version_stage(name=register_new.name, 
                                                 version=register_new.version, 
                                                 stage="Production")
      if current_prod_version > 0:
        self.client.transition_model_version_stage(name=register_new.name,
                                                   version=current_prod_version,
                                                   stage="Archived")
      RegistryLogging(log_db, log_table, logging_location, self.experiment_title, int(register_new.version), updated).write_entry()
      return "upgraded prod"
    else:
      RegistryLogging(log_db, log_table, logging_location, self.experiment_title, int(current_prod_version), updated).write_entry()
      return "no change"
    
  def get_model_as_udf(self):
    prod_id = self._get_current_prod()
    artifact_uri = self._generate_artifact_path(prod_id)
    return mlflow.pyfunc.spark_udf(spark, model_uri=artifact_uri)
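
The note above describes a more thorough comparison in which the production model and the newly trained model are both scored on the same fresh holdout set. A minimal sketch of just that decision step follows, in plain Python with no MLflow calls. The function names `rmse` and `pick_winner`, the toy data, the lambda stand-ins for loaded model artifacts, and the rule that ties keep the production model are all assumptions made for illustration.

```python
import math

def rmse(y_true, y_pred):
    """Root mean squared error for two equal-length sequences."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))

def pick_winner(prod_predict, candidate_predict, X_holdout, y_holdout):
    """Score both models on the same holdout set; lower RMSE wins.

    prod_predict may be None when nothing is in production yet.
    Ties keep the production model to avoid needless promotions.
    """
    candidate_score = rmse(y_holdout, [candidate_predict(x) for x in X_holdout])
    if prod_predict is None:
        return "candidate", candidate_score
    prod_score = rmse(y_holdout, [prod_predict(x) for x in X_holdout])
    if candidate_score < prod_score:
        return "candidate", candidate_score
    return "production", prod_score

# Hypothetical stand-ins for loaded model artifacts.
X_holdout = [1.0, 2.0, 3.0, 4.0]
y_holdout = [2.0, 4.0, 6.0, 8.0]
production_model = lambda x: 2.0 * x + 1.0   # off by one everywhere
candidate_model = lambda x: 2.0 * x          # exact on this toy data

print(pick_winner(production_model, candidate_model, X_holdout, y_holdout))
```

Scoring both models on the same data removes the bias that arises when each run's logged metric was computed on a different random split, which is the main weakness of comparing stored metrics directly.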


A condensed collection of pseudo-modules (each of these classes would be in its own file in a production code base) related to model training.

In [0]:
class DataPrep:
    
    def __init__(self, data, label, test_size=0.3):
        self.data = data
        self.label = label
        self.test_size = test_size
        
    def split_features(self):
        Data = namedtuple('Data', 'X y')
        X = self.data.drop([self.label], axis=1)
        y = self.data[self.label]
        return Data(X, y)
    
    def train_test_split(self, stratify_column):
        TrainTest = namedtuple('TrainTest', 'X_train X_test y_train y_test X y')
        split_data = self.split_features()
        X_train, X_test, y_train, y_test = train_test_split(split_data.X, 
                                                            split_data.y, 
                                                            stratify=split_data.X[stratify_column],
                                                            test_size=self.test_size
                                                           )
        return TrainTest(X_train, X_test, y_train, y_test, split_data.X, split_data.y)

class ModelScoring:
    
    def __init__(self, y_test, y_pred, param_count, algorithm):
        self.y_test = y_test
        self.y_pred = y_pred
        self.n = len(y_test)
        self.param_count = param_count
        self.algorithm = algorithm
    
    def _mse_calc(self):
        return mean_squared_error(self.y_test, self.y_pred)
    
    def _bic(self):
        return self.n * np.log(self._mse_calc()) + self.param_count * np.log(self.n)
    
    def _rmse(self):
        return np.sqrt(self._mse_calc())
    
    def evaluate(self):
        return {'rmse': self._rmse(), 'bic': self._bic()}[self.algorithm]

class Logging:
  
  def __init__(self, metric, model_name, run_name):
    self.metric = metric
    self.model_name = model_name
    self.run_name = run_name
    self.signature = {}
    
  def log_mlflow(self, trial, data, model, params):
    if not self.signature:
      self.signature = infer_signature(data.X_train, model.predict(data.X_train))
    with mlflow.start_run(run_name=self.run_name):
      mlflow.log_params(params)
      mlflow.sklearn.log_model(model, self.model_name, signature=self.signature)
      scores = ModelScoring(data.y_test, model.predict(data.X_test), len(params.keys()), self.metric).evaluate()
      mlflow.log_metric(self.metric, scores)
      mlflow.log_param("trial_number", trial.number)
    return scores
      
class RandomForestTuning:
    
    def __init__(self, data, label, stratify_column, trials, model_name, run_name, test_size=0.3, metric='rmse'):
        self.data = data
        self.label = label
        self.stratify_column = stratify_column
        self.trials = trials
        self.model_name = model_name
        self.run_name = run_name
        self.test_size = test_size
        self.metric = metric
    
    @staticmethod
    def _random_forest_model(**kwargs):
        model = RandomForestRegressor(n_estimators=kwargs['n_estimators'],
                                      max_depth=kwargs['max_depth'],
                                      min_samples_split=kwargs['min_samples_split'],
                                      min_samples_leaf=kwargs['min_samples_leaf'],
                                      max_leaf_nodes=kwargs['max_leaf_nodes'],
                                      min_impurity_decrease=kwargs['min_impurity_decrease'],
                                      max_features=kwargs['max_features'],
                                      n_jobs=-1
                                     )
        return model
    
    def _run_trial(self, trial):
        logger = Logging(self.metric, self.model_name, self.run_name)
        
        splits = DataPrep(self.data, self.label, self.test_size).train_test_split(self.stratify_column)
        params = {
                 # step is passed by keyword; suggest_float(log=True) replaces the deprecated suggest_loguniform
                 'n_estimators': trial.suggest_int("n_estimators", 50, 2000, step=10),
                 'max_depth': trial.suggest_int("max_depth", 2, 24),
                 'min_samples_split': trial.suggest_int("min_samples_split", 2, 100),
                 'min_samples_leaf': trial.suggest_int("min_samples_leaf", 1, 50),
                 'max_leaf_nodes': trial.suggest_int("max_leaf_nodes", 4, 500),
                 'min_impurity_decrease': trial.suggest_float("min_impurity_decrease", 1e-22, 1e-1, log=True),
                 'max_features': trial.suggest_categorical("max_features", ['sqrt', 'log2'])
                }

        model = self._random_forest_model(**params).fit(splits.X_train, splits.y_train)
        scores = logger.log_mlflow(trial, splits, model, params)
        return scores
        
    def run(self, experiment_name):
        mlflow.set_experiment(experiment_name)

        # create_study returns a Study object, which manages the individual trials
        study = optuna.create_study(direction='minimize')
        study.optimize(self._run_trial, n_trials=self.trials)
        return study
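
The two metrics that `ModelScoring` can return are easy to verify by hand. The sketch below restates them in plain Python (using `math` in place of NumPy and scikit-learn) on a four-point toy example. The data values are made up for illustration, and `param_count=7` mirrors the seven hyperparameters in the search space above, since the notebook passes `len(params.keys())` as the parameter count.

```python
import math

def mse(y_true, y_pred):
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

def rmse(y_true, y_pred):
    return math.sqrt(mse(y_true, y_pred))

def bic(y_true, y_pred, param_count):
    # Same expression as ModelScoring._bic: n * ln(MSE) + k * ln(n)
    n = len(y_true)
    return n * math.log(mse(y_true, y_pred)) + param_count * math.log(n)

y_true = [1.0, 2.0, 3.0, 4.0]
y_pred = [1.5, 2.5, 2.5, 3.5]   # every prediction is off by 0.5

print(rmse(y_true, y_pred))               # 0.5
print(bic(y_true, y_pred, param_count=7))
```

With an MSE of 0.25 and n = 4, the BIC expression reduces to 4·ln(0.25) + 7·ln(4) = 3·ln(4), which shows how the parameter-count penalty can outweigh the fit term on very small samples.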

The job configuration definitions<br>
##### NOTE 
In order to execute this in your environment, change the paths (and certainly the email address for the workspace pathing) to your own.

In [0]:
EXPERIMENT_TITLE = "Forest_Fire_Model_5"
EXPERIMENT_NAME = PathHelper(EXPERIMENT_TITLE).name_generator()
RUN_NAME = "initial_prod_release"
MODEL_NAME = "random_forest_model"
OPTIMIZER_ITERATIONS = 5
TEST_PERCENTAGE = 0.5

DATA_URL = "https://archive.ics.uci.edu/ml/machine-learning-databases/forest-fires/forestfires.csv"
LOCAL_FILE = "file:/databricks/driver/forestfires.csv"
SOURCE = "dbfs:/home/benjamin.wilson@databricks.com/demo/firedata/fire.csv"  #Change this to run in your environment
SINK = "/home/benjamin.wilson@databricks.com/demo/fire" # Change this to run in your environment
DATABASE = "ben_demo" # Change this (unless you really want a database with my name in your workspace) to run in your environment
TABLE_NAME = "fire_regression"
LOG_TABLE = f"{TABLE_NAME}_logs"
LOGGING_LOCATION = f"{SINK}_{LOG_TABLE}"

FIRE_SCHEMA = StructType([
  StructField('x_coord', DoubleType()),
  StructField('y_coord', DoubleType()),
  StructField('month', StringType()),
  StructField('day', StringType()),
  StructField('fine_fuel_moisture_code', DoubleType()),
  StructField('duff_moisture_code', DoubleType()),
  StructField('drought_code', DoubleType()),
  StructField('initial_spread_index', DoubleType()),
  StructField('temperature', DoubleType()),
  StructField('relative_humidity', DoubleType()),
  StructField('windspeed', DoubleType()),
  StructField('rain_amount', DoubleType()),
  StructField('area', DoubleType())
])

Instantiation of the data retrieval logic. <br>
##### NOTE
We will be covering a <b><i>far better method for this</i></b> in the other notebook for this chapter (Chapter16_2), utilizing a Feature Store so that we don't have data access and feature engineering code within our modeling code base.

In [0]:
data_handler = DataIngest(DATA_URL, LOCAL_FILE, SOURCE, SINK, FIRE_SCHEMA, DATABASE, TABLE_NAME)
data_handler.register_data()
fire_data = data_handler.get_data_as_pandas()

Run the model training (initial run)

In [0]:
random_forest_tune = RandomForestTuning(fire_data, 'area', 'x_coord', OPTIMIZER_ITERATIONS, MODEL_NAME, RUN_NAME, TEST_PERCENTAGE, 'rmse')
trial_rf = random_forest_tune.run(EXPERIMENT_NAME)

Register the model

In [0]:
registry = ModelRegistration(EXPERIMENT_NAME, EXPERIMENT_TITLE, MODEL_NAME, "rmse", "ASC")
registry.register_best("initial run", LOGGING_LOCATION, DATABASE, LOG_TABLE)

For demonstration purposes only - this is the audit tracking table that shows the history of all of our models that have been registered and their state transitions.

In [0]:
display(spark.table(f"{DATABASE}.{LOG_TABLE}").orderBy(F.col("training_time")))

model_name,production_version,updated,training_time
Forest_Fire_Model_5,1,True,2021-07-16T17:38:54.000+0000


Now let's try another tuning run and see what happens with the registry and the logging table (this is a simulation of scheduling a passive retraining run of our modeling code from above, not a recommendation of what a code base should look like).

In [0]:
RUN_NAME = "updated_prod_release"
OPTIMIZER_ITERATIONS = 50
TEST_PERCENTAGE = 0.1

random_forest_tune_update = RandomForestTuning(fire_data, 'area', 'x_coord', OPTIMIZER_ITERATIONS, MODEL_NAME, RUN_NAME, TEST_PERCENTAGE, 'rmse')
trial_rf_update = random_forest_tune_update.run(EXPERIMENT_NAME)

registry_update = ModelRegistration(EXPERIMENT_NAME, EXPERIMENT_TITLE, MODEL_NAME, "rmse", "ASC")
registry_update.register_best("passive retraining run", LOGGING_LOCATION, DATABASE, LOG_TABLE)

Now we have a new entry from the retraining, an update of the production version of the model, and an indication that the update occurred.

In [0]:
display(spark.table(f"{DATABASE}.{LOG_TABLE}").orderBy(F.col("training_time")))

model_name,production_version,updated,training_time
Forest_Fire_Model_5,1,True,2021-07-16T17:38:54.000+0000
Forest_Fire_Model_5,2,True,2021-07-16T17:41:46.000+0000


Now let's simulate a retraining that doesn't cause the production model to update to see what that aspect of passive retraining looks like...

In [0]:
RUN_NAME = "passive_retrain_attempt"
OPTIMIZER_ITERATIONS = 5 # intentionally lowering this to create a poorly tuned model
TEST_PERCENTAGE = 0.6 # intentionally making it bad for demonstration purposes

random_forest_tune_update = RandomForestTuning(fire_data, 'area', 'x_coord', OPTIMIZER_ITERATIONS, MODEL_NAME, RUN_NAME, TEST_PERCENTAGE, 'rmse')
trial_rf_update = random_forest_tune_update.run(EXPERIMENT_NAME)

registry_update = ModelRegistration(EXPERIMENT_NAME, EXPERIMENT_TITLE, MODEL_NAME, "rmse", "ASC")
registry_update.register_best("passive retraining run", LOGGING_LOCATION, DATABASE, LOG_TABLE)

Now we can see that another iteration of the training happened, but the performance wasn't as good as the current production model. We have an audit that the retraining happened, when it happened, and what the state of the current running model is from a simple table interface.

In [0]:
display(spark.table(f"{DATABASE}.{LOG_TABLE}").orderBy(F.col("training_time")))

model_name,production_version,updated,training_time
Forest_Fire_Model_5,1,True,2021-07-16T17:38:54.000+0000
Forest_Fire_Model_5,2,True,2021-07-16T17:41:46.000+0000
Forest_Fire_Model_5,2,False,2021-07-16T17:42:26.000+0000
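
Since the audit trail is just rows, simple questions about retraining history can be answered without touching the registry. The sketch below works over the three rows shown above in plain Python rather than Spark; the variable names are illustrative, and the timestamps are shortened to their ISO date-time portion.

```python
# Rows copied from the audit table output above.
log_rows = [
    {"model_name": "Forest_Fire_Model_5", "production_version": 1,
     "updated": True, "training_time": "2021-07-16T17:38:54"},
    {"model_name": "Forest_Fire_Model_5", "production_version": 2,
     "updated": True, "training_time": "2021-07-16T17:41:46"},
    {"model_name": "Forest_Fire_Model_5", "production_version": 2,
     "updated": False, "training_time": "2021-07-16T17:42:26"},
]

# ISO-formatted timestamps sort correctly as plain strings.
latest = max(log_rows, key=lambda r: r["training_time"])
promotions = sum(1 for r in log_rows if r["updated"])
rejected = len(log_rows) - promotions

print(f"current production version: {latest['production_version']}")
print(f"promotions: {promotions}, retrains without promotion: {rejected}")
```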


Using a registered model for inference on a Spark DataFrame is as simple as retrieving the artifact and applying the model, within a udf, to the partitions of the Spark DataFrame to be predicted. We're specifying the input to the udf as all of the columns in the data set except the label. This is purely for demonstration; in a real scenario, the label would not be known at inference time.

In [0]:
raw_data = spark.table(f"{DATABASE}.{TABLE_NAME}")
predicted_data = raw_data.withColumn("prediction", registry.get_model_as_udf()(*[x for x in raw_data.columns if x != "area"]))
display(predicted_data)

x_coord,y_coord,month,day,fine_fuel_moisture_code,duff_moisture_code,drought_code,initial_spread_index,temperature,relative_humidity,windspeed,rain_amount,area,heat_index,prediction
7.0,5.0,3.0,5.0,86.2,26.2,94.3,5.1,8.2,51.0,6.7,0.0,0.0,22.58586522123999,10.365023189946848
7.0,4.0,10.0,2.0,90.6,35.4,669.1,6.7,18.0,33.0,0.9,0.0,0.0,19.24285690820003,9.13129530110445
7.0,4.0,10.0,6.0,90.6,43.7,686.9,6.7,14.6,33.0,1.3,0.0,0.0,18.547475427608017,10.374295457931494
8.0,6.0,3.0,5.0,91.7,33.3,77.5,9.0,8.3,97.0,4.0,0.2,0.0,16.963338703982707,7.964178634313191
8.0,6.0,3.0,0.0,89.3,51.3,102.2,9.6,11.4,99.0,1.8,0.0,0.0,13.455547685325405,7.91546223925741
8.0,6.0,8.0,0.0,92.3,85.3,488.0,14.7,22.2,29.0,5.4,0.0,0.0,21.191912198025776,11.965842554322174
8.0,6.0,8.0,1.0,92.3,88.9,495.6,8.5,24.1,27.0,3.1,0.0,0.0,22.472145524617343,17.840660639433295
8.0,6.0,8.0,1.0,91.5,145.4,608.2,10.7,8.0,86.0,2.2,0.0,0.0,20.35991968377781,11.411729175991312
8.0,6.0,9.0,2.0,91.0,129.5,692.6,7.0,13.1,63.0,5.4,0.0,0.0,19.222061152121366,13.955393468777862
7.0,5.0,9.0,6.0,92.5,88.0,698.6,7.1,22.8,40.0,4.0,0.0,0.0,22.045141129743985,15.7942849063639
