In [8]:
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.pipeline import make_pipeline

import pickle 
import pandas as pd
import mlflow
import os
import uuid

In [9]:
# Which monthly trip file to score.
taxi_type = 'green'
year = 2021
month = 2

# Source parquet (remote) and local destination for the scored rides.
INPUT_PATH = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet'
OUTPUT_PATH = f'./output/{taxi_type}/{year:04d}-{month:02d}.parquet'

# Run whose logged model is used for scoring; set the RUN_ID environment variable to override.
RUN_ID = os.getenv('RUN_ID', 'e041808c2a2849919731587f28600398')

In [10]:
def generate_uuids(n):
    """Return a list of ``n`` freshly generated random (version 4) UUIDs as strings."""
    return [str(uuid.uuid4()) for _ in range(n)]

def read_data(filename: str) -> pd.DataFrame:
    """Load a trip parquet file, add a trip duration, and keep trips of 1-60 minutes.

    Parameters
    ----------
    filename : str
        Path or URL accepted by ``pd.read_parquet``. The data must contain
        ``lpep_pickup_datetime`` and ``lpep_dropoff_datetime`` columns.

    Returns
    -------
    pd.DataFrame
        The rows whose duration lies in [1, 60] minutes (original index kept),
        with two added columns: ``duration`` (minutes, float) and ``ride_id``
        (one random UUID string per row).
    """
    df = pd.read_parquet(filename)

    elapsed = df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']
    df['duration'] = elapsed.dt.total_seconds() / 60

    # Take an explicit copy of the filtered rows. Without it the frame is a slice of
    # the unfiltered one, so the ride_id assignment below (and any column writes
    # callers make on the returned frame) raise SettingWithCopyWarning.
    df = df[(df['duration'] >= 1) & (df['duration'] <= 60)].copy()

    df['ride_id'] = generate_uuids(len(df))

    return df

def prepare_dictionaries(df: pd.DataFrame):
    """Convert trip rows into feature dicts ``{'PICKUP_DROPOFF': 'PU_DO', 'trip_distance': x}``.

    Note: this mutates ``df`` in place. PULocationID and DOLocationID are cast to
    ``str`` and a ``PICKUP_DROPOFF`` column is added, and callers see both changes.

    Returns
    -------
    list[dict]
        One record per row of ``df``, in row order.
    """
    location_cols = ['PULocationID', 'DOLocationID']
    df[location_cols] = df[location_cols].astype(str)

    # Combine the (now string) pickup and dropoff ids into a single route feature.
    pickup, dropoff = df['PULocationID'], df['DOLocationID']
    df['PICKUP_DROPOFF'] = pickup + '_' + dropoff

    features = ['PICKUP_DROPOFF', 'trip_distance']
    return df[features].to_dict(orient='records')

In [11]:
def load_model(run_id, artifacts_root="../web-service-mlflow-model/mlflow-artifacts", experiment_id="1"):
    """Load the model logged under ``run_id`` as an MLflow PyFuncModel.

    The model is read directly from the artifact location
    ``{artifacts_root}/{experiment_id}/{run_id}/artifacts/model``. The defaults
    reproduce the previously hard-coded relative path for experiment ``1``.

    Parameters
    ----------
    run_id : str
        Id of the run whose ``model`` artifact is loaded.
    artifacts_root : str, optional
        Root of the artifact store (filesystem path by default).
    experiment_id : str, optional
        Experiment directory under ``artifacts_root`` that contains the run.

    Returns
    -------
    The object returned by ``mlflow.pyfunc.load_model``.
    """
    logged_model = f"{artifacts_root}/{experiment_id}/{run_id}/artifacts/model"
    return mlflow.pyfunc.load_model(logged_model)
    

def apply_model(input_path, output_path, run_id):
    """Score one trip file with the model logged under ``run_id`` and write the results.

    Parameters
    ----------
    input_path : str
        Location passed to ``read_data`` (path or URL readable by ``pd.read_parquet``).
    output_path : str
        Destination for ``DataFrame.to_parquet``. Its parent directory must already exist.
    run_id : str
        Run id used to load the model via ``load_model``. It is also recorded in the
        ``model_version`` column.

    The written table has one row per scored ride, with the columns ride_id,
    PULocationID, DOLocationID, actual_duration, predicted_duration, diff and
    model_version.
    """
    df = read_data(input_path)
    dicts = prepare_dictionaries(df)

    model = load_model(run_id)
    y_pred = model.predict(dicts)

    # Build from a dict so every column shares df's (filtered) index. y_pred is
    # presumably an array-like with one prediction per row of df, assigned positionally.
    result = pd.DataFrame({
        # Keep the per-ride id generated in read_data so each prediction can be
        # traced back to its ride (it was previously dropped).
        'ride_id': df['ride_id'],
        'PULocationID': df['PULocationID'],
        'DOLocationID': df['DOLocationID'],
        'actual_duration': df['duration'],
        'predicted_duration': y_pred,
    })
    result['diff'] = result['actual_duration'] - result['predicted_duration']
    # Use the run_id argument, not the module-level RUN_ID, so the recorded version
    # always matches the model that was actually loaded above.
    result['model_version'] = run_id

    result.to_parquet(output_path, index=False)

In [12]:
# to_parquet does not create missing directories, so make sure the output
# directory exists first (a fresh checkout would otherwise fail on write).
output_dir = os.path.dirname(OUTPUT_PATH)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

apply_model(input_path=INPUT_PATH, output_path=OUTPUT_PATH, run_id=RUN_ID)