In [29]:
# IMPORTING MODULES:
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType

import pickle
import pandas as pd
from datetime import datetime
from sklearn.metrics import mean_squared_error

In [1]:
# SETTING UP MLFLOW:
# Point the module-level (fluent) MLflow API at the local SQLite file `mlflow.db`.
mlflow.set_tracking_uri("sqlite:///mlflow.db")
# Select the experiment by name; being the last expression of the cell, the
# returned Experiment object is what the notebook displays below.
mlflow.set_experiment("nyc-taxi-exp")

<Experiment: artifact_location='./mlruns/2', experiment_id='2', lifecycle_stage='active', name='nyc-taxi-exp', tags={}>

In [2]:
# INTERACTING WITH MODEL REGISTRY:
# Same SQLite URI that is passed to mlflow.set_tracking_uri in the setup cell.
MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"
# `client` is reused by the later cells for run search, registry queries,
# stage transitions and artifact download.
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)

In [5]:
# CREATING NEW EXPERIMENT:
# create_experiment raises when an experiment with this name already exists,
# which makes the cell fail on every re-run (e.g. Restart & Run All).
# Look the name up first and only create the experiment when it is missing.
experiment_name = "my-cool-expr"
existing_experiment = client.get_experiment_by_name(experiment_name)
if existing_experiment is None:
    new_experiment_id = client.create_experiment(name=experiment_name)
else:
    new_experiment_id = existing_experiment.experiment_id
# Last expression: display the experiment id, as the original cell did.
new_experiment_id

'4'

In [8]:
# INSPECTING BEST RUNS:
# `experiment_ids` is documented as a list of string IDs, so pass ["2"] rather
# than a bare int. "2" is the id of "nyc-taxi-exp" in this tracking database
# (see the Experiment repr printed by the setup cell).
# The query keeps the five active runs with the lowest rmse below 6.8.
runs = client.search_runs(
    experiment_ids=["2"],
    filter_string="metrics.rmse < 6.8",
    run_view_type=ViewType.ACTIVE_ONLY,
    max_results=5,
    order_by=["metrics.rmse ASC"],
)

# INSPECTION:
# One line per run: its id and its rmse rounded to four decimals.
for run in runs:
    print(f"run id: {run.info.run_id}, rmse: {run.data.metrics['rmse']:.4f}")

run id: 646a8e3c8db64334853cd81e8cba2272, rmse: 6.3010
run id: 84ecf61f61fe4f0ba60ce927a1002aa8, rmse: 6.3010
run id: 40ab473dabed47529cdb05c2de071a44, rmse: 6.3010
run id: faff425333004c21af6c3139bac33f8b, rmse: 6.3028
run id: 9d88155e7db9479580813c0ff297cdc4, rmse: 6.3439


In [11]:
# REGISTER THE MODEL:
# NOTE(review): in the run listing printed above, this run id has the highest
# rmse of the five candidates (6.3439) - confirm that choosing it is intended.
run_id = "9d88155e7db9479580813c0ff297cdc4"
# Registry URI pointing at the "model" artifact directory of that run.
model_uri = f"runs:/{run_id}/model"
# Per the captured output, registering under an existing name creates a new
# version of "nyc-taxi-regressor" instead of failing.
mlflow.register_model(model_uri=model_uri, name="nyc-taxi-regressor")

Registered model 'nyc-taxi-regressor' already exists. Creating a new version of this model...
2022/05/29 11:13:28 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: nyc-taxi-regressor, version 3
Created version '3' of model 'nyc-taxi-regressor'.


<ModelVersion: creation_timestamp=1653802108089, current_stage='None', description=None, last_updated_timestamp=1653802108089, name='nyc-taxi-regressor', run_id='9d88155e7db9479580813c0ff297cdc4', run_link=None, source='./mlruns/2/9d88155e7db9479580813c0ff297cdc4/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=3>

In [16]:
# GETTING LATEST VERSION OF MODEL:
# `model_name` is reused by the stage-transition and testing cells further down.
model_name = "nyc-taxi-regressor"
latest_ver = client.get_latest_versions(name=model_name)
# Report each returned model version together with the stage it is currently in.
for registered in latest_ver:
    ver, stage = registered.version, registered.current_stage
    print(f"version: {ver}, stage: {stage}")

version: 2, stage: Staging
version: 3, stage: None


