In [1]:
import pandas as pd
import numpy as np

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import root_mean_squared_error
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.preprocessing import FunctionTransformer
import xgboost as xgb

import seaborn as sns
import matplotlib.pyplot as plt

import mlflow

import pickle
import pathlib
import os
import uuid

In [2]:
year = 2021
month = 3
taxi_type = "green"

input_file = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet"
output_file = f"./output/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet"

RUN_ID = os.getenv("RUN_ID")


In [3]:
def read_data(filename):
    df = pd.read_parquet(filename)

    df["duration"] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    
    df["duration"] = df.duration.apply(lambda x: x.total_seconds()/60)
    
    df = df[(df.duration >= 1) & (df.duration <= 60)]

    ride_ids = [str(uuid.uuid4()) for _ in range(len(df))]
    df["ride_id"] = ride_ids
    
    categorical = ["PULocationID", "DOLocationID"]
    
    df[categorical] = df[categorical].astype(str)

    return df

In [4]:
def prepare_data(df):
    df['PU_DU'] = df["PULocationID"] + '_' + df["DOLocationID"]

    categorical = ["PU_DU"]
    numeric = ["trip_distance"]

    dicts = df[categorical + numeric].to_dict(orient="records")

    return dicts


In [5]:
def load_model(run_id):
    logged_model = f"gs://mlflow_artifacts_nyc_taxi/1/{RUN_ID}/artifacts/models_mlflow"
    model = mlflow.pyfunc.load_model(logged_model)

    return model

In [6]:
def apply_model(input_file, run_id, output_file):
    df= read_data(input_file)
    dicts = prepare_data(df)

    model = load_model(run_id)
    y_pred = model.predict(dicts)

    df_results = df[['ride_id', "lpep_pickup_datetime", "PULocationID", "DOLocationID", "duration"]].copy()
    df_results.rename(columns={"duration": "actual_duration"}, inplace=True)
    df_results["predicted_duration"] = y_pred
    df_results["diff"] = df_results["predicted_duration"] - df_results["actual_duration"]
    df_results["model_version"] = run_id

    !mkdir output
    df_results.to_parquet(output_file)

In [7]:
apply_model(input_file, RUN_ID, output_file)

Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

A subdirectory or file output already exists.
