In [7]:
import pandas as pd


In [13]:
# Base URL under which the trip-data parquet files are served.
prefix = 'https://d37ci6vzurychx.cloudfront.net/trip-data/'

# Build the full URL of the 2025-11 green_tripdata file and read it into a DataFrame.
source_url = prefix + 'green_tripdata_2025-11.parquet'
df = pd.read_parquet(source_url)

# Preview the first rows of the loaded frame.
df.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee
0,2,2025-11-01 00:34:48,2025-11-01 00:41:39,N,1.0,74,42,1.0,0.74,7.2,...,0.5,1.94,0.0,,1.0,11.64,1.0,1.0,0.0,0.0
1,2,2025-11-01 00:18:52,2025-11-01 00:24:27,N,1.0,74,42,2.0,0.95,7.2,...,0.5,0.0,0.0,,1.0,9.7,2.0,1.0,0.0,0.0
2,2,2025-11-01 01:03:14,2025-11-01 01:15:24,N,1.0,83,160,1.0,2.19,13.5,...,0.5,5.0,0.0,,1.0,21.0,1.0,1.0,0.0,0.0
3,2,2025-11-01 00:10:57,2025-11-01 00:24:53,N,1.0,166,127,1.0,5.44,24.7,...,0.5,0.5,0.0,,1.0,27.7,1.0,1.0,0.0,0.0
4,1,2025-11-01 00:03:48,2025-11-01 00:19:38,N,1.0,166,262,1.0,3.2,18.4,...,1.5,1.0,0.0,,1.0,24.65,1.0,1.0,2.75,0.0


In [10]:
# Inspect the dtype pandas assigned to each column when the parquet file was read.
df.dtypes

VendorID                          int32
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag                  str
RatecodeID                      float64
PULocationID                      int32
DOLocationID                      int32
passenger_count                 float64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                    float64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
dtype: object

In [11]:
# Dimensions of the loaded frame as (rows, columns).
df.shape

(46912, 21)

In [18]:
# Target dtype for each column. Identifier, code and count columns (such as
# RatecodeID, which was loaded as float64) are mapped to pandas' nullable "Int64".
_nullable_int_cols = (
    "VendorID",
    "passenger_count",
    "RatecodeID",
    "PULocationID",
    "DOLocationID",
    "payment_type",
)

# Distance and monetary columns stay plain float64.
_float_cols = (
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "cbd_congestion_fee",
)

# Assemble the mapping group by group: integers, the single string flag, floats.
dtype_map = dict.fromkeys(_nullable_int_cols, "Int64")
dtype_map["store_and_fwd_flag"] = "string"
dtype_map.update(dict.fromkeys(_float_cols, "float64"))

# Cast every column named in dtype_map to its target dtype and rebind df to the
# result; columns that are not named in the mapping keep their current dtype.
df = df.astype(dtype_map)

In [19]:
# Re-inspect the dtypes after the cast to confirm that the columns named in
# dtype_map now carry their target dtypes.
df.dtypes

VendorID                          Int64
lpep_pickup_datetime     datetime64[us]
lpep_dropoff_datetime    datetime64[us]
store_and_fwd_flag               string
RatecodeID                        Int64
PULocationID                      Int64
DOLocationID                      Int64
passenger_count                   Int64
trip_distance                   float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
ehail_fee                       float64
improvement_surcharge           float64
total_amount                    float64
payment_type                      Int64
trip_type                       float64
congestion_surcharge            float64
cbd_congestion_fee              float64
dtype: object

In [52]:
import os
from pathlib import Path

import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine
from tqdm.auto import tqdm


# The connection URL is read from the NY_TAXI_DB_URL environment variable so
# that credentials do not have to be written into the notebook. When the
# variable is unset, the original local development URL is used, so existing
# setups keep working unchanged.
engine = create_engine(
    os.environ.get(
        "NY_TAXI_DB_URL",
        "postgresql://root:root@localhost:5432/ny_taxi",
    )
)

# Columns grouped by the dtype they must end up with. "Int64" is pandas'
# nullable integer dtype, so columns with missing values can still be cast.
_columns_by_dtype = {
    "Int64": [
        "VendorID",
        "passenger_count",
        "RatecodeID",
        "PULocationID",
        "DOLocationID",
        "payment_type",
    ],
    "string": [
        "store_and_fwd_flag",
    ],
    "float64": [
        "trip_distance",
        "fare_amount",
        "extra",
        "mta_tax",
        "tip_amount",
        "tolls_amount",
        "improvement_surcharge",
        "total_amount",
        "congestion_surcharge",
        "cbd_congestion_fee",
    ],
}

# Invert the grouping into the column -> dtype mapping used during ingestion.
dtype_map = {
    column: target
    for target, columns in _columns_by_dtype.items()
    for column in columns
}

# Datetime columns to coerce during ingestion. Only the lpep_* names are listed
# here; the ingestion loop checks `c in df.columns` first, so a listed column
# that is missing from a batch is skipped rather than raising.
date_cols = [
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime",
]

# Location of the local parquet file and the name of the table that receives it.
DATA_DIR = Path("data/raw")
parquet_path = DATA_DIR.joinpath("green_tripdata_2025-11.parquet")
table_name = "green_taxi_data"

# Maximum number of rows read from the parquet file per batch.
batch_size = 10_000

# Stream the parquet file into the database in batches of `batch_size` rows,
# coercing each batch to the dtypes in `dtype_map` before inserting it.
pf = pq.ParquetFile(parquet_path)

# True until the target table has been (re)created from the first batch.
first = True

# A single, row-based progress bar for the whole file. The batch iterator is
# deliberately NOT wrapped in a second tqdm(): that only produced an extra bar
# with no total ("0it"). The context manager closes the bar even if an insert
# raises part-way through.
with tqdm(total=pf.metadata.num_rows, unit="rows") as pbar:
    for batch in pf.iter_batches(batch_size=batch_size):
        # NOTE: this rebinds the notebook-level name `df` to the current batch,
        # so after the loop `df` holds only the last batch.
        df = batch.to_pandas()

        # Datetime coercion (only if the columns exist)
        for c in date_cols:
            if c in df.columns:
                df[c] = pd.to_datetime(df[c], errors="coerce")

        # Enforce dtypes (only for columns that exist in this parquet).
        # errors="coerce" turns unparseable values into missing values
        # instead of raising.
        for col, dt in dtype_map.items():
            if col not in df.columns:
                continue

            if dt == "Int64":
                df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")
            elif dt == "float64":
                df[col] = pd.to_numeric(df[col], errors="coerce").astype("float64")
            elif dt == "string":
                df[col] = df[col].astype("string")

        # Create table on first batch (schema only): writing zero rows with
        # if_exists="replace" drops any existing table and recreates it with
        # the coerced column types.
        if first:
            df.head(0).to_sql(name=table_name, con=engine, if_exists="replace", index=False)
            first = False
            print("Table created:", table_name)

        # Append batch. chunksize follows batch_size so each batch is still
        # written in one multi-row INSERT, without a second magic number.
        df.to_sql(
            name=table_name,
            con=engine,
            if_exists="append",
            index=False,
            method="multi",
            chunksize=batch_size
        )

        # Advance the bar only once the rows have actually been written, so the
        # displayed progress reflects inserted rows rather than rows read.
        pbar.update(len(df))

        print("Inserted:", len(df))


  0%|          | 0/46912 [00:00<?, ?it/s]

0it [00:00, ?it/s]

Table created: green_taxi_data
Inserted: 10000
Inserted: 10000
Inserted: 10000
Inserted: 10000
Inserted: 6912
