In [1]:
### Q1 ###

# Pinned to the version reported by the check cell so re-runs resolve the same release;
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q mlflow==2.22.0

Collecting mlflow
  Downloading mlflow-2.22.0-py3-none-any.whl (29.0 MB)
[K     |████████████████████████████████| 29.0 MB 8.7 MB/s eta 0:00:01     |█████████████████▏              | 15.6 MB 8.7 MB/s eta 0:00:02
[?25hCollecting gunicorn<24
  Downloading gunicorn-23.0.0-py3-none-any.whl (85 kB)
[K     |████████████████████████████████| 85 kB 6.9 MB/s  eta 0:00:01
[?25hCollecting graphene<4
  Downloading graphene-3.4.3-py2.py3-none-any.whl (114 kB)
[K     |████████████████████████████████| 114 kB 81.8 MB/s eta 0:00:01
Collecting pyarrow<20,>=4.0.0
  Downloading pyarrow-19.0.1-cp39-cp39-manylinux_2_28_x86_64.whl (42.1 MB)
[K     |████████████████████████████████| 42.1 MB 79 kB/s s eta 0:00:01     |██████████████████████▉         | 30.0 MB 78.0 MB/s eta 0:00:01
Collecting alembic!=1.10.0,<2
  Downloading alembic-1.16.1-py3-none-any.whl (242 kB)
[K     |████████████████████████████████| 242 kB 89.9 MB/s eta 0:00:01
[?25hCollecting mlflow-skinny==2.22.0
  Downloading mlflow_skinny-2.

Collecting deprecated>=1.2.6
  Downloading Deprecated-1.2.18-py2.py3-none-any.whl (10.0 kB)
Collecting importlib_metadata!=4.7.0,<9,>=3.7.0
  Downloading importlib_metadata-8.6.1-py3-none-any.whl (26 kB)
Collecting zipp>=0.5
  Downloading zipp-3.21.0-py3-none-any.whl (9.6 kB)
Collecting opentelemetry-semantic-conventions==0.54b1
  Downloading opentelemetry_semantic_conventions-0.54b1-py3-none-any.whl (194 kB)
[K     |████████████████████████████████| 194 kB 89.9 MB/s eta 0:00:01
Collecting pydantic-core==2.33.2
  Downloading pydantic_core-2.33.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.0 MB)
[K     |████████████████████████████████| 2.0 MB 63.3 MB/s eta 0:00:01
[?25hCollecting annotated-types>=0.6.0
  Downloading annotated_types-0.7.0-py3-none-any.whl (13 kB)
Collecting typing-inspection>=0.4.0
  Downloading typing_inspection-0.4.1-py3-none-any.whl (14 kB)
Collecting anyio<5,>=3.6.2
  Downloading anyio-4.9.0-py3-none-any.whl (100 kB)
[K     |█████████████████████

In [9]:
import mlflow
import pandas as pd

# Show which MLflow version the kernel imported.
print(mlflow.__version__)

2.22.0


In [10]:
### Q2 ###

import os
import pickle
import click
import pandas as pd

from sklearn.feature_extraction import DictVectorizer


def dump_pickle(obj, filename: str):
    """Pickle ``obj`` into the file at ``filename``.

    The file is opened in binary write mode, so existing content is replaced.
    Returns whatever ``pickle.dump`` returns (``None``).
    """
    with open(filename, mode="wb") as sink:
        outcome = pickle.dump(obj, sink)
    return outcome


def read_dataframe(filename: str):
    """Load a trip parquet file and derive the ``duration`` target in minutes.

    Args:
        filename: Path to a parquet file containing ``lpep_pickup_datetime``,
            ``lpep_dropoff_datetime``, ``PULocationID`` and ``DOLocationID``.

    Returns:
        A new DataFrame restricted to trips lasting between 1 and 60 minutes
        (both bounds inclusive), with an added float ``duration`` column and
        the two location-id columns converted to strings.
    """
    df = pd.read_parquet(filename)

    # Vectorised timedelta -> minutes conversion instead of a per-row lambda.
    trip_time = df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']
    df['duration'] = trip_time.dt.total_seconds() / 60

    # Take an explicit copy of the filtered rows so the column assignment below
    # writes to an independent frame rather than to a slice of the raw data.
    in_range = (df['duration'] >= 1) & (df['duration'] <= 60)
    df = df[in_range].copy()

    # Location ids are categories, not quantities: store them as strings.
    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df


def preprocess(df: pd.DataFrame, dv: DictVectorizer, fit_dv: bool = False):
    """Turn a trip frame into a feature matrix via ``dv``.

    Adds a ``PU_DO`` column to ``df`` itself (pickup and drop-off ids joined
    with ``_``), then vectorises ``PU_DO`` together with ``trip_distance``.

    Args:
        df: Trip frame; ``PULocationID`` and ``DOLocationID`` must support
            ``+`` with the ``'_'`` separator. Modified in place.
        dv: Vectoriser exposing ``fit_transform`` and ``transform``.
        fit_dv: When truthy, ``dv`` is fitted on this frame; otherwise it is
            only applied.

    Returns:
        ``(X, dv)``: the result of the vectoriser call and the vectoriser.
    """
    # Side effect kept on purpose: the caller's frame gains the PU_DO column.
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']

    feature_columns = ['PU_DO', 'trip_distance']
    records = df[feature_columns].to_dict(orient='records')

    vectorise = dv.fit_transform if fit_dv else dv.transform
    return vectorise(records), dv


@click.command()
@click.option(
    "--raw_data_path",
    help="Location where the raw NYC taxi trip data was saved"
)
@click.option(
    "--dest_path",
    help="Location where the resulting files will be saved"
)
def run_data_prep(raw_data_path: str, dest_path: str, dataset: str = "green"):
    """Build the train/val/test pickles from three monthly parquet files.

    Reads ``{dataset}_tripdata_2023-01/02/03.parquet`` from ``raw_data_path``
    as the train, validation and test splits, fits a ``DictVectorizer`` on the
    train split only, and writes ``dv.pkl``, ``train.pkl``, ``val.pkl`` and
    ``test.pkl`` into ``dest_path`` (created if missing).

    ``dataset`` has no matching click option, so from the command line it is
    always ``"green"``; ``read_dataframe`` reads the ``lpep_*`` columns.
    """
    split_months = (("train", "2023-01"), ("val", "2023-02"), ("test", "2023-03"))

    # Load every split before touching any of them.
    frames = {}
    for split, month in split_months:
        parquet_name = f"{dataset}_tripdata_{month}.parquet"
        frames[split] = read_dataframe(os.path.join(raw_data_path, parquet_name))

    # Pull out the regression target for each split.
    targets = {split: frame['duration'].values for split, frame in frames.items()}

    # The vectoriser learns its vocabulary from the train split only.
    dv = DictVectorizer()
    features = {}
    for split, frame in frames.items():
        features[split] = preprocess(frame, dv, fit_dv=(split == "train"))[0]

    # Make sure the output folder exists, then persist everything.
    os.makedirs(dest_path, exist_ok=True)
    dump_pickle(dv, os.path.join(dest_path, "dv.pkl"))
    for split, _ in split_months:
        dump_pickle(
            (features[split], targets[split]),
            os.path.join(dest_path, f"{split}.pkl"),
        )


# Entry point: run_data_prep is a click command, so calling it with no
# arguments makes click read the options from the command line.
# NOTE(review): inside a notebook kernel the command line is the kernel's own;
# presumably this cell is meant to be saved and run as a script — confirm.
if __name__ == '__main__':
    run_data_prep()


In [13]:
### Q3 ###
import os
import pickle
import click

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import mlflow
import numpy as np

# Enable MLflow autologging at import time, before the training command runs.
mlflow.autolog()

def load_pickle(filename: str):
    """Read the file at ``filename`` in binary mode and return the unpickled object.

    Unpickling executes code embedded in the file, so only use this on files
    produced by a trusted step.
    """
    with open(filename, mode="rb") as source:
        payload = pickle.load(source)
    return payload


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
def run_train(data_path: str):
    """Train a random forest on the prepared data inside one MLflow run.

    Points MLflow at the tracking server on ``127.0.0.1:5001``, selects the
    ``/my-mlops-experiment-1`` experiment, fits a
    ``RandomForestRegressor(max_depth=10, random_state=0)`` on ``train.pkl``
    and logs the validation RMSE (metric ``rmse``) and the fitted model.
    """
    mlflow.set_tracking_uri("http://127.0.0.1:5001")
    mlflow.set_experiment("/my-mlops-experiment-1")

    with mlflow.start_run():
        # Both splits are (features, target) tuples written by the prep step.
        X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
        X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

        forest = RandomForestRegressor(max_depth=10, random_state=0)
        forest.fit(X_train, y_train)

        # RMSE is the square root of the mean squared validation error.
        val_mse = mean_squared_error(y_val, forest.predict(X_val))
        mlflow.log_metric("rmse", np.sqrt(val_mse))

        # Store the fitted estimator as a run artifact.
        mlflow.sklearn.log_model(forest, artifact_path="random_forest_model")


# Entry point: run_train is a click command; --data_path falls back to its
# declared default when not given on the command line.
if __name__ == '__main__':
    run_train()


./output/train.pkl




5.431162180141208


In [None]:
### Q4 ###



In [None]:
### Q5 ###

import os
import pickle
import click
import mlflow
import numpy as np
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe
from hyperopt.pyll import scope
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

# Module-level MLflow setup: every run started below goes to this tracking
# server and into the "random-forest-hyperopt" experiment.
mlflow.set_tracking_uri("http://127.0.0.1:5001")
mlflow.set_experiment("random-forest-hyperopt")


def load_pickle(filename: str):
    """Return the object stored in the pickle file at ``filename``.

    The file is opened read-only in binary mode. Pickle files can run
    arbitrary code when loaded; only load files from a trusted source.
    """
    with open(filename, mode="rb") as pickled:
        restored = pickle.load(pickled)
    return restored


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
@click.option(
    "--num_trials",
    default=15,
    help="The number of parameter evaluations for the optimizer to explore"
)
def run_optimization(data_path: str, num_trials: int):
    """Search random-forest hyperparameters with hyperopt, one MLflow run per trial.

    Args:
        data_path: Folder containing ``train.pkl`` and ``val.pkl``, each a
            pickled ``(features, target)`` tuple.
        num_trials: Number of hyperparameter combinations hyperopt evaluates.

    Each trial logs the sampled hyperparameters and the validation ``rmse``
    to the active MLflow experiment.
    """
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    def objective(params):
        """Train one candidate and return its validation RMSE as the loss."""
        with mlflow.start_run():
            # Record the sampled hyperparameters with the run; without this
            # the run only carries a metric and the configuration that
            # produced it cannot be read back from the tracking server.
            mlflow.log_params(params)

            rf = RandomForestRegressor(**params)
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_val)

            # Take the square root explicitly: the `squared=False` keyword is
            # not accepted by current scikit-learn releases.
            rmse = np.sqrt(mean_squared_error(y_val, y_pred))
            mlflow.log_metric("rmse", rmse)

        return {'loss': rmse, 'status': STATUS_OK}

    # quniform samples floats; scope.int casts them to the ints sklearn expects.
    search_space = {
        'max_depth': scope.int(hp.quniform('max_depth', 1, 20, 1)),
        'n_estimators': scope.int(hp.quniform('n_estimators', 10, 50, 1)),
        'min_samples_split': scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
        'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 4, 1)),
        'random_state': 42
    }

    rstate = np.random.default_rng(42)  # for reproducible results
    fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=num_trials,
        trials=Trials(),
        rstate=rstate
    )


# Entry point: run_optimization is a click command; both options have
# defaults, so it can be started without arguments.
if __name__ == '__main__':
    run_optimization()


In [None]:
### Q6 ###

import os
import pickle
import click
import mlflow

from mlflow.entities import ViewType
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import numpy as np

# Experiment whose runs are queried for the best hyperparameter candidates.
HPO_EXPERIMENT_NAME = "random-forest-hyperopt"
# Experiment that receives the retrained candidate models.
EXPERIMENT_NAME = "random-forest-best-models"
# RandomForestRegressor arguments copied from a run's logged params.
RF_PARAMS = ['max_depth', 'n_estimators', 'min_samples_split', 'min_samples_leaf', 'random_state']

# Module-level MLflow setup: tracking server, active experiment, and sklearn
# autologging are configured as soon as this cell/script is executed.
mlflow.set_tracking_uri("http://127.0.0.1:5001")
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.sklearn.autolog()


def load_pickle(filename):
    """Unpickle and return the contents of the file at ``filename``.

    Loading a pickle runs code stored in the file, so this must only be used
    on files written by a trusted step of the pipeline.
    """
    with open(filename, mode="rb") as handle:
        contents = pickle.load(handle)
    return contents


def train_and_log_model(data_path, params):
    """Retrain a random forest from logged params and log val/test RMSE.

    Args:
        data_path: Folder containing ``train.pkl``, ``val.pkl`` and
            ``test.pkl``, each unpacked as a ``(features, target)`` pair.
        params: Mapping of hyperparameter name to value; every entry named in
            ``RF_PARAMS`` is converted with ``int`` before use. Names missing
            from the mapping are reported and left at the estimator default.

    The model and the ``val_rmse`` / ``test_rmse`` metrics are logged to a new
    MLflow run.
    """
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))
    X_test, y_test = load_pickle(os.path.join(data_path, "test.pkl"))

    with mlflow.start_run():
        # Keep only the known forest arguments, cast to int.
        rf_kwargs = {}
        for name in RF_PARAMS:
            if name not in params:
                print(f"Warning: Parameter '{name}' missing. Using sklearn default.")
                continue
            rf_kwargs[name] = int(params[name])

        rf = RandomForestRegressor(**rf_kwargs)
        rf.fit(X_train, y_train)
        mlflow.sklearn.log_model(rf, artifact_path="random_forest_model")

        # Score the fitted model on the validation split, then the test split.
        held_out = (("val_rmse", X_val, y_val), ("test_rmse", X_test, y_test))
        for metric_name, features, target in held_out:
            score = np.sqrt(mean_squared_error(target, rf.predict(features)))
            mlflow.log_metric(metric_name, score)


@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
@click.option(
    "--top_n",
    default=5,
    type=int,
    help="Number of top models that need to be evaluated to decide which one to promote"
)
def run_register_model(data_path: str, top_n: int):
    """Retrain the best hyperopt candidates and register the best one.

    Args:
        data_path: Folder with the pickled train/val/test splits.
        top_n: How many of the lowest-``rmse`` runs from the hyperopt
            experiment are retrained and evaluated.

    The run with the lowest ``test_rmse`` in ``EXPERIMENT_NAME`` is registered
    under the name ``nyc-taxi-duration-model``.
    """
    client = MlflowClient()

    # Retrieve the top_n model runs and log the models
    hpo_experiment = client.get_experiment_by_name(HPO_EXPERIMENT_NAME)
    runs = client.search_runs(
        experiment_ids=[hpo_experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=top_n,
        order_by=["metrics.rmse ASC"]
    )
    for run in runs:
        train_and_log_model(data_path=data_path, params=run.data.params)

    # Select the model with the lowest test RMSE. The retrained runs log
    # `val_rmse` and `test_rmse` (there is no plain `rmse` metric in this
    # experiment), so the ordering has to use `test_rmse`.
    experiment = client.get_experiment_by_name(EXPERIMENT_NAME)
    best_run = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=1,
        order_by=["metrics.test_rmse ASC"]
    )[0]

    # Register the best model; the artifact path matches the one used by
    # train_and_log_model when it logged the estimator.
    run_id = best_run.info.run_id
    model_uri = f"runs:/{run_id}/random_forest_model"
    model_name = "nyc-taxi-duration-model"
    result = mlflow.register_model(model_uri=model_uri, name=model_name)

    print(f"Model registered as: {result.name}, version: {result.version}")


# Entry point: run_register_model is a click command; both options have
# defaults, so it can be started without arguments.
if __name__ == '__main__':
    run_register_model()
