In [11]:
from datetime import datetime
import os
import pickle

import pandas as pd
from sklearn.metrics import mean_squared_error

import mlflow
# After an experiment has been created, 
# this client can be used to search for experiments and register models
from mlflow.tracking import MlflowClient

from mlflow.entities import ViewType

In [2]:
MLFLOW_TRACKING_URI = 'sqlite:///mlflow.db'  # SQLite backend store, path relative to the notebook's working dir

# Point both the low-level client and the fluent mlflow.* API (pyfunc.load_model,
# artifacts.download_artifacts, register_model, ...) at the same store. Without this,
# fluent calls only work if a later cell has set the tracking URI; otherwise they
# silently fall back to MLflow's default local store.
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)

2023/04/09 17:32:51 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2023/04/09 17:32:51 INFO mlflow.store.db.utils: Updating database tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.


In [3]:
# Create New Experiment. Don't confuse it with mlflow.set_experiment()
# client.create_experiment(name='new-experiment') 

# Search the runs of experiment '1', best (lowest) RMSE first
runs = client.search_runs(experiment_ids=['1'],
                          # filter_string accepts filters like "params.model='xgboost'" or "tags.developer='Joses'"
                          filter_string="",
                          # ACTIVE_ONLY excludes deleted runs; it does NOT filter on run status (failed runs are included)
                          run_view_type=ViewType.ACTIVE_ONLY,
                          max_results=6,
                          order_by=['metrics.rmse ASC']  # Sort by RMSE in ascending order
                         )

# search_runs returns a (paged) list of Run objects. A run that never logged an
# 'rmse' metric can still be returned, so don't index the metrics dict blindly.
for run in runs:
    rmse = run.data.metrics.get('rmse')
    rmse_text = f"{rmse:.4f}" if rmse is not None else "n/a"
    print(f"run id: {run.info.run_id}, rmse: {rmse_text}")

run id: c0c0c61d81ca42edb70508cb41125589, rmse: 6.2962
run id: 0522e50e249f4d0dba5b13b02b9e1069, rmse: 6.2962
run id: 47e12e7c04f74dfb88b0faee9350ed4d, rmse: 6.2962
run id: 2d9bea853b934c298cc3efa164e5703d, rmse: 6.2962
run id: 4608128aae0145d4940e01cc053a0557, rmse: 6.3052
run id: 465238221fc34df5a0e8297dc5a8c03c, rmse: 6.3216


### Registering a model programmatically (Python API)

In [4]:
# The run whose logged model gets registered, and the artifact sub-path it was logged under
run_id = '4608128aae0145d4940e01cc053a0557'
artifact_model_fol = 'model_flow'
model_uri = "runs:/" + run_id + "/" + artifact_model_fol

# From here on the fluent mlflow.* API talks to the tracking server on port 5000,
# while `client` (created with sqlite:///mlflow.db) keeps reading the DB directly.
# NOTE(review): presumably the server is backed by the same mlflow.db — confirm.
mlflow.set_tracking_uri('http://localhost:5000')

# Register under the 'nyc-taxi' registry entry, where the different versions can be reviewed.
# Each call adds a new version, so re-running this cell registers the run again.
mlflow.register_model(name='nyc-taxi', model_uri=model_uri)

### View the status of versions in a registered model

In [6]:
model_name = 'nyc-taxi'

# With no `stages` filter, the registry returns the newest version of each stage
latest_versions = client.get_latest_versions(name=model_name)

for version in latest_versions:
    print("Version: {}, Stage: {}".format(version.version, version.current_stage))

2023/04/09 17:38:43 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2023/04/09 17:38:43 INFO mlflow.store.db.utils: Updating database tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.


Version: 1, Stage: Staging
Version: 2, Stage: None


### Stage registered model versions programmatically (Python API)
I'm staging the version 2 model

In [10]:
# Single source of truth for the version/stage, so the description can't drift from the call
staging_version = 2
staging_stage = 'Staging'

# archive_existing_versions=False: versions already in Staging keep their stage
client.transition_model_version_stage(name=model_name, version=staging_version, stage=staging_stage,
                                      archive_existing_versions=False)

# Include descriptions about the model that was staged
tday = datetime.today().date()
client.update_model_version(name=model_name, version=staging_version,
                            description=f'The model v.{staging_version} was transitioned to {staging_stage.lower()} on {tday}')

<ModelVersion: creation_timestamp=1681061205955, current_stage='Staging', description='The model v.2 was transitioned to staging on 2023-04-09', last_updated_timestamp=1681062338786, name='nyc-taxi', run_id='4608128aae0145d4940e01cc053a0557', run_link='', source='/home/ubuntu/mlops_joses/notebooks/mlruns/1/4608128aae0145d4940e01cc053a0557/artifacts/model_flow', status='READY', status_message=None, tags={}, user_id=None, version=2>

