In [None]:
import os
from pathlib import Path

import aiobotocore
import pandas as pd
from dotenv import load_dotenv
from numpy import dtype
from s3fs import S3FileSystem
from sklearn.feature_extraction import DictVectorizer
from sklearn.model_selection import train_test_split


In [None]:
load_dotenv()
[os.getenv("AWS_PROFILE"), os.getenv("TRAINING_DIR")]

In [None]:
tmpdir = Path.cwd().parent / "tmp"
s3 = S3FileSystem(session=aiobotocore.session.AioSession(profile=os.getenv("AWS_PROFILE")))
bucket_root = f"s3://{os.getenv("BUCKET_NAME")}/ny_taxi_trip_prediction"

if os.getenv("TRAINING_DIR"):
    training_base_dir = os.getenv("TRAINING_DIR")
else:
    training_base_dir = s3.read_text(f"{bucket_root}/current")

training_root = f"{bucket_root}/training/{training_base_dir}"

train_path = tmpdir / "train.parquet"
if not train_path.is_file():
    s3.get_file(training_root + "/train.parquet", train_path)
df_train_val_all = pd.read_parquet(train_path)

test_path = tmpdir / "test.parquet"
if not test_path.is_file():
    s3.get_file(training_root + "/test.parquet", test_path)
df_test_all = pd.read_parquet(test_path)


In [None]:
assert df_train_val_all['PULocationID'].dtypes is dtype("int32")
assert df_test_all['PULocationID'].dtypes is dtype("int32")

In [None]:
df_train_val_all.head()

In [None]:
df_train_val_all[df_train_val_all.index.duplicated()]

In [None]:
# def preprocess(df: pd.DataFrame, dict_vec: DictVectorizer | None) -> [any, any, DictVectorizer]:
#     """ Returns x, y, and dict vectorizer. If dict vectorizer was supplied in arguments, does not fit, only transforms.
#     """
#     dict_vec_fit = False
#     if dict_vec is None:
#         dict_vec = DictVectorizer()
#         dict_vec_fit = True
#
#     dfp = pd.DataFrame(index=df.index)
#     dfp["duration_min"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).apply(lambda timediff: timediff.total_seconds())
#     dfp = dfp[(dfp.duration_min >= 1) & (dfp.duration_min <= 60)]
#     dfp["loc_id"] = df["PULocationID"].astype(str) + "-" + df["DOLocationID"].astype(str)
#     dfp["trip_distance"] = df["trip_distance"]
#
#     dfp.reset_index(inplace=True, drop=True)
#
#     return dfp, dict_vec

In [57]:
def preprocess(df: pd.DataFrame, dict_vec: DictVectorizer | None = None) -> [any, any, DictVectorizer]:
    """ Returns x, y, and dict vectorizer. If dict vectorizer was supplied in arguments, does not fit, only transforms.
    """
    dict_vec_fit = False
    if dict_vec is None:
        dict_vec = DictVectorizer()
        dict_vec_fit = True

    tmpdf = pd.DataFrame(index=df.index)
    tmpdf["duration_min"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).apply(
        lambda timediff: timediff.total_seconds())
    tmpdf = tmpdf[(tmpdf["duration_min"] >= 1) & (tmpdf["duration_min"] <= 60)]
    tmpdf["loc_id"] = df["PULocationID"].astype(str) + "-" + df["DOLocationID"].astype(str)
    tmpdf["trip_distance"] = df["trip_distance"]
    print("tmpdf")
    display(tmpdf)

    x_dicts = tmpdf[["loc_id", "trip_distance"]].to_dict(orient="records")
    print("x_dicts")
    display(x_dicts)
    if dict_vec_fit:
        x = dict_vec.fit_transform(x_dicts)
    else:
        x = dict_vec.transform(x_dicts)
    print("x")
    display(x)
    y = tmpdf["duration_min"].values
    print("y")
    display(y)

    return x, y, dict_vec

