In [1]:
import os
import time

import pandas as pd
from sqlalchemy import create_engine
from tqdm.auto import tqdm

# Display the pandas version these outputs were produced with (shown below).
pd.__version__

'1.5.2'

In [2]:
# Load the yellow-taxi trip records from the 2021-01 parquet file
# (path is relative to the notebook's working directory).
df = pd.read_parquet(
    path='data/yellow_tripdata_2021-01.parquet',
    engine='pyarrow',
)

In [3]:
# Report the frame's dimensions; labels are padded so the colons align.
print(f'   rows: {len(df)}')
print(f'columns: {len(df.columns)}')

   rows: 1369769
columns: 19


In [4]:
# Database connection. Set the NY_TAXI_DB_URL environment variable to point at a
# different server or credentials instead of editing them into the notebook.
# The fallback is the local-development URL this notebook was written against.
DB_URL = os.environ.get('NY_TAXI_DB_URL', 'postgresql://root:root@localhost:5432/ny_taxi')
engine = create_engine(DB_URL)

In [5]:
# Preview the CREATE TABLE statement pandas would emit for this frame,
# using the engine's SQL dialect.
print(
    pd.io.sql.get_schema(
        frame=df,
        name='yellow_taxi_data',
        con=engine,
    )
)


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [6]:
# Show the first five rows to sanity-check the loaded columns.
df.head(n=5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [11]:
def ingest_data(df, con=None, table_name='yellow_taxi_data', chunksize=10_000, progress_bar=None):
    """Write ``df`` into a SQL table in chunks, replacing any existing table.

    The table is first (re)created from the frame's column schema by writing an
    empty slice with ``if_exists='replace'``. All rows are then appended in
    batches of ``chunksize`` using multi-row INSERTs, with a progress bar.
    Finally, the elapsed time is printed.

    Args:
        df: DataFrame to ingest.
        con: Connection accepted by ``DataFrame.to_sql`` (e.g. a SQLAlchemy
            engine). Defaults to the notebook-level ``engine``.
        table_name: Destination table name.
        chunksize: Rows per INSERT batch; must be positive.
        progress_bar: tqdm-compatible factory (a context manager exposing
            ``update(n)``). Defaults to ``tqdm``.

    Raises:
        ValueError: If ``chunksize`` is not positive.
    """
    if chunksize <= 0:
        raise ValueError(f'chunksize must be positive, got {chunksize}')
    if con is None:
        con = engine  # engine created in an earlier notebook cell
    if progress_bar is None:
        progress_bar = tqdm

    # Create (or replace) the table from the column schema only, without rows.
    df.head(n=0).to_sql(name=table_name, con=con, if_exists='replace')

    n_rows = len(df)
    t_start = time.perf_counter()
    with progress_bar(total=n_rows, unit='steps', unit_scale=True) as pbar:
        # Step through every chunk start below n_rows. iloc clips the final
        # slice, so the last (possibly partial) chunk is inserted too.
        for start in range(0, n_rows, chunksize):
            chunk = df.iloc[start:start + chunksize]
            chunk.to_sql(
                name=table_name,
                con=con,
                if_exists='append',
                method='multi',
            )
            # Advance by the rows actually written so the bar ends at n_rows.
            pbar.update(len(chunk))
    elapsed = time.perf_counter() - t_start
    print(f"Finished ingesting data into the postgres database, {elapsed:.3f} seconds")

In [12]:
# Load the full frame into Postgres (prints the elapsed time when done).
ingest_data(df=df)

  0%|          | 0.00/1.37M [00:00<?, ?steps/s]

Finished ingesting data into the postgres database, 379.230 seconds
