## MLflow's Model Registry

### Interacting with the MLflow tracking server

The `MlflowClient` object allows us to interact with:
- MLflow Tracking Server that creates and manages experiments and runs.
- MLflow Registry Server that creates and manages registered models and model versions. 

#### Instantiate tracking URI

In [1]:
# Connect to the MLflow backend store queried by the cells that follow.
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType

# SQLite URI of the tracking database (`mlflow.db`, given as a relative path).
MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"
# Client bound explicitly to that URI; the following cells use it to query
# experiments and runs.
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)

#### List all experiments

In [2]:
# List every experiment returned by the client, ordered by ascending
# experiment id, and print one "id, name" line per experiment.
experiments = client.search_experiments(order_by=["experiment_id ASC"])
for experiment in experiments:
    print(f'experiment_id:{experiment.experiment_id}, experiment_name:{experiment.name}')

experiment_id:0, experiment_name:Default
experiment_id:1, experiment_name:duration-trip
experiment_id:2, experiment_name:duration-trip-autolog


#### List of all experiment runs

In [3]:
# Timestamp converter function
import datetime

def convert_timestamp_to_seconds(timestamp):
    """Format an epoch timestamp given in milliseconds as a local date-time string.

    Despite its name, this function does not return a number of seconds: it
    scales the millisecond value down to seconds and renders it as text.

    Parameters
    ----------
    timestamp : int | float | None
        Milliseconds since the Unix epoch. ``None`` is accepted so that a
        missing value (e.g. the end time of a run that has not finished yet)
        does not raise.

    Returns
    -------
    str | None
        The timestamp formatted as ``"YYYY-MM-DD HH:MM:SS"`` in the local
        timezone of the machine, or ``None`` when ``timestamp`` is ``None``.
    """
    if timestamp is None:
        return None

    # datetime.fromtimestamp() expects seconds; the input is in milliseconds
    timestamp_in_seconds = timestamp / 1000
    datetime_converted = datetime.datetime.fromtimestamp(timestamp_in_seconds)
    return datetime_converted.strftime("%Y-%m-%d %H:%M:%S")

In [9]:
# Fetch the active runs of experiment '2' and print a one-line summary
# of the first five
runs = client.search_runs(
    experiment_ids='2',
    run_view_type=ViewType.ACTIVE_ONLY,
)

for run in runs[:5]:
    # Columns: estimator, run id, start (ms), end (ms), duration (s), RMSE
    print(
        run.data.tags['estimator_name'],
        run.info.run_id,
        run.info.start_time,
        run.info.end_time,
        (run.info.end_time - run.info.start_time) / 1000,
        run.data.metrics['rmse'],
    )

XGBRegressor b09242a9964c4965bc8baa572bc2103b 1729416323278 1729416324851 1.573 6.644320289321216
LinearSVR 47e45a45a4e441afa61e6e23cc7624c2 1729415640307 1729415642934 2.627 807.9904772805995
ExtraTreesRegressor 794703933ee54ea79264b8d27cea1959 1729414318597 1729415640237 1321.64 6.940426720605602
GradientBoostingRegressor 668dbd3cd12547428e8fd65c5b4ae3d6 1729414314111 1729414317862 3.751 6.742303328497426
RandomForestRegressor 3982aae14ad24edca76cf903b45350d9 1729413817195 1729414244361 427.166 6.914909840472828


In [8]:
# Summarise every fetched run as one row of a pandas DataFrame
import pandas as pd

runs_info = [
    {
        'estimator_name': mlflow_run.data.tags['estimator_name'],
        'run_id': mlflow_run.info.run_id,
        'start_time': convert_timestamp_to_seconds(mlflow_run.info.start_time),
        'end_time': convert_timestamp_to_seconds(mlflow_run.info.end_time),
        # Timestamps are in milliseconds, so the duration is in seconds
        'duration': (mlflow_run.info.end_time - mlflow_run.info.start_time) / 1000,
        'rmse': mlflow_run.data.metrics['rmse'],
    }
    for mlflow_run in runs
]

runs_info_df = pd.DataFrame(runs_info)
runs_info_df.head()

