## Download CSV data from a remote server and Ingest the data into a Postgres database
#### Plan:
    - download the data using wget
    - load the csv data into a dataframe and do needed transformations (e.g data type changes)
    - connect to the existing postgres db
    - generate a postgres table schema from the dataframe, using pandas io method
    - start the ingestion into postgres db by first inserting the table columns
    - then iteratively insert chunks of the csv data into the database - be sure the columns are in the right format
    - *Convert the ipynb file to python script. the script will be dockerized


In [2]:
#import libraries
import pandas as pd
from sqlalchemy import create_engine

In [2]:
pd.__version__

'2.0.3'

In [43]:
# !pip install psycopg2-binary

In [None]:
# download the csv data
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz

In [3]:
# load the data - only the first 100 rows for now, as the data is quite large
df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100) 
df.head(n=2)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0


In [4]:
# check the column data types
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 18 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   VendorID               100 non-null    int64  
 1   tpep_pickup_datetime   100 non-null    object 
 2   tpep_dropoff_datetime  100 non-null    object 
 3   passenger_count        100 non-null    int64  
 4   trip_distance          100 non-null    float64
 5   RatecodeID             100 non-null    int64  
 6   store_and_fwd_flag     100 non-null    object 
 7   PULocationID           100 non-null    int64  
 8   DOLocationID           100 non-null    int64  
 9   payment_type           100 non-null    int64  
 10  fare_amount            100 non-null    float64
 11  extra                  100 non-null    float64
 12  mta_tax                100 non-null    float64
 13  tip_amount             100 non-null    float64
 14  tolls_amount           100 non-null    float64
 15  improve

In [5]:
# convert date columns to the datetime data type
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)

In [3]:
# create a connection to an existing postgres database
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

# check if the connection is successful
engine.connect()

<sqlalchemy.engine.base.Connection at 0x11f301faa90>

In [7]:
# check for existing tables, it is the equivalent of \dt used in pgcli
query = """
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND
    schemaname != 'information_schema';
"""

pd.read_sql(query, con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity


In [8]:
# insert a table to the DB
df.to_sql(name='yellow_taxi_data', con=engine, index=False, if_exists='replace')

100

In [9]:
# check again for existing tables. you could as well just check this on the CLI
query = """
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND
    schemaname != 'information_schema';
"""

pd.read_sql(query, con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,yellow_taxi_data,root,,False,False,False,False


In [10]:
query = """
SELECT * FROM yellow_taxi_data LIMIT 3;
"""

pd.read_sql(query, con=engine)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0


In [None]:
# query = """
# DROP TABLE yellow_taxi_data;
# """

# pd.read_sql(query, con=engine)

In [11]:
# generate a data-definition language(DDL) from the dataframe
# DDL describes the table schema, i.e how the table should look like in SQL, including column name and their data types
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [12]:
# create a table and first insert all the columns. if the table already exists, replace it.
df.head(n=0).to_sql(con=engine, name='yellow_taxi_data', if_exists='replace')

0

In [13]:
# append new chunk of rows to the table in the database
%time df.to_sql(con=engine, name='yellow_taxi_data', if_exists='append')

CPU times: total: 31.2 ms
Wall time: 2.53 s


100

In [15]:
# batch-load the data into the database. here, we load 100000 rows at any one time rather than the entire 1million+ rows at once
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000) 

In [16]:
# next is a python function that returns the next element in an iterator. run multiple times to see
next(df_iter)

100000

In [None]:
# SUMMARY code to put data into postgres via pandas:
# !wget http://...
# engine = create_engine('postgres://root:root@localhost/5432/ny_taxi')
# engine.connect()
# df = pd.read_csv('yellow_tripdata_2021-01.csv')
# #do needed transformations or cleaning here
# df.to_sql(name='yellow_taxi_data', con=engine, if_exists=replace)

### Putting it all together

In [19]:
from time import time

In [None]:
df = next(df_iter)   # needed to generate the table headers as df_iter can't be used since it is an iterator and has not module 'head'

In [28]:
# create a table and first insert all the columns. if the table already exists, replace it.
df.head(n=0).to_sql(con=engine, name='yellow_taxi_data', if_exists='replace')

0

In [29]:
# putting everything together
# batch-load the data into the database in chunks of 100000 rows
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000) 

while True:
    try:
        time_start = time()

        df = next(df_iter) # load the next chunk into the dataframe

        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)  # convert the dates columnn to datetime format
        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)

        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append') # append the chunk to the existing table

        time_end = time()

        print("inserted another chunk... took %.3f seconds" % (time_end - time_start))
    except StopIteration:
        print("Finished ingesting data into the postgres database")
        break

inserted another chunk... took 90.901 seconds
inserted another chunk... took 89.478 seconds
inserted another chunk... took 98.295 seconds
inserted another chunk... took 897.157 seconds
inserted another chunk... took 23.350 seconds
inserted another chunk... took 47.924 seconds
inserted another chunk... took 28.776 seconds
inserted another chunk... took 36.157 seconds
inserted another chunk... took 24.116 seconds
inserted another chunk... took 39.065 seconds
inserted another chunk... took 30.772 seconds
inserted another chunk... took 37.902 seconds


  df = next(df_iter) # load the next chunk into the dataframe


inserted another chunk... took 27.919 seconds
inserted another chunk... took 12.673 seconds


In [34]:
len(pd.read_csv('yellow_tripdata_2021-01.csv'))

  len(pd.read_csv('yellow_tripdata_2021-01.csv'))


1369765

In [None]:
# then convert the ipynb file to python script to be dockerized
# !jupyter nbconvert --to=script upload.ipynb