In [1]:
import argparse
import os
import pickle
import numpy as np

import pandas as pd
from sklearn.feature_extraction import DictVectorizer
import distutils.errors
from hyperopt.pyll import scope
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe, space_eval
from mlflow.entities import ViewType
from mlflow.tracking import MlflowClient

In [2]:
import warnings
# Silence every warning for the rest of the session. Note that this also hides
# deprecation warnings and pandas chained-assignment warnings from later cells.
warnings.filterwarnings("ignore")

In [3]:
import mlflow
mlflow.__version__  # Answer to first question

'1.26.1'

# Pre-processing Steps

In [4]:
def dump_pickle(obj, filename):
    """Pickle ``obj`` into the file at ``filename``.

    The file is opened in binary write mode, so an existing file at that
    path is overwritten. Returns whatever ``pickle.dump`` returns (``None``).
    """
    with open(filename, "wb") as sink:
        outcome = pickle.dump(obj, sink)
    return outcome

In [5]:
def read_dataframe(filename: str):
    """Load a trip-data parquet file and prepare it for feature extraction.

    Steps:
      1. Read the parquet file at ``filename``.
      2. Add a ``duration`` column: drop-off minus pick-up time, in minutes.
      3. Keep only trips lasting between 1 and 60 minutes (both inclusive).
      4. Cast ``PULocationID`` and ``DOLocationID`` to ``str``.

    Args:
        filename: Path of the parquet file. It must contain the columns
            ``lpep_pickup_datetime``, ``lpep_dropoff_datetime``,
            ``PULocationID`` and ``DOLocationID``.

    Returns:
        A new, independent DataFrame with the filtered rows.
    """
    df = pd.read_parquet(filename)

    # Vectorised timedelta accessor instead of a per-row Python lambda.
    trip_time = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df['duration'] = trip_time.dt.total_seconds() / 60

    # .copy() detaches the filtered rows from the original frame, so the
    # column assignment below is not a write to a slice (which pandas flags
    # as chained assignment).
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df

In [6]:
def preprocess(df: pd.DataFrame, dv: DictVectorizer, fit_dv: bool = False):
    """Turn a trips DataFrame into a feature matrix via a DictVectorizer.

    Two features are used: ``PU_DO`` (pick-up and drop-off location IDs
    joined with ``'_'``) and ``trip_distance``.

    Args:
        df: Frame with ``PULocationID`` and ``DOLocationID`` as strings and a
            ``trip_distance`` column. It is NOT modified.
        dv: The vectorizer to use.
        fit_dv: If True, call ``dv.fit_transform``; otherwise ``dv.transform``
            (the vectorizer is expected to be fitted already).

    Returns:
        Tuple ``(X, dv)``: the transformed features and the same vectorizer.
    """
    # Build the features in a local frame rather than writing 'PU_DO' onto
    # the caller's DataFrame: that both mutated the argument and, when the
    # argument is a filtered slice, triggered pandas' chained-assignment
    # warning.
    features = pd.DataFrame(
        {
            'PU_DO': df['PULocationID'] + '_' + df['DOLocationID'],
            'trip_distance': df['trip_distance'],
        }
    )
    dicts = features.to_dict(orient='records')
    if fit_dv:
        X = dv.fit_transform(dicts)
    else:
        X = dv.transform(dicts)
    return X, dv

In [7]:
def run(raw_data_path: str, dest_path: str, dataset: str = "green"):
    """Prepare the train/valid/test splits and pickle them into ``dest_path``.

    Reads ``{dataset}_tripdata_2021-01/02/03.parquet`` from ``raw_data_path``
    as the train, validation and test splits respectively, fits a
    DictVectorizer on the training split, transforms all three, and writes
    ``dv.pkl``, ``train.pkl``, ``valid.pkl`` and ``test.pkl`` to ``dest_path``
    (created if missing). Each split pickle holds an ``(X, y)`` tuple.
    """
    target = 'duration'
    month_of_split = (("train", "01"), ("valid", "02"), ("test", "03"))

    # load one parquet file per split
    frames = {
        split: read_dataframe(
            os.path.join(raw_data_path, f"{dataset}_tripdata_2021-{month}.parquet")
        )
        for split, month in month_of_split
    }

    # pull out the target column of every split
    targets = {split: frame[target].values for split, frame in frames.items()}

    # fit the vectorizer on train only, then reuse it for the other splits
    dv = DictVectorizer()
    features = {}
    features["train"], dv = preprocess(frames["train"], dv, fit_dv=True)
    for split in ("valid", "test"):
        features[split], _ = preprocess(frames[split], dv, fit_dv=False)

    # make sure the output folder exists
    os.makedirs(dest_path, exist_ok=True)

    # persist the vectorizer followed by each (X, y) pair
    dump_pickle(dv, os.path.join(dest_path, "dv.pkl"))
    for split, _month in month_of_split:
        dump_pickle(
            (features[split], targets[split]),
            os.path.join(dest_path, f"{split}.pkl"),
        )

