In [1]:
import pandas as pd

In [2]:
import prefect

In [3]:
prefect.__version__

'3.4.4'

In [None]:
import pathlib
import pickle
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from prefect import flow, task


@task(retries=3, retry_delay_seconds=2)
def download_data(year: int, month: int, taxi: str = "green") -> pd.DataFrame:
    """Fetch one month of taxi trip records as a DataFrame.

    The parquet file is read straight from the CloudFront-hosted trip-data
    bucket. The task is configured with 3 retries spaced 2 seconds apart, so
    any failure is reported and then re-raised to let the retry kick in.

    Args:
        year: Four-digit year of the file to fetch.
        month: Month number; it is zero-padded to two digits in the URL.
        taxi: Dataset prefix used in the file name (defaults to "green").

    Returns:
        The trip records exactly as read from the parquet file.

    Raises:
        Exception: Whatever ``pd.read_parquet`` raised, after printing it.
    """
    url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi}_tripdata_{year}-{month:02d}.parquet"
    print(f"Downloading data from {url}")
    try:
        trips = pd.read_parquet(url)
        print(f"Downloaded {len(trips)} records")
    except Exception as err:
        # Report the failure, then propagate it unchanged.
        print(f"Error downloading data: {err}")
        raise
    return trips


@task
def prepare_data(df: pd.DataFrame) -> pd.DataFrame:
    """Compute trip durations and keep trips lasting between 1 and 60 minutes.

    Works for both green-taxi files (``lpep_*`` timestamp columns) and
    yellow-taxi files (``tpep_*`` timestamp columns). The input frame is left
    untouched: all derived columns are written to an explicit copy of the
    filtered rows, which also avoids pandas' ``SettingWithCopyWarning``.

    Args:
        df: Raw trip records containing pickup/dropoff timestamp columns plus
            ``PULocationID`` and ``DOLocationID``.

    Returns:
        A new DataFrame with the same columns as ``df`` plus ``duration``
        (minutes, float). Timestamp columns are converted to datetimes and
        the two location-ID columns are cast to ``str``.

    Raises:
        KeyError: If neither the ``lpep_*`` nor the ``tpep_*`` pickup/dropoff
            column pair is present.
    """
    # Pick whichever timestamp column pair this dataset provides.
    for prefix in ("lpep", "tpep"):
        pickup_col = f"{prefix}_pickup_datetime"
        dropoff_col = f"{prefix}_dropoff_datetime"
        if pickup_col in df.columns and dropoff_col in df.columns:
            break
    else:
        raise KeyError(
            "Expected lpep_* or tpep_* pickup/dropoff datetime columns, "
            f"got: {list(df.columns)}"
        )

    pickup = pd.to_datetime(df[pickup_col])
    dropoff = pd.to_datetime(df[dropoff_col])

    # Vectorised timedelta -> minutes conversion (no per-row Python lambda).
    duration = (dropoff - pickup).dt.total_seconds() / 60
    keep = (duration >= 1) & (duration <= 60)

    # Copy the surviving rows so the assignments below never touch a view
    # of the caller's frame.
    trips = df.loc[keep].copy()
    trips[pickup_col] = pickup[keep]
    trips[dropoff_col] = dropoff[keep]
    trips["duration"] = duration[keep]

    # Location IDs are categorical; cast to str so DictVectorizer one-hot
    # encodes them instead of treating them as numeric values.
    trips["PULocationID"] = trips["PULocationID"].astype(str)
    trips["DOLocationID"] = trips["DOLocationID"].astype(str)

    return trips


@task
def split_features(df: pd.DataFrame, fit_dv: bool = True, dv: DictVectorizer = None):
    """
    Build the feature matrix and target vector from prepared trip records.

    The pickup/dropoff location IDs are turned into per-row dicts and encoded
    with a DictVectorizer, which is either freshly fitted or supplied by the
    caller.

    Args:
        df: Preprocessed DataFrame with "PULocationID", "DOLocationID" and
            "duration" columns
        fit_dv: True to fit a new DictVectorizer, False to reuse ``dv``
        dv: Already-fitted DictVectorizer; required when fit_dv is False

    Returns:
        X: Encoded feature matrix
        y: Target values taken from the "duration" column
        dv: The DictVectorizer that produced X

    Raises:
        ValueError: If fit_dv is False and no DictVectorizer was passed.
    """
    records = df[["PULocationID", "DOLocationID"]].to_dict(orient="records")

    # Reusing a vectorizer only makes sense if one was actually supplied.
    if not fit_dv and dv is None:
        raise ValueError("DictVectorizer must be provided when fit_dv=False")

    if fit_dv:
        dv = DictVectorizer()
        features = dv.fit_transform(records)
    else:
        features = dv.transform(records)

    target = df["duration"].values
    return features, target, dv



