In [1]:
import os
import json

from datetime import datetime
from datetime import timedelta
from dateutil.relativedelta import relativedelta

from pathlib import Path

import mlflow
import pandas as pd

In [2]:
# Deployment configuration comes from the environment; each is None when unset.
MODEL_VERSION = os.environ.get('MODEL_VERSION')
MODEL_URI = os.environ.get('MODEL_URI')

In [3]:
# Local-development defaults. Environment variables take precedence; the
# previous version of this cell overwrote them unconditionally, so setting
# MODEL_VERSION / MODEL_URI had no effect. An empty env var counts as unset.
DEFAULT_MODEL_VERSION = '7c373fc9626549ed91cebb714b07e60a'

MODEL_VERSION = os.getenv('MODEL_VERSION') or DEFAULT_MODEL_VERSION
# Derive the default URI from MODEL_VERSION so the version recorded in the
# prediction events always matches the run whose model is loaded.
MODEL_URI = os.getenv('MODEL_URI') or f's3://mlflow-models-alexey/1/{MODEL_VERSION}/artifacts/model'

In [4]:
# NOTE(review): this cell's captured output reports an mlflow dependency mismatch
# (current 2.4.1, required mlflow==2.4); align the environment with the model's
# requirements before trusting the generated predictions.
model = mlflow.pyfunc.load_model(model_uri=MODEL_URI)

 - mlflow (current: 2.4.1, required: mlflow==2.4)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.


In [9]:
def prepare_features(ride):
    """Pick the model input fields out of a ride payload.

    Args:
        ride: Mapping containing at least ``PULocationID``, ``DOLocationID`` and
            ``trip_distance``; any other keys are ignored.

    Returns:
        A new dict with exactly those three keys, in that order. Values are
        passed through unchanged (no casting).

    Raises:
        KeyError: if one of the three keys is missing.
    """
    # NOTE(review): read_dataframe casts the location IDs to str; callers outside
    # this notebook presumably have to send str IDs too — confirm.
    wanted = ('PULocationID', 'DOLocationID', 'trip_distance')
    return {name: ride[name] for name in wanted}


def predict(features):
    """Score a single ride with the globally loaded ``model``.

    Args:
        features: Input passed straight to ``model.predict`` (in this notebook,
            the dict produced by ``prepare_features``).

    Returns:
        The first element of the model output as a Python ``float``.
    """
    return float(model.predict(features)[0])


def predict_endpoint(body):
    """Handle one prediction request and build the event that gets logged.

    Args:
        body: Request dict with a ``'ride'`` mapping and a ``'ride_id'``.

    Returns:
        dict with keys ``ride_id``, ``ride``, ``features``, ``prediction`` and
        ``version``. ``prediction`` is ``{'prediction': {'duration': <float>}}``
        and ``version`` is the module-level ``MODEL_VERSION``.

    Raises:
        KeyError: if ``body`` lacks ``'ride'``/``'ride_id'`` or the ride lacks a
            field that ``prepare_features`` reads.
    """
    ride, ride_id = body['ride'], body['ride_id']

    features = prepare_features(ride)
    duration = predict(features)

    response = {'prediction': {'duration': duration}}

    return {
        'ride_id': ride_id,
        'ride': ride,
        'features': features,
        'prediction': response,
        'version': MODEL_VERSION,
    }

In [10]:
# Root folder for every generated file, relative to the current working directory.
data = Path('data')
os.makedirs(data, exist_ok=True)