In [58]:
# Take first 80% for training set, last 20% for validation set. Order DOES matter because we want to use later data for validation as it is in theory closer to reality
xy_train_all, xy_val_all = train_test_split(df_train_val_all, test_size=0.2, shuffle=False)
display(xy_train_all.head())
display(xy_val_all.head())

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,1,2024-07-01 00:34:56,2024-07-01 00:46:49,1.0,3.2,1.0,N,140,79,1,15.6,3.5,0.5,3.5,0.0,1.0,24.1,2.5,0.0
1,2,2024-06-30 23:48:58,2024-07-01 00:28:04,1.0,19.48,2.0,N,132,113,2,70.0,0.0,0.5,0.0,0.0,1.0,75.75,2.5,1.75
2,2,2024-07-01 00:23:18,2024-07-01 00:29:51,1.0,1.18,1.0,N,237,145,1,8.6,1.0,0.5,2.72,0.0,1.0,16.32,2.5,0.0
3,1,2024-07-01 00:10:33,2024-07-01 00:27:31,0.0,9.1,1.0,N,138,164,1,36.6,10.25,0.5,12.05,0.0,1.0,60.4,2.5,1.75
4,1,2024-07-01 00:07:55,2024-07-01 00:34:34,1.0,17.7,2.0,N,132,263,1,70.0,1.75,0.5,10.0,6.94,1.0,90.19,0.0,1.75


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
5292876,2,2024-06-21 21:23:04,2024-06-21 21:27:28,1.0,0.28,1.0,N,100,48,1,5.8,1.0,0.5,0.0,0.0,1.0,10.8,2.5,0.0
5292877,2,2024-06-21 21:44:07,2024-06-21 22:00:44,2.0,1.14,1.0,N,48,233,1,14.9,1.0,0.5,3.98,0.0,1.0,23.88,2.5,0.0
5292878,2,2024-06-21 21:46:01,2024-06-21 21:50:12,2.0,0.78,1.0,N,161,237,2,6.5,1.0,0.5,0.0,0.0,1.0,11.5,2.5,0.0
5292879,2,2024-06-21 21:27:12,2024-06-21 21:40:29,1.0,2.33,1.0,N,236,161,1,14.9,1.0,0.5,3.98,0.0,1.0,23.88,2.5,0.0
5292880,2,2024-06-21 21:57:42,2024-06-21 22:12:21,4.0,2.42,1.0,N,162,148,2,14.9,1.0,0.5,0.0,0.0,1.0,19.9,2.5,0.0


In [59]:
x_train, y_train, dv = preprocess(xy_train_all)

tmpdf


Unnamed: 0,duration_min,loc_id,trip_distance
19,1.0,264-264,0.00
92,13.0,239-239,0.00
129,15.0,231-231,0.00
205,11.0,132-132,0.00
325,11.0,50-50,0.00
...,...,...,...
5292392,4.0,170-170,0.00
5292733,5.0,10-10,0.00
5292787,20.0,79-79,0.03
5292788,20.0,79-79,0.03


x_dicts


