In [10]:
import pandas as pd
from sqlalchemy import create_engine, text as sql_text

In [5]:
df = pd.read_csv('https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz', nrows=100, compression='gzip')

In [8]:
# checking how our table structure will look like
# this is the statement Pandas will execute when it'll be creating this table

print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [20]:
# converting tpep_pickup_datetime & tpep_dropoff_datetime from TEXT to datetime

df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [19]:
df.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

In [12]:
# creating the engine with credentials

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [13]:
# checking connection

engine.connect()

<sqlalchemy.engine.base.Connection at 0x1bcd79f8a00>

In [14]:
# testing if we may run a query

query = """
SELECT 1 as number;
"""

# https://stackoverflow.com/questions/75309237/read-sql-query-throws-optionengine-object-has-no-attribute-execute-with

pd.read_sql_query(con=engine.connect(), sql=sql_text(query))

Unnamed: 0,number
0,1


In [16]:
# checking what table we have in our pg database

query = """
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND 
    schemaname != 'information_schema';
"""

pd.read_sql_query(con=engine.connect(), sql=sql_text(query))

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity


In [22]:
# checking how our table structure will look like
# this is the statement Pandas will execute when it'll be creating this table

print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [23]:
# we don't want to upload the whole file, as it's too big
# for this reason we'll split it into chunks of 100000

df_iter = pd.read_csv("https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz", iterator=True, chunksize=100000, compression='gzip')

In [31]:
# if we print it, we see that it's not a DataFrame, it's an iterator

df_iter

<pandas.io.parsers.readers.TextFileReader at 0x1bcd836ff70>

In [25]:
# next() takes the next chunk

df = next(df_iter)
len(df)

100000

In [26]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [28]:
df.head(0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [29]:
# in order to create the table in pg, we may isert only 1st row, that is column names

df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [30]:
# checking

pd.read_sql_query(con=engine.connect(), sql=sql_text(query))

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,yellow_taxi_data,root,,True,False,False,False


In [32]:
# now we may iterate and ingest our data to the db in chunks

from time import time

while True: 
    try:
        t_start = time()

        # when all the chunks will be uploaded, it'll throw an exception
        df = next(df_iter)

        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

        t_end = time()

        print('inserted another chunk, took %.3f second' % (t_end - t_start))
        
    except StopIteration:
            print("Finished ingesting data into the postgres database")
            break

inserted another chunk, took 15.216 second
inserted another chunk, took 13.298 second
inserted another chunk, took 14.223 second
inserted another chunk, took 13.111 second
inserted another chunk, took 13.308 second
inserted another chunk, took 14.235 second
inserted another chunk, took 13.982 second
inserted another chunk, took 13.303 second
inserted another chunk, took 12.721 second
inserted another chunk, took 12.440 second


  df = next(df_iter)


inserted another chunk, took 12.628 second
inserted another chunk, took 8.704 second
Finished ingesting data into the postgres database


In [33]:
# checking

query = """
SELECT COUNT(1) FROM yellow_taxi_data;
"""

pd.read_sql_query(con=engine.connect(), sql=sql_text(query))

Unnamed: 0,count
0,1169765
