In [1]:
# import library 
import pandas as pd 
import pickle
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression 
from sklearn.linear_model import Lasso
from sklearn.svm import LinearSVR
from sklearn.ensemble import ExtraTreesRegressor, GradientBoostingRegressor, RandomForestRegressor
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error

In [24]:
import mlflow

# Set the tracking URI to the same one used in your UI
mlflow.set_tracking_uri("http://127.0.0.1:5001")  

# Create or set the experiment
mlflow.set_experiment("nyc-taxi-experiment")

<Experiment: artifact_location='mlflow-artifacts:/1', creation_time=1746401851956, experiment_id='1', last_update_time=1746401851956, lifecycle_stage='active', name='nyc-taxi-experiment', tags={}>

In [3]:
def read_dataframe(filename):
    "Write a function to read and preprocessing data"
    # Read the dataset
    df_taxi = pd.read_parquet(filename)

    # Adjust dropoff & pickup to pandas datetime 
    df_taxi['lpep_pickup_datetime'] = pd.to_datetime(df_taxi.lpep_pickup_datetime)
    df_taxi['lpep_dropoff_datetime'] = pd.to_datetime(df_taxi.lpep_dropoff_datetime)
    
    # Calculate the duration (drop_off -  pick_up)
    df_taxi['duration'] = df_taxi.lpep_dropoff_datetime - df_taxi.lpep_pickup_datetime
    
    # Adjust the duration in minutes for prediction 
    df_taxi['duration_minutes'] = df_taxi['duration'].dt.total_seconds() / 60

    # Since there are a lot of duration less than 1 minutes. We filter only duration between 1 minutes to 99% percentile
    df_taxi = df_taxi[(df_taxi['duration_minutes'] >= 1) & (df_taxi['duration_minutes'] <= 60)]

    # Feature Engineering 
    categorical_variables = ['PULocationID', 'DOLocationID']
    numerical_variables = ['trip_distance']

    # Convert it into "str"
    df_taxi[categorical_variables] = df_taxi[categorical_variables].astype(str)
    
    return df_taxi

In [4]:
df_train = read_dataframe('../00-Dataset/green_tripdata_2021-01.parquet')
df_val = read_dataframe('../00-Dataset/green_tripdata_2021-02.parquet')

### Create the training pipeline 

In [5]:
# Feature Engineering 
categorical_variables = ['PULocationID', 'DOLocationID']
numerical_variables = ['trip_distance']

# Vectorizer the training variables 
dv = DictVectorizer()

# Convert it into dictionary 
train_dicts = df_train[categorical_variables + numerical_variables].to_dict(orient = 'records')
X_train = dv.fit_transform(train_dicts)

# Create the validation set 
val_dicts = df_val[categorical_variables + numerical_variables].to_dict(orient = 'records')
X_val = dv.transform(val_dicts)

### Try To Combine the input features 

In [6]:
df_train['PU_DO'] = df_train['PULocationID'] + '_' + df_train['DOLocationID']
df_val['PU_DO'] = df_val['PULocationID'] + '_' + df_val['DOLocationID']

In [7]:
# Feature Engineering 
categorical_variables = ['PU_DO']  #['PULocationID', 'DOLocationID']
numerical_variables = ['trip_distance']

# Vectorizer the training variables 
dv = DictVectorizer()

# Convert it into dictionary 
train_dicts = df_train[categorical_variables + numerical_variables].to_dict(orient = 'records')
X_train = dv.fit_transform(train_dicts)

# Create the validation set 
val_dicts = df_val[categorical_variables + numerical_variables].to_dict(orient = 'records')
X_val = dv.transform(val_dicts)

In [8]:
# Setup the Prediction_Variables 
predictor = 'duration_minutes'
y_train = df_train[predictor].values
y_val = df_val[predictor].values

