## MLflow's Model Registry

In [1]:
import mlflow
from mlflow.tracking import MlflowClient

In [2]:
# set the MLFLOW_TRACKING_URI to the sqlite database
MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"

### Interacting with the MLflow tracking server

The `MlflowClient` object allows us to interact with...
- an MLflow Tracking Server that creates and manages experiments and runs.
- an MLflow Registry Server that creates and manages registered models and model versions. 

More info: https://mlflow.org/docs/latest/python_api/mlflow.client.html

To instantiate it we need to pass a tracking URI and/or a registry URI

In [3]:
# create a client to interact with the tracking server
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
# search for all experiments in the tracking server
client.search_experiments()

[<Experiment: artifact_location='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/3', creation_time=1716837397382, experiment_id='3', last_update_time=1716837397382, lifecycle_stage='active', name='experiment-model-registry', tags={}>,
 <Experiment: artifact_location='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/2', creation_time=1716755843419, experiment_id='2', last_update_time=1716755843419, lifecycle_stage='active', name='my-cool-experiment', tags={}>,
 <Experiment: artifact_location='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/1', creation_time=1716753193765, experiment_id='1', last_update_time=1716753193765, lifecycle_stage='active', name='nyc-taxi-experiment', tags={}>,
 <Experiment: artifact_location='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/0', creation_time=1716753193762, experi

In [4]:
# create new experiment named "experiment-model-registry" inside the tracking server
# client.create_experiment(name="experiment-model-registry-")

Let's check the latest versions for the experiment with id `1`...

In [5]:
# search for all experiments in the tracking server
from mlflow.entities import ViewType

runs = client.search_runs(
    experiment_ids='1', # experiment id for name='nyc-taxi-experiment'
    filter_string="metrics.rmse < 20",
    run_view_type=ViewType.ACTIVE_ONLY,
    max_results=5,
    order_by=["metrics.rmse DESC"]
    )

In [25]:
# print the runs inside the runs with experiment id 1 & rmse < 20 & order by rmse desc & max results 5
for run in runs:
    print(f"run id: {run.info.run_id}, rmse: {run.data.metrics['rmse']:.4f}")

run id: 095bc04ae61b46e296072cfc409a278b, rmse: 7.7467
run id: 357731362a014aea933f8617d8528925, rmse: 7.3873
run id: 374efc02641b436bb6e43cf99eb0022f, rmse: 7.1227
run id: 731d83d1cbbe4b2a8c61ceec9bfd2503, rmse: 6.9276
run id: e122cfeb172c427290ec6fc8ffd043a7, rmse: 6.9116


### Interacting with the Model Registry

In this section We will use the `MlflowClient` instance to:

1. Register a new version for the experiment `nyc-taxi-regressor`
2. Retrieve the latests versions of the model `nyc-taxi-regressor` and check that a new version `4` was created.
3. Transition the version `4` to "Staging" and adding annotations to it.

In [26]:
# set the MLFLOW_TRACKING_URI to the sqlite database
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

In [27]:
# check the models at the Model Registry (under MLFLOW_TRACKING_URI) for registered models
model_name = "nyc-taxi-model-R3"
for run in runs:
    run_id = run.info.run_id
    mlflow.register_model(model_uri=f"runs:/{run_id}/model", name=model_name)
    print(f"Registered model with run_id: {run_id}")

# to register a specific model, uncomment the following lines
# run_id = "e122cfeb172c427290ec6fc8ffd043a7"
# model_uri = f"runs:/{run_id}/model"
# mlflow.register_model(model_uri=model_uri, name="nyc-taxi-regressor")

Registered model with run_id: 095bc04ae61b46e296072cfc409a278b
Registered model with run_id: 357731362a014aea933f8617d8528925
Registered model with run_id: 374efc02641b436bb6e43cf99eb0022f
Registered model with run_id: 731d83d1cbbe4b2a8c61ceec9bfd2503
Registered model with run_id: e122cfeb172c427290ec6fc8ffd043a7


Successfully registered model 'nyc-taxi-model-R3'.
Created version '1' of model 'nyc-taxi-model-R3'.
Registered model 'nyc-taxi-model-R3' already exists. Creating a new version of this model...
Created version '2' of model 'nyc-taxi-model-R3'.
Registered model 'nyc-taxi-model-R3' already exists. Creating a new version of this model...
Created version '3' of model 'nyc-taxi-model-R3'.
Registered model 'nyc-taxi-model-R3' already exists. Creating a new version of this model...
Created version '4' of model 'nyc-taxi-model-R3'.
Registered model 'nyc-taxi-model-R3' already exists. Creating a new version of this model...
Created version '5' of model 'nyc-taxi-model-R3'.


In [28]:
# search for the registered model
client.search_registered_models(filter_string=f"name='{model_name}'")

[<RegisteredModel: aliases={}, creation_timestamp=1716842569889, description=None, last_updated_timestamp=1716842569935, latest_versions=[<ModelVersion: aliases=[], creation_timestamp=1716842569935, current_stage='None', description=None, last_updated_timestamp=1716842569935, name='nyc-taxi-model-R3', run_id='e122cfeb172c427290ec6fc8ffd043a7', run_link=None, source='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/1/e122cfeb172c427290ec6fc8ffd043a7/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=5>], name='nyc-taxi-model-R3', tags={}>]

In [29]:
# get the latest versions of the registered model "nyc-taxi-model"
latest_versions = client.get_latest_versions(name=model_name)

for version in latest_versions:
    print(f"version: {version.version}, stage: {version.current_stage}")

version: 5, stage: None


  latest_versions = client.get_latest_versions(name=model_name)


In [30]:
latest_versions

[<ModelVersion: aliases=[], creation_timestamp=1716842569935, current_stage='None', description=None, last_updated_timestamp=1716842569935, name='nyc-taxi-model-R3', run_id='e122cfeb172c427290ec6fc8ffd043a7', run_link=None, source='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/1/e122cfeb172c427290ec6fc8ffd043a7/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=5>]

In [31]:
model_version = 3
new_stage = "Staging"
print(f"Transitioning model '{model_name}' version '{model_version}' to stage '{new_stage}'")
client.transition_model_version_stage(
    name=model_name,
    version=model_version,
    stage=new_stage,
    archive_existing_versions=False
)

Transitioning model 'nyc-taxi-model-R3' version '3' to stage 'Staging'


  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1716842569924, current_stage='Staging', description=None, last_updated_timestamp=1716842605714, name='nyc-taxi-model-R3', run_id='374efc02641b436bb6e43cf99eb0022f', run_link=None, source='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/1/374efc02641b436bb6e43cf99eb0022f/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=3>

In [53]:
model_version = 4
new_stage = "Staging"
print(f"Transitioning model '{model_name}' version '{model_version}' to stage '{new_stage}'")
client.transition_model_version_stage(
    name=model_name,
    version=model_version,
    stage=new_stage,
    archive_existing_versions=False
)

Transitioning model 'nyc-taxi-model-R3' version '4' to stage 'Staging'


  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1716842569929, current_stage='Staging', description=None, last_updated_timestamp=1716842967832, name='nyc-taxi-model-R3', run_id='731d83d1cbbe4b2a8c61ceec9bfd2503', run_link=None, source='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/1/731d83d1cbbe4b2a8c61ceec9bfd2503/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=4>

In [54]:
from datetime import datetime
# annotate the model version with a description
date = datetime.today().date()
client.update_model_version(
    name=model_name, 
    version=model_version,
    description=f"The model version {model_version} was transitioned to {new_stage} on {datetime.today().date()}"
)

<ModelVersion: aliases=[], creation_timestamp=1716842569929, current_stage='Staging', description='The model version 4 was transitioned to Staging on 2024-05-27', last_updated_timestamp=1716842970999, name='nyc-taxi-model-R3', run_id='731d83d1cbbe4b2a8c61ceec9bfd2503', run_link=None, source='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/1/731d83d1cbbe4b2a8c61ceec9bfd2503/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=4>

In [34]:
model_version = 5
new_stage = "Production"
print(f"Transitioning model '{model_name}' version '{model_version}' to stage '{new_stage}'")
client.transition_model_version_stage(
    name=model_name,
    version=model_version,
    stage=new_stage,
    archive_existing_versions=False
)

Transitioning model 'nyc-taxi-model-R3' version '5' to stage 'Production'


  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1716842569935, current_stage='Production', description=None, last_updated_timestamp=1716842649234, name='nyc-taxi-model-R3', run_id='e122cfeb172c427290ec6fc8ffd043a7', run_link=None, source='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/1/e122cfeb172c427290ec6fc8ffd043a7/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=5>

In [35]:
client.update_model_version(
    name=model_name, 
    version=model_version,
    description=f"The model version {model_version} was transitioned to {new_stage} on {datetime.today().date()}"
)

<ModelVersion: aliases=[], creation_timestamp=1716842569935, current_stage='Production', description='The model version 5 was transitioned to Production on 2024-05-27', last_updated_timestamp=1716842653186, name='nyc-taxi-model-R3', run_id='e122cfeb172c427290ec6fc8ffd043a7', run_link=None, source='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/1/e122cfeb172c427290ec6fc8ffd043a7/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=5>

In [50]:
# delete unused model versions
versions = [3]
for version in versions:
    client.delete_model_version(
        name=model_name, version=version
    )

### Comparing versions and selecting the new "Production" model

In the last section, we will retrieve models registered in the model registry and compare their performance on an unseen test set. The idea is to simulate the scenario in which a deployment engineer has to interact with the model registry to decide whether to update the model version that is in production or not.

These are the steps:

1. Load the test dataset, which corresponds to the NYC Green Taxi data from the month of March 2021.
2. Download the `DictVectorizer` that was fitted using the training data and saved to MLflow as an artifact, and load it with pickle.
3. Preprocess the test set using the `DictVectorizer` so we can properly feed the regressors.
4. Make predictions on the test set using the model versions that are currently in the "Staging" and "Production" stages, and compare their performance.
5. Based on the results, update the "Production" model version accordingly.


**Note: the model registry doesn't actually deploy the model to production when you transition a model to the "Production" stage, it just assign a label to that model version. You should complement the registry with some CI/CD code that does the actual deployment.**

In [36]:
# get data: check if directory exist. if yes, skip else create new one
import os, wget
data_directory = 'data/'
if os.path.exists(data_directory):
    print("Directory exists")
else:
    os.mkdir(data_directory)
    print("Directory does not exist and created new one")

# set directory
os.chdir(f'./{data_directory}')
# Download files
remote_desktop = False
if remote_desktop:
    if not os.path.exists('green_tripdata_2021-03.parquet'):
        !wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-01.parquet
else:
    if not os.path.exists('green_tripdata_2021-03.parquet'):
        wget.download('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-03.parquet')
os.chdir(f'../')

Directory exists


In [37]:
from sklearn.metrics import root_mean_squared_error
import pandas as pd


def read_dataframe(filename):
    df = pd.read_parquet(filename)

    df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
    df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)

    df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    
    return df

In [38]:
df = read_dataframe("data/green_tripdata_2021-03.parquet")

In [39]:
run_id

'e122cfeb172c427290ec6fc8ffd043a7'

In [40]:
client.download_artifacts(run_id=run_id, path='preprocessor', dst_path='.')

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

'/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/preprocessor'

In [45]:
def preprocess(df, dv):
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    train_dicts = df[categorical + numerical].to_dict(orient='records')
    return dv.transform(train_dicts)


def test_model(name, stage, X_test, y_test):
    model = mlflow.pyfunc.load_model(f"models:/{name}/{stage}")
    y_pred = model.predict(X_test)
    return {"rmse": root_mean_squared_error(y_test, y_pred)}

In [46]:
import pickle

with open("preprocessor/preprocessor.b", "rb") as f_in:
    dv = pickle.load(f_in)

In [47]:
X_test = preprocess(df, dv)
y_test = df["duration"].values

In [48]:
%time test_model(name=model_name, stage="Production", X_test=X_test, y_test=y_test)

  latest = client.get_latest_versions(name, None if stage is None else [stage])


CPU times: user 5.85 s, sys: 352 ms, total: 6.2 s
Wall time: 6.91 s


{'rmse': 6.888934977622113}

In [55]:
%time test_model(name=model_name, stage="Staging", X_test=X_test, y_test=y_test)

CPU times: user 32 s, sys: 552 ms, total: 32.5 s
Wall time: 34 s


{'rmse': 6.8974324339654745}

In [56]:

# assuming the staging output is good, transition the model to production
# archive_existing_versions=True will archive all existing versions (i.e versions 5)
client.transition_model_version_stage(
    name=model_name,
    version=1,
    stage="Production",
    archive_existing_versions=True
)

  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1716842569910, current_stage='Production', description='The model version 1 was transitioned to Staging on 2024-05-27', last_updated_timestamp=1716843585064, name='nyc-taxi-model-R3', run_id='095bc04ae61b46e296072cfc409a278b', run_link=None, source='/Users/gbemidebe/Documents/GitHub/mlops-zoomcamp/02-experiment-tracking/examples/case02/mlruns/1/095bc04ae61b46e296072cfc409a278b/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=1>