In [6]:
import mlflow
print(mlflow.__version__)

2.13.0


In [1]:
!python3 preprocess_data.py --raw_data_path ./data --dest_path ./output

  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (


In [None]:
import os
import pickle
import click

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error


def load_pickle(filename: str):
    with open(filename, "rb") as f_in:
        return pickle.load(f_in)


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
def run_train(data_path: str):

    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    rf = RandomForestRegressor(max_depth=10, random_state=0)
    rf.fit(X_train, y_train)
    y_pred = rf.predict(X_val)

    rmse = mean_squared_error(y_val, y_pred, squared=False)


if __name__ == '__main__':
    run_train()

In [7]:
%%writefile train.py
import os
import pickle
import click
import mlflow

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error


mlflow.set_experiment("RF-train")##


def load_pickle(filename: str):
    with open(filename, "rb") as f_in:
        return pickle.load(f_in)


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)

def run_train(data_path: str):
    mlflow.sklearn.autolog(log_datasets=False)##

    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    with mlflow.start_run() as run:
        print(run.info.run_id)##

        rf = RandomForestRegressor(max_depth=10, random_state=0)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_val)

        rmse = mean_squared_error(y_val, y_pred, squared=False)
        mlflow.log_metric("rmse", rmse)##

if __name__ == '__main__':
    mlflow.set_tracking_uri("http://127.0.0.1:5000")
    run_train()

Overwriting train.py


In [8]:
import subprocess

result = subprocess.run(['python3', 'train.py'], capture_output=True, text=True)

run_id = result.stdout.strip()
run_id

'e2c2a2f3cb8542ebb3c5737e32622f95'

In [9]:
import mlflow

run_data = mlflow.get_run(run_id).data
run_data.params.get('min_samples_split')

'2'

In [10]:
import subprocess

subprocess.Popen("mlflow ui --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./artifacts", shell=True)

<Popen: returncode: None args: 'mlflow ui --backend-store-uri sqlite:///mlfl...>

  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (
[2024-05-27 17:46:02 +0000] [78843] [INFO] Starting gunicorn 22.0.0
[2024-05-27 17:46:02 +0000] [78843] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-05-27 17:46:02 +0000] [78843] [ERROR] Retrying in 1 second.
[2024-05-27 17:46:03 +0000] [78843] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-05-27 17:46:03 +0000] [78843] [ERROR] Retrying in 1 second.
[2024-05-27 17:46:04 +0000] [78843] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-05-27 17:46:04 +0000] [78843] [ERROR] Retrying in 1 second.
[2024-05-27 17:46:05 +0000] [78843] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-05-27 17:46:05 +0000] [78843] [ERROR] Retrying in 1 second.
[2024-05-27 17:46:06 +0000] [78843] [ERROR] Connection in use: ('127.0.0.1', 5000)
[2024-05-27 17:46:06 +0000] [78843] [ERROR] Retrying in 1 second.
[2024-05-27 17:46:07 +0000] [78843] [ERROR] Can't connect to ('127.0.0.1', 5000)
Running the mlfl

In [11]:
%%writefile hpo.py
import os
import pickle
import click
import mlflow
import numpy as np
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe
from hyperopt.pyll import scope
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error


mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment("random-forest-hyperopt")


def load_pickle(filename: str):
    with open(filename, "rb") as f_in:
        return pickle.load(f_in)


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
@click.option(
    "--num_trials",
    default=15,
    help="The number of parameter evaluations for the optimizer to explore"
)


def run_optimization(data_path: str, num_trials: int):

    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    def objective(params):
        with mlflow.start_run():
            mlflow.log_params(params)

            rf = RandomForestRegressor(**params)
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_val)

            rmse = mean_squared_error(y_val, y_pred, squared=False)
            mlflow.log_metric("rmse", rmse)

        return {'loss': rmse, 'status': STATUS_OK}

    search_space = {
        'max_depth': scope.int(hp.quniform('max_depth', 1, 20, 1)),
        'n_estimators': scope.int(hp.quniform('n_estimators', 10, 50, 1)),
        'min_samples_split': scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
        'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 4, 1)),
        'random_state': 42
    }

    rstate = np.random.default_rng(42)  # for reproducible results
    fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=num_trials,
        trials=Trials(),
        rstate=rstate
    )


if __name__ == '__main__':
    run_optimization()

Writing hpo.py


In [12]:
!python3 hpo.py

  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (
2024/05/27 17:58:07 INFO mlflow.tracking.fluent: Experiment with name 'random-forest-hyperopt' does not exist. Creating a new experiment.
100%|██████████| 15/15 [00:57<00:00,  3.83s/trial, best loss: 5.335419588556921]


In [13]:
%%writefile register_model.py
import os
import pickle
import click
import mlflow

from mlflow.entities import ViewType
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error


HPO_EXPERIMENT_NAME = "random-forest-hyperopt"
EXPERIMENT_NAME = "random-forest-best-models"
RF_PARAMS = ['max_depth', 'n_estimators', 'min_samples_split', 'min_samples_leaf', 'random_state']

mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.sklearn.autolog(log_datasets=False)


def load_pickle(filename):
    with open(filename, "rb") as f_in:
        return pickle.load(f_in)


def train_and_log_model(data_path, params):
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))
    X_test, y_test = load_pickle(os.path.join(data_path, "test.pkl"))

    with mlflow.start_run():
        for param in RF_PARAMS:
            params[param] = int(params[param])

        rf = RandomForestRegressor(**params)
        rf.fit(X_train, y_train)

        # Evaluate model on the validation and test sets
        val_rmse = mean_squared_error(y_val, rf.predict(X_val), squared=False)
        mlflow.log_metric("val_rmse", val_rmse)
        test_rmse = mean_squared_error(y_test, rf.predict(X_test), squared=False)
        mlflow.log_metric("test_rmse", test_rmse)


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
@click.option(
    "--top_n",
    default=5,
    type=int,
    help="Number of top models that need to be evaluated to decide which one to promote"
)


def run_register_model(data_path: str, top_n: int):

    client = MlflowClient("http://127.0.0.1:5000")

    # Retrieve the top_n model runs and log the models
    experiment = client.get_experiment_by_name(HPO_EXPERIMENT_NAME)
    runs = client.search_runs(
        experiment_ids=experiment.experiment_id,
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=top_n,
        order_by=["metrics.rmse ASC"]
    )

    for run in runs:
        train_and_log_model(data_path=data_path, params=run.data.params)

    # Select the model with the lowest test RMSE
    experiment = client.get_experiment_by_name(EXPERIMENT_NAME)
    best_run = client.search_runs(
        experiment_ids=experiment.experiment_id,
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=top_n,
        order_by=["metrics.test_rmse ASC"]
    )[0]

    print(best_run.info.run_id)

    # Register the best model
    mlflow.register_model(
        model_uri=f"runs:/{best_run.info.run_id}/model",
        name="random-forest-model"
    )


if __name__ == '__main__':
    run_register_model()

Writing register_model.py


In [14]:
import subprocess

result = subprocess.run(['python3', 'register_model.py'], capture_output=True, text=True)

run_id = result.stdout.strip()
run_id

'a345f070a9064af0bd3031febe828147'

In [15]:
from mlflow.tracking import MlflowClient

client = MlflowClient("http://127.0.0.1:5000")
run_data = client.get_run(run_id).data
run_data.metrics.get('test_rmse')

5.567408012462019