In [9]:
models = {
    "LinearSVR": LinearSVR(),
    "ExtraTrees": ExtraTreesRegressor(n_jobs=-1),
    "GradientBoosting": GradientBoostingRegressor(),
    "RandomForest": RandomForestRegressor(n_jobs=-1),
    "XGBoost": XGBRegressor(n_jobs=-1)
}

In [61]:
for name, model in models.items():
    with mlflow.start_run(run_name=name):
        #set the tag name for who response
        mlflow.set_tag("developer","Dario")
        
        # Train the model
        model.fit(X_train, y_train)
        y_pred = model.predict(X_val)
        rmse = mean_squared_error(y_val, y_pred, squared=False)
        
        # Log parameters and metrics
        mlflow.set_tag("model", name)
        mlflow.log_param("train_rows", X_train.shape[0])
        mlflow.log_metric("rmse", rmse)
        
        # save the preprocessing 
        with open("models/preprocessor.b", "wb") as f_out:
            pickle.dump(dv, f_out)
        
        # log the preprocessing step 
        mlflow.log_artifact("models/preprocessor.b", artifact_path = "preprocessor")
        
        # Log model itself
        mlflow.sklearn.log_model(model, artifact_path="model")
        
        print(f"{name} RMSE: {rmse:.2f}")



LinearSVR RMSE: 779.25
🏃 View run LinearSVR at: http://127.0.0.1:5001/#/experiments/1/runs/e644e8399b884accb593474b33f46c17
🧪 View experiment at: http://127.0.0.1:5001/#/experiments/1




ExtraTrees RMSE: 6.94
🏃 View run ExtraTrees at: http://127.0.0.1:5001/#/experiments/1/runs/a221c5fad4974fefa4f84b818db00f4c
🧪 View experiment at: http://127.0.0.1:5001/#/experiments/1




GradientBoosting RMSE: 6.74
🏃 View run GradientBoosting at: http://127.0.0.1:5001/#/experiments/1/runs/07ce6eb2e69546739b59e9f8780059f8
🧪 View experiment at: http://127.0.0.1:5001/#/experiments/1




RandomForest RMSE: 6.91
🏃 View run RandomForest at: http://127.0.0.1:5001/#/experiments/1/runs/0dbdbb47826d48879214c89be7a09c5f
🧪 View experiment at: http://127.0.0.1:5001/#/experiments/1




XGBoost RMSE: 6.64
🏃 View run XGBoost at: http://127.0.0.1:5001/#/experiments/1/runs/b329f82bc08e4d5ebaf62f74833d5c54
🧪 View experiment at: http://127.0.0.1:5001/#/experiments/1


## Use the ML Client class 

In [11]:
# import the mlflow client library
from mlflow.tracking import MlflowClient 

# set the tracking URI
MLFLOW_TRACKING_URI = "sqlite:///mflow.db"

client = MlflowClient(tracking_uri = MLFLOW_TRACKING_URI)

2025/05/07 01:06:08 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2025/05/07 01:06:08 INFO mlflow.store.db.utils: Updating database tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 451aebb31d03, add metric step
INFO  [alembic.runtime.migration] Running upgrade 451aebb31d03 -> 90e64c465722, migrate user column to tags
INFO  [alembic.runtime.migration] Running upgrade 90e64c465722 -> 181f10493468, allow nulls for metric values
INFO  [alembic.runtime.migration] Running upgrade 181f10493468 -> df50e92ffc5e, Add Experiment Tags Table
INFO  [alembic.runtime.migration] Running upgrade df50e92ffc5e -> 7ac759974ad8, Update run tags with larger limit
INFO  [alembic.runtime.migration] Running upgrade 7ac759974ad8 -> 89d4b8295536, create latest metrics table
INFO  [89d4b8295536_create_latest_metrics_table_py] Migration complete!
INFO  

In [14]:
# Listing all the experiments
client = MlflowClient()
experiments = client.search_experiments()
for exp in experiments:
    print(f"Name: {exp.name}, ID: {exp.experiment_id}")

Name: nyc-taxi-experiment, ID: 1
Name: Default, ID: 0


