In [13]:
import os
import pickle
import tempfile

import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
import scipy
import sklearn
from prefect import flow, task
from sklearn.feature_extraction import DictVectorizer

In [3]:
# Load the March 2023 trip data as-is (no filtering yet) for a first look.
filename = "../data/yellow_tripdata_2023-03.parquet"
df = pd.read_parquet(filename)


In [4]:

# Row count of the frame exactly as read from the parquet file.
print(len(df))

3403766


In [5]:
@task(retries=3, retry_delay_seconds=2)
def read_dataframe(filename):
    """Load a trip-record parquet file and prepare it for training.

    Adds a ``duration`` column (dropoff minus pickup, in minutes), keeps only
    rows whose duration is between 1 and 60 minutes inclusive, and casts the
    pickup/dropoff location ID columns to ``str``.

    The task is configured to retry up to 3 times, 2 seconds apart.

    Args:
        filename: Path handed to ``pd.read_parquet``. The file must contain
            ``tpep_pickup_datetime``, ``tpep_dropoff_datetime``,
            ``PULocationID`` and ``DOLocationID`` columns.

    Returns:
        A DataFrame holding the filtered rows, including the added
        ``duration`` column.
    """
    df = pd.read_parquet(filename)

    # Trip length in minutes.
    trip_time = df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
    df["duration"] = trip_time.dt.total_seconds() / 60

    # Take an explicit copy of the filtered rows: assigning columns on a bare
    # boolean-mask slice raises SettingWithCopyWarning in pandas versions
    # without Copy-on-Write.
    in_range = (df["duration"] >= 1) & (df["duration"] <= 60)
    df = df[in_range].copy()

    # Cast IDs to str so they are handled as categorical labels rather than
    # numeric values downstream.
    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df

In [6]:
# Rebind `df` to the cleaned frame returned by read_dataframe; the raw frame
# loaded earlier is no longer reachable under this name.
df = read_dataframe(filename)
print(len(df))

3316216


In [12]:
# Inspect the available columns of the cleaned frame.
df.columns

Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
       'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
       'total_amount', 'congestion_surcharge', 'Airport_fee', 'duration'],
      dtype='object')

In [7]:
from typing import Tuple
from sklearn.linear_model import LinearRegression
import scipy.sparse

@task
def train_model(
    df_train: pd.DataFrame
) -> Tuple[LinearRegression, DictVectorizer]:
    """Fit a linear regression predicting ``duration`` from location IDs.

    The ``PULocationID`` and ``DOLocationID`` columns are turned into
    per-row dicts and vectorized with a freshly fitted ``DictVectorizer``;
    the resulting sparse matrix is the model input and the ``duration``
    column is the target. The fitted intercept is printed.

    Args:
        df_train: Training frame containing ``PULocationID``,
            ``DOLocationID`` and ``duration`` columns.

    Returns:
        ``(model, dv)`` - the fitted ``LinearRegression`` and the fitted
        ``DictVectorizer``.
    """
    feature_columns = ['PULocationID', 'DOLocationID']
    records = df_train[feature_columns].to_dict(orient="records")

    vectorizer = DictVectorizer()
    # Wrap the vectorizer output so the feature matrix is explicitly CSR.
    features = scipy.sparse.csr_matrix(vectorizer.fit_transform(records))

    target = df_train["duration"].to_numpy()

    regressor = LinearRegression()
    regressor.fit(features, target)

    # Report the fitted intercept.
    print(f"Intercept: {regressor.intercept_}")

    return regressor, vectorizer

In [8]:
# Fit on the current `df` and keep both the fitted model and the vectorizer.
model, dv = train_model(df)

Intercept: 24.776420921142414


