In [6]:
import pickle
from datetime import datetime
from dateutil.relativedelta import relativedelta

import mlflow
import pandas as pd
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from prefect import flow, task, get_run_logger
from prefect.task_runners import SequentialTaskRunner
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import CronSchedule
from prefect.flow_runners import SubprocessFlowRunner

In [7]:
@task(name="path getting")
def get_paths(date: str) -> tuple[str, str]:
    """Build the training and validation parquet paths for a reference date.

    The training file is for the month two months before ``date``; the
    validation file is for the month one month before it.

    Args:
        date: Reference date formatted as 'YYYY-MM-DD'. A falsy value
            (None or '') means "use today".

    Returns:
        A ``(train_path, val_path)`` tuple of paths under ./data/.
    """
    reference = datetime.strptime(date, '%Y-%m-%d') if date else datetime.today()

    def _month_path(months_back: int) -> str:
        # Step back whole calendar months and format the month as YYYY-MM.
        month = reference - relativedelta(months=months_back)
        return f'./data/fhv_tripdata_{month:%Y-%m}.parquet'

    return _month_path(2), _month_path(1)

In [8]:
@task(name="data reading")
def read_data(path: str) -> pd.DataFrame:
    """Load one month of trip records from a parquet file.

    Args:
        path: Location of the parquet file to read.

    Returns:
        The file contents as a DataFrame, without any transformation.
    """
    return pd.read_parquet(path)

In [9]:
@task(name="feature preparing")
def prepare_features(df: pd.DataFrame, categorical: list[str], train: bool = True) -> pd.DataFrame:
    """Compute trip duration, drop outliers and encode categorical columns.

    The caller's DataFrame is not modified; a new, filtered frame is returned.

    Args:
        df: Raw trip records with ``pickup_datetime`` and ``dropOff_datetime``
            columns (their difference must be timedelta-like, because
            ``.dt.total_seconds()`` is called on it).
        categorical: Names of the columns to encode as strings.
        train: Only selects the wording of the log message
            ("training" vs. "validation").

    Returns:
        A copy containing only trips lasting between 1 and 60 minutes
        (inclusive). It has a new float ``duration`` column in minutes, and
        the ``categorical`` columns are converted to str (missing values
        become '-1').
    """
    logger = get_run_logger()

    # Build the duration on a new frame instead of writing a column into the
    # caller's DataFrame (the original code mutated its input in place).
    duration = (df.dropOff_datetime - df.pickup_datetime).dt.total_seconds() / 60
    df = df.assign(duration=duration)
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    mean_duration = df.duration.mean()
    split = "training" if train else "validation"
    logger.info(f"The mean duration of {split} is {mean_duration}")

    # Go through int first so float IDs such as 132.0 become '132', not '132.0'.
    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    return df

In [10]:
@task(name="training")
def train_model(df: pd.DataFrame, categorical: list[str]) -> tuple[LinearRegression, DictVectorizer]:
    """Fit a DictVectorizer and a LinearRegression on the prepared training frame.

    Args:
        df: Frame from ``prepare_features`` with the ``categorical`` columns
            and a ``duration`` target column.
        categorical: Feature columns to one-hot encode via DictVectorizer.

    Returns:
        ``(lr, dv)``: the fitted regression model and the fitted vectorizer.
    """
    logger = get_run_logger()
    train_dicts = df[categorical].to_dict(orient='records')
    dv = DictVectorizer()
    X_train = dv.fit_transform(train_dicts)
    y_train = df.duration.values

    logger.info(f"The shape of X_train is {X_train.shape}")
    logger.info(f"The DictVectorizer has {len(dv.feature_names_)} features")

    lr = LinearRegression()
    lr.fit(X_train, y_train)
    y_pred = lr.predict(X_train)
    # The metric is the ROOT mean squared error (the original logged it as
    # "MSE"). Take the square root explicitly: the `squared=False` argument
    # is deprecated in scikit-learn 1.4 and removed in 1.6.
    rmse = mean_squared_error(y_train, y_pred) ** 0.5
    logger.info(f"The RMSE of training is: {rmse}")
    return lr, dv

In [11]:
@task(name="validation")
def run_model(df: pd.DataFrame, categorical: list[str], dv: DictVectorizer, lr: LinearRegression) -> None:
    """Score the trained model on the prepared validation frame and log its RMSE.

    Args:
        df: Frame from ``prepare_features`` with the ``categorical`` columns
            and a ``duration`` target column.
        categorical: Feature columns to encode with ``dv``.
        dv: Vectorizer already fitted on the training data.
        lr: Regression model already fitted on the training data.
    """
    logger = get_run_logger()
    val_dicts = df[categorical].to_dict(orient='records')
    # transform (not fit_transform): reuse the vocabulary learned in training.
    X_val = dv.transform(val_dicts)
    y_pred = lr.predict(X_val)
    y_val = df.duration.values

    # Root of the MSE, computed explicitly; `squared=False` is removed in
    # scikit-learn 1.6, and the value is an RMSE, not an MSE.
    rmse = mean_squared_error(y_val, y_pred) ** 0.5
    logger.info(f"The RMSE of validation is: {rmse}")

