In [1]:
import os
import time

from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# Connection settings are read from the standard libpq environment variables so
# credentials are not committed with the notebook; the fallbacks are the
# local-development placeholders this notebook previously hardcoded.
# URL.create() escapes special characters (e.g. '@' or '/') in the password,
# which a hand-assembled connection string would not.
DB_URL = URL.create(
    drivername="postgresql+psycopg",
    username=os.environ.get("PGUSER", "user"),
    password=os.environ.get("PGPASSWORD", "pass"),
    host=os.environ.get("PGHOST", "localhost"),
    port=int(os.environ.get("PGPORT", "5432")),
    database=os.environ.get("PGDATABASE", "my_db"),
)
engine = create_engine(DB_URL)

file_path = './taxi_data.parquet'   # local copy of the monthly trip file
table_name = 'yellow_taxi_data'     # target Postgres table

In [2]:
import pandas as pd

# Method 1: read the complete Parquet file into memory in one go.
df = pd.read_parquet(file_path)

# Postgres folds unquoted identifiers to lower case, so normalise the names up
# front (e.g. VendorID -> vendorid) to avoid double-quoting them in SQL later.
df.columns = df.columns.str.lower().str.replace(' ', '_', regex=False)

# Fill missing passenger counts and shrink the column to 32-bit integers.
# NOTE(review): fillna(0) makes "unknown" indistinguishable from a recorded 0;
# the nullable 'Int32' dtype would keep missing values as NULL in Postgres.
df['passenger_count'] = (
    df['passenger_count']
    .fillna(0)
    .astype('int32')
)


In [3]:
# Rows x columns of the loaded frame.
n_rows, n_cols = df.shape
n_rows, n_cols

(3475226, 20)

In [4]:
# pandas dtypes after loading and clean-up. Compare with the Arrow schema of the
# source file further down: some integer columns there (e.g. RatecodeID: int64)
# show up here as float64, presumably because they contain nulls (their counts
# in describe() are lower than the row count).
df.dtypes

vendorid                          int32
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                   int32
trip_distance                   float64
ratecodeid                      float64
store_and_fwd_flag               object
pulocationid                      int32
dolocationid                      int32
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
airport_fee                     float64
cbd_congestion_fee              float64
dtype: object

In [5]:
# First five rows, for a quick visual check of the cleaned columns.
df.iloc[:5]

Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1,1.6,1.0,N,229,237,1,10.0,3.5,0.5,3.0,0.0,1.0,18.0,2.5,0.0,0.0
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1,0.5,1.0,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.0
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1,0.6,1.0,N,141,141,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0,0.0
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3,0.52,1.0,N,244,244,2,7.2,1.0,0.5,0.0,0.0,1.0,9.7,0.0,0.0,0.0
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3,0.66,1.0,N,244,116,2,5.8,1.0,0.5,0.0,0.0,1.0,8.3,0.0,0.0,0.0


In [6]:
# Summary statistics per numeric/datetime column (quartiles made explicit).
# In the saved run this surfaces data-quality issues worth filtering before any
# analysis: negative fare_amount / total_amount minima, an implausible
# trip_distance maximum, and a dropoff minimum earlier than the earliest pickup.
df.describe(percentiles=[0.25, 0.5, 0.75])

Unnamed: 0,vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,pulocationid,dolocationid,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,cbd_congestion_fee
count,3475226.0,3475226,3475226,3475226.0,3475226.0,2935077.0,3475226.0,3475226.0,3475226.0,3475226.0,3475226.0,3475226.0,3475226.0,3475226.0,3475226.0,3475226.0,2935077.0,2935077.0,3475226.0
mean,1.785428,2025-01-17 11:02:55.910964,2025-01-17 11:17:56.997901,1.096135,5.855126,2.482535,165.1916,164.1252,1.036623,17.0818,1.317737,0.4780991,2.959813,0.4493081,0.9547946,25.61129,2.225237,0.1239111,0.4834093
min,1.0,2024-12-31 20:47:55,2024-12-18 07:52:40,0.0,0.0,1.0,1.0,1.0,0.0,-900.0,-7.5,-0.5,-86.0,-126.94,-1.0,-901.0,-2.5,-1.75,-0.75
25%,2.0,2025-01-10 07:59:01,2025-01-10 08:15:29.500000,1.0,0.98,1.0,132.0,113.0,1.0,8.6,0.0,0.5,0.0,0.0,1.0,15.2,2.5,0.0,0.0
50%,2.0,2025-01-17 15:41:33,2025-01-17 15:59:34,1.0,1.67,1.0,162.0,162.0,1.0,12.11,0.0,0.5,2.45,0.0,1.0,19.95,2.5,0.0,0.75
75%,2.0,2025-01-24 19:34:06,2025-01-24 19:48:31,1.0,3.1,1.0,234.0,234.0,1.0,19.5,2.5,0.5,3.93,0.0,1.0,27.78,2.5,0.0,0.75
max,7.0,2025-02-01 00:00:44,2025-02-01 23:44:11,9.0,276423.6,99.0,265.0,265.0,5.0,863372.1,15.0,10.5,400.0,170.94,1.0,863380.4,2.5,6.75,0.75
std,0.4263282,,,0.8349488,564.6016,11.63277,64.52948,69.40169,0.7013334,463.4729,1.861509,0.1374623,3.779681,2.002582,0.2781938,463.6585,0.9039932,0.472509,0.3619307