In [None]:
@task
def register_mlflow_model(
    model: LinearRegression,
    dv: DictVectorizer,
    model_name: str = "nyc-taxi-duration-predictor"
) -> str:
    """Log the model and its vectorizer to MLflow and register the model.

    Opens a new MLflow run, logs ``model`` under the name
    ``linear-regression-model`` while registering it as ``model_name``,
    pickles ``dv`` and logs it as ``preprocessor/dict_vectorizer.pkl``, and
    tags the run with ``model_name``.

    Args:
        model: Fitted regression model to log and register.
        dv: Fitted vectorizer to store alongside the model.
        model_name: Name to register the model under; also written to the
            run's ``model_name`` tag.

    Returns:
        The ID of the MLflow run that was created.
    """
    with mlflow.start_run() as run:

        # Log the sklearn model
        mlflow.sklearn.log_model(
            sk_model=model,
            name="linear-regression-model",
            registered_model_name=model_name,
        )

        # Pickle the DictVectorizer into a throwaway directory and log it.
        # Writing to a temp dir (rather than the working directory) avoids
        # overwriting/deleting an unrelated dict_vectorizer.pkl and guarantees
        # cleanup even if logging the artifact raises.
        with tempfile.TemporaryDirectory() as tmp_dir:
            dv_path = os.path.join(tmp_dir, "dict_vectorizer.pkl")
            with open(dv_path, "wb") as f_out:
                pickle.dump(dv, f_out)
            mlflow.log_artifact(dv_path, artifact_path="preprocessor")

        mlflow.set_tag("model_name", model_name)
        print(f"Model registered in run {run.info.run_id}")
    return run.info.run_id

# Register the model fitted above.
# NOTE(review): the MLflow tracking URI and experiment are only set inside
# `main`; on a fresh kernel this call would run before they are configured -
# confirm which tracking store this is expected to write to.
register_mlflow_model(model, dv, model_name="nyc-taxi-duration-predictor")

Registered model 'nyc-taxi-duration-predictor' already exists. Creating a new version of this model...
Created version '3' of model 'nyc-taxi-duration-predictor'.
INFO  [prefect.task_runs] Finished in state Completed()


Model registered in run 283391d17a0a45fda252d16633f84234


In [14]:
@flow
def main(
    filename: str = "../data/yellow_tripdata_2023-03.parquet",
    model_name: str = "nyc-taxi-duration-predictor",
):
    """Run the end-to-end pipeline: load data, train, register with MLflow.

    Points MLflow at the local ``sqlite:///mlflow.db`` tracking store and the
    ``nyc-taxi-experiment`` experiment, then runs ``read_dataframe``,
    ``train_model`` and ``register_mlflow_model`` in sequence, printing
    progress along the way.

    Args:
        filename: Parquet file to load and train on.
        model_name: Name under which the trained model is registered.
    """
    # MLflow settings
    mlflow.set_tracking_uri("sqlite:///mlflow.db")
    mlflow.set_experiment("nyc-taxi-experiment")

    # Read the data
    print("Starting the flow...")
    print(f"Reading data from {filename}")
    df = read_dataframe(filename)
    print(f"Number of records in the dataframe: {len(df)}")
    print(f"Columns in the dataframe: {df.columns.tolist()}")

    # Train the model
    print("Training the model...")
    model, dv = train_model(df)

    # Register the model
    print("Registering the model with MLflow...")
    print(f"Model name: {model_name}")
    run_id = register_mlflow_model(model, dv, model_name=model_name)
    # Report success only after registration has actually completed.
    print("Model and DictVectorizer saved successfully.")
    print(f"Model registered with run ID: {run_id}")


In [15]:
# Run the whole flow with its default arguments when executed as the main
# module.
if __name__ == "__main__":
    main()
    print("Flow completed successfully.")

INFO  [prefect.flow_runs] Beginning flow run 'eccentric-reindeer' for flow 'main'
INFO  [prefect.flow_runs] View at http://127.0.0.1:4200/runs/flow-run/6dd4b299-ae9b-4da4-afea-f41c1ccc793c


Reading data from ../data/yellow_tripdata_2023-03.parquet
Starting the flow...


INFO  [prefect.task_runs] Finished in state Completed()


Training the model...
Number of records in the dataframe: 3316216
Columns in the dataframe: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee', 'duration']
Training the model...
Model training started...


INFO  [prefect.task_runs] Finished in state Completed()


Intercept: 24.776420921142414
Registering the model with MLflow...
Model name: nyc-taxi-duration-predictor
Model registration in progress...
Saving the model and DictVectorizer...
Model and DictVectorizer saved successfully.


Registered model 'nyc-taxi-duration-predictor' already exists. Creating a new version of this model...
Created version '4' of model 'nyc-taxi-duration-predictor'.
INFO  [prefect.task_runs] Finished in state Completed()
INFO  [prefect.flow_runs] Finished in state Completed()


Model registered in run 1dbfb9ed6da546178940a4bb86b357e9
Model registered with run ID: 1dbfb9ed6da546178940a4bb86b357e9
Flow completed successfully.
