In [1]:
import pandas as pd
import pyarrow.parquet as pq
from time import time

In [2]:
pd.__version__


'2.2.1'

In [3]:
pq.read_metadata("yellow_tripdata_2023-01.parquet")


<pyarrow._parquet.FileMetaData object at 0x7f0c51aec630>
  created_by: parquet-cpp-arrow version 8.0.0
  num_columns: 19
  num_rows: 3066766
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 10386

In [4]:
file = pq.ParquetFile('yellow_tripdata_2023-01.parquet')
table = file.read()
table.schema


VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 2492

In [5]:
# Convert to pandas and check data 
df = table.to_pandas()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3066766 entries, 0 to 3066765
Data columns (total 19 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int64         
 1   tpep_pickup_datetime   datetime64[us]
 2   tpep_dropoff_datetime  datetime64[us]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           int64         
 8   DOLocationID           int64         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64       
 18  airport_fee           

In [6]:
!pip install psycopg2



In [8]:
from sqlalchemy import create_engine

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f0c08dcd480>

In [16]:
# Generate CREATE SQL statement from schema for validation (DDL STATEMENT)
#without con=engine it will return a different DDL which is adequate for sql
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))
# we can put this directly in postgres


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [17]:
len(df)

3066766

In [18]:
#This part is for testing

# Creating batches of 100,000 for the paraquet file
batches_iter = file.iter_batches(batch_size=100000)
batches_iter

# Take the first batch for testing
df = next(batches_iter).to_pandas()
df

# Creating just the table in postgres
df.head(0).to_sql(name='ny_taxi_data',con=engine, if_exists='replace')

0

In [19]:
%time df.to_sql(name='ny_taxi_data',con=engine, if_exists='append')

CPU times: total: 3.62 s
Wall time: 10.4 s


1000

In [20]:
# Insert values into the table 
t_start = time()
count = 0
for batch in file.iter_batches(batch_size=100000):
    count+=1
    batch_df = batch.to_pandas()
    print(f'inserting batch {count}...')
    b_start = time()
    
    batch_df.to_sql(name='ny_taxi_data',con=engine, if_exists='append')
    b_end = time()
    print(f'inserted! time taken {b_end-b_start:10.3f} seconds.\n')
    
t_end = time()   
print(f'Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.')   

inserting batch 1...
inserted! time taken      9.276 seconds.

inserting batch 2...
inserted! time taken      9.404 seconds.

inserting batch 3...
inserted! time taken      9.392 seconds.

inserting batch 4...
inserted! time taken      9.612 seconds.

inserting batch 5...
inserted! time taken      9.670 seconds.

inserting batch 6...
inserted! time taken      9.628 seconds.

inserting batch 7...
inserted! time taken      9.507 seconds.

inserting batch 8...
inserted! time taken      9.771 seconds.

inserting batch 9...
inserted! time taken      9.816 seconds.

inserting batch 10...
inserted! time taken      9.339 seconds.

inserting batch 11...
inserted! time taken      9.717 seconds.

inserting batch 12...
inserted! time taken     10.181 seconds.

inserting batch 13...
inserted! time taken      9.809 seconds.

inserting batch 14...
inserted! time taken      9.993 seconds.

inserting batch 15...
inserted! time taken     10.176 seconds.

inserting batch 16...
inserted! time taken      9