In [1]:
import pandas as pd

In [3]:
pd.__version__

'1.4.0'

First, let's read the CSV data with pandas. We only read the first 100 rows (`nrows=100`) so that this step is quick.
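Here is a minimal sketch of what `nrows` does. It uses a tiny in-memory CSV (`sample`, a hypothetical stand-in built from the first rows shown below) instead of the real `yellow_tripdata_2021-01.csv` file:

```python
import io

import pandas as pd

# Hypothetical in-memory sample standing in for the taxi CSV file.
sample = io.StringIO(
    "VendorID,trip_distance\n"
    "1,2.10\n"
    "1,0.20\n"
    "1,14.70\n"
    "1,10.60\n"
    "2,4.94\n"
)

# Only the first 3 data rows are parsed; the header line does not count toward nrows.
preview = pd.read_csv(sample, nrows=3)
print(len(preview))  # 3
```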

In [4]:
df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100)

In [5]:
df

,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.10,1,N,142,43,2,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.20,1,N,238,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.70,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.60,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,2,2021-01-01 00:12:41,2021-01-01 00:26:47,1,4.13,1,N,161,226,1,14.5,0.5,0.5,3.66,0.0,0.3,21.96,2.5
96,2,2021-01-01 00:23:29,2021-01-01 00:35:03,2,4.12,1,N,162,74,2,13.5,0.5,0.5,0.00,0.0,0.3,17.30,2.5
97,2,2021-01-01 00:46:17,2021-01-01 00:54:25,2,2.22,1,N,144,170,1,9.0,0.5,0.5,2.56,0.0,0.3,15.36,2.5
98,2,2021-01-01 00:28:16,2021-01-01 00:51:44,1,7.11,1,N,264,264,2,23.5,0.5,0.5,0.00,0.0,0.3,24.80,0.0


Next, we can check the type of each column:

In [6]:
df.dtypes

VendorID                   int64
tpep_pickup_datetime      object
tpep_dropoff_datetime     object
passenger_count            int64
trip_distance            float64
RatecodeID                 int64
store_and_fwd_flag        object
PULocationID               int64
DOLocationID               int64
payment_type               int64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
improvement_surcharge    float64
total_amount             float64
congestion_surcharge     float64
dtype: object

Some columns have the type `object`. This type is ambiguous, since an `object` column can hold strings, lists, or any other Python objects. In our case, these `object` columns hold strings.
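The sketch below shows why `object` alone says little about the contents. Both Series are hypothetical examples that report the same `object` dtype, and `pd.api.types.infer_dtype()` inspects the stored values themselves:

```python
import pandas as pd

# Both Series are explicitly created with the generic object dtype.
strings = pd.Series(["N", "Y", "N"], dtype=object)
mixed = pd.Series(["N", [1, 2], 3.5], dtype=object)

print(strings.dtype, mixed.dtype)  # object object

# infer_dtype() looks at the values rather than the dtype.
print(pd.api.types.infer_dtype(strings))  # string
print(pd.api.types.infer_dtype(mixed))    # a non-"string" label, since the values differ in type
```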

Fortunately, pandas has a handy function, `pd.io.sql.get_schema()`, that derives a schema from our DataFrame and converts it into DDL (Data Definition Language).

In [7]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


* We wrap the call in `print()` so the DDL is displayed cleanly, since `get_schema()` returns it as a plain string.
* The `name` parameter is required, as it sets the name of the table the DDL creates.
* Note that the `object` columns from before are correctly mapped to `TEXT` fields.
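To see this mapping in isolation, here is a minimal sketch on a made-up three-column DataFrame (`sample_df` is hypothetical). It checks that `get_schema()` returns a plain string and that the `object` column becomes `TEXT`:

```python
import pandas as pd

# Hypothetical frame: one int64, one float64, and one object (string) column.
sample_df = pd.DataFrame(
    {
        "VendorID": [1, 2],
        "trip_distance": [2.10, 4.94],
        "store_and_fwd_flag": pd.Series(["N", "N"], dtype=object),
    }
)

ddl = pd.io.sql.get_schema(sample_df, name="yellow_taxi_data")

print(type(ddl))  # <class 'str'>
print(ddl)
```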

Looking at the data again, pandas parsed the columns `tpep_pickup_datetime` and `tpep_dropoff_datetime` as strings, but we want a `datetime` type here, so let's convert them.

In [8]:
pd.to_datetime(df.tpep_pickup_datetime)

0    2021-01-01 00:30:10
1    2021-01-01 00:51:20
2    2021-01-01 00:43:30
3    2021-01-01 00:15:48
4    2021-01-01 00:31:49
             ...        
95   2021-01-01 00:12:41
96   2021-01-01 00:23:29
97   2021-01-01 00:46:17
98   2021-01-01 00:28:16
99   2021-01-01 00:42:35
Name: tpep_pickup_datetime, Length: 100, dtype: datetime64[ns]

Note the `datetime64` dtype here. Now let's convert both columns.

In [9]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [10]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


Now both columns are correctly mapped to the `TIMESTAMP` type.
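As an alternative to converting the columns after reading, `pd.read_csv()` can parse them on read through its `parse_dates` parameter. A minimal sketch, assuming a two-row in-memory sample (`sample` and `trips` are hypothetical names):

```python
import io

import pandas as pd

# Hypothetical two-row sample with the same datetime columns as the taxi file.
sample = io.StringIO(
    "VendorID,tpep_pickup_datetime,tpep_dropoff_datetime\n"
    "1,2021-01-01 00:30:10,2021-01-01 00:36:12\n"
    "1,2021-01-01 00:51:20,2021-01-01 00:52:19\n"
)

# parse_dates converts the listed columns while reading, so no to_datetime() call is needed later.
trips = pd.read_csv(
    sample,
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
)

print(trips.dtypes)
ddl = pd.io.sql.get_schema(trips, name="yellow_taxi_data")
print(ddl)
```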

Without a connection, `get_schema()` generates generic, SQLite-style DDL, which may not work for Postgres. To get a DDL statement that does work for Postgres, we need to tell pandas which database we are targeting by giving it a connection.

In [11]:
from sqlalchemy import create_engine

In [12]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

We created a Postgres connection engine with SQLAlchemy. Note the format of the connection URI:

```
db_engine://user:password@host:port/db_name
```

The first part is the database dialect; since we are using Postgres, it is `postgresql`. The rest is filled in with the same parameters we set up for our Postgres database. Next, we simply pass the engine to `get_schema()` through its `con` parameter.
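The connection URI can also be assembled from its parts. In this minimal sketch, `build_pg_uri` is a hypothetical helper. It URL-encodes the user name and password with `urllib.parse.quote_plus`, the approach SQLAlchemy's documentation recommends for passwords with special characters, so that characters such as `@` or `:` do not break the URI:

```python
from urllib.parse import quote_plus


def build_pg_uri(user: str, password: str, host: str, port: int, db_name: str) -> str:
    """Hypothetical helper: build a postgresql:// URI in the user:password@host:port/db_name format."""
    # Encode credentials so reserved characters cannot be mistaken for URI separators.
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db_name}"


print(build_pg_uri("root", "root", "localhost", 5432, "ny_taxi"))
# postgresql://root:root@localhost:5432/ny_taxi
print(build_pg_uri("root", "p@ss:word", "localhost", 5432, "ny_taxi"))
# postgresql://root:p%40ss%3Aword@localhost:5432/ny_taxi
```

The resulting string can be passed to `create_engine()` in the same way as the literal URI above.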

In [13]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




This DDL now works for Postgres. The schema may not be ideal (for example, small codes such as `passenger_count` are stored as `BIGINT`), but it works fine for our use case.
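If tighter types are wanted, `get_schema()` (like `to_sql()`) accepts a `dtype` mapping that overrides the inferred SQL type per column. A minimal sketch without a database connection, where the override values are plain SQL type strings. `SMALLINT` and `sample_df` are illustrative choices only. With a SQLAlchemy engine, as in the cell above, the values should be SQLAlchemy types (for example `sqlalchemy.types.SmallInteger`) rather than strings:

```python
import pandas as pd

sample_df = pd.DataFrame({"passenger_count": [1, 2], "fare_amount": [8.0, 3.0]})

# Without a connection, dtype maps column names to SQL type names given as strings.
ddl = pd.io.sql.get_schema(
    sample_df,
    name="yellow_taxi_data",
    dtype={"passenger_count": "SMALLINT"},
)
print(ddl)
```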

Next, we want to ingest the data into the database. The file has about 1.3 million rows, so rather than loading it all at once, we ingest it in batches.

We can pass `iterator=True` and `chunksize` to `pd.read_csv()` to read the file in chunks, then load each chunk as a batch.
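Here is a minimal sketch of chunked reading on a hypothetical 10-row in-memory CSV with `chunksize=4`. It also shows that after `next()` returns the first chunk, iterating over the reader continues with the remaining chunks:

```python
import io

import pandas as pd

# Hypothetical 10-row CSV standing in for the much larger taxi file.
csv_text = "trip_id,fare_amount\n" + "".join(f"{i},{i * 1.5}\n" for i in range(10))

# With chunksize set, read_csv returns a TextFileReader instead of a DataFrame.
with pd.read_csv(io.StringIO(csv_text), iterator=True, chunksize=4) as reader:
    first = next(reader)                     # first chunk: rows 0-3
    rest = [len(chunk) for chunk in reader]  # the remaining chunks

print(len(first), rest)  # 4 [4, 2]
```

Using the reader as a context manager (supported since pandas 1.2) closes the underlying file when the block ends.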

In [14]:
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000)

In [15]:
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x175ef5b3be0>

Now the file will be read in chunks of 100,000 rows. To get the first chunk, we use `next()`, which retrieves the next value from an iterator (here, the first chunk).

In [16]:
df = next(df_iter)

In [18]:
len(df)

100000

Note that the size of the dataframe is now 100000 rows as expected. This is the first chunk of our data. We also need to make sure the datetime columns are properly parsed.

In [19]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

Since we are loading in batches, it's best to create the table first. `df.head(n=0)` returns a DataFrame with the columns but no rows, so `to_sql()` only creates the empty table.
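A self-contained sketch of this step, assuming an in-memory SQLite database in place of the Postgres engine. `to_sql()` accepts a standard-library `sqlite3` connection directly, and `chunk` is a hypothetical two-row frame:

```python
import sqlite3

import pandas as pd

# In-memory SQLite database standing in for the Postgres engine (assumption).
con = sqlite3.connect(":memory:")

chunk = pd.DataFrame({"VendorID": [1, 2], "fare_amount": [8.0, 16.5]})

# head(n=0) keeps the columns and dtypes but no rows, so to_sql() only creates the table.
chunk.head(n=0).to_sql(name="yellow_taxi_data", con=con, if_exists="replace")

columns = [row[1] for row in con.execute("PRAGMA table_info(yellow_taxi_data)")]
row_count = con.execute("SELECT count(1) FROM yellow_taxi_data").fetchone()[0]
print(columns, row_count)  # ['index', 'VendorID', 'fare_amount'] 0
```

By default `to_sql()` also writes the DataFrame index as an `index` column; pass `index=False` to skip it.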

In [20]:
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

The `if_exists` parameter controls what happens if a table with the name given in `name` already exists. With `replace`, the existing table is dropped first and then recreated from the new DataFrame.
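The following sketch compares the `if_exists` options against an in-memory SQLite database, used here only to keep the example self-contained. The default, `fail`, raises a `ValueError` when the table already exists, while `replace` drops and recreates it:

```python
import sqlite3

import pandas as pd

con = sqlite3.connect(":memory:")
chunk = pd.DataFrame({"VendorID": [1, 2]})  # hypothetical two-row chunk

chunk.to_sql(name="yellow_taxi_data", con=con, if_exists="replace")

# if_exists="fail" (the default) refuses to touch an existing table.
failed = False
try:
    chunk.to_sql(name="yellow_taxi_data", con=con)
except ValueError as err:
    failed = True
    print("fail:", err)

# if_exists="replace" drops the old table, including its 2 rows, then recreates it empty.
chunk.head(n=0).to_sql(name="yellow_taxi_data", con=con, if_exists="replace")
count = con.execute("SELECT count(1) FROM yellow_taxi_data").fetchone()[0]
print(count)  # 0
```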

After the table is created, verify it in pgcli:

![table-create](images/table-create.png)

Now that the table has been created, we can start the ingestion.

In [21]:
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: total: 4.48 s
Wall time: 9.57 s


1000

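This time we use `append`, so the rows are added to the existing table. The returned value (`1000`) is the row count reported by the database driver, which, as the pandas documentation notes, may not reflect the number of rows actually written, so we check the count in the database instead.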

`%time` is an IPython/Jupyter magic command that measures how long a single statement takes to execute.
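`%time` only works inside IPython/Jupyter. In a plain Python script, a similar wall-clock measurement can be taken with `time.perf_counter()`. In this sketch, `timed` is a hypothetical helper and not part of any library:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Hypothetical helper that prints the wall-clock time spent inside a with-block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"{label}: {time.perf_counter() - start:.3f} seconds")


with timed("sum of squares"):
    total = sum(i * i for i in range(1000))
```

In the ingestion step, the body of the `with` block would be the `df.to_sql(...)` call.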

Verify that we have the correct number of rows with:

```sql
SELECT count(1) FROM yellow_taxi_data;
```
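The same check can also be run from the notebook with `pd.read_sql()`. A minimal sketch, assuming an in-memory SQLite database with a hypothetical three-row table; against the real database you would pass `con=engine` instead:

```python
import sqlite3

import pandas as pd

# In-memory SQLite stands in for the Postgres database here (assumption).
con = sqlite3.connect(":memory:")
pd.DataFrame({"VendorID": [1, 2, 2]}).to_sql(name="yellow_taxi_data", con=con)

# read_sql runs the query and returns the result as a DataFrame.
result = pd.read_sql("SELECT count(1) AS n FROM yellow_taxi_data", con=con)
print(int(result["n"].iloc[0]))  # 3
```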

![first-chunk](images/first-chunk.png)

Lastly, we ingest the remaining chunks:

In [22]:
from time import time

In [23]:
while True:
    try:
        t_start = time()

        df = next(df_iter)

        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

        t_end = time()
        
        print('Ingesting chunk, took %.3f second' % (t_end - t_start))
    except StopIteration:
        print('Finished ingesting all the chunks')
        break

Ingesting chunk, took 8.945 second
Ingesting chunk, took 9.019 second
Ingesting chunk, took 8.947 second
Ingesting chunk, took 8.881 second
Ingesting chunk, took 9.178 second
Ingesting chunk, took 9.266 second
Ingesting chunk, took 9.423 second
Ingesting chunk, took 9.121 second
Ingesting chunk, took 9.274 second
Ingesting chunk, took 9.169 second
Ingesting chunk, took 9.084 second
Ingesting chunk, took 9.478 second
Ingesting chunk, took 6.081 second
Finished ingesting all the chunks
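Because `TextFileReader` is itself iterable, the same loop can be written with a plain `for` loop that ends on its own when the chunks run out, with no explicit `StopIteration` handling. The sketch below assumes a hypothetical 7-row CSV and an in-memory SQLite database. As a small variation on the `head(n=0)` step above, it lets the first chunk create the table with `replace`:

```python
import io
import sqlite3
from time import perf_counter

import pandas as pd

# Hypothetical stand-ins: a 7-row CSV and an in-memory SQLite database.
csv_text = "tpep_pickup_datetime,fare_amount\n" + "".join(
    f"2021-01-01 00:{i:02d}:00,{8.0 + i}\n" for i in range(7)
)
con = sqlite3.connect(":memory:")

with pd.read_csv(io.StringIO(csv_text), chunksize=3) as df_iter:
    # The for loop stops by itself once the reader has no more chunks.
    for i, df in enumerate(df_iter):
        t_start = perf_counter()  # times conversion + insert; reading happens in the for statement
        df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
        # First chunk creates (or replaces) the table; later chunks are appended.
        df.to_sql(name="yellow_taxi_data", con=con, if_exists="replace" if i == 0 else "append")
        print(f"Ingesting chunk {i}, took {perf_counter() - t_start:.3f} seconds")

print("Finished ingesting all the chunks")
total = con.execute("SELECT count(1) FROM yellow_taxi_data").fetchone()[0]
print(total)  # 7
```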


Verify that we have the correct number of rows:

![finish-ingest](images/finish-ingest.png)