In [1]:
import pandas as pd
import warnings
import pyarrow.parquet as pq

from sqlalchemy import create_engine
from time import time

warnings.filterwarnings('ignore')

### Parquet to CSV Transformation

In [2]:
def parquet_to_csv(f_name_pq: str, f_name_csv: str):
    df = pq.read_table(f_name_pq).to_pandas()
    df.to_csv(f_name_csv, index=False)
    print(f'Successfully Created {f_name_csv}')

In [9]:
parquet_to_csv(
    f_name_pq='data/yellow_tripdata_2023-01.parquet',
    f_name_csv='yellow_tripdata_2023-01.csv'
)

Successfully Created yellow_tripdata_2023-01.csv


In [3]:
data = pd.read_csv('yellow_tripdata_2023-01.csv', nrows=100)
data.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0


In [4]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 19 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   VendorID               100 non-null    int64  
 1   tpep_pickup_datetime   100 non-null    object 
 2   tpep_dropoff_datetime  100 non-null    object 
 3   passenger_count        100 non-null    float64
 4   trip_distance          100 non-null    float64
 5   RatecodeID             100 non-null    float64
 6   store_and_fwd_flag     100 non-null    object 
 7   PULocationID           100 non-null    int64  
 8   DOLocationID           100 non-null    int64  
 9   payment_type           100 non-null    int64  
 10  fare_amount            100 non-null    float64
 11  extra                  100 non-null    float64
 12  mta_tax                100 non-null    float64
 13  tip_amount             100 non-null    float64
 14  tolls_amount           100 non-null    float64
 15  improve

In [5]:
# transform datatime columns from object into datetime
data['tpep_pickup_datetime'] = pd.to_datetime(data['tpep_pickup_datetime'])
data['tpep_dropoff_datetime'] = pd.to_datetime(data['tpep_dropoff_datetime'])

In [6]:
print(pd.io.sql.get_schema(data, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


In [7]:
# create engine for postgres db
user_name = 'root'
pwd = 'root'

engine = create_engine(f'postgresql://{user_name}:{pwd}@localhost:5432/ny_taxi')

In [8]:
# get postgres table creation
print(pd.io.sql.get_schema(data, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [16]:
nrows = 1_500_000
df_iterator = pd.read_csv('yellow_tripdata_2023-01.csv', iterator=True, chunksize=100000, nrows=nrows)
df_iterator

<pandas.io.parsers.readers.TextFileReader at 0x7f112ffbf4c0>

In [17]:
df_curent_batch = next(df_iterator)
df_curent_batch

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.00,0.5,0.00,0.0,1.0,14.30,2.5,0.00
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.10,1.0,N,43,237,1,7.9,1.00,0.5,4.00,0.0,1.0,16.90,2.5,0.00
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.00,0.5,15.00,0.0,1.0,34.90,2.5,0.00
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.90,1.0,N,138,7,1,12.1,7.25,0.5,0.00,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.00,0.5,3.28,0.0,1.0,19.68,2.5,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,2,2023-01-02 14:56:24,2023-01-02 15:16:33,1.0,3.72,1.0,N,186,236,1,21.2,0.00,0.5,6.30,0.0,1.0,31.50,2.5,0.00
99996,2,2023-01-02 14:12:54,2023-01-02 14:21:00,1.0,0.00,1.0,N,162,107,1,8.6,0.00,0.5,2.00,0.0,1.0,14.60,2.5,0.00
99997,2,2023-01-02 14:30:33,2023-01-02 14:33:00,1.0,0.00,1.0,N,90,249,1,4.4,0.00,0.5,1.68,0.0,1.0,10.08,2.5,0.00
99998,2,2023-01-02 14:34:28,2023-01-02 14:41:43,1.0,0.00,1.0,N,249,164,1,7.9,0.00,0.5,2.38,0.0,1.0,14.28,2.5,0.00


In [18]:
# transform datatime columns from object into datetime
df_curent_batch['tpep_pickup_datetime'] = pd.to_datetime(df_curent_batch['tpep_pickup_datetime'])
df_curent_batch['tpep_dropoff_datetime'] = pd.to_datetime(df_curent_batch['tpep_dropoff_datetime'])

In [19]:
user_name = 'root'
pwd = 'root'
engine = create_engine(f'postgresql://{user_name}:{pwd}@localhost:5432/ny_taxi')

# get postgres table creation
print(pd.io.sql.get_schema(df_curent_batch, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [26]:
# create table in Postgres
pg_table_name = 'yellow_taxi_data'
user_name = 'root'
pwd = 'root'
engine = create_engine(f'postgresql://{user_name}:{pwd}@localhost:5432/ny_taxi')

df_curent_batch.head(n=0).to_sql(name=pg_table_name, con=engine, if_exists='replace')

0

In [37]:
# batch script 
process_data = True
while process_data:
    try:
        start = time()
        df_curent_batch = next(df_iterator)
    
        # transform datatime columns from object into datetime
        df_curent_batch['tpep_pickup_datetime'] = pd.to_datetime(df_curent_batch['tpep_pickup_datetime'])
        df_curent_batch['tpep_dropoff_datetime'] = pd.to_datetime(df_curent_batch['tpep_dropoff_datetime'])

        # injest Postgres data
        df_curent_batch.to_sql(name=pg_table_name, con=engine, if_exists='append')
    
        end = time()
        elapsed_time = round(end - start, 3)
        
        print(f'Inserted Another Batch! Time took: {elapsed_time} seconds')
    except:
        process_data = False
        print('There is no further batch. End of Ingesting')

Inserted Another Batch! Time took: 10.948 seconds
Inserted Another Batch! Time took: 11.092 seconds
Inserted Another Batch! Time took: 10.626 seconds
Inserted Another Batch! Time took: 10.726 seconds
Inserted Another Batch! Time took: 11.547 seconds
Inserted Another Batch! Time took: 13.401 seconds
Inserted Another Batch! Time took: 12.554 seconds
Inserted Another Batch! Time took: 13.107 seconds
Inserted Another Batch! Time took: 12.858 seconds
Inserted Another Batch! Time took: 12.81 seconds
Inserted Another Batch! Time took: 12.681 seconds
Inserted Another Batch! Time took: 11.923 seconds
Inserted Another Batch! Time took: 10.185 seconds
Inserted Another Batch! Time took: 10.099 seconds
There is no further batch. End of Ingesting