In [20]:
# TRANSITION MODEL VERSION STAGE:
# `model_ver` and `new_stage` are read again by the cell that updates the
# version description, so they are kept as notebook-level variables.
model_ver = 3
new_stage = "Staging"
# archive_existing_versions=False: presumably leaves the version already in
# Staging (version 2 in the listing above) where it is - confirm against the
# MLflow documentation.
client.transition_model_version_stage(
    name=model_name,
    version=model_ver, 
    stage=new_stage,
    archive_existing_versions=False
)

<ModelVersion: creation_timestamp=1653802108089, current_stage='Staging', description=None, last_updated_timestamp=1653802827553, name='nyc-taxi-regressor', run_id='9d88155e7db9479580813c0ff297cdc4', run_link=None, source='./mlruns/2/9d88155e7db9479580813c0ff297cdc4/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=3>

In [23]:
# UPDATING MODEL VERSIONS:
# Record on the model version which stage it was moved to and on which day.
date = datetime.today().date()
description = f"The model version {model_ver} was transitioned to {new_stage} on {date}"
client.update_model_version(name=model_name, version=model_ver, description=description)

<ModelVersion: creation_timestamp=1653802108089, current_stage='Staging', description='The model version 3 was transitioned to Staging on 2022-05-29', last_updated_timestamp=1653802881915, name='nyc-taxi-regressor', run_id='9d88155e7db9479580813c0ff297cdc4', run_link=None, source='./mlruns/2/9d88155e7db9479580813c0ff297cdc4/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=3>

In [34]:
# FUNCTION TO READ DATAFRAME:
def read_dataframe(filename):
    """Load a trip file and derive the trip duration in minutes.

    Parameters
    ----------
    filename : str
        Path ending in ``.csv`` or ``.parquet``. The file must provide the
        columns ``lpep_pickup_datetime``, ``lpep_dropoff_datetime``,
        ``PULocationID`` and ``DOLocationID``.

    Returns
    -------
    pandas.DataFrame
        Only the rows whose duration is between 1 and 60 minutes (inclusive),
        with an added float ``duration`` column and both location-ID columns
        cast to ``str``.

    Raises
    ------
    ValueError
        If ``filename`` ends in neither ``.csv`` nor ``.parquet``.
    """
    if filename.endswith(".csv"):
        df = pd.read_csv(filename)
        # CSV stores timestamps as text, so they have to be parsed explicitly.
        df["lpep_dropoff_datetime"] = pd.to_datetime(df["lpep_dropoff_datetime"])
        df["lpep_pickup_datetime"] = pd.to_datetime(df["lpep_pickup_datetime"])
    elif filename.endswith(".parquet"):
        df = pd.read_parquet(filename)
    else:
        # Without this branch `df` would be unbound below and the caller would
        # get an UnboundLocalError that hides the real problem.
        raise ValueError(
            f"Unsupported file type for {filename!r}: expected .csv or .parquet"
        )

    # Vectorized timedelta -> minutes conversion (no per-row Python lambda).
    elapsed = df["lpep_dropoff_datetime"] - df["lpep_pickup_datetime"]
    df["duration"] = elapsed.dt.total_seconds() / 60

    # .copy() detaches the filtered rows from the full frame, so the column
    # assignment below does not write into a slice (SettingWithCopyWarning).
    df = df[(df["duration"] >= 1) & (df["duration"] <= 60)].copy()

    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    return df

# FUNCTION TO PROCESS DATA:
def preprocess(df, dv):
    """Turn a trips frame into the feature matrix produced by ``dv.transform``.

    A ``PU_DO`` column (pickup ID, underscore, drop-off ID) is added to ``df``
    in place. The ``PU_DO`` and ``trip_distance`` columns are then converted to
    one dict per row and handed to ``dv.transform``, whose result is returned.
    """
    pickup, dropoff = df["PULocationID"], df["DOLocationID"]
    df["PU_DO"] = pickup + "_" + dropoff
    feature_columns = ["PU_DO", "trip_distance"]
    records = df[feature_columns].to_dict(orient="records")
    return dv.transform(records)

# FUNCTION TO TEST MODEL:
def test_model(name, stage, X_test, y_test):
    """Score the registered model ``name`` at ``stage`` on a test set.

    Parameters
    ----------
    name : str
        Registered model name.
    stage : str
        Registry stage used to build the ``models:/{name}/{stage}`` URI.
    X_test
        Features passed unchanged to the loaded model's ``predict``.
    y_test
        True target values (single target).

    Returns
    -------
    dict
        ``{"rmse": <root mean squared error of the predictions>}``.
    """
    model = mlflow.pyfunc.load_model(f"models:/{name}/{stage}")
    y_pred = model.predict(X_test)
    # `squared=False` was deprecated in scikit-learn 1.4 and removed in 1.6.
    # Taking the square root of the MSE gives the same RMSE for a single
    # target and works on every scikit-learn version.
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    return {"rmse": rmse}

