In [1]:
import pickle
import pandas as pd
import os 
import uuid

import mlflow

from sklearn.feature_extraction import DictVectorizer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.pipeline import make_pipeline

#### Get and run model with its ID from AWS S3 bucket

In [2]:
RUN_ID = os.getenv('RUN_ID','729cc0899e22468bbe0bf20321b84225')    #'729cc0899e22468bbe0bf20321b84225'

# model from aws s3 bucket
logged_model = f's3://mlops-remote-bucket/3/{RUN_ID}/artifacts/model'

# Load model as a PyFuncModel.
model = mlflow.pyfunc.load_model(logged_model)

In [3]:
model                                # check the model for confirmation

mlflow.pyfunc.loaded_model:
  artifact_path: model
  flavor: mlflow.sklearn
  run_id: 729cc0899e22468bbe0bf20321b84225

#### Since we want to apply/check how the predicted values differs from actual, we keep target variable duration, But in real world, we omit the target varaible, since it's our prediction. 

In [4]:
def read_dataframe(filename: str):
    df = pd.read_parquet(filename)

    df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.dt.total_seconds() / 60
    df = df[(df.duration >= 1) & (df.duration <= 60)]
    return df


def prepare_dictionaries(df: pd.DataFrame):
    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    dicts = df[categorical + numerical].to_dict(orient='records')
    return dicts

#### Since it's model application, there will be no train and val dataset. Just a dataframe df and no target value--duration.

In [5]:
# df_train = read_dataframe('data/green_tripdata_2021-01.parquet')
# df_val = read_dataframe('data/green_tripdata_2021-02.parquet')

#target = 'duration'
#y_train = df_train[target].values
#y_val = df_val[target].values

#dict_train = prepare_dictionaries(df_train)
#dict_val = prepare_dictionaries(df_val)

df = read_dataframe('data/green_tripdata_2021-01.parquet')
dict = prepare_dictionaries(df)

y_pred = model.predict(dict)

In [6]:
y_pred

array([ 6.86271117, 13.36872083,  6.3608707 , ..., 14.43650924,
       37.09262214, 11.10083955])

#### We create a result dataframe and write predictions to it. Also, generate a uuid (unique id for each row in a dataframe)

In [7]:
str(uuid.uuid4())                   # this unique number changes when you run over and over

'611ddf09-e824-433c-ad50-fc3720b381b2'

In [8]:
# Attach uuid to every row in the dataframe using for loop

n = len(df)
ride_id = []

for i in range(n):
    ride_id.append(str(uuid.uuid4()))

In [9]:
df['ride_id'] = ride_id            # create a new column with the uuid attached to every row

In [10]:
df.head(2)

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,duration,PU_DO,ride_id
0,2,2021-01-01 00:15:56,2021-01-01 00:19:52,N,1.0,43,151,1.0,1.01,5.5,...,0.0,,0.3,6.8,2.0,1.0,0.0,3.933333,43_151,9d54098d-9674-4449-a591-da6673c41080
1,2,2021-01-01 00:25:59,2021-01-01 00:34:44,N,1.0,166,239,1.0,2.53,10.0,...,0.0,,0.3,16.86,1.0,1.0,2.75,8.75,166_239,4d68f926-1644-4ad7-9339-4429ef28fbef


In [11]:
df_result = pd.DataFrame()            # result dataframe

In [12]:
df_result['ride_id'] = df['ride_id']                # assigning some features from df to df_results
df_result['lpep_pickup_datetime'] = df['lpep_pickup_datetime']
df_result['PULocationID'] = df['PULocationID']
df_result['DOLocationID'] = df['DOLocationID']
df_result['actual_duration'] = df['duration']
df_result['predicted_duration'] = y_pred
df_result['diff'] = df_result['actual_duration'] - df_result['predicted_duration']
df_result['model_version'] = RUN_ID

In [13]:
df_result.head(3)

Unnamed: 0,ride_id,lpep_pickup_datetime,PULocationID,DOLocationID,actual_duration,predicted_duration,diff,model_version
0,9d54098d-9674-4449-a591-da6673c41080,2021-01-01 00:15:56,43,151,3.933333,6.862711,-2.929378,729cc0899e22468bbe0bf20321b84225
1,4d68f926-1644-4ad7-9339-4429ef28fbef,2021-01-01 00:25:59,166,239,8.75,13.368721,-4.618721,729cc0899e22468bbe0bf20321b84225
2,89cd4769-4c0e-4026-82ed-01a402c9f971,2021-01-01 00:45:57,41,42,5.966667,6.360871,-0.394204,729cc0899e22468bbe0bf20321b84225


#### WE save the df_result got above in parquet format

In [14]:
!mkdir output

mkdir: cannot create directory ‘output’: File exists


In [15]:
df_result.to_parquet('output/green_tripdata_2021-01.parquet') 

#### replace the data of df and df_results as input and output files with variables. url can also be used as input file since pandas reads url
#### Pandas downloads the NYC dataset and reads without us downloading it. 04d is 4-digit and 02d is 2-digit
#### Putting all the above in a function and cleaning

In [16]:
year = 2021
month = 3
taxi_type = 'green'

input_file = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet'
output_file = f'output/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet'

RUN_ID = os.getenv('RUN_ID','729cc0899e22468bbe0bf20321b84225')

In [17]:
input_file, output_file

('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-03.parquet',
 'output/green_tripdata_2021-03.parquet')

In [28]:
def generate_uuid(n):
    ride_id = []
    for i in range(n):
        ride_id.append(str(uuid.uuid4()))
    return ride_id

def read_dataframe(filename: str):
    df = pd.read_parquet(filename)

    df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.dt.total_seconds() / 60
    df = df[(df.duration >= 1) & (df.duration <= 60)]

    df['ride_id'] = generate_uuid(len(df))
    return df


def prepare_dictionaries(df: pd.DataFrame):
    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    dicts = df[categorical + numerical].to_dict(orient='records')
    return dicts

In [29]:
def load_model(run_id):
    logged_model = f's3://mlops-remote-bucket/3/{RUN_ID}/artifacts/model'
    model = mlflow.pyfunc.load_model(logged_model)
    return model


def apply_model(input_file, run_id, output_file):
    df = read_dataframe(input_file)
    dict = prepare_dictionaries(df)

    model = load_model(run_id)
    y_pred = model.predict(dict)

    df_result = pd.DataFrame() 
    df_result['ride_id'] = df['ride_id']                
    df_result['lpep_pickup_datetime'] = df['lpep_pickup_datetime']
    df_result['PULocationID'] = df['PULocationID']
    df_result['DOLocationID'] = df['DOLocationID']
    df_result['actual_duration'] = df['duration']
    df_result['predicted_duration'] = y_pred
    df_result['diff'] = df_result['actual_duration'] - df_result['predicted_duration']
    df_result['model_version'] = run_id

    df_result.to_parquet(output_file, index=False)

In [30]:
# test/call the apply model function

apply_model(input_file=input_file, run_id=RUN_ID, output_file=output_file)

In [32]:
!ls output                            

green_tripdata_2021-01.parquet	green_tripdata_2021-03.parquet
green_tripdata_2021-02.parquet


In [27]:
# to execute for February..... run the above codes again

year = 2021
month = 2
taxi_type = 'green'

input_file = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet'
output_file = f'output/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet'

RUN_ID = os.getenv('RUN_ID','729cc0899e22468bbe0bf20321b84225')