Unnamed: 0,estimator_name,run_id,start_time,end_time,duration,rmse
0,XGBRegressor,b09242a9964c4965bc8baa572bc2103b,2024-10-20 16:25:23,2024-10-20 16:25:24,1.573,6.64432
1,LinearSVR,47e45a45a4e441afa61e6e23cc7624c2,2024-10-20 16:14:00,2024-10-20 16:14:02,2.627,807.990477
2,ExtraTreesRegressor,794703933ee54ea79264b8d27cea1959,2024-10-20 15:51:58,2024-10-20 16:14:00,1321.64,6.940427
3,GradientBoostingRegressor,668dbd3cd12547428e8fd65c5b4ae3d6,2024-10-20 15:51:54,2024-10-20 15:51:57,3.751,6.742303
4,RandomForestRegressor,3982aae14ad24edca76cf903b45350d9,2024-10-20 15:43:37,2024-10-20 15:50:44,427.166,6.91491


#### Create a new experiment

In [16]:
# Create a new experiment; the call returns the id of the new experiment,
# which is displayed as the cell output.
# NOTE(review): re-running this cell with the same name is expected to fail
# because the experiment already exists — confirm against the MLflow docs.
client.create_experiment(name="new-experiment-tracking")

'3'

#### Delete existing experiment

In [14]:
# Delete the experiment whose id is '3'.
# NOTE(review): the id is hard-coded; it has to match the id returned by
# create_experiment on this particular database.
client.delete_experiment(experiment_id='3')

# To permanently delete the experiment, run the following shell command
# (not Python) against the same backend store:
# mlflow gc --backend-store-uri sqlite:///mlflow.db --experiment-ids 3

#### Find the runs with an RMSE lower than 7

In [25]:
from mlflow.entities import ViewType

# At most five active runs of experiment '2' whose RMSE is below 7,
# sorted from the lowest RMSE upwards
runs = client.search_runs(
    experiment_ids='2',
    filter_string="metrics.rmse < 7",
    run_view_type=ViewType.ACTIVE_ONLY,
    max_results=5,
    order_by=["metrics.rmse ASC"],
)

# Summarise the matching runs as one DataFrame row each
runs_info = [
    {
        'estimator_name': mlflow_run.data.tags['estimator_name'],
        'run_id': mlflow_run.info.run_id,
        'start_time': convert_timestamp_to_seconds(mlflow_run.info.start_time),
        'end_time': convert_timestamp_to_seconds(mlflow_run.info.end_time),
        # Timestamps are in milliseconds, so the duration is in seconds
        'duration': (mlflow_run.info.end_time - mlflow_run.info.start_time) / 1000,
        'rmse': mlflow_run.data.metrics['rmse'],
    }
    for mlflow_run in runs
]

runs_info_df = pd.DataFrame(runs_info)
runs_info_df.head()

Unnamed: 0,estimator_name,run_id,start_time,end_time,duration,rmse
0,XGBRegressor,b09242a9964c4965bc8baa572bc2103b,2024-10-20 16:25:23,2024-10-20 16:25:24,1.573,6.64432
1,GradientBoostingRegressor,668dbd3cd12547428e8fd65c5b4ae3d6,2024-10-20 15:51:54,2024-10-20 15:51:57,3.751,6.742303
2,RandomForestRegressor,3982aae14ad24edca76cf903b45350d9,2024-10-20 15:43:37,2024-10-20 15:50:44,427.166,6.91491
3,ExtraTreesRegressor,794703933ee54ea79264b8d27cea1959,2024-10-20 15:51:58,2024-10-20 16:14:00,1321.64,6.940427


#### Register model name in the model registry

In [35]:
from mlflow import MlflowClient
from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository

Register model name

In [36]:
# Name under which model versions will be grouped in the registry
registered_model_name = 'nyc-taxi-regressor'

# Register model name in the model registry
# NOTE(review): this rebinds `client` without the explicit
# tracking_uri=MLFLOW_TRACKING_URI used in the first cell, so it relies on
# MLflow's default / environment-configured URI — confirm this is intended.
client = MlflowClient()
# Creates the registered model entry; the cell output shows it starts with
# no versions (latest_versions=[]).
client.create_registered_model(registered_model_name)