@task(log_prints=True)
def train_and_register_model(X, y, dv):
    """Fit a linear regression, log it to MLflow and register the model.

    Inside a single MLflow run this task:
      * fits ``LinearRegression`` on ``X`` / ``y``,
      * logs the RMSE as the ``rmse`` metric,
      * logs the model under ``model`` and registers it as ``homework-model``,
      * pickles ``dv`` to ``models/dv.pkl`` and logs it under ``preprocessor``.

    Note: the same data is used for fitting and for evaluation, so the
    reported RMSE is a training error, not a held-out validation error.

    Args:
        X: Feature matrix.
        y: Target vector.
        dv: Fitted DictVectorizer that produced ``X``; stored as an artifact.
    """
    # No separate validation split: train and "validation" sets are identical.
    X_train, X_val, y_train, y_val = X, X, y, y

    with mlflow.start_run():
        model = LinearRegression()
        model.fit(X_train, y_train)

        y_pred = model.predict(X_val)
        # `squared=False` was deprecated in scikit-learn 1.4 and removed in
        # 1.6; taking the square root of the MSE works on every version.
        rmse = float(np.sqrt(mean_squared_error(y_val, y_pred)))

        print(f"Intercept of the model: {model.intercept_}")
        print(f"RMSE: {rmse}")

        mlflow.log_metric("rmse", rmse)
        mlflow.sklearn.log_model(model, artifact_path="model", registered_model_name="homework-model")

        # Persist the vectorizer next to the model so inference can reproduce
        # the exact same feature encoding.
        pathlib.Path("models").mkdir(exist_ok=True)
        with open("models/dv.pkl", "wb") as f_out:
            pickle.dump(dv, f_out)
        mlflow.log_artifact("models/dv.pkl", artifact_path="preprocessor")


@flow
def main_flow(year: int = 2023, month: int = 3, taxi: str = "yellow"):
    """Run the pipeline end to end: download, prepare, vectorize, train.

    MLflow is pointed at the local ``sqlite:///mlflow.db`` tracking store and
    the ``nyc-taxi-linear-model`` experiment before any task runs.

    Args:
        year: Year of the trip data to train on.
        month: Month of the trip data to train on.
        taxi: Dataset prefix passed through to the download task.
    """
    mlflow.set_tracking_uri("sqlite:///mlflow.db")
    mlflow.set_experiment("nyc-taxi-linear-model")

    raw_trips = download_data(year, month, taxi)
    features, target, vectorizer = split_features(prepare_data(raw_trips))
    train_and_register_model(features, target, vectorizer)


# Run the flow with its default arguments when this code is executed as a
# script rather than imported.
if __name__ == "__main__":
    main_flow()

In [1]:
!python /home/habeeb/Mlops-proj/03-orchestration/mlops-prefect/prefect-mlops-zoomcamp/homework/prefect-hw.py

16:10:20.681 | [36mINFO[0m    | Flow run[35m 'melodic-kagu'[0m - Beginning flow run[35m 'melodic-kagu'[0m for flow[1;35m 'main-flow'[0m
16:10:20.686 | [36mINFO[0m    | Flow run[35m 'melodic-kagu'[0m - View at [94mhttp://127.0.0.1:4200/runs/flow-run/2b27cd24-9123-45ec-8c08-c8380e07bf5d[0m
Downloading data from https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet
Downloaded 3403766 records
16:10:22.030 | [36mINFO[0m    | Task run 'download_data-2a3' - Finished in state [32mCompleted[0m()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["PULocationID"] = df["PULocationID"].astype(str)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the 