# 1. Description

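Data source (DataTalksClub `nyc-tlc-data` release, yellow taxi trips):
https://github.com/DataTalksClub/nyc-tlc-data/releases/tag/yellow

Download the January 2021 file and decompress it. The cells below read the uncompressed `yellow_tripdata_2021-01.csv` from one directory above the notebook's working directory:

    wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
    gunzip yellow_tripdata_2021-01.csv.gz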


Data dictionary for yellow trips:
* https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf

Data dictionary for green trips:
* https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf

# 2. Get data and DDL for Postgres 

In [36]:
import os
from time import time
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

In [2]:
# Read just the first 1000 rows: enough to inspect the columns and infer a
# schema without loading the whole file.
df = pd.read_csv(
    './../yellow_tripdata_2021-01.csv',
    nrows=1000,
)
# Both timestamp columns arrive as plain strings; convert them to datetimes.
for ts_col in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    df[ts_col] = pd.to_datetime(df[ts_col])
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


In [4]:
# Show the dtype pandas assigned to each column of the sample frame.
df.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

In [23]:
# Connection settings for the Postgres instance. Every value can be overridden
# through the libpq-style environment variables below, so real credentials do
# not have to be written into the notebook; the fallbacks are the original
# local-development values.
user = os.environ.get("PGUSER", "root")
password = os.environ.get("PGPASSWORD", "root")
host = os.environ.get("PGHOST", "localhost")
port = int(os.environ.get("PGPORT", "5432"))
db = os.environ.get("PGDATABASE", "ny_taxi")

# Percent-encode user and password so characters such as '@', ':' or '/'
# coming from the environment cannot break the URL structure.
engine = create_engine(
    f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"
)

# Target table. The name contains a hyphen, so it has to be double-quoted
# whenever it is used in hand-written SQL.
table_name = 'yellow_tripdata_2021-01'

In [24]:
# Build the CREATE TABLE statement that matches the sample frame's columns,
# using the engine's SQL dialect, and print it for inspection.
ddl = pd.io.sql.get_schema(df, name=table_name, con=engine)
print(ddl)


CREATE TABLE "yellow_tripdata_2021-01" (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




# 3. Read and write df in DB by chunks

In [37]:
# Open the CSV as a chunked reader instead of loading it at once; each step
# of the iterator yields a DataFrame of up to 100,000 rows.
df_iter = pd.read_csv(
    './../yellow_tripdata_2021-01.csv',
    iterator=True,
    chunksize=100_000,
)
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x7db98d96d4e0>

In [38]:
%%time

# Bootstrap the target table from the first chunk: writing a zero-row frame
# with if_exists='replace' (re)creates the table, then the chunk's rows are
# appended to it.
t_start = time()
first_chunk = next(df_iter, None)
if first_chunk is None:
    # The reader was already exhausted (or the file had no rows).
    print('No data to process.')
else:
    df = first_chunk
    for ts_col in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
        df[ts_col] = pd.to_datetime(df[ts_col])
    df.head(0).to_sql(name=table_name, con=engine, if_exists='replace')
    df.to_sql(name=table_name, con=engine, if_exists='append')
    t_end = time()
    print(f"Inserted first chunk..., took {t_end-t_start:.2f} seconds")

Inserted first chunk..., took 10.74 seconds
CPU times: user 5.65 s, sys: 922 ms, total: 6.57 s
Wall time: 10.7 s


In [39]:
%%time

# Stream the remaining chunks into the table. The timer is (re)started before
# each chunk is fetched, so the reported duration covers reading the chunk
# from the CSV as well as inserting it.
t_start = time()
for df in df_iter:
    for ts_col in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
        df[ts_col] = pd.to_datetime(df[ts_col])
    df.to_sql(name=table_name, con=engine, if_exists='append')
    t_end = time()
    print(f"Inserted chunk..., took {t_end-t_start:.2f} seconds")
    t_start = time()
print('Finished processing all chunks.')

Inserted chunk..., took 10.77 seconds
Inserted chunk..., took 16.33 seconds
Inserted chunk..., took 13.37 seconds
Inserted chunk..., took 12.42 seconds
Inserted chunk..., took 13.05 seconds
Inserted chunk..., took 12.15 seconds
Inserted chunk..., took 11.12 seconds
Inserted chunk..., took 10.45 seconds
Inserted chunk..., took 10.54 seconds
Inserted chunk..., took 13.68 seconds
Inserted chunk..., took 12.46 seconds




Inserted chunk..., took 11.75 seconds
Inserted chunk..., took 8.39 seconds
Finished processing all chunks.
CPU times: user 1min 8s, sys: 20.4 s, total: 1min 29s
Wall time: 2min 36s


In [41]:
# Sanity check: count the rows now stored in the target table. The table name
# is double-quoted because it contains a hyphen.
query = f"""
SELECT COUNT(1) FROM "{table_name}"
"""

row_count = pd.read_sql(query, con=engine)
row_count

Unnamed: 0,count
0,1369765
