In [2]:
import os
import sys
import pickle
import pandas as pd
from datetime import datetime

In [5]:
def prepare_data(df, categorical):
    """Clean raw trip records for scoring.

    Adds a ``duration`` column (minutes between pickup and dropoff), keeps only
    trips lasting between 1 and 60 minutes inclusive, and converts the
    ``categorical`` columns to strings with missing values encoded as ``"-1"``.

    Args:
        df: Trip records with ``tpep_pickup_datetime`` and
            ``tpep_dropoff_datetime`` datetime columns. Not modified.
        categorical: Names of the ID columns to encode as strings.

    Returns:
        A new DataFrame; surviving rows keep their original index labels.
    """
    # assign() returns a new frame, so the caller's df never gains a 'duration' column.
    duration = (df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.total_seconds() / 60
    df = df.assign(duration=duration)
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()
    # ID columns holding NaN come in as float (e.g. 1.0); the int cast drops the '.0'.
    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')

    return df
    

def read_data(filename):
    """Load a parquet file (local path or URL) into a DataFrame."""
    return pd.read_parquet(filename)


def main(year, month):
    """Score one month of yellow-taxi trip data and write ride-level predictions.

    Reads the month's trip parquet from the CloudFront URL below, cleans it with
    ``prepare_data``, scores it with the pickled ``(dv, lr)`` pair stored in
    ``model.bin`` and writes ``ride_id`` / ``predicted_duration`` to
    ``output/yellow_tripdata_YYYY-MM.parquet``.

    Args:
        year: Four-digit year of the trip file (int or numeric string).
        month: Month number, 1-12 (int or numeric string).
    """
    # Use the arguments instead of sys.argv so main(2023, 1) works from a notebook
    # or another module; int() still accepts string values from a CLI wrapper.
    year = int(year)
    month = int(month)

    input_file = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'

    os.makedirs('output', exist_ok=True)
    output_file = f'output/yellow_tripdata_{year:04d}-{month:02d}.parquet'

    # NOTE: pickle.load can execute arbitrary code -- only load a model.bin you trust.
    with open('model.bin', 'rb') as f_in:
        dv, lr = pickle.load(f_in)

    categorical = ['PULocationID', 'DOLocationID']

    df = read_data(input_file)
    df_prepared = prepare_data(df, categorical)
    # prepare_data keeps the source row labels, so each ride id maps back to its input row.
    df_prepared['ride_id'] = f'{year:04d}/{month:02d}_' + df_prepared.index.astype('str')

    dicts = df_prepared[categorical].to_dict(orient='records')
    X_val = dv.transform(dicts)
    y_pred = lr.predict(X_val)

    print('predicted mean duration:', y_pred.mean())

    # Build the result positionally: y_pred is in the same row order as df_prepared.
    df_result = pd.DataFrame({
        'ride_id': df_prepared['ride_id'].to_numpy(),
        'predicted_duration': y_pred,
    })

    df_result.to_parquet(output_file, engine='pyarrow', index=False)

In [6]:
def dt(hour, minute, second=0):
    """Return a timestamp on 2023-01-01 at the given hour:minute:second."""
    return datetime(year=2023, month=1, day=1, hour=hour, minute=minute, second=second)

In [7]:
# Hand-built fixture for prepare_data: each row exercises one branch of its logic.
categorical = ['PULocationID', 'DOLocationID']
columns = [*categorical, 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
data = [
    (None, None, dt(1, 1), dt(1, 10)),      # 9 min, both IDs missing -> kept, IDs become "-1"
    (1, 1, dt(1, 2), dt(1, 10)),            # 8 min -> kept
    (1, None, dt(1, 2, 0), dt(1, 2, 59)),   # 59 s -> dropped (shorter than 1 min)
    (3, 4, dt(1, 2, 0), dt(2, 2, 1)),       # 60 min 1 s -> dropped (longer than 60 min)
]
df = pd.DataFrame(data, columns=columns)

In [8]:
# Raw fixture: NaN location IDs, a 59-second trip and a 60-minute-1-second trip.
df

Unnamed: 0,PULocationID,DOLocationID,tpep_pickup_datetime,tpep_dropoff_datetime
0,,,2023-01-01 01:01:00,2023-01-01 01:10:00
1,1.0,1.0,2023-01-01 01:02:00,2023-01-01 01:10:00
2,1.0,,2023-01-01 01:02:00,2023-01-01 01:02:59
3,3.0,4.0,2023-01-01 01:02:00,2023-01-01 02:02:01


In [12]:
# Run the function under test on the fixture; compared against the expected frame later.
actual_result = prepare_data(df=df, categorical=categorical)
actual_result

Unnamed: 0,PULocationID,DOLocationID,tpep_pickup_datetime,tpep_dropoff_datetime,duration
0,-1,-1,2023-01-01 01:01:00,2023-01-01 01:10:00,9.0
1,1,1,2023-01-01 01:02:00,2023-01-01 01:10:00,8.0


Unnamed: 0,PULocationID,DOLocationID,tpep_pickup_datetime,tpep_dropoff_datetime,duration
0,-1,-1,2023-01-01 01:01:00,2023-01-01 01:10:00,9.0
1,1,1,2023-01-01 01:02:00,2023-01-01 01:10:00,8.0


In [32]:
# Recreate the raw fixture so each step of prepare_data can be inspected below.
columns = ['PULocationID', 'DOLocationID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
data = [
    (None, None, dt(1, 1), dt(1, 10)),
    (1, 1, dt(1, 2), dt(1, 10)),
    (1, None, dt(1, 2, 0), dt(1, 2, 59)),
    (3, 4, dt(1, 2, 0), dt(2, 2, 1)),
]
df = pd.DataFrame(data=data, columns=columns)
df


Unnamed: 0,PULocationID,DOLocationID,tpep_pickup_datetime,tpep_dropoff_datetime
0,,,2023-01-01 01:01:00,2023-01-01 01:10:00
1,1.0,1.0,2023-01-01 01:02:00,2023-01-01 01:10:00
2,1.0,,2023-01-01 01:02:00,2023-01-01 01:02:59
3,3.0,4.0,2023-01-01 01:02:00,2023-01-01 02:02:01


In [33]:
# Step 1: raw trip duration as a Timedelta.
df['duration'] = df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
df

Unnamed: 0,PULocationID,DOLocationID,tpep_pickup_datetime,tpep_dropoff_datetime,duration
0,,,2023-01-01 01:01:00,2023-01-01 01:10:00,0 days 00:09:00
1,1.0,1.0,2023-01-01 01:02:00,2023-01-01 01:10:00,0 days 00:08:00
2,1.0,,2023-01-01 01:02:00,2023-01-01 01:02:59,0 days 00:00:59
3,3.0,4.0,2023-01-01 01:02:00,2023-01-01 02:02:01,0 days 01:00:01


In [34]:
# Step 2: convert the Timedelta to fractional minutes.
df['duration'] = df['duration'].dt.total_seconds().div(60)
df

Unnamed: 0,PULocationID,DOLocationID,tpep_pickup_datetime,tpep_dropoff_datetime,duration
0,,,2023-01-01 01:01:00,2023-01-01 01:10:00,9.0
1,1.0,1.0,2023-01-01 01:02:00,2023-01-01 01:10:00,8.0
2,1.0,,2023-01-01 01:02:00,2023-01-01 01:02:59,0.983333
3,3.0,4.0,2023-01-01 01:02:00,2023-01-01 02:02:01,60.016667


In [35]:
# Step 3: keep trips of 1 to 60 minutes, both bounds inclusive.
df = df[df['duration'].between(1, 60)].copy()
df

Unnamed: 0,PULocationID,DOLocationID,tpep_pickup_datetime,tpep_dropoff_datetime,duration
0,,,2023-01-01 01:01:00,2023-01-01 01:10:00,9.0
1,1.0,1.0,2023-01-01 01:02:00,2023-01-01 01:10:00,8.0


In [36]:

# Step 4: encode missing location IDs as -1, then cast the IDs to strings.
df[categorical] = df[categorical].fillna(-1).astype(int).astype(str)
df

Unnamed: 0,PULocationID,DOLocationID,tpep_pickup_datetime,tpep_dropoff_datetime,duration
0,-1,-1,2023-01-01 01:01:00,2023-01-01 01:10:00,9.0
1,1,1,2023-01-01 01:02:00,2023-01-01 01:10:00,8.0


In [None]:


# Expected output of prepare_data on the fixture: the 59 s and 60 min 1 s trips are
# dropped, missing IDs become "-1", and duration is expressed in minutes.
data1 = [
    ("-1", "-1", dt(1, 1), dt(1, 10), 9.0),
    ("1", "1", dt(1, 2), dt(1, 10), 8.0),
]

columns1 = ['PULocationID', 'DOLocationID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'duration']

expected_result = pd.DataFrame(data1, columns=columns1)

# assert_frame_equal checks values, dtypes and column order and reports the first
# mismatch. prepare_data keeps the source row labels, so drop them before comparing.
pd.testing.assert_frame_equal(actual_result.reset_index(drop=True), expected_result)
expected_result

In [1]:
import os
import batch
import pandas as pd
from datetime import datetime

In [2]:
def dt(hour, minute, second=0):
    """Return a timestamp on 2023-01-01 at the given hour:minute:second."""
    return datetime(year=2023, month=1, day=1, hour=hour, minute=minute, second=second)


def get_s3_options():
    """Build pandas ``storage_options`` for a custom S3 endpoint.

    Returns ``{'client_kwargs': {'endpoint_url': <url>}}`` when the
    ``S3_ENDPOINT_URL`` environment variable is set to a non-empty value,
    otherwise ``None``.
    """
    endpoint = os.getenv("S3_ENDPOINT_URL")
    if not endpoint:
        return None
    return {'client_kwargs': {'endpoint_url': endpoint}}


def save_data(path: str, df: pd.DataFrame):
    """Write ``df`` to parquet at ``path`` without the index.

    For ``s3://`` paths the endpoint options from ``get_s3_options`` are passed
    as ``storage_options``; other paths get ``None``.
    """
    is_s3 = path.startswith("s3://")
    options = get_s3_options() if is_s3 else None
    df.to_parquet(path, engine='pyarrow', index=False, storage_options=options)

In [3]:
# save_data is defined once, in the previous cell; the duplicate definition that used
# to live here (identical body) was removed so the two cannot silently diverge.


def test_saving(path, endpoint_url="http://localhost:4566"):
    """Write the raw four-row trip fixture to ``path`` via ``save_data``.

    Sets ``S3_ENDPOINT_URL`` to ``endpoint_url`` first so that ``get_s3_options``
    (used by ``save_data`` for s3:// paths) picks it up. The default matches the
    previously hard-coded endpoint. This changes the process environment for
    later cells too.

    Args:
        path: Destination path, e.g. an ``s3://`` URL.
        endpoint_url: S3-compatible endpoint to write to.
    """
    data = [
        (None, None, dt(1, 1), dt(1, 10)),
        (1, 1, dt(1, 2), dt(1, 10)),
        (1, None, dt(1, 2, 0), dt(1, 2, 59)),
        (3, 4, dt(1, 2, 0), dt(2, 2, 1)),
    ]

    columns = ['PULocationID', 'DOLocationID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
    df = pd.DataFrame(data, columns=columns)
    os.environ['S3_ENDPOINT_URL'] = endpoint_url
    save_data(path, df)

In [4]:
# NOTE(review): despite the "predictions" filename, test_saving(path) below writes the
# raw trip fixture here, and the same path is then used as the batch *input*.
path = "s3://nyc-duration/yellow/2023/01/predictions.parquet"

In [8]:
# Upload the raw fixture to `path`; test_saving also sets S3_ENDPOINT_URL.
test_saving(path)

In [9]:
# Point the batch job at the uploaded fixture. Presumably batch.py reads this variable
# and formats it with year/month -- TODO confirm; `path` contains no placeholders.
os.environ['INPUT_FILE_PATTERN'] = path

In [13]:
# Run the batch job for January 2023 (presumably against INPUT_FILE_PATTERN set above;
# where it writes its output is defined in batch.py -- TODO confirm).
batch.main(2023, 1)

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


predicted mean duration: 18.138625226015364


In [14]:
# NOTE(review): this re-reads the *input* fixture written by test_saving, not the batch
# predictions -- which is why the 'predicted_duration' lookup below raises KeyError.
df = batch.read_data(path)

In [16]:
# Only the four raw fixture columns are present -- no 'predicted_duration'.
df

Unnamed: 0,PULocationID,DOLocationID,tpep_pickup_datetime,tpep_dropoff_datetime
0,,,2023-01-01 01:01:00,2023-01-01 01:10:00
1,1.0,1.0,2023-01-01 01:02:00,2023-01-01 01:10:00
2,1.0,,2023-01-01 01:02:00,2023-01-01 01:02:59
3,3.0,4.0,2023-01-01 01:02:00,2023-01-01 02:02:01


In [15]:
# FIXME: `df` holds the raw input fixture (no 'predicted_duration' column), so this
# raises KeyError. Load the file written by batch.main instead (its output path is
# defined in batch.py, which is not visible here).
predicted_durations = df['predicted_duration'].sum()

KeyError: 'predicted_duration'