This file was tested on macOS using Conda for Python management.

Make sure that your Python env has `pandas` and `sqlalchemy` installed. I also had to install `psycopg2` manually.

In [1]:
import pandas as pd
pd.__version__

'2.0.3'

The CSV file is very big, and Pandas may not be able to handle it properly if the whole file doesn't fit in RAM. For inspecting the data and deriving the schema, a small sample (for example, the first 100 rows) is enough.
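As a minimal sketch of reading only a sample, `pd.read_csv` accepts an `nrows` argument that stops parsing after the given number of data rows. The CSV text and column names below are made up for illustration and stand in for the real file.

```python
import io

import pandas as pd

# Hypothetical stand-in for the real file: 250 rows of made-up data.
csv_text = "trip_id,trip_distance\n" + "\n".join(
    f"{i},{i * 0.5}" for i in range(250)
)

# nrows limits how many data rows are parsed; the header is not counted.
sample = pd.read_csv(io.StringIO(csv_text), nrows=100)
print(len(sample))
```

With the real file, the same argument can be passed alongside the file name.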

In [2]:
# df = pd.read_parquet('yellow_tripdata_2023-01.parquet')
# df.head()


We will now create the ***schema*** for the database. The _schema_ is the structure of the database; in this case it describes the columns of our table. Pandas can output the SQL ***DDL*** (Data Definition Language) instructions necessary to create the schema.

In [3]:
#df.to_csv('yellow_tripdata_2023-01.csv', index=False)

In [5]:
df = pd.read_csv('green_tripdata_2019-09.csv.gz')
df.head()



Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2.0,2019-09-01 00:10:53,2019-09-01 00:23:46,N,1.0,65,189,5.0,2.0,10.5,0.5,0.5,2.36,0.0,,0.3,14.16,1.0,1.0,0.0
1,2.0,2019-09-01 00:31:22,2019-09-01 00:44:37,N,1.0,97,225,5.0,3.2,12.0,0.5,0.5,0.0,0.0,,0.3,13.3,2.0,1.0,0.0
2,2.0,2019-09-01 00:50:24,2019-09-01 01:03:20,N,1.0,37,61,5.0,2.99,12.0,0.5,0.5,0.0,0.0,,0.3,13.3,2.0,1.0,0.0
3,2.0,2019-09-01 00:27:06,2019-09-01 00:33:22,N,1.0,145,112,1.0,1.73,7.5,0.5,0.5,1.5,0.0,,0.3,10.3,1.0,1.0,0.0
4,2.0,2019-09-01 00:43:23,2019-09-01 00:59:54,N,1.0,112,198,1.0,3.42,14.0,0.5,0.5,3.06,0.0,,0.3,18.36,1.0,1.0,0.0


In [13]:
# We need to provide a name for the table; we will use 'green_tripdata_2019'
print(pd.io.sql.get_schema(df, name='green_tripdata_2019'))

CREATE TABLE "green_tripdata_2019" (
"VendorID" REAL,
  "lpep_pickup_datetime" TIMESTAMP,
  "lpep_dropoff_datetime" TIMESTAMP,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" REAL,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" REAL,
  "trip_type" REAL,
  "congestion_surcharge" REAL
)


Note that this only outputs the instructions; it hasn't actually created the table in the database yet.

Note that `lpep_pickup_datetime` and `lpep_dropoff_datetime` are read from the CSV as text even though they should be timestamps. Let's convert them.

In [14]:
df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
print(pd.io.sql.get_schema(df, name='green_tripdata_2019'))

CREATE TABLE "green_tripdata_2019" (
"VendorID" REAL,
  "lpep_pickup_datetime" TIMESTAMP,
  "lpep_dropoff_datetime" TIMESTAMP,
  "store_and_fwd_flag" TEXT,
  "RatecodeID" REAL,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "ehail_fee" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "payment_type" REAL,
  "trip_type" REAL,
  "congestion_surcharge" REAL
)
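As an alternative sketch, the conversion can also happen at read time with the `parse_dates` argument of `pd.read_csv`. The two-row CSV and the table name `demo_trips` below are made up for illustration; the example compares the generated DDL with and without date parsing.

```python
import io

import pandas as pd

# Tiny made-up sample with one datetime column.
csv_text = (
    "lpep_pickup_datetime,trip_distance\n"
    "2019-09-01 00:10:53,2.0\n"
    "2019-09-01 00:31:22,3.2\n"
)

# Without parse_dates, the datetime column is loaded as strings.
as_text = pd.read_csv(io.StringIO(csv_text))
schema_text = pd.io.sql.get_schema(as_text, name="demo_trips")

# With parse_dates, the column becomes datetime64 while reading.
as_time = pd.read_csv(io.StringIO(csv_text), parse_dates=["lpep_pickup_datetime"])
schema_time = pd.io.sql.get_schema(as_time, name="demo_trips")

print(schema_text)
print(schema_time)
```

Parsing at read time avoids repeating the `pd.to_datetime` calls for every new dataframe.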


Even though we have the DDL instructions, we still need a way to connect to Postgres and generate instructions in its specific SQL dialect before we can create the table. We will use `sqlalchemy` for this.

In [9]:
from sqlalchemy import create_engine

An ***engine*** specifies the database details in a URI. The structure of the URI is:

`database://user:password@host:port/database_name`
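To make the parts of the URI concrete, here is a small sketch that splits the connection string used in this notebook with the standard library's `urlsplit`. This is only for labeling the pieces; SQLAlchemy does its own parsing internally.

```python
from urllib.parse import urlsplit

parts = urlsplit("postgresql://root:root@localhost:5432/ny_taxi")

print(parts.scheme)    # postgresql
print(parts.username)  # root
print(parts.password)  # root
print(parts.hostname)  # localhost
print(parts.port)      # 5432

# The database name is the path without its leading slash.
database_name = parts.path.lstrip("/")
print(database_name)   # ny_taxi
```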

In [22]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [23]:
# run this cell when the Postgres Docker container is running
engine.connect()

OperationalError: (psycopg2.OperationalError) could not connect to server: Connection refused
	Is the server running on host "localhost" (127.0.0.1) and accepting
	TCP/IP connections on port 5432?

(Background on this error at: https://sqlalche.me/e/14/e3q8)
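A "Connection refused" error like the one above usually means nothing is listening on the port, for example because the Postgres container is not running. A quick way to check before connecting, sketched with the standard library only (`port_is_open` is a hypothetical helper, not part of `sqlalchemy` or `psycopg2`):

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


# Host and port taken from the connection URI used in this notebook.
print(port_is_open("localhost", 5432))
```

An open port only shows that some server is listening; credentials and the database name are still checked by `engine.connect()`.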

In [15]:
# we can now use our engine to get the specific output for Postgres
print(pd.io.sql.get_schema(df, name='green_tripdata_2019', con=engine))


CREATE TABLE green_tripdata_2019 (
	"VendorID" FLOAT(53), 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" FLOAT(53), 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type FLOAT(53), 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




We will now create an ***iterator*** that will allow us to read the CSV file in chunks and send them to the database. Otherwise, we may run into problems trying to send too much data at once.

In [16]:
df_iter = pd.read_csv('green_tripdata_2019-09.csv.gz', iterator=True, chunksize=100000)
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x7f9e6ebd77d0>

We can use the `next()` function to get the chunks using the iterator.

In [17]:
df = next(df_iter)
df

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2019-09-01 00:10:53,2019-09-01 00:23:46,N,1,65,189,5,2.00,10.5,0.5,0.5,2.36,0.0,,0.3,14.16,1,1,0.0
1,2,2019-09-01 00:31:22,2019-09-01 00:44:37,N,1,97,225,5,3.20,12.0,0.5,0.5,0.00,0.0,,0.3,13.30,2,1,0.0
2,2,2019-09-01 00:50:24,2019-09-01 01:03:20,N,1,37,61,5,2.99,12.0,0.5,0.5,0.00,0.0,,0.3,13.30,2,1,0.0
3,2,2019-09-01 00:27:06,2019-09-01 00:33:22,N,1,145,112,1,1.73,7.5,0.5,0.5,1.50,0.0,,0.3,10.30,1,1,0.0
4,2,2019-09-01 00:43:23,2019-09-01 00:59:54,N,1,112,198,1,3.42,14.0,0.5,0.5,3.06,0.0,,0.3,18.36,1,1,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,2,2019-09-08 21:34:31,2019-09-08 21:42:44,N,1,74,151,1,2.12,8.5,0.5,0.5,2.45,0.0,,0.3,12.25,1,1,0.0
99996,2,2019-09-08 21:06:54,2019-09-08 21:12:38,N,1,130,28,1,0.83,5.5,0.5,0.5,0.00,0.0,,0.3,6.80,2,1,0.0
99997,2,2019-09-08 21:22:10,2019-09-08 21:29:33,N,1,130,10,1,2.56,9.5,0.5,0.5,2.00,0.0,,0.3,12.80,1,1,0.0
99998,2,2019-09-08 21:33:42,2019-09-08 21:33:48,N,5,92,92,1,0.13,22.0,0.0,0.0,0.00,0.0,,0.0,22.00,1,2,0.0
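The chunking behavior is easier to see on a tiny input. The sketch below uses a made-up 10-row CSV and a chunk size of 4: `next()` returns the first chunk, and iterating over the reader yields whatever is left.

```python
import io

import pandas as pd

# Made-up 10-row CSV standing in for the real file.
csv_text = "trip_id\n" + "\n".join(str(i) for i in range(10))

# Passing chunksize makes read_csv return a reader instead of a dataframe.
reader = pd.read_csv(io.StringIO(csv_text), chunksize=4)

first = next(reader)  # first 4 rows
remaining_sizes = [len(chunk) for chunk in reader]  # sizes of the later chunks

print(len(first), remaining_sizes)
```

The last chunk is simply shorter when the row count is not a multiple of the chunk size.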



This is a brand new dataframe, so we need to convert the time columns to timestamp format.

In [19]:
df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
df

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2019-09-01 00:10:53,2019-09-01 00:23:46,N,1,65,189,5,2.00,10.5,0.5,0.5,2.36,0.0,,0.3,14.16,1,1,0.0
1,2,2019-09-01 00:31:22,2019-09-01 00:44:37,N,1,97,225,5,3.20,12.0,0.5,0.5,0.00,0.0,,0.3,13.30,2,1,0.0
2,2,2019-09-01 00:50:24,2019-09-01 01:03:20,N,1,37,61,5,2.99,12.0,0.5,0.5,0.00,0.0,,0.3,13.30,2,1,0.0
3,2,2019-09-01 00:27:06,2019-09-01 00:33:22,N,1,145,112,1,1.73,7.5,0.5,0.5,1.50,0.0,,0.3,10.30,1,1,0.0
4,2,2019-09-01 00:43:23,2019-09-01 00:59:54,N,1,112,198,1,3.42,14.0,0.5,0.5,3.06,0.0,,0.3,18.36,1,1,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,2,2019-09-08 21:34:31,2019-09-08 21:42:44,N,1,74,151,1,2.12,8.5,0.5,0.5,2.45,0.0,,0.3,12.25,1,1,0.0
99996,2,2019-09-08 21:06:54,2019-09-08 21:12:38,N,1,130,28,1,0.83,5.5,0.5,0.5,0.00,0.0,,0.3,6.80,2,1,0.0
99997,2,2019-09-08 21:22:10,2019-09-08 21:29:33,N,1,130,10,1,2.56,9.5,0.5,0.5,2.00,0.0,,0.3,12.80,1,1,0.0
99998,2,2019-09-08 21:33:42,2019-09-08 21:33:48,N,5,92,92,1,0.13,22.0,0.0,0.0,0.00,0.0,,0.0,22.00,1,2,0.0


We will now finally create the table in the database. With `df.head(n=0)` we get a dataframe that has only the column names and no rows. We will use it to generate the SQL instruction that creates the empty table.

In [20]:
# we need to provide the table name, the connection and what to do if the table already exists
# we choose to replace everything in case you had already created something by accident before.
df.head(n=0).to_sql(name='green_tripdata_2019', con=engine, if_exists='replace')

0
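To see what `head(n=0).to_sql(...)` does without a running Postgres server, here is a sketch against an in-memory SQLite database, which `to_sql` also accepts. The dataframe and the table name `demo_trips` are made up, and `index=False` is a choice made here to keep the dataframe index out of the table.

```python
import sqlite3

import pandas as pd

# Small made-up dataframe standing in for one chunk of the real data.
df_demo = pd.DataFrame({"PULocationID": [65, 97], "trip_distance": [2.0, 3.2]})

conn = sqlite3.connect(":memory:")

# head(n=0) keeps the columns but no rows, so only the table is created.
df_demo.head(n=0).to_sql(name="demo_trips", con=conn, if_exists="replace", index=False)

columns = [row[1] for row in conn.execute("PRAGMA table_info(demo_trips)")]
row_count = conn.execute("SELECT count(1) FROM demo_trips").fetchone()[0]

print(columns, row_count)
```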

You can now use `pgcli -h localhost -p 5432 -u root -d ny_taxi` on a separate terminal to look at the database:

* `\dt` for looking at available tables.
* `\d green_tripdata_2019` for describing the new table.

Let's insert our current chunk into the database and time how long it takes.

In [21]:
%time df.to_sql(name='green_tripdata_2019', con=engine, if_exists='append')

PendingRollbackError: Can't reconnect until invalid transaction is rolled back. (Background on this error at: https://sqlalche.me/e/14/8s2b)

Back on the terminal running `pgcli`, we can check how many rows were written to the database with:

```sql
SELECT count(1) FROM green_tripdata_2019;
```

You should see 100,000 rows.


Let's write a loop to write all chunks to the database. Use the terminal with `pgcli` to check the database after the code finishes running.

In [21]:
from time import time

while True: 
    try:
        t_start = time()
        df = next(df_iter)

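        # the green taxi dataset uses the lpep_ prefix for its datetime columns
        df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)
        df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)

        # append to the same table that was created above
        df.to_sql(name='green_tripdata_2019', con=engine, if_exists='append')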

        t_end = time()

        print('inserted another chunk, took %.3f second' % (t_end - t_start))
    except StopIteration:
        print('completed')
        break

inserted another chunk, took 6.864 second
inserted another chunk, took 6.846 second
inserted another chunk, took 7.065 second
inserted another chunk, took 7.045 second
inserted another chunk, took 7.159 second
inserted another chunk, took 7.095 second
inserted another chunk, took 8.172 second
inserted another chunk, took 7.926 second
inserted another chunk, took 7.483 second
inserted another chunk, took 7.860 second
inserted another chunk, took 8.182 second
inserted another chunk, took 7.375 second
inserted another chunk, took 7.235 second
inserted another chunk, took 7.117 second
inserted another chunk, took 8.678 second
inserted another chunk, took 11.926 second
inserted another chunk, took 14.056 second
inserted another chunk, took 15.178 second
inserted another chunk, took 14.411 second
inserted another chunk, took 10.623 second
inserted another chunk, took 10.743 second
inserted another chunk, took 8.351 second
inserted another chunk, took 12.327 second
inserted another chunk, too



inserted another chunk, took 8.200 second
inserted another chunk, took 4.994 second
completed
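The same ingestion loop can be sketched as a reusable function. The version below is an assumption-laden variant, not the notebook's exact code: `load_in_chunks` is a hypothetical helper, it iterates with a `for` loop instead of `next()` and `StopIteration`, parses dates at read time, and writes to an in-memory SQLite database with made-up data so it runs without Postgres.

```python
import io
import sqlite3

import pandas as pd


def load_in_chunks(source, table, con, date_columns, chunksize):
    """Append a CSV to a SQL table chunk by chunk; return the rows written."""
    total = 0
    reader = pd.read_csv(source, chunksize=chunksize, parse_dates=date_columns)
    for chunk in reader:
        # The table is created on the first append if it does not exist yet.
        chunk.to_sql(name=table, con=con, if_exists="append", index=False)
        total += len(chunk)
    return total


# Made-up 10-row CSV standing in for the real file.
csv_text = "lpep_pickup_datetime,trip_distance\n" + "\n".join(
    f"2019-09-01 00:{i:02d}:00,{i * 0.5}" for i in range(10)
)

conn = sqlite3.connect(":memory:")
written = load_in_chunks(
    io.StringIO(csv_text), "demo_trips", conn, ["lpep_pickup_datetime"], chunksize=4
)
stored = conn.execute("SELECT count(1) FROM demo_trips").fetchone()[0]

print(written, stored)
```

Comparing the returned total with a `count(1)` query is the same check that the `pgcli` session performs by hand.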


And that's it! Feel free to go back to the [notes](../notes/1_intro.md#inserting-data-to-postgres-with-python)