In [7]:
import pyarrow.parquet as pq

# Column names and Arrow types exactly as stored in the Parquet file -- the
# reference for how the Postgres table should look when it is created.
# Opening a ParquetFile gives a handle for reading its metadata rather than
# loading the whole file; the handle is kept because the batched ingestion cell
# iterates over it.
parquet_file = pq.ParquetFile(file_path)
source_schema = parquet_file.schema_arrow
source_schema


VendorID: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: large_string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double
cbd_congestion_fee: double

In [None]:
# Method 2: better for RAM management. Stream the Parquet file in record batches
# and write each batch to Postgres instead of materialising the whole frame.
# Disabled by default so that "Restart & Run All" does not rewrite the table --
# set RUN_BATCH_INGEST = True to run it.
RUN_BATCH_INGEST = False
INGEST_BATCH_SIZE = 100_000  # rows per batch; bounds peak memory per iteration


def _clean_batch(batch_df):
    """Normalise one batch the same way the in-memory load cell does.

    Column names are lower-cased with spaces replaced by underscores (Postgres
    folds unquoted identifiers to lower case), and missing passenger counts are
    filled with 0 and stored as int32.
    """
    batch_df.columns = [col.lower().replace(' ', '_') for col in batch_df.columns]
    batch_df['passenger_count'] = batch_df['passenger_count'].fillna(0).astype('int32')
    return batch_df


def ingest_batches(batches, con, table, verbose=True):
    """Write an iterable of record batches to ``table`` with ``DataFrame.to_sql``.

    The first batch replaces any existing table, so the table schema is created
    from that batch's pandas dtypes; every later batch is appended. An empty
    iterable writes nothing and returns 0.

    Parameters
    ----------
    batches : iterable
        Objects exposing ``to_pandas()``, e.g. from ``ParquetFile.iter_batches``.
    con :
        Any connection ``DataFrame.to_sql`` accepts (SQLAlchemy engine here).
    table : str
        Target table name.
    verbose : bool, default True
        Print the running row count after each batch.

    Returns
    -------
    int
        Total number of rows written.
    """
    total_rows = 0
    for batch_no, batch in enumerate(batches):
        batch_df = _clean_batch(batch.to_pandas())
        mode = 'replace' if batch_no == 0 else 'append'
        batch_df.to_sql(name=table, con=con, if_exists=mode, index=False)
        total_rows += len(batch_df)
        if verbose:
            print(f"{total_rows} ingested till now.")
    return total_rows


if RUN_BATCH_INGEST:
    # perf_counter is monotonic, unlike time.time, so it is the right clock for durations.
    start_time = time.perf_counter()
    total_rows = ingest_batches(
        parquet_file.iter_batches(batch_size=INGEST_BATCH_SIZE), engine, table_name
    )
    duration = time.perf_counter() - start_time
    print(f"Done. Total time taken: {duration:.2f}s, for ingesting {total_rows} rows.")


100000 ingested till now.
200000 ingested till now.
300000 ingested till now.
400000 ingested till now.
500000 ingested till now.
600000 ingested till now.
700000 ingested till now.
800000 ingested till now.
900000 ingested till now.
1000000 ingested till now.
1100000 ingested till now.
1200000 ingested till now.
1300000 ingested till now.
1400000 ingested till now.
1500000 ingested till now.
1600000 ingested till now.
1700000 ingested till now.
1800000 ingested till now.
1900000 ingested till now.
2000000 ingested till now.
2100000 ingested till now.
2200000 ingested till now.
2300000 ingested till now.
2400000 ingested till now.
2500000 ingested till now.
2600000 ingested till now.
2700000 ingested till now.
2800000 ingested till now.
2900000 ingested till now.
3000000 ingested till now.
3100000 ingested till now.
3200000 ingested till now.
3300000 ingested till now.
3400000 ingested till now.
3475226 ingested till now.
Done. Total time taken: 145.22914052009583, for ingesting 347522

In [3]:
import fsspec
import pyarrow.parquet as pq

# A month published on the NYC TLC CDN, used to compare its schema with the
# local file (e.g. whether cbd_congestion_fee is present).
REMOTE_PARQUET_URL = (
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
)

fs = fsspec.filesystem("https")
with fs.open(REMOTE_PARQUET_URL) as f:
    file = pq.ParquetFile(f)
    # Read the schema while the HTTP handle is still open. `f` is closed when the
    # block exits, so `file` must not be used for row reads after this point.
    remote_schema = file.schema_arrow

remote_schema

VendorID: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: large_string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double