In [20]:
from mlflow.entities import ViewType 
# Show the best run models
runs = client.search_runs(
    experiment_ids = 1,
    filter_string = "metrics.rmse < 7",
    run_view_type = ViewType.ACTIVE_ONLY,
    max_results = 5, 
    order_by = ["metrics.rmse ASC"]
)

# show the result
for run in runs:
    print(f"run_id: {run.info.run_id}, rmse:{run.data.metrics['rmse']:.4f}")

run_id: 1f3bdd5050ec470d84fb45ee81d7c57e, rmse:6.3012
run_id: 554c31ab5af245adb74b7857b90137a2, rmse:6.3012
run_id: fcbb9a4fe93b4071abe4e2f1ae5dee6b, rmse:6.3012
run_id: 7ec765ee6b9b485b893f5c366800c424, rmse:6.3026
run_id: 7117ea3740a943d5aeec22f440418050, rmse:6.3081


### Promote models to model registry

In [68]:
import mlflow

# Set the correct tracking URI
mlflow.set_tracking_uri("http://127.0.0.1:5001")  # or your actual URI

# Use correct run ID and model URI
run_id = "b329f82bc08e4d5ebaf62f74833d5c54"
model_uri = f"runs:/{run_id}/model"

# Register the model
mlflow.register_model(model_uri=model_uri, name="nyc-taxi-regressor")

Registered model 'nyc-taxi-regressor' already exists. Creating a new version of this model...
2025/05/07 03:31:49 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: nyc-taxi-regressor, version 6
Created version '6' of model 'nyc-taxi-regressor'.


<ModelVersion: aliases=[], creation_timestamp=1746588709103, current_stage='None', description='', last_updated_timestamp=1746588709103, name='nyc-taxi-regressor', run_id='b329f82bc08e4d5ebaf62f74833d5c54', run_link='', source='mlflow-artifacts:/1/b329f82bc08e4d5ebaf62f74833d5c54/artifacts/model', status='READY', status_message=None, tags={}, user_id='', version='6'>

### Transition model to another stage 

In [34]:
# get the lastest verion 
model_name = "nyc-taxi-regressor"
lastest_versions = client.get_latest_versions(name = model_name)

for version in lastest_versions:
    print(f"version: {version.version}, stage: {version.current_stage}")

version: 2, stage: Staging
version: 3, stage: None


  lastest_versions = client.get_latest_versions(name = model_name)


In [69]:
model_version = 5
new_stage = "Production"
# transition model stage
client.transition_model_version_stage(
    name = model_name,
    version = model_version, 
    stage = new_stage,
    archive_existing_versions = False
)

  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1746588475883, current_stage='Production', description='', last_updated_timestamp=1746588768798, name='nyc-taxi-regressor', run_id='b329f82bc08e4d5ebaf62f74833d5c54', run_link='', source='mlflow-artifacts:/1/b329f82bc08e4d5ebaf62f74833d5c54/artifacts/model', status='READY', status_message=None, tags={'model': 'xgboost'}, user_id='', version='5'>

### Change the model description

In [70]:
from datetime import datetime

date = datetime.today().date()
client.update_model_version(
    name = model_name,
    version = model_version, 
    description = f"The model version {model_version} was transition to {new_stage} on {date}"
)

<ModelVersion: aliases=[], creation_timestamp=1746588475883, current_stage='Production', description='The model version 5 was transition to Production on 2025-05-07', last_updated_timestamp=1746588789617, name='nyc-taxi-regressor', run_id='b329f82bc08e4d5ebaf62f74833d5c54', run_link='', source='mlflow-artifacts:/1/b329f82bc08e4d5ebaf62f74833d5c54/artifacts/model', status='READY', status_message=None, tags={'model': 'xgboost'}, user_id='', version='5'>

In [71]:
# Change model 2 to production
model_version = 2
new_stage = "Archived"
# transition model stage
client.transition_model_version_stage(
    name = model_name,
    version = model_version, 
    stage = new_stage,
    archive_existing_versions = False
)

