In [28]:
import pandas as pd
from sqlalchemy import create_engine
from time import time

# Loading Dataset and sending it to Postgres

In [None]:
# Converting Parquet file into csv format
df = pd.read_parquet("yellow_tripdata_2021-01.parquet")
df.to_csv("ny_taxi_data.csv", index=False)

In [18]:
# Loading first 100 rows of the csv-file into dataframe
df = pd.read_csv('ny_taxi_data.csv', nrows=100)
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.10,1.0,N,142,43,2,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.20,1.0,N,238,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.70,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.60,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,2,2021-01-01 00:12:41,2021-01-01 00:26:47,1.0,4.13,1.0,N,161,226,1,14.5,0.5,0.5,3.66,0.0,0.3,21.96,2.5,
96,2,2021-01-01 00:23:29,2021-01-01 00:35:03,2.0,4.12,1.0,N,162,74,2,13.5,0.5,0.5,0.00,0.0,0.3,17.30,2.5,
97,2,2021-01-01 00:46:17,2021-01-01 00:54:25,2.0,2.22,1.0,N,144,170,1,9.0,0.5,0.5,2.56,0.0,0.3,15.36,2.5,
98,2,2021-01-01 00:28:16,2021-01-01 00:51:44,1.0,7.11,1.0,N,264,264,2,23.5,0.5,0.5,0.00,0.0,0.3,24.80,0.0,


In [21]:
# Batching the csv-file, because it has more than 1,600,000 rows
df_iter = pd.read_csv('ny_taxi_data.csv', iterator=True, chunksize=100000)
df = next(df_iter)

In [25]:
# Converting string data into datetime format
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [15]:
# Creating engine for postgres
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [26]:
# Converting the column names into sql format
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [27]:
# filling the SQL table with chunkwise data from the df
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 2.68 s, sys: 61.7 ms, total: 2.75 s
Wall time: 5.42 s


1000

In [29]:
# Appending chunks of 100000 rows by looping through the interator
while True:
    t_start = time()
    df = next(df_iter)
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    t_end = time()
    print('inserted another chunk..., took %.3f seconds' % (t_end - t_start))
    

inserted another chunk..., took 5.870 seconds
inserted another chunk..., took 5.764 seconds
inserted another chunk..., took 5.809 seconds
inserted another chunk..., took 5.757 seconds
inserted another chunk..., took 5.704 seconds
inserted another chunk..., took 5.697 seconds
inserted another chunk..., took 5.709 seconds
inserted another chunk..., took 5.478 seconds
inserted another chunk..., took 5.530 seconds
inserted another chunk..., took 5.524 seconds
inserted another chunk..., took 5.610 seconds


  df = next(df_iter)


inserted another chunk..., took 5.415 seconds
inserted another chunk..., took 3.515 seconds


StopIteration: 

In [16]:
# Creating a schema for SQL
print(pd.io.sql.get_schema(df, name='yello_taxi_data', con=engine))


CREATE TABLE yello_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)


