In [1]:
import pandas as pd
from sqlalchemy import create_engine

In [2]:
pd.__version__

'2.1.4'

## Step 1:

Download the data from the Github location hosted by DatatalksClub and Amazon S3-

Green Taxi Data - "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-01.csv.gz"
Taxi Zones - "https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv"

And then verify that the data has been loaded correctly.

In [3]:
df = pd.read_parquet('yellow_tripdata_2021-01.parquet')

In [4]:
df.shape

(1369769, 19)

In [5]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [6]:
df_100 = df.head(100)

In [7]:
df_100.shape

(100, 19)

In [8]:
taxi_zone = pd.read_csv('taxi+_zone_lookup.csv')

In [9]:
taxi_zone.shape

(265, 4)

In [10]:
taxi_zone.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


## Step 2:

To put the data to postgres using Pandas, you need to first generate a Schema of the dataset, which is an instruction to create a table that specifies the columns and thier data types that you want to have in the table.

Run print(pd.io.sql.get_schema(df, 'yellow_taxi_data')) to generate the schema of the data.

In [11]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [12]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f834b3941f0>

In [13]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [14]:
df_100.to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

100

In [15]:
import pyarrow.parquet as pq
from time import time

user='root'
password='root'
host='localhost'
port=5432
db='ny_taxi'

output_name='df_cleaned.parquet'
df.to_parquet(output_name)

parquet_file = pq.ParquetFile(output_name)
parquet_size = parquet_file.metadata.num_rows

engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')

table_name="yellow_taxi_schema"

# Clear table if exists
pq.read_table(output_name).to_pandas().head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')

# default (and max) batch size
index = 100000
step = index
inc = 1

for i in parquet_file.iter_batches(batch_size=index, use_threads=True):
    t_start = time()
    print(f'Ingesting {index} out of {parquet_size} rows ({index / parquet_size:.0%})')
    i.to_pandas().to_sql(name=table_name, con=engine, if_exists='append')
    index += step if parquet_size - inc*step > step else parquet_size%index
    inc += 1
    t_end = time()
    print(f'\t- it took %.1f seconds' % (t_end - t_start))

Ingesting 100000 out of 1369769 rows (7%)
	- it took 10.3 seconds
Ingesting 200000 out of 1369769 rows (15%)
	- it took 10.3 seconds
Ingesting 300000 out of 1369769 rows (22%)
	- it took 10.2 seconds
Ingesting 400000 out of 1369769 rows (29%)
	- it took 10.1 seconds
Ingesting 500000 out of 1369769 rows (37%)
	- it took 10.0 seconds
Ingesting 600000 out of 1369769 rows (44%)
	- it took 10.1 seconds
Ingesting 700000 out of 1369769 rows (51%)
	- it took 10.3 seconds
Ingesting 800000 out of 1369769 rows (58%)
	- it took 10.5 seconds
Ingesting 900000 out of 1369769 rows (66%)
	- it took 10.3 seconds
Ingesting 1000000 out of 1369769 rows (73%)
	- it took 10.0 seconds
Ingesting 1100000 out of 1369769 rows (80%)
	- it took 9.9 seconds
Ingesting 1200000 out of 1369769 rows (88%)
	- it took 10.1 seconds
Ingesting 1300000 out of 1369769 rows (95%)
	- it took 10.1 seconds
Ingesting 1369769 out of 1369769 rows (100%)
	- it took 6.0 seconds
