In [86]:
import pickle

import mlflow
import pandas as pd
from mlflow.entities import ViewType
from mlflow.tracking import MlflowClient
from sklearn.metrics import mean_squared_error


In [87]:
# Load a green-taxi trip file and derive the features used by the model
def read_dataframe(filename):
    """Read a trip file and return it with duration and categorical features.

    Parameters
    ----------
    filename : str
        Path to a ``.csv`` or ``.parquet`` file containing at least the
        columns ``lpep_pickup_datetime``, ``lpep_dropoff_datetime``,
        ``PULocationID``, ``DOLocationID`` and ``VendorID``.

    Returns
    -------
    pandas.DataFrame
        Rows whose trip duration is between 1 and 60 minutes (inclusive),
        with added columns ``duration`` (minutes), ``hour``, ``dayofweek``
        and ``PU_DO``; the categorical columns are converted to strings.

    Raises
    ------
    ValueError
        If ``filename`` does not end in ``.csv`` or ``.parquet``.
    """
    if filename.endswith('.csv'):
        df = pd.read_csv(filename)

        # CSV carries timestamps as text, so parse them explicitly
        df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
        df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
    elif filename.endswith('.parquet'):
        df = pd.read_parquet(filename)
    else:
        # Previously this fell through and failed later with an unbound `df`
        raise ValueError(
            f"Unsupported file type for {filename!r}: expected .csv or .parquet"
        )

    # Trip duration in minutes, computed vectorised instead of row by row
    trip_time = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df['duration'] = trip_time.dt.total_seconds() / 60

    # Copy the filtered rows so the column assignments below write to an
    # independent frame rather than to a slice of the original
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    df['hour'] = df.lpep_pickup_datetime.dt.hour
    df['dayofweek'] = df.lpep_pickup_datetime.dt.day_of_week

    categorical = ['PULocationID', 'DOLocationID', 'hour', 'dayofweek', 'VendorID']
    df[categorical] = df[categorical].astype(str)
    # Combined pickup/drop-off key, e.g. "43_151"
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']

    return df

In [88]:
# MLflow settings: tracking store, experiment, and registered-model name
model_name = "taxi_pred_model"
experiment_name = "nyc-taxi-experiment-latest"
tracking_uri = "sqlite:///mlflow.db"

# Point the fluent API at the tracking store, then select the experiment
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)

<Experiment: artifact_location='./mlruns/3', experiment_id='3', lifecycle_stage='active', name='nyc-taxi-experiment-latest', tags={}>

In [89]:
# Find the two active runs with the lowest RMSE (restricted to RMSE < 6.5)
client = MlflowClient(tracking_uri=tracking_uri)
experiment = client.get_experiment_by_name(name=experiment_name)

search_options = {
    "experiment_ids": [str(experiment.experiment_id)],
    "filter_string": "metrics.rmse < 6.5",
    "run_view_type": ViewType.ACTIVE_ONLY,
    "max_results": 2,
    "order_by": ["metrics.rmse ASC"],
}
runs = client.search_runs(**search_options)

In [90]:
# Register the model of every run returned by the search, plus one run
# that is pinned by its id, as new versions of the registered model
model_name = "taxi_pred_model"
pinned_run_id = "bcebff3745b84160a7633ebf9d71aeec"

run_ids = [run.info.run_id for run in runs]
run_ids.append(pinned_run_id)

for run_id in run_ids:
    # Each run logged its model under the "model" artifact path
    mlflow.register_model(model_uri=f"runs:/{run_id}/model", name=model_name)

Registered model 'taxi_pred_model' already exists. Creating a new version of this model...
2022/05/28 10:29:21 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: taxi_pred_model, version 20
Created version '20' of model 'taxi_pred_model'.
Registered model 'taxi_pred_model' already exists. Creating a new version of this model...
2022/05/28 10:29:21 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: taxi_pred_model, version 21
Created version '21' of model 'taxi_pred_model'.
Registered model 'taxi_pred_model' already exists. Creating a new version of this model...
2022/05/28 10:29:21 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: taxi_pred_model, version 22
Created version '22' of model 'taxi_pred_model'.


In [91]:
# Move one registered version to Production and another to Staging,
# archiving whichever versions previously held those stages
stage_assignments = {19: "Production", 14: "Staging"}

for version, stage in stage_assignments.items():
    transitioned = client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage=stage,
        archive_existing_versions=True,
    )

# Display the result of the last transition as the cell output
transitioned




<ModelVersion: creation_timestamp=1653732725628, current_stage='Staging', description=None, last_updated_timestamp=1653733761605, name='taxi_pred_model', run_id='5d94cea2d90a410ab8b75e4fba4443ee', run_link=None, source='./mlruns/3/5d94cea2d90a410ab8b75e4fba4443ee/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=14>

In [92]:
# Load the models currently registered in Staging and Production
staging_versions = client.get_latest_versions(name=model_name, stages=["Staging"])
production_versions = client.get_latest_versions(name=model_name, stages=["Production"])

# Fail fast with a clear message: without this check an empty stage would
# leave the version variables unset (or holding stale values from an earlier
# kernel run) and the comparison below would be meaningless
if not staging_versions or not production_versions:
    raise RuntimeError(
        f"Model '{model_name}' needs a version in both Staging and Production "
        f"(found {len(staging_versions)} staging, {len(production_versions)} production)"
    )

# One stage was requested per call, so the first entry is that stage's
# latest version
staging_version = staging_versions[0].version
prod_version = production_versions[0].version

print(staging_version, prod_version)

production_uri = f"models:/{model_name}/{prod_version}"
staging_uri = f"models:/{model_name}/{staging_version}"

production_model = mlflow.pyfunc.load_model(model_uri=production_uri)
staging_model = mlflow.pyfunc.load_model(model_uri=staging_uri)

14 19


In [93]:
# Compare the Staging and Production models on held-out data and promote
# the Staging model when it has the lower RMSE

# NOTE(review): absolute, machine-specific path; consider a configurable
# data directory so the notebook runs elsewhere
df_val = read_dataframe('/home/ubuntu/data/green_tripdata_2021-02.parquet')

# Defining feature types; PU_DO is the combined pickup/drop-off key
categorical = ['PU_DO', 'hour', 'dayofweek', 'VendorID']
numerical = ['trip_distance']

# Load the fitted preprocessor.
# NOTE: pickle.load can execute arbitrary code; only load trusted files.
with open("./models/preprocessor/dvtransformer.bin", 'rb') as file:
    preprocessor = pickle.load(file)

target = "duration"
val_dicts = df_val[categorical + numerical].to_dict(orient='records')
X_val = preprocessor.transform(val_dicts)
y_val = df_val[target].values

stage_pred = staging_model.predict(X_val)
prod_pred = production_model.predict(X_val)

# RMSE as the square root of the MSE: unlike the `squared=False` flag this
# works on every scikit-learn version (the flag is deprecated in newer ones)
stage_rmse = mean_squared_error(y_val, stage_pred) ** 0.5
prod_rmse = mean_squared_error(y_val, prod_pred) ** 0.5

if prod_rmse > stage_rmse:
    # The staging model has the lower error, so it replaces production
    print("Promoting the staging model since it performs better than the production model")

    stage_versions = client.get_latest_versions(name=model_name, stages=["Staging"])
    for st in stage_versions:
        stage_version = str(st.version)
        # archive_existing_versions retires the previous Production version
        client.transition_model_version_stage(name=model_name,
                                              version=stage_version,
                                              stage="Production",
                                              archive_existing_versions=True)
        
    



    

Promoting the model since new model is better performing than staging
