In [11]:
import pandas as pd
import urllib.request

In [2]:
import pickle

In [3]:
import seaborn as sns
import matplotlib.pyplot as plt

In [4]:
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge

from sklearn.metrics import mean_squared_error

In [5]:
import xgboost as xgb

In [6]:
# Helps to minimize the objective function
# Helps to optimize hyperparameters
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from hyperopt.pyll import scope

In [7]:
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, ExtraTreesRegressor
from sklearn.svm import LinearSVR

#### Download Files

In [15]:
# Base URL and file-name prefix from which each monthly parquet URL is built.
# NOTE(review): this S3 location may no longer be publicly readable — confirm
# the URL still resolves before re-running the download cells.
TRIP_DATA_BASE_URL = "https://nyc-tlc.s3.amazonaws.com/trip+data"
DATASET_NAME = "fhv_tripdata"

In [16]:
def download_file(file_identifier: str, output_path: str) -> None:
    """Download one trip-data parquet file into ``output_path``.

    The remote URL and the local file name are both derived from the
    module-level ``TRIP_DATA_BASE_URL`` and ``DATASET_NAME`` constants plus
    ``file_identifier``. The URL is printed before the download starts.

    Args:
        file_identifier: Suffix identifying the file, e.g. ``"2021-01"``.
        output_path: Directory in which the parquet file is written.

    Raises:
        urllib.error.URLError: If the remote file cannot be fetched.
        OSError: If the destination file cannot be opened for writing.
    """
    file_name = f"{DATASET_NAME}_{file_identifier}.parquet"
    file_url = f'{TRIP_DATA_BASE_URL}/{file_name}'
    print(file_url)

    # The context manager closes the HTTP response even if reading fails.
    with urllib.request.urlopen(file_url) as response:
        payload = response.read()

    # Open the destination only after the payload is fully read, so a failed
    # download does not leave an empty file behind; `with` closes the handle
    # even if the write raises.
    with open(f"{output_path}/{file_name}", 'wb') as f:
        f.write(payload)

In [17]:
# Fetch both monthly files into the shared data directory.
for month in ("2021-01", "2021-02"):
    download_file(month, "../artifacts/data")

https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-01.parquet
https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-02.parquet


#### Read Parquets

In [18]:
def read_dataframe(filename):
    """Load a trip parquet file and prepare it for modelling.

    Steps:
      1. Read the parquet file with the pyarrow engine.
      2. Parse ``pickup_datetime`` / ``dropOff_datetime`` as datetimes.
      3. Add ``duration`` (drop-off minus pickup) expressed in minutes.
      4. Keep only rows whose duration lies in the closed range [1, 60].
      5. Cast ``PUlocationID`` and ``DOlocationID`` to strings.

    Args:
        filename: Path of the parquet file to read.

    Returns:
        A new DataFrame holding the filtered rows plus the ``duration`` column.
    """
    df = pd.read_parquet(filename, engine="pyarrow")

    # Bracket access instead of attribute assignment: attribute assignment
    # only works for existing columns and silently creates a plain attribute
    # otherwise.
    df["dropOff_datetime"] = pd.to_datetime(df["dropOff_datetime"])
    df["pickup_datetime"] = pd.to_datetime(df["pickup_datetime"])

    # Vectorised timedelta -> minutes conversion (replaces a row-wise apply).
    elapsed = df["dropOff_datetime"] - df["pickup_datetime"]
    df["duration"] = elapsed.dt.total_seconds() / 60

    # Take an explicit copy so the column assignment below writes to an
    # independent frame rather than to a slice of the unfiltered one.
    in_range = (df["duration"] >= 1) & (df["duration"] <= 60)
    df = df[in_range].copy()

    categorical = ['PUlocationID', 'DOlocationID']
    df[categorical] = df[categorical].astype(str)

    return df

In [19]:
# The 2021-01 file is the training set, the 2021-02 file the validation set.
df_train, df_val = (
    read_dataframe('../artifacts/data/fhv_tripdata_2021-01.parquet'),
    read_dataframe('../artifacts/data/fhv_tripdata_2021-02.parquet'),
)

In [20]:
# Row counts of the training and validation frames.
df_train.shape[0], df_val.shape[0]

(1109826, 990113)

#### Preprocess

In [21]:
# Combine pickup and drop-off location IDs into one "<pickup>_<dropoff>" feature
# on both frames.
for frame in (df_train, df_val):
    frame['PU_DO'] = frame['PUlocationID'] + '_' + frame['DOlocationID']

In [22]:
# Only the combined pickup/drop-off feature is fed to the model.
categorical = ['PU_DO']

