In [1]:
##SutejaPatil

Name of orchestrator - Prefect

Version of orchestrator

In [2]:
!prefect --version

3.4.5


In [20]:
import pandas as pd
import os

In [4]:
march_2023 =  pd.read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet')


In [5]:
march_2023.shape

(3403766, 19)

In [6]:
print('Total records :', march_2023.shape[0])

Total records : 3403766


Data Preparation

In [7]:
def read_dataframe(filename):
    df = pd.read_parquet(filename)

    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df.duration = df.duration.dt.total_seconds() / 60

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    
    return df

In [8]:
filename = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet'

In [9]:
df = read_dataframe(filename)

In [10]:
print('Size of the result: ',df.shape[0])

Size of the result:  3316216


In [11]:
from prefect import task,flow
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import numpy as np
import mlflow

In [12]:
@task
def clean_data(df:pd.DataFrame):
    df['duration'] = df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
    df['duration'] = pd.to_timedelta(df['duration']).dt.total_seconds() / 60
    df_no_outliers = df[df['duration'].between(1,60)]
    df_new = df_no_outliers[['PULocationID','DOLocationID']]
    df_new['PULocationID'] = df_new['PULocationID'].astype(str)
    df_new['DOLocationID'] = df_new['DOLocationID'].astype(str)
    return df_new, df_no_outliers

In [13]:
@task
def vectorize_data(df:pd.DataFrame, df_no_outliers:pd.DataFrame):
    dicts = df[['PULocationID', 'DOLocationID']].to_dict(orient='records')
    dv = DictVectorizer()
    X = dv.fit_transform(dicts)
    y =  df_no_outliers['duration'].values
    return X,y


In [14]:
@task
def train_model(X,y):
    model = LinearRegression()
    model.fit(X, y)
    print("******** Model intercept ********* :",model.intercept_)
    return model, model.intercept_

In [15]:
@flow
def main(df):
    df_new, df_no_outliers = clean_data(df)
    X,y = vectorize_data(df_new, df_no_outliers)
    model,model_intercept = train_model(X,y)

    with mlflow.start_run():
        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path="model",
            input_example=X.iloc[:5] if hasattr(X, 'iloc') else X[:5],
            registered_model_name="sk-learn-linear-reg-model"
        )
    
    return model_intercept

In [16]:
if __name__ == '__main__':
    model_intercept = main(df)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_new['PULocationID'] = df_new['PULocationID'].astype(str)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_new['DOLocationID'] = df_new['DOLocationID'].astype(str)


******** Model intercept ********* : 24.774857138926837


Successfully registered model 'sk-learn-linear-reg-model'.
Created version '1' of model 'sk-learn-linear-reg-model'.


In [17]:
print('Model Intercept : ', model_intercept)

Model Intercept :  24.774857138926837


In [19]:
model_uri = "models:/sk-learn-linear-reg-model/1"

In [21]:
local_path = mlflow.artifacts.download_artifacts(artifact_uri=model_uri)
        
total_size = 0
for dirpath, _, filenames in os.walk(local_path):
    for f in filenames:
        fp = os.path.join(dirpath, f)
        total_size += os.path.getsize(fp)

In [22]:
total_size

34714