In [1]:
from time import time

import pandas as pd
# SQLAlchemy engine factory, used below to write the data into Postgres
from sqlalchemy import create_engine

print(pd.__version__)

1.3.5


In [2]:
# Load only a small 100-row sample first, so column names and dtypes can be
# inspected without reading the whole (large) CSV into memory.
df_100 = pd.read_csv(
    'yellow_tripdata_2021-01.csv',
    nrows=100,
)

In [3]:
df_100.head(5)  # first five rows of the sample

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


- `tpep_pickup_datetime` and `tpep_dropoff_datetime` are read as text (pandas `object` dtype) instead of datetime values.
- Convert both columns to datetime so they are stored as `TIMESTAMP` columns in the database.

In [4]:
# Both trip timestamp columns arrive as text; parse each into datetime64 in place.
for ts_col in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    df_100[ts_col] = pd.to_datetime(df_100[ts_col])

In [5]:
# Preview the CREATE TABLE statement pandas derives from this frame's dtypes.
# No connection is given, so generic column types (INTEGER/REAL/TEXT) are used.
pd.io.sql.get_schema(frame=df_100, name='yellowtaxi_100')

'CREATE TABLE "yellowtaxi_100" (\n"VendorID" INTEGER,\n  "tpep_pickup_datetime" TIMESTAMP,\n  "tpep_dropoff_datetime" TIMESTAMP,\n  "passenger_count" INTEGER,\n  "trip_distance" REAL,\n  "RatecodeID" INTEGER,\n  "store_and_fwd_flag" TEXT,\n  "PULocationID" INTEGER,\n  "DOLocationID" INTEGER,\n  "payment_type" INTEGER,\n  "fare_amount" REAL,\n  "extra" REAL,\n  "mta_tax" REAL,\n  "tip_amount" REAL,\n  "tolls_amount" REAL,\n  "improvement_surcharge" REAL,\n  "total_amount" REAL,\n  "congestion_surcharge" REAL\n)'

In [6]:
# Same DDL string, printed so the newlines render instead of appearing as '\n'.
print(pd.io.sql.get_schema(frame=df_100, name='yellowtaxi_100'))

CREATE TABLE "yellowtaxi_100" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [7]:
# Confirm the pickup/dropoff columns are now datetime64[ns] after the conversion above
df_100.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

---

In [9]:
# Build the SQLAlchemy engine for the Postgres database the taxi data is loaded into.
# URL format: dialect://user:password@hostname:port/databasename
# Connection settings are read from environment variables so credentials are not
# hard-coded in the notebook. The fallbacks reproduce the original local setup
# (root/root on localhost:5431, database "newyorktaxi").
import os
from urllib.parse import quote_plus

PG_USER = os.environ.get('PG_USER', 'root')
PG_PASSWORD = os.environ.get('PG_PASSWORD', 'root')
PG_HOST = os.environ.get('PG_HOST', 'localhost')
PG_PORT = os.environ.get('PG_PORT', '5431')
PG_DB = os.environ.get('PG_DB', 'newyorktaxi')

# Escape user/password so characters such as '@', ':' or '/' cannot break URL parsing.
engine = create_engine(
    f'postgresql://{quote_plus(PG_USER)}:{quote_plus(PG_PASSWORD)}'
    f'@{PG_HOST}:{PG_PORT}/{PG_DB}'
)

In [10]:
# Passing the engine makes pandas render the DDL in the PostgreSQL dialect
# (BIGINT, TIMESTAMP WITHOUT TIME ZONE, FLOAT(53)). This only builds and prints the
# CREATE TABLE string; it does NOT create the table in the database.
print(pd.io.sql.get_schema(df_100, name='yellowtaxi_100', con=engine))


CREATE TABLE yellowtaxi_100 (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




---

**So far we have only generated the table schema from the first 100 rows; nothing has been written to Postgres yet. To load the entire file, read the CSV in chunks and insert the chunks one at a time.**

In [11]:
# Lazily read the full CSV in 100,000-row chunks rather than loading it all at once.
df_iter = pd.read_csv(
    'yellow_tripdata_2021-01.csv',
    iterator=True,
    chunksize=100_000,
)

In [12]:
df = next(df_iter) # pull the first chunk (up to 100000 rows) from the chunked reader

In [13]:
df.shape[0]  # row count of the chunk just read

100000

In [14]:
# The trip timestamp columns arrive as text (object dtype); parse them to datetime64.
for ts_col in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    df[ts_col] = pd.to_datetime(df[ts_col])

In [15]:
df.iloc[:0]  # zero-row slice: only column names/dtypes, enough to create the table

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [16]:
# Create the target table from the zero-row frame: only the column definitions are
# written, no data rows. if_exists='replace' DROPS any existing 'yellow_taxi_data'
# table and recreates it, so re-running this cell discards previously loaded rows.
# NOTE(review): to_sql's default index=True also writes the DataFrame index as an
# "index" column; the later appends (also default index=True) rely on that column.
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

In [17]:
# append data to the existing table
%time df.to_sql(name='yellow_taxi_data',con=engine,if_exists='append') 

CPU times: user 2.4 s, sys: 44.9 ms, total: 2.44 s
Wall time: 4.63 s


**Loop over the remaining chunks and append each one to the database**

In [19]:
# Loop-control flag for the chunk-upload loop; set to False once df_iter has no chunks left
data_present = True

In [20]:
# Upload the remaining chunks one at a time, timing each insert.
# The flag is (re)initialised here so the cell does not depend on hidden state from
# another cell: re-running it after the reader is exhausted prints
# "No chunks left to upload" instead of silently doing nothing.
data_present = True

while data_present:
    start = time()  # timing includes reading the chunk from the CSV

    # Keep the try narrow: only next() should signal that the reader is exhausted.
    # Errors raised while converting or inserting (in the else branch) are not
    # caught by this handler, so they surface instead of being read as "done".
    try:
        df = next(df_iter)
    except StopIteration:
        print('No chunks left to upload')
        data_present = False
    else:
        # Parse the timestamp columns (read as text) so they match the table's
        # TIMESTAMP columns.
        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

        # Insert this chunk into the existing table.
        df.to_sql(name="yellow_taxi_data", con=engine, if_exists='append')

        end = time()
        print('inserted another chunk, took %.3f seconds' % (end - start))

inserted another chunk, took 5.029 seconds
inserted another chunk, took 4.757 seconds
inserted another chunk, took 4.887 seconds
inserted another chunk, took 4.873 seconds
inserted another chunk, took 4.648 seconds
inserted another chunk, took 4.866 seconds
inserted another chunk, took 4.829 seconds
inserted another chunk, took 4.760 seconds
inserted another chunk, took 5.093 seconds
inserted another chunk, took 4.707 seconds
inserted another chunk, took 4.959 seconds


  if await self.run_code(code, result, async_=asy):


inserted another chunk, took 4.795 seconds
inserted another chunk, took 3.113 seconds
No chunks left to upload