### I'm setting model v1 to production

In [21]:
# Single source of truth for the version/stage, so the description can't drift from the call
production_version = 1
production_stage = 'Production'

# archive_existing_versions=False: versions already in Production keep their stage
client.transition_model_version_stage(name=model_name, version=production_version, stage=production_stage,
                                      archive_existing_versions=False)

# Include a description recording when the model was promoted to production
tday = datetime.today().date()
client.update_model_version(name=model_name, version=production_version,
                            description=f'The model v.{production_version} was transitioned to {production_stage.lower()} on {tday}')

<ModelVersion: creation_timestamp=1681056411791, current_stage='Production', description='The model v.1 was transitioned to production on 2023-04-09', last_updated_timestamp=1681063567629, name='nyc-taxi', run_id='c0c0c61d81ca42edb70508cb41125589', run_link='', source='/home/ubuntu/mlops_joses/notebooks/mlruns/1/c0c0c61d81ca42edb70508cb41125589/artifacts/model_mlflow', status='READY', status_message=None, tags={'Model': 'xgboost_january'}, user_id=None, version=1>

## Test the model prediction on Taxi Data from the month of March 2021

In [17]:
def load_data(path):
    df = pd.read_parquet(path)

    # Create a duration column to calculate the lenght of each trip in minutes
    df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    # df.duration = df.duration.apply(lambda x: x.total_seconds()/60)
    # Faster than apply with lambda function
    df['duration'] = df.duration.dt.total_seconds() / 60

    # Select only rows where the duration is between 1-60 minuts
    df = df[(df.duration>=1) & (df.duration<=60)].reset_index(drop=True)
    
    categorical = ['PULocationID','DOLocationID']
    # Convert the categorical columns to string so the OHE will work
    df[categorical] = df[categorical].astype(str)
    
    return df

def preprocess(df, dv):
    """Turn trip rows into the feature matrix expected by the logged model.

    Builds a combined pickup/drop-off feature ``PU_DO`` ("<PU>_<DO>") plus
    ``trip_distance`` and runs them through the fitted vectorizer ``dv``.
    The caller's DataFrame is left unchanged.

    Args:
        df: Trips with string ``PULocationID``/``DOLocationID`` columns
            (as produced by ``load_data``) and a ``trip_distance`` column.
        dv: Fitted vectorizer exposing ``transform(list_of_dicts)``.

    Returns:
        Whatever ``dv.transform`` returns for the per-row feature dicts.
    """
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    # assign() returns a copy, so no PU_DO column leaks into the caller's frame
    features = df.assign(PU_DO=df['PULocationID'] + '_' + df['DOLocationID'])
    records = features[categorical + numerical].to_dict(orient='records')
    return dv.transform(records)

def test_model(name, stage, X_test, y_test):
    """Load a registered model from the MLflow registry and score it with RMSE.

    Args:
        name: Registered model name, e.g. 'nyc-taxi'.
        stage: Stage (e.g. 'Production', 'Staging') or version number; used to
            build the ``models:/<name>/<stage>`` URI.
        X_test: Feature matrix accepted by the model's ``predict``.
        y_test: True targets aligned row-by-row with ``X_test``.

    Returns:
        ``{"rmse": <root mean squared error>}``.
    """
    model = mlflow.pyfunc.load_model(f"models:/{name}/{stage}")
    y_pred = model.predict(X_test)
    # sqrt(MSE) instead of squared=False: that keyword was deprecated in
    # scikit-learn 1.4 and removed in 1.6; this form works on every version.
    return {"rmse": mean_squared_error(y_test, y_pred) ** 0.5}
    


### Run the inference

In [25]:
df = load_data('data/green_tripdata_2021-03.parquet')

# Download the preprocessor that was saved in the version 1 model to the current working dir.
mlflow.artifacts.download_artifacts(run_id='c0c0c61d81ca42edb70508cb41125589', 
                                    artifact_path='preproc',dst_path='.')
# Load the saved dictionary vectorizer
with open("preproc/preproc.b",'rb') as f_in:
    dv = pickle.load(f_in)
    
# Create the test data
X_test = preprocess(df, dv)
y_test = df['duration'].to_numpy()

# Use the v1. model for inference. Measure the run time too
%time test_model(name=model_name, stage='Production', X_test=X_test, y_test=y_test)

# Test the v2. staged model for inference. 
# It wouldn't work because the model was not saved to an artifact
# %time test_model(name=model_name, stage='Staging', X_test=X_test, y_test=y_test)

CPU times: user 10.2 s, sys: 0 ns, total: 10.2 s
Wall time: 2.7 s


{'rmse': 6.245703198557914}