In [27]:
# READING THE DATASET:
# read_dataframe (defined in the cell above) adds the `duration` column, keeps
# trips of 1 to 60 minutes and casts the location IDs to strings.
df = read_dataframe("./data/green_tripdata_2021-03.parquet")    # Reading dataset.
# Last expression of the cell: the first rows are rendered as the cell output.
df.head()                                                       # Inspection.

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,duration
0,2,2021-03-01 00:05:42,2021-03-01 00:14:03,N,1.0,83,129,1.0,1.56,7.5,...,0.5,0.0,0.0,,0.3,8.8,1.0,1.0,0.0,8.35
1,2,2021-03-01 00:21:03,2021-03-01 00:26:17,N,1.0,243,235,1.0,0.96,6.0,...,0.5,0.0,0.0,,0.3,7.3,2.0,1.0,0.0,5.233333
2,2,2021-03-01 00:02:06,2021-03-01 00:22:26,N,1.0,75,242,1.0,9.93,28.0,...,0.5,2.0,0.0,,0.3,31.3,1.0,1.0,0.0,20.333333
3,2,2021-03-01 00:24:03,2021-03-01 00:31:43,N,1.0,242,208,1.0,2.57,9.5,...,0.5,0.0,0.0,,0.3,10.8,2.0,1.0,0.0,7.666667
4,1,2021-03-01 00:11:10,2021-03-01 00:14:46,N,1.0,41,151,1.0,0.8,5.0,...,0.5,1.85,0.0,,0.3,8.15,1.0,1.0,0.0,3.6


In [28]:
# DOWNLOADING ARTIFACTS:
# NOTE(review): this run id is not the one used in the "REGISTER THE MODEL"
# cell, and it overwrites the `run_id` variable set there - confirm that this
# run's preprocessor matches the model being tested below.
run_id = "646a8e3c8db64334853cd81e8cba2272"
# Fetches the run's "preprocessor" artifact into the current directory; the
# returned local path is displayed as the cell output.
client.download_artifacts(run_id=run_id, path="preprocessor", dst_path=".")

'/home/thinam_cool/preprocessor'

In [30]:
# INITIALIZING DICT VECTORIZER:
# Reads the file produced by the artifact download in the previous cell.
# SECURITY: pickle.load can execute arbitrary code - only unpickle artifacts
# that come from a trusted tracking store.
with open("preprocessor/preprocessor.b", "rb") as f_in:
    dv = pickle.load(f_in)

In [31]:
# PREPROCESSING THE DATASET:
# Ground truth first: the trip duration column as a plain array.
target = "duration"
y_test = df[target].values
# Features: preprocess() adds a PU_DO column to `df` and runs it through `dv`.
X_test = preprocess(df, dv)

In [35]:
# TESTING THE MODEL IN PRODUCTION:
# %time is an IPython line magic: it reports CPU and wall time for the single
# call and still displays the returned {"rmse": ...} dict.
# NOTE(review): no earlier cell in this notebook moves a version to
# "Production", so this relies on the registry already holding one - confirm.
%time test_model(name=model_name, stage="Production", X_test=X_test, y_test=y_test)

CPU times: user 15.4 s, sys: 0 ns, total: 15.4 s
Wall time: 2.07 s


{'rmse': 6.241212494560512}

In [36]:
# TRANSITION MODEL VERSION STAGE:
# Promote version 2 of the registered model to Production.
# archive_existing_versions=True: presumably moves any version already in
# Production to the archived stage - confirm against the MLflow documentation.
client.transition_model_version_stage(
    name=model_name,
    version=2, 
    stage="Production",
    archive_existing_versions=True
)

<ModelVersion: creation_timestamp=1653800531991, current_stage='Production', description='', last_updated_timestamp=1653804781777, name='nyc-taxi-regressor', run_id='a758dd7df8f44bec96bda3371c988e4f', run_link='', source='./mlruns/2/a758dd7df8f44bec96bda3371c988e4f/artifacts/model', status='READY', status_message=None, tags={'model ': 'gradientboostingregressor'}, user_id=None, version=2>