In [2]:
import pandas as pd

- The CSV file is very big and Pandas may not be able to handle it properly if the whole thing doesn't fit in RAM. We will only import 100 rows for now.

In [3]:
# Load only the first 100 rows: enough to inspect the columns and infer a schema
# without reading the whole (large) file into memory.
df = pd.read_csv(
    "yellow_tripdata_2021-07.csv.gz",
    nrows=100,
)
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-07-01 00:08:51,2021-07-01 00:13:05,1,0.80,1,N,90,68,1,5.0,3.0,0.5,0.00,0.0,0.3,8.80,2.5
1,1,2021-07-01 00:22:39,2021-07-01 00:25:58,1,0.90,1,N,113,90,2,5.0,3.0,0.5,0.00,0.0,0.3,8.80,2.5
2,1,2021-07-01 00:48:33,2021-07-01 00:54:58,1,2.80,1,N,88,232,2,10.0,3.0,0.5,0.00,0.0,0.3,13.80,2.5
3,1,2021-07-01 00:59:44,2021-07-01 01:07:09,1,1.40,1,N,79,249,1,7.0,3.0,0.5,1.50,0.0,0.3,12.30,2.5
4,1,2021-07-01 00:08:35,2021-07-01 00:16:28,0,2.00,1,N,142,238,1,8.5,3.0,0.5,0.00,0.0,0.3,12.30,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,1,2021-07-01 00:35:21,2021-07-01 00:40:49,1,1.40,1,N,249,68,1,6.5,3.0,0.5,2.55,0.0,0.3,12.85,2.5
96,2,2021-07-01 00:04:34,2021-07-01 00:20:15,2,3.11,1,N,48,4,1,13.5,0.5,0.5,3.46,0.0,0.3,20.76,2.5
97,2,2021-07-01 00:29:29,2021-07-01 00:36:14,3,2.84,1,N,79,140,1,9.5,0.5,0.5,3.32,0.0,0.3,16.62,2.5
98,1,2021-07-01 00:12:37,2021-07-01 00:22:54,1,2.00,1,N,41,116,2,9.5,0.5,0.5,0.00,0.0,0.3,10.80,0.0


- We will now create the schema for the database. The schema is the structure of the database; in this case it describes the columns of our table. Pandas can output the SQL DDL (Data definition language) instructions necessary to create the schema.

In [4]:
# get_schema() requires a table name -- the table will be called 'yellow_taxi_data'.
# Without a connection, the DDL uses generic SQL types.
print(
    pd.io.sql.get_schema(df, name="yellow_taxi_data")
)

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


Note that this only prints the instructions; it hasn't actually created the table in the database yet.

Note that tpep_pickup_datetime and tpep_dropoff_datetime are text fields even though they should be timestamps. Let's change that.

In [5]:
# Parse the two trip timestamp columns (read as text) into datetimes so the
# generated DDL types them as TIMESTAMP instead of TEXT.
df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])
print(pd.io.sql.get_schema(df, name="yellow_taxi_data"))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


The DDL above is generic. To get Postgres-specific DDL, and to actually create the table, we first need a connection to the Postgres database. We will use SQLAlchemy for this.

In [7]:
from sqlalchemy import create_engine

An engine specifies the database details in a URI. The structure of the URI is:

dialect://user:password@host:port/database_name

where the dialect is the type of database (here, `postgresql`).

In [10]:
import os

# Database URL of the form dialect://user:password@host:port/database_name.
# Set the PG_URI environment variable to avoid hard-coding credentials in the notebook;
# the fallback is the local Postgres Docker container used throughout this notebook.
engine = create_engine(
    os.environ.get("PG_URI", "postgresql://root:root@localhost:5432/ny_taxi")
)

In [12]:
# Run this cell while the Postgres Docker container is running.
# Opening a connection fails fast if the database is unreachable. The `with` block closes
# the connection afterwards; a bare engine.connect() would leave it open and unused.
with engine.connect() as conn:
    print("Connected:", conn)

<sqlalchemy.engine.base.Connection at 0x7f3c2ae085e0>

In [13]:
# Passing the engine as `con` makes get_schema() emit Postgres-specific DDL
# (e.g. BIGINT, FLOAT(53), TIMESTAMP WITHOUT TIME ZONE).
print(
    pd.io.sql.get_schema(df, name="yellow_taxi_data", con=engine)
)


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




We will now create an iterator that will allow us to read the CSV file in chunks and send them to the database. Otherwise, we may run into problems trying to send too much data at once.

In [16]:
# Build a reader that yields the compressed CSV in chunks of 100,000 rows
# rather than loading the whole file at once.
df_iter = pd.read_csv(
    "yellow_tripdata_2021-07.csv.gz",
    iterator=True,
    chunksize=100_000,
)
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x7f3c29c0d810>

We can use the next() function to get the chunks using the iterator.