[{'loc_id': '264-264', 'trip_distance': 0.0},
 {'loc_id': '239-239', 'trip_distance': 0.0},
 {'loc_id': '231-231', 'trip_distance': 0.0},
 {'loc_id': '132-132', 'trip_distance': 0.0},
 {'loc_id': '50-50', 'trip_distance': 0.0},
 {'loc_id': '80-80', 'trip_distance': 0.0},
 {'loc_id': '265-265', 'trip_distance': 0.0},
 {'loc_id': '113-113', 'trip_distance': 0.01},
 {'loc_id': '186-186', 'trip_distance': 0.0},
 {'loc_id': '132-132', 'trip_distance': 0.0},
 {'loc_id': '112-112', 'trip_distance': 5.8},
 {'loc_id': '162-162', 'trip_distance': 0.0},
 {'loc_id': '132-132', 'trip_distance': 0.0},
 {'loc_id': '107-107', 'trip_distance': 0.0},
 {'loc_id': '234-90', 'trip_distance': 0.02},
 {'loc_id': '263-263', 'trip_distance': 0.0},
 {'loc_id': '132-132', 'trip_distance': 0.15},
 {'loc_id': '87-87', 'trip_distance': 0.0},
 {'loc_id': '87-209', 'trip_distance': 0.0},
 {'loc_id': '230-230', 'trip_distance': 0.0},
 {'loc_id': '9-9', 'trip_distance': 0.0},
 {'loc_id': '226-226', 'trip_distance': 0.0

x


<Compressed Sparse Row sparse matrix of dtype 'float64'
	with 136314 stored elements and shape (68157, 1258)>

y


array([ 1., 13., 15., ..., 20., 20.,  5.])

In [60]:
x_val, y_val, _ = preprocess(xy_val_all, dv)

tmpdf


Unnamed: 0,duration_min,loc_id,trip_distance
5293038,32.0,107-79,0.10
5293277,6.0,138-138,0.00
5293278,6.0,138-138,0.00
5293434,30.0,186-186,0.01
5293435,30.0,186-186,0.01
...,...,...,...
6614505,7.0,88-88,0.00
6614672,60.0,234-234,0.00
6614718,6.0,137-137,0.00
6615536,30.0,100-100,0.04


x_dicts


[{'loc_id': '107-79', 'trip_distance': 0.1},
 {'loc_id': '138-138', 'trip_distance': 0.0},
 {'loc_id': '138-138', 'trip_distance': 0.0},
 {'loc_id': '186-186', 'trip_distance': 0.01},
 {'loc_id': '186-186', 'trip_distance': 0.01},
 {'loc_id': '68-68', 'trip_distance': 0.0},
 {'loc_id': '265-265', 'trip_distance': 0.0},
 {'loc_id': '90-90', 'trip_distance': 0.0},
 {'loc_id': '163-163', 'trip_distance': 0.0},
 {'loc_id': '68-68', 'trip_distance': 0.0},
 {'loc_id': '29-29', 'trip_distance': 0.0},
 {'loc_id': '231-231', 'trip_distance': 0.0},
 {'loc_id': '132-132', 'trip_distance': 0.0},
 {'loc_id': '132-132', 'trip_distance': 0.0},
 {'loc_id': '132-132', 'trip_distance': 0.0},
 {'loc_id': '142-142', 'trip_distance': 0.0},
 {'loc_id': '50-48', 'trip_distance': 0.15},
 {'loc_id': '238-238', 'trip_distance': 0.1},
 {'loc_id': '132-132', 'trip_distance': 0.0},
 {'loc_id': '132-132', 'trip_distance': 0.0},
 {'loc_id': '83-83', 'trip_distance': 0.0},
 {'loc_id': '239-264', 'trip_distance': 1.12

x


<Compressed Sparse Row sparse matrix of dtype 'float64'
	with 28180 stored elements and shape (14127, 1258)>

y


array([32.,  6.,  6., ...,  6., 30.,  7.])

In [61]:
x_test, y_test, _ = preprocess(df_test_all, dv)


tmpdf


Unnamed: 0,duration_min,loc_id,trip_distance
23,14.0,234-113,0.03
30,30.0,161-161,0.16
57,15.0,181-181,0.00
137,2.0,264-264,0.00
163,23.0,186-90,0.00
...,...,...,...
2977244,20.0,231-231,0.01
2977304,17.0,258-258,0.00
2977498,14.0,234-234,0.00
2977639,13.0,225-17,0.00


x_dicts


[{'loc_id': '234-113', 'trip_distance': 0.03},
 {'loc_id': '161-161', 'trip_distance': 0.16},
 {'loc_id': '181-181', 'trip_distance': 0.0},
 {'loc_id': '264-264', 'trip_distance': 0.0},
 {'loc_id': '186-90', 'trip_distance': 0.0},
 {'loc_id': '107-107', 'trip_distance': 0.0},
 {'loc_id': '143-239', 'trip_distance': 0.09},
 {'loc_id': '45-45', 'trip_distance': 0.0},
 {'loc_id': '132-132', 'trip_distance': 0.01},
 {'loc_id': '132-132', 'trip_distance': 0.01},
 {'loc_id': '132-132', 'trip_distance': 0.0},
 {'loc_id': '229-229', 'trip_distance': 0.07},
 {'loc_id': '261-261', 'trip_distance': 0.0},
 {'loc_id': '142-142', 'trip_distance': 0.0},
 {'loc_id': '87-87', 'trip_distance': 0.0},
 {'loc_id': '163-163', 'trip_distance': 0.0},
 {'loc_id': '161-161', 'trip_distance': 0.0},
 {'loc_id': '265-265', 'trip_distance': 0.0},
 {'loc_id': '234-234', 'trip_distance': 0.0},
 {'loc_id': '113-113', 'trip_distance': 0.0},
 {'loc_id': '265-265', 'trip_distance': 0.0},
 {'loc_id': '145-146', 'trip_dist

x


<Compressed Sparse Row sparse matrix of dtype 'float64'
	with 82379 stored elements and shape (41349, 1258)>

y


array([14., 30., 15., ..., 14., 13., 16.])