In [8]:
# Build dv.pkl and the train/valid/test pickles from the raw parquet files in ./data/.
run("./data/", "./data/assign2/")

In [9]:
# Count and list the files that the preprocessing step wrote.
lst_dir = os.listdir('./data/assign2')
print(len(lst_dir), lst_dir) # Answer to second question

4 ['valid.pkl', 'test.pkl', 'dv.pkl', 'train.pkl']


# Training Steps

In [10]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

In [11]:
def load_pickle(filename: str):
    """Read the file at ``filename`` in binary mode and return the unpickled object.

    Note: unpickling executes code embedded in the file, so only load
    pickles that this notebook (or another trusted source) produced.
    """
    with open(filename, "rb") as source:
        payload = pickle.load(source)
    return payload

In [12]:
# Show the working directory: the relative "./data" paths and the sqlite
# tracking URI used in this notebook are resolved against it.
os.getcwd()

'/home/ubuntu/mlops-zoomcamp/notebooks'

In [13]:
# Track runs in a local SQLite database file (mlflow.db, relative path) and
# make "Experiment-1" the active experiment for the baseline training run.
mlflow.set_tracking_uri("sqlite:///mlflow.db")
mlflow.set_experiment("Experiment-1")

<Experiment: artifact_location='./mlruns/3', experiment_id='3', lifecycle_stage='active', name='Experiment-1', tags={}>

In [14]:
def run_training(data_path):
    """Train a baseline random forest and record the run in MLflow.

    Loads ``train.pkl`` and ``valid.pkl`` (each an ``(X, y)`` tuple) from
    ``data_path``, fits ``RandomForestRegressor(max_depth=10, random_state=0)``
    inside a new MLflow run, logs the validation RMSE as metric ``rmse`` and
    prints it.

    Args:
        data_path: Directory containing ``train.pkl`` and ``valid.pkl``.

    Side effects:
        Enables scikit-learn autologging via ``mlflow.sklearn.autolog``.
        NOTE(review): autolog is presumably a process-wide switch, so it stays
        enabled for later cells as well - confirm against the MLflow docs.
    """
    train_path = os.path.join(data_path, "train.pkl")
    valid_path = os.path.join(data_path, "valid.pkl")

    X_train, y_train = load_pickle(train_path)
    X_valid, y_valid = load_pickle(valid_path)

    mlflow.sklearn.autolog(exclusive=False)

    with mlflow.start_run():
        # Log the files that were actually loaded instead of hard-coded
        # absolute paths that ignore `data_path`.
        mlflow.log_param("train-data-path", train_path)
        mlflow.log_param("valid-data-path", valid_path)
        rf = RandomForestRegressor(max_depth=10, random_state=0)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_valid)

        # Take the root explicitly: the `squared=False` argument of
        # mean_squared_error is deprecated and later removed in scikit-learn.
        rmse = float(mean_squared_error(y_valid, y_pred) ** 0.5)
        mlflow.log_metric("rmse", rmse)

    # The `with` block has already ended the run; no explicit end_run() needed.
    print(rmse)

In [15]:
# Train the baseline model on the preprocessed pickles; prints the validation RMSE.
run_training("./data/assign2")

6.729070933590364


In [16]:
# Runs started from here on are recorded under the "Optimization-1" experiment.
mlflow.set_experiment("Optimization-1")

<Experiment: artifact_location='./mlruns/4', experiment_id='4', lifecycle_stage='active', name='Optimization-1', tags={}>

In [17]:
def run_optimize(data_path, num_trials):
    """Tune a random forest with hyperopt (TPE), logging each trial to MLflow.

    Loads ``train.pkl`` and ``valid.pkl`` (each an ``(X, y)`` tuple) from
    ``data_path``. Every trial runs in its own MLflow run that is tagged
    ``model=RF_hyp`` and records the sampled parameters, the validation RMSE
    (metric ``rmse``) and the fitted model under artifact path ``artifacts``.

    Args:
        data_path: Directory containing ``train.pkl`` and ``valid.pkl``.
        num_trials: Number of hyperopt evaluations (``max_evals``).
    """
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_valid, y_valid = load_pickle(os.path.join(data_path, "valid.pkl"))

    def objective(params):
        """Fit one candidate forest and return its validation RMSE as the loss."""
        with mlflow.start_run():

            mlflow.set_tag("model","RF_hyp")
            mlflow.log_params(params)
            rf = RandomForestRegressor(**params)
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_valid)
            # Take the root explicitly: the `squared=False` argument of
            # mean_squared_error is deprecated and later removed in scikit-learn.
            rmse = float(mean_squared_error(y_valid, y_pred) ** 0.5)
            mlflow.log_metric("rmse", rmse)
            mlflow.sklearn.log_model(rf, artifact_path = "artifacts")

        return {'loss': rmse, 'status': STATUS_OK}

    # quniform samples floats; scope.int casts them to the ints sklearn expects.
    search_space = {
        'max_depth': scope.int(hp.quniform('max_depth', 1, 20, 1)),
        'n_estimators': scope.int(hp.quniform('n_estimators', 10, 50, 1)),
        'min_samples_split': scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
        'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 4, 1)),
        'random_state': 10
    }

    rstate = np.random.default_rng(10)  # for reproducible results
    fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=num_trials,
        trials=Trials(),
        rstate=rstate
    )
    # Each trial's `with mlflow.start_run()` block ends its own run, so no
    # trailing mlflow.end_run() is needed here.


