In [1]:
import os
import pandas as pd
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType
from datetime import datetime
import pickle
from sklearn.feature_extraction import DictVectorizer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import numpy as np
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe
from hyperopt.pyll import scope

## Q1. Install MLflow

In [2]:
# Q1: display the version of the MLflow package imported above.
mlflow.__version__

'2.22.0'

In [3]:
def dump_pickle(obj, filename: str):
    """Serialize ``obj`` to ``filename`` with pickle.

    The parent directory of ``filename`` is created when it does not exist
    yet, so callers can write into a fresh output folder without preparing
    it first.

    Args:
        obj: Any picklable Python object.
        filename: Destination path; opened in binary write mode and
            overwritten if it already exists.

    Returns:
        None.
    """
    parent_dir = os.path.dirname(filename)
    # A bare file name has an empty dirname, and os.makedirs("") raises.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(filename, "wb") as f_out:
        pickle.dump(obj, f_out)

def load_pickle(filename: str):
    """Return the object stored in the pickle file at ``filename``.

    Args:
        filename: Path of the file to read; opened in binary read mode.

    Returns:
        Whatever object ``pickle.load`` reconstructs from the file.
    """
    # NOTE: unpickling can execute arbitrary code; only load files you trust.
    with open(filename, "rb") as handle:
        payload = pickle.load(handle)
    return payload

