In [22]:
# import libraries

import pickle

import pandas as pd

from sklearn.feature_extraction import DictVectorizer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.pipeline import make_pipeline

import mlflow

from prefect import task, flow, get_run_logger

Steps to perform in the terminal:

1. Create an S3 bucket.
2. Check that the EC2 instance can access S3 with ```aws s3 ls```.
3. Start the MLflow tracking server with a SQLite backend store and the newly created S3 bucket as the default artifact root:
   ``` mlflow server --backend-store-uri=sqlite:///mlflow.db --default-artifact-root=s3://bhagabat-fhv-taxi-prediction/ ```
4. Make sure port 5000 is forwarded to localhost (you can check the forwarded ports in Visual Studio Code).
5. You should then be able to open the MLflow UI in a browser at http://127.0.0.1:5000


In [2]:
# With the MLflow server running, point the client at it and choose the
# experiment that runs are logged under (MLflow creates it if it is missing).
TRACKING_URI = "http://127.0.0.1:5000"
EXPERIMENT_NAME = "fhv-taxi-duration"

mlflow.set_tracking_uri(TRACKING_URI)
mlflow.set_experiment(EXPERIMENT_NAME)

2022/06/29 13:56:36 INFO mlflow.tracking.fluent: Experiment with name 'fhv-taxi-duration' does not exist. Creating a new experiment.


<Experiment: artifact_location='s3://bhagabat-fhv-taxi-prediction/1', experiment_id='1', lifecycle_stage='active', name='fhv-taxi-duration', tags={}>

In [23]:
# Prefect task: load one month of trip data into a DataFrame.

@task
def read_dataframe(filename: str):
    """Read a parquet file into a pandas DataFrame.

    Args:
        filename: Local path or URL of the parquet file; it is passed
            unchanged to ``pd.read_parquet``.

    Returns:
        The DataFrame produced by ``pd.read_parquet``.
    """
    get_run_logger().info(f"Reading file: {filename}...")
    return pd.read_parquet(filename)

In [29]:
# Prefect task: derive the target column and clean the features.

@task
def preprocess_data(df, categorical, min_duration=1, max_duration=60):
    """Add a trip ``duration`` column, drop outliers and stringify categoricals.

    Args:
        df: Trip records; must contain ``pickup_datetime`` and
            ``dropOff_datetime`` (datetime-like) plus every column in
            ``categorical``. It is NOT modified.
        categorical: Column names whose missing values are filled with -1 and
            which are then cast to ``int`` and finally to ``str`` (string values
            are one-hot encoded by ``DictVectorizer`` later on).
        min_duration: Inclusive lower bound, in minutes, on kept trips.
        max_duration: Inclusive upper bound, in minutes, on kept trips.

    Returns:
        A new, filtered DataFrame with an extra ``duration`` column in minutes.
    """
    logger = get_run_logger()
    logger.info('Processing the dataframe...')
    # Build a new frame with ``assign`` instead of writing into ``df`` so the
    # caller's raw DataFrame keeps its original columns (no hidden state).
    duration = (df.dropOff_datetime - df.pickup_datetime).dt.total_seconds() / 60
    df = df.assign(duration=duration)
    # ``between`` is inclusive on both ends, matching ``>= min`` and ``<= max``.
    df = df[df.duration.between(min_duration, max_duration)].copy()

    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    return df

In [32]:
# Instead of downloading the datasets manually, read the parquet files directly from their URLs.
# Site: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

@flow
def train_model(
    train_path: str = 'https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-01.parquet',
    val_path: str = 'https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-02.parquet',
):
    """Train a DictVectorizer + RandomForest duration model and log it to MLflow.

    Args:
        train_path: Parquet file (path or URL) used for training.
        val_path: Parquet file (path or URL) used for validation.

    Side effects:
        Opens an MLflow run, logs the hyper-parameters and the validation
        ``rmse`` metric, and lets sklearn autologging record the pipeline fit.
    """
    logger = get_run_logger()
    # Enable autologging before the run starts so the pipeline fit is captured
    # (params, training metrics and, with MLflow's default settings, the model).
    mlflow.sklearn.autolog()
    with mlflow.start_run():
        logger.info('Training the model...')
        df_train = read_dataframe(train_path).result()
        df_val = read_dataframe(val_path).result()

        categorical = ['PUlocationID', 'DOlocationID']
        df_train = preprocess_data(df_train, categorical).result()
        df_val = preprocess_data(df_val, categorical).result()

        target = 'duration'
        y_train = df_train[target].values
        y_val = df_val[target].values

        train_dicts = df_train[categorical].to_dict(orient='records')
        val_dicts = df_val[categorical].to_dict(orient='records')

        params = dict(max_depth=20, n_estimators=100, min_samples_leaf=10, random_state=0)
        mlflow.log_params(params)
        pipeline = make_pipeline(
            DictVectorizer(),
            RandomForestRegressor(**params, n_jobs=-1),
        )

        pipeline.fit(train_dicts, y_train)
        y_pred = pipeline.predict(val_dicts)

        # sklearn's signature is (y_true, y_pred); take the square root
        # explicitly because ``squared=False`` is deprecated/removed in newer
        # scikit-learn releases.
        rmse = mean_squared_error(y_val, y_pred) ** 0.5
        logger.info(f'rmse: {rmse}')
        mlflow.log_metric('rmse', rmse)



In [33]:
train_model()

16:01:21.371 | INFO    | prefect.engine - Created flow run 'chestnut-cicada' for flow 'train-model'
16:01:21.372 | INFO    | Flow run 'chestnut-cicada' - Using task runner 'ConcurrentTaskRunner'
16:01:21.429 | INFO    | Flow run 'chestnut-cicada' - Training the model...
16:01:21.532 | INFO    | Flow run 'chestnut-cicada' - Created task run 'read_dataframe-c80ba253-5' for task 'read_dataframe'
16:01:21.763 | INFO    | Task run 'read_dataframe-c80ba253-5' - Reading file: https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-01.parquet...
16:01:29.269 | INFO    | Task run 'read_dataframe-c80ba253-5' - Finished in state Completed()
16:01:29.306 | INFO    | Flow run 'chestnut-cicada' - Created task run 'read_dataframe-c80ba253-6' for task 'read_dataframe'
16:01:29.451 | INFO    | Task run 'read_dataframe-c80ba253-6' - Reading file: https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-02.parquet...
16:01:35.411 | INFO    | Task run 'read_dataframe-c80ba253-6' - Finished in sta

Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=        dispatching_base_num     pickup_datetime    dropOff_datetime  \
0                     B00009 2021-01-01 00:27:00 2021-01-01 00:44:00   
1                     B00009 2021-01-01 00:50:00 2021-01-01 01:07:00   
2                     B00013 2021-01-01 00:01:00 2021-01-01 01:51:00   
3                     B00037 2021-01-01 00:13:09 2021-01-01 00:21:26   
4                     B00037 2021-01-01 00:38:31 2021-01-01 00:53:44   
...                      ...                 ...                 ...   
1154107               B03266 2021-01-31 23:43:03 2021-01-31 23:51:48   
1154108               B03284 2021-01-31 23:50:27 2021-02-01 00:48:03   
1154109      B03285          2021-01-31 23:13:46 2021-01-31 23:29:58   
1154110      B03285          2021-01-31 23:58:03 2021-02-01 00:17:29   
1154111               B03321 2021-01-31 23:39:00 2021-02-01 00:15:00   

         PUlocationID