<RegisteredModel: aliases={}, creation_timestamp=1729419016922, description=None, last_updated_timestamp=1729419016922, latest_versions=[], name='nyc-taxi-regressor', tags={}>

Create a specific version of the registered model

In [42]:
# Pick the run to register from `runs_info_df` (presumably the RMSE-sorted
# table built above — verify the cell execution order).
# Row 1 is selected here; use row 0 instead to register the first-ranked run.
best_model_run_id = runs_info_df.loc[1, 'run_id']
best_model_estimator_name = runs_info_df.loc[1, 'estimator_name']

# Registry parameters
registered_model_name = 'nyc-taxi-regressor'
description = f"This is a {best_model_estimator_name} model"
# Translate the logical runs:/ URI of the logged model into its underlying URI
runs_uri = f"runs:/{best_model_run_id}/model"
model_source = RunsArtifactRepository.get_underlying_uri(runs_uri)

# Create a new version of the registered model
client.create_model_version(
    name=registered_model_name,
    source=model_source,
    run_id=best_model_run_id,
    description=description,
)

<ModelVersion: aliases=[], creation_timestamp=1729419488740, current_stage='None', description='This is a GradientBoostingRegressor model', last_updated_timestamp=1729419488740, name='nyc-taxi-regressor', run_id='668dbd3cd12547428e8fd65c5b4ae3d6', run_link=None, source=('/Users/farelyue/Documents/Projects/Data '
 'Science/mlops-zoomcamp/02-experiment-tracking/mlruns/2/668dbd3cd12547428e8fd65c5b4ae3d6/artifacts/model'), status='READY', status_message=None, tags={}, user_id=None, version=3>

Delete a specific version of the registered model

In [39]:
# Registered model and the version of it that should be deleted
registered_model_name = 'nyc-taxi-regressor'
# NOTE(review): the version number is hard-coded; the call presumably fails
# if that version does not exist (e.g. on a second run) — confirm.
registered_model_version = '1'

# Delete the specific version of the registered model
client.delete_model_version(name=registered_model_name, version=registered_model_version)

#### List all versions of a registered model

In [47]:
# Look up every version of the registered model and tabulate the key fields
registered_model_name = 'nyc-taxi-regressor'
versions = client.search_model_versions(filter_string=f"name='{registered_model_name}'")

version_info = [
    {
        'name': model_version.name,
        'created_date': convert_timestamp_to_seconds(model_version.creation_timestamp),
        'version': model_version.version,
        'current_stage': model_version.current_stage,
        'description': model_version.description,
    }
    for model_version in versions
]

version_info_df = pd.DataFrame(version_info)
version_info_df.head()

Unnamed: 0,name,created_date,version,current_stage,description
0,nyc-taxi-regressor,2024-10-20 17:18:08,3,,This is a version:3 of nyc-taxi-regressor model
1,nyc-taxi-regressor,2024-10-20 17:16:26,2,,This is a version:2 of nyc-taxi-regressor model


#### Update the description of registered model version

In [46]:
# Target: version '3' of the 'nyc-taxi-regressor' registered model
registered_model_name = 'nyc-taxi-regressor'
registered_model_version = '3'

# Overwrite the description of that model version
new_description = f"This is a version:{registered_model_version} of {registered_model_name} model"
client.update_model_version(
    name=registered_model_name,
    version=registered_model_version,
    description=new_description,
)

<ModelVersion: aliases=[], creation_timestamp=1729419488740, current_stage='None', description='This is a version:3 of nyc-taxi-regressor model', last_updated_timestamp=1729419812414, name='nyc-taxi-regressor', run_id='668dbd3cd12547428e8fd65c5b4ae3d6', run_link=None, source=('/Users/farelyue/Documents/Projects/Data '
 'Science/mlops-zoomcamp/02-experiment-tracking/mlruns/2/668dbd3cd12547428e8fd65c5b4ae3d6/artifacts/model'), status='READY', status_message=None, tags={}, user_id=None, version=3>

#### Update the tag of registered model version

