In [1]:
import pandas as pd

In [2]:
pd.__version__

'1.5.2'

# Read data

In [12]:
df = pd.read_csv("yellow_tripdata_2021-01.csv", nrows = 100)

In [13]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


In [32]:
print(pd.io.sql.get_schema(df, name = "yellow_taxi_data")) # to generate a schema from a dataframe https://github.com/pandas-dev/pandas/issues/9960
# print statment in DDL (Data Definition Language) - describes the portion of SQL that creates, alters, and deletes database objects


CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


## Convert dates to datetime

In [17]:
df[["tpep_pickup_datetime", "tpep_dropoff_datetime"]] = df[["tpep_pickup_datetime", "tpep_dropoff_datetime"]].apply(pd.to_datetime)
df.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

In [18]:
print(pd.io.sql.get_schema(df, name = "yellow_taxi_data"))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


# Convert this DDL statement from Pandas to Postgres statement 
## Generate a conection to Postgres
in order to write data to the Postgres database.

In [25]:
# SQLAlchemy - pandas library for dealing with SQL
from sqlalchemy import create_engine


In [30]:
# postgresql://POSTGRES_USER:POSTGRES_PASSWORD@localhost:port/POSTGRES_DB_NAME
engine = create_engine("postgresql://root:root@localhost:5432/ny_taxi") 

engine.connect()


<sqlalchemy.engine.base.Connection at 0x7f9c367eafa0>

## Push pandas csv to Postgres DB 
Connect to Postgrees DB

In [33]:
# use iterator from Pandas to chunk a big csv
# split csv of 1 369 816 to chunks of 100000 size = 14 chunks
# create an iterator to push to DB
df_iter = pd.read_csv("yellow_tripdata_2021-01.csv", 
                 iterator = True,
                 chunksize = 100000
                )

In [35]:
# return the next (first) chunk of the data
df = next(df_iter)
print(df.shape)
df.head()

(100000, 18)


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


In [36]:
df.dtypes

VendorID                   int64
tpep_pickup_datetime      object
tpep_dropoff_datetime     object
passenger_count            int64
trip_distance            float64
RatecodeID                 int64
store_and_fwd_flag        object
PULocationID               int64
DOLocationID               int64
payment_type               int64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
improvement_surcharge    float64
total_amount             float64
congestion_surcharge     float64
dtype: object

In [37]:
# convert timestamp to datetime
df[["tpep_pickup_datetime", 
    "tpep_dropoff_datetime"]] = df[["tpep_pickup_datetime", 
                                    "tpep_dropoff_datetime"]].apply(pd.to_datetime)



### Insert table definition to Database

In [39]:
df.head(0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [40]:
# 1. Create table with only column names:

df.head(0).to_sql(name = "yellow_taxi_data", # Name of SQL table
          con = engine,
          if_exists = 'replace' # if table with this name exists then DROP and CREATE
         )

0

In [41]:
# 2. Insert data chunk by chunk
# start with the first chunk read from iterator df_iter
%time df.to_sql(name = "yellow_taxi_data", # Name of SQL table
          con = engine,
          if_exists = 'append' # if exists then append to the table
          )

CPU times: user 17 µs, sys: 47 µs, total: 64 µs
Wall time: 463 µs


1000

In [42]:
from time import time

In [43]:
# Write records stored in a DataFrame to a SQL database.

while True: #infinite loop
    t_start = time()
    # read chunk
    df = next(df_iter)
    # convert timestamp to datetime
    df[["tpep_pickup_datetime", 
        "tpep_dropoff_datetime"]] = df[["tpep_pickup_datetime", 
                                        "tpep_dropoff_datetime"]].apply(pd.to_datetime)
    # push to DB
    df.to_sql(name = "yellow_taxi_data", con = engine, if_exists = 'append')
    
    t_end = time()
    
    print(f'inserted another chunk, took {t_end - t_start} seconds')
    

inserted another chunk, took 33.128073930740356 seconds
inserted another chunk, took 41.49961519241333 seconds
inserted another chunk, took 36.20628786087036 seconds
inserted another chunk, took 61.978753089904785 seconds
inserted another chunk, took 56.48734378814697 seconds
inserted another chunk, took 44.13020300865173 seconds
inserted another chunk, took 42.197224140167236 seconds
inserted another chunk, took 29.762458086013794 seconds
inserted another chunk, took 26.30434775352478 seconds
inserted another chunk, took 68.74134182929993 seconds
inserted another chunk, took 87.35194897651672 seconds


  df = next(df_iter)


inserted another chunk, took 37.71989989280701 seconds
inserted another chunk, took 77.32750272750854 seconds


StopIteration: 