In [1]:
import pandas as pd

In [2]:
def read_dataframe(path):
    df = pd.read_parquet(path, engine="pyarrow")
    df.to_parquet("../data/processed/read_data.parquet")
    return df

In [3]:
df = read_dataframe("../data/yellow_tripdata_2024-11.parquet")

In [4]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,2,2024-11-01 00:46:24,2024-11-01 00:57:17,1.0,1.93,1.0,N,239,262,2,-12.8,-1.0,-0.5,0.0,0.0,-1.0,-17.8,-2.5,0.0
1,2,2024-11-01 00:46:24,2024-11-01 00:57:17,1.0,1.93,1.0,N,239,263,2,12.8,1.0,0.5,0.0,0.0,1.0,17.8,2.5,0.0
2,1,2024-11-01 00:37:36,2024-11-01 01:28:36,1.0,34.3,5.0,N,219,265,1,259.0,0.0,0.0,15.0,0.0,1.0,275.0,0.0,0.0
3,2,2024-11-01 00:12:55,2024-11-01 00:22:17,2.0,0.93,1.0,N,186,107,1,10.0,1.0,0.5,1.0,0.0,1.0,16.0,2.5,0.0
4,2,2024-11-01 00:54:45,2024-11-01 00:59:47,1.0,0.38,1.0,N,79,79,1,6.5,1.0,0.5,1.0,0.0,1.0,12.5,2.5,0.0


In [13]:
def process_dataframe(path):
    df = pd.read_parquet(path, engine="pyarrow")
    df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])
    df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])

    df["duration"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.total_seconds() / 60

    df = df[(df["duration"] >= 1) & (df["duration"] <= 60)]

    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']

    df.to_parquet("../data/processed/processed_data.parquet")
    return df

In [14]:
df = process_dataframe("../data/processed/read_data.parquet")

In [12]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,...,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,duration,PU_DO
0,2,2024-11-01 00:46:24,2024-11-01 00:57:17,1.0,1.93,1.0,N,239,262,2,...,-1.0,-0.5,0.0,0.0,-1.0,-17.8,-2.5,0.0,10.883333,239_262
1,2,2024-11-01 00:46:24,2024-11-01 00:57:17,1.0,1.93,1.0,N,239,263,2,...,1.0,0.5,0.0,0.0,1.0,17.8,2.5,0.0,10.883333,239_263
2,1,2024-11-01 00:37:36,2024-11-01 01:28:36,1.0,34.3,5.0,N,219,265,1,...,0.0,0.0,15.0,0.0,1.0,275.0,0.0,0.0,51.0,219_265
3,2,2024-11-01 00:12:55,2024-11-01 00:22:17,2.0,0.93,1.0,N,186,107,1,...,1.0,0.5,1.0,0.0,1.0,16.0,2.5,0.0,9.366667,186_107
4,2,2024-11-01 00:54:45,2024-11-01 00:59:47,1.0,0.38,1.0,N,79,79,1,...,1.0,0.5,1.0,0.0,1.0,12.5,2.5,0.0,5.033333,79_79


In [25]:
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Lasso
import pickle
import mlflow
import mlflow.sklearn

In [26]:
def train_model(path):
    df = pd.read_parquet(path, engine="pyarrow")
    
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    target = 'duration'
    
    train_dicts = df[categorical + numerical].to_dict(orient='records')
    x_train, x_val, y_train, y_val = train_test_split(df, df[target], test_size=0.2, random_state=40)

    dv = DictVectorizer()
    X_train = dv.fit_transform(x_train[categorical + numerical].to_dict(orient='records'))
    X_val = dv.transform(x_val[categorical + numerical].to_dict(orient='records'))

    alpha = 1
    lr = Lasso(alpha)
    lr.fit(X_train, y_train)

    y_pred = lr.predict(X_val)
    rmse = mean_squared_error(y_val, y_pred)

    print(f"Validation RMSE: {rmse:.4f}")

    with open('../models/lin_reg.bin', 'wb') as f_out:
        pickle.dump((dv, lr), f_out)
    
    return rmse

In [22]:
train_model("../data/processed/processed_data.parquet")

Validation RMSE: 119.8390


119.83896118028501

In [29]:
def train_model_and_log_mlflow(path):
    
    df = pd.read_parquet(path, engine="pyarrow")
    
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    target = 'duration'
    
    train_dicts = df[categorical + numerical].to_dict(orient='records')
    x_train, x_val, y_train, y_val = train_test_split(df, df[target], test_size=0.2, random_state=40)

    dv = DictVectorizer()
    X_train = dv.fit_transform(x_train[categorical + numerical].to_dict(orient='records'))
    X_val = dv.transform(x_val[categorical + numerical].to_dict(orient='records'))

    alpha = 0.1
    lr = Lasso(alpha)
    lr.fit(X_train, y_train)

    y_pred = lr.predict(X_val)
    rmse = mean_squared_error(y_val, y_pred)

    print(f"Validation RMSE: {rmse:.4f}")

    with open("../models/model.pkl", "wb") as f_out:
            pickle.dump(lr, f_out)
        
    mlflow.set_tracking_uri("http://127.0.0.1:5000")
    mlflow.set_experiment("experiment-1")
    with mlflow.start_run():
        mlflow.log_param("alpha", alpha)
        mlflow.log_metric("rmse", rmse)
        mlflow.sklearn.log_model(lr, artifact_path="models")
        
    return rmse

In [30]:
train_model_and_log_mlflow("../data/processed/processed_data.parquet")

2025/02/25 16:58:51 INFO mlflow.tracking.fluent: Experiment with name 'experiment-1' does not exist. Creating a new experiment.


Validation RMSE: 119.8391




🏃 View run colorful-foal-107 at: http://127.0.0.1:5000/#/experiments/1/runs/698d5539bba545068995948ad91a871c
🧪 View experiment at: http://127.0.0.1:5000/#/experiments/1


119.8390815529625

In [None]:
from mlflow.tracking import MlflowClient

In [None]:
client = MlflowClient("http://127.0.0.1:5000")
experiment_id = client.get_experiment_by_name("experiment-1").experiment_id
runs = client.search_runs(experiment_ids=[experiment_id], order_by=["start_time desc"], max_results=1)
run_id = runs[0].info.run_id

# Register model in Registry
mlflow.register_model(
    model_uri=f"runs:/{run_id}/models",
    name='iris-classifier-tracking-server'
)
