# Uploading yellow taxi trip data to Postgres

We start off by importing a few packages that we need.
* `time` for benchmarking
* `pandas` for storing data in dataframes and offloading to pg
* `pyarrow` for handling parquet files

In [51]:
import pandas as pd
from time import time
import pyarrow.parquet as pq

The first thing we want to do is familiarize ourselves with the file, so we read the metadata for the parquet file. Parquet stores its metadata in the file footer, so it is readily available without reading the data itself, unlike file types such as CSV.

In [31]:
print(pq.read_metadata("yellow_tripdata_2021-01.parquet"))

<pyarrow._parquet.FileMetaData object at 0x11f9acb30>
  created_by: parquet-cpp-arrow version 7.0.0
  num_columns: 19
  num_rows: 1369769
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 10382


Now that we have some info on what the parquet file looks like, we can go on to loading the contents of the file. We use `pq.ParquetFile()` to open a parquet file in Python. However, you may notice that this isn't a df; it's just a `ParquetFile` object. We still need to actually read the data and load it into a dataframe.

To do that we can use `read()` and store the result in another variable called `table`.

This gives us a pyarrow `Table`, which is close to a traditional df.

We can use `table.schema` to look at the table schema.

In [55]:
file = pq.ParquetFile("yellow_tripdata_2021-01.parquet")
print(type(file))
table = file.read()
print(type(table))
table.schema

<class 'pyarrow.parquet.core.ParquetFile'>
<class 'pyarrow.lib.Table'>


VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 2492

Now that we have a table, we can use another pyarrow method, `table.to_pandas()`, to convert the table to a pandas df.

From this we get a pandas dataframe for our pyarrow table, and now we're ready to do normal pandas work on it.

In [56]:
df = table.to_pandas()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1369769 entries, 0 to 1369768
Data columns (total 19 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   VendorID               1369769 non-null  int64         
 1   tpep_pickup_datetime   1369769 non-null  datetime64[us]
 2   tpep_dropoff_datetime  1369769 non-null  datetime64[us]
 3   passenger_count        1271417 non-null  float64       
 4   trip_distance          1369769 non-null  float64       
 5   RatecodeID             1271417 non-null  float64       
 6   store_and_fwd_flag     1271417 non-null  object        
 7   PULocationID           1369769 non-null  int64         
 8   DOLocationID           1369769 non-null  int64         
 9   payment_type           1369769 non-null  int64         
 10  fare_amount            1369769 non-null  float64       
 11  extra                  1369769 non-null  float64       
 12  mta_tax                13697

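Here the two timestamp columns may need to be cast to a datetime, so we use `pd.to_datetime(df.<col_name>)` to cast the entire column to the correct type. In this file `df.info()` already reports both columns as `datetime64[us]`, so the cast changes nothing here. Note that `pd.to_datetime` returns a new Series, so the result has to be assigned back to the column if we want to keep it.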

In [14]:
pd.to_datetime(df.tpep_pickup_datetime)
pd.to_datetime(df.tpep_dropoff_datetime)

0         2021-01-01 00:36:12
1         2021-01-01 00:52:19
2         2021-01-01 01:11:06
3         2021-01-01 00:31:01
4         2021-01-01 00:48:21
                  ...        
1369764   2021-01-31 23:33:00
1369765   2021-01-31 23:51:00
1369766   2021-01-31 23:38:00
1369767   2021-02-01 00:02:03
1369768   2021-01-31 23:31:22
Name: tpep_dropoff_datetime, Length: 1369769, dtype: datetime64[us]
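
To make the cast stick, the result needs to be assigned back to the column. Here is a small sketch of that, assuming a made-up two-row dataframe where the timestamps start out as strings (which is what you would typically get from a CSV rather than from parquet).

```python
import pandas as pd

# Hypothetical stand-in data: timestamps stored as plain strings.
df = pd.DataFrame(
    {"tpep_pickup_datetime": ["2021-01-01 00:30:10", "2021-01-01 00:51:20"]}
)

# Calling to_datetime without assigning leaves the dataframe untouched.
pd.to_datetime(df["tpep_pickup_datetime"])
is_datetime_before = pd.api.types.is_datetime64_any_dtype(df["tpep_pickup_datetime"])

# Assigning the result back replaces the column with real datetimes.
df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
is_datetime_after = pd.api.types.is_datetime64_any_dtype(df["tpep_pickup_datetime"])

print(is_datetime_before, is_datetime_after)
```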

Now that we have our data in pandas, we need to ingest it into pg. To do that we need to connect to the db, which we do by using `create_engine` to build an engine and then passing that engine to `to_sql` as the connection object.

In [17]:
from sqlalchemy import create_engine

Here we're actually creating the engine. The `postgresql+psycopg` part of the URL names the dialect and the driver (`dialect+driver`).

In [37]:
engine = create_engine('postgresql+psycopg://root:root@localhost:5432/ny_taxi', pool_pre_ping=True)
engine.connect()


<sqlalchemy.engine.base.Connection at 0x1282a79d0>
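
To see how the connection URL breaks down into its parts, SQLAlchemy's `make_url` can parse it without connecting to anything. This is just a sketch for inspecting the URL used above; it doesn't need a running database.

```python
from sqlalchemy.engine import make_url

# Parse the same URL that was passed to create_engine; no connection is made.
url = make_url("postgresql+psycopg://root:root@localhost:5432/ny_taxi")

dialect = url.get_backend_name()  # the part before the "+"
driver = url.get_driver_name()    # the part after the "+"

print(dialect, driver, url.username, url.host, url.port, url.database)
```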

In [None]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




Since our dataset is large (over a million records), we don't want to ingest it all at once. Batching is also good practice for even larger datasets, where we will need to batch insert anyway. So here we create an iterator using `iter_batches(batch_size=)`.
This is another pyarrow parquet method. It creates an iterator that goes over the parquet file in batches.

Like a normal iterator in Python, we use `next(<iter_obj>)` to get the next item from it.

Here we're setting the batch size to 100000, so it will take us 14 iterations to go over 1.3 million records. Each item from `branches_iter` is a new pyarrow record batch, which we can load into pandas using `to_pandas()`.

That's how we're going to load all the batches: we loop using this iterator.
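
The figure of 14 iterations comes from simple arithmetic on the row count reported by the metadata. A quick sketch of that calculation, assuming every batch except the last is filled to `batch_size` (which fits this file, since it has a single row group):

```python
import math

num_rows = 1369769    # num_rows from the parquet metadata
batch_size = 100000

# Round up, because the leftover rows still need one final, smaller batch.
num_batches = math.ceil(num_rows / batch_size)
last_batch_rows = num_rows - (num_batches - 1) * batch_size

print(num_batches, last_batch_rows)
```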

In [45]:
branches_iter = file.iter_batches(batch_size=100000)
branches_iter

df = next(branches_iter).to_pandas()
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.10,1.0,N,142,43,2,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.20,1.0,N,238,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.70,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.60,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,1,2021-01-04 14:04:31,2021-01-04 14:08:52,3.0,0.70,1.0,N,234,224,2,5.0,2.5,0.5,0.00,0.0,0.3,8.30,2.5,
99996,1,2021-01-04 14:18:46,2021-01-04 14:35:45,2.0,3.30,1.0,N,234,236,1,14.5,2.5,0.5,3.55,0.0,0.3,21.35,2.5,
99997,1,2021-01-04 14:42:41,2021-01-04 14:59:22,2.0,4.70,1.0,N,236,79,1,17.0,2.5,0.5,4.05,0.0,0.3,24.35,2.5,
99998,2,2021-01-04 14:39:02,2021-01-04 15:09:37,2.0,17.95,2.0,N,132,148,1,52.0,0.0,0.5,5.00,0.0,0.3,60.30,2.5,


This is how that would look with some basic batch counting and benchmarking. We create the iterator and loop over it in the loop definition. Then for each batch we load it into a df, load that into sql with `to_sql`, and pass it the db connection engine.

In [52]:
t_start = time()
count = 0
# Stream the parquet file in batches of 100000 rows instead of loading it all at once.
for batch in file.iter_batches(batch_size=100000):
    count += 1
    batch_df = batch.to_pandas()
    print(f"inserting batch {count}...")
    b_start = time()
    # Append each batch to the same table; the first call creates the table.
    batch_df.to_sql(name='ny_taxi_data', con=engine, if_exists='append')
    b_end = time()
    print(f"inserted! Total time taken is {b_end-b_start:10.3f} seconds.\n")

t_end = time()

print(f"completed! Total Time taken was {t_end-t_start:10.3f} seconds for {count} batches.\n")



inserting batch 1...
inserted! Total time taken is      1.927 seconds.

inserting batch 2...
inserted! Total time taken is      1.704 seconds.

inserting batch 3...
inserted! Total time taken is      1.692 seconds.

inserting batch 4...
inserted! Total time taken is      1.794 seconds.

inserting batch 5...
inserted! Total time taken is      1.771 seconds.

inserting batch 6...
inserted! Total time taken is      1.741 seconds.

inserting batch 7...
inserted! Total time taken is      1.764 seconds.

inserting batch 8...
inserted! Total time taken is      1.780 seconds.

inserting batch 9...
inserted! Total time taken is      1.722 seconds.

inserting batch 10...
inserted! Total time taken is      1.710 seconds.

inserting batch 11...
inserted! Total time taken is      1.702 seconds.

inserting batch 12...
inserted! Total time taken is      1.749 seconds.

inserting batch 13...
inserted! Total time taken is      1.681 seconds.

inserting batch 14...
inserted! Total time taken is      1.2

From this we can now go to our pgcli and run a few tests.

First we can run `\dt` just to list the tables. Here we can see that the taxi data is loaded into our db.

Next we can run `\d <table_name>` in the cli to get the table schema.

Now we can look at the data itself. Let's run `SELECT count(1) from ny_taxi_data;`
and we can see that we have all 1.3 million records.