In [11]:
def read_dataframe(filename, min_duration=1, max_duration=60):
    """Load a green-taxi trip parquet file and prepare it for the model.

    Adds a ``duration`` column (minutes between pickup and drop-off), keeps only
    trips whose duration lies in ``[min_duration, max_duration]`` (inclusive),
    and casts the ``PULocationID``/``DOLocationID`` columns to ``str``.

    Args:
        filename: Path or URL accepted by ``pd.read_parquet``.
        min_duration: Inclusive lower bound on trip duration, in minutes.
        max_duration: Inclusive upper bound on trip duration, in minutes.

    Returns:
        A new DataFrame. The index produced by ``pd.read_parquet`` is kept
        (not reset), so surviving rows keep their original index labels.
    """
    df = pd.read_parquet(filename)

    trip_time = df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']
    # Use item assignment, not attribute assignment (df.duration = ...), which
    # only works because the column already exists.
    df['duration'] = trip_time.dt.total_seconds() / 60

    # Explicit copy: the filtered frame is modified below, and an explicit copy
    # keeps that independent of pandas' view/copy (SettingWithCopy) heuristics.
    # Rows with a missing duration compare False in between() and are dropped.
    df = df[df['duration'].between(min_duration, max_duration)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    # NOTE(review): presumably the model treats location IDs as categories,
    # hence str — confirm against the training pipeline.
    df[categorical] = df[categorical].astype(str)

    return df

In [12]:
# Columns kept in every saved file. Must remain a list: pandas treats a list
# as a column selection in df[features].
features = (
    'ride_id '
    'lpep_pickup_datetime lpep_dropoff_datetime '
    'PULocationID DOLocationID '
    'trip_distance duration'
).split()

In [15]:
data_tuples = [(2022, 1), (2022, 2), (2022, 3), (2023, 1)]

# Ride fields sent to the model; the same keys prepare_features reads.
MODEL_INPUT_COLUMNS = ['PULocationID', 'DOLocationID', 'trip_distance']


def _write_prediction_events(df_day, target_file):
    """Replay every ride in ``df_day`` through ``predict_endpoint`` and write
    one JSON-encoded prediction event per line to ``target_file``."""
    # Convert only the columns needed for the request body.
    rows = df_day[['ride_id'] + MODEL_INPUT_COLUMNS].to_dict(orient='records')

    with target_file.open('wt', encoding='utf-8') as f_out:
        for row in rows:
            body = {
                'ride': {col: row[col] for col in MODEL_INPUT_COLUMNS},
                'ride_id': row['ride_id'],
            }
            prediction_event = predict_endpoint(body)
            f_out.write(json.dumps(prediction_event) + '\n')


for y, m in data_tuples:
    print(f'processing {y} {m}...')

    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_{y:04d}-{m:02d}.parquet'

    # Build a fresh frame (assign -> column selection -> copy) instead of adding
    # columns to the loaded frame and then to a column subset of it.
    # ride_id is '<YYYY>-<MM>-<index label>'.
    df = (
        read_dataframe(url)
        .assign(ride_id=lambda d: f'{y:04d}-{m:02d}-' + d.index.astype('str'))
        .loc[:, features]
        .copy()
    )

    df['prediction'] = model.predict(df[MODEL_INPUT_COLUMNS].to_dict(orient='records'))

    target_folder = data / f'{y:04d}' / f'{m:02d}'
    target_folder.mkdir(parents=True, exist_ok=True)

    full_file = target_folder / f'{y:04d}-{m:02d}-full.parquet'
    df.to_parquet(full_file, index=False)
    print(f'saved {full_file}')

    # Split the month into one-day windows [midnight, next midnight), keyed on
    # drop-off time. Rides whose drop-off falls outside this month are in the
    # full file only.
    first_day = datetime(year=y, month=m, day=1)
    end_of_month = first_day + relativedelta(months=1)

    today_midnight = first_day
    while today_midnight < end_of_month:
        tomorrow_midnight = today_midnight + timedelta(days=1)
        day_prefix = today_midnight.date().isoformat()  # 'YYYY-MM-DD'

        df_today = df[
            (df.lpep_dropoff_datetime >= today_midnight)
            & (df.lpep_dropoff_datetime < tomorrow_midnight)
        ]

        chunk_file = target_folder / f'{day_prefix}.parquet'
        df_today.to_parquet(chunk_file, index=False)
        print(f'saved {chunk_file}')

        # Simulate the online service on the same day's rides.
        _write_prediction_events(df_today, target_folder / f'{day_prefix}-predictions.jsonl')

        today_midnight = tomorrow_midnight

    print()

processing 2022 1...
saved data/2022/01/2022-01-full.parquet
saved data/2022/01/2022-01-01.parquet
saved data/2022/01/2022-01-02.parquet
saved data/2022/01/2022-01-03.parquet
saved data/2022/01/2022-01-04.parquet
saved data/2022/01/2022-01-05.parquet
saved data/2022/01/2022-01-06.parquet
saved data/2022/01/2022-01-07.parquet
saved data/2022/01/2022-01-08.parquet
saved data/2022/01/2022-01-09.parquet
saved data/2022/01/2022-01-10.parquet
saved data/2022/01/2022-01-11.parquet
saved data/2022/01/2022-01-12.parquet
saved data/2022/01/2022-01-13.parquet
saved data/2022/01/2022-01-14.parquet
saved data/2022/01/2022-01-15.parquet
saved data/2022/01/2022-01-16.parquet
saved data/2022/01/2022-01-17.parquet
saved data/2022/01/2022-01-18.parquet
saved data/2022/01/2022-01-19.parquet
saved data/2022/01/2022-01-20.parquet
saved data/2022/01/2022-01-21.parquet
saved data/2022/01/2022-01-22.parquet
saved data/2022/01/2022-01-23.parquet
saved data/2022/01/2022-01-24.parquet
saved data/2022/01/2022-01-