In [48]:
# Target: version '3' of the 'nyc-taxi-regressor' registered model
registered_model_name = 'nyc-taxi-regressor'
registered_model_version = '3'

# Attach an `estimator_name` tag to that model version
client.set_model_version_tag(
    name=registered_model_name,
    version=registered_model_version,
    key='estimator_name',
    value='GradientBoostingRegressor',
)

#### Transition the stage of a registered model version

In [49]:
# Target: version '2' of the 'nyc-taxi-regressor' registered model
registered_model_name = 'nyc-taxi-regressor'
registered_model_version = '2'

# Move that version to the 'production' stage, archiving whichever versions
# currently occupy it.
# NOTE(review): the truncated warning in this cell's output suggests the
# stage-transition API is deprecated in the installed MLflow — confirm.
client.transition_model_version_stage(
    name=registered_model_name,
    version=registered_model_version,
    stage='production',
    archive_existing_versions=True,
)

  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1729419386781, current_stage='Production', description='This is a version:2 of nyc-taxi-regressor model', last_updated_timestamp=1729420028398, name='nyc-taxi-regressor', run_id='b09242a9964c4965bc8baa572bc2103b', run_link=None, source=('/Users/farelyue/Documents/Projects/Data '
 'Science/mlops-zoomcamp/02-experiment-tracking/mlruns/2/b09242a9964c4965bc8baa572bc2103b/artifacts/model'), status='READY', status_message=None, tags={}, user_id=None, version=2>

In [51]:
# Target: version '3' of the 'nyc-taxi-regressor' registered model
registered_model_name = 'nyc-taxi-regressor'
registered_model_version = '3'

# Move that version to the 'staging' stage, archiving whichever versions
# currently occupy it.
# NOTE(review): the truncated warning in this cell's output suggests the
# stage-transition API is deprecated in the installed MLflow — confirm.
client.transition_model_version_stage(
    name=registered_model_name,
    version=registered_model_version,
    stage='staging',
    archive_existing_versions=True,
)

  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1729419488740, current_stage='Staging', description='This is a version:3 of nyc-taxi-regressor model', last_updated_timestamp=1729420074852, name='nyc-taxi-regressor', run_id='668dbd3cd12547428e8fd65c5b4ae3d6', run_link=None, source=('/Users/farelyue/Documents/Projects/Data '
 'Science/mlops-zoomcamp/02-experiment-tracking/mlruns/2/668dbd3cd12547428e8fd65c5b4ae3d6/artifacts/model'), status='READY', status_message=None, tags={'estimator_name': 'GradientBoostingRegressor'}, user_id=None, version=3>

### Comparing versions and selecting the new "Production" model

In the last section, we will retrieve models registered in the model registry and compare their performance on an unseen test set. The idea is to simulate the scenario in which a deployment engineer has to interact with the model registry to decide whether to update the model version that is in production or not.

These are the steps:

1. Load the test dataset, which corresponds to the NYC Green Taxi data from the month of March 2021.
2. Download the `DictVectorizer` that was fitted using the training data and saved to MLflow as an artifact, and load it with pickle.
3. Preprocess the test set using the `DictVectorizer` so we can properly feed the regressors.
4. Make predictions on the test set using the model versions that are currently in the "Staging" and "Production" stages, and compare their performance.
5. Based on the results, update the "Production" model version accordingly.


**Note: the model registry doesn't actually deploy the model to production when you transition a model to the "Production" stage; it just assigns a label to that model version. You should complement the registry with some CI/CD code that does the actual deployment.**

#### Import the libraries

In [83]:
# Import the libraries
import pandas as pd
import numpy as np
import os
from time import time

from sklearn.metrics import root_mean_squared_error
import mlflow
import pickle

import warnings
# Silence every Python warning raised after this point.
# NOTE(review): a blanket 'ignore' also hides deprecation and pandas warnings
# coming from later cells — consider narrowing the filter.
warnings.filterwarnings('ignore')

#### Helper Function

