In [1]:
import pandas as pd
from sqlalchemy import create_engine

# Uploading data into PostgreSQL Docker Image

In [2]:
# reading csv file into a dataframe, using the pandas iterator functionality
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', low_memory=False, iterator=True, chunksize=100000)
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x7fa6ef219700>

In [3]:
# showing the second chunk
df = next(df_iter)
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.10,1,N,142,43,2,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.20,1,N,238,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.70,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.60,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,1,2021-01-04 14:04:31,2021-01-04 14:08:52,3,0.70,1,N,234,224,2,5.0,2.5,0.5,0.00,0.0,0.3,8.30,2.5
99996,1,2021-01-04 14:18:46,2021-01-04 14:35:45,2,3.30,1,N,234,236,1,14.5,2.5,0.5,3.55,0.0,0.3,21.35,2.5
99997,1,2021-01-04 14:42:41,2021-01-04 14:59:22,2,4.70,1,N,236,79,1,17.0,2.5,0.5,4.05,0.0,0.3,24.35,2.5
99998,2,2021-01-04 14:39:02,2021-01-04 15:09:37,2,17.95,2,N,132,148,1,52.0,0.0,0.5,5.00,0.0,0.3,60.30,2.5


In [4]:
# changing datatypes for the columns, that should hold time stamps (which were objects before)
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

In [5]:
df.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

In [6]:
# looking at the ddl-schema from the dataframe
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [7]:
# making a connection to PostgreSQL
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxy')
# engine.connect() # only for testing purpose, since the connection will be build with the 'con=engine' param in the next cell

In [8]:
# inserting ddl-schema into postgresql
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [9]:
# creating table with only the column names
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

Now, from within the pgcli shell (started with 'pgcli -h localhost -p 5432 -u root -d ny_taxy') we can see and inspect the table with the '\dt' and '\d yellow_taxi_data' commands.

In [10]:
# appending the SQL table 
df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

1000

From the pgcli shell we can test if inserting one chunk of data worked with 'SELECT count(1) FROM yellow_taxi_data'.

In [11]:
from time import time

In [12]:
# looping through df_iter to get all the chunks
while True:
    t_start = time()
    
    df = next(df_iter)
    
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
    
    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    
    t_end = time()
    
    print('inserted another chunk ... took %.3f seconds' % (t_end - t_start))

inserted another chunk ... took 6.362 seconds
inserted another chunk ... took 6.427 seconds
inserted another chunk ... took 6.514 seconds
inserted another chunk ... took 8.295 seconds
inserted another chunk ... took 7.964 seconds
inserted another chunk ... took 7.403 seconds
inserted another chunk ... took 8.112 seconds
inserted another chunk ... took 7.226 seconds
inserted another chunk ... took 7.569 seconds
inserted another chunk ... took 6.862 seconds
inserted another chunk ... took 7.234 seconds
inserted another chunk ... took 7.340 seconds
inserted another chunk ... took 4.244 seconds


StopIteration: 

Code is breaking due to the iterator reaching the limits. But except from it looking ugly, everything worked as it should have.

Now, all the chunks have been inserted into the PostgreSQL database.