In [12]:
@flow(task_runner=SequentialTaskRunner())
def main(date: str = None) -> None:
    """Train on month (date - 2), validate on month (date - 1), and save artifacts.

    Args:
        date: Reference date 'YYYY-MM-DD'. None or '' means today's date.

    Side effects:
        Logs to the MLflow experiment 'hw3' in sqlite:///mlflow.db. Writes
        ``models/dv-<date>.b`` and ``models/model-<date>.bin`` (creating
        ``models/`` if needed) and logs both to the MLflow run.
    """
    import os

    # Scheduled deployment runs pass no parameters, so `date` is None. Resolve
    # it here so the saved file names carry a real date, not "None".
    if not date:
        date = datetime.today().strftime('%Y-%m-%d')

    mlflow.set_tracking_uri('sqlite:///mlflow.db')
    mlflow.set_experiment('hw3')

    with mlflow.start_run():
        train_path, val_path = get_paths(date).result()
        categorical = ['PUlocationID', 'DOlocationID']
        df_train = read_data(train_path)
        df_train_processed = prepare_features(df_train, categorical)
        df_val = read_data(val_path)
        # train=False so the validation mean duration is labelled "validation".
        df_val_processed = prepare_features(df_val, categorical, train=False)
        lr, dv = train_model(df_train_processed, categorical).result()
        run_model(df_val_processed, categorical, dv, lr)

        # open() does not create parent directories.
        os.makedirs("models", exist_ok=True)
        dv_path = f"models/dv-{date}.b"
        with open(dv_path, "wb") as f_out:
            pickle.dump(dv, f_out)
        with open(f"models/model-{date}.bin", "wb") as f_out:
            pickle.dump(lr, f_out)
        mlflow.log_artifact(dv_path, artifact_path="preprocessor")
        mlflow.sklearn.log_model(lr, artifact_path=f"models/model-{date}.bin")
    
    

In [14]:
# Cron '0 9 15 * *': 09:00 on the 15th day of every month. No `parameters`
# are given, so each scheduled run calls `main` with its default date=None.
MONTHLY_CRON = '0 9 15 * *'

DeploymentSpec(
    name='prefect',
    flow=main,
    flow_runner=SubprocessFlowRunner(),
    schedule=CronSchedule(cron=MONTHLY_CRON),
    tags=['orchestration'],
)

DeploymentSpec(name='prefect', flow=<prefect.flows.Flow object at 0x7f2a4024f610>, flow_name=None, flow_location=None, flow_storage=None, parameters=None, schedule=CronSchedule(cron='0 9 15 * *', timezone=None, day_or=True), tags=['orchestration'], flow_runner=UniversalFlowRunner(typename='universal', env={}))

In [55]:
main(date="2021-08-15")  # trains on 2021-06, validates on 2021-07

INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
22:15:28.640 | INFO    | prefect.engine - Created flow run 'inquisitive-skylark' for flow 'main'
22:15:28.645 | INFO    | Flow run 'inquisitive-skylark' - Using task runner 'SequentialTaskRunner'
22:15:28.891 | INFO    | Flow run 'inquisitive-skylark' - Created task run 'path getting-0fafd328-3' for task 'path getting'
22:15:29.016 | INFO    | Task run 'path getting-0fafd328-3' - Finished in state Completed()
22:15:29.057 | INFO    | Flow run 'inquisitive-skylark' - Created task run 'data reading-7eb2586d-6' for task 'data reading'
22:15:32.740 | INFO    | Task run 'data reading-7eb2586d-6' - Finished in state Completed()
22:15:32.774 | INFO    | Flow run 'inquisitive-skylark' - Created task run 'feature preparing-543bd73d-6' for task 'feature preparing'
22:15:33.030 | INFO    | Task run 'feature preparing-543bd73d-6' - The mean duration of training is 18.2305

Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=('./data/fhv_tripdata_2021-06.parquet', './data/fhv_tripdata_2021-07.parquet'), task_run_id=36d8dfad-14c5-41c6-8308-c03924e6cc08), Completed(message=None, type=COMPLETED, result=        dispatching_base_num     pickup_datetime    dropOff_datetime  \
0                     B00021 2021-06-01 00:40:51 2021-06-01 00:50:22   
1                     B00021 2021-06-01 00:51:23 2021-06-01 01:19:01   
2                     B00021 2021-06-01 00:07:10 2021-06-01 00:17:34   
3                     B00021 2021-06-01 00:22:08 2021-06-01 00:27:22   
4                     B00021 2021-06-01 00:59:06 2021-06-01 01:06:58   
...                      ...                 ...                 ...   
1311341               B03321 2021-06-30 23:00:00 2021-06-30 23:45:00   
1311342               B03321 2021-06-30 23:00:00 2021-06-30 23:35:00   
1311343               B03340 2021-06-30 23:00:00 2021-06-30 