In [18]:
# Run 25 hyperopt trials; each one is logged as a separate MLflow run.
run_optimize("./data/assign2", 25)

  0%|                                                                                                                                | 0/25 [00:00<?, ?trial/s, best loss=?]




  4%|████                                                                                                   | 1/25 [00:09<03:52,  9.70s/trial, best loss: 6.721531928622503]




  8%|████████▏                                                                                              | 2/25 [00:23<04:31, 11.82s/trial, best loss: 6.721531928622503]




 12%|████████████▎                                                                                          | 3/25 [00:30<03:32,  9.66s/trial, best loss: 6.721531928622503]




 16%|████████████████▋                                                                                       | 4/25 [00:38<03:13,  9.23s/trial, best loss: 6.69633581150215]




 20%|████████████████████▊                                                                                   | 5/25 [00:50<03:22, 10.10s/trial, best loss: 6.69633581150215]




 24%|████████████████████████▉                                                                               | 6/25 [00:57<02:51,  9.03s/trial, best loss: 6.69633581150215]




 28%|█████████████████████████████                                                                           | 7/25 [01:09<03:02, 10.11s/trial, best loss: 6.69633581150215]




 32%|█████████████████████████████████▎                                                                      | 8/25 [01:21<03:03, 10.80s/trial, best loss: 6.69633581150215]




 36%|█████████████████████████████████████▍                                                                  | 9/25 [01:30<02:39,  9.98s/trial, best loss: 6.69633581150215]




 40%|█████████████████████████████████████████▏                                                             | 10/25 [01:47<03:05, 12.39s/trial, best loss: 6.65927188735058]




 44%|█████████████████████████████████████████████▎                                                         | 11/25 [01:53<02:22, 10.20s/trial, best loss: 6.65927188735058]




 48%|████████████████████████████████████████████████▉                                                     | 12/25 [02:17<03:07, 14.45s/trial, best loss: 6.649712784637031]




 52%|█████████████████████████████████████████████████████                                                 | 13/25 [02:29<02:46, 13.88s/trial, best loss: 6.649712784637031]




 56%|█████████████████████████████████████████████████████████                                             | 14/25 [02:40<02:22, 13.00s/trial, best loss: 6.649712784637031]




 60%|█████████████████████████████████████████████████████████████▏                                        | 15/25 [02:53<02:08, 12.81s/trial, best loss: 6.649712784637031]




 64%|█████████████████████████████████████████████████████████████████▎                                    | 16/25 [03:13<02:14, 14.94s/trial, best loss: 6.649712784637031]




 68%|█████████████████████████████████████████████████████████████████████▎                                | 17/25 [03:23<01:48, 13.52s/trial, best loss: 6.649712784637031]




 72%|█████████████████████████████████████████████████████████████████████████▍                            | 18/25 [03:44<01:51, 15.99s/trial, best loss: 6.649254846695847]




 76%|█████████████████████████████████████████████████████████████████████████████▌                        | 19/25 [04:00<01:35, 15.92s/trial, best loss: 6.649254846695847]




 80%|█████████████████████████████████████████████████████████████████████████████████▌                    | 20/25 [04:12<01:13, 14.75s/trial, best loss: 6.649254846695847]




 84%|█████████████████████████████████████████████████████████████████████████████████████▋                | 21/25 [04:53<01:29, 22.40s/trial, best loss: 6.628204285116205]




 88%|████████████████████████████████████████████████████████████████████████████████████████▉            | 22/25 [05:27<01:18, 26.16s/trial, best loss: 6.6257436134903935]




 92%|████████████████████████████████████████████████████████████████████████████████████████████▉        | 23/25 [05:32<00:39, 19.76s/trial, best loss: 6.6257436134903935]




 96%|████████████████████████████████████████████████████████████████████████████████████████████████▉    | 24/25 [06:13<00:25, 25.96s/trial, best loss: 6.6257436134903935]