def read_data_frame(file):
    """Load a trip parquet file and prepare it for modelling.

    Steps:
      1. Read ``file`` with ``pd.read_parquet``.
      2. Add a ``duration`` column: minutes between
         ``lpep_pickup_datetime`` and ``lpep_dropoff_datetime``.
      3. Keep only rows whose duration is between 1 and 60 minutes
         (both bounds inclusive).
      4. Cast ``PULocationID`` and ``DOLocationID`` to strings.

    Args:
        file: Path or URL accepted by ``pd.read_parquet``.

    Returns:
        A new, independent DataFrame holding the filtered rows.
    """
    trips = pd.read_parquet(file)

    # Vectorised timedelta -> minutes conversion instead of a per-row apply.
    elapsed = trips.lpep_dropoff_datetime - trips.lpep_pickup_datetime
    trips['duration'] = elapsed.dt.total_seconds() / 60

    # Copy the filtered rows so the column assignment below writes to a real
    # frame rather than a slice of ``trips`` (avoids SettingWithCopyWarning).
    trips = trips[(trips.duration >= 1) & (trips.duration <= 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    trips[categorical] = trips[categorical].astype(str)

    return trips

def preprocess(df, dv, fit_dv: bool = False):
    """Turn a trips frame into a feature matrix using ``dv``.

    A combined ``PU_DO`` column (pickup id + ``'_'`` + drop-off id) is added
    to ``df`` in place. The ``PU_DO`` and ``trip_distance`` columns are then
    converted to a list of per-row dicts and passed to the vectorizer.

    Args:
        df: Frame with ``PULocationID``, ``DOLocationID`` and
            ``trip_distance`` columns; it gains a ``PU_DO`` column.
        dv: Object exposing ``fit_transform`` and ``transform``.
        fit_dv: When truthy, ``dv.fit_transform`` is used; otherwise
            ``dv.transform``.

    Returns:
        A ``(features, dv)`` tuple, where ``features`` is whatever the chosen
        vectorizer method returned.
    """
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']

    feature_columns = ['PU_DO', 'trip_distance']
    records = df[feature_columns].to_dict(orient='records')

    encode = dv.fit_transform if fit_dv else dv.transform
    return encode(records), dv

def test_model(name, stage, x_test, y_test):
    model = mlflow.pyfunc.load_model(f"models:/{name}/{stage}")
    y_pred = model.predict(x_test)
    return {"rmse": mean_squared_error(y_test, y_pred, squared=False)}

def run_train(data_path, tracking_uri="http://localhost:5000"):
    """Train a baseline random forest and log it to MLflow.

    Loads ``train.pkl`` and ``val.pkl`` from ``data_path``, fits a
    ``RandomForestRegressor(max_depth=10, random_state=0)``, and inside a
    single MLflow run logs the estimator's parameters, the validation RMSE
    (metric ``rmse``) and the fitted model (artifact path ``models_mlflow``).

    Args:
        data_path: Directory containing ``train.pkl`` and ``val.pkl``, each
            holding an ``(X, y)`` tuple.
        tracking_uri: MLflow tracking server to log to. Defaults to the local
            server this function previously hard-coded.
    """
    x_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    x_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    mlflow.set_tracking_uri(tracking_uri)

    with mlflow.start_run():
        rf = RandomForestRegressor(max_depth=10, random_state=0)
        rf.fit(x_train, y_train)

        # Explicitly log every estimator parameter (including defaults such
        # as min_samples_split); this is manual logging, not mlflow.autolog.
        mlflow.log_params(rf.get_params())

        y_pred = rf.predict(x_val)
        # ``squared=False`` was removed in scikit-learn 1.6; sqrt(MSE) is
        # equivalent and version-independent.
        rmse = float(np.sqrt(mean_squared_error(y_val, y_pred)))
        mlflow.log_metric("rmse", rmse)

        # Save the model artifact
        mlflow.sklearn.log_model(rf, artifact_path="models_mlflow")

def run_optimization(data_path, num_trials):
    """Tune random-forest hyperparameters with hyperopt, logging each trial.

    Loads ``train.pkl`` and ``val.pkl`` from ``data_path`` and runs a TPE
    search over ``max_depth``, ``n_estimators``, ``min_samples_split`` and
    ``min_samples_leaf`` (``random_state`` fixed at 42). Every trial opens
    its own MLflow run and logs the sampled parameters plus the validation
    RMSE under the metric name ``rmse``.

    The tracking URI and experiment are not set here; whatever MLflow
    configuration is active when this is called is used.

    Args:
        data_path: Directory containing ``train.pkl`` and ``val.pkl``, each
            holding an ``(X, y)`` tuple.
        num_trials: Number of hyperopt evaluations (``max_evals``).
    """
    # Bind the loaded data to the names ``objective`` actually reads.
    # Previously they were stored as ``X_train``/``X_val`` while the closure
    # used ``x_train``/``x_val``, so the function only worked when variables
    # of those names happened to exist at module level.
    x_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    x_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    def objective(params):
        """Fit one forest with ``params`` and report its validation RMSE."""
        with mlflow.start_run():
            rf = RandomForestRegressor(**params)
            rf.fit(x_train, y_train)
            y_pred = rf.predict(x_val)
            # ``squared=False`` was removed in scikit-learn 1.6.
            rmse = float(np.sqrt(mean_squared_error(y_val, y_pred)))

            # Log hyperparameters
            mlflow.log_params(params)

            # Log validation metric
            mlflow.log_metric("rmse", rmse)

            return {'loss': rmse, 'status': STATUS_OK}

    search_space = {
        'max_depth': scope.int(hp.quniform('max_depth', 1, 20, 1)),
        'n_estimators': scope.int(hp.quniform('n_estimators', 10, 50, 1)),
        'min_samples_split': scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
        'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 4, 1)),
        'random_state': 42
    }

    rstate = np.random.default_rng(42)  # for reproducible results
    fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=num_trials,
        trials=Trials(),
        rstate=rstate
    )

def train_and_log_model(data_path, params):
    """Retrain a random forest with ``params`` and log it to MLflow.

    Loads ``train.pkl``, ``val.pkl`` and ``test.pkl`` from ``data_path``,
    fits a ``RandomForestRegressor`` using the entries of ``params`` named in
    the module-level ``RF_PARAMS`` list (each cast to ``int``), and inside one
    MLflow run logs:

      * the parameters used,
      * ``val_rmse`` and ``test_rmse`` metrics,
      * the fitted model under the artifact path ``model``.

    Logging the model is required for the later registration step, which
    refers to the artifact as ``runs:/<run_id>/model``.

    Args:
        data_path: Directory containing the three pickle files, each holding
            an ``(X, y)`` tuple.
        params: Mapping of parameter name to value; values may be strings
            (as returned by the MLflow tracking API) and are cast to ``int``.
    """
    x_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    x_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))
    x_test, y_test = load_pickle(os.path.join(data_path, "test.pkl"))

    with mlflow.start_run():
        new_params = {param: int(params[param]) for param in RF_PARAMS}
        mlflow.log_params(new_params)

        rf = RandomForestRegressor(**new_params)
        rf.fit(x_train, y_train)

        # Evaluate model on the validation and test sets.
        # ``squared=False`` was removed in scikit-learn 1.6, so take sqrt(MSE).
        val_rmse = float(np.sqrt(mean_squared_error(y_val, rf.predict(x_val))))
        mlflow.log_metric("val_rmse", val_rmse)
        test_rmse = float(np.sqrt(mean_squared_error(y_test, rf.predict(x_test))))
        mlflow.log_metric("test_rmse", test_rmse)

        # Persist the fitted estimator so that ``runs:/<run_id>/model``
        # resolves to a real artifact when the best run is registered.
        mlflow.sklearn.log_model(rf, artifact_path="model")