In [17]:
# next() pulls the following chunk of rows from the reader as a new DataFrame.
df = next(df_iter)
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-07-01 00:08:51,2021-07-01 00:13:05,1,0.80,1,N,90,68,1,5.0,3.0,0.5,0.00,0.0,0.3,8.80,2.5
1,1,2021-07-01 00:22:39,2021-07-01 00:25:58,1,0.90,1,N,113,90,2,5.0,3.0,0.5,0.00,0.0,0.3,8.80,2.5
2,1,2021-07-01 00:48:33,2021-07-01 00:54:58,1,2.80,1,N,88,232,2,10.0,3.0,0.5,0.00,0.0,0.3,13.80,2.5
3,1,2021-07-01 00:59:44,2021-07-01 01:07:09,1,1.40,1,N,79,249,1,7.0,3.0,0.5,1.50,0.0,0.3,12.30,2.5
4,1,2021-07-01 00:08:35,2021-07-01 00:16:28,0,2.00,1,N,142,238,1,8.5,3.0,0.5,0.00,0.0,0.3,12.30,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,1,2021-07-02 08:48:33,2021-07-02 09:04:45,0,5.10,1,N,141,4,1,16.5,3.0,0.5,5.00,0.0,0.3,25.30,2.5
99996,1,2021-07-02 08:33:48,2021-07-02 08:39:06,1,1.10,1,N,107,170,1,6.0,2.5,0.5,7.00,0.0,0.3,16.30,2.5
99997,1,2021-07-02 08:41:40,2021-07-02 08:48:48,1,1.40,1,N,170,141,2,7.0,2.5,0.5,0.00,0.0,0.3,10.30,2.5
99998,1,2021-07-02 08:53:00,2021-07-02 09:01:30,1,1.50,1,N,229,263,1,8.0,2.5,0.5,2.00,0.0,0.3,13.30,2.5


This is a brand new dataframe, so we need to convert the time columns to timestamp format.

In [18]:
# The chunk was freshly read from the CSV, so its timestamp columns are text again;
# parse them before inserting into the database.
df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-07-01 00:08:51,2021-07-01 00:13:05,1,0.80,1,N,90,68,1,5.0,3.0,0.5,0.00,0.0,0.3,8.80,2.5
1,1,2021-07-01 00:22:39,2021-07-01 00:25:58,1,0.90,1,N,113,90,2,5.0,3.0,0.5,0.00,0.0,0.3,8.80,2.5
2,1,2021-07-01 00:48:33,2021-07-01 00:54:58,1,2.80,1,N,88,232,2,10.0,3.0,0.5,0.00,0.0,0.3,13.80,2.5
3,1,2021-07-01 00:59:44,2021-07-01 01:07:09,1,1.40,1,N,79,249,1,7.0,3.0,0.5,1.50,0.0,0.3,12.30,2.5
4,1,2021-07-01 00:08:35,2021-07-01 00:16:28,0,2.00,1,N,142,238,1,8.5,3.0,0.5,0.00,0.0,0.3,12.30,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,1,2021-07-02 08:48:33,2021-07-02 09:04:45,0,5.10,1,N,141,4,1,16.5,3.0,0.5,5.00,0.0,0.3,25.30,2.5
99996,1,2021-07-02 08:33:48,2021-07-02 08:39:06,1,1.10,1,N,107,170,1,6.0,2.5,0.5,7.00,0.0,0.3,16.30,2.5
99997,1,2021-07-02 08:41:40,2021-07-02 08:48:48,1,1.40,1,N,170,141,2,7.0,2.5,0.5,0.00,0.0,0.3,10.30,2.5
99998,1,2021-07-02 08:53:00,2021-07-02 09:01:30,1,1.50,1,N,229,263,1,8.0,2.5,0.5,2.00,0.0,0.3,13.30,2.5


We will now finally create the table in the database. With df.head(n=0) we get only the column names (and their types), without any rows. We will use it to create an empty table with the right columns.

In [19]:
# Create the table from an empty frame (columns and dtypes only, no rows).
# to_sql needs the table name, the connection, and a policy for an existing table:
# 'replace' drops anything with the same name that may have been created by accident.
df.head(n=0).to_sql(
    name='yellow_taxi_data',
    con=engine,
    if_exists='replace',
)

0

You can now run pgcli -h localhost -p 5432 -u root -d ny_taxi in a separate terminal to look at the database:

- \dt for looking at available tables.

- \d yellow_taxi_data for describing the new table.

Let's insert our current chunk into the database and time how long it takes.

In [20]:
# Append the current chunk to the table; %time reports how long this single insert takes.
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 6.63 s, sys: 129 ms, total: 6.75 s
Wall time: 10.3 s


1000


Back in the terminal running pgcli, we can check how many rows were added to the database with:

       SELECT count(1) FROM yellow_taxi_data;

You should see 100,000 rows.

Let's write a loop to write all chunks to the database. Use the terminal with pgcli to check the database after the code finishes running.

In [21]:
from time import time

# Insert every remaining chunk into the table, timing each one (read + parse + insert).
# Iterating the reader directly means only exhaustion of the file ends the loop; a
# StopIteration cannot leak out of parsing or inserting and be mistaken for "completed".
t_start = time()
for df in df_iter:
    # Each chunk is a new DataFrame, so its timestamp columns must be parsed again.
    df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
    df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])

    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

    t_end = time()
    print('inserted another chunk, took %.3f second' % (t_end - t_start))

    # Restart the clock here: the `for` statement reads the next chunk before
    # re-entering the body, so that read is included in the next measurement.
    t_start = time()

print('completed')

inserted another chunk, took 10.150 second
inserted another chunk, took 10.232 second
inserted another chunk, took 10.260 second
inserted another chunk, took 10.366 second
inserted another chunk, took 10.406 second
inserted another chunk, took 10.552 second
inserted another chunk, took 10.806 second
inserted another chunk, took 10.667 second
inserted another chunk, took 10.205 second
inserted another chunk, took 10.619 second
inserted another chunk, took 11.362 second


PendingRollbackError: Can't reconnect until invalid transaction is rolled back.  Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b)