100%|█████████████████████████████████████████████████████████████████████████████████████████████████████| 25/25 [06:42<00:00, 16.09s/trial, best loss: 6.6257436134903935]


In [19]:
# Runs started from here on are recorded under the "Register_model-1" experiment.
mlflow.set_experiment("Register_model-1")

<Experiment: artifact_location='./mlruns/5', experiment_id='5', lifecycle_stage='active', name='Register_model-1', tags={}>

In [20]:
# Hyperopt search space with the same entries as `search_space` inside
# run_optimize. train_and_log_model passes it to space_eval to rebuild the
# estimator arguments from the params of a previously logged run.
SPACE = {
    'max_depth': scope.int(hp.quniform('max_depth', 1, 20, 1)),
    'n_estimators': scope.int(hp.quniform('n_estimators', 10, 50, 1)),
    'min_samples_split': scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
    'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 4, 1)),
    'random_state': 10
}

In [21]:
# Switch from the local sqlite URI to an MLflow tracking server on localhost:5000.
# NOTE(review): assumes a server is already running there, presumably backed by
# the same mlflow.db so the earlier experiments are visible - confirm.
mlflow.set_tracking_uri("http://127.0.0.1:5000")

def train_and_log_model(data_path, params):
    """Retrain a random forest from logged params and record it in MLflow.

    Loads the ``train.pkl``, ``valid.pkl`` and ``test.pkl`` tuples from
    ``data_path``, rebuilds the estimator arguments from ``params`` with
    ``space_eval(SPACE, params)``, fits the model inside a new MLflow run and
    logs the metrics ``valid_rmse`` and ``test_rmse`` plus the fitted model
    under artifact path ``model``.

    Args:
        data_path: Directory containing the three pickles.
        params: Hyperparameter assignment keyed by the labels used in
            ``SPACE`` (for example the ``data.params`` of a logged run).
    """
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_valid, y_valid = load_pickle(os.path.join(data_path, "valid.pkl"))
    X_test, y_test = load_pickle(os.path.join(data_path, "test.pkl"))

    with mlflow.start_run():
        params = space_eval(SPACE, params)
        rf = RandomForestRegressor(**params)
        rf.fit(X_train, y_train)

        # evaluate model on the validation and test sets
        # (root taken explicitly: the `squared=False` argument of
        # mean_squared_error is deprecated and later removed in scikit-learn)
        valid_rmse = float(mean_squared_error(y_valid, rf.predict(X_valid)) ** 0.5)
        mlflow.log_metric("valid_rmse", valid_rmse)
        test_rmse = float(mean_squared_error(y_test, rf.predict(X_test)) ** 0.5)
        mlflow.log_metric("test_rmse", test_rmse)

        # Log the model explicitly so that "<run>/model" always exists for
        # registration, instead of relying on autologging having been
        # switched on by an earlier cell in the same kernel.
        mlflow.sklearn.log_model(rf, artifact_path="model")


def run_regiater_model(data_path, log_top):
    """Retrain the best tuned models and register the one with the lowest test RMSE.

    Takes the ``log_top`` runs with the lowest ``rmse`` from the
    "Optimization-1" experiment, retrains each through
    ``train_and_log_model``, then picks the run with the lowest ``test_rmse``
    in the "Register_model-1" experiment and registers its ``model`` artifact
    under the name ``Registered model_<run_id>``.

    Args:
        data_path: Directory with the pickles used by ``train_and_log_model``.
        log_top: How many of the best optimization runs to retrain.
    """
    client = MlflowClient()

    # retrieve the top_n model runs and log the models to MLflow
    experiment = client.get_experiment_by_name("Optimization-1")
    top_runs = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=log_top,
        order_by=["metrics.rmse ASC"]
    )
    for candidate in top_runs:
        train_and_log_model(data_path=data_path, params=candidate.data.params)

    # Identify model with the lowest test RMSE (only the first hit is needed)
    experiment = client.get_experiment_by_name("Register_model-1")
    best_run = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=1,
        order_by=["metrics.test_rmse ASC"]
    )[0]

    # register the best model
    # The scheme is "runs:/<run_id>/<artifact_path>"; without the colon the
    # string is a plain relative path and the registered version is not
    # linked to the run's artifacts.
    model_uri = f"runs:/{best_run.info.run_id}/model"
    mlflow.register_model(
        model_uri=model_uri,
        name=f"Registered model_{best_run.info.run_id}",
    )

In [22]:
# Retrain the 5 best optimization runs and register the one with the lowest test RMSE.
run_regiater_model("./data/assign2", 5)

Successfully registered model 'Registered model_82695c6930c2493392922d91d4674b16'.
2022/05/31 04:24:17 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: Registered model_82695c6930c2493392922d91d4674b16, version 1
Created version '1' of model 'Registered model_82695c6930c2493392922d91d4674b16'.
