Make sure that your Python env has `pandas` and `sqlalchemy` installed. I also had to install `psycopg2` manually.

In [None]:
# run this in terminal on your working environment
# pip install sqlalchemy
# pip install psycopg2

In [None]:
import pandas as pd
from sqlalchemy import create_engine
from time import time

pd.__version__

The CSV file is very big and Pandas may not be able to handle it properly if the whole thing doesn't fit in RAM. We will only import 100 rows for now using `nrows`

> Note that pd.read_csv can also read compressed files, if we specify the compression type using `compression='gzip'` for `.gz`

In [None]:
source = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz'

df = pd.read_csv(source, nrows=100, compression='gzip')
df.head()

We can view the ***schema*** for the database using a dedicated command in pandas. The _schema_ is the structure of the database; in this case it describes the columns of our table. Pandas can output the schema using the SQL ***DDL*** (Data definition language) instructions necessary to create the schema, using `pd.io.sql.get_schema()`

> Note: We don't need to copy/save this DDL command, because python use it automatically behind-the-scenes.

> Note that running this will only output the instructions, it hasn't actually created the table in the database yet.

In [None]:
# We need to provide a name for the table; we will use 'yellow_taxi_data'
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

This step can be used for validation of the auto-generated schema.

For example, note that `tpep_pickup_datetime` and `tpep_dropoff_datetime` are text fields even though they should be timestamps.

Let's change that using `pd.to_datetime`

In [None]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

Even though we have the DDL instructions, we still need specific instructions for Postgres to connect to it and create the table. We will use `sqlalchemy` for this.

Moreover, note that SQL dialects vary, and currently, the schema we see above is not compliant with PostgreSQL...

To create a database ***engine***, we must specify the database details in a URI. The structure of the URI is:

`database://user:password@host:port/database_name`

In [None]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [None]:
# run this cell when the Postgres Docker container is running
engine.connect()

By adding the parameter `con=engine` to our previous `pd.io.sql.get_schema()` command, we can get a DDL statement that is aligned to the SQL Dialect of the specified DB engine (PostgreSQL)

In [None]:
# we can now use our engine to get the specific output for Postgres
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))

We will now modify `pd.read_csv()` to create an ***iterator*** that will allow us to read the CSV file in chunks of 100,000 and send them to the database. Otherwise, if we read the whole file at once, we may run into problems trying to send too much data at once.

In [None]:
df_iter = pd.read_csv(source, iterator=True, chunksize=100000, compression='gzip')
df_iter

We can use the `next()` function to get the chunks using the iterator.

In [None]:
df = next(df_iter)
df

Recall that we have generate a new `iterator` object (`dt_iter`) from the original `dataframe`, and are now iterating through the `iterator` object to generate a `dataframe` with the next batch of rows in `dt`.

This means that, every time we iterate to the next chunk, we will generate a brand new dataframe, so we need to re-apply any data transformatinos, including data type conversions.

> Note this is more straightforward than applying data transformations to the `iterator` object as a whole, which is not a `dataframe` object

In [None]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df

We will now finally create the table in the database.

With `df.head(n=0)` we can get the name of the columns only, without any additional data. We will use it to generate the first SQL DDL instruction which will create an empty table with the schema we want, ready for actual data ingestion.

Previously, we used the function `pd.io.sql.get_schema` to view the DDL statement in SQL that would generate a SQL table based on the available pandas dataframe (i.e. `CREATE TABLE` etc.). However,  we will now excute `pd.to_sql() ` which will generate the DDL statement behind-the-scenes + execute it simultaneously (two-in-one). 

Note that we use `head(n=0)` to write on the column names only, and we use the argument `if_exists = ‘replace’` so that we replace any existing tables if they had been created already.

In [None]:
# we need to provide the table name, the connection and what to do if the table already exists
# we choose to replace everything in case you had already created something by accident before.
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

We can now navigate to the table in PGAdmin, and observe how we have a new table, with the required columns and schema, but no rows of data yet.

The database is ready for ingestion!

![image.png](attachment:image.png)

Let's include our current chunk to our database and time how long it takes. At the time of writing this scrypt, it took approx. 6.3 seconds to ingest the first chunk.

In [None]:
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

Back on PGAdmin, we can now see there 100,000 rows of data in the table.

![image.png](attachment:image.png)

Let's write a loop to write all chunks to the database.

In [None]:
# when the last chunk is reached, the next iteration will throw an error `StopIteration`, which will signal completion of the loop and terminate our script.
# there are many cleaner ways of implementing this

while True: 
    try:
        t_start = time()
        df = next(df_iter)

        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
        
        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

        t_end = time()

        print('inserted another chunk, took %.3f second' % (t_end - t_start))
    except StopIteration:
        print('completed')
        break

Use PGAdmin after finishing to check the database after the code finishes running.

There should be 1,369,765 rows.

```SQL
SELECT
    COUNT(*)
FROM
    yellow-taxi-data;
```

![image-2.png](attachment:image-2.png)

Alternatively, we can interface with the DB through Python, using SQLAlchemy

In [None]:
# check which tables exist, with reference to PostgresDB's `pg_tables`, a 'system catalogue view' listing all the available tables in the current database.
# Ref: https://www.postgresqltutorial.com/postgresql-administration/postgresql-show-tables/

query = """
SELECT *
FROM pg_catalog.pg_tables
WHERE
    schemaname != 'pg_catalog' AND
    schemaname != 'information_schema'
"""

pd.read_sql(query, con=engine)

In [None]:
# check contents of the table `yellow_taxi_data`

query = """
SELECT *
FROM yellow_taxi_data
LIMIT 100
"""

pd.read_sql(query, con=engine)

And that's it!