### DATA LOADING 

Here we will be working with a .parquet file to load the data into our database 

- Here are the steps -->
    - Check metadata and table datatypes
    - Convert the parquet file to pandas dataframe and check data types additionally check the data dictionary to make sure we have the correct data types, pandas will create the table in our database automatically.
    - Generate a DDL CREATE statement
    - Create a connection to our database unsing SQLAlchemy
    - Convert our HUGE paraquet file into a iterable that loads batches of 100,000 rows to our database.

### IMPORTING LIBRARIES 

In [None]:
import pandas as pd 
import pyarrow.parquet as pq
from time import time

In [None]:
# store file in file variable 
file = pq.ParquetFile('yellow_tripdata_2024-01.parquet')
# read table 
table = file.read()
# convert to pandas
# The to_pandas() function is part of 
# PyArrow and is used to convert a pyarrow.
# Table into a pandas DataFrame.
df = table.to_pandas()



### CREATING ENGINE TO CONNECT TL DATABASE

In [11]:
from sqlalchemy import create_engine 
# dialect+driver://username:password@host:port/database
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x719ca63d8dd0>

### DDL
pd.io.sql.get_schema(df, name, con)

In [12]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" INTEGER, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" INTEGER, 
	"DOLocationID" INTEGER, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	"Airport_fee" FLOAT(53)
)




### INSERTING DATA

There are 2,846,722 rows in our dataset. We are going to use the 
parquet_file.iter_batches() function to create batches of 100,000,
convert them into pandas and then load it into the postgres database.

In [13]:
# Get start time from time()
t_start = time() 
# counter for batches
count = 0
# for loop to load batches 
for batch in file.iter_batches(batch_size=100000):
    count+=1 # increment count per batch
    # convert batch  into a pandas DataFrame.
    batch_df = batch.to_pandas()
    b_start = time()
    print(f"Inserting batch {count}")
    b_start = time()
    # The .to_sql() method in pandas is used to write a DataFrame to a SQL database. 
    batch_df.to_sql(name="ny_taxi_data", con=engine, if_exists='append')
    b_end = time()
    print(f"Inserted time takes {b_end-b_start:10.3f} seconds.\n")
t_end = time()
print(f"Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.")

Inserting batch 1
Inserted time takes     10.310 seconds.

Inserting batch 2
Inserted time takes      9.845 seconds.

Inserting batch 3
Inserted time takes      9.936 seconds.

Inserting batch 4
Inserted time takes     10.352 seconds.

Inserting batch 5
Inserted time takes     10.103 seconds.

Inserting batch 6
Inserted time takes      9.477 seconds.

Inserting batch 7
Inserted time takes     10.283 seconds.

Inserting batch 8
Inserted time takes      9.784 seconds.

Inserting batch 9
Inserted time takes     10.275 seconds.

Inserting batch 10
Inserted time takes     10.198 seconds.

Inserting batch 11
Inserted time takes     10.051 seconds.

Inserting batch 12
Inserted time takes     10.100 seconds.

Inserting batch 13
Inserted time takes     10.482 seconds.

Inserting batch 14
Inserted time takes     10.013 seconds.

Inserting batch 15
Inserted time takes     10.757 seconds.

Inserting batch 16
Inserted time takes     10.041 seconds.

Inserting batch 17
Inserted time takes     10.111