In [54]:
# Read dataset and give scope limitation
def read_dataframe(file_path):
    """Load a trip-record Parquet file and keep trips lasting 1 to 60 minutes.

    Parameters
    ----------
    file_path : str or path-like
        Location of a Parquet file that contains at least the columns
        ``PULocationID``, ``DOLocationID``, ``lpep_pickup_datetime`` and
        ``lpep_dropoff_datetime``.

    Returns
    -------
    pandas.DataFrame
        An independent copy of the rows whose duration lies in the closed
        interval [1, 60] minutes. The two location-id columns are cast to
        ``str`` and a ``duration`` column (trip length in minutes) is added.
    """
    # Load the Parquet dataset
    df = pd.read_parquet(file_path)

    # Location ids are identifiers, not quantities: store them as strings
    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    # Make sure both time columns are real datetimes
    df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
    df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)

    # Trip duration in minutes, computed vectorised instead of row by row
    elapsed = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df['duration'] = elapsed.dt.total_seconds() / 60

    # Keep trips between 1 and 60 minutes (both bounds included). Return a
    # copy so callers can add columns without pandas' SettingWithCopyWarning.
    return df[df.duration.between(1, 60)].copy()


# Preprocess dataset
def preprocess(df, dv):
    """Turn a trips DataFrame into the feature matrix expected by the models.

    The input frame is left untouched: the combined ``PU_DO`` feature is built
    in a separate frame, so calling this function repeatedly on the same
    DataFrame has no side effects.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the string columns ``PULocationID`` and ``DOLocationID``
        and the column ``trip_distance``.
    dv : object
        Anything exposing ``transform(list_of_dicts)``; in this notebook it is
        the vectorizer unpickled from the ``preprocessor`` artifact.

    Returns
    -------
    Whatever ``dv.transform`` returns for the list of per-row feature dicts
    (keys ``PU_DO`` and ``trip_distance``).
    """
    numerical = ['trip_distance']

    # Work on a copy of the numeric columns only, never on the caller's frame
    features = df[numerical].copy()
    # Pickup/drop-off pair as a single categorical feature, e.g. "83_129"
    features.insert(0, 'PU_DO', df['PULocationID'] + '_' + df['DOLocationID'])

    feature_dicts = features.to_dict(orient='records')
    return dv.transform(feature_dicts)


# Evaluate model with test dataset
def test_model(model_name, stage, X_test, y_test):
    """Score the registered model that is in ``stage`` on a test set.

    The model is resolved through the ``models:/<model_name>/<stage>`` URI and
    loaded with ``mlflow.pyfunc.load_model``.

    Parameters
    ----------
    model_name : str
        Name of the registered model.
    stage : str
        Registry stage to load (the notebook uses 'Production' and 'Staging').
    X_test, y_test
        Features passed to ``predict`` and the matching true target values.

    Returns
    -------
    dict
        ``{"rmse": <root mean squared error of the predictions>}``.
    """
    model_uri = f"models:/{model_name}/{stage}"
    predictions = mlflow.pyfunc.load_model(model_uri).predict(X_test)
    rmse = root_mean_squared_error(y_test, predictions)
    return {"rmse": rmse}

#### Load the test dataset

In [55]:
# Load the test dataset (March 2021 file) through read_dataframe, which also
# adds the `duration` column and keeps only trips of 1 to 60 minutes.
# The path is relative, so it depends on the notebook's working directory.
df_test = read_dataframe("./data/green_tripdata_2021-03.parquet")
df_test.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,duration
0,2,2021-03-01 00:05:42,2021-03-01 00:14:03,N,1.0,83,129,1.0,1.56,7.5,...,0.5,0.0,0.0,,0.3,8.8,1.0,1.0,0.0,8.35
1,2,2021-03-01 00:21:03,2021-03-01 00:26:17,N,1.0,243,235,1.0,0.96,6.0,...,0.5,0.0,0.0,,0.3,7.3,2.0,1.0,0.0,5.233333
2,2,2021-03-01 00:02:06,2021-03-01 00:22:26,N,1.0,75,242,1.0,9.93,28.0,...,0.5,2.0,0.0,,0.3,31.3,1.0,1.0,0.0,20.333333
3,2,2021-03-01 00:24:03,2021-03-01 00:31:43,N,1.0,242,208,1.0,2.57,9.5,...,0.5,0.0,0.0,,0.3,10.8,2.0,1.0,0.0,7.666667
4,1,2021-03-01 00:11:10,2021-03-01 00:14:46,N,1.0,41,151,1.0,0.8,5.0,...,0.5,1.85,0.0,,0.3,8.15,1.0,1.0,0.0,3.6


