In [1]:
import pandas as pd

In [2]:
#Install sqlalchemy
!pip install sqlalchemy psycopg2-binary



In [3]:
from sqlalchemy import create_engine, MetaData

In [4]:
# Create Engine
engine = create_engine("postgresql://root:root@localhost:5432/ny-taxi")

In [5]:
# Test Connection
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7ff3fe739ab0>

In [6]:
query = """
    SELECT 1;
"""

pd.read_sql(query, con=engine)

Unnamed: 0,?column?
0,1


## Read Taxi Dataset Using Pandas

In [7]:
# INSTALL DEPENDENCIES FOR "read_parquet"
!pip install pyarrow
!pip install fastparquet



In [8]:
data = pd.read_parquet("yellow_tripdata_2021-01.parquet")

print(data.shape)
print(data.info())
data.head()

(1369769, 19)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1369769 entries, 0 to 1369768
Data columns (total 19 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   VendorID               1369769 non-null  int64         
 1   tpep_pickup_datetime   1369769 non-null  datetime64[us]
 2   tpep_dropoff_datetime  1369769 non-null  datetime64[us]
 3   passenger_count        1271417 non-null  float64       
 4   trip_distance          1369769 non-null  float64       
 5   RatecodeID             1271417 non-null  float64       
 6   store_and_fwd_flag     1271417 non-null  object        
 7   PULocationID           1369769 non-null  int64         
 8   DOLocationID           1369769 non-null  int64         
 9   payment_type           1369769 non-null  int64         
 10  fare_amount            1369769 non-null  float64       
 11  extra                  1369769 non-null  float64       
 12  mta_tax       

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [9]:
print(pd.io.sql.get_schema(data, name="yellow_taxi_data", con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




#### Since the size of the dataframe is big (over 1.3m records), we use iterators to read the entire dataframe in batches into the database. Therefore, we need to convert the dataset to csv to give us iteration capabilities.

In [10]:
# CONVERT THE DATASET TO CSV

# Write to CSV
data.to_csv('yellow_tripdata_2021-01.csv', index=False)

In [11]:
df_iter = pd.read_csv("yellow_tripdata_2021-01.csv", iterator=True, chunksize=100_000)
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x7ff3fe692bf0>

**Note:** 'df_iter' is not a dataframe, it's an iterator.

In [12]:
df = next(df_iter)
print(df.info())
df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 19 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   VendorID               100000 non-null  int64  
 1   tpep_pickup_datetime   100000 non-null  object 
 2   tpep_dropoff_datetime  100000 non-null  object 
 3   passenger_count        100000 non-null  float64
 4   trip_distance          100000 non-null  float64
 5   RatecodeID             100000 non-null  float64
 6   store_and_fwd_flag     100000 non-null  object 
 7   PULocationID           100000 non-null  int64  
 8   DOLocationID           100000 non-null  int64  
 9   payment_type           100000 non-null  int64  
 10  fare_amount            100000 non-null  float64
 11  extra                  100000 non-null  float64
 12  mta_tax                100000 non-null  float64
 13  tip_amount             100000 non-null  float64
 14  tolls_amount           100000 non-nul

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [29]:
# Change date columns to datetime type
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [30]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 200000 to 299999
Data columns (total 19 columns):
 #   Column                 Non-Null Count   Dtype         
---  ------                 --------------   -----         
 0   VendorID               100000 non-null  int64         
 1   tpep_pickup_datetime   100000 non-null  datetime64[ns]
 2   tpep_dropoff_datetime  100000 non-null  datetime64[ns]
 3   passenger_count        100000 non-null  float64       
 4   trip_distance          100000 non-null  float64       
 5   RatecodeID             100000 non-null  float64       
 6   store_and_fwd_flag     100000 non-null  object        
 7   PULocationID           100000 non-null  int64         
 8   DOLocationID           100000 non-null  int64         
 9   payment_type           100000 non-null  int64         
 10  fare_amount            100000 non-null  float64       
 11  extra                  100000 non-null  float64       
 12  mta_tax                100000 non-null 

In [31]:
df.head(0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee


## Reading The Data Into PostgreSQL

In [32]:
# Insertin the column names into SQL
df.head(0).to_sql(name="yellow_taxi_data", con=engine, if_exists="replace")

0

In [33]:
# Inserting the records into the "yellow_taxi_data" table (this will read the first batch of 100,000 records)
%time df.to_sql(name="yellow_taxi_data", con=engine, if_exists="append")

CPU times: user 6.83 s, sys: 133 ms, total: 6.96 s
Wall time: 10.3 s


1000

Inserting the rest of the dataset (batches)

In [13]:
from time import time

while True:
    t_start = time()
    
    #initialize the dataframe
    df = next(df_iter)

    # Change date columns to datetime type
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
 
    # read data into database
    df.to_sql(name="yellow_taxi_data", con=engine, if_exists="append")

    t_end = time()

    print(f"Inserted another chunk, took {round(t_end - t_start, 3)}")

Inserted another chunk, took (10, 3)
Inserted another chunk, took (10, 3)
Inserted another chunk, took (10, 3)
Inserted another chunk, took (10, 3)
Inserted another chunk, took (10, 3)
Inserted another chunk, took (10, 3)


  df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)


Inserted another chunk, took (10, 3)
Inserted another chunk, took (11, 3)
Inserted another chunk, took (10, 3)
Inserted another chunk, took (10, 3)
Inserted another chunk, took (10, 3)


  df = next(df_iter)


Inserted another chunk, took (10, 3)
Inserted another chunk, took (6, 3)


StopIteration: 