# One {feature: value} record per row, for each split.
train_dicts = df_train[categorical].to_dict(orient='records')
val_dicts = df_val[categorical].to_dict(orient='records')

# The vectoriser is fitted on the training records only and then reused,
# unchanged, to encode the validation records.
dv = DictVectorizer()
X_train = dv.fit_transform(train_dicts)
X_val = dv.transform(val_dicts)

In [23]:
# Regression target: the `duration` column of each split, as NumPy arrays.
target = 'duration'
y_train, y_val = df_train[target].to_numpy(), df_val[target].to_numpy()

#### Model Training 

In [24]:
# Baseline model: ordinary least squares on the vectorised features.
lr = LinearRegression()
lr.fit(X_train, y_train)

y_pred = lr.predict(X_val)

# RMSE is computed as sqrt(MSE) explicitly: the `squared=False` argument is
# deprecated and removed in recent scikit-learn releases, whereas this form
# works on every version.
mean_squared_error(y_val, y_pred) ** 0.5

10.596251398155163

In [26]:
# Persist the fitted vectoriser and the linear model together as one tuple,
# so both can be restored from a single file.
with open('../artifacts/models/lin_reg.bin', 'wb') as model_file:
    pickle.dump((dv, lr), model_file)

#### Model Training with MLFlow (basics with mlflow.log_artifact)

In [27]:
import mlflow

# Point MLflow at a local SQLite file as its tracking store.
mlflow.set_tracking_uri("sqlite:///mlflow.db") # required to enable model registry in local development
# Make "nyc-taxi-experiment" the active experiment for the runs logged below.
mlflow.set_experiment("nyc-taxi-experiment")

2022/05/28 00:33:14 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2022/05/28 00:33:14 INFO mlflow.store.db.utils: Updating database tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 451aebb31d03, add metric step
INFO  [alembic.runtime.migration] Running upgrade 451aebb31d03 -> 90e64c465722, migrate user column to tags
INFO  [alembic.runtime.migration] Running upgrade 90e64c465722 -> 181f10493468, allow nulls for metric values
INFO  [alembic.runtime.migration] Running upgrade 181f10493468 -> df50e92ffc5e, Add Experiment Tags Table
INFO  [alembic.runtime.migration] Running upgrade df50e92ffc5e -> 7ac759974ad8, Update run tags with larger limit
INFO  [alembic.runtime.migration] Running upgrade 7ac759974ad8 -> 89d4b8295536, create latest metrics table
INFO  [89d4b8295536_create_latest_metrics_table_py] Migration complete!
INFO  

<Experiment: artifact_location='./mlruns/1', experiment_id='1', lifecycle_stage='active', name='nyc-taxi-experiment', tags={}>

In [30]:
# Train a Lasso model inside one MLflow run, logging its tag, parameters,
# validation RMSE and the pickled (vectoriser, model) pair.
with mlflow.start_run():

    mlflow.set_tag("developer", "irem")

    mlflow.log_param("train-data-path", "../artifacts/data/fhv_tripdata_2021-01.parquet")
    mlflow.log_param("valid-data-path", "../artifacts/data/fhv_tripdata_2021-02.parquet")

    alpha = 0.1
    mlflow.log_param("alpha", alpha)
    lr = Lasso(alpha=alpha)
    lr.fit(X_train, y_train)

    y_pred = lr.predict(X_val)
    # sqrt(MSE) instead of `squared=False`, which is deprecated and removed in
    # recent scikit-learn releases.
    rmse = mean_squared_error(y_val, y_pred) ** 0.5
    mlflow.log_metric("rmse", rmse)

    # First (and not the best) way of saving a model: pickle it and attach the
    # file with `log_artifact`. MLflow then knows nothing about how to load or
    # use the model; the more standardised approach follows in later examples.
    with open('../artifacts/models/lasso.bin', 'wb') as f_out:
        pickle.dump((dv, lr), f_out)
    mlflow.log_artifact(local_path="../artifacts/models/lasso.bin", artifact_path="models_pickle")

#### Model Training with MLFlow (advanced)

##### Selecting Best Parameters with hyperopt

**Note:** this search took more than 10 hours on my machine. Either skip it or shorten it by lowering `max_evals` and/or `num_boost_round`.

In [32]:
# Wrap the feature matrices and targets in XGBoost's DMatrix container.
train, valid = (
    xgb.DMatrix(data=X_train, label=y_train),
    xgb.DMatrix(data=X_val, label=y_val),
)