In [58]:
# Initialize the MLflow client
client = MlflowClient()

# Look up every version of the registered model and tabulate the key fields,
# including the run that produced each version
registered_model_name = 'nyc-taxi-regressor'
versions = client.search_model_versions(filter_string=f"name='{registered_model_name}'")

version_info = [
    {
        'name': model_version.name,
        'created_date': convert_timestamp_to_seconds(model_version.creation_timestamp),
        'version': model_version.version,
        'current_stage': model_version.current_stage,
        'description': model_version.description,
        'run_id': model_version.run_id,
    }
    for model_version in versions
]

version_info_df = pd.DataFrame(version_info)
version_info_df.head()

Unnamed: 0,name,created_date,version,current_stage,description,run_id
0,nyc-taxi-regressor,2024-10-20 17:18:08,3,Staging,This is a version:3 of nyc-taxi-regressor model,668dbd3cd12547428e8fd65c5b4ae3d6
1,nyc-taxi-regressor,2024-10-20 17:16:26,2,Production,This is a version:2 of nyc-taxi-regressor model,b09242a9964c4965bc8baa572bc2103b


#### Download the vectorizer preprocessor

Download the vectorizer

In [70]:
# Identify the model name, the version currently in 'Production', and the
# run id that produced that version.
# Each `.values[0]` raises IndexError when its selection is empty, e.g. when
# no version is in the 'Production' stage.
registered_model_name = version_info_df.loc[:, 'name'].values[0]
registered_model_version = version_info_df.loc[version_info_df['current_stage'] == 'Production', 'version'].values[0]
registered_model_run_id = version_info_df.loc[version_info_df['version'] == registered_model_version, 'run_id'].values[0]

# Download that run's `preprocessor` artifact into the local `models/`
# directory (created if missing); the call returns the local path.
os.makedirs('models', exist_ok=True)
client.download_artifacts(run_id=registered_model_run_id, path='preprocessor', dst_path='models')

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

'/Users/farelyue/Documents/Projects/Data Science/mlops-zoomcamp/02-experiment-tracking/models/preprocessor'

Load the vectorizer

In [72]:
# Load the vectorizer preprocessor from the downloaded artifact.
# SECURITY: pickle.load can execute arbitrary code; only unpickle artifacts
# that come from a tracking server / run you trust.
with open('models/preprocessor/preprocessor.b', 'rb') as f_in:
    dv = pickle.load(f_in)

Load the production model version

In [84]:
# Transform the test features with the vectorizer and extract the target values
target = 'duration'
X_test = preprocess(df_test, dv)
y_test = df_test[target].values

# Model name and registry stage to evaluate
registered_model_name = 'nyc-taxi-regressor'
stage = 'Production'

# Evaluate model on test dataset; %time is an IPython magic that additionally
# reports how long the load + predict + scoring call took
%time test_model(model_name=registered_model_name, stage=stage, X_test=X_test, y_test=y_test)

CPU times: user 290 ms, sys: 24.2 ms, total: 314 ms
Wall time: 56.4 ms


{'rmse': 6.57711317277908}

Load the staging model version

In [85]:
# Transform the test features with the vectorizer and extract the target values
# (same inputs as for the Production evaluation, recomputed here)
target = 'duration'
X_test = preprocess(df_test, dv)
y_test = df_test[target].values

# Model name and registry stage to evaluate
registered_model_name = 'nyc-taxi-regressor'
stage = 'Staging'

# Evaluate model on test dataset; %time is an IPython magic that additionally
# reports how long the load + predict + scoring call took
%time test_model(model_name=registered_model_name, stage=stage, X_test=X_test, y_test=y_test)

CPU times: user 75.4 ms, sys: 2.32 ms, total: 77.7 ms
Wall time: 78 ms


{'rmse': 6.659623830022515}