## Libraries

I needed to install `pyarrow` to be able to ingest the parquet file with `pandas`.

In [1]:
import pandas as pd

In [2]:
pd.__version__

'1.4.3'

In [3]:
from sqlalchemy import create_engine

## Import data

First, we need to run our Postgres container in Docker and download the data from the data source.  
Next, let's import the data and create an SQL schema from it:

In [15]:
df = pd.read_csv('../../data/yellow_tripdata_2021-01.csv')

In [16]:
df.head()

|   | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 2021-01-01 00:30:10 | 2021-01-01 00:36:12 | 1.0 | 2.1 | 1.0 | N | 142 | 43 | 2.0 | 8.0 | 3.0 | 0.5 | 0.0 | 0.0 | 0.3 | 11.8 | 2.5 |
| 1 | 1.0 | 2021-01-01 00:51:20 | 2021-01-01 00:52:19 | 1.0 | 0.2 | 1.0 | N | 238 | 151 | 2.0 | 3.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 4.3 | 0.0 |
| 2 | 1.0 | 2021-01-01 00:43:30 | 2021-01-01 01:11:06 | 1.0 | 14.7 | 1.0 | N | 132 | 165 | 1.0 | 42.0 | 0.5 | 0.5 | 8.65 | 0.0 | 0.3 | 51.95 | 0.0 |
| 3 | 1.0 | 2021-01-01 00:15:48 | 2021-01-01 00:31:01 | 0.0 | 10.6 | 1.0 | N | 138 | 132 | 1.0 | 29.0 | 0.5 | 0.5 | 6.05 | 0.0 | 0.3 | 36.35 | 0.0 |
| 4 | 2.0 | 2021-01-01 00:31:49 | 2021-01-01 00:48:21 | 1.0 | 4.94 | 1.0 | N | 68 | 33 | 1.0 | 16.5 | 0.5 | 0.5 | 4.06 | 0.0 | 0.3 | 24.36 | 2.5 |


In [17]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" REAL,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In this case, the columns `tpep_pickup_datetime` and `tpep_dropoff_datetime` are imported as **text**, and we need to cast them to **datetime**. To make this transformation, we can use the `pd.to_datetime` function in **pandas**.

In [18]:
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
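To see the effect of this cast on the generated schema, here is a small sketch on a made-up two-row DataFrame (the table name `yellow_taxi_demo` is hypothetical). Without a connection, `pd.io.sql.get_schema` falls back to generic SQLite-style types, so the text column comes out as `TEXT` before the cast and as `TIMESTAMP` after it.

```python
import pandas as pd

# Toy frame with the same kind of columns as the taxi data (values are made up)
df = pd.DataFrame({
    "tpep_pickup_datetime": ["2021-01-01 00:30:10", "2021-01-01 00:51:20"],
    "fare_amount": [8.0, 3.0],
})

# No connection given, so pandas uses its generic SQLite-style type mapping
schema_before = pd.io.sql.get_schema(df, name="yellow_taxi_demo")

# Cast the text column to datetime64 and generate the schema again
df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
schema_after = pd.io.sql.get_schema(df, name="yellow_taxi_demo")

print(schema_before)
print(schema_after)
```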

The next step, before we ingest the data from Jupyter into Postgres, is to tell Postgres what the data schema is. For that, we need to create a connection, which we are going to do with the Python library [SQL Alchemy](https://www.sqlalchemy.org/).

## Connection to database

In [8]:
# Connection URL format: dialect://user:password@hostname:port/database_name
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
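As an alternative to writing the URL string by hand, SQLAlchemy (1.4 and later) can build it from its parts with `URL.create`. This sketch only builds and prints the URL; it assumes the same credentials as above and does not connect to anything. The resulting `url` object could be passed directly to `create_engine(url)`, which requires a Postgres driver such as `psycopg2` to be installed.

```python
from sqlalchemy.engine import URL

# Build the same connection URL from its components instead of one string.
# Passing the URL object to create_engine means a password containing
# characters such as "@" or "/" does not need to be URL-escaped by hand.
url = URL.create(
    drivername="postgresql",
    username="root",
    password="root",
    host="localhost",
    port=5432,
    database="ny_taxi",
)

# hide_password=False so the printed string matches the hand-written one
print(url.render_as_string(hide_password=False))
```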

In [10]:
# Test the connection
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f87e32c74f0>

Now let's print the SQL schema generated by pandas again, but this time passing the connection (`con=engine`) so that pandas produces the schema for the Postgres database.

In [11]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




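We can see that the data types have changed and are now the right ones for Postgres (for example, the datetime columns are now `TIMESTAMP WITHOUT TIME ZONE`). This is the schema that pandas will pass to Postgres to create the table. Note that this output also includes an `airport_fee` column, which does not appear in the schema printed earlier.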

## Ingest the data to postgres

Since the data could be too big to load into memory at once (here it actually isn't: it is just 21M), we are going to ingest it in chunks. For that, we create an **iterator**: we read just a part of the data, load it into Postgres, and repeat with the next chunk.

In [14]:
df_iter = pd.read_csv('../../data/yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000)

To move to the next chunk, we use Python's built-in function [next](https://docs.python.org/3/library/functions.html#next).

In [19]:
df = next(df_iter)

In [21]:
len(df)

100000
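A minimal sketch of how the chunk iterator behaves, assuming a made-up five-row CSV held in memory (via `io.StringIO`) instead of the real taxi file: with `chunksize` set, `pd.read_csv` returns an iterator of DataFrames rather than a single DataFrame, and each `next()` call yields the following chunk.

```python
import io

import pandas as pd

# A tiny in-memory CSV standing in for the taxi file (made-up values)
csv_text = "VendorID,trip_distance\n1,2.1\n1,0.2\n1,14.7\n1,10.6\n2,4.94\n"

# With chunksize set, read_csv returns an iterator of DataFrames
df_iter = pd.read_csv(io.StringIO(csv_text), iterator=True, chunksize=2)

first = next(df_iter)                        # first chunk: rows 0-1
sizes = [len(first)]
sizes += [len(chunk) for chunk in df_iter]   # remaining chunks: 2 rows, then 1
print(sizes)  # [2, 2, 1]
```

The last chunk is simply shorter than `chunksize`, which is why the final chunk of the real file can have fewer than 100000 rows.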

In [22]:
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

In [None]:
# df.to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

Next, we create the table with just the column names. We use `head(n=0)` to get only the headers, and the method `pd.DataFrame.to_sql()` to create the table in the Postgres database.

In [25]:
# with n=0 we get only the headers
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0
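To check what the `head(n=0)` trick produces, here is a sketch that assumes an in-memory SQLite database (Python's built-in `sqlite3`, which `to_sql` accepts directly) in place of Postgres, a made-up DataFrame, and a hypothetical table name `yellow_taxi_demo`. The table is created with all the columns but no rows.

```python
import sqlite3

import pandas as pd

# Assumption: in-memory SQLite stands in for Postgres so this runs without a server
con = sqlite3.connect(":memory:")

df = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime(["2021-01-01 00:30:10", "2021-01-01 00:51:20"]),
    "fare_amount": [8.0, 3.0],
})

# head(n=0) keeps the columns and dtypes but no rows, so this only creates the table
df.head(n=0).to_sql(name="yellow_taxi_demo", con=con, if_exists="replace")

n_rows = pd.read_sql_query("SELECT COUNT(*) AS n FROM yellow_taxi_demo", con)["n"].iloc[0]
# PRAGMA table_info returns one row per column; field 1 is the column name
columns = [row[1] for row in con.execute("PRAGMA table_info(yellow_taxi_demo)")]
print(n_rows)   # 0
print(columns)  # ['index', 'tpep_pickup_datetime', 'fare_amount']
```

Because `to_sql` is called without `index=False`, pandas also writes the DataFrame index as an extra column named `index`; the same happens with the Postgres table in this notebook.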

Then, we load the complete chunk into the database.

In [28]:
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 4.62 s, sys: 61.6 ms, total: 4.68 s
Wall time: 9.05 s


1000

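Now we create a `while` loop to ingest all the remaining chunks (this is not the best way to do it, but it is OK for the moment). The loop has no exit condition, so it only stops when `next(df_iter)` runs out of chunks and raises `StopIteration`, as seen at the end of the output.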

In [29]:
from time import time

In [31]:
while True:
    t_start = time()
    
    df = next(df_iter)
    
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    
    t_end = time()
    
    print('inserted another chunk..., took %.3f seconds' % (t_end - t_start))


inserted another chunk..., took 8.900 seconds
inserted another chunk..., took 7.868 seconds
inserted another chunk..., took 6.766 seconds
inserted another chunk..., took 7.843 seconds
inserted another chunk..., took 8.051 seconds
inserted another chunk..., took 8.100 seconds
inserted another chunk..., took 7.322 seconds
inserted another chunk..., took 8.733 seconds
inserted another chunk..., took 8.760 seconds
inserted another chunk..., took 8.821 seconds


inserted another chunk..., took 8.966 seconds
inserted another chunk..., took 5.211 seconds


StopIteration: 
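A sketch of a more idiomatic version of the loop: iterating the chunk reader with a `for` loop ends cleanly when the data runs out, with no `StopIteration` error. To keep it runnable anywhere, it assumes a made-up five-row CSV held in memory and an in-memory SQLite database (Python's built-in `sqlite3`) in place of the real file and the Postgres container; the table name `yellow_taxi_demo` is hypothetical.

```python
import io
import sqlite3
from time import time

import pandas as pd

# Assumptions: in-memory CSV and SQLite stand in for the taxi file and Postgres
csv_text = (
    "VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,fare_amount\n"
    "1,2021-01-01 00:30:10,2021-01-01 00:36:12,8.0\n"
    "1,2021-01-01 00:51:20,2021-01-01 00:52:19,3.0\n"
    "1,2021-01-01 00:43:30,2021-01-01 01:11:06,42.0\n"
    "1,2021-01-01 00:15:48,2021-01-01 00:31:01,29.0\n"
    "2,2021-01-01 00:31:49,2021-01-01 00:48:21,16.5\n"
)
con = sqlite3.connect(":memory:")

df_iter = pd.read_csv(io.StringIO(csv_text), iterator=True, chunksize=2)

first_chunk = True
for df in df_iter:  # the for loop stops by itself when the reader is exhausted
    t_start = time()

    df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
    df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])

    # The first chunk (re)creates the table; later chunks are appended to it
    mode = "replace" if first_chunk else "append"
    df.to_sql(name="yellow_taxi_demo", con=con, if_exists=mode)
    first_chunk = False

    print("inserted another chunk..., took %.3f seconds" % (time() - t_start))

total = pd.read_sql_query("SELECT COUNT(*) AS n FROM yellow_taxi_demo", con)["n"].iloc[0]
print(total)  # 5
```

Using `if_exists="replace"` for the first chunk and `"append"` afterwards also folds the separate `head(n=0)` table-creation step into the loop.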