# Experiment Tracking Using MLFlow

In [1]:
# Standard imports
import numpy as np
import pandas as pd
from pydantic import BaseModel, ValidationError
import yaml

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

sns.set()

# Built-in library
import itertools
import re
import json
import typing as tp

import warnings

# Promote every warning to an exception so a warning stops the cell instead of
# being printed and ignored. This is installed before the scikit-learn and
# feature-engine imports below, so warnings raised while importing them will
# also surface as errors.
warnings.filterwarnings("error")

# for saving the pipeline
import joblib

# from Scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, Binarizer
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LinearRegression, Lasso
from sklearn.ensemble import RandomForestRegressor
from sklearn import metrics, set_config

# Pipeline Display
set_config(display="text")

# from feature-engine
from feature_engine.imputation import (
    AddMissingIndicator,
    MeanMedianImputer,
)

from feature_engine.encoding import RareLabelEncoder

from feature_engine.transformation import (
    LogTransformer,
    YeoJohnsonTransformer,
)


from feature_engine.selection import DropFeatures

# Custom Imports
from data_manager import load_data, validate_input
import feat_engineering as fe
from schema import (
    TrainingSchema,
    ValidateTrainingData,
    ModelConfig,
    MLFlowConfig,
    ConfigVars,
)
import utilities as util

# Raise pandas' display limits: up to 1,000 rows, 1,000 columns and
# 600 characters per cell are rendered before truncation.
pd.set_option("display.max_rows", 1_000)
pd.set_option("display.max_columns", 1_000)
pd.set_option("display.max_colwidth", 600)

# Black code formatter (Optional)
%load_ext lab_black
# auto reload imports
%load_ext autoreload
%autoreload 2

### Load Data

In [2]:
# Parquet extracts: the 2022-01 file is used for training, the 2022-02 file for testing
train_path = "data/yellow_tripdata_2022-01.parquet"
test_path = "data/yellow_tripdata_2022-02.parquet"

train_data = load_data(train_path)
test_data = load_data(test_path)

# Report the shape of both frames before previewing the training data
shape_report = (
    f"Shape of: \ntrain_data: {train_data.shape}\ntest_data: {test_data.shape}\n"
)
print(shape_report)

train_data.head()

Shape of: 
train_data: (2406155, 20)
test_data: (2901257, 20)



Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_duration
0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2.0,3.8,1.0,N,142,236,1,14.5,3.0,0.5,3.65,0.0,0.3,21.95,2.5,0.0,2.93492
1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1.0,2.1,1.0,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.3,13.3,0.0,0.0,2.24071
2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1.0,0.97,1.0,N,166,166,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56,0.0,0.0,2.299581
3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1.0,1.09,1.0,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5,0.0,2.400619
4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1.0,4.3,1.0,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.3,30.3,2.5,0.0,3.651437


### Load Config

In [3]:
# Path of the YAML file holding the model and MLflow settings
fp = "config.yml"

# Decode explicitly as UTF-8 so that parsing does not depend on the
# platform's default locale encoding.
with open(fp, "r", encoding="utf-8") as file:
    config_file = yaml.safe_load(stream=file)

# Both sub-configs are built from the same parsed mapping.
# NOTE(review): presumably each schema reads only the keys it declares — confirm in schema.py.
config = ConfigVars(
    model_config=ModelConfig(**config_file),
    mlflow_config=MLFlowConfig(**config_file),
)

In [4]:
# Separate the target column from the predictors
target_col = config.model_config.TARGET
y = train_data[target_col]
X = train_data.drop(columns=[target_col])

# Hold out a validation set; the split size and the seed both come from the config
split_options = {
    "test_size": config.model_config.TEST_SIZE,
    "random_state": config.model_config.RANDOM_STATE,
}
X_train, X_validate, y_train, y_validate = train_test_split(X, y, **split_options)

X_train.shape, X_validate.shape

((2165539, 19), (240616, 19))

### Pipeline

In [5]:
# Short alias for the model section of the config, used by every step below
model_cfg = config.model_config

pipe = Pipeline(
    steps=[
        # --- Keep only the configured input columns ---
        ("input vars", fe.SelectFeatures(features=model_cfg.INPUT_FEATURES)),
        # --- Add missing-value indicator columns (only for variables with NaNs at fit time) ---
        (
            "add na_flag",
            AddMissingIndicator(
                variables=model_cfg.NUM_VARS_WF_NA,
                missing_only=True,
            ),
        ),
        # --- Fill the remaining NaNs with the median ---
        (
            "impute num_vars",
            MeanMedianImputer(
                variables=model_cfg.NUM_VARS_WF_NA,
                imputation_method="median",
            ),
        ),
        # --- Derive features from the temporal column ---
        ("cal day_of_week", fe.CalculateDayOfWeek(feature=model_cfg.TEMPORAL_VAR)),
        ("cal hour_of_day", fe.CalculateHourOfDay(feature=model_cfg.TEMPORAL_VAR)),
        # --- Narrow down to the configured important features ---
        ("important vars", fe.SelectFeatures(features=model_cfg.IMPORTANT_FEATURES)),
        # --- Remove the columns listed in VARS_TO_DROP ---
        ("drop features", DropFeatures(features_to_drop=model_cfg.VARS_TO_DROP)),
        # --- Natural-log transform of the configured variables ---
        (
            "log transformation",
            LogTransformer(base="e", variables=model_cfg.VARS_TO_LOG_TRANSFORM),
        ),
        # --- Standardise the features ---
        ("scale data", StandardScaler()),
        # --- Final estimator: ordinary least squares ---
        ("linear model", LinearRegression()),
    ]
)
pipe

## Track With MLFlow

In [6]:
# Bundle the train/validation splits into a single object for the experiment runner
training_data = util.TrainingData(
    X_train=X_train,
    y_train=y_train,
    X_validate=X_validate,
    y_validate=y_validate,
)

# Settings for the first run; the tracking store is a local SQLite file
exp_dict = dict(
    experiment_name="sample_experiment",
    run_name="001",
    model_name="linear_reg_pipeline",
    tracking_uri="sqlite:///mlflow.db",
)

exp_details = util.Experiment(**exp_dict)

exp_details

Experiment(experiment_name='sample_experiment', run_name='001', model_name='linear_reg_pipeline', tracking_uri='sqlite:///mlflow.db')

In [7]:
# Run the experiment for the linear-regression pipeline.
# Judging by the cell output, this reports RMSE/MSE/MAE/R2 and registers a model version.
util.run_experiment(
    training_data=training_data,
    estimator=pipe,
    experiment=exp_details,
)

INFO :: 2022-12-17 22:05:32,859 :: Context impl SQLiteImpl.
INFO :: 2022-12-17 22:05:32,860 :: Will assume non-transactional DDL.
                         'RatecodeID', 'total_amount', 'tpep_pickup_datetime',
                         'trip_distance', 'VendorID'])), ('add na_flag', AddMissingIndicator(variables=['RatecodeID'])), ('impute num_vars', MeanMedianImputer(variables=['RatecodeID'])), ('cal day_of_week', CalculateDayOfWeek(feature='tpep_pickup_datetime')), ('cal hour_of_day', CalculateHourOfDay(feature='tpep_pickup_datetime'...`


 Model name: linear_reg_pipeline
  RMSE: 0.2896738945525108
  MSE: 0.08391096518521914
  MAE: 0.21940251433736507
  R2: 0.7795462515192009


Registered model 'linear_reg_pipeline' already exists. Creating a new version of this model...
2022/12/17 22:05:54 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: linear_reg_pipeline, version 5
Created version '5' of model 'linear_reg_pipeline'.


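### Note: To permanently remove runs that have already been deleted, use:

```console
$ mlflow gc --backend-store-uri sqlite:///mlflow.db
```

(`mlflow gc` only purges runs in the `deleted` lifecycle stage; replace `sqlite:///mlflow.db` with your own backend store URI if it differs.)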


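## Clear All Experiment Runs In A DB

`mlflow gc` permanently removes only the runs that are already in the `deleted` lifecycle stage, so delete the runs first (for example from the MLflow UI) and then run:

```bash
mlflow gc --backend-store-uri sqlite:///mlflow.db
```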

In [8]:
# Second pipeline: same preprocessing, but with a Yeo-Johnson transform and a
# random forest as the final estimator.
model_cfg = config.model_config

pipe_1 = Pipeline(
    steps=[
        # --- Keep only the configured input columns ---
        ("input vars", fe.SelectFeatures(features=model_cfg.INPUT_FEATURES)),
        # --- Add missing-value indicator columns (only for variables with NaNs at fit time) ---
        (
            "add na_flag",
            AddMissingIndicator(
                variables=model_cfg.NUM_VARS_WF_NA,
                missing_only=True,
            ),
        ),
        # --- Fill the remaining NaNs with the median ---
        (
            "impute num_vars",
            MeanMedianImputer(
                variables=model_cfg.NUM_VARS_WF_NA,
                imputation_method="median",
            ),
        ),
        # --- Derive features from the temporal column ---
        ("cal day_of_week", fe.CalculateDayOfWeek(feature=model_cfg.TEMPORAL_VAR)),
        ("cal hour_of_day", fe.CalculateHourOfDay(feature=model_cfg.TEMPORAL_VAR)),
        # --- Narrow down to the configured important features ---
        ("important vars", fe.SelectFeatures(features=model_cfg.IMPORTANT_FEATURES)),
        # --- Remove the columns listed in VARS_TO_DROP ---
        ("drop features", DropFeatures(features_to_drop=model_cfg.VARS_TO_DROP)),
        # --- Yeo-Johnson transform (reuses the VARS_TO_LOG_TRANSFORM variable list) ---
        (
            "YeoJohnson transformation",
            YeoJohnsonTransformer(variables=model_cfg.VARS_TO_LOG_TRANSFORM),
        ),
        # --- Standardise the features ---
        ("scale data", StandardScaler()),
        # --- Final estimator: random forest ---
        (
            "RF model",
            RandomForestRegressor(
                random_state=model_cfg.RANDOM_STATE,
                n_estimators=10,
                max_depth=10,
            ),
        ),
    ]
)
pipe_1

In [9]:
# Settings for the second run: same experiment and tracking store, new run and model name
exp_dict = dict(
    experiment_name="sample_experiment",
    run_name="002",
    model_name="random_forest_pipeline",
    tracking_uri="sqlite:///mlflow.db",
)

exp_details = util.Experiment(**exp_dict)

exp_details

Experiment(experiment_name='sample_experiment', run_name='002', model_name='random_forest_pipeline', tracking_uri='sqlite:///mlflow.db')

In [10]:
# Run the experiment for the random-forest pipeline on the same train/validation bundle.
util.run_experiment(
    training_data=training_data,
    estimator=pipe_1,
    experiment=exp_details,
)

                         'RatecodeID', 'total_amount', 'tpep_pickup_datetime',
                         'trip_distance', 'VendorID'])), ('add na_flag', AddMissingIndicator(variables=['RatecodeID'])), ('impute num_vars', MeanMedianImputer(variables=['RatecodeID'])), ('cal day_of_week', CalculateDayOfWeek(feature='tpep_pickup_datetime')), ('cal hour_of_day', CalculateHourOfDay(feature='tpep_pickup_datetime'...`


 Model name: random_forest_pipeline
  RMSE: 0.20104226637139827
  MSE: 0.040417992867748255
  MAE: 0.1430756151240444
  R2: 0.8938124711818373


Registered model 'random_forest_pipeline' already exists. Creating a new version of this model...
2022/12/17 22:07:19 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: random_forest_pipeline, version 5
Created version '5' of model 'random_forest_pipeline'.
