This Jupyter notebook was executed on Windows, using Anaconda for Python environment management.

Please make sure that the Python environment has `pandas`, `psycopg2` and `sqlalchemy` installed.

In [1]:
import pandas as pd
pd.__version__

'2.2.2'

The CSV file that we will import is very large (well over a million lines), and loading it into pandas all at once may be slow or run out of memory. For now, we will only read the first 100 rows.

You can count the rows of `yellow_tripdata_2021-01.csv` in a bash terminal with the command below.

`wc -l yellow_tripdata_2021-01.csv`

Expected Output: **1369766**
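If `wc` is not available (for example in the Windows command prompt rather than Git Bash), the same count can be sketched in Python by streaming the file in binary blocks instead of loading it with pandas. Like `wc -l`, this counts newline characters, so the header line is included and the number of data rows is one less. `count_lines` is a hypothetical helper name, not part of any library:

```python
def count_lines(path: str, block_size: int = 1 << 20) -> int:
    """Count newline characters in a file, like `wc -l` (hypothetical helper)."""
    count = 0
    with open(path, "rb") as f:
        # Read fixed-size binary blocks so memory use stays small for large files.
        for block in iter(lambda: f.read(block_size), b""):
            count += block.count(b"\n")
    return count


# Usage, assuming the CSV is in the current working directory:
# print(count_lines("yellow_tripdata_2021-01.csv"))
```

On the real file this should report the same number as `wc -l`.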

In [2]:
df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100)
df

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2021-01-01 00:30:10 | 2021-01-01 00:36:12 | 1 | 2.10 | 1 | N | 142 | 43 | 2 | 8.0 | 3.0 | 0.5 | 0.00 | 0.0 | 0.3 | 11.80 | 2.5 |
| 1 | 1 | 2021-01-01 00:51:20 | 2021-01-01 00:52:19 | 1 | 0.20 | 1 | N | 238 | 151 | 2 | 3.0 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 4.30 | 0.0 |
| 2 | 1 | 2021-01-01 00:43:30 | 2021-01-01 01:11:06 | 1 | 14.70 | 1 | N | 132 | 165 | 1 | 42.0 | 0.5 | 0.5 | 8.65 | 0.0 | 0.3 | 51.95 | 0.0 |
| 3 | 1 | 2021-01-01 00:15:48 | 2021-01-01 00:31:01 | 0 | 10.60 | 1 | N | 138 | 132 | 1 | 29.0 | 0.5 | 0.5 | 6.05 | 0.0 | 0.3 | 36.35 | 0.0 |
| 4 | 2 | 2021-01-01 00:31:49 | 2021-01-01 00:48:21 | 1 | 4.94 | 1 | N | 68 | 33 | 1 | 16.5 | 0.5 | 0.5 | 4.06 | 0.0 | 0.3 | 24.36 | 2.5 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 95 | 2 | 2021-01-01 00:12:41 | 2021-01-01 00:26:47 | 1 | 4.13 | 1 | N | 161 | 226 | 1 | 14.5 | 0.5 | 0.5 | 3.66 | 0.0 | 0.3 | 21.96 | 2.5 |
| 96 | 2 | 2021-01-01 00:23:29 | 2021-01-01 00:35:03 | 2 | 4.12 | 1 | N | 162 | 74 | 2 | 13.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 17.30 | 2.5 |
| 97 | 2 | 2021-01-01 00:46:17 | 2021-01-01 00:54:25 | 2 | 2.22 | 1 | N | 144 | 170 | 1 | 9.0 | 0.5 | 0.5 | 2.56 | 0.0 | 0.3 | 15.36 | 2.5 |
| 98 | 2 | 2021-01-01 00:28:16 | 2021-01-01 00:51:44 | 1 | 7.11 | 1 | N | 264 | 264 | 2 | 23.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 24.80 | 0.0 |


In [3]:
# Generate the schema (a CREATE TABLE statement) that pandas would use for df.
# get_schema also needs a name for the table; we use yellow_taxi_data.
pd.io.sql.get_schema(df, name='yellow_taxi_data')

'CREATE TABLE "yellow_taxi_data" (\n"VendorID" INTEGER,\n  "tpep_pickup_datetime" TEXT,\n  "tpep_dropoff_datetime" TEXT,\n  "passenger_count" INTEGER,\n  "trip_distance" REAL,\n  "RatecodeID" INTEGER,\n  "store_and_fwd_flag" TEXT,\n  "PULocationID" INTEGER,\n  "DOLocationID" INTEGER,\n  "payment_type" INTEGER,\n  "fare_amount" REAL,\n  "extra" REAL,\n  "mta_tax" REAL,\n  "tip_amount" REAL,\n  "tolls_amount" REAL,\n  "improvement_surcharge" REAL,\n  "total_amount" REAL,\n  "congestion_surcharge" REAL\n)'

In [4]:
# Print the schema so the line breaks are rendered and it is easier to read.
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


Note that the following columns should not have TEXT as their data type; they should be dates / timestamps:

`tpep_pickup_datetime` and `tpep_dropoff_datetime`

To fix this, we parse them with pandas' `pd.to_datetime`.
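To make the effect of the conversion concrete, here is a small sketch on a made-up two-row sample with the same datetime column names. It shows `get_schema` reporting TEXT before `pd.to_datetime` and TIMESTAMP after it. As an alternative, the `parse_dates` argument of `pd.read_csv` parses the columns while reading:

```python
import io

import pandas as pd

# Tiny made-up sample with the same two datetime columns as the taxi CSV.
csv_text = """tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance
2021-01-01 00:30:10,2021-01-01 00:36:12,2.10
2021-01-01 00:51:20,2021-01-01 00:52:19,0.20
"""

df_sample = pd.read_csv(io.StringIO(csv_text))
schema_before = pd.io.sql.get_schema(df_sample, name="sample")  # datetimes are TEXT

# Option 1: convert after reading, as the notebook does.
for col in ["tpep_pickup_datetime", "tpep_dropoff_datetime"]:
    df_sample[col] = pd.to_datetime(df_sample[col])
schema_after = pd.io.sql.get_schema(df_sample, name="sample")  # now TIMESTAMP

# Option 2: let read_csv parse the columns directly.
df_parsed = pd.read_csv(
    io.StringIO(csv_text),
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
)

print(schema_before)
print(schema_after)
print(df_parsed.dtypes)
```

With the real file, option 2 would mean adding the same `parse_dates=[...]` list to the `pd.read_csv('yellow_tripdata_2021-01.csv', ...)` call; the notebook itself sticks with `pd.to_datetime`.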

In [5]:
# Parse both columns as timestamps and write them back to the DataFrame,
# then regenerate the schema to confirm the new types.
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


Note: this only generates the DDL statement; it does not actually create a table in the database.

The DDL above uses a generic SQL dialect. To generate a statement that is specific to PostgreSQL, we need to give pandas a connection to Postgres. We will use SQLAlchemy to create it.

In [6]:
from sqlalchemy import create_engine

In [7]:
# If you get the error "ModuleNotFoundError: No module named 'psycopg2'",
# install the precompiled psycopg2-binary package with the command below
!pip install psycopg2-binary



In [8]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
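The string passed to `create_engine` follows SQLAlchemy's URL pattern `dialect+driver://user:password@host:port/database`. Here the driver is omitted (SQLAlchemy then defaults to `psycopg2` for `postgresql`), `root:root` are the user and password, `localhost:5432` is the host and port, and `ny_taxi` is the database, presumably matching the settings used when starting the Postgres container. If a password contains special characters such as `@` or `/`, it has to be URL-encoded. A small sketch, where `postgres_url` is a hypothetical helper rather than a SQLAlchemy function:

```python
from urllib.parse import quote_plus


def postgres_url(user: str, password: str, host: str, port: int, db: str) -> str:
    """Build a SQLAlchemy-style PostgreSQL URL, URL-encoding the credentials."""
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"


print(postgres_url("root", "root", "localhost", 5432, "ny_taxi"))
# -> postgresql://root:root@localhost:5432/ny_taxi
print(postgres_url("root", "p@ss/word", "localhost", 5432, "ny_taxi"))
# -> postgresql://root:p%40ss%2Fword@localhost:5432/ny_taxi
```

The result can be passed straight to `create_engine(...)`.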

In [9]:
# Please make sure that the container in Docker is running in order to establish a connection
engine.connect()

<sqlalchemy.engine.base.Connection at 0x26ce83b9400>

In [10]:
# Passing the engine as con makes get_schema generate PostgreSQL-specific DDL
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




pandas will execute the statement above when it creates this table.

Since the CSV file has well over a million rows, we will read it in smaller batches (chunks).
That way, we never have to load the whole dataset into memory at once.

We will use the `iterator` option of `pd.read_csv`: together with `chunksize`, it returns an iterator (a `TextFileReader`) that yields the CSV as a series of smaller DataFrames.
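A minimal sketch of how `iterator=True` with `chunksize` behaves, using a made-up five-row CSV held in memory (`io.StringIO`) instead of the real file. Each chunk is a regular DataFrame with at most `chunksize` rows, and the row index keeps counting across chunks:

```python
import io

import pandas as pd

# Made-up 5-row CSV standing in for the large taxi file.
csv_text = "VendorID,trip_distance\n1,2.1\n1,0.2\n1,14.7\n1,10.6\n2,4.94\n"

# The reader can be used as a context manager so the source is closed afterwards.
with pd.read_csv(io.StringIO(csv_text), iterator=True, chunksize=2) as reader:
    chunks = list(reader)  # iterating the reader yields one DataFrame per chunk

chunk_sizes = [len(chunk) for chunk in chunks]
first_index_per_chunk = [int(chunk.index[0]) for chunk in chunks]
print(chunk_sizes)            # [2, 2, 1]
print(first_index_per_chunk)  # [0, 2, 4]
```

The last chunk is simply shorter; with the real file and `chunksize=100000`, every chunk except the last has 100,000 rows.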

In [11]:
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000, engine='python')
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x26ce88b0d40>

Please note: I added `engine='python'` because I encountered a warning when reading and appending the chunks to Postgres (***DtypeWarning: Columns (6) have mixed types. Specify dtype option on import or set low_memory=False***). Setting `low_memory=False` is not the best option: the warning may disappear, but the data type inconsistencies remain and can lead to downstream issues if they are not addressed explicitly.
[See the reference](https://stackoverflow.com/questions/24251219/pandas-read-csv-low-memory-and-dtype-options)

The `engine` parameter specifies which parser to use. The Python engine may be slower than the default C engine, but it can handle more complex cases, such as unusual delimiters or quote characters.
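The warning message itself suggests another fix: declaring the column's dtype up front. Counting from 0, column 6 of this file is `store_and_fwd_flag`, which holds `N`/`Y` flags but can also be empty, so one option is to read it explicitly as strings while keeping the default C engine. A sketch on a made-up two-row sample; whether this alone silences the warning on the real file is an assumption not verified here:

```python
import io

import pandas as pd

# Made-up sample: the flag is present in one row and missing in the other,
# the kind of mix that can trigger DtypeWarning when a large file is parsed in pieces.
csv_text = "VendorID,store_and_fwd_flag,trip_distance\n1,N,2.1\n,,0.2\n"

df_sample = pd.read_csv(
    io.StringIO(csv_text),
    dtype={"store_and_fwd_flag": str},  # always read the flag as text
)
print(df_sample.dtypes)
print(df_sample)
```

Missing flags stay as `NaN` rather than being turned into text. Applied to the notebook, this would mean adding the same `dtype={...}` argument to the chunked `pd.read_csv` call instead of `engine='python'`.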

In [12]:
# next() returns the next element of an iterator; here, the next chunk as a DataFrame
df = next(df_iter)
df

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2021-01-01 00:30:10 | 2021-01-01 00:36:12 | 1 | 2.10 | 1 | N | 142 | 43 | 2 | 8.0 | 3.0 | 0.5 | 0.00 | 0.0 | 0.3 | 11.80 | 2.5 |
| 1 | 1 | 2021-01-01 00:51:20 | 2021-01-01 00:52:19 | 1 | 0.20 | 1 | N | 238 | 151 | 2 | 3.0 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 4.30 | 0.0 |
| 2 | 1 | 2021-01-01 00:43:30 | 2021-01-01 01:11:06 | 1 | 14.70 | 1 | N | 132 | 165 | 1 | 42.0 | 0.5 | 0.5 | 8.65 | 0.0 | 0.3 | 51.95 | 0.0 |
| 3 | 1 | 2021-01-01 00:15:48 | 2021-01-01 00:31:01 | 0 | 10.60 | 1 | N | 138 | 132 | 1 | 29.0 | 0.5 | 0.5 | 6.05 | 0.0 | 0.3 | 36.35 | 0.0 |
| 4 | 2 | 2021-01-01 00:31:49 | 2021-01-01 00:48:21 | 1 | 4.94 | 1 | N | 68 | 33 | 1 | 16.5 | 0.5 | 0.5 | 4.06 | 0.0 | 0.3 | 24.36 | 2.5 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 99995 | 1 | 2021-01-04 14:04:31 | 2021-01-04 14:08:52 | 3 | 0.70 | 1 | N | 234 | 224 | 2 | 5.0 | 2.5 | 0.5 | 0.00 | 0.0 | 0.3 | 8.30 | 2.5 |
| 99996 | 1 | 2021-01-04 14:18:46 | 2021-01-04 14:35:45 | 2 | 3.30 | 1 | N | 234 | 236 | 1 | 14.5 | 2.5 | 0.5 | 3.55 | 0.0 | 0.3 | 21.35 | 2.5 |
| 99997 | 1 | 2021-01-04 14:42:41 | 2021-01-04 14:59:22 | 2 | 4.70 | 1 | N | 236 | 79 | 1 | 17.0 | 2.5 | 0.5 | 4.05 | 0.0 | 0.3 | 24.35 | 2.5 |
| 99998 | 2 | 2021-01-04 14:39:02 | 2021-01-04 15:09:37 | 2 | 17.95 | 2 | N | 132 | 148 | 1 | 52.0 | 0.0 | 0.5 | 5.00 | 0.0 | 0.3 | 60.30 | 2.5 |


This is a new DataFrame containing the first chunk of 100,000 rows (`chunksize=100000`).

In [13]:
# This is a new DataFrame, so we need to convert the datetime columns to timestamps again.
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2021-01-01 00:30:10 | 2021-01-01 00:36:12 | 1 | 2.10 | 1 | N | 142 | 43 | 2 | 8.0 | 3.0 | 0.5 | 0.00 | 0.0 | 0.3 | 11.80 | 2.5 |
| 1 | 1 | 2021-01-01 00:51:20 | 2021-01-01 00:52:19 | 1 | 0.20 | 1 | N | 238 | 151 | 2 | 3.0 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 4.30 | 0.0 |
| 2 | 1 | 2021-01-01 00:43:30 | 2021-01-01 01:11:06 | 1 | 14.70 | 1 | N | 132 | 165 | 1 | 42.0 | 0.5 | 0.5 | 8.65 | 0.0 | 0.3 | 51.95 | 0.0 |
| 3 | 1 | 2021-01-01 00:15:48 | 2021-01-01 00:31:01 | 0 | 10.60 | 1 | N | 138 | 132 | 1 | 29.0 | 0.5 | 0.5 | 6.05 | 0.0 | 0.3 | 36.35 | 0.0 |
| 4 | 2 | 2021-01-01 00:31:49 | 2021-01-01 00:48:21 | 1 | 4.94 | 1 | N | 68 | 33 | 1 | 16.5 | 0.5 | 0.5 | 4.06 | 0.0 | 0.3 | 24.36 | 2.5 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 99995 | 1 | 2021-01-04 14:04:31 | 2021-01-04 14:08:52 | 3 | 0.70 | 1 | N | 234 | 224 | 2 | 5.0 | 2.5 | 0.5 | 0.00 | 0.0 | 0.3 | 8.30 | 2.5 |
| 99996 | 1 | 2021-01-04 14:18:46 | 2021-01-04 14:35:45 | 2 | 3.30 | 1 | N | 234 | 236 | 1 | 14.5 | 2.5 | 0.5 | 3.55 | 0.0 | 0.3 | 21.35 | 2.5 |
| 99997 | 1 | 2021-01-04 14:42:41 | 2021-01-04 14:59:22 | 2 | 4.70 | 1 | N | 236 | 79 | 1 | 17.0 | 2.5 | 0.5 | 4.05 | 0.0 | 0.3 | 24.35 | 2.5 |
| 99998 | 2 | 2021-01-04 14:39:02 | 2021-01-04 15:09:37 | 2 | 17.95 | 2 | N | 132 | 148 | 1 | 52.0 | 0.0 | 0.5 | 5.00 | 0.0 | 0.3 | 60.30 | 2.5 |


Next, we will create the table in the database using `df.head`.

`df.head(n=0)` returns only the column names (with their data types) and no rows, so writing it to the database creates an empty table.
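To see what writing `df.head(n=0)` does without a running Postgres container, here is a sketch against an in-memory SQLite database used as a stand-in (pandas accepts a plain `sqlite3` connection as `con`), with a made-up two-column DataFrame. The table gets the columns but no rows. Note that `to_sql` also writes the DataFrame index as an extra `index` column unless `index=False` is passed:

```python
import sqlite3

import pandas as pd

# In-memory SQLite stands in for the Postgres engine used in the notebook.
con = sqlite3.connect(":memory:")

df_sample = pd.DataFrame({"VendorID": [1, 2], "trip_distance": [2.1, 4.94]})

# head(n=0) keeps the columns and dtypes but drops every row.
df_sample.head(n=0).to_sql(name="yellow_taxi_data", con=con, if_exists="replace")

# PRAGMA table_info returns one row per column; field 1 is the column name.
columns = [row[1] for row in con.execute("PRAGMA table_info(yellow_taxi_data)")]
row_count = con.execute("SELECT COUNT(1) FROM yellow_taxi_data").fetchone()[0]
print(columns)    # ['index', 'VendorID', 'trip_distance']
print(row_count)  # 0
```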

In [14]:
df.head(n=0)

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|


In [15]:
# Write the empty DataFrame to create the table (columns only, no data yet);
# the chunks are inserted afterwards.
# if_exists='replace' drops and recreates the table if we already created it before.
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

We are connected to Postgres, and the empty `yellow_taxi_data` table now exists.

In a separate bash terminal, we can now run `winpty pgcli -h localhost -p 5432 -u root -d ny_taxi` to look at the database:
- `\dt` to list the available tables
- `\d yellow_taxi_data` to describe the table


In [16]:
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: total: 5.05 s
Wall time: 8.85 s


1000

Going back to the bash terminal, run pgcli and check how many rows are in the table using

```sql
SELECT COUNT(1) FROM yellow_taxi_data;
```

The expected output is 100,000 rows.

In [17]:
# Import time() to track how many seconds each iteration takes.
from time import time

# Use a while loop to insert the remaining chunks into Postgres.
# You can use pgcli in the bash terminal to check the database after the code has finished.
while True:
    try:
        t_start = time()
        
        df = next(df_iter)
        
        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
        
        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    
        t_end = time()
        
        print('Inserted another chunk... It took %.3f seconds ' % (t_end - t_start))
            
    except StopIteration:
        print('[Iteration: Chunk by chunk] Completed')
        break 


Inserted another chunk... It took 10.068 seconds 
Inserted another chunk... It took 10.474 seconds 
Inserted another chunk... It took 10.556 seconds 
Inserted another chunk... It took 10.075 seconds 
Inserted another chunk... It took 9.795 seconds 
Inserted another chunk... It took 10.655 seconds 
Inserted another chunk... It took 10.376 seconds 
Inserted another chunk... It took 9.947 seconds 
Inserted another chunk... It took 10.156 seconds 
Inserted another chunk... It took 9.811 seconds 
Inserted another chunk... It took 10.066 seconds 
Inserted another chunk... It took 10.647 seconds 
Inserted another chunk... It took 7.351 seconds 
[Iteration: Chunk by chunk] Completed
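Since the `TextFileReader` is itself iterable, the `while True` / `next()` / `StopIteration` pattern above can also be written as a plain `for` loop, which stops on its own when the chunks run out. Below is a self-contained sketch of the whole pipeline, with a made-up in-memory CSV and an in-memory SQLite database standing in for the real file and Postgres, ending with the same `SELECT COUNT(1)` check:

```python
import io
import sqlite3
from time import time

import pandas as pd

# Made-up stand-ins for yellow_tripdata_2021-01.csv and the Postgres engine.
csv_text = (
    "VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance\n"
    "1,2021-01-01 00:30:10,2021-01-01 00:36:12,2.10\n"
    "1,2021-01-01 00:51:20,2021-01-01 00:52:19,0.20\n"
    "1,2021-01-01 00:43:30,2021-01-01 01:11:06,14.70\n"
    "2,2021-01-01 00:31:49,2021-01-01 00:48:21,4.94\n"
    "2,2021-01-01 00:12:41,2021-01-01 00:26:47,4.13\n"
)
con = sqlite3.connect(":memory:")

with pd.read_csv(io.StringIO(csv_text), iterator=True, chunksize=2) as df_iter:
    for i, df in enumerate(df_iter):
        t_start = time()
        df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
        df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])
        # The first chunk (re)creates the table; later chunks are appended.
        df.to_sql(name="yellow_taxi_data", con=con,
                  if_exists="replace" if i == 0 else "append")
        print(f"Inserted chunk {i}... It took {time() - t_start:.3f} seconds")

total = con.execute("SELECT COUNT(1) FROM yellow_taxi_data").fetchone()[0]
print(total)  # 5
```

Replacing the table on the first chunk makes the separate `df.head(n=0).to_sql(...)` step unnecessary; creating the empty table first, as the notebook does, works just as well.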


We have successfully inserted all of the data into the Postgres container. You may go [back](https://github.com/dani-gallego/data-engg-zoomcamp/blob/main/notes/01-introduction.md) to continue.


> **NOTE**: You can skip the part below for now; first finish the notes up to the lesson [Running Postgres and pgAdmin with Docker-compose](https://github.com/dani-gallego/data-engg-zoomcamp/blob/main/notes/01-introduction.md#running-postgres-and-pgadmin-with-docker-compose).

---

This is the continuation for [SQL Refresher](https://www.youtube.com/watch?v=QEcps_iskgg&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=11).

In this section, we will have to add another table called `zones` in order to follow the instructions [here](https://www.youtube.com/watch?v=QEcps_iskgg&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=11).

Here is the link to the [NYC TLC dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) page, where you can copy the link address of the `Taxi Zone Lookup Table`.

In [28]:
# Install the wget package from PyPI (note: it is a Python module, not the wget command-line tool)
!pip install wget
# Use the wget command to download taxi_zone_lookup.csv from its URL
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv



'wget' is not recognized as an internal or external command,
operable program or batch file.


If you encounter the same error as above, you can use the [requests library](https://pypi.org/project/requests) instead, just like we did in `ingest-data`. Otherwise, you can skip this optional method, since the file has already been downloaded locally.

In [18]:
import requests

# Link address copied from the NYC TLC page ("Taxi Zone Lookup Table")
URL = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"
response = requests.get(URL, timeout=60)
response.raise_for_status()  # stop here if the download failed (e.g. HTTP 404)

with open("taxi_zone_lookup.csv", "wb") as file:
    file.write(response.content)

# Only reached if the request succeeded
print("File downloaded successfully")

File downloaded successfully


In [19]:
# Load the downloaded file into a DataFrame called df_zones

df_zones = pd.read_csv('taxi_zone_lookup.csv')

In [20]:
df_zones.head()

| | LocationID | Borough | Zone | service_zone |
|---|---|---|---|---|
| 0 | 1 | EWR | Newark Airport | EWR |
| 1 | 2 | Queens | Jamaica Bay | Boro Zone |
| 2 | 3 | Bronx | Allerton/Pelham Gardens | Boro Zone |
| 3 | 4 | Manhattan | Alphabet City | Yellow Zone |
| 4 | 5 | Staten Island | Arden Heights | Boro Zone |


In [21]:
df_zones.to_sql(name='zones', con=engine, if_exists='replace')

265
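The `LocationID` column of `zones` corresponds to `PULocationID` and `DOLocationID` in the trips table, so the two tables can be joined to turn location IDs into zone names. A sketch of such a join, again using in-memory SQLite so it runs without the container; the zone rows are copied from the `df_zones.head()` output, while the trip rows are hypothetical. Because pandas created mixed-case column names such as `"PULocationID"` with quotes (see the PostgreSQL DDL earlier), they must also be double-quoted in Postgres queries:

```python
import sqlite3

import pandas as pd

con = sqlite3.connect(":memory:")

# Zone rows taken from df_zones.head(); the trip rows are hypothetical.
zones = pd.DataFrame({
    "LocationID": [1, 2, 3, 4],
    "Borough": ["EWR", "Queens", "Bronx", "Manhattan"],
    "Zone": ["Newark Airport", "Jamaica Bay", "Allerton/Pelham Gardens", "Alphabet City"],
})
trips = pd.DataFrame({"PULocationID": [4, 1], "DOLocationID": [2, 3], "total_amount": [11.8, 4.3]})

zones.to_sql(name="zones", con=con, index=False)
trips.to_sql(name="yellow_taxi_data", con=con, index=False)

# Join the zones table twice: once for the pickup and once for the drop-off location.
query = """
SELECT
    t.total_amount,
    zpu."Zone" AS pickup_zone,
    zdo."Zone" AS dropoff_zone
FROM yellow_taxi_data t
JOIN zones zpu ON t."PULocationID" = zpu."LocationID"
JOIN zones zdo ON t."DOLocationID" = zdo."LocationID"
ORDER BY t.total_amount
"""
result = pd.read_sql(query, con)
print(result)
```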