date = datetime.today().date()
client.update_model_version(
    name = model_name,
    version = model_version, 
    description = f"The model version {model_version} was transition to {new_stage} on {date}"
)

  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1746579393292, current_stage='Archived', description='The model version 2 was transition to Archived on 2025-05-07', last_updated_timestamp=1746588823712, name='nyc-taxi-regressor', run_id='93589ab26598432588ea953f005931bb', run_link='', source='mlflow-artifacts:/1/93589ab26598432588ea953f005931bb/artifacts/model', status='READY', status_message=None, tags={'model': 'gradientboostingregrssor'}, user_id='', version='2'>

## Building the automation 

In [62]:
def read_dataframe(filename):
    "Write a function to read and preprocessing data"
    # Read the dataset
    df_taxi = pd.read_parquet(filename)

    # Adjust dropoff & pickup to pandas datetime 
    df_taxi['lpep_pickup_datetime'] = pd.to_datetime(df_taxi.lpep_pickup_datetime)
    df_taxi['lpep_dropoff_datetime'] = pd.to_datetime(df_taxi.lpep_dropoff_datetime)
    
    # Calculate the duration (drop_off -  pick_up)
    df_taxi['duration'] = df_taxi.lpep_dropoff_datetime - df_taxi.lpep_pickup_datetime
    
    # Adjust the duration in minutes for prediction 
    df_taxi['duration_minutes'] = df_taxi['duration'].dt.total_seconds() / 60

    # Since there are a lot of duration less than 1 minutes. We filter only duration between 1 minutes to 99% percentile
    df_taxi = df_taxi[(df_taxi['duration_minutes'] >= 1) & (df_taxi['duration_minutes'] <= 60)]

    # Feature Engineering 
    categorical_variables = ['PULocationID', 'DOLocationID']
    numerical_variables = ['trip_distance']

    # Convert it into "str"
    df_taxi[categorical_variables] = df_taxi[categorical_variables].astype(str)
    return df_taxi



def preprocess(df, dv):
    "Write the function to preprocessing data"
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    categorical = ['PU_DO']
    numerical = numerical_variables
    train_dicts = df[categorical + numerical].to_dict(orient = 'records')
    return dv.transform(train_dicts)



def test_model(name, stage, X_test, y_test):
    "Write a function to test the model"
    model = mlflow.pyfunc.load_model(f"models:/{name}/{stage}")
    y_pred = model.predict(X_test)
    return {"rmse": mean_squared_error(y_test, y_pred, squared = False)}

In [63]:
# read the dataframe
df = read_dataframe('../00-Dataset/green_tripdata_2021-03.parquet')

In [64]:
# download the preprocessor artifact to preprocess training data 
run_id = "b329f82bc08e4d5ebaf62f74833d5c54"
client.download_artifacts(run_id = run_id, path = 'preprocessor', dst_path = '.')

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

'/workspaces/mlops-zoomcamp/02-experiment-tracking/preprocessor'

In [65]:
# Load the preprocessot to transform new data (e.g: DictVectorize)
import pickle 

with open("preprocessor/preprocessor.b", "rb") as f_in:
    dv = pickle.load(f_in)

In [66]:
# preprocess the testing dataset
X_test = preprocess(df,dv)

# define the target variable
target = "duration_minutes"
y_test = df[target].values

In [67]:
# run the model with production stage
%time test_model(name = model_name, stage = 'Production', X_test = X_test, y_test = y_test)

Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

CPU times: user 566 ms, sys: 16.8 ms, total: 583 ms
Wall time: 481 ms


{'rmse': 6.57711317277908}

In [58]:
# run the model with stagging stage
%time test_model(name = model_name, stage = 'Staging', X_test = X_test, y_test = y_test)

Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

CPU times: user 11.5 s, sys: 3.77 s, total: 15.2 s
Wall time: 16.4 s


{'rmse': 6.88109797526315}