In [35]:
def objective(hyper_params):
    """Train one XGBoost model and report its validation RMSE to hyperopt.

    Every call is recorded as its own MLflow run carrying a ``model`` tag,
    the hyperparameters and the ``rmse`` metric. Training data is taken from
    the module-level ``train`` / ``valid`` DMatrix objects and ``y_val``.

    Args:
        hyper_params: Parameter dict forwarded to ``xgb.train`` as ``params``.

    Returns:
        ``{'loss': <validation rmse>, 'status': STATUS_OK}``.
    """
    with mlflow.start_run():
        mlflow.set_tag("model", "xgboost")
        mlflow.log_params(hyper_params)
        booster = xgb.train(
            params=hyper_params,
            dtrain=train,
            num_boost_round=1000,
            evals=[(valid, 'validation')],
            early_stopping_rounds=50
        )
        y_pred = booster.predict(valid)
        # sqrt(MSE) instead of `squared=False`, which is deprecated and
        # removed in recent scikit-learn releases.
        rmse = mean_squared_error(y_val, y_pred) ** 0.5
        mlflow.log_metric("rmse", rmse)

    return {'loss': rmse, 'status': STATUS_OK}

In [36]:
# Hyperparameter search space sampled by hyperopt.
search_space = {
    'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
    'learning_rate': hp.loguniform('learning_rate', -3, 0),
    'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
    'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
    'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
    # 'reg:squarederror' is the current name of the squared-error objective;
    # the old 'reg:linear' alias is deprecated.
    'objective': 'reg:squarederror',
    'seed': 42
}

best_result = fmin(
    fn=objective,        # trains a model and returns the loss to minimise
    space=search_space,  # hyperparameter space to search
    algo=tpe.suggest,    # search algorithm
    max_evals=50,        # number of hyperparameter settings to try
    trials=Trials()      # records the outcome of each evaluation
)

[0]	validation-rmse:14.70117                                                                                                                                                                     
[1]	validation-rmse:12.52364                                                                                                                                                                     
[2]	validation-rmse:11.74730                                                                                                                                                                     
[3]	validation-rmse:11.45784                                                                                                                                                                     
[4]	validation-rmse:11.33779                                                                                                                                                                     
[5]	validation-rmse:11.27445  

##### Running XGBoost Experiment with selected parameters

In [37]:
# Switch XGBoost autologging off; the run below logs its parameters, metric
# and model explicitly.
mlflow.xgboost.autolog(disable=True)

In [None]:
# Retrain XGBoost with the selected hyperparameters inside one MLflow run and
# log parameters, validation RMSE, the fitted vectoriser and the model.
with mlflow.start_run():

    train = xgb.DMatrix(X_train, label=y_train)
    valid = xgb.DMatrix(X_val, label=y_val)

    best_params = {
        'learning_rate': 0.09585355369315604,
        'max_depth': 30,
        'min_child_weight': 1.060597050922164,
        # 'reg:squarederror' is the current name of the squared-error
        # objective; the old 'reg:linear' alias is deprecated.
        'objective': 'reg:squarederror',
        'reg_alpha': 0.018060244040060163,
        'reg_lambda': 0.011658731377413597,
        'seed': 42
    }

    mlflow.log_params(best_params)

    booster = xgb.train(
        params=best_params,
        dtrain=train,
        num_boost_round=1000,
        evals=[(valid, 'validation')],
        early_stopping_rounds=50
    )

    y_pred = booster.predict(valid)
    # sqrt(MSE) instead of `squared=False`, which is deprecated and removed in
    # recent scikit-learn releases.
    rmse = mean_squared_error(y_val, y_pred) ** 0.5
    mlflow.log_metric("rmse", rmse)

    # The vectoriser is stored as a separate artifact because it is needed to
    # encode raw records before they can be passed to the model.
    with open("../artifacts/models/preprocessor.b", "wb") as f_out:
        pickle.dump(dv, f_out)
    mlflow.log_artifact("../artifacts/models/preprocessor.b", artifact_path="preprocessor")

    # Second and better way to save the model: log it as an MLflow Model, so
    # the UI shows examples of how to load and use it.
    mlflow.xgboost.log_model(booster, artifact_path="models_mlflow")

