In [2]:
import pyarrow.parquet as pq
import pandas as pd

In [12]:
# Read the Parquet file
table = pq.read_table('dataset/yellow_tripdata_2023-01.parquet')

# Convert to a Pandas DataFrame
df = table.to_pandas()

# To free storage
del table

# Save DataFrame as CSV
df.to_csv('dataset/yellow_tripdata_2023-01.csv', index=False)

In [13]:
df = (
    pd.read_csv('dataset/yellow_tripdata_2023-01.csv', nrows=100)
    .assign(
        tpep_pickup_datetime= lambda x: pd.to_datetime(x.tpep_pickup_datetime),
        tpep_dropoff_datetime= lambda x: pd.to_datetime(x.tpep_dropoff_datetime),
    )
)

df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0


In [5]:
# Get sql schema for df
# DDL: Data Definition Language
print(pd.io.sql.get_schema(df, 'nyc_taxi'))

CREATE TABLE "nyc_taxi" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


In [8]:
# Using sqlalchemy to access postgres data
from sqlalchemy import create_engine

In [9]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [10]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f95f774c850>

In [11]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [25]:
# Now we need to fetch our dataset into our database
# But the data is too big, so we will use chunks to fetch the dataset, using iterator
df_iter = (
    pd.read_csv('dataset/yellow_tripdata_2023-01.csv', iterator=True, chunksize=100000)
)

In [26]:
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x7f95f3b94d60>

In [27]:
df = next(df_iter)

df['tpep_pickup_datetime'] = pd.to_datetime(df.tpep_pickup_datetime)
df['tpep_dropoff_datetime'] = pd.to_datetime(df.tpep_dropoff_datetime)

In [28]:
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.00,0.5,0.00,0.0,1.0,14.30,2.5,0.00
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.10,1.0,N,43,237,1,7.9,1.00,0.5,4.00,0.0,1.0,16.90,2.5,0.00
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.00,0.5,15.00,0.0,1.0,34.90,2.5,0.00
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.90,1.0,N,138,7,1,12.1,7.25,0.5,0.00,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.00,0.5,3.28,0.0,1.0,19.68,2.5,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,2,2023-01-02 14:56:24,2023-01-02 15:16:33,1.0,3.72,1.0,N,186,236,1,21.2,0.00,0.5,6.30,0.0,1.0,31.50,2.5,0.00
99996,2,2023-01-02 14:12:54,2023-01-02 14:21:00,1.0,0.00,1.0,N,162,107,1,8.6,0.00,0.5,2.00,0.0,1.0,14.60,2.5,0.00
99997,2,2023-01-02 14:30:33,2023-01-02 14:33:00,1.0,0.00,1.0,N,90,249,1,4.4,0.00,0.5,1.68,0.0,1.0,10.08,2.5,0.00
99998,2,2023-01-02 14:34:28,2023-01-02 14:41:43,1.0,0.00,1.0,N,249,164,1,7.9,0.00,0.5,2.38,0.0,1.0,14.28,2.5,0.00


In [29]:
# get only header
df.head(n=0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee


We first need to create a table, then ingest the data inside that table

In [30]:
# Ingesting data chucks into sql database
# Create a table with only table names.
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

Go check in pgcli terminal:
- \dt will tell you what database schema we have.
- \d <table_name> will describe the schema, press q to quit

In [31]:
# Now ingest the data into table.
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 4.02 s, sys: 69.9 ms, total: 4.09 s
Wall time: 6.55 s


1000

Check if the data is ingested successfully
- In Terminal pgcli: SELECT COUNT(*) FROM yellow_taxi_data

Now we want to ingest the rest of dataframe

In [32]:
from time import time

In [33]:
while True:

    t_start = time()

    df = next(df_iter)

    df['tpep_pickup_datetime'] = pd.to_datetime(df.tpep_pickup_datetime)
    df['tpep_dropoff_datetime'] = pd.to_datetime(df.tpep_dropoff_datetime)

    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    
    t_end = time()

    print(f'Inserted another chuck..., took {round(t_end-t_start, 3)} second')

Inserted another chuck..., took 7.162 second
Inserted another chuck..., took 7.377 second
Inserted another chuck..., took 7.046 second
Inserted another chuck..., took 8.403 second
Inserted another chuck..., took 7.171 second
Inserted another chuck..., took 8.832 second
Inserted another chuck..., took 7.842 second
Inserted another chuck..., took 9.669 second
Inserted another chuck..., took 8.236 second
Inserted another chuck..., took 6.898 second
Inserted another chuck..., took 7.53 second
Inserted another chuck..., took 9.334 second
Inserted another chuck..., took 11.323 second
Inserted another chuck..., took 18.761 second
Inserted another chuck..., took 8.851 second
Inserted another chuck..., took 8.222 second
Inserted another chuck..., took 7.458 second
Inserted another chuck..., took 8.512 second
Inserted another chuck..., took 7.025 second
Inserted another chuck..., took 6.473 second
Inserted another chuck..., took 7.4 second
Inserted another chuck..., took 6.822 second
Inserted an

  df = next(df_iter)


Inserted another chuck..., took 7.058 second
Inserted another chuck..., took 4.577 second


StopIteration: 