In [23]:
import os

import pandas as pd
from sqlalchemy import create_engine

In [24]:
# Location of the green-taxi trip records, relative to the notebook's working
# directory. Kept as a plain string because later cells reuse `trip_path`.
trip_path = "data/green_tripdata_2025_11.parquet"

# Load the whole file into memory and preview the first ten rows.
df = pd.read_parquet(path=trip_path)
df.head(n=10)

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee
0,2,2025-11-01 00:34:48,2025-11-01 00:41:39,N,1.0,74,42,1.0,0.74,7.2,...,0.5,1.94,0.0,,1.0,11.64,1.0,1.0,0.0,0.0
1,2,2025-11-01 00:18:52,2025-11-01 00:24:27,N,1.0,74,42,2.0,0.95,7.2,...,0.5,0.0,0.0,,1.0,9.7,2.0,1.0,0.0,0.0
2,2,2025-11-01 01:03:14,2025-11-01 01:15:24,N,1.0,83,160,1.0,2.19,13.5,...,0.5,5.0,0.0,,1.0,21.0,1.0,1.0,0.0,0.0
3,2,2025-11-01 00:10:57,2025-11-01 00:24:53,N,1.0,166,127,1.0,5.44,24.7,...,0.5,0.5,0.0,,1.0,27.7,1.0,1.0,0.0,0.0
4,1,2025-11-01 00:03:48,2025-11-01 00:19:38,N,1.0,166,262,1.0,3.2,18.4,...,1.5,1.0,0.0,,1.0,24.65,1.0,1.0,2.75,0.0
5,1,2025-11-01 00:42:13,2025-11-01 01:04:50,N,1.0,112,48,2.0,5.1,26.8,...,1.5,6.55,0.0,,1.0,39.35,1.0,1.0,2.75,0.75
6,2,2025-11-01 00:05:41,2025-11-01 00:39:20,N,1.0,83,87,1.0,9.8,43.6,...,0.5,9.92,0.0,,1.0,59.52,1.0,1.0,2.75,0.75
7,2,2025-11-01 00:42:14,2025-11-01 01:13:20,N,1.0,66,233,1.0,5.01,28.9,...,0.5,6.98,0.0,,1.0,41.88,1.0,1.0,2.75,0.75
8,2,2025-11-01 00:03:08,2025-11-01 00:06:27,N,1.0,223,223,1.0,0.63,5.1,...,0.5,1.52,0.0,,1.0,9.12,1.0,1.0,0.0,0.0
9,2,2025-11-01 00:56:33,2025-11-01 01:01:34,N,1.0,130,130,1.0,1.15,7.9,...,0.5,0.0,0.0,,1.0,10.4,2.0,1.0,0.0,0.0


In [25]:
# Desired pandas dtype for each column that is expected in the trip data.
dtype_spec = {
    # Identifier / code columns: nullable integers, so missing values survive.
    "VendorID": "Int64",
    # Pickup and drop-off timestamps.
    "lpep_pickup_datetime": "datetime64[ns]",
    "lpep_dropoff_datetime": "datetime64[ns]",
    "store_and_fwd_flag": "string",
    "RatecodeID": "Int64",
    "PULocationID": "Int64",
    "DOLocationID": "Int64",
    "passenger_count": "Int64",
    # Distance and monetary amounts stay as floats.
    "trip_distance": "float64",
    "fare_amount": "float64",
    "extra": "float64",
    "mta_tax": "float64",
    "tip_amount": "float64",
    "tolls_amount": "float64",
    "ehail_fee": "float64",
    "improvement_surcharge": "float64",
    "total_amount": "float64",
    "payment_type": "Int64",
    "trip_type": "Int64",
    "congestion_surcharge": "float64",
    "cbd_congestion_fee": "float64"
}

# Cast every listed column that is actually present in the frame; names that
# are missing from the frame are skipped silently.
for name in (c for c in dtype_spec if c in df.columns):
    wanted = dtype_spec[name]
    if "datetime" in wanted:
        # Timestamp columns are parsed with pd.to_datetime rather than astype.
        df[name] = pd.to_datetime(df[name])
    else:
        # "string", "Int64" and "float64" all go through a plain astype.
        df[name] = df[name].astype(wanted)

# SQLAlchemy engine for the target Postgres database. The connection URL can be
# overridden through the NY_TAXI_DB_URL environment variable so that real
# credentials do not have to be committed with the notebook; when the variable
# is unset, the original localhost URL is used unchanged.
engine = create_engine(
    os.environ.get("NY_TAXI_DB_URL", "postgresql://root:root@localhost:5432/ny_taxi")
)

# Preview the CREATE TABLE statement pandas derives from the frame's current
# dtypes. print() is used so the DDL renders with real line breaks instead of
# as an escaped string repr.
print(pd.io.sql.get_schema(df, name='green_trip_data', con=engine))


CREATE TABLE green_trip_data (
	"VendorID" BIGINT, 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" BIGINT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type BIGINT, 
	trip_type BIGINT, 
	congestion_surcharge FLOAT(53), 
	cbd_congestion_fee FLOAT(53)
)




In [26]:
# Number of rows currently held in the frame.
df.shape[0]

46912

In [27]:
# Create (or replace) an empty green_trip_data table that has the frame's
# columns but no rows. index=False stops pandas from writing the DataFrame
# index as an extra "index" column, so the table matches the DDL printed by
# get_schema above.
df.head(n=0).to_sql(
    name='green_trip_data',
    con=engine,
    if_exists='replace',
    index=False
)

0

In [30]:
from tqdm.auto import tqdm
import numpy as np

# Bulk-load the trip records into Postgres in fixed-size chunks.
#
# `df` is reused from the cells above instead of being re-read from
# `trip_path`: a fresh read would discard the dtype conversions applied
# earlier, and the table would no longer match the DDL printed by get_schema.

# Rows handed to each to_sql call.
chunk_size = 10000

# Number of chunks, counting a final partial one.
n_chunks = int(np.ceil(len(df) / chunk_size))

# (Re)create the empty table once, before any insert, so it exists even when
# the frame has no rows. index=False keeps the DataFrame index from being
# written as an extra "index" column.
df.head(0).to_sql(
    name="green_trip_data",
    con=engine,
    if_exists="replace",
    index=False
)
print("Table created")

for i in tqdm(range(n_chunks)):
    start_idx = i * chunk_size
    # .iloc clamps the slice end at len(df), so the last chunk may be shorter.
    df_chunk = df.iloc[start_idx:start_idx + chunk_size]

    # Append this chunk's rows to the table created above.
    df_chunk.to_sql(
        name="green_trip_data",
        con=engine,
        if_exists="append",
        index=False
    )

    print(f"Inserted chunk {i+1}/{n_chunks}: {len(df_chunk)} rows")

  0%|          | 0/1 [00:00<?, ?it/s]

Table created
Inserted chunk 1/1: 46912 rows