[0]	validation-rmse:18.99913
[1]	validation-rmse:17.89568
[2]	validation-rmse:16.93414
[3]	validation-rmse:16.10200
[4]	validation-rmse:15.38295
[5]	validation-rmse:14.76456
[6]	validation-rmse:14.23618
[7]	validation-rmse:13.78364
[8]	validation-rmse:13.40077
[9]	validation-rmse:13.07391
[10]	validation-rmse:12.79622
[11]	validation-rmse:12.56432
[12]	validation-rmse:12.36576
[13]	validation-rmse:12.19890
[14]	validation-rmse:12.05712
[15]	validation-rmse:11.93812
[16]	validation-rmse:11.83620
[17]	validation-rmse:11.75078
[18]	validation-rmse:11.67804
[19]	validation-rmse:11.61720
[20]	validation-rmse:11.56341
[21]	validation-rmse:11.51802
[22]	validation-rmse:11.48012
[23]	validation-rmse:11.44645
[24]	validation-rmse:11.41829
[25]	validation-rmse:11.39245
[26]	validation-rmse:11.37059
[27]	validation-rmse:11.35060
[28]	validation-rmse:11.33433
[29]	validation-rmse:11.31859
[30]	validation-rmse:11.30447
[31]	validation-rmse:11.29285
[32]	validation-rmse:11.28279
[33]	validation-rmse

[262]	validation-rmse:10.94808
[263]	validation-rmse:10.94746
[264]	validation-rmse:10.94704
[265]	validation-rmse:10.94664
[266]	validation-rmse:10.94618
[267]	validation-rmse:10.94591
[268]	validation-rmse:10.94547
[269]	validation-rmse:10.94510
[270]	validation-rmse:10.94474
[271]	validation-rmse:10.94451
[272]	validation-rmse:10.94422
[273]	validation-rmse:10.94393
[274]	validation-rmse:10.94336
[275]	validation-rmse:10.94277
[276]	validation-rmse:10.94223
[277]	validation-rmse:10.94156
[278]	validation-rmse:10.94116
[279]	validation-rmse:10.94085
[280]	validation-rmse:10.94045
[281]	validation-rmse:10.94008
[282]	validation-rmse:10.93963
[283]	validation-rmse:10.93938
[284]	validation-rmse:10.93896
[285]	validation-rmse:10.93852
[286]	validation-rmse:10.93803
[287]	validation-rmse:10.93765
[288]	validation-rmse:10.93733
[289]	validation-rmse:10.93700
[290]	validation-rmse:10.93640
[291]	validation-rmse:10.93576
[292]	validation-rmse:10.93502
[293]	validation-rmse:10.93453
[294]	va

##### Running SkLearn Experiments

In [None]:
mlflow.sklearn.autolog()

# The fitted vectoriser is the same for every model, so it is pickled once
# here and only attached as an artifact inside each run.
with open("../artifacts/models/preprocessor.b", "wb") as f_out:
    pickle.dump(dv, f_out)

# One MLflow run per estimator class, each trained with default settings.
for model_class in (RandomForestRegressor, GradientBoostingRegressor, ExtraTreesRegressor, LinearSVR):

    with mlflow.start_run():

        mlflow.log_param("train-data-path", "../artifacts/data/fhv_tripdata_2021-01.parquet")
        mlflow.log_param("valid-data-path", "../artifacts/data/fhv_tripdata_2021-02.parquet")
        mlflow.log_artifact("../artifacts/models/preprocessor.b", artifact_path="preprocessor")

        mlmodel = model_class()
        mlmodel.fit(X_train, y_train)

        y_pred = mlmodel.predict(X_val)
        # sqrt(MSE) instead of `squared=False`, which is deprecated and
        # removed in recent scikit-learn releases.
        rmse = mean_squared_error(y_val, y_pred) ** 0.5
        mlflow.log_metric("rmse", rmse)
        



##### Tracking API and experiment and run access

In [None]:
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType

In [None]:
# Client for querying the local SQLite tracking store.
# (Fixes the misspelt class name `MlflowCleint`, which raised NameError.)
client = MlflowClient(tracking_uri="sqlite:///mlflow.db")

In [None]:
# Show the experiments known to the tracking store.
# NOTE(review): `list_experiments` may be unavailable in newer MLflow releases
# (superseded by `search_experiments`) — confirm against the installed version.
client.list_experiments()

In [None]:
# Up to five active runs of experiment "1" with rmse below 6.8, best first.
runs = client.search_runs(
    # Pass a list: a bare string is iterated character by character, which
    # only happens to work for single-character experiment IDs.
    experiment_ids=["1"],
    filter_string="metrics.rmse < 6.8",
    run_view_type=ViewType.ACTIVE_ONLY,
    max_results=5,
    order_by=["metrics.rmse ASC"]
)

In [None]:
# Print the ID and RMSE of each returned run.
# Fixes: the metric is logged under "rmse" (not "rsme"), the format spec needs
# a leading ':' and the inner quotes must differ from the f-string's own.
for run in runs:
    print(f"run id: {run.info.run_id}, rmse: {run.data.metrics['rmse']:.4f}")