def run_register_model(data_path, top_n):
    """Retrain the best HPO candidates and register the overall winner.

    Uses the module-level constants ``MLFLOW_TRACKING_URI``,
    ``EXPERIMENT_NAME``, ``HPO_EXPERIMENT_NAME`` and
    ``REGISTERED_MODEL_NAME``.

    Steps:
      1. Select the ``top_n`` runs of the HPO experiment with the lowest
         validation ``rmse``.
      2. Retrain each with ``train_and_log_model``, which logs into
         ``EXPERIMENT_NAME``.
      3. Pick the run in ``EXPERIMENT_NAME`` with the lowest ``test_rmse``
         and register ``runs:/<run_id>/model`` under
         ``REGISTERED_MODEL_NAME``.

    Args:
        data_path: Directory with the pickled datasets, forwarded to
            ``train_and_log_model``.
        top_n: Number of HPO runs to retrain.
    """
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    mlflow.set_experiment(EXPERIMENT_NAME)

    client = MlflowClient()

    # Retrieve the top_n model runs and log the models.
    # HPO trials log their validation score as ``rmse``; they have no
    # ``test_rmse`` metric, so ordering by it would not rank them at all.
    experiment = client.get_experiment_by_name(HPO_EXPERIMENT_NAME)
    runs = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=top_n,
        order_by=["metrics.rmse ASC"]
    )
    for run in runs:
        train_and_log_model(data_path=data_path, params=run.data.params)

    # Select the model with the lowest test RMSE
    experiment = client.get_experiment_by_name(EXPERIMENT_NAME)
    best_run = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=1,
        order_by=["metrics.test_rmse ASC"]
    )[0]

    # Register the best model
    run_id = best_run.info.run_id
    model_uri = f"runs:/{run_id}/model"

    mlflow.register_model(
        model_uri=model_uri,
        name=REGISTERED_MODEL_NAME
    )

In [4]:
# Green-taxi trip files for 2023-01, 2023-02 and 2023-03 become the
# train, validation and test frames respectively.
df_train = read_data_frame('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet')
df_val = read_data_frame('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-02.parquet')
df_test = read_data_frame('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-03.parquet')

In [5]:
# Pull the regression target (trip duration) out of each split as an array.
target = "duration"
y_train, y_val, y_test = (
    split[target].values for split in (df_train, df_val, df_test)
)

In [6]:
# Fit the DictVectorizer and preprocess data.
# The vectorizer is fitted on the training frame only; the validation and
# test frames are transformed with that already-fitted instance.
dv = DictVectorizer()
x_train, dv = preprocess(df_train, dv, fit_dv=True)
x_val, _ = preprocess(df_val, dv, fit_dv=False)
x_test, _ = preprocess(df_test, dv, fit_dv=False)

In [7]:
dest_path = './output'

# Make sure the output folder exists; on a fresh checkout it does not, and
# opening a file inside a missing directory raises FileNotFoundError.
os.makedirs(dest_path, exist_ok=True)

# Save DictVectorizer and datasets
dump_pickle(dv, os.path.join(dest_path, "dv.pkl"))
dump_pickle((x_train, y_train), os.path.join(dest_path, "train.pkl"))
dump_pickle((x_val, y_val), os.path.join(dest_path, "val.pkl"))
dump_pickle((x_test, y_test), os.path.join(dest_path, "test.pkl"))

## Q2. Download and preprocess the data

In [8]:
# List what was written to the output folder (os.listdir gives no ordering
# guarantee, so the order of the names may differ between runs/machines).
file_names = os.listdir(dest_path)

file_names

['train.pkl', 'val.pkl', 'dv.pkl', 'test.pkl']

## Q3. Train a model with autolog

In [9]:
# Train the baseline random forest on the pickled data and log the run to MLflow.
run_train(dest_path)



🏃 View run painted-dolphin-251 at: http://localhost:5000/#/experiments/0/runs/18bc3a8b56604893938ad68fddc1513d
🧪 View experiment at: http://localhost:5000/#/experiments/0


## Q4. Launch the tracking server locally

In [10]:
# Point MLflow at the local tracking server and make "random-forest-hyperopt"
# the active experiment for the runs started by run_optimization below.
MLFLOW_TRACKING_URI = "http://localhost:5000"

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment("random-forest-hyperopt")

<Experiment: artifact_location='/workspaces/MLOps-Zoomcamp/2. Module 2 - Experiment Tracking/artifacts/4', creation_time=1748244851717, experiment_id='4', last_update_time=1748244851717, lifecycle_stage='active', name='random-forest-hyperopt', tags={}>

## Q5. Tune model hyperparameters

In [11]:
# Run 15 hyperopt trials; each trial is logged as its own MLflow run.
run_optimization(dest_path, 15)

🏃 View run fun-goose-459 at: http://localhost:5000/#/experiments/4/runs/ae156b62b2664aa49ed1cf5354a817e7           

🧪 View experiment at: http://localhost:5000/#/experiments/4                                                        

🏃 View run enthused-fish-348 at: http://localhost:5000/#/experiments/4/runs/9aa503f31bad494e8643caa85ae37494       

🧪 View experiment at: http://localhost:5000/#/experiments/4                                                        

🏃 View run luxuriant-smelt-376 at: http://localhost:5000/#/experiments/4/runs/aa74efb69ad34958b67351d115c84838     

🧪 View experiment at: http://localhost:5000/#/experiments/4                                                        

🏃 View run luminous-foal-788 at: http://localhost:5000/#/experiments/4/runs/ac39b74789dc4ee7bbe9ba3a6e0c63b9       

🧪 View experiment at: http://localhost:5000/#/experiments/4                                                        

🏃 View run judicious-mink-161 at: http://localhost:5000/#/experi

In [12]:
# Module-level settings read as globals by run_register_model and
# train_and_log_model; they must be defined before those functions are called.
MLFLOW_TRACKING_URI = "http://localhost:5000"
# Experiment the hyperparameter-search runs were logged to.
HPO_EXPERIMENT_NAME = "random-forest-hyperopt"
# Experiment that receives the retrained candidate models.
EXPERIMENT_NAME = "random-forest-best-models"
# Name under which the winning model is registered.
REGISTERED_MODEL_NAME = "random-forest-regressor"
# Parameters copied (and cast to int) from each HPO run when retraining.
RF_PARAMS = ['max_depth', 'n_estimators', 'min_samples_split', 'min_samples_leaf', 'random_state']

## Q6. Promote the best model to the model registry

In [13]:
# Retrain the 5 selected HPO runs and register the one with the lowest test RMSE.
run_register_model(dest_path, 5)

🏃 View run rogue-fawn-546 at: http://localhost:5000/#/experiments/5/runs/322f082f5e24425aa94f1b4c6ac9de7c
🧪 View experiment at: http://localhost:5000/#/experiments/5
🏃 View run clumsy-wren-334 at: http://localhost:5000/#/experiments/5/runs/2c9c5339dbe146a6a8f5312f18de24f5
🧪 View experiment at: http://localhost:5000/#/experiments/5
🏃 View run inquisitive-crab-393 at: http://localhost:5000/#/experiments/5/runs/70c349b536be4a39936f7710cb61b1f0
🧪 View experiment at: http://localhost:5000/#/experiments/5
🏃 View run enchanting-moose-695 at: http://localhost:5000/#/experiments/5/runs/ef4a81ddabc2474992460c5ea6d0e090
🧪 View experiment at: http://localhost:5000/#/experiments/5


Registered model 'random-forest-regressor' already exists. Creating a new version of this model...
2025/05/26 08:01:51 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: random-forest-regressor, version 3


🏃 View run nosy-grub-119 at: http://localhost:5000/#/experiments/5/runs/f27218a2d33a4ed190df9305e9785243
🧪 View experiment at: http://localhost:5000/#/experiments/5


Created version '3